no message

This commit is contained in:
Peter
2015-07-16 15:39:12 +03:00
parent 13f3951ddd
commit 52b9f7291a
4 changed files with 329 additions and 96 deletions

View File

@@ -28,6 +28,7 @@ public final class Postbox {
private var database: Database!
private var peerMessageViews: [PeerId : Bag<(MutableMessageView, Pipe<MessageView>)>] = [:]
private var peerViews: Bag<(MutablePeerView, Pipe<PeerView>)> = Bag()
public init(basePath: String, messageNamespaces: [MessageId.Namespace]) {
self.basePath = basePath
@@ -52,7 +53,7 @@ public final class Postbox {
private func createSchema() {
//state
self.database.execute("CREATE TABLE state (key INTEGER PRIMARY KEY, data BLOB)")
self.database.execute("CREATE TABLE state (data BLOB)")
//peer_messages
self.database.execute("CREATE TABLE peer_messages (peerId INTEGER, namespace INTEGER, id INTEGER, data BLOB, associatedMediaIds BLOB, timestamp INTEGER, PRIMARY KEY(peerId, namespace, id))")
@@ -68,49 +69,13 @@ public final class Postbox {
self.database.execute("CREATE TABLE media_cleanup (namespace INTEGER, id INTEGER, data BLOB, PRIMARY KEY(namespace, id))")
//peer_entries
self.database.execute("CREATE TABLE peer_entries (entry BLOB)")
self.database.execute("CREATE TABLE peer_entries (peerId INTEGER PRIMARY KEY, entry BLOB)")
self.database.execute("CREATE INDEX peer_entries_entry on peer_entries (entry)")
//peers
self.database.execute("CREATE TABLE peers (peerId INTEGER PRIMARY KEY, data BLOB)")
}
public func _testBlobsPrepare(range: Int) {
self.database.execute("CREATE TABLE test_blobs (number INTEGER, data BLOB)")
self.database.execute("CREATE INDEX test_blobs_index_data ON test_blobs (data)")
self.database.transaction()
let insert = self.database.prepare("INSERT INTO test_blobs (number, data) VALUES (?, ?)")
for i in 0 ..< range {
var value = Int32(bigEndian: Int32(i))
let blob = Blob(bytes: &value, length: 4)
insert.run(Int64(i), blob)
}
self.database.commit()
}
public func _testBlobsTest(range: Int) {
let select = self.database.prepare("SELECT number FROM test_blobs WHERE data < ? ORDER BY data DESC LIMIT 1")
for i in 1 ..< 1000 {
var value = 1 + Int32(arc4random_uniform(UInt32(range - 1)))
var binary = Int32(bigEndian: value)
var found = false
for row in select.run(Blob(bytes: &binary, length: 4)) {
found = true
let selected = Int32(row[0] as! Int64)
if selected != value - 1 {
assertionFailure("invalid value \(selected) != \(value - 1)")
}
break
}
if !found {
assertionFailure("value not found for \(value - 1)")
}
}
}
private class func peerViewEntryIndexForBlob(blob: Blob) -> PeerViewEntryIndex {
let buffer = ReadBuffer(memory: UnsafeMutablePointer(blob.data.bytes), length: blob.data.length, freeWhenDone: false)
var offset: Int = 0
@@ -357,7 +322,7 @@ public final class Postbox {
var messageIdsByMediaId: [MediaId : [MessageId]] = [:]
for (peerId, peerMessages) in Postbox.messagesGroupedByPeerId(messages) {
var maxTimestamp: Int32 = Int32.min
var maxMessage: (MessageIndex, Message)?
var messageIds: [MessageId] = []
var seenMessageIds = Set<MessageId>()
@@ -402,8 +367,9 @@ public final class Postbox {
record.0.add(message)
}
if maxTimestamp == Int32.min || message.timestamp > maxTimestamp {
maxTimestamp = message.timestamp
let index = MessageIndex(message)
if maxMessage == nil || index > maxMessage!.0 {
maxMessage = (index, message)
}
encoder.reset()
@@ -430,17 +396,8 @@ public final class Postbox {
record.1.putNext(MessageView(record.0))
}
var currentMaxTimestamp: Int32?
for row in self.database.prepare("SELECT timestamp FROM top_messages WHERE peerId = ?").run(peerId.toInt64()) {
currentMaxTimestamp = Int32(row[0] as! Int64)
}
if let currentMaxTimestamp = currentMaxTimestamp {
if maxTimestamp > currentMaxTimestamp {
self.database.prepare("UPDATE top_messages SET timestamp = ? WHERE peerId = ?").run(Int64(maxTimestamp), peerId.toInt64())
}
} else {
self.database.prepare("INSERT INTO top_messages(peerId, timestamp) VALUES(?, ?)").run(peerId.toInt64(), Int64(maxTimestamp))
if let maxMessage = maxMessage {
self.updatePeerEntry(peerId, message: maxMessage.1)
}
}
@@ -546,25 +503,79 @@ public final class Postbox {
return grouped
}
private func updateTopMessageTimestampForPeerId(peerId: PeerId) {
let selectIdStatement = self.database.prepare("SELECT MAX(id) FROM peer_messages WHERE peerId = ? AND namespace = ?")
let selectTimestampStatement = self.database.prepare("SELECT timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id = ?")
let updateTimestampStatement = self.database.prepare("UPDATE top_messages SET timestamp = ? WHERE peerId = ?")
var maxTimestamp = Int32.min
for namespace in self.messageNamespaces {
for rowId in selectIdStatement.run(peerId.toInt64(), Int64(namespace)) {
for rowTimestamp in selectTimestampStatement.run(peerId.toInt64(), Int64(namespace), rowId[0]) {
let timestamp = Int32(rowTimestamp[0] as! Int64)
if timestamp > maxTimestamp {
maxTimestamp = timestamp
}
}
private func peerWithId(peerId: PeerId) -> Peer? {
for row in self.database.prepare("SELECT data FROM peers WHERE peerId = ?").run(peerId.toInt64()) {
let data = (row[0] as! Blob).data
let decoder = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false))
if let peer = decoder.decodeRootObject() as? Peer {
return peer
} else {
println("(PostBox: can't decode peer)")
}
break
}
if maxTimestamp != Int32.min {
updateTimestampStatement.run(Int64(maxTimestamp), peerId.toInt64())
return nil
}
private func updatePeerEntry(peerId: PeerId, message: Message?, replace: Bool = false) {
var currentIndex: PeerViewEntryIndex?
for row in self.database.prepare("SELECT entry FROM peer_entries WHERE peerId = ?").run(peerId.toInt64()) {
currentIndex = Postbox.peerViewEntryIndexForBlob(row[0] as! Blob)
break
}
var updatedPeerMessage: Message?
if let currentIndex = currentIndex {
if let message = message {
let messageIndex = MessageIndex(message)
if replace || currentIndex.messageIndex < messageIndex {
let updatedIndex = PeerViewEntryIndex(peerId: peerId, messageIndex: messageIndex)
updatedPeerMessage = message
let updatedBlob = Postbox.blobForPeerViewEntryIndex(updatedIndex)
self.database.prepare("UPDATE peer_entries SET entry = ? WHERE peerId = ?").run(updatedBlob, peerId.toInt64())
}
} else if replace {
//TODO: remove?
}
} else if let message = message {
updatedPeerMessage = message
let updatedIndex = PeerViewEntryIndex(peerId: peerId, messageIndex: MessageIndex(message))
let updatedBlob = Postbox.blobForPeerViewEntryIndex(updatedIndex)
self.database.prepare("INSERT INTO peer_entries (peerId, entry) VALUES (?, ?)").run(peerId.toInt64(), updatedBlob)
}
if let updatedPeerMessage = updatedPeerMessage {
var peer: Peer?
for (view, sink) in self.peerViews.copyItems() {
if peer == nil {
for entry in view.entries {
if entry.peerId == peerId {
peer = entry.peer
break
}
}
if peer == nil {
peer = self.peerWithId(peerId)
}
}
let entry: PeerViewEntry
if let peer = peer {
entry = PeerViewEntry(peer: peer, message: updatedPeerMessage)
} else {
entry = PeerViewEntry(peerId: peerId, message: updatedPeerMessage)
}
let context = view.removeEntry(nil, peerId: peerId)
view.addEntry(entry)
sink.putNext(PeerView(view))
}
}
}
@@ -573,6 +584,19 @@ public final class Postbox {
let messageIdsByNamespace = Postbox.messageIdsGroupedByNamespace(messageIds)
let messageIdsByMediaId = self.loadMessageIdsByMediaIdForPeerId(peerId, idsByNamespace: messageIdsByNamespace)
for (peerId, messageIds) in Postbox.messageIdsGroupedByPeerId(ids) {
if let relatedViews = self.peerMessageViews[peerId] {
for (view, sink) in relatedViews.copyItems() {
let context = view.remove(Set<MessageId>(messageIds))
if !context.empty() {
view.complete(context, fetchEarlier: self.fetchMessagesRelative(peerId, earlier: true), fetchLater: self.fetchMessagesRelative(peerId, earlier: false))
sink.putNext(MessageView(view))
}
}
}
}
for (namespace, messageIds) in messageIdsByNamespace {
var queryString = "DELETE FROM peer_messages WHERE peerId = ? AND namespace = ? AND id IN ("
var first = true
@@ -641,7 +665,9 @@ public final class Postbox {
}
}
self.updateTopMessageTimestampForPeerId(peerId)
let tail = self.fetchMessagesTail(peerId, count: 1)
self.updatePeerEntry(peerId, message: tail.first, replace: true)
}
}
@@ -653,12 +679,6 @@ public final class Postbox {
}
}
private struct FetchedMessage {
let id: MessageId
let timestamp: Int32
let data: NSData
}
private func findAdjacentMessageIds(peerId: PeerId, namespace: MessageId.Namespace, index: MessageIndex) -> (MessageId.Id?, MessageId.Id?) {
var minId: MessageId.Id?
var maxId: MessageId.Id?
@@ -915,6 +935,71 @@ public final class Postbox {
return messages
}
private func fetchPeerEntryIndicesRelative(earlier: Bool)(index: PeerViewEntryIndex?, count: Int) -> [PeerViewEntryIndex] {
var entries: [PeerViewEntryIndex] = []
let rows: Statement
if let index = index {
let bound = Postbox.blobForPeerViewEntryIndex(index)
let sign = earlier ? "<" : ">"
let order = earlier ? "DESC" : "ASC"
let statement = self.database.prepareCached("SELECT entry FROM peer_entries WHERE entry \(sign) ? ORDER BY entry \(order) LIMIT ?")
rows = statement.run(bound, Int64(count))
} else {
let order = earlier ? "DESC" : "ASC"
let statement = self.database.prepareCached("SELECT entry FROM peer_entries ORDER BY entry \(order) LIMIT ?")
rows = statement.run(Int64(count))
}
return entries
}
private func messageForPeer(peerId: PeerId, id: MessageId) -> Message? {
for row in self.database.prepareCached("SELECT data, associatedMediaIds FROM peer_messages WHERE peerId = ? AND namespace = ? AND id = ?").run(peerId.toInt64(), Int64(id.namespace), Int64(id.id)) {
let data = (row[0] as! Blob).data
let decoder = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false))
if let message = decoder.decodeRootObject() as? Message {
return message
} else {
println("(PostBox: can't decode message)")
}
break
}
return nil
}
private func fetchPeerEntriesRelative(earlier: Bool)(index: PeerViewEntryIndex?, count: Int) -> [PeerViewEntry] {
var entries: [PeerViewEntry] = []
var peers: [PeerId : Peer] = [:]
for entryIndex in self.fetchPeerEntryIndicesRelative(earlier)(index: index, count: count) {
var peer: Peer?
if let cachedPeer = peers[entryIndex.peerId] {
peer = cachedPeer
} else {
if let fetchedPeer = self.peerWithId(entryIndex.peerId) {
peer = fetchedPeer
peers[fetchedPeer.id] = fetchedPeer
}
}
if let message = self.messageForPeer(entryIndex.peerId, id: entryIndex.messageIndex.id) {
let entry: PeerViewEntry
if let peer = peer {
entry = PeerViewEntry(peer: peer, message: message)
} else {
entry = PeerViewEntry(peerId: entryIndex.peerId, message: message)
}
} else {
println("(PostBox: missing message for peer entry)")
}
}
return entries
}
private func fetchMessagesTail(peerId: PeerId, count: Int) -> [Message] {
var messages: [Message] = []
@@ -1061,6 +1146,57 @@ public final class Postbox {
}
}
public func tailPeerView(count: Int) -> Signal<PeerView, NoError> {
return Signal { subscriber in
let disposable = MetaDisposable()
self.queue.dispatch {
let tail = self.fetchPeerEntriesRelative(true)(index: nil, count: count + 1)
var entries: [PeerViewEntry] = []
var i = tail.count - 1
while i >= 0 && i >= tail.count - count {
entries.insert(tail[i], atIndex: 0)
i--
}
var earlier: PeerViewEntry?
for namespace in self.messageNamespaces {
var i = tail.count - count - 1
while i >= 0 {
earlier = tail[i]
break
}
}
let mutableView = MutablePeerView(count: count, earlier: earlier, entries: entries, later: nil)
let record = (mutableView, Pipe<PeerView>())
let index = self.peerViews.add(record)
subscriber.putNext(PeerView(mutableView))
let pipeDisposable = record.1.signal().start(next: { next in
subscriber.putNext(next)
})
disposable.set(ActionDisposable { [weak self] in
pipeDisposable.dispose()
if let strongSelf = self {
strongSelf.queue.dispatch {
strongSelf.peerViews.remove(index)
}
}
return
})
}
return disposable
}
}
func printMessages(messages: [Message]) {
var string = ""
string += "["
@@ -1078,13 +1214,6 @@ public final class Postbox {
}
public func _dumpTables() {
println("\n-----------")
println("top_messages")
println("------------")
for row in self.database.prepare("SELECT peerId, timestamp FROM top_messages ORDER BY timestamp DESC").run() {
println("\(PeerId(row[0] as! Int64)): \(row[1] as! Int64)")
}
println("\n------------")
println("peer_messages")
println("-------------")