diff --git a/Postbox/MessageView.swift b/Postbox/MessageView.swift index 20c9fa6fee..786aa15576 100644 --- a/Postbox/MessageView.swift +++ b/Postbox/MessageView.swift @@ -11,6 +11,10 @@ public final class MutableMessageView: Printable { self.invalidLater = [] self.removedMessages = false } + + func empty() -> Bool { + return !self.removedMessages && self.invalidEarlier.count == 0 && self.invalidLater.count == 0 + } } let namespaces: [MessageId.Namespace] diff --git a/Postbox/PeerView.swift b/Postbox/PeerView.swift index 328c57497e..cf396590ce 100644 --- a/Postbox/PeerView.swift +++ b/Postbox/PeerView.swift @@ -1,13 +1,21 @@ import Foundation public class PeerViewEntry { - public let peer: Peer + public let peerId: PeerId + public let peer: Peer? public let message: Message public init(peer: Peer, message: Message) { + self.peerId = peer.id self.peer = peer self.message = message } + + public init(peerId: PeerId, message: Message) { + self.peerId = peerId + self.peer = nil + self.message = message + } } public struct PeerViewEntryIndex: Equatable, Comparable { @@ -15,7 +23,7 @@ public struct PeerViewEntryIndex: Equatable, Comparable { public let messageIndex: MessageIndex public init(_ entry: PeerViewEntry) { - self.peerId = entry.peer.id + self.peerId = entry.peerId self.messageIndex = MessageIndex(entry.message) } @@ -52,14 +60,12 @@ public final class MutablePeerView: Printable { var removedEntries = false } - let tags: [Int32] let count: Int var earlier: PeerViewEntry? var later: PeerViewEntry? var entries: [PeerViewEntry] - public init(tags: [Int32], count: Int, earlier: PeerViewEntry?, entries: [PeerViewEntry], later: PeerViewEntry?) { - self.tags = tags + public init(count: Int, earlier: PeerViewEntry?, entries: [PeerViewEntry], later: PeerViewEntry?) { self.count = count self.earlier = earlier self.entries = entries @@ -70,20 +76,20 @@ public final class MutablePeerView: Printable { var invalidationContext = context ?? RemoveContext() if let earlier = self.earlier { - if peerId == earlier.peer.id { + if peerId == earlier.peerId { invalidationContext.invalidEarlier = true } } if let later = self.later { - if peerId == later.peer.id { + if peerId == later.peerId { invalidationContext.invalidLater = true } } var i = 0 while i < self.entries.count { - if self.entries[i].peer.id == peerId { + if self.entries[i].peerId == peerId { self.entries.removeAtIndex(i) invalidationContext.removedEntries = true break @@ -251,7 +257,7 @@ public final class MutablePeerView: Printable { if let earlier = self.earlier { string += "more(" - string += "(p \(earlier.peer.id.namespace):\(earlier.peer.id.id), m \(earlier.message.id.namespace):\(earlier.message.id.id)—\(earlier.message.timestamp)" + string += "(p \(earlier.peerId.namespace):\(earlier.peerId.id), m \(earlier.message.id.namespace):\(earlier.message.id.id)—\(earlier.message.timestamp)" string += ") " } @@ -263,13 +269,65 @@ public final class MutablePeerView: Printable { } else { string += ", " } - string += "(p \(entry.peer.id.namespace):\(entry.peer.id.id), m \(entry.message.id.namespace):\(entry.message.id.id)—\(entry.message.timestamp))" + string += "(p \(entry.peerId.namespace):\(entry.peerId.id), m \(entry.message.id.namespace):\(entry.message.id.id)—\(entry.message.timestamp))" } string += "]" if let later = self.later { string += " more(" - string += "(p \(later.peer.id.namespace):\(later.peer.id.id), m \(later.message.id.namespace):\(later.message.id.id)—\(later.message.timestamp)" + string += "(p \(later.peerId.namespace):\(later.peerId), m \(later.message.id.namespace):\(later.message.id.id)—\(later.message.timestamp)" + string += ")" + } + + return string + } +} + +public final class PeerView: Printable { + let earlier: PeerViewEntryIndex? + let later: PeerViewEntryIndex? + let entries: [PeerViewEntry] + + init(_ mutableView: MutablePeerView) { + if let earlier = mutableView.earlier { + self.earlier = PeerViewEntryIndex(earlier) + } else { + self.earlier = nil + } + + if let later = mutableView.later { + self.later = PeerViewEntryIndex(later) + } else { + self.later = nil + } + + self.entries = mutableView.entries + } + + public var description: String { + var string = "" + + if let earlier = self.earlier { + string += "more(" + string += "(p \(earlier.peerId.namespace):\(earlier.peerId.id), m \(earlier.messageIndex.id.namespace):\(earlier.messageIndex.id.id)—\(earlier.messageIndex.timestamp)" + string += ") " + } + + string += "[" + var first = true + for entry in self.entries { + if first { + first = false + } else { + string += ", " + } + string += "(p \(entry.peerId.namespace):\(entry.peerId.id), m \(entry.message.id.namespace):\(entry.message.id.id)—\(entry.message.timestamp))" + } + string += "]" + + if let later = self.later { + string += " more(" + string += "(p \(later.peerId.namespace):\(later.peerId), m \(later.messageIndex.id.namespace):\(later.messageIndex.id.id)—\(later.messageIndex.timestamp)" string += ")" } diff --git a/Postbox/Postbox.swift b/Postbox/Postbox.swift index 6018c1d352..b5a6f87549 100644 --- a/Postbox/Postbox.swift +++ b/Postbox/Postbox.swift @@ -28,6 +28,7 @@ public final class Postbox { private var database: Database! private var peerMessageViews: [PeerId : Bag<(MutableMessageView, Pipe)>] = [:] + private var peerViews: Bag<(MutablePeerView, Pipe)> = Bag() public init(basePath: String, messageNamespaces: [MessageId.Namespace]) { self.basePath = basePath @@ -52,7 +53,7 @@ public final class Postbox { private func createSchema() { //state - self.database.execute("CREATE TABLE state (key INTEGER PRIMARY KEY, data BLOB)") + self.database.execute("CREATE TABLE state (data BLOB)") //peer_messages self.database.execute("CREATE TABLE peer_messages (peerId INTEGER, namespace INTEGER, id INTEGER, data BLOB, associatedMediaIds BLOB, timestamp INTEGER, PRIMARY KEY(peerId, namespace, id))") @@ -68,49 +69,13 @@ public final class Postbox { self.database.execute("CREATE TABLE media_cleanup (namespace INTEGER, id INTEGER, data BLOB, PRIMARY KEY(namespace, id))") //peer_entries - self.database.execute("CREATE TABLE peer_entries (entry BLOB)") + self.database.execute("CREATE TABLE peer_entries (peerId INTEGER PRIMARY KEY, entry BLOB)") self.database.execute("CREATE INDEX peer_entries_entry on peer_entries (entry)") //peers self.database.execute("CREATE TABLE peers (peerId INTEGER PRIMARY KEY, data BLOB)") } - public func _testBlobsPrepare(range: Int) { - self.database.execute("CREATE TABLE test_blobs (number INTEGER, data BLOB)") - self.database.execute("CREATE INDEX test_blobs_index_data ON test_blobs (data)") - - self.database.transaction() - let insert = self.database.prepare("INSERT INTO test_blobs (number, data) VALUES (?, ?)") - - for i in 0 ..< range { - var value = Int32(bigEndian: Int32(i)) - let blob = Blob(bytes: &value, length: 4) - insert.run(Int64(i), blob) - } - self.database.commit() - } - - public func _testBlobsTest(range: Int) { - let select = self.database.prepare("SELECT number FROM test_blobs WHERE data < ? ORDER BY data DESC LIMIT 1") - - for i in 1 ..< 1000 { - var value = 1 + Int32(arc4random_uniform(UInt32(range - 1))) - var binary = Int32(bigEndian: value) - var found = false - for row in select.run(Blob(bytes: &binary, length: 4)) { - found = true - let selected = Int32(row[0] as! Int64) - if selected != value - 1 { - assertionFailure("invalid value \(selected) != \(value - 1)") - } - break - } - if !found { - assertionFailure("value not found for \(value - 1)") - } - } - } - private class func peerViewEntryIndexForBlob(blob: Blob) -> PeerViewEntryIndex { let buffer = ReadBuffer(memory: UnsafeMutablePointer(blob.data.bytes), length: blob.data.length, freeWhenDone: false) var offset: Int = 0 @@ -357,7 +322,7 @@ public final class Postbox { var messageIdsByMediaId: [MediaId : [MessageId]] = [:] for (peerId, peerMessages) in Postbox.messagesGroupedByPeerId(messages) { - var maxTimestamp: Int32 = Int32.min + var maxMessage: (MessageIndex, Message)? var messageIds: [MessageId] = [] var seenMessageIds = Set() @@ -402,8 +367,9 @@ public final class Postbox { record.0.add(message) } - if maxTimestamp == Int32.min || message.timestamp > maxTimestamp { - maxTimestamp = message.timestamp + let index = MessageIndex(message) + if maxMessage == nil || index > maxMessage!.0 { + maxMessage = (index, message) } encoder.reset() @@ -430,17 +396,8 @@ public final class Postbox { record.1.putNext(MessageView(record.0)) } - var currentMaxTimestamp: Int32? - for row in self.database.prepare("SELECT timestamp FROM top_messages WHERE peerId = ?").run(peerId.toInt64()) { - currentMaxTimestamp = Int32(row[0] as! Int64) - } - - if let currentMaxTimestamp = currentMaxTimestamp { - if maxTimestamp > currentMaxTimestamp { - self.database.prepare("UPDATE top_messages SET timestamp = ? WHERE peerId = ?").run(Int64(maxTimestamp), peerId.toInt64()) - } - } else { - self.database.prepare("INSERT INTO top_messages(peerId, timestamp) VALUES(?, ?)").run(peerId.toInt64(), Int64(maxTimestamp)) + if let maxMessage = maxMessage { + self.updatePeerEntry(peerId, message: maxMessage.1) } } @@ -546,25 +503,79 @@ public final class Postbox { return grouped } - private func updateTopMessageTimestampForPeerId(peerId: PeerId) { - let selectIdStatement = self.database.prepare("SELECT MAX(id) FROM peer_messages WHERE peerId = ? AND namespace = ?") - let selectTimestampStatement = self.database.prepare("SELECT timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id = ?") - let updateTimestampStatement = self.database.prepare("UPDATE top_messages SET timestamp = ? WHERE peerId = ?") - - var maxTimestamp = Int32.min - for namespace in self.messageNamespaces { - for rowId in selectIdStatement.run(peerId.toInt64(), Int64(namespace)) { - for rowTimestamp in selectTimestampStatement.run(peerId.toInt64(), Int64(namespace), rowId[0]) { - let timestamp = Int32(rowTimestamp[0] as! Int64) - if timestamp > maxTimestamp { - maxTimestamp = timestamp - } - } + private func peerWithId(peerId: PeerId) -> Peer? { + for row in self.database.prepare("SELECT data FROM peers WHERE peerId = ?").run(peerId.toInt64()) { + let data = (row[0] as! Blob).data + let decoder = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false)) + if let peer = decoder.decodeRootObject() as? Peer { + return peer + } else { + println("(PostBox: can't decode peer)") } + + break } - if maxTimestamp != Int32.min { - updateTimestampStatement.run(Int64(maxTimestamp), peerId.toInt64()) + return nil + } + + private func updatePeerEntry(peerId: PeerId, message: Message?, replace: Bool = false) { + var currentIndex: PeerViewEntryIndex? + for row in self.database.prepare("SELECT entry FROM peer_entries WHERE peerId = ?").run(peerId.toInt64()) { + currentIndex = Postbox.peerViewEntryIndexForBlob(row[0] as! Blob) + break + } + + var updatedPeerMessage: Message? + + if let currentIndex = currentIndex { + if let message = message { + let messageIndex = MessageIndex(message) + if replace || currentIndex.messageIndex < messageIndex { + let updatedIndex = PeerViewEntryIndex(peerId: peerId, messageIndex: messageIndex) + updatedPeerMessage = message + let updatedBlob = Postbox.blobForPeerViewEntryIndex(updatedIndex) + self.database.prepare("UPDATE peer_entries SET entry = ? WHERE peerId = ?").run(updatedBlob, peerId.toInt64()) + } + } else if replace { + //TODO: remove? + } + } else if let message = message { + updatedPeerMessage = message + let updatedIndex = PeerViewEntryIndex(peerId: peerId, messageIndex: MessageIndex(message)) + let updatedBlob = Postbox.blobForPeerViewEntryIndex(updatedIndex) + self.database.prepare("INSERT INTO peer_entries (peerId, entry) VALUES (?, ?)").run(peerId.toInt64(), updatedBlob) + } + + if let updatedPeerMessage = updatedPeerMessage { + var peer: Peer? + for (view, sink) in self.peerViews.copyItems() { + + if peer == nil { + for entry in view.entries { + if entry.peerId == peerId { + peer = entry.peer + break + } + } + + if peer == nil { + peer = self.peerWithId(peerId) + } + } + + let entry: PeerViewEntry + if let peer = peer { + entry = PeerViewEntry(peer: peer, message: updatedPeerMessage) + } else { + entry = PeerViewEntry(peerId: peerId, message: updatedPeerMessage) + } + + let context = view.removeEntry(nil, peerId: peerId) + view.addEntry(entry) + + sink.putNext(PeerView(view)) + } } } @@ -573,6 +584,19 @@ public final class Postbox { let messageIdsByNamespace = Postbox.messageIdsGroupedByNamespace(messageIds) let messageIdsByMediaId = self.loadMessageIdsByMediaIdForPeerId(peerId, idsByNamespace: messageIdsByNamespace) + for (peerId, messageIds) in Postbox.messageIdsGroupedByPeerId(ids) { + if let relatedViews = self.peerMessageViews[peerId] { + for (view, sink) in relatedViews.copyItems() { + let context = view.remove(Set(messageIds)) + if !context.empty() { + view.complete(context, fetchEarlier: self.fetchMessagesRelative(peerId, earlier: true), fetchLater: self.fetchMessagesRelative(peerId, earlier: false)) + sink.putNext(MessageView(view)) + } + } + } + } + + for (namespace, messageIds) in messageIdsByNamespace { var queryString = "DELETE FROM peer_messages WHERE peerId = ? AND namespace = ? AND id IN (" var first = true @@ -641,7 +665,9 @@ public final class Postbox { } } - self.updateTopMessageTimestampForPeerId(peerId) + let tail = self.fetchMessagesTail(peerId, count: 1) + + self.updatePeerEntry(peerId, message: tail.first, replace: true) } } @@ -653,12 +679,6 @@ public final class Postbox { } } - private struct FetchedMessage { - let id: MessageId - let timestamp: Int32 - let data: NSData - } - private func findAdjacentMessageIds(peerId: PeerId, namespace: MessageId.Namespace, index: MessageIndex) -> (MessageId.Id?, MessageId.Id?) { var minId: MessageId.Id? var maxId: MessageId.Id? @@ -915,6 +935,71 @@ public final class Postbox { return messages } + private func fetchPeerEntryIndicesRelative(earlier: Bool)(index: PeerViewEntryIndex?, count: Int) -> [PeerViewEntryIndex] { + var entries: [PeerViewEntryIndex] = [] + + let rows: Statement + + if let index = index { + let bound = Postbox.blobForPeerViewEntryIndex(index) + let sign = earlier ? "<" : ">" + let order = earlier ? "DESC" : "ASC" + let statement = self.database.prepareCached("SELECT entry FROM peer_entries WHERE entry \(sign) ? ORDER BY entry \(order) LIMIT ?") + rows = statement.run(bound, Int64(count)) + } else { + let order = earlier ? "DESC" : "ASC" + let statement = self.database.prepareCached("SELECT entry FROM peer_entries ORDER BY entry \(order) LIMIT ?") + rows = statement.run(Int64(count)) + } + + return entries + } + + private func messageForPeer(peerId: PeerId, id: MessageId) -> Message? { + for row in self.database.prepareCached("SELECT data, associatedMediaIds FROM peer_messages WHERE peerId = ? AND namespace = ? AND id = ?").run(peerId.toInt64(), Int64(id.namespace), Int64(id.id)) { + let data = (row[0] as! Blob).data + let decoder = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false)) + if let message = decoder.decodeRootObject() as? Message { + return message + } else { + println("(PostBox: can't decode message)") + } + break + } + + return nil + } + + private func fetchPeerEntriesRelative(earlier: Bool)(index: PeerViewEntryIndex?, count: Int) -> [PeerViewEntry] { + var entries: [PeerViewEntry] = [] + var peers: [PeerId : Peer] = [:] + for entryIndex in self.fetchPeerEntryIndicesRelative(earlier)(index: index, count: count) { + var peer: Peer? + + if let cachedPeer = peers[entryIndex.peerId] { + peer = cachedPeer + } else { + if let fetchedPeer = self.peerWithId(entryIndex.peerId) { + peer = fetchedPeer + peers[fetchedPeer.id] = fetchedPeer + } + } + + if let message = self.messageForPeer(entryIndex.peerId, id: entryIndex.messageIndex.id) { + let entry: PeerViewEntry + if let peer = peer { + entry = PeerViewEntry(peer: peer, message: message) + } else { + entry = PeerViewEntry(peerId: entryIndex.peerId, message: message) + } + } else { + println("(PostBox: missing message for peer entry)") + } + } + + return entries + } + private func fetchMessagesTail(peerId: PeerId, count: Int) -> [Message] { var messages: [Message] = [] @@ -1061,6 +1146,57 @@ public final class Postbox { } } + public func tailPeerView(count: Int) -> Signal { + return Signal { subscriber in + let disposable = MetaDisposable() + + self.queue.dispatch { + let tail = self.fetchPeerEntriesRelative(true)(index: nil, count: count + 1) + + var entries: [PeerViewEntry] = [] + var i = tail.count - 1 + while i >= 0 && i >= tail.count - count { + entries.insert(tail[i], atIndex: 0) + i-- + } + + var earlier: PeerViewEntry? + + for namespace in self.messageNamespaces { + var i = tail.count - count - 1 + while i >= 0 { + earlier = tail[i] + break + } + } + + let mutableView = MutablePeerView(count: count, earlier: earlier, entries: entries, later: nil) + let record = (mutableView, Pipe()) + + let index = self.peerViews.add(record) + + subscriber.putNext(PeerView(mutableView)) + + let pipeDisposable = record.1.signal().start(next: { next in + subscriber.putNext(next) + }) + + disposable.set(ActionDisposable { [weak self] in + pipeDisposable.dispose() + + if let strongSelf = self { + strongSelf.queue.dispatch { + strongSelf.peerViews.remove(index) + } + } + return + }) + } + + return disposable + } + } + func printMessages(messages: [Message]) { var string = "" string += "[" @@ -1078,13 +1214,6 @@ public final class Postbox { } public func _dumpTables() { - println("\n-----------") - println("top_messages") - println("------------") - for row in self.database.prepare("SELECT peerId, timestamp FROM top_messages ORDER BY timestamp DESC").run() { - println("\(PeerId(row[0] as! Int64)): \(row[1] as! Int64)") - } - println("\n------------") println("peer_messages") println("-------------") diff --git a/PostboxTests/PostboxTests.swift b/PostboxTests/PostboxTests.swift index 957984c544..6b1fca9f4c 100644 --- a/PostboxTests/PostboxTests.swift +++ b/PostboxTests/PostboxTests.swift @@ -678,7 +678,7 @@ class PostboxTests: XCTestCase { } func testPeerView() { - let view = MutablePeerView(tags: [], count: 3, earlier: nil, entries: [], later: nil) + let view = MutablePeerView(count: 3, earlier: nil, entries: [], later: nil) let messageNamespaceCloud = TestMessageNamespace.Cloud.rawValue let otherId = PeerId(namespace: TestPeerNamespace.User.rawValue, id: 2000) @@ -694,7 +694,7 @@ class PostboxTests: XCTestCase { } else { string += ", " } - string += "(p \(entry.peer.id.namespace):\(entry.peer.id.id), m \(entry.message.id.namespace):\(entry.message.id.id)—\(entry.message.timestamp))" + string += "(p \(entry.peerId.namespace):\(entry.peerId.id), m \(entry.message.id.namespace):\(entry.message.id.id)—\(entry.message.timestamp))" } string += "]" println("\(string)") @@ -711,7 +711,7 @@ class PostboxTests: XCTestCase { } func remove(peerId: PeerId, context: MutablePeerView.RemoveContext? = nil) -> MutablePeerView.RemoveContext { - entries = entries.filter({ $0.peer.id != peerId }) + entries = entries.filter({ $0.peerId != peerId }) return view.removeEntry(context, peerId: peerId) } @@ -773,4 +773,46 @@ class PostboxTests: XCTestCase { println("\(view)") } } + + func testPeerViewTail() { + declareEncodable(TestMessage.self, { TestMessage(decoder: $0) }) + declareEncodable(TestMedia.self, { TestMedia(decoder: $0) }) + + let otherId = PeerId(namespace: TestPeerNamespace.User.rawValue, id: 2000) + let messageNamespace = TestMessageNamespace.Cloud.rawValue + + let basePath = "/tmp/postboxtest" + NSFileManager.defaultManager().removeItemAtPath(basePath, error: nil) + let postbox = Postbox(basePath: basePath, messageNamespaces: [messageNamespace]) + + postbox.modify { state in + let testMedia = TestMedia(id: MediaId(namespace: TestMediaNamespace.Test.rawValue, id: 1)) + for i in 0 ..< 10 { + let messageId = MessageId(peerId: otherId, namespace: messageNamespace, id: Int32(i + 1)) + let message = TestMessage(id: messageId, authorId: otherId, date: Int32(i + 100), text: "\(i)", referencedMediaIds: [testMedia.id]) + state.addMessages([message, message], medias: [testMedia]) + } + return + } + + postbox.tailPeerView(3).start(next: { next in + println(next) + }) + + postbox.tailMessageViewForPeerId(otherId, count: 4).start(next: { next in + println(next) + }) + + postbox.modify { state in + let testMedia = TestMedia(id: MediaId(namespace: TestMediaNamespace.Test.rawValue, id: 1)) + for i in 10 ..< 15 { + let messageId = MessageId(peerId: otherId, namespace: messageNamespace, id: Int32(i + 1)) + let message = TestMessage(id: messageId, authorId: otherId, date: Int32(i + 100), text: "\(i)", referencedMediaIds: [testMedia.id]) + state.addMessages([message, message], medias: [testMedia]) + } + return + } + + postbox._sync() + } }