diff --git a/Postbox.xcodeproj/project.pbxproj b/Postbox.xcodeproj/project.pbxproj index b4aab6edf5..4bba6c3596 100644 --- a/Postbox.xcodeproj/project.pbxproj +++ b/Postbox.xcodeproj/project.pbxproj @@ -13,6 +13,7 @@ D0079F651D5A457A00A27A2C /* ContactPeersView.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0079F641D5A457A00A27A2C /* ContactPeersView.swift */; }; D0079F6B1D5B3AAB00A27A2C /* PeerNameIndexRepresentation.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0079F6A1D5B3AAB00A27A2C /* PeerNameIndexRepresentation.swift */; }; D00EED1E1C81F28D00341DFF /* MessageHistoryTagsTable.swift in Sources */ = {isa = PBXBuildFile; fileRef = D00EED1D1C81F28D00341DFF /* MessageHistoryTagsTable.swift */; }; + D010B61A1E1E463900C3E282 /* PeerMergedOperationLogIndexTable.swift in Sources */ = {isa = PBXBuildFile; fileRef = D010B6191E1E463900C3E282 /* PeerMergedOperationLogIndexTable.swift */; }; D01F7D9B1CBEC390008765C9 /* MessageHistoryInvalidatedReadStateTable.swift in Sources */ = {isa = PBXBuildFile; fileRef = D01F7D9A1CBEC390008765C9 /* MessageHistoryInvalidatedReadStateTable.swift */; }; D01F7D9D1CBF8586008765C9 /* SynchronizePeerReadStatesView.swift in Sources */ = {isa = PBXBuildFile; fileRef = D01F7D9C1CBF8586008765C9 /* SynchronizePeerReadStatesView.swift */; }; D021E0D41DB4FAE100C6B04F /* ItemCollection.swift in Sources */ = {isa = PBXBuildFile; fileRef = D021E0D31DB4FAE100C6B04F /* ItemCollection.swift */; }; @@ -149,6 +150,7 @@ D0E3A7881B28AE9C00A402D9 /* Coding.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0E3A7871B28AE9C00A402D9 /* Coding.swift */; }; D0E3A79E1B28B50400A402D9 /* Message.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0E3A79D1B28B50400A402D9 /* Message.swift */; }; D0E3A7A21B28B7DC00A402D9 /* Media.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0E3A7A11B28B7DC00A402D9 /* Media.swift */; }; + D0F019FD1E1DA0CC00F05AB3 /* PeerMergedOperationLogView.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0F019FC1E1DA0CC00F05AB3 /* PeerMergedOperationLogView.swift */; }; D0F3CC721DDE1CDC008148FA /* ItemCacheTable.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0F3CC711DDE1CDC008148FA /* ItemCacheTable.swift */; }; D0F3CC741DDE1EB9008148FA /* ItemCacheMetaTable.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0F3CC731DDE1EB9008148FA /* ItemCacheMetaTable.swift */; }; D0F7AB321DCFAB18009AD9A1 /* PeerChatTopIndexableMessageIds.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0F7AB311DCFAB18009AD9A1 /* PeerChatTopIndexableMessageIds.swift */; }; @@ -216,6 +218,7 @@ D0079F641D5A457A00A27A2C /* ContactPeersView.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ContactPeersView.swift; sourceTree = ""; }; D0079F6A1D5B3AAB00A27A2C /* PeerNameIndexRepresentation.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PeerNameIndexRepresentation.swift; sourceTree = ""; }; D00EED1D1C81F28D00341DFF /* MessageHistoryTagsTable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = MessageHistoryTagsTable.swift; sourceTree = ""; }; + D010B6191E1E463900C3E282 /* PeerMergedOperationLogIndexTable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PeerMergedOperationLogIndexTable.swift; sourceTree = ""; }; D01F7D9A1CBEC390008765C9 /* MessageHistoryInvalidatedReadStateTable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = MessageHistoryInvalidatedReadStateTable.swift; sourceTree = ""; }; D01F7D9C1CBF8586008765C9 /* SynchronizePeerReadStatesView.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SynchronizePeerReadStatesView.swift; sourceTree = ""; }; D021E0D31DB4FAE100C6B04F /* ItemCollection.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ItemCollection.swift; sourceTree = ""; }; @@ -305,6 +308,7 @@ D0E3A7871B28AE9C00A402D9 /* Coding.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Coding.swift; sourceTree = ""; }; D0E3A79D1B28B50400A402D9 /* Message.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Message.swift; sourceTree = ""; }; D0E3A7A11B28B7DC00A402D9 /* Media.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Media.swift; sourceTree = ""; }; + D0F019FC1E1DA0CC00F05AB3 /* PeerMergedOperationLogView.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PeerMergedOperationLogView.swift; sourceTree = ""; }; D0F3CC711DDE1CDC008148FA /* ItemCacheTable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ItemCacheTable.swift; sourceTree = ""; }; D0F3CC731DDE1EB9008148FA /* ItemCacheMetaTable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ItemCacheMetaTable.swift; sourceTree = ""; }; D0F7AB311DCFAB18009AD9A1 /* PeerChatTopIndexableMessageIds.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PeerChatTopIndexableMessageIds.swift; sourceTree = ""; }; @@ -444,6 +448,7 @@ D07827C21E008F7300071108 /* PeerNameTokenIndexTable.swift */, D07827C41E00B23F00071108 /* PeerNameIndexTable.swift */, D0AD23261E194D1C00A7089A /* PeerOperationLogTable.swift */, + D010B6191E1E463900C3E282 /* PeerMergedOperationLogIndexTable.swift */, D0AD23281E196B6400A7089A /* PeerOperationLogMetadataTable.swift */, ); name = Tables; @@ -509,6 +514,7 @@ D0DF0C8E1D81A350008AEB01 /* PeerView.swift */, D021E0DB1DB5237C00C6B04F /* ItemCollectionsView.swift */, D0A18D661E16874D004C6734 /* UnreadMessageCountsView.swift */, + D0F019FC1E1DA0CC00F05AB3 /* PeerMergedOperationLogView.swift */, ); name = Views; sourceTree = ""; @@ -871,6 +877,7 @@ D08C713E1C512EA500779C0F /* MessageHistoryTable.swift in Sources */, D0079F5A1D592E8B00A27A2C /* ContactTable.swift in Sources */, D0AB0B8C1D65D488002C78E7 /* MessageHistoryHolesView.swift in Sources */, + D010B61A1E1E463900C3E282 /* PeerMergedOperationLogIndexTable.swift in Sources */, D0F9E8671C58D08900037222 /* ChatListIndexTable.swift in Sources */, D0E3A7A21B28B7DC00A402D9 /* Media.swift in Sources */, D0E3A7881B28AE9C00A402D9 /* Coding.swift in Sources */, @@ -894,6 +901,7 @@ D03120FA1DA540F0006A2A60 /* CachedPeerData.swift in Sources */, D07516791B2EC90400AE42E0 /* SQLite-Bridging.m in Sources */, D0977FA01B8244D7009994B2 /* SqliteValueBox.swift in Sources */, + D0F019FD1E1DA0CC00F05AB3 /* PeerMergedOperationLogView.swift in Sources */, D0F9E8651C58CB7F00037222 /* ChatListTable.swift in Sources */, D0F9E8711C5A0E9B00037222 /* PeerTable.swift in Sources */, D07CFF851DCA99C400761F81 /* InitialMessageHistoryData.swift in Sources */, diff --git a/Postbox/Coding.swift b/Postbox/Coding.swift index af779414c7..45ea6857f4 100644 --- a/Postbox/Coding.swift +++ b/Postbox/Coding.swift @@ -38,9 +38,9 @@ public func persistentHash32(_ string: String) -> Int32 { private let emptyMemory = malloc(1)! public class MemoryBuffer: Equatable, CustomStringConvertible { - var memory: UnsafeMutableRawPointer + public internal(set) var memory: UnsafeMutableRawPointer var capacity: Int - var length: Int + public internal(set) var length: Int var freeWhenDone: Bool public init(copyOf buffer: MemoryBuffer) { diff --git a/Postbox/Message.swift b/Postbox/Message.swift index e2ad1c66d9..f358da19ce 100644 --- a/Postbox/Message.swift +++ b/Postbox/Message.swift @@ -209,6 +209,10 @@ public struct MessageFlags: OptionSet { rawValue |= MessageFlags.TopIndexable.rawValue } + if flags.contains(StoreMessageFlags.Sending) { + rawValue |= MessageFlags.Sending.rawValue + } + self.rawValue = rawValue } @@ -217,6 +221,7 @@ public struct MessageFlags: OptionSet { public static let Incoming = MessageFlags(rawValue: 4) public static let Personal = MessageFlags(rawValue: 8) public static let TopIndexable = MessageFlags(rawValue: 16) + public static let Sending = MessageFlags(rawValue: 32) } public struct StoreMessageForwardInfo { @@ -324,11 +329,42 @@ public struct StoreMessageFlags: OptionSet { self.rawValue = 0 } + public init(_ flags: MessageFlags) { + var rawValue: UInt32 = 0 + + if flags.contains(.Unsent) { + rawValue |= StoreMessageFlags.Unsent.rawValue + } + + if flags.contains(.Failed) { + rawValue |= StoreMessageFlags.Failed.rawValue + } + + if flags.contains(.Incoming) { + rawValue |= StoreMessageFlags.Incoming.rawValue + } + + if flags.contains(.Personal) { + rawValue |= StoreMessageFlags.Personal.rawValue + } + + if flags.contains(.TopIndexable) { + rawValue |= StoreMessageFlags.TopIndexable.rawValue + } + + if flags.contains(.Sending) { + rawValue |= StoreMessageFlags.Sending.rawValue + } + + self.rawValue = rawValue + } + public static let Unsent = StoreMessageFlags(rawValue: 1) public static let Failed = StoreMessageFlags(rawValue: 2) public static let Incoming = StoreMessageFlags(rawValue: 4) public static let Personal = StoreMessageFlags(rawValue: 8) public static let TopIndexable = StoreMessageFlags(rawValue: 16) + public static let Sending = StoreMessageFlags(rawValue: 32) } public enum StoreMessageId { diff --git a/Postbox/MessageHistoryIndexTable.swift b/Postbox/MessageHistoryIndexTable.swift index ebf2df414c..02acd75da6 100644 --- a/Postbox/MessageHistoryIndexTable.swift +++ b/Postbox/MessageHistoryIndexTable.swift @@ -151,8 +151,12 @@ final class MessageHistoryIndexTable: Table { func ensureInitialized(_ peerId: PeerId, operations: inout [MessageHistoryIndexOperation]) { if !self.metadataTable.isInitialized(peerId) { - for namespace in self.seedConfiguration.initializeMessageNamespacesWithHoles { - self.justInsertHole(MessageHistoryHole(stableId: self.metadataTable.getNextStableMessageIndexId(), maxIndex: MessageIndex(id: MessageId(peerId: peerId, namespace: namespace, id: Int32.max), timestamp: Int32.max), min: 1, tags: MessageTags.All.rawValue), operations: &operations) + var processedMessageNamespaces = Set() + for (peerNamespace, messageNamespace) in self.seedConfiguration.initializeMessageNamespacesWithHoles { + if peerId.namespace == peerNamespace, !processedMessageNamespaces.contains(messageNamespace) { + processedMessageNamespaces.insert(messageNamespace) + self.justInsertHole(MessageHistoryHole(stableId: self.metadataTable.getNextStableMessageIndexId(), maxIndex: MessageIndex(id: MessageId(peerId: peerId, namespace: messageNamespace, id: Int32.max), timestamp: Int32.max), min: 1, tags: MessageTags.All.rawValue), operations: &operations) + } } self.metadataTable.setInitialized(peerId) diff --git a/Postbox/MessageHistoryTable.swift b/Postbox/MessageHistoryTable.swift index 0d2f874a7f..d5c7ec0ace 100644 --- a/Postbox/MessageHistoryTable.swift +++ b/Postbox/MessageHistoryTable.swift @@ -782,7 +782,7 @@ final class MessageHistoryTable: Table { } } - if previousMediaIds != updatedMediaIds || index.id != message.id { + if previousMediaIds != updatedMediaIds || index != MessageIndex(message) { for (_, media) in previousEmbeddedMediaWithIds { self.messageMediaTable.removeEmbeddedMedia(media) } @@ -1278,13 +1278,21 @@ final class MessageHistoryTable: Table { } var author: Peer? + var peers = SimpleDictionary() if let authorId = message.authorId { author = peerTable.get(authorId) } - var peers = SimpleDictionary() if let chatPeer = peerTable.get(message.id.peerId) { peers[chatPeer.id] = chatPeer + + if let associatedPeerIds = chatPeer.associatedPeerIds { + for peerId in associatedPeerIds { + if let peer = peerTable.get(peerId) { + peers[peer.id] = peer + } + } + } } for media in parsedMedia { diff --git a/Postbox/Peer.swift b/Postbox/Peer.swift index f5c9a454e2..12b070bc13 100644 --- a/Postbox/Peer.swift +++ b/Postbox/Peer.swift @@ -14,11 +14,13 @@ public struct PeerId: Hashable, CustomStringConvertible, Comparable { public init(_ n: Int64) { self.namespace = Int32((n >> 32) & 0x7fffffff) - self.id = Int32(n & 0x7fffffff) + self.id = unsafeBitCast(UInt32(n & 0xffffffff), to: Int32.self) } public func toInt64() -> Int64 { - return (Int64(self.namespace) << 32) | Int64(self.id) + + + return (Int64(self.namespace) << 32) | unsafeBitCast(UInt64(unsafeBitCast(self.id, to: UInt32.self)), to: Int64.self) } public static func encodeArrayToBuffer(_ array: [PeerId], buffer: WriteBuffer) { @@ -95,6 +97,7 @@ public func <(lhs: PeerId, rhs: PeerId) -> Bool { public protocol Peer: class, Coding { var id: PeerId { get } var indexName: PeerIndexNameRepresentation { get } + var associatedPeerIds: [PeerId]? { get } func isEqual(_ other: Peer) -> Bool } diff --git a/Postbox/PeerMergedOperationLogIndexTable.swift b/Postbox/PeerMergedOperationLogIndexTable.swift new file mode 100644 index 0000000000..a7cc240f14 --- /dev/null +++ b/Postbox/PeerMergedOperationLogIndexTable.swift @@ -0,0 +1,62 @@ +import Foundation + +final class PeerMergedOperationLogIndexTable: Table { + static func tableSpec(_ id: Int32) -> ValueBoxTable { + return ValueBoxTable(id: id, keyType: .binary) + } + + private let metadataTable: PeerOperationLogMetadataTable + + init(valueBox: ValueBox, table: ValueBoxTable, metadataTable: PeerOperationLogMetadataTable) { + self.metadataTable = metadataTable + + super.init(valueBox: valueBox, table: table) + } + + private func key(tag: PeerOperationLogTag, index: Int32) -> ValueBoxKey { + let key = ValueBoxKey(length: 5) + key.setUInt8(0, value: tag.rawValue) + key.setInt32(1, value: index) + return key + } + + func add(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: Int32) -> Int32 { + let index = self.metadataTable.takeNextMergedIndex() + let buffer = WriteBuffer() + var peerIdValue: Int64 = peerId.toInt64() + var tagLocalIndexValue: Int32 = tagLocalIndex + buffer.write(&peerIdValue, offset: 0, length: 8) + buffer.write(&tagLocalIndexValue, offset: 0, length: 4) + self.valueBox.set(self.table, key: self.key(tag: tag, index: index), value: buffer) + return index + } + + func remove(tag: PeerOperationLogTag, mergedIndices: [Int32]) { + for index in mergedIndices { + self.valueBox.remove(self.table, key: self.key(tag: tag, index: index)) + } + } + + func getTagLocalIndices(tag: PeerOperationLogTag, fromMergedIndex: Int32, limit: Int) -> [(PeerId, Int32)] { + var result: [(PeerId, Int32)] = [] + self.valueBox.range(self.table, start: self.key(tag: tag, index: fromMergedIndex == 0 ? 0 : fromMergedIndex - 1), end: self.key(tag: tag, index: Int32.max), values: { key, value in + var peerIdValue: Int64 = 0 + var tagLocalIndexValue: Int32 = 0 + value.read(&peerIdValue, offset: 0, length: 8) + value.read(&tagLocalIndexValue, offset: 0, length: 4) + result.append((PeerId(peerIdValue), tagLocalIndexValue)) + return true + }, limit: limit) + return result + } + + func tailIndex(tag: PeerOperationLogTag) -> Int32? { + var result: Int32? + self.valueBox.range(self.table, start: self.key(tag: tag, index: Int32.max), end: self.key(tag: tag, index: 0), keys: { + key in + result = key.getInt32(1) + return false + }, limit: 1) + return result + } +} diff --git a/Postbox/PeerMergedOperationLogView.swift b/Postbox/PeerMergedOperationLogView.swift new file mode 100644 index 0000000000..73171c16a7 --- /dev/null +++ b/Postbox/PeerMergedOperationLogView.swift @@ -0,0 +1,96 @@ +import Foundation + +final class MutablePeerMergedOperationLogView { + let tag: PeerOperationLogTag + var entries: [PeerMergedOperationLogEntry] + var tailIndex: Int32? + let limit: Int + + init(tag: PeerOperationLogTag, limit: Int, getOperations: (PeerOperationLogTag, Int32, Int) -> [PeerMergedOperationLogEntry], getTailIndex: (PeerOperationLogTag) -> Int32?) { + self.tag = tag + self.entries = getOperations(tag, 0, limit) + self.tailIndex = getTailIndex(tag) + self.limit = limit + } + + func replay(operations: [PeerMergedOperationLogOperation], getOperations: (PeerOperationLogTag, Int32, Int) -> [PeerMergedOperationLogEntry], getTailIndex: (PeerOperationLogTag) -> Int32?) -> Bool { + var updated = false + var invalidatedTail = false + + for operation in operations { + switch operation { + case let .append(entry): + if entry.tag == self.tag { + if let tailIndex = self.tailIndex { + assert(entry.mergedIndex > tailIndex) + self.tailIndex = entry.mergedIndex + if self.entries.count < self.limit { + self.entries.append(entry) + updated = true + } + } else { + updated = true + assert(self.entries.isEmpty) + self.entries.append(entry) + self.tailIndex = entry.mergedIndex + } + } + case let .remove(tag, mergedIndices): + if tag == self.tag { + updated = true + for i in (0 ..< self.entries.count).reversed() { + if mergedIndices.contains(self.entries[i].mergedIndex) { + self.entries.remove(at: i) + } + } + if let tailIndex = self.tailIndex, mergedIndices.contains(tailIndex) { + self.tailIndex = nil + invalidatedTail = true + } + } + } + } + + if updated { + if invalidatedTail { + self.tailIndex = getTailIndex(self.tag) + } + if self.entries.count < self.limit { + if let tailIndex = self.tailIndex { + if self.entries.isEmpty || self.entries.last!.mergedIndex < tailIndex { + var fromIndex: Int32 = 0 + if !self.entries.isEmpty { + fromIndex = self.entries.last!.mergedIndex + 1 + } + for entry in getOperations(self.tag, fromIndex, self.limit - self.entries.count) { + self.entries.append(entry) + } + for i in 0 ..< self.entries.count { + if i != 0 { + assert(self.entries[i].mergedIndex == self.entries[i - 1].mergedIndex + 1) + } + } + if !self.entries.isEmpty { + assert(self.entries.last!.mergedIndex <= tailIndex) + } + } + } + } else { + assert(self.tailIndex != nil) + if let tailIndex = self.tailIndex { + assert(self.entries.last!.mergedIndex == tailIndex) + } + } + } + + return updated + } +} + +public final class PeerMergedOperationLogView { + public let entries: [PeerMergedOperationLogEntry] + + init(_ view: MutablePeerMergedOperationLogView) { + self.entries = view.entries + } +} diff --git a/Postbox/PeerNameIndexTable.swift b/Postbox/PeerNameIndexTable.swift index ebd23ea4e1..f5991b76c0 100644 --- a/Postbox/PeerNameIndexTable.swift +++ b/Postbox/PeerNameIndexTable.swift @@ -173,7 +173,7 @@ final class PeerNameIndexTable: Table { if let peer = self.peerTable.get(peerId) { updatedTokens = peer.indexName.indexTokens } else { - assertionFailure() + //assertionFailure() updatedTokens = [] } } diff --git a/Postbox/PeerOperationLogMetadataTable.swift b/Postbox/PeerOperationLogMetadataTable.swift index ae413c7ec7..aacfda417b 100644 --- a/Postbox/PeerOperationLogMetadataTable.swift +++ b/Postbox/PeerOperationLogMetadataTable.swift @@ -2,6 +2,7 @@ import Foundation private enum PeerOperationLogIndexNamespace: Int8 { case nextTagLocalIndex = 0 + case nextMergedIndex = 1 } final class PeerOperationLogMetadataTable: Table { @@ -9,11 +10,17 @@ final class PeerOperationLogMetadataTable: Table { return ValueBoxTable(id: id, keyType: .binary) } + private func keyForNextMergedIndex() -> ValueBoxKey { + let key = ValueBoxKey(length: 1) + key.setInt8(0, value: PeerOperationLogIndexNamespace.nextMergedIndex.rawValue) + return key + } + private func keyForNextLocalIndex(peerId: PeerId, tag: PeerOperationLogTag) -> ValueBoxKey { let key = ValueBoxKey(length: 1 + 8 + 1) key.setInt8(0, value: PeerOperationLogIndexNamespace.nextTagLocalIndex.rawValue) key.setInt64(1, value: peerId.toInt64()) - key.setInt8(9, value: tag.rawValue) + key.setUInt8(9, value: tag.rawValue) return key } @@ -35,7 +42,29 @@ final class PeerOperationLogMetadataTable: Table { func takeNextLocalIndex(peerId: PeerId, tag: PeerOperationLogTag) -> Int32 { let index = self.getNextLocalIndex(peerId: peerId, tag: tag) - self.setNextLocalIndex(peerId: peerId, tag: tag, index: index) + self.setNextLocalIndex(peerId: peerId, tag: tag, index: index + 1) + return index + } + + func getNextMergedIndex() -> Int32 { + if let value = self.valueBox.get(self.table, key: self.keyForNextMergedIndex()) { + assert(value.length == 4) + var index: Int32 = 0 + value.read(&index, offset: 0, length: 4) + return index + } else { + return 1 + } + } + + private func setNextMergedIndex(_ index: Int32) { + var indexValue: Int32 = index + self.valueBox.set(self.table, key: self.keyForNextMergedIndex(), value: MemoryBuffer(memory: &indexValue, capacity: 4, length: 4, freeWhenDone: false)) + } + + func takeNextMergedIndex() -> Int32 { + let index = self.getNextMergedIndex() + self.setNextMergedIndex(index + 1) return index } } diff --git a/Postbox/PeerOperationLogTable.swift b/Postbox/PeerOperationLogTable.swift index a6d83f96f9..c2fb7a048d 100644 --- a/Postbox/PeerOperationLogTable.swift +++ b/Postbox/PeerOperationLogTable.swift @@ -1,24 +1,119 @@ import Foundation -public enum PeerOperationLogTag: Int8 { - case inbox = 0 - case outbox = 1 +enum PeerMergedOperationLogOperation { + case append(PeerMergedOperationLogEntry) + case remove(tag: PeerOperationLogTag, mergedIndices: Set) +} + +public struct PeerMergedOperationLogEntry { + public let peerId: PeerId + public let tag: PeerOperationLogTag + public let tagLocalIndex: Int32 + public let mergedIndex: Int32 + public let contents: Coding +} + +public enum StorePeerOperationLogEntryTagLocalIndex { + case automatic + case manual(Int32) +} + +public enum StorePeerOperationLogEntryTagMergedIndex { + case none + case automatic } public struct PeerOperationLogEntry { - let tagLocalIndex: Int32 - let contents: Coding + public let peerId: PeerId + public let tag: PeerOperationLogTag + public let tagLocalIndex: Int32 + public let mergedIndex: Int32? + public let contents: Coding + + public func withUpdatedContents(_ contents: Coding) -> PeerOperationLogEntry { + return PeerOperationLogEntry(peerId: self.peerId, tag: self.tag, tagLocalIndex: self.tagLocalIndex, mergedIndex: self.mergedIndex, contents: contents) + } + + public var mergedEntry: PeerMergedOperationLogEntry? { + if let mergedIndex = self.mergedIndex { + return PeerMergedOperationLogEntry(peerId: self.peerId, tag: self.tag, tagLocalIndex: self.tagLocalIndex, mergedIndex: mergedIndex, contents: self.contents) + } else { + return nil + } + } } -final class PeerOperationLogQueue: Table { +public struct PeerOperationLogTag: Equatable { + let rawValue: UInt8 + + public init(value: Int) { + self.rawValue = UInt8(value) + } + + public static func ==(lhs: PeerOperationLogTag, rhs: PeerOperationLogTag) -> Bool { + return lhs.rawValue == rhs.rawValue + } +} + +public enum PeerOperationLogEntryUpdateContents { + case none + case update(Coding) +} + +public enum PeerOperationLogEntryUpdateTagMergedIndex { + case none + case remove + case newAutomatic +} + +public struct PeerOperationLogEntryUpdate { + let mergedIndex: PeerOperationLogEntryUpdateTagMergedIndex + let contents: PeerOperationLogEntryUpdateContents + + public init(mergedIndex: PeerOperationLogEntryUpdateTagMergedIndex, contents: PeerOperationLogEntryUpdateContents) { + self.mergedIndex = mergedIndex + self.contents = contents + } +} + +private func parseEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: Int32, _ value: ReadBuffer) -> PeerOperationLogEntry? { + var hasMergedIndex: Int8 = 0 + value.read(&hasMergedIndex, offset: 0, length: 1) + var mergedIndex: Int32? + if hasMergedIndex != 0 { + var mergedIndexValue: Int32 = 0 + value.read(&mergedIndexValue, offset: 0, length: 4) + mergedIndex = mergedIndexValue + } + var contentLength: Int32 = 0 + value.read(&contentLength, offset: 0, length: 4) + assert(value.length - value.offset == Int(contentLength)) + if let contents = Decoder(buffer: MemoryBuffer(memory: value.memory.advanced(by: value.offset), capacity: Int(contentLength), length: Int(contentLength), freeWhenDone: false)).decodeRootObject() { + return PeerOperationLogEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, mergedIndex: mergedIndex, contents: contents) + } else { + return nil + } +} + +private func parseMergedEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: Int32, _ value: ReadBuffer) -> PeerMergedOperationLogEntry? { + if let entry = parseEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, value), let mergedIndex = entry.mergedIndex { + return PeerMergedOperationLogEntry(peerId: entry.peerId, tag: entry.tag, tagLocalIndex: entry.tagLocalIndex, mergedIndex: mergedIndex, contents: entry.contents) + } else { + return nil + } +} + +final class PeerOperationLogTable: Table { static func tableSpec(_ id: Int32) -> ValueBoxTable { return ValueBoxTable(id: id, keyType: .binary) } private let metadataTable: PeerOperationLogMetadataTable + private let mergedIndexTable: PeerMergedOperationLogIndexTable - init(valueBox: ValueBox, table: ValueBoxTable, metadataTable: PeerOperationLogMetadataTable) { + init(valueBox: ValueBox, table: ValueBoxTable, metadataTable: PeerOperationLogMetadataTable, mergedIndexTable: PeerMergedOperationLogIndexTable) { self.metadataTable = metadataTable + self.mergedIndexTable = mergedIndexTable super.init(valueBox: valueBox, table: table) } @@ -26,28 +121,229 @@ final class PeerOperationLogQueue: Table { private func key(peerId: PeerId, tag: PeerOperationLogTag, index: Int32) -> ValueBoxKey { let key = ValueBoxKey(length: 8 + 1 + 4) key.setInt64(0, value: peerId.toInt64()) - key.setInt8(1, value: tag.rawValue) + key.setUInt8(1, value: tag.rawValue) key.setInt32(9, value: index) return key } - func addEntryAndTakeNextTagLocalIndex(peerId: PeerId, tag: PeerOperationLogTag, contents: Coding) -> Int32 { - let index = self.metadataTable.takeNextLocalIndex(peerId: peerId, tag: tag) - let encoder = Encoder() - encoder.encodeRootObject(contents) - self.valueBox.set(self.table, key: self.key(peerId: peerId, tag: tag, index: index), value: encoder.readBufferNoCopy()) - return index + func getNextEntryLocalIndex(peerId: PeerId, tag: PeerOperationLogTag) -> Int32 { + return self.metadataTable.getNextLocalIndex(peerId: peerId, tag: tag) } - func removeEntries(peerId: PeerId, tag: PeerOperationLogTag, withIndicesLowerThan index: Int32) { + func addEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: StorePeerOperationLogEntryTagLocalIndex, tagMergedIndex: StorePeerOperationLogEntryTagMergedIndex, contents: Coding, operations: inout [PeerMergedOperationLogOperation]) { + let index: Int32 + switch tagLocalIndex { + case .automatic: + index = self.metadataTable.takeNextLocalIndex(peerId: peerId, tag: tag) + case let .manual(manualIndex): + index = manualIndex + } + + var mergedIndex: Int32? + switch tagMergedIndex { + case .automatic: + mergedIndex = self.mergedIndexTable.add(peerId: peerId, tag: tag, tagLocalIndex: index) + case .none: + break + } + + let buffer = WriteBuffer() + var hasMergedIndex: Int8 = mergedIndex != nil ? 1 : 0 + buffer.write(&hasMergedIndex, offset: 0, length: 1) + if let mergedIndex = mergedIndex { + var mergedIndexValue: Int32 = mergedIndex + buffer.write(&mergedIndexValue, offset: 0, length: 4) + } + + let encoder = Encoder() + encoder.encodeRootObject(contents) + let contentBuffer = encoder.readBufferNoCopy() + var contentBufferLength: Int32 = Int32(contentBuffer.length) + buffer.write(&contentBufferLength, offset: 0, length: 4) + buffer.write(contentBuffer.memory, offset: 0, length: contentBuffer.length) + + self.valueBox.set(self.table, key: self.key(peerId: peerId, tag: tag, index: index), value: buffer) + if let mergedIndex = mergedIndex { + operations.append(.append(PeerMergedOperationLogEntry(peerId: peerId, tag: tag, tagLocalIndex: index, mergedIndex: mergedIndex, contents: contents))) + } + } + + func removeEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex index: Int32, operations: inout [PeerMergedOperationLogOperation]) -> Bool { var indices: [Int32] = [] - self.valueBox.range(self.table, start: self.key(peerId: peerId, tag: tag, index: 0).predecessor, end: self.key(peerId: peerId, tag: tag, index: index), keys: { key in - indices.append(key.getInt32(9)) + var mergedIndices: [Int32] = [] + var removed = false + if let value = self.valueBox.get(self.table, key: self.key(peerId: peerId, tag: tag, index: index)) { + indices.append(index) + var hasMergedIndex: Int8 = 0 + value.read(&hasMergedIndex, offset: 0, length: 1) + if hasMergedIndex != 0 { + var mergedIndex: Int32 = 0 + value.read(&mergedIndex, offset: 0, length: 4) + mergedIndices.append(mergedIndex) + } + removed = true + } + + for index in indices { + self.valueBox.remove(self.table, key: self.key(peerId: peerId, tag: tag, index: index)) + } + + if !mergedIndices.isEmpty { + self.mergedIndexTable.remove(tag: tag, mergedIndices: mergedIndices) + operations.append(.remove(tag: tag, mergedIndices: Set(mergedIndices))) + } + return removed + } + + func removeAllEntries(peerId: PeerId, tag: PeerOperationLogTag, operations: inout [PeerMergedOperationLogOperation]) { + var indices: [Int32] = [] + var mergedIndices: [Int32] = [] + self.valueBox.range(self.table, start: self.key(peerId: peerId, tag: tag, index: 0).predecessor, end: self.key(peerId: peerId, tag: tag, index: Int32.max).successor, values: { key, value in + let index = key.getInt32(9) + indices.append(index) + var hasMergedIndex: Int8 = 0 + value.read(&hasMergedIndex, offset: 0, length: 1) + if hasMergedIndex != 0 { + var mergedIndex: Int32 = 0 + value.read(&mergedIndex, offset: 0, length: 4) + mergedIndices.append(mergedIndex) + } return true }, limit: 0) for index in indices { self.valueBox.remove(self.table, key: self.key(peerId: peerId, tag: tag, index: index)) } + + if !mergedIndices.isEmpty { + self.mergedIndexTable.remove(tag: tag, mergedIndices: mergedIndices) + operations.append(.remove(tag: tag, mergedIndices: Set(mergedIndices))) + } + } + + func removeEntries(peerId: PeerId, tag: PeerOperationLogTag, withTagLocalIndicesEqualToOrLowerThan maxTagLocalIndex: Int32, operations: inout [PeerMergedOperationLogOperation]) { + var indices: [Int32] = [] + var mergedIndices: [Int32] = [] + self.valueBox.range(self.table, start: self.key(peerId: peerId, tag: tag, index: 0).predecessor, end: self.key(peerId: peerId, tag: tag, index: maxTagLocalIndex).successor, values: { key, value in + let index = key.getInt32(9) + indices.append(index) + var hasMergedIndex: Int8 = 0 + value.read(&hasMergedIndex, offset: 0, length: 1) + if hasMergedIndex != 0 { + var mergedIndex: Int32 = 0 + value.read(&mergedIndex, offset: 0, length: 4) + mergedIndices.append(mergedIndex) + } + return true + }, limit: 0) + + for index in indices { + self.valueBox.remove(self.table, key: self.key(peerId: peerId, tag: tag, index: index)) + } + + if !mergedIndices.isEmpty { + self.mergedIndexTable.remove(tag: tag, mergedIndices: mergedIndices) + operations.append(.remove(tag: tag, mergedIndices: Set(mergedIndices))) + } + } + + func getMergedEntries(tag: PeerOperationLogTag, fromIndex: Int32, limit: Int) -> [PeerMergedOperationLogEntry] { + var entries: [PeerMergedOperationLogEntry] = [] + for (peerId, tagLocalIndex) in self.mergedIndexTable.getTagLocalIndices(tag: tag, fromMergedIndex: fromIndex, limit: limit) { + if let value = self.valueBox.get(self.table, key: self.key(peerId: peerId, tag: tag, index: tagLocalIndex)) { + if let entry = parseMergedEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, value) { + entries.append(entry) + } else { + assertionFailure() + } + } else { + assertionFailure() + } + } + return entries + } + + func enumerateEntries(peerId: PeerId, tag: PeerOperationLogTag, _ f: (PeerOperationLogEntry) -> Bool) { + self.valueBox.range(self.table, start: self.key(peerId: peerId, tag: tag, index: 0).predecessor, end: self.key(peerId: peerId, tag: tag, index: Int32.max).successor, values: { key, value in + if let entry = parseEntry(peerId: peerId, tag: tag, tagLocalIndex: key.getInt32(9), value) { + if !f(entry) { + return false + } + } else { + assertionFailure() + } + return true + }, limit: 0) + } + + func updateEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: Int32, f: (PeerOperationLogEntry?) -> PeerOperationLogEntryUpdate, operations: inout [PeerMergedOperationLogOperation]) { + let key = self.key(peerId: peerId, tag: tag, index: tagLocalIndex) + if let value = self.valueBox.get(self.table, key: key) { + var hasMergedIndex: Int8 = 0 + value.read(&hasMergedIndex, offset: 0, length: 1) + var mergedIndex: Int32? + if hasMergedIndex != 0 { + var mergedIndexValue: Int32 = 0 + value.read(&mergedIndexValue, offset: 0, length: 4) + mergedIndex = mergedIndexValue + } + let previousMergedIndex = mergedIndex + var contentLength: Int32 = 0 + value.read(&contentLength, offset: 0, length: 4) + assert(value.length - value.offset == Int(contentLength)) + if let contents = Decoder(buffer: MemoryBuffer(memory: value.memory.advanced(by: value.offset), capacity: Int(contentLength), length: Int(contentLength), freeWhenDone: false)).decodeRootObject() { + let entryUpdate = f(PeerOperationLogEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, mergedIndex: mergedIndex, contents: contents)) + switch entryUpdate.mergedIndex { + case .none: + break + case .remove: + if let mergedIndexValue = mergedIndex { + mergedIndex = nil + self.mergedIndexTable.remove(tag: tag, mergedIndices: [mergedIndexValue]) + operations.append(.remove(tag: tag, mergedIndices: Set([mergedIndexValue]))) + } + case .newAutomatic: + if let mergedIndexValue = mergedIndex { + self.mergedIndexTable.remove(tag: tag, mergedIndices: [mergedIndexValue]) + operations.append(.remove(tag: tag, mergedIndices: Set([mergedIndexValue]))) + } + let updatedMergedIndexValue = self.mergedIndexTable.add(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex) + mergedIndex = updatedMergedIndexValue + operations.append(.append(PeerMergedOperationLogEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, mergedIndex: updatedMergedIndexValue, contents: contents))) + } + var updatedContents: Coding? + switch entryUpdate.contents { + case .none: + break + case let .update(contents): + updatedContents = contents + } + if previousMergedIndex != mergedIndex || updatedContents != nil { + let buffer = WriteBuffer() + var hasMergedIndex: Int8 = mergedIndex != nil ? 1 : 0 + buffer.write(&hasMergedIndex, offset: 0, length: 1) + if let mergedIndex = mergedIndex { + var mergedIndexValue: Int32 = mergedIndex + buffer.write(&mergedIndexValue, offset: 0, length: 4) + } + + let encoder = Encoder() + if let updatedContents = updatedContents { + encoder.encodeRootObject(updatedContents) + } else { + encoder.encodeRootObject(contents) + } + let contentBuffer = encoder.readBufferNoCopy() + var contentBufferLength: Int32 = Int32(contentBuffer.length) + buffer.write(&contentBufferLength, offset: 0, length: 4) + buffer.write(contentBuffer.memory, offset: 0, length: contentBuffer.length) + self.valueBox.set(self.table, key: key, value: buffer) + } + } else { + assertionFailure() + } + } else { + f(nil) + } } } diff --git a/Postbox/PeerView.swift b/Postbox/PeerView.swift index 391b292ca0..b5ade8d940 100644 --- a/Postbox/PeerView.swift +++ b/Postbox/PeerView.swift @@ -15,6 +15,11 @@ final class MutablePeerView { self.peerIsContact = peerIsContact var peerIds = Set() peerIds.insert(peerId) + if let peer = getPeer(peerId), let associatedPeerIds = peer.associatedPeerIds { + for peerId in associatedPeerIds { + peerIds.insert(peerId) + } + } if let cachedData = cachedData { peerIds.formUnion(cachedData.peerIds) } @@ -26,6 +31,16 @@ final class MutablePeerView { self.peerPresences[id] = presence } } + if let peer = self.peers[peerId], let associatedPeerIds = peer.associatedPeerIds { + for id in associatedPeerIds { + if let peer = getPeer(id) { + self.peers[id] = peer + } + if let presence = getPeerPresence(id) { + self.peerPresences[id] = presence + } + } + } } func replay(updatedPeers: [PeerId: Peer], updatedNotificationSettings: [PeerId: PeerNotificationSettings], updatedCachedPeerData: [PeerId: CachedPeerData], updatedPeerPresences: [PeerId: PeerPresence], replaceContactPeerIds: Set?, getPeer: (PeerId) -> Peer?, getPeerPresence: (PeerId) -> PeerPresence?) -> Bool { @@ -37,6 +52,11 @@ final class MutablePeerView { var peerIds = Set() peerIds.insert(self.peerId) + if let peer = getPeer(self.peerId), let associatedPeerIds = peer.associatedPeerIds { + for peerId in associatedPeerIds { + peerIds.insert(peerId) + } + } peerIds.formUnion(cachedData.peerIds) for id in peerIds { @@ -77,6 +97,11 @@ final class MutablePeerView { } else { var peerIds = Set() peerIds.insert(self.peerId) + if let peer = getPeer(self.peerId), let associatedPeerIds = peer.associatedPeerIds { + for peerId in associatedPeerIds { + peerIds.insert(peerId) + } + } if let cachedData = self.cachedData { peerIds.formUnion(cachedData.peerIds) } diff --git a/Postbox/Postbox.swift b/Postbox/Postbox.swift index 155a1b195a..75ad3ecbc5 100644 --- a/Postbox/Postbox.swift +++ b/Postbox/Postbox.swift @@ -173,6 +173,46 @@ public final class Modifier { public func retrieveItemCacheEntry(id: ItemCacheEntryId) -> Coding? { return self.postbox?.retrieveItemCacheEntry(id: id) } + + public func operationLogGetNextEntryLocalIndex(peerId: PeerId, tag: PeerOperationLogTag) -> Int32 { + if let postbox = self.postbox { + return postbox.operationLogGetNextEntryLocalIndex(peerId: peerId, tag: tag) + } else { + return 0 + } + } + + public func operationLogAddEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: StorePeerOperationLogEntryTagLocalIndex, tagMergedIndex: StorePeerOperationLogEntryTagMergedIndex, contents: Coding) { + self.postbox?.operationLogAddEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, tagMergedIndex: tagMergedIndex, contents: contents) + } + + public func operationLogRemoveEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: Int32) -> Bool { + if let postbox = self.postbox { + return postbox.operationLogRemoveEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex) + } else { + return false + } + } + + public func operationLogRemoveAllEntries(peerId: PeerId, tag: PeerOperationLogTag) { + self.postbox?.operationLogRemoveAllEntries(peerId: peerId, tag: tag) + } + + public func operationLogRemoveEntries(peerId: PeerId, tag: PeerOperationLogTag, withTagLocalIndicesEqualToOrLowerThan maxTagLocalIndex: Int32) { + self.postbox?.operationLogRemoveEntries(peerId: peerId, tag: tag, withTagLocalIndicesEqualToOrLowerThan: maxTagLocalIndex) + } + + public func operationLogUpdateEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: Int32, _ f: (PeerOperationLogEntry?) -> PeerOperationLogEntryUpdate) { + self.postbox?.operationLogUpdateEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, f) + } + + public func operationLogEnumerateEntries(peerId: PeerId, tag: PeerOperationLogTag, _ f: (PeerOperationLogEntry) -> Bool) { + self.postbox?.operationLogEnumerateEntries(peerId: peerId, tag: tag, f) + } + + /*public func operationLogTransformOperationContent(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: Int32, _ f: (PeerMergedOperationLogEntry?) -> PeerMergedOperationLogEntryTransform) { + self.postbox?.operationLogTransformOperationContent(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, f) + }*/ } fileprivate class PipeNotifier: NSObject { @@ -225,6 +265,7 @@ public final class Postbox { private var currentUpdatedPeerPresences: [PeerId: PeerPresence] = [:] private var currentUpdatedPeerChatListEmbeddedStates: [PeerId: PeerChatListEmbeddedInterfaceState?] = [:] private var currentUpdatedTotalUnreadCount: Int32? + private var currentPeerMergedOperationLogOperations: [PeerMergedOperationLogOperation] = [] private var currentReplaceChatListHoles: [(MessageIndex, ChatListHole?)] = [] private var currentReplacedContactPeerIds: Set? @@ -270,6 +311,9 @@ public final class Postbox { var peerNameTokenIndexTable: PeerNameTokenIndexTable! var peerNameIndexTable: PeerNameIndexTable! var peerChatTopTaggedMessageIdsTable: PeerChatTopTaggedMessageIdsTable! + var peerOperationLogMetadataTable: PeerOperationLogMetadataTable! + var peerMergedOperationLogIndexTable: PeerMergedOperationLogIndexTable! + var peerOperationLogTable: PeerOperationLogTable! //temporary var peerRatingTable: RatingTable! @@ -386,6 +430,9 @@ public final class Postbox { self.chatListIndexTable = ChatListIndexTable(valueBox: self.valueBox, table: ChatListIndexTable.tableSpec(8), peerNameIndexTable: self.peerNameIndexTable, metadataTable: self.messageHistoryMetadataTable, readStateTable: self.readStateTable, notificationSettingsTable: self.peerNotificationSettingsTable) self.chatListTable = ChatListTable(valueBox: self.valueBox, table: ChatListTable.tableSpec(9), indexTable: self.chatListIndexTable, metadataTable: self.messageHistoryMetadataTable, seedConfiguration: self.seedConfiguration) self.peerChatTopTaggedMessageIdsTable = PeerChatTopTaggedMessageIdsTable(valueBox: self.valueBox, table: PeerChatTopTaggedMessageIdsTable.tableSpec(28)) + self.peerOperationLogMetadataTable = PeerOperationLogMetadataTable(valueBox: self.valueBox, table: PeerOperationLogMetadataTable.tableSpec(29)) + self.peerMergedOperationLogIndexTable = PeerMergedOperationLogIndexTable(valueBox: self.valueBox, table: PeerMergedOperationLogIndexTable.tableSpec(30), metadataTable: self.peerOperationLogMetadataTable!) + self.peerOperationLogTable = PeerOperationLogTable(valueBox: self.valueBox, table: PeerOperationLogTable.tableSpec(31), metadataTable: self.peerOperationLogMetadataTable, mergedIndexTable: self.peerMergedOperationLogIndexTable) self.tables.append(self.keychainTable) self.tables.append(self.peerTable) @@ -414,6 +461,9 @@ public final class Postbox { self.tables.append(self.peerNameIndexTable) self.tables.append(self.peerNameTokenIndexTable) self.tables.append(self.peerChatTopTaggedMessageIdsTable) + self.tables.append(self.peerOperationLogMetadataTable) + self.tables.append(self.peerMergedOperationLogIndexTable) + self.tables.append(self.peerOperationLogTable) self.transactionStateVersion = self.metadataTable.transactionStateVersion() @@ -429,6 +479,10 @@ public final class Postbox { return self.messageHistoryMetadataTable.getChatListTotalUnreadCount() }, getPeerReadState: { peerId in return self.readStateTable.getCombinedState(peerId) + }, operationLogGetOperations: { tag, fromIndex, limit in + return self.peerOperationLogTable.getMergedEntries(tag: tag, fromIndex: fromIndex, limit: limit) + }, operationLogGetTailIndex: { tag in + return self.peerMergedOperationLogIndexTable.tailIndex(tag: tag) }, unsentMessageIds: self.messageHistoryUnsentTable!.get(), synchronizePeerReadStateOperations: self.synchronizeReadStateTable!.get()) print("(Postbox initialization took \((CFAbsoluteTimeGetCurrent() - startTime) * 1000.0) ms") @@ -800,7 +854,7 @@ public final class Postbox { let transactionParticipationInTotalUnreadCountUpdates = self.peerNotificationSettingsTable.transactionParticipationInTotalUnreadCountUpdates() self.chatListIndexTable.commitWithTransactionUnreadCountDeltas(transactionUnreadCountDeltas, transactionParticipationInTotalUnreadCountUpdates: transactionParticipationInTotalUnreadCountUpdates, updatedTotalUnreadCount: &self.currentUpdatedTotalUnreadCount) - let transaction = PostboxTransaction(currentOperationsByPeerId: self.currentOperationsByPeerId, peerIdsWithFilledHoles: self.currentFilledHolesByPeerId, removedHolesByPeerId: self.currentRemovedHolesByPeerId, chatListOperations: chatListOperations, currentUpdatedPeers: self.currentUpdatedPeers, currentUpdatedPeerNotificationSettings: self.currentUpdatedPeerNotificationSettings, currentUpdatedCachedPeerData: self.currentUpdatedCachedPeerData, currentUpdatedPeerPresences: currentUpdatedPeerPresences, currentUpdatedPeerChatListEmbeddedStates: self.currentUpdatedPeerChatListEmbeddedStates, currentUpdatedTotalUnreadCount: self.currentUpdatedTotalUnreadCount, peerIdsWithUpdatedUnreadCounts: Set(transactionUnreadCountDeltas.keys), unsentMessageOperations: self.currentUnsentOperations, updatedSynchronizePeerReadStateOperations: self.currentUpdatedSynchronizeReadStateOperations, updatedMedia: self.currentUpdatedMedia, replaceContactPeerIds: self.currentReplacedContactPeerIds, currentUpdatedMasterClientId: currentUpdatedMasterClientId) + let transaction = PostboxTransaction(currentOperationsByPeerId: self.currentOperationsByPeerId, peerIdsWithFilledHoles: self.currentFilledHolesByPeerId, removedHolesByPeerId: self.currentRemovedHolesByPeerId, chatListOperations: chatListOperations, currentUpdatedPeers: self.currentUpdatedPeers, currentUpdatedPeerNotificationSettings: self.currentUpdatedPeerNotificationSettings, currentUpdatedCachedPeerData: self.currentUpdatedCachedPeerData, currentUpdatedPeerPresences: currentUpdatedPeerPresences, currentUpdatedPeerChatListEmbeddedStates: self.currentUpdatedPeerChatListEmbeddedStates, currentUpdatedTotalUnreadCount: self.currentUpdatedTotalUnreadCount, peerIdsWithUpdatedUnreadCounts: Set(transactionUnreadCountDeltas.keys), currentPeerMergedOperationLogOperations: self.currentPeerMergedOperationLogOperations, unsentMessageOperations: self.currentUnsentOperations, updatedSynchronizePeerReadStateOperations: self.currentUpdatedSynchronizeReadStateOperations, updatedMedia: self.currentUpdatedMedia, replaceContactPeerIds: self.currentReplacedContactPeerIds, currentUpdatedMasterClientId: currentUpdatedMasterClientId) var updatedTransactionState: Int64? var updatedMasterClientId: Int64? if !transaction.isEmpty { @@ -828,6 +882,8 @@ public final class Postbox { self.currentUpdatedCachedPeerData.removeAll() self.currentUpdatedPeerPresences.removeAll() self.currentUpdatedPeerChatListEmbeddedStates.removeAll() + self.currentUpdatedTotalUnreadCount = nil + self.currentPeerMergedOperationLogOperations.removeAll() for table in self.tables { table.beforeCommit() @@ -1398,6 +1454,55 @@ public final class Postbox { } } + public func mergedOperationLogView(tag: PeerOperationLogTag, limit: Int) -> Signal { + return self.modify { modifier -> Signal in + let view = MutablePeerMergedOperationLogView(tag: tag, limit: limit, getOperations: { tag, fromIndex, limit in + return self.peerOperationLogTable.getMergedEntries(tag: tag, fromIndex: fromIndex, limit: limit) + }, getTailIndex: { tag in + return self.peerMergedOperationLogIndexTable.tailIndex(tag: tag) + }) + let (index, signal) = self.viewTracker.addPeerMergedOperationLogView(view) + + return (.single(PeerMergedOperationLogView(view)) + |> then(signal)) + |> afterDisposed { [weak self] in + if let strongSelf = self { + strongSelf.queue.async { + strongSelf.viewTracker.removePeerMergedOperationLogView(index) + } + } + } + } |> switchToLatest + } + + fileprivate func operationLogGetNextEntryLocalIndex(peerId: PeerId, tag: PeerOperationLogTag) -> Int32 { + return self.peerOperationLogTable.getNextEntryLocalIndex(peerId: peerId, tag: tag) + } + + fileprivate func operationLogAddEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: StorePeerOperationLogEntryTagLocalIndex, tagMergedIndex: StorePeerOperationLogEntryTagMergedIndex, contents: Coding) { + self.peerOperationLogTable.addEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, tagMergedIndex: tagMergedIndex, contents: contents, operations: &self.currentPeerMergedOperationLogOperations) + } + + fileprivate func operationLogRemoveEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: Int32) -> Bool { + return self.peerOperationLogTable.removeEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, operations: &self.currentPeerMergedOperationLogOperations) + } + + fileprivate func operationLogRemoveAllEntries(peerId: PeerId, tag: PeerOperationLogTag) { + self.peerOperationLogTable.removeAllEntries(peerId: peerId, tag: tag, operations: &self.currentPeerMergedOperationLogOperations) + } + + fileprivate func operationLogRemoveEntries(peerId: PeerId, tag: PeerOperationLogTag, withTagLocalIndicesEqualToOrLowerThan maxTagLocalIndex: Int32) { + self.peerOperationLogTable.removeEntries(peerId: peerId, tag: tag, withTagLocalIndicesEqualToOrLowerThan: maxTagLocalIndex, operations: &self.currentPeerMergedOperationLogOperations) + } + + fileprivate func operationLogUpdateEntry(peerId: PeerId, tag: PeerOperationLogTag, tagLocalIndex: Int32, _ f: (PeerOperationLogEntry?) -> PeerOperationLogEntryUpdate) { + self.peerOperationLogTable.updateEntry(peerId: peerId, tag: tag, tagLocalIndex: tagLocalIndex, f: f, operations: &self.currentPeerMergedOperationLogOperations) + } + + fileprivate func operationLogEnumerateEntries(peerId: PeerId, tag: PeerOperationLogTag, _ f: (PeerOperationLogEntry) -> Bool) { + self.peerOperationLogTable.enumerateEntries(peerId: peerId, tag: tag, f) + } + public func isMasterClient() -> Signal { return self.modify { modifier -> Signal in let sessionClientId = self.sessionClientId diff --git a/Postbox/PostboxTransaction.swift b/Postbox/PostboxTransaction.swift index 673b7f5636..94144804db 100644 --- a/Postbox/PostboxTransaction.swift +++ b/Postbox/PostboxTransaction.swift @@ -12,6 +12,7 @@ final class PostboxTransaction { let currentUpdatedPeerChatListEmbeddedStates: [PeerId: PeerChatListEmbeddedInterfaceState?] let currentUpdatedTotalUnreadCount: Int32? let peerIdsWithUpdatedUnreadCounts: Set + let currentPeerMergedOperationLogOperations: [PeerMergedOperationLogOperation] let unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation] let updatedSynchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?] @@ -68,10 +69,13 @@ final class PostboxTransaction { if !peerIdsWithUpdatedUnreadCounts.isEmpty { return false } + if !currentPeerMergedOperationLogOperations.isEmpty { + return false + } return true } - init(currentOperationsByPeerId: [PeerId: [MessageHistoryOperation]], peerIdsWithFilledHoles: [PeerId: [MessageIndex: HoleFillDirection]], removedHolesByPeerId: [PeerId: [MessageIndex: HoleFillDirection]], chatListOperations: [ChatListOperation], currentUpdatedPeers: [PeerId: Peer], currentUpdatedPeerNotificationSettings: [PeerId: PeerNotificationSettings], currentUpdatedCachedPeerData: [PeerId: CachedPeerData], currentUpdatedPeerPresences: [PeerId: PeerPresence], currentUpdatedPeerChatListEmbeddedStates: [PeerId: PeerChatListEmbeddedInterfaceState?], currentUpdatedTotalUnreadCount: Int32?, peerIdsWithUpdatedUnreadCounts: Set, unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation], updatedSynchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?], updatedMedia: [MediaId: Media?], replaceContactPeerIds: Set?, currentUpdatedMasterClientId: Int64?) { + init(currentOperationsByPeerId: [PeerId: [MessageHistoryOperation]], peerIdsWithFilledHoles: [PeerId: [MessageIndex: HoleFillDirection]], removedHolesByPeerId: [PeerId: [MessageIndex: HoleFillDirection]], chatListOperations: [ChatListOperation], currentUpdatedPeers: [PeerId: Peer], currentUpdatedPeerNotificationSettings: [PeerId: PeerNotificationSettings], currentUpdatedCachedPeerData: [PeerId: CachedPeerData], currentUpdatedPeerPresences: [PeerId: PeerPresence], currentUpdatedPeerChatListEmbeddedStates: [PeerId: PeerChatListEmbeddedInterfaceState?], currentUpdatedTotalUnreadCount: Int32?, peerIdsWithUpdatedUnreadCounts: Set, currentPeerMergedOperationLogOperations: [PeerMergedOperationLogOperation], unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation], updatedSynchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?], updatedMedia: [MediaId: Media?], replaceContactPeerIds: Set?, currentUpdatedMasterClientId: Int64?) { self.currentOperationsByPeerId = currentOperationsByPeerId self.peerIdsWithFilledHoles = peerIdsWithFilledHoles self.removedHolesByPeerId = removedHolesByPeerId @@ -83,6 +87,7 @@ final class PostboxTransaction { self.currentUpdatedPeerChatListEmbeddedStates = currentUpdatedPeerChatListEmbeddedStates self.currentUpdatedTotalUnreadCount = currentUpdatedTotalUnreadCount self.peerIdsWithUpdatedUnreadCounts = peerIdsWithUpdatedUnreadCounts + self.currentPeerMergedOperationLogOperations = currentPeerMergedOperationLogOperations self.unsentMessageOperations = unsentMessageOperations self.updatedSynchronizePeerReadStateOperations = updatedSynchronizePeerReadStateOperations self.updatedMedia = updatedMedia diff --git a/Postbox/SeedConfiguration.swift b/Postbox/SeedConfiguration.swift index cd0f8320be..60845ed053 100644 --- a/Postbox/SeedConfiguration.swift +++ b/Postbox/SeedConfiguration.swift @@ -2,10 +2,10 @@ import Foundation public final class SeedConfiguration { let initializeChatListWithHoles: [ChatListHole] - let initializeMessageNamespacesWithHoles: [MessageId.Namespace] + let initializeMessageNamespacesWithHoles: [(PeerId.Namespace, MessageId.Namespace)] let existingMessageTags: MessageTags - public init(initializeChatListWithHoles: [ChatListHole], initializeMessageNamespacesWithHoles: [MessageId.Namespace], existingMessageTags: MessageTags) { + public init(initializeChatListWithHoles: [ChatListHole], initializeMessageNamespacesWithHoles: [(PeerId.Namespace, MessageId.Namespace)], existingMessageTags: MessageTags) { self.initializeChatListWithHoles = initializeChatListWithHoles self.initializeMessageNamespacesWithHoles = initializeMessageNamespacesWithHoles self.existingMessageTags = existingMessageTags diff --git a/Postbox/ValueBoxKey.swift b/Postbox/ValueBoxKey.swift index e88a1b97a2..2099139e9b 100644 --- a/Postbox/ValueBoxKey.swift +++ b/Postbox/ValueBoxKey.swift @@ -58,6 +58,11 @@ public struct ValueBoxKey: Hashable, CustomStringConvertible, Comparable { memcpy(self.memory + offset, &varValue, 1) } + public func setUInt8(_ offset: Int, value: UInt8) { + var varValue = value + memcpy(self.memory + offset, &varValue, 1) + } + public func getInt32(_ offset: Int) -> Int32 { var value: Int32 = 0 memcpy(&value, self.memory + offset, 4) @@ -82,6 +87,12 @@ public struct ValueBoxKey: Hashable, CustomStringConvertible, Comparable { return value } + public func getUInt8(_ offset: Int) -> UInt8 { + var value: UInt8 = 0 + memcpy(&value, self.memory + offset, 1) + return value + } + public func prefix(_ length: Int) -> ValueBoxKey { assert(length <= self.length, "length <= self.length") let key = ValueBoxKey(length: length) diff --git a/Postbox/ViewTracker.swift b/Postbox/ViewTracker.swift index f424a9572b..9489400b3c 100644 --- a/Postbox/ViewTracker.swift +++ b/Postbox/ViewTracker.swift @@ -26,6 +26,8 @@ final class ViewTracker { private let getPeerPresence: (PeerId) -> PeerPresence? private let getTotalUnreadCount: () -> Int32 private let getPeerReadState: (PeerId) -> CombinedPeerReadState? + private let operationLogGetOperations: (PeerOperationLogTag, Int32, Int) -> [PeerMergedOperationLogEntry] + private let operationLogGetTailIndex: (PeerOperationLogTag) -> Int32? private var chatListViews = Bag<(MutableChatListView, ValuePipe<(ChatListView, ViewUpdateType)>)>() private var messageHistoryViews: [PeerId: Bag<(MutableMessageHistoryView, ValuePipe<(MessageHistoryView, ViewUpdateType)>)>] = [:] @@ -47,8 +49,9 @@ final class ViewTracker { private var peerViews = Bag<(MutablePeerView, ValuePipe)>() private var unreadMessageCountsViews = Bag<(MutableUnreadMessageCountsView, ValuePipe)>() + private var peerMergedOperationLogViews = Bag<(MutablePeerMergedOperationLogView, ValuePipe)>() - init(queue: Queue, fetchEarlierHistoryEntries: @escaping (PeerId, MessageIndex?, Int, MessageTags?) -> [MutableMessageHistoryEntry], fetchLaterHistoryEntries: @escaping (PeerId, MessageIndex?, Int, MessageTags?) -> [MutableMessageHistoryEntry], fetchEarlierChatEntries: @escaping (MessageIndex?, Int) -> [MutableChatListEntry], fetchLaterChatEntries: @escaping (MessageIndex?, Int) -> [MutableChatListEntry], fetchAnchorIndex: @escaping (MessageId) -> MessageHistoryAnchorIndex?, renderMessage: @escaping (IntermediateMessage) -> Message, getPeer: @escaping (PeerId) -> Peer?, getPeerNotificationSettings: @escaping (PeerId) -> PeerNotificationSettings?, getCachedPeerData: @escaping (PeerId) -> CachedPeerData?, getPeerPresence: @escaping (PeerId) -> PeerPresence?, getTotalUnreadCount: @escaping () -> Int32, getPeerReadState: @escaping (PeerId) -> CombinedPeerReadState?, unsentMessageIds: [MessageId], synchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation]) { + init(queue: Queue, fetchEarlierHistoryEntries: @escaping (PeerId, MessageIndex?, Int, MessageTags?) -> [MutableMessageHistoryEntry], fetchLaterHistoryEntries: @escaping (PeerId, MessageIndex?, Int, MessageTags?) -> [MutableMessageHistoryEntry], fetchEarlierChatEntries: @escaping (MessageIndex?, Int) -> [MutableChatListEntry], fetchLaterChatEntries: @escaping (MessageIndex?, Int) -> [MutableChatListEntry], fetchAnchorIndex: @escaping (MessageId) -> MessageHistoryAnchorIndex?, renderMessage: @escaping (IntermediateMessage) -> Message, getPeer: @escaping (PeerId) -> Peer?, getPeerNotificationSettings: @escaping (PeerId) -> PeerNotificationSettings?, getCachedPeerData: @escaping (PeerId) -> CachedPeerData?, getPeerPresence: @escaping (PeerId) -> PeerPresence?, getTotalUnreadCount: @escaping () -> Int32, getPeerReadState: @escaping (PeerId) -> CombinedPeerReadState?, operationLogGetOperations: @escaping (PeerOperationLogTag, Int32, Int) -> [PeerMergedOperationLogEntry], operationLogGetTailIndex: @escaping (PeerOperationLogTag) -> Int32?, unsentMessageIds: [MessageId], synchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation]) { self.queue = queue self.fetchEarlierHistoryEntries = fetchEarlierHistoryEntries self.fetchLaterHistoryEntries = fetchLaterHistoryEntries @@ -62,6 +65,8 @@ final class ViewTracker { self.getPeerPresence = getPeerPresence self.getTotalUnreadCount = getTotalUnreadCount self.getPeerReadState = getPeerReadState + self.operationLogGetOperations = operationLogGetOperations + self.operationLogGetTailIndex = operationLogGetTailIndex self.unsentMessageView = UnsentMessageHistoryView(ids: unsentMessageIds) self.synchronizeReadStatesView = MutableSynchronizePeerReadStatesView(operations: synchronizePeerReadStateOperations) @@ -182,6 +187,17 @@ final class ViewTracker { self.peerViews.remove(index) } + func addPeerMergedOperationLogView(_ view: MutablePeerMergedOperationLogView) -> (Bag<(MutablePeerMergedOperationLogView, ValuePipe)>.Index, Signal) { + let record = (view, ValuePipe()) + let index = self.peerMergedOperationLogViews.add(record) + + return (index, record.1.signal()) + } + + func removePeerMergedOperationLogView(_ index: Bag<(MutablePeerMergedOperationLogView, ValuePipe)>.Index) { + self.peerMergedOperationLogViews.remove(index) + } + func refreshViewsDueToExternalTransaction(fetchAroundChatEntries: (_ index: MessageIndex, _ count: Int) -> (entries: [MutableChatListEntry], earlier: MutableChatListEntry?, later: MutableChatListEntry?), fetchAroundHistoryEntries: (_ index: MessageIndex, _ count: Int, _ tagMask: MessageTags?) -> (entries: [MutableMessageHistoryEntry], lower: MutableMessageHistoryEntry?, upper: MutableMessageHistoryEntry?), fetchUnsentMessageIds: () -> [MessageId], fetchSynchronizePeerReadStateOperations: () -> [PeerId: PeerReadStateSynchronizationOperation]) { var updateTrackedHolesPeerIds: [PeerId] = [] @@ -358,6 +374,12 @@ final class ViewTracker { pipe.putNext(PeerView(mutableView)) } } + + for (mutableView, pipe) in self.peerMergedOperationLogViews.copyItems() { + if mutableView.replay(operations: transaction.currentPeerMergedOperationLogOperations, getOperations: self.operationLogGetOperations, getTailIndex: self.operationLogGetTailIndex) { + pipe.putNext(PeerMergedOperationLogView(mutableView)) + } + } } private func updateTrackedChatListHoles() {