mirror of
https://github.com/Swiftgram/Telegram-iOS.git
synced 2025-12-23 22:55:00 +00:00
no message
This commit is contained in:
@@ -6,7 +6,7 @@ public protocol PostboxState: Coding {
|
||||
|
||||
}
|
||||
|
||||
public class Modifier<State: PostboxState> {
|
||||
public final class Modifier<State: PostboxState> {
|
||||
private weak var postbox: Postbox<State>?
|
||||
|
||||
private init(postbox: Postbox<State>) {
|
||||
@@ -28,6 +28,10 @@ public class Modifier<State: PostboxState> {
|
||||
public func setState(state: State) {
|
||||
self.postbox?.setState(state)
|
||||
}
|
||||
|
||||
public func updatePeers(peers: [Peer], update: (Peer, Peer) -> Peer) {
|
||||
self.postbox?.updatePeers(peers, update: update)
|
||||
}
|
||||
}
|
||||
|
||||
public final class Postbox<State: PostboxState> {
|
||||
@@ -38,7 +42,10 @@ public final class Postbox<State: PostboxState> {
|
||||
private var database: Database!
|
||||
|
||||
private var peerMessageViews: [PeerId : Bag<(MutableMessageView, Pipe<MessageView>)>] = [:]
|
||||
private var deferredMessageViewsToUpdate: [(MutableMessageView, Pipe<MessageView>)] = []
|
||||
private var peerViews: Bag<(MutablePeerView, Pipe<PeerView>)> = Bag()
|
||||
private var deferredPeerViewsToUpdate: [(MutablePeerView, Pipe<PeerView>)] = []
|
||||
|
||||
private var statePipe: Pipe<State> = Pipe()
|
||||
|
||||
public init(basePath: String, messageNamespaces: [MessageId.Namespace]) {
|
||||
@@ -48,21 +55,45 @@ public final class Postbox<State: PostboxState> {
|
||||
}
|
||||
|
||||
private func openDatabase() {
|
||||
do {
|
||||
try NSFileManager.defaultManager().createDirectoryAtPath(basePath, withIntermediateDirectories: true, attributes: nil)
|
||||
} catch _ {
|
||||
}
|
||||
self.database = Database(basePath.stringByAppendingPathComponent("db"))
|
||||
|
||||
self.queue.dispatch {
|
||||
let startTime = CFAbsoluteTimeGetCurrent()
|
||||
|
||||
do {
|
||||
try NSFileManager.defaultManager().createDirectoryAtPath(self.basePath, withIntermediateDirectories: true, attributes: nil)
|
||||
} catch _ {
|
||||
}
|
||||
self.database = Database(self.basePath.stringByAppendingPathComponent("db"))
|
||||
|
||||
let result = self.database.scalar("PRAGMA user_version") as! Int64
|
||||
if result == 1 {
|
||||
let version: Int64 = 7
|
||||
if result == version {
|
||||
print("(Postbox schema version \(result))")
|
||||
} else {
|
||||
if result != 0 {
|
||||
print("(Postbox migrating to version \(version))")
|
||||
do {
|
||||
try NSFileManager.defaultManager().removeItemAtPath(self.basePath)
|
||||
try NSFileManager.defaultManager().createDirectoryAtPath(self.basePath, withIntermediateDirectories: true, attributes: nil)
|
||||
} catch (_) {
|
||||
}
|
||||
|
||||
self.database = Database(self.basePath.stringByAppendingPathComponent("db"))
|
||||
}
|
||||
print("(Postbox creating schema)")
|
||||
self.createSchema()
|
||||
self.database.execute("PRAGMA user_version = 1")
|
||||
self.database.execute("PRAGMA user_version = \(version)")
|
||||
}
|
||||
|
||||
self.database.adjustChunkSize()
|
||||
self.database.execute("PRAGMA page_size=1024")
|
||||
self.database.execute("PRAGMA cache_size=-2097152")
|
||||
self.database.execute("PRAGMA synchronous=NORMAL")
|
||||
self.database.execute("PRAGMA journal_mode=truncate")
|
||||
self.database.execute("PRAGMA temp_store=MEMORY")
|
||||
//self.database.execute("PRAGMA wal_autocheckpoint=32")
|
||||
//self.database.execute("PRAGMA journal_size_limit=1536")
|
||||
|
||||
print("(Postbox initialization took \((CFAbsoluteTimeGetCurrent() - startTime) * 1000.0) ms")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,7 +123,7 @@ public final class Postbox<State: PostboxState> {
|
||||
self.database.execute("CREATE INDEX peer_entries_entry on peer_entries (entry)")
|
||||
|
||||
//peers
|
||||
self.database.execute("CREATE TABLE peers (peerId INTEGER PRIMARY KEY, data BLOB)")
|
||||
self.database.execute("CREATE TABLE peers (id INTEGER PRIMARY KEY, data BLOB)")
|
||||
}
|
||||
|
||||
private class func peerViewEntryIndexForBlob(blob: Blob) -> PeerViewEntryIndex {
|
||||
@@ -197,14 +228,23 @@ public final class Postbox<State: PostboxState> {
|
||||
return grouped
|
||||
}
|
||||
|
||||
private class func messagesGroupedByPeerId(messages: [Message]) -> [PeerId : [Message]] {
|
||||
var grouped: [PeerId : [Message]] = [:]
|
||||
private class func messagesGroupedByPeerId(messages: [Message]) -> [(PeerId, [Message])] {
|
||||
var grouped: [(PeerId, [Message])] = []
|
||||
|
||||
for message in messages {
|
||||
if grouped[message.id.peerId] != nil {
|
||||
grouped[message.id.peerId]!.append(message)
|
||||
} else {
|
||||
grouped[message.id.peerId] = [message]
|
||||
var i = 0
|
||||
let count = grouped.count
|
||||
var found = false
|
||||
while i < count {
|
||||
if grouped[i].0 == message.id.peerId {
|
||||
grouped[i].1.append(message)
|
||||
found = true
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
if !found {
|
||||
grouped.append((message.id.peerId, [message]))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -325,8 +365,12 @@ public final class Postbox<State: PostboxState> {
|
||||
return ids
|
||||
}
|
||||
|
||||
private var cachedState: State?
|
||||
|
||||
private func setState(state: State) {
|
||||
self.queue.dispatch {
|
||||
self.cachedState = state
|
||||
|
||||
let encoder = Encoder()
|
||||
encoder.encodeRootObject(state)
|
||||
let blob = Blob(data: encoder.makeData())
|
||||
@@ -337,16 +381,21 @@ public final class Postbox<State: PostboxState> {
|
||||
}
|
||||
|
||||
private func getState() -> State? {
|
||||
for row in self.database.prepareCached("SELECT data FROM state WHERE id = ?").run(Int64(0)) {
|
||||
let data = (row[0] as! Blob).data
|
||||
let buffer = ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false)
|
||||
let decoder = Decoder(buffer: buffer)
|
||||
if let state = decoder.decodeRootObject() as? State {
|
||||
return state
|
||||
if let cachedState = self.cachedState {
|
||||
return cachedState
|
||||
} else {
|
||||
for row in self.database.prepareCached("SELECT data FROM state WHERE id = ?").run(Int64(0)) {
|
||||
let data = (row[0] as! Blob).data
|
||||
let buffer = ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false)
|
||||
let decoder = Decoder(buffer: buffer)
|
||||
if let state = decoder.decodeRootObject() as? State {
|
||||
self.cachedState = state
|
||||
return state
|
||||
}
|
||||
break
|
||||
}
|
||||
break
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
public func state() -> Signal<State?, NoError> {
|
||||
@@ -394,11 +443,11 @@ public final class Postbox<State: PostboxState> {
|
||||
}
|
||||
|
||||
private func addMessages(messages: [Message], medias: [Media]) {
|
||||
let messageInsertStatement = self.database.prepare("INSERT INTO peer_messages (peerId, namespace, id, data, associatedMediaIds, timestamp) VALUES (?, ?, ?, ?, ?, ?)")
|
||||
let peerMediaInsertStatement = self.database.prepare("INSERT INTO peer_media (peerId, mediaNamespace, messageNamespace, messageId) VALUES (?, ?, ?, ?)")
|
||||
let mediaInsertStatement = self.database.prepare("INSERT INTO media (namespace, id, data, associatedMessageIds) VALUES (?, ?, ?, ?)")
|
||||
let referencedMessageIdsStatement = self.database.prepare("SELECT associatedMessageIds FROM media WHERE namespace = ? AND id = ?")
|
||||
let updateReferencedMessageIdsStatement = self.database.prepare("UPDATE media SET associatedMessageIds = ? WHERE namespace = ? AND id = ?")
|
||||
let messageInsertStatement = self.database.prepareCached("INSERT INTO peer_messages (peerId, namespace, id, data, associatedMediaIds, timestamp) VALUES (?, ?, ?, ?, ?, ?)")
|
||||
let peerMediaInsertStatement = self.database.prepareCached("INSERT INTO peer_media (peerId, mediaNamespace, messageNamespace, messageId) VALUES (?, ?, ?, ?)")
|
||||
let mediaInsertStatement = self.database.prepareCached("INSERT INTO media (namespace, id, data, associatedMessageIds) VALUES (?, ?, ?, ?)")
|
||||
let referencedMessageIdsStatement = self.database.prepareCached("SELECT associatedMessageIds FROM media WHERE namespace = ? AND id = ?")
|
||||
let updateReferencedMessageIdsStatement = self.database.prepareCached("UPDATE media SET associatedMessageIds = ? WHERE namespace = ? AND id = ?")
|
||||
|
||||
let encoder = Encoder()
|
||||
|
||||
@@ -437,18 +486,12 @@ public final class Postbox<State: PostboxState> {
|
||||
}
|
||||
}
|
||||
|
||||
let relatedViews = self.peerMessageViews[peerId] ?? Bag()
|
||||
|
||||
for message in peerMessages {
|
||||
if existingMessageIds.contains(message.id) {
|
||||
continue
|
||||
}
|
||||
existingMessageIds.insert(message.id)
|
||||
|
||||
for record in relatedViews.copyItems() {
|
||||
record.0.add(message)
|
||||
}
|
||||
|
||||
let index = MessageIndex(message)
|
||||
if maxMessage == nil || index > maxMessage!.0 {
|
||||
maxMessage = (index, message)
|
||||
@@ -458,8 +501,8 @@ public final class Postbox<State: PostboxState> {
|
||||
encoder.encodeRootObject(message)
|
||||
let messageBlob = Blob(data: encoder.makeData())
|
||||
|
||||
let referencedMediaIdsMediaIdsBlob = Postbox.blobForMediaIds(message.referencedMediaIds)
|
||||
for id in message.referencedMediaIds {
|
||||
let referencedMediaIdsMediaIdsBlob = Postbox.blobForMediaIds(message.mediaIds)
|
||||
for id in message.mediaIds {
|
||||
if messageIdsByMediaId[id] != nil {
|
||||
messageIdsByMediaId[id]!.append(message.id)
|
||||
} else {
|
||||
@@ -469,17 +512,28 @@ public final class Postbox<State: PostboxState> {
|
||||
|
||||
messageInsertStatement.run(peerId.toInt64(), Int64(message.id.namespace), Int64(message.id.id), messageBlob, referencedMediaIdsMediaIdsBlob, Int64(message.timestamp))
|
||||
|
||||
for id in message.referencedMediaIds {
|
||||
for id in message.mediaIds {
|
||||
peerMediaInsertStatement.run(peerId.toInt64(), Int64(id.namespace), Int64(message.id.namespace), Int64(message.id.id))
|
||||
}
|
||||
}
|
||||
|
||||
for record in relatedViews.copyItems() {
|
||||
record.1.putNext(MessageView(record.0))
|
||||
if let relatedViews = self.peerMessageViews[peerId] {
|
||||
for record in relatedViews.copyItems() {
|
||||
var updated = false
|
||||
for message in peerMessages {
|
||||
if record.0.add(RenderedMessage(message: message)) {
|
||||
updated = true
|
||||
}
|
||||
}
|
||||
|
||||
if updated {
|
||||
self.deferMessageViewUpdate(record.0, pipe: record.1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let maxMessage = maxMessage {
|
||||
self.updatePeerEntry(peerId, message: maxMessage.1)
|
||||
self.updatePeerEntry(peerId, message: RenderedMessage(message: maxMessage.1))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -585,56 +639,197 @@ public final class Postbox<State: PostboxState> {
|
||||
return grouped
|
||||
}
|
||||
|
||||
private func peerWithId(peerId: PeerId) -> Peer? {
|
||||
for row in self.database.prepare("SELECT data FROM peers WHERE peerId = ?").run(peerId.toInt64()) {
|
||||
let data = (row[0] as! Blob).data
|
||||
let decoder = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false))
|
||||
if let peer = decoder.decodeRootObject() as? Peer {
|
||||
return peer
|
||||
} else {
|
||||
print("(PostBox: can't decode peer)")
|
||||
private func mediaWithIds(ids: [MediaId]) -> [MediaId : Media] {
|
||||
if ids.count == 0 {
|
||||
return [:]
|
||||
} else {
|
||||
let select = self.database.prepareCached("SELECT data FROM media WHERE namespace = ? AND id = ?")
|
||||
var result: [MediaId : Media] = [:]
|
||||
|
||||
for id in ids {
|
||||
for row in select.run(Int64(id.namespace), id.id) {
|
||||
let blob = row[0] as! Blob
|
||||
if let media = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer<Void>(blob.data.bytes), length: blob.data.length, freeWhenDone: false)) as? Media {
|
||||
result[media.id] = media
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
break
|
||||
return result
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
var cachedPeers: [PeerId : Peer] = [:]
|
||||
|
||||
private func updatePeerEntry(peerId: PeerId, message: Message?, replace: Bool = false) {
|
||||
private func peerWithId(peerId: PeerId) -> Peer? {
|
||||
if let cachedPeer = cachedPeers[peerId] {
|
||||
return cachedPeer
|
||||
} else {
|
||||
for row in self.database.prepareCached("SELECT data FROM peers WHERE id = ?").run(peerId.toInt64()) {
|
||||
let data = (row[0] as! Blob).data
|
||||
let decoder = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false))
|
||||
if let peer = decoder.decodeRootObject() as? Peer {
|
||||
cachedPeers[peer.id] = peer
|
||||
return peer
|
||||
} else {
|
||||
print("(PostBox: can't decode peer)")
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
private func peersWithIds(ids: [PeerId]) -> [PeerId : Peer] {
|
||||
if ids.count == 0 {
|
||||
return [:]
|
||||
} else {
|
||||
var remainingIds: [PeerId] = []
|
||||
|
||||
var peers: [PeerId : Peer] = [:]
|
||||
|
||||
for id in ids {
|
||||
if let cachedPeer = cachedPeers[id] {
|
||||
peers[id] = cachedPeer
|
||||
} else {
|
||||
remainingIds.append(id)
|
||||
}
|
||||
}
|
||||
|
||||
if remainingIds.count != 0 {
|
||||
let rows: Statement
|
||||
if ids.count == 1 {
|
||||
rows = self.database.prepareCached("SELECT data FROM peers WHERE id = ?").run(ids[0].toInt64())
|
||||
} else if ids.count == 2 {
|
||||
rows = self.database.prepareCached("SELECT data FROM peers WHERE id IN (?, ?)").run(ids[0].toInt64(), ids[1].toInt64())
|
||||
} else {
|
||||
var query = "SELECT data FROM peers WHERE id IN ("
|
||||
var first = true
|
||||
for id in ids {
|
||||
if first {
|
||||
first = false
|
||||
query += "\(id.toInt64())"
|
||||
} else {
|
||||
query += ",\(id.toInt64())"
|
||||
}
|
||||
}
|
||||
query += ")"
|
||||
rows = self.database.prepare(query).run()
|
||||
}
|
||||
|
||||
for row in rows {
|
||||
let blob = row[0] as! Blob
|
||||
if let peer = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer<Void>(blob.data.bytes), length: blob.data.length, freeWhenDone: false)).decodeRootObject() as? Peer {
|
||||
self.cachedPeers[peer.id] = peer
|
||||
peers[peer.id] = peer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return peers
|
||||
}
|
||||
}
|
||||
|
||||
private func deferPeerViewUpdate(view: MutablePeerView, pipe: Pipe<PeerView>) {
|
||||
var i = 0
|
||||
var found = false
|
||||
while i < self.deferredPeerViewsToUpdate.count {
|
||||
if self.deferredPeerViewsToUpdate[i].1 === pipe {
|
||||
self.deferredPeerViewsToUpdate[i] = (view, pipe)
|
||||
found = true
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
if !found {
|
||||
self.deferredPeerViewsToUpdate.append((view, pipe))
|
||||
}
|
||||
}
|
||||
|
||||
private func deferMessageViewUpdate(view: MutableMessageView, pipe: Pipe<MessageView>) {
|
||||
var i = 0
|
||||
var found = false
|
||||
while i < self.deferredPeerViewsToUpdate.count {
|
||||
if self.deferredMessageViewsToUpdate[i].1 === pipe {
|
||||
self.deferredMessageViewsToUpdate[i] = (view, pipe)
|
||||
found = true
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
if !found {
|
||||
self.deferredMessageViewsToUpdate.append((view, pipe))
|
||||
}
|
||||
}
|
||||
|
||||
private func performDeferredUpdates() {
|
||||
let deferredPeerViewsToUpdate = self.deferredPeerViewsToUpdate
|
||||
self.deferredPeerViewsToUpdate.removeAll()
|
||||
|
||||
for entry in deferredPeerViewsToUpdate {
|
||||
let viewRenderedMessages = self.renderedMessages(entry.0.incompleteMessages())
|
||||
if viewRenderedMessages.count != 0 {
|
||||
var viewRenderedMessagesDict: [MessageId : RenderedMessage] = [:]
|
||||
for message in viewRenderedMessages {
|
||||
viewRenderedMessagesDict[message.message.id] = message
|
||||
}
|
||||
entry.0.completeMessages(viewRenderedMessagesDict)
|
||||
}
|
||||
|
||||
entry.1.putNext(PeerView(entry.0))
|
||||
}
|
||||
|
||||
let deferredMessageViewsToUpdate = self.deferredMessageViewsToUpdate
|
||||
self.deferredMessageViewsToUpdate.removeAll()
|
||||
|
||||
for entry in deferredMessageViewsToUpdate {
|
||||
let viewRenderedMessages = self.renderedMessages(entry.0.incompleteMessages())
|
||||
if viewRenderedMessages.count != 0 {
|
||||
var viewRenderedMessagesDict: [MessageId : RenderedMessage] = [:]
|
||||
for message in viewRenderedMessages {
|
||||
viewRenderedMessagesDict[message.message.id] = message
|
||||
}
|
||||
entry.0.completeMessages(viewRenderedMessagesDict)
|
||||
}
|
||||
|
||||
entry.1.putNext(MessageView(entry.0))
|
||||
}
|
||||
}
|
||||
|
||||
private func updatePeerEntry(peerId: PeerId, message: RenderedMessage?, replace: Bool = false) {
|
||||
var currentIndex: PeerViewEntryIndex?
|
||||
for row in self.database.prepare("SELECT entry FROM peer_entries WHERE peerId = ?").run(peerId.toInt64()) {
|
||||
for row in self.database.prepareCached("SELECT entry FROM peer_entries WHERE peerId = ?").run(peerId.toInt64()) {
|
||||
currentIndex = Postbox.peerViewEntryIndexForBlob(row[0] as! Blob)
|
||||
break
|
||||
}
|
||||
|
||||
var updatedPeerMessage: Message?
|
||||
var updatedPeerMessage: RenderedMessage?
|
||||
|
||||
if let currentIndex = currentIndex {
|
||||
if let message = message {
|
||||
let messageIndex = MessageIndex(message)
|
||||
let messageIndex = MessageIndex(message.message)
|
||||
if replace || currentIndex.messageIndex < messageIndex {
|
||||
let updatedIndex = PeerViewEntryIndex(peerId: peerId, messageIndex: messageIndex)
|
||||
updatedPeerMessage = message
|
||||
let updatedBlob = Postbox.blobForPeerViewEntryIndex(updatedIndex)
|
||||
self.database.prepare("UPDATE peer_entries SET entry = ? WHERE peerId = ?").run(updatedBlob, peerId.toInt64())
|
||||
self.database.prepareCached("UPDATE peer_entries SET entry = ? WHERE peerId = ?").run(updatedBlob, peerId.toInt64())
|
||||
}
|
||||
} else if replace {
|
||||
//TODO: remove?
|
||||
}
|
||||
} else if let message = message {
|
||||
updatedPeerMessage = message
|
||||
let updatedIndex = PeerViewEntryIndex(peerId: peerId, messageIndex: MessageIndex(message))
|
||||
let updatedIndex = PeerViewEntryIndex(peerId: peerId, messageIndex: MessageIndex(message.message))
|
||||
let updatedBlob = Postbox.blobForPeerViewEntryIndex(updatedIndex)
|
||||
self.database.prepare("INSERT INTO peer_entries (peerId, entry) VALUES (?, ?)").run(peerId.toInt64(), updatedBlob)
|
||||
self.database.prepareCached("INSERT INTO peer_entries (peerId, entry) VALUES (?, ?)").run(peerId.toInt64(), updatedBlob)
|
||||
}
|
||||
|
||||
if let updatedPeerMessage = updatedPeerMessage {
|
||||
var peer: Peer?
|
||||
for (view, sink) in self.peerViews.copyItems() {
|
||||
|
||||
for (view, pipe) in self.peerViews.copyItems() {
|
||||
if peer == nil {
|
||||
for entry in view.entries {
|
||||
if entry.peerId == peerId {
|
||||
@@ -659,7 +854,7 @@ public final class Postbox<State: PostboxState> {
|
||||
view.addEntry(entry)
|
||||
view.complete(context, fetchEarlier: self.fetchPeerEntriesRelative(true), fetchLater: self.fetchPeerEntriesRelative(false))
|
||||
|
||||
sink.putNext(PeerView(view))
|
||||
self.deferPeerViewUpdate(view, pipe: pipe)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -671,11 +866,11 @@ public final class Postbox<State: PostboxState> {
|
||||
|
||||
for (peerId, messageIds) in Postbox.messageIdsGroupedByPeerId(ids) {
|
||||
if let relatedViews = self.peerMessageViews[peerId] {
|
||||
for (view, sink) in relatedViews.copyItems() {
|
||||
for (view, pipe) in relatedViews.copyItems() {
|
||||
let context = view.remove(Set<MessageId>(messageIds))
|
||||
if !context.empty() {
|
||||
view.complete(context, fetchEarlier: self.fetchMessagesRelative(peerId, earlier: true), fetchLater: self.fetchMessagesRelative(peerId, earlier: false))
|
||||
sink.putNext(MessageView(view))
|
||||
self.deferMessageViewUpdate(view, pipe: pipe)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -735,16 +930,16 @@ public final class Postbox<State: PostboxState> {
|
||||
updatedMessageIdsByMediaId[mediaId] = messageIds
|
||||
|
||||
if messageIds.count == 0 {
|
||||
self.database.prepare("INSERT OR IGNORE INTO media_cleanup (namespace, id, data) VALUES (?, ?, ?)").run(Int64(namespace), mediaId.id, row[1] as! Blob)
|
||||
self.database.prepareCached("INSERT OR IGNORE INTO media_cleanup (namespace, id, data) VALUES (?, ?, ?)").run(Int64(namespace), mediaId.id, row[1] as! Blob)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (mediaId, messageIds) in updatedMessageIdsByMediaId {
|
||||
if messageIds.count == 0 {
|
||||
self.database.prepare("DELETE FROM media WHERE namespace = ? AND id = ?").run(Int64(mediaId.namespace), mediaId.id)
|
||||
self.database.prepareCached("DELETE FROM media WHERE namespace = ? AND id = ?").run(Int64(mediaId.namespace), mediaId.id)
|
||||
} else {
|
||||
self.database.prepare("UPDATE media SET associatedMessageIds = ? WHERE namespace = ? AND id = ?").run(Postbox.blobForMessageIds(messageIds), Int64(mediaId.namespace), mediaId.id)
|
||||
self.database.prepareCached("UPDATE media SET associatedMessageIds = ? WHERE namespace = ? AND id = ?").run(Postbox.blobForMessageIds(messageIds), Int64(mediaId.namespace), mediaId.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -755,13 +950,79 @@ public final class Postbox<State: PostboxState> {
|
||||
}
|
||||
}
|
||||
|
||||
private func updatePeers(peers: [Peer], update: (Peer, Peer) -> Peer) {
|
||||
if peers.count == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if peers.count == -1 {
|
||||
|
||||
} else {
|
||||
var peerIds: [PeerId] = []
|
||||
for peer in peers {
|
||||
peerIds.append(peer.id)
|
||||
}
|
||||
|
||||
let currentPeers = self.peersWithIds(peerIds)
|
||||
|
||||
let updatePeer = self.database.prepareCached("UPDATE peers SET data = ? WHERE id = ?")
|
||||
let insertPeer = self.database.prepareCached("INSERT INTO peers (id, data) VALUES (?, ?)")
|
||||
let encoder = Encoder()
|
||||
|
||||
var updatedPeers: [PeerId : Peer] = [:]
|
||||
|
||||
for updatedPeer in peers {
|
||||
let currentPeer = currentPeers[updatedPeer.id]
|
||||
|
||||
var finalPeer = updatedPeer
|
||||
if let currentPeer = currentPeer {
|
||||
finalPeer = update(currentPeer, updatedPeer)
|
||||
}
|
||||
|
||||
if currentPeer == nil || !finalPeer.equalsTo(currentPeer!) {
|
||||
updatedPeers[finalPeer.id] = finalPeer
|
||||
self.cachedPeers[finalPeer.id] = finalPeer
|
||||
|
||||
encoder.reset()
|
||||
encoder.encodeRootObject(finalPeer)
|
||||
|
||||
if currentPeer != nil {
|
||||
updatePeer.run(Blob(data: encoder.makeData()), finalPeer.id.toInt64())
|
||||
} else {
|
||||
insertPeer.run(finalPeer.id.toInt64(), Blob(data: encoder.makeData()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for record in self.peerViews.copyItems() {
|
||||
if record.0.updatePeers(updatedPeers) {
|
||||
deferPeerViewUpdate(record.0, pipe: record.1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public func modify<T>(f: Modifier<State> -> T) -> Signal<T, NoError> {
|
||||
return Signal { subscriber in
|
||||
self.queue.dispatch {
|
||||
//#if DEBUG
|
||||
let startTime = CFAbsoluteTimeGetCurrent()
|
||||
//#endif
|
||||
|
||||
self.database.transaction()
|
||||
let result = f(Modifier(postbox: self))
|
||||
//print("(Postbox modify took \((CFAbsoluteTimeGetCurrent() - startTime) * 1000.0) ms)")
|
||||
//#if DEBUG
|
||||
//startTime = CFAbsoluteTimeGetCurrent()
|
||||
//#endif
|
||||
self.database.commit()
|
||||
|
||||
//#if DEBUG
|
||||
print("(Postbox commit took \((CFAbsoluteTimeGetCurrent() - startTime) * 1000.0) ms)")
|
||||
//#endif
|
||||
|
||||
self.performDeferredUpdates()
|
||||
|
||||
subscriber.putNext(result)
|
||||
subscriber.putCompletion()
|
||||
}
|
||||
@@ -772,7 +1033,7 @@ public final class Postbox<State: PostboxState> {
|
||||
private func findAdjacentMessageIds(peerId: PeerId, namespace: MessageId.Namespace, index: MessageIndex) -> (MessageId.Id?, MessageId.Id?) {
|
||||
var minId: MessageId.Id?
|
||||
var maxId: MessageId.Id?
|
||||
for row in self.database.prepare("SELECT MIN(id), MAX(id) FROM peer_messages WHERE peerId = ? AND namespace = ?").run(peerId.toInt64(), Int64(namespace)) {
|
||||
for row in self.database.prepareCached("SELECT MIN(id), MAX(id) FROM peer_messages WHERE peerId = ? AND namespace = ?").run(peerId.toInt64(), Int64(namespace)) {
|
||||
minId = MessageId.Id(row[0] as! Int64)
|
||||
maxId = MessageId.Id(row[1] as! Int64)
|
||||
}
|
||||
@@ -780,7 +1041,7 @@ public final class Postbox<State: PostboxState> {
|
||||
if let minId = minId, maxId = maxId {
|
||||
var minTimestamp: Int32!
|
||||
var maxTimestamp: Int32!
|
||||
for row in self.database.prepare("SELECT id, timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id IN (?, ?)").run(peerId.toInt64(), Int64(namespace), Int64(minId), Int64(maxId)) {
|
||||
for row in self.database.prepareCached("SELECT id, timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id IN (?, ?)").run(peerId.toInt64(), Int64(namespace), Int64(minId), Int64(maxId)) {
|
||||
let id = Int32(row[0] as! Int64)
|
||||
let timestamp = Int32(row[1] as! Int64)
|
||||
if id == minId {
|
||||
@@ -790,8 +1051,8 @@ public final class Postbox<State: PostboxState> {
|
||||
}
|
||||
}
|
||||
|
||||
let earlierMidStatement = self.database.prepare("SELECT id, timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id <= ? LIMIT 1")
|
||||
let laterMidStatement = self.database.prepare("SELECT id, timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id >= ? LIMIT 1")
|
||||
let earlierMidStatement = self.database.prepareCached("SELECT id, timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id <= ? LIMIT 1")
|
||||
let laterMidStatement = self.database.prepareCached("SELECT id, timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id >= ? LIMIT 1")
|
||||
|
||||
func lowerBound(timestamp: Int32) -> MessageId.Id? {
|
||||
var leftId = minId
|
||||
@@ -883,16 +1144,16 @@ public final class Postbox<State: PostboxState> {
|
||||
}
|
||||
}
|
||||
|
||||
private func fetchMessagesAround(peerId: PeerId, anchorId: MessageId, count: Int) -> ([Message], [MessageId.Namespace : Message], [MessageId.Namespace : Message]) {
|
||||
var messages: [Message] = []
|
||||
private func fetchMessagesAround(peerId: PeerId, anchorId: MessageId, count: Int) -> ([RenderedMessage], [MessageId.Namespace : RenderedMessage], [MessageId.Namespace : RenderedMessage]) {
|
||||
var messages: [RenderedMessage] = []
|
||||
|
||||
messages += self.fetchMessagesRelative(peerId, earlier: true)(namespace: anchorId.namespace, id: anchorId.id, count: count + 1)
|
||||
messages += self.fetchMessagesRelative(peerId, earlier: false)(namespace: anchorId.namespace, id: anchorId.id - 1, count: count + 1)
|
||||
|
||||
messages.sortInPlace({ MessageIndex($0) < MessageIndex($1) })
|
||||
messages.sortInPlace({ MessageIndex($0.message) < MessageIndex($1.message) })
|
||||
var i = messages.count - 1
|
||||
while i >= 1 {
|
||||
if messages[i].id == messages[i - 1].id {
|
||||
if messages[i].message.id == messages[i - 1].message.id {
|
||||
messages.removeAtIndex(i)
|
||||
}
|
||||
i--
|
||||
@@ -903,19 +1164,19 @@ public final class Postbox<State: PostboxState> {
|
||||
} else {
|
||||
var index: MessageIndex!
|
||||
for message in messages {
|
||||
if message.id == anchorId {
|
||||
index = MessageIndex(message)
|
||||
if message.message.id == anchorId {
|
||||
index = MessageIndex(message.message)
|
||||
break
|
||||
}
|
||||
}
|
||||
if index == nil {
|
||||
var closestId: MessageId.Id = messages[0].id.id
|
||||
var closestId: MessageId.Id = messages[0].message.id.id
|
||||
var closestDistance = abs(closestId - anchorId.id)
|
||||
let closestTimestamp: Int32 = messages[0].timestamp
|
||||
let closestTimestamp: Int32 = messages[0].message.timestamp
|
||||
for message in messages {
|
||||
if abs(message.id.id - anchorId.id) < closestDistance {
|
||||
closestId = message.id.id
|
||||
closestDistance = abs(message.id.id - anchorId.id)
|
||||
if abs(message.message.id.id - anchorId.id) < closestDistance {
|
||||
closestId = message.message.id.id
|
||||
closestDistance = abs(message.message.id.id - anchorId.id)
|
||||
}
|
||||
}
|
||||
index = MessageIndex(id: MessageId(peerId: peerId, namespace: anchorId.namespace, id: closestId), timestamp: closestTimestamp)
|
||||
@@ -933,10 +1194,10 @@ public final class Postbox<State: PostboxState> {
|
||||
}
|
||||
}
|
||||
|
||||
messages.sortInPlace({ MessageIndex($0) < MessageIndex($1) })
|
||||
messages.sortInPlace({ MessageIndex($0.message) < MessageIndex($1.message) })
|
||||
var i = messages.count - 1
|
||||
while i >= 1 {
|
||||
if messages[i].id == messages[i - 1].id {
|
||||
if messages[i].message.id == messages[i - 1].message.id {
|
||||
messages.removeAtIndex(i)
|
||||
}
|
||||
i--
|
||||
@@ -945,16 +1206,16 @@ public final class Postbox<State: PostboxState> {
|
||||
var anchorIndex = messages.count / 2
|
||||
i = 0
|
||||
while i < messages.count {
|
||||
if messages[i].id == index.id {
|
||||
if messages[i].message.id == index.id {
|
||||
anchorIndex = i
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
var filteredMessages: [Message] = []
|
||||
var earlier: [MessageId.Namespace : Message] = [:]
|
||||
var later: [MessageId.Namespace : Message] = [:]
|
||||
var filteredMessages: [RenderedMessage] = []
|
||||
var earlier: [MessageId.Namespace : RenderedMessage] = [:]
|
||||
var later: [MessageId.Namespace : RenderedMessage] = [:]
|
||||
|
||||
i = anchorIndex
|
||||
var j = anchorIndex - 1
|
||||
@@ -977,32 +1238,32 @@ public final class Postbox<State: PostboxState> {
|
||||
|
||||
i = leftIndex - 1
|
||||
while i >= 0 {
|
||||
if earlier[messages[i].id.namespace] == nil {
|
||||
earlier[messages[i].id.namespace] = messages[i]
|
||||
if earlier[messages[i].message.id.namespace] == nil {
|
||||
earlier[messages[i].message.id.namespace] = messages[i]
|
||||
}
|
||||
i--
|
||||
}
|
||||
|
||||
i = rightIndex + 1
|
||||
while i < messages.count {
|
||||
if later[messages[i].id.namespace] == nil {
|
||||
later[messages[i].id.namespace] = messages[i]
|
||||
if later[messages[i].message.id.namespace] == nil {
|
||||
later[messages[i].message.id.namespace] = messages[i]
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
filteredMessages.sortInPlace({ MessageIndex($0) < MessageIndex($1) })
|
||||
filteredMessages.sortInPlace({ MessageIndex($0.message) < MessageIndex($1.message) })
|
||||
|
||||
return (filteredMessages, earlier, later)
|
||||
}
|
||||
}
|
||||
|
||||
private func fetchMessagesRelative(peerId: PeerId, earlier: Bool)(namespace: MessageId.Namespace, id: MessageId.Id?, count: Int) -> [Message] {
|
||||
private func fetchMessagesRelative(peerId: PeerId, earlier: Bool)(namespace: MessageId.Namespace, id: MessageId.Id?, count: Int) -> [RenderedMessage] {
|
||||
var messages: [Message] = []
|
||||
|
||||
let sign = earlier ? "<" : ">"
|
||||
let order = earlier ? "DESC" : "ASC"
|
||||
let statement = self.database.prepare("SELECT data, associatedMediaIds FROM peer_messages WHERE peerId = ? AND namespace = ? AND id \(sign) ? ORDER BY id \(order) LIMIT \(count)")
|
||||
let statement = self.database.prepareCached("SELECT data, associatedMediaIds FROM peer_messages WHERE peerId = ? AND namespace = ? AND id \(sign) ? ORDER BY id \(order) LIMIT ?")
|
||||
let bound: Int64
|
||||
if let id = id {
|
||||
bound = Int64(id)
|
||||
@@ -1012,7 +1273,7 @@ public final class Postbox<State: PostboxState> {
|
||||
bound = Int64(Int32.min)
|
||||
}
|
||||
|
||||
for row in statement.run(Int64(peerId.toInt64()), Int64(namespace), bound) {
|
||||
for row in statement.run(Int64(peerId.toInt64()), Int64(namespace), bound, Int64(count)) {
|
||||
let data = (row[0] as! Blob).data
|
||||
let decoder = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false))
|
||||
if let message = decoder.decodeRootObject() as? Message {
|
||||
@@ -1022,7 +1283,7 @@ public final class Postbox<State: PostboxState> {
|
||||
}
|
||||
}
|
||||
|
||||
return messages
|
||||
return self.renderedMessages(messages)
|
||||
}
|
||||
|
||||
private func fetchPeerEntryIndicesRelative(earlier: Bool)(index: PeerViewEntryIndex?, count: Int) -> [PeerViewEntryIndex] {
|
||||
@@ -1049,12 +1310,12 @@ public final class Postbox<State: PostboxState> {
|
||||
return entries
|
||||
}
|
||||
|
||||
private func messageForPeer(peerId: PeerId, id: MessageId) -> Message? {
|
||||
private func messageForPeer(peerId: PeerId, id: MessageId) -> RenderedMessage? {
|
||||
for row in self.database.prepareCached("SELECT data, associatedMediaIds FROM peer_messages WHERE peerId = ? AND namespace = ? AND id = ?").run(peerId.toInt64(), Int64(id.namespace), Int64(id.id)) {
|
||||
let data = (row[0] as! Blob).data
|
||||
let decoder = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false))
|
||||
if let message = decoder.decodeRootObject() as? Message {
|
||||
return message
|
||||
return self.renderedMessages([message]).first
|
||||
} else {
|
||||
print("(PostBox: can't decode message)")
|
||||
}
|
||||
@@ -1098,38 +1359,98 @@ public final class Postbox<State: PostboxState> {
|
||||
return entries
|
||||
}
|
||||
|
||||
private func fetchMessagesTail(peerId: PeerId, count: Int) -> [Message] {
|
||||
var messages: [Message] = []
|
||||
private func renderedMessages(messages: [Message]) -> [RenderedMessage] {
|
||||
if messages.count == 0 {
|
||||
return []
|
||||
}
|
||||
|
||||
var peerIds = Set<PeerId>()
|
||||
var mediaIds = Set<MediaId>()
|
||||
|
||||
for message in messages {
|
||||
for peerId in message.peerIds {
|
||||
peerIds.insert(peerId)
|
||||
}
|
||||
for mediaId in message.mediaIds {
|
||||
mediaIds.insert(mediaId)
|
||||
}
|
||||
}
|
||||
|
||||
var arrayPeerIds: [PeerId] = []
|
||||
for id in peerIds {
|
||||
arrayPeerIds.append(id)
|
||||
}
|
||||
let peers = self.peersWithIds(arrayPeerIds)
|
||||
|
||||
var arrayMediaIds: [MediaId] = []
|
||||
for id in mediaIds {
|
||||
arrayMediaIds.append(id)
|
||||
}
|
||||
let medias = self.mediaWithIds(arrayMediaIds)
|
||||
|
||||
var result: [RenderedMessage] = []
|
||||
|
||||
for message in messages {
|
||||
if message.peerIds.count == 0 && message.mediaIds.count == 0 {
|
||||
result.append(RenderedMessage(message: message, peers: [], media: []))
|
||||
} else {
|
||||
var messagePeers: [Peer] = []
|
||||
for id in message.peerIds {
|
||||
if let peer = peers[id] {
|
||||
messagePeers.append(peer)
|
||||
}
|
||||
}
|
||||
|
||||
var messageMedia: [Media] = []
|
||||
for id in message.mediaIds {
|
||||
if let media = medias[id] {
|
||||
messageMedia.append(media)
|
||||
}
|
||||
}
|
||||
|
||||
result.append(RenderedMessage(message: message, peers: messagePeers, media: messageMedia))
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
private func fetchMessagesTail(peerId: PeerId, count: Int) -> [RenderedMessage] {
|
||||
var messages: [RenderedMessage] = []
|
||||
|
||||
for namespace in self.messageNamespaces {
|
||||
messages += self.fetchMessagesRelative(peerId, earlier: true)(namespace: namespace, id: nil, count: count)
|
||||
}
|
||||
|
||||
messages.sortInPlace({ MessageIndex($0) < MessageIndex($1)})
|
||||
messages.sortInPlace({ MessageIndex($0.message) < MessageIndex($1.message)})
|
||||
|
||||
return messages
|
||||
}
|
||||
|
||||
public func tailMessageViewForPeerId(peerId: PeerId, count: Int) -> Signal<MessageView, NoError> {
|
||||
return Signal { subscriber in
|
||||
let startTime = CFAbsoluteTimeGetCurrent()
|
||||
|
||||
let disposable = MetaDisposable()
|
||||
|
||||
self.queue.dispatch {
|
||||
let tail = self.fetchMessagesTail(peerId, count: count + 1)
|
||||
|
||||
var messages: [Message] = []
|
||||
print("tailMessageViewForPeerId fetch: \((CFAbsoluteTimeGetCurrent() - startTime) * 1000.0) ms")
|
||||
|
||||
var messages: [RenderedMessage] = []
|
||||
var i = tail.count - 1
|
||||
while i >= 0 && i >= tail.count - count {
|
||||
messages.insert(tail[i], atIndex: 0)
|
||||
i--
|
||||
}
|
||||
|
||||
var earlier: [MessageId.Namespace : Message] = [:]
|
||||
var earlier: [MessageId.Namespace : RenderedMessage] = [:]
|
||||
|
||||
for namespace in self.messageNamespaces {
|
||||
var i = tail.count - count - 1
|
||||
while i >= 0 {
|
||||
if tail[i].id.namespace == namespace {
|
||||
if tail[i].message.id.namespace == namespace {
|
||||
earlier[namespace] = tail[i]
|
||||
break
|
||||
}
|
||||
@@ -1184,19 +1505,19 @@ public final class Postbox<State: PostboxState> {
|
||||
if around.0.count == 0 {
|
||||
let tail = self.fetchMessagesTail(peerId, count: count + 1)
|
||||
|
||||
var messages: [Message] = []
|
||||
var messages: [RenderedMessage] = []
|
||||
var i = tail.count - 1
|
||||
while i >= 0 && i >= tail.count - count {
|
||||
messages.insert(tail[i], atIndex: 0)
|
||||
i--
|
||||
}
|
||||
|
||||
var earlier: [MessageId.Namespace : Message] = [:]
|
||||
var earlier: [MessageId.Namespace : RenderedMessage] = [:]
|
||||
|
||||
for namespace in self.messageNamespaces {
|
||||
var i = tail.count - count - 1
|
||||
while i >= 0 {
|
||||
if tail[i].id.namespace == namespace {
|
||||
if tail[i].message.id.namespace == namespace {
|
||||
earlier[namespace] = tail[i]
|
||||
break
|
||||
}
|
||||
@@ -1249,7 +1570,12 @@ public final class Postbox<State: PostboxState> {
|
||||
let disposable = MetaDisposable()
|
||||
|
||||
self.queue.dispatch {
|
||||
let startTime = CFAbsoluteTimeGetCurrent()
|
||||
|
||||
let tail = self.fetchPeerEntriesRelative(true)(index: nil, count: count + 1)
|
||||
self.fetchPeerEntriesRelative(true)(index: nil, count: count + 1)
|
||||
|
||||
print("(Postbox fetchPeerEntriesRelative took \((CFAbsoluteTimeGetCurrent() - startTime) * 1000.0) ms)")
|
||||
|
||||
var entries: [PeerViewEntry] = []
|
||||
var i = tail.count - 1
|
||||
|
||||
Reference in New Issue
Block a user