diff --git a/Postbox/Coding.swift b/Postbox/Coding.swift index 1a2b69c812..97285394f7 100644 --- a/Postbox/Coding.swift +++ b/Postbox/Coding.swift @@ -705,6 +705,49 @@ public final class Decoder { } } + public func decodeObjectArrayWithDecoderForKey(key: StaticString) -> [T] { + if Decoder.positionOnKey(UnsafePointer(self.buffer.memory), offset: &self.offset, maxOffset: self.buffer.length, length: self.buffer.length, key: key, valueType: .ObjectArray) { + var length: Int32 = 0 + memcpy(&length, self.buffer.memory + self.offset, 4) + self.offset += 4 + + var array: [T] = [] + array.reserveCapacity(Int(length)) + + var failed = false + var i: Int32 = 0 + while i < length { + var typeHash: Int32 = 0 + memcpy(&typeHash, self.buffer.memory + self.offset, 4) + self.offset += 4 + + var objectLength: Int32 = 0 + memcpy(&objectLength, self.buffer.memory + self.offset, 4) + + let innerDecoder = Decoder(buffer: ReadBuffer(memory: self.buffer.memory + (self.offset + 4), length: Int(objectLength), freeWhenDone: false)) + self.offset += 4 + Int(objectLength) + + if !failed { + if let object = T(decoder: innerDecoder) as? T { + array.append(object) + } else { + failed = true + } + } + + i += 1 + } + + if failed { + return [] + } else { + return array + } + } else { + return [] + } + } + public func decodeObjectArrayForKey(key: StaticString) -> [T] { if Decoder.positionOnKey(UnsafePointer(self.buffer.memory), offset: &self.offset, maxOffset: self.buffer.length, length: self.buffer.length, key: key, valueType: .ObjectArray) { var length: Int32 = 0 diff --git a/Postbox/MessageHistoryIndexTable.swift b/Postbox/MessageHistoryIndexTable.swift index 9b4f704072..7c6a2b6a41 100644 --- a/Postbox/MessageHistoryIndexTable.swift +++ b/Postbox/MessageHistoryIndexTable.swift @@ -642,6 +642,18 @@ final class MessageHistoryIndexTable: Table { return true }, limit: 0) + self.valueBox.range(self.tableId, start: self.key(MessageId(peerId: peerId, namespace: namespace, id: maxId)), end: self.upperBound(peerId, namespace: namespace), values: { key, value in + var flags: Int8 = 0 + value.read(&flags, offset: 0, length: 1) + if (flags & HistoryEntryTypeMask) == HistoryEntryTypeHole { + value.reset() + if case let .Hole(hole) = readHistoryIndexEntry(peerId, namespace: namespace, key: key, value: value) where hole.min <= maxId && hole.maxIndex.id.id >= maxId { + holes = true + } + } + return false + }, limit: 1) + return (count, holes) } diff --git a/Postbox/MessageHistoryReadStateTable.swift b/Postbox/MessageHistoryReadStateTable.swift index f0fb8772fd..648bd13d75 100644 --- a/Postbox/MessageHistoryReadStateTable.swift +++ b/Postbox/MessageHistoryReadStateTable.swift @@ -40,14 +40,17 @@ final class MessageHistoryReadStateTable: Table { var stateByNamespace: [MessageId.Namespace: PeerReadState] = [:] for _ in 0 ..< count { var namespaceId: Int32 = 0 - var maxReadId: Int32 = 0 + var maxIncomingReadId: Int32 = 0 + var maxOutgoingReadId: Int32 = 0 var maxKnownId: Int32 = 0 var count: Int32 = 0 value.read(&namespaceId, offset: 0, length: 4) - value.read(&maxReadId, offset: 0, length: 4) + value.read(&maxIncomingReadId, offset: 0, length: 4) + value.read(&maxOutgoingReadId, offset: 0, length: 4) value.read(&maxKnownId, offset: 0, length: 4) value.read(&count, offset: 0, length: 4) - let state = PeerReadState(maxReadId: maxReadId, maxKnownId: maxKnownId, count: count) + + let state = PeerReadState(maxIncomingReadId: maxIncomingReadId, maxOutgoingReadId: maxOutgoingReadId, maxKnownId: maxKnownId, count: count) stateByNamespace[namespaceId] = state } let states = InternalPeerReadStates(namespaces: stateByNamespace) @@ -96,7 +99,7 @@ final class MessageHistoryReadStateTable: Table { } } - func addIncomingMessages(peerId: PeerId, ids: [MessageId]) -> (CombinedPeerReadState?, Bool) { + func addIncomingMessages(peerId: PeerId, ids: Set) -> (CombinedPeerReadState?, Bool) { var idsByNamespace: [MessageId.Namespace: [MessageId.Id]] = [:] for id in ids { if idsByNamespace[id.namespace] != nil { @@ -118,14 +121,14 @@ final class MessageHistoryReadStateTable: Table { var addedUnreadCount: Int32 = 0 var maxIncomingId: Int32 = 0 for id in ids { - if id > currentState.maxKnownId { + if id > currentState.maxKnownId && id > currentState.maxIncomingReadId { addedUnreadCount += 1 maxIncomingId = max(id, maxIncomingId) } } if addedUnreadCount != 0 { - states.namespaces[namespace] = PeerReadState(maxReadId: currentState.maxReadId, maxKnownId: currentState.maxKnownId, count: currentState.count + addedUnreadCount) + states.namespaces[namespace] = PeerReadState(maxIncomingReadId: currentState.maxIncomingReadId, maxOutgoingReadId: currentState.maxOutgoingReadId, maxKnownId: currentState.maxKnownId, count: currentState.count + addedUnreadCount) updated = true if traceReadStates { @@ -171,7 +174,7 @@ final class MessageHistoryReadStateTable: Table { if let currentState = states.namespaces[namespace] { var unreadIds: [MessageId.Id] = [] for id in ids { - if id > currentState.maxReadId { + if id > currentState.maxIncomingReadId { unreadIds.append(id) } } @@ -181,7 +184,7 @@ final class MessageHistoryReadStateTable: Table { invalidate = true } - states.namespaces[namespace] = PeerReadState(maxReadId: currentState.maxReadId, maxKnownId: currentState.maxKnownId, count: currentState.count - knownCount) + states.namespaces[namespace] = PeerReadState(maxIncomingReadId: currentState.maxIncomingReadId, maxOutgoingReadId: currentState.maxOutgoingReadId, maxKnownId: currentState.maxKnownId, count: currentState.count - knownCount) updated = true } else { invalidate = true @@ -200,14 +203,14 @@ final class MessageHistoryReadStateTable: Table { return (nil, false) } - func applyMaxReadId(messageId: MessageId, incomingStatsInRange: (MessageId.Id, MessageId.Id) -> (count: Int, holes: Bool), topMessageId: MessageId.Id?) -> (CombinedPeerReadState?, Bool) { + func applyIncomingMaxReadId(messageId: MessageId, incomingStatsInRange: (MessageId.Id, MessageId.Id) -> (count: Int, holes: Bool), topMessageId: MessageId.Id?) -> (CombinedPeerReadState?, Bool) { if let states = self.get(messageId.peerId), state = states.namespaces[messageId.namespace] { if traceReadStates { print("[ReadStateTable] applyMaxReadId peerId: \(messageId.peerId), maxReadId: \(messageId.id) (before: \(states.namespaces))") } - if state.maxReadId < messageId.id || messageId.id == topMessageId { - var (deltaCount, holes) = incomingStatsInRange(state.maxReadId + 1, messageId.id) + if state.maxIncomingReadId < messageId.id || messageId.id == topMessageId { + var (deltaCount, holes) = incomingStatsInRange(state.maxIncomingReadId + 1, messageId.id) if traceReadStates { print("[ReadStateTable] applyMaxReadId after deltaCount: \(deltaCount), holes: \(holes)") @@ -220,7 +223,7 @@ final class MessageHistoryReadStateTable: Table { } } - states.namespaces[messageId.namespace] = PeerReadState(maxReadId: messageId.id, maxKnownId: state.maxKnownId, count: state.count - Int32(deltaCount)) + states.namespaces[messageId.namespace] = PeerReadState(maxIncomingReadId: messageId.id, maxOutgoingReadId: state.maxOutgoingReadId, maxKnownId: state.maxKnownId, count: state.count - Int32(deltaCount)) self.updatedPeerIds.insert(messageId.peerId) return (CombinedPeerReadState(states: states.namespaces.map({$0})), holes) } @@ -231,8 +234,22 @@ final class MessageHistoryReadStateTable: Table { return (nil, false) } + func applyOutgoingMaxReadId(messageId: MessageId) -> (CombinedPeerReadState?, Bool) { + if let states = self.get(messageId.peerId), state = states.namespaces[messageId.namespace] { + if state.maxOutgoingReadId < messageId.id { + states.namespaces[messageId.namespace] = PeerReadState(maxIncomingReadId: state.maxIncomingReadId, maxOutgoingReadId: state.maxOutgoingReadId, maxKnownId: state.maxKnownId, count: state.count) + self.updatedPeerIds.insert(messageId.peerId) + return (CombinedPeerReadState(states: states.namespaces.map({$0})), false) + } + } else { + return (nil, true) + } + + return (nil, false) + } + func applyInteractiveMaxReadId(messageId: MessageId, incomingStatsInRange: (MessageId.Id, MessageId.Id) -> (count: Int, holes: Bool), topMessageId: MessageId.Id?) -> (combinedState: CombinedPeerReadState?, ApplyInteractiveMaxReadIdResult) { - let (combinedState, holes) = self.applyMaxReadId(messageId, incomingStatsInRange: incomingStatsInRange, topMessageId: topMessageId) + let (combinedState, holes) = self.applyIncomingMaxReadId(messageId, incomingStatsInRange: incomingStatsInRange, topMessageId: topMessageId) if let combinedState = combinedState { return (combinedState, .Push(thenSync: holes)) @@ -250,11 +267,13 @@ final class MessageHistoryReadStateTable: Table { sharedBuffer.write(&count, offset: 0, length: 4) for (namespace, state) in states.namespaces { var namespaceId: Int32 = namespace - var maxReadId: Int32 = state.maxReadId + var maxIncomingReadId: Int32 = state.maxIncomingReadId + var maxOutgoingReadId: Int32 = state.maxOutgoingReadId var maxKnownId: Int32 = state.maxKnownId var count: Int32 = state.count sharedBuffer.write(&namespaceId, offset: 0, length: 4) - sharedBuffer.write(&maxReadId, offset: 0, length: 4) + sharedBuffer.write(&maxIncomingReadId, offset: 0, length: 4) + sharedBuffer.write(&maxOutgoingReadId, offset: 0, length: 4) sharedBuffer.write(&maxKnownId, offset: 0, length: 4) sharedBuffer.write(&count, offset: 0, length: 4) } diff --git a/Postbox/MessageHistoryTable.swift b/Postbox/MessageHistoryTable.swift index d83f01ef6c..38fdf2891c 100644 --- a/Postbox/MessageHistoryTable.swift +++ b/Postbox/MessageHistoryTable.swift @@ -118,7 +118,7 @@ final class MessageHistoryTable: Table { var outputOperations: [MessageHistoryOperation] = [] var accumulatedRemoveIndices: [MessageIndex] = [] - var addedIncomingMessageIds: [MessageId] = [] + var addedIncomingMessageIds = Set() var removedMessageIds: [MessageId] = [] for operation in operations { switch operation { @@ -167,9 +167,10 @@ final class MessageHistoryTable: Table { } } if message.flags.contains(.Incoming) { - addedIncomingMessageIds.append(message.id) + addedIncomingMessageIds.insert(message.id) } case let .Remove(index): + addedIncomingMessageIds.remove(index.id) self.justRemove(index, unsentMessageOperations: &unsentMessageOperations) accumulatedRemoveIndices.append(index) case let .Update(index, storeMessage): @@ -298,7 +299,7 @@ final class MessageHistoryTable: Table { topMessageId = index.id.id } - let (combinedState, invalidated) = self.readStateTable.applyMaxReadId(messageId, incomingStatsInRange: { fromId, toId in + let (combinedState, invalidated) = self.readStateTable.applyIncomingMaxReadId(messageId, incomingStatsInRange: { fromId, toId in return self.messageHistoryIndexTable.incomingMessageCountInRange(messageId.peerId, namespace: messageId.namespace, minId: fromId, maxId: toId) }, topMessageId: topMessageId) @@ -315,6 +316,22 @@ final class MessageHistoryTable: Table { } } + func applyOutgoingReadMaxId(messageId: MessageId, inout operationsByPeerId: [PeerId: [MessageHistoryOperation]], inout updatedPeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?]) { + let (combinedState, invalidated) = self.readStateTable.applyOutgoingMaxReadId(messageId) + + if let combinedState = combinedState { + if operationsByPeerId[messageId.peerId] == nil { + operationsByPeerId[messageId.peerId] = [.UpdateReadState(combinedState)] + } else { + operationsByPeerId[messageId.peerId]!.append(.UpdateReadState(combinedState)) + } + } + + if invalidated { + self.synchronizeReadStateTable.set(messageId.peerId, operation: .Validate, operations: &updatedPeerReadStateOperations) + } + } + func applyInteractiveMaxReadId(messageId: MessageId, inout operationsByPeerId: [PeerId: [MessageHistoryOperation]], inout updatedPeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?]) { var topMessageId: MessageId.Id? if let topEntry = self.messageHistoryIndexTable.top(messageId.peerId, namespace: messageId.namespace), case let .Message(index) = topEntry { @@ -574,6 +591,18 @@ final class MessageHistoryTable: Table { } } + private func updateMedia(from from: [Media], to: [Media]) { + if from.count != 0 { + assertionFailure() + } + + for media in from { + if let id = media.id { + + } + } + } + private func justUpdate(index: MessageIndex, message: InternalStoreMessage, sharedKey: ValueBoxKey, sharedBuffer: WriteBuffer, sharedEncoder: Encoder, inout unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation]) -> IntermediateMessage? { if let previousMessage = self.getMessage(index) { self.valueBox.remove(self.tableId, key: self.key(index)) @@ -596,11 +625,10 @@ final class MessageHistoryTable: Table { assertionFailure() } - let previousEmbeddedMediaData = previousMessage.embeddedMediaData - var previousMedia: [Media] = [] - if previousEmbeddedMediaData.length > 4 { + if previousMessage.embeddedMediaData.length > 4 { var embeddedMediaCount: Int32 = 0 + let previousEmbeddedMediaData = previousMessage.embeddedMediaData previousEmbeddedMediaData.read(&embeddedMediaCount, offset: 0, length: 4) for _ in 0 ..< embeddedMediaCount { var mediaLength: Int32 = 0 @@ -612,65 +640,16 @@ final class MessageHistoryTable: Table { } } + var previousReferencedMedia: [Media] = [] for mediaId in previousMessage.referencedMedia { if let media = self.messageMediaTable.get(mediaId) { previousMedia.append(media) } } - let updatedMedia = message.media - var removedMediaIds: [MediaId] = [] - var updatedEmbeddedMedia: [Media] = [] - var updatedReferencedMediaIds: [MediaId] = [] - - for media in updatedMedia { - assertionFailure() - if let mediaId = media.id { - var found = false - for previous in previousMedia { - if let previousId = previous.id { - if previousId == mediaId { - found = true - break - } - } - } - if !found { - let result = self.messageMediaTable.set(media, index: MessageIndex(message), messageHistoryTable: self) - switch result { - case let .Embed(embedded): - updatedEmbeddedMedia.append(embedded) - case .Reference: - updatedReferencedMediaIds.append(mediaId) - } - } else { - - } - } else { - updatedEmbeddedMedia.append(media) - } - } - for media in previousMedia { - assertionFailure() - if let mediaId = media.id { - var found = false - for updated in updatedMedia { - if let updatedId = updated.id { - if updatedId == mediaId { - found = true - break - } - } - } - if !found { - assertionFailure() - } - } else { - self.messageMediaTable.removeEmbeddedMedia(media) - } - } + self.updateMedia(from: previousMedia, to: message.media) sharedBuffer.reset() @@ -1070,6 +1049,7 @@ final class MessageHistoryTable: Table { var forwardInfo: MessageForwardInfo? if let internalForwardInfo = message.forwardInfo, forwardAuthor = peerTable.get(internalForwardInfo.authorId) { var source: Peer? + if let sourceId = internalForwardInfo.sourceId { source = peerTable.get(sourceId) } @@ -1260,7 +1240,7 @@ final class MessageHistoryTable: Table { func maxReadIndex(peerId: PeerId) -> MessageHistoryAnchorIndex? { if let combinedState = self.readStateTable.getCombinedState(peerId), state = combinedState.states.first where state.1.count != 0 { - return self.anchorIndex(MessageId(peerId: peerId, namespace: state.0, id: state.1.maxReadId)) + return self.anchorIndex(MessageId(peerId: peerId, namespace: state.0, id: state.1.maxIncomingReadId)) } return nil } diff --git a/Postbox/MessageHistoryView.swift b/Postbox/MessageHistoryView.swift index bf7b27b039..a6403034f7 100644 --- a/Postbox/MessageHistoryView.swift +++ b/Postbox/MessageHistoryView.swift @@ -470,7 +470,7 @@ public final class MessageHistoryView { var maxNamespaceIndex: MessageIndex? var index = entries.count - 1 for entry in entries.reverse() { - if entry.index.id.namespace == namespace && entry.index.id.id <= state.maxReadId { + if entry.index.id.namespace == namespace && entry.index.id.id <= state.maxIncomingReadId { maxNamespaceIndex = entry.index break } diff --git a/Postbox/PeerReadState.swift b/Postbox/PeerReadState.swift index ab58e09fef..ee3d20ba52 100644 --- a/Postbox/PeerReadState.swift +++ b/Postbox/PeerReadState.swift @@ -1,22 +1,24 @@ public struct PeerReadState: Equatable, CustomStringConvertible { - public let maxReadId: MessageId.Id + public let maxIncomingReadId: MessageId.Id + public let maxOutgoingReadId: MessageId.Id public let maxKnownId: MessageId.Id public let count: Int32 - public init(maxReadId: MessageId.Id, maxKnownId: MessageId.Id, count: Int32) { - self.maxReadId = maxReadId + public init(maxIncomingReadId: MessageId.Id, maxOutgoingReadId: MessageId.Id, maxKnownId: MessageId.Id, count: Int32) { + self.maxIncomingReadId = maxIncomingReadId + self.maxOutgoingReadId = maxOutgoingReadId self.maxKnownId = maxKnownId self.count = count } public var description: String { - return "(PeerReadState maxReadId: \(maxReadId), maxKnownId: \(maxKnownId), count: \(count))" + return "(PeerReadState maxIncomingReadId: \(maxIncomingReadId), maxOutgoingReadId: \(maxOutgoingReadId) maxKnownId: \(maxKnownId), count: \(count)" } } public func ==(lhs: PeerReadState, rhs: PeerReadState) -> Bool { - return lhs.maxReadId == rhs.maxReadId && lhs.maxKnownId == rhs.maxKnownId && lhs.count == rhs.count + return lhs.maxIncomingReadId == rhs.maxIncomingReadId && lhs.maxOutgoingReadId == rhs.maxOutgoingReadId && lhs.maxKnownId == rhs.maxKnownId && lhs.count == rhs.count } public struct CombinedPeerReadState { diff --git a/Postbox/Postbox.swift b/Postbox/Postbox.swift index aa7d968adc..2e9b6b1536 100644 --- a/Postbox/Postbox.swift +++ b/Postbox/Postbox.swift @@ -57,6 +57,10 @@ public final class Modifier { self.postbox?.applyIncomingReadMaxId(messageId) } + public func applyOutgoingReadMaxId(messageId: MessageId) { + self.postbox?.applyOutgoingReadMaxId(messageId) + } + public func applyInteractiveReadMaxId(messageId: MessageId) { self.postbox?.applyInteractiveReadMaxId(messageId) } @@ -221,16 +225,18 @@ public final class Postbox { //self.debugRestoreState("beforeHoles") // debugging unread counters - //self.debugSaveState("afterLogin") //self.debugRestoreState("afterLogin") + //self.debugSaveState("previous") + //self.debugRestoreState("previous") + //#endif self.valueBox = SqliteValueBox(basePath: self.basePath + "/db") self.metadataTable = MetadataTable(valueBox: self.valueBox, tableId: 0) let userVersion: Int32? = self.metadataTable.userVersion() - let currentUserVersion: Int32 = 3 + let currentUserVersion: Int32 = 5 if userVersion != currentUserVersion { self.valueBox.drop() @@ -398,6 +404,10 @@ public final class Postbox { self.messageHistoryTable.applyIncomingReadMaxId(messageId, operationsByPeerId: &self.currentOperationsByPeerId, updatedPeerReadStateOperations: &self.currentUpdatedSynchronizeReadStateOperations) } + private func applyOutgoingReadMaxId(messageId: MessageId) { + self.messageHistoryTable.applyOutgoingReadMaxId(messageId, operationsByPeerId: &self.currentOperationsByPeerId, updatedPeerReadStateOperations: &self.currentUpdatedSynchronizeReadStateOperations) + } + private func applyInteractiveReadMaxId(messageId: MessageId) { self.messageHistoryTable.applyInteractiveMaxReadId(messageId, operationsByPeerId: &self.currentOperationsByPeerId, updatedPeerReadStateOperations: &self.currentUpdatedSynchronizeReadStateOperations) }