mirror of
https://github.com/Swiftgram/Telegram-iOS.git
synced 2025-06-16 05:55:20 +00:00
1051 lines
43 KiB
Swift
1051 lines
43 KiB
Swift
import Foundation
|
|
|
|
import SwiftSignalKit
|
|
|
|
public final class Postbox {
|
|
public class Modifier {
|
|
private unowned var postbox: Postbox
|
|
|
|
private init(postbox: Postbox) {
|
|
self.postbox = postbox
|
|
}
|
|
|
|
public func addMessages(messages: [Message], medias: [Media]) {
|
|
self.postbox.addMessages(messages, medias: medias)
|
|
}
|
|
|
|
public func deleteMessagesWithIds(ids: [MessageId]) {
|
|
self.postbox.deleteMessagesWithIds(ids)
|
|
}
|
|
}
|
|
|
|
private let basePath: String
|
|
private let ownerId: PeerId
|
|
private let messageNamespaces: [MessageId.Namespace]
|
|
|
|
private let queue = SwiftSignalKit.Queue()
|
|
private var database: Database!
|
|
|
|
private var peerMessageViews: [PeerId : Bag<(MutableMessageView, Pipe<MessageView>)>] = [:]
|
|
|
|
public init(basePath: String, ownerId: PeerId, messageNamespaces: [MessageId.Namespace]) {
|
|
self.basePath = basePath
|
|
self.ownerId = ownerId
|
|
self.messageNamespaces = messageNamespaces
|
|
self.openDatabase()
|
|
}
|
|
|
|
private func openDatabase() {
|
|
NSFileManager.defaultManager().createDirectoryAtPath(basePath, withIntermediateDirectories: true, attributes: nil, error: nil)
|
|
self.database = Database(basePath.stringByAppendingPathComponent("db"))
|
|
|
|
self.queue.dispatch {
|
|
let result = self.database.userVersion
|
|
if result == 1 {
|
|
println("(Postbox schema version \(result))")
|
|
} else {
|
|
println("(Postbox creating schema)")
|
|
self.createSchema()
|
|
}
|
|
}
|
|
}
|
|
|
|
private func createSchema() {
|
|
self.database.transaction()
|
|
|
|
//state
|
|
self.database.execute("CREATE TABLE state (key INTEGER PRIMARY KEY, data BLOB)")
|
|
|
|
//peer_messages
|
|
self.database.execute("CREATE TABLE peer_messages (peerId INTEGER, namespace INTEGER, id INTEGER, data BLOB, associatedMediaIds BLOB, timestamp INTEGER, PRIMARY KEY(peerId, namespace, id))")
|
|
|
|
//peer_maxread
|
|
self.database.execute("CREATE TABLE peer_maxread (peerId INTEGER, namespace INTEGER, id INTEGER)")
|
|
|
|
//peer_media
|
|
self.database.execute("CREATE TABLE peer_media (peerId INTEGER, mediaNamespace INTEGER, messageNamespace INTEGER, messageId INTEGER, PRIMARY KEY (peerId, mediaNamespace, messageNamespace, messageId))")
|
|
self.database.execute("CREATE INDEX peer_media_peerId_messageNamespace_messageId ON peer_media (peerId, messageNamespace, messageId)")
|
|
|
|
//media
|
|
self.database.execute("CREATE TABLE media (namespace INTEGER, id INTEGER, data BLOB, associatedMessageIds BLOB, PRIMARY KEY (namespace, id))")
|
|
|
|
//media_cleanup
|
|
self.database.execute("CREATE TABLE media_cleanup (namespace INTEGER, id INTEGER, data BLOB, PRIMARY KEY(namespace, id))")
|
|
|
|
//messages_readstate
|
|
self.database.execute("CREATE TABLE messages_readstate (peerId INTEGER, namespace INTEGER, maxUnreadIncomingId INTEGER, maxUnreadOutgoingId INTEGER, incomingUnreadCount INTEGER, PRIMARY KEY (peerId, namespace))")
|
|
|
|
//top_messages
|
|
self.database.execute("CREATE TABLE top_messages (peerId INTEGER PRIMARY KEY, timestamp INTEGER)")
|
|
|
|
//peers
|
|
self.database.execute("CREATE TABLE peers (peerId INTEGER PRIMARY KEY, data BLOB)")
|
|
|
|
//user_statuses
|
|
self.database.execute("CREATE TABLE user_statuses (id INTEGER PRIMARY KEY, data BLOB)")
|
|
|
|
self.database.commit()
|
|
}
|
|
|
|
private class func messageIdsGroupedByNamespace(ids: [MessageId]) -> [MessageId.Namespace : [MessageId]] {
|
|
var grouped: [MessageId.Namespace : [MessageId]] = [:]
|
|
|
|
for id in ids {
|
|
if grouped[id.namespace] != nil {
|
|
grouped[id.namespace]!.append(id)
|
|
} else {
|
|
grouped[id.namespace] = [id]
|
|
}
|
|
}
|
|
|
|
return grouped
|
|
}
|
|
|
|
private class func mediaIdsGroupedByNamespaceFromMediaArray(mediaArray: [Media]) -> [MediaId.Namespace : [MediaId]] {
|
|
var grouped: [MediaId.Namespace : [MediaId]] = [:]
|
|
var seenMediaIds = Set<MediaId>()
|
|
|
|
for media in mediaArray {
|
|
if !seenMediaIds.contains(media.id) {
|
|
seenMediaIds.insert(media.id)
|
|
if grouped[media.id.namespace] != nil {
|
|
grouped[media.id.namespace]!.append(media.id)
|
|
} else {
|
|
grouped[media.id.namespace] = [media.id]
|
|
}
|
|
}
|
|
}
|
|
|
|
return grouped
|
|
}
|
|
|
|
private class func mediaIdsGroupedByNamespaceFromSet(ids: Set<MediaId>) -> [MediaId.Namespace : [MediaId]] {
|
|
var grouped: [MediaId.Namespace : [MediaId]] = [:]
|
|
|
|
for id in ids {
|
|
if var groupedIds = grouped[id.namespace] {
|
|
grouped[id.namespace]!.append(id)
|
|
} else {
|
|
grouped[id.namespace] = [id]
|
|
}
|
|
}
|
|
|
|
return grouped
|
|
}
|
|
|
|
private class func mediaIdsGroupedByNamespaceFromDictionaryKeys<T>(dict: [MediaId : T]) -> [MediaId.Namespace : [MediaId]] {
|
|
var grouped: [MediaId.Namespace : [MediaId]] = [:]
|
|
|
|
for (id, _) in dict {
|
|
if grouped[id.namespace] != nil {
|
|
grouped[id.namespace]!.append(id)
|
|
} else {
|
|
grouped[id.namespace] = [id]
|
|
}
|
|
}
|
|
|
|
return grouped
|
|
}
|
|
|
|
private class func messagesGroupedByPeerId(messages: [Message]) -> [PeerId : [Message]] {
|
|
var grouped: [PeerId : [Message]] = [:]
|
|
|
|
for message in messages {
|
|
if grouped[message.id.peerId] != nil {
|
|
grouped[message.id.peerId]!.append(message)
|
|
} else {
|
|
grouped[message.id.peerId] = [message]
|
|
}
|
|
}
|
|
|
|
return grouped
|
|
}
|
|
|
|
private class func messageIdsGroupedByPeerId(messageIds: [MessageId]) -> [PeerId : [MessageId]] {
|
|
var grouped: [PeerId : [MessageId]] = [:]
|
|
|
|
for id in messageIds {
|
|
if grouped[id.peerId] != nil {
|
|
grouped[id.peerId]!.append(id)
|
|
} else {
|
|
grouped[id.peerId] = [id]
|
|
}
|
|
}
|
|
|
|
return grouped
|
|
}
|
|
|
|
private class func blobForMediaIds(ids: [MediaId]) -> Blob {
|
|
let data = NSMutableData()
|
|
var version: Int8 = 1
|
|
data.appendBytes(&version, length: 1)
|
|
|
|
var count = Int32(ids.count)
|
|
data.appendBytes(&count, length:4)
|
|
|
|
for id in ids {
|
|
var mNamespace = id.namespace
|
|
var mId = id.id
|
|
data.appendBytes(&mNamespace, length: 4)
|
|
data.appendBytes(&mId, length: 8)
|
|
}
|
|
|
|
return Blob(data: data)
|
|
}
|
|
|
|
private static func mediaIdsForBlob(blob: Blob) -> [MediaId] {
|
|
var ids: [MediaId] = []
|
|
|
|
var offset: Int = 0
|
|
var version = 0
|
|
blob.data.getBytes(&version, range: NSMakeRange(offset, 1))
|
|
offset += 1
|
|
|
|
if version == 1 {
|
|
var count: Int32 = 0
|
|
blob.data.getBytes(&count, range: NSMakeRange(offset, 4))
|
|
offset += 4
|
|
|
|
var i = 0
|
|
while i < Int(count) {
|
|
var mNamespace: Int32 = 0
|
|
var mId: Int64 = 0
|
|
blob.data.getBytes(&mNamespace, range: NSMakeRange(offset, 4))
|
|
blob.data.getBytes(&mId, range: NSMakeRange(offset + 4, 8))
|
|
ids.append(MediaId(namespace: mNamespace, id: mId))
|
|
offset += 12
|
|
i++
|
|
}
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
private class func blobForMessageIds(ids: [MessageId]) -> Blob {
|
|
let data = NSMutableData()
|
|
var version: Int8 = 1
|
|
data.appendBytes(&version, length: 1)
|
|
|
|
var count = Int32(ids.count)
|
|
data.appendBytes(&count, length:4)
|
|
|
|
for id in ids {
|
|
var mPeerNamespace = id.peerId.namespace
|
|
var mPeerId = id.peerId.id
|
|
var mNamespace = id.namespace
|
|
var mId = id.id
|
|
data.appendBytes(&mPeerNamespace, length: 4)
|
|
data.appendBytes(&mPeerId, length: 4)
|
|
data.appendBytes(&mNamespace, length: 4)
|
|
data.appendBytes(&mId, length: 4)
|
|
}
|
|
|
|
return Blob(data: data)
|
|
}
|
|
|
|
private static func messageIdsForBlob(blob: Blob) -> [MessageId] {
|
|
var ids: [MessageId] = []
|
|
|
|
var offset: Int = 0
|
|
var version = 0
|
|
blob.data.getBytes(&version, range: NSMakeRange(offset, 1))
|
|
offset += 1
|
|
|
|
if version == 1 {
|
|
var count: Int32 = 0
|
|
blob.data.getBytes(&count, range: NSMakeRange(offset, 4))
|
|
offset += 4
|
|
|
|
var i = 0
|
|
while i < Int(count) {
|
|
var mPeerNamespace: Int32 = 0
|
|
var mPeerId: Int32 = 0
|
|
var mNamespace: Int32 = 0
|
|
var mId: Int32 = 0
|
|
blob.data.getBytes(&mPeerNamespace, range: NSMakeRange(offset, 4))
|
|
blob.data.getBytes(&mPeerId, range: NSMakeRange(offset + 4, 4))
|
|
blob.data.getBytes(&mNamespace, range: NSMakeRange(offset + 8, 4))
|
|
blob.data.getBytes(&mId, range: NSMakeRange(offset + 12, 4))
|
|
ids.append(MessageId(peerId: PeerId(namespace: mPeerNamespace, id: mPeerId), namespace: mNamespace, id: mId))
|
|
offset += 16
|
|
i++
|
|
}
|
|
}
|
|
|
|
return ids
|
|
}
|
|
|
|
private func addMessages(messages: [Message], medias: [Media]) {
|
|
let messageInsertStatement = self.database.prepare("INSERT INTO peer_messages (peerId, namespace, id, data, associatedMediaIds, timestamp) VALUES (?, ?, ?, ?, ?, ?)")
|
|
let peerMediaInsertStatement = self.database.prepare("INSERT INTO peer_media (peerId, mediaNamespace, messageNamespace, messageId) VALUES (?, ?, ?, ?)")
|
|
let mediaInsertStatement = self.database.prepare("INSERT INTO media (namespace, id, data, associatedMessageIds) VALUES (?, ?, ?, ?)")
|
|
let referencedMessageIdsStatement = self.database.prepare("SELECT associatedMessageIds FROM media WHERE namespace = ? AND id = ?")
|
|
let updateReferencedMessageIdsStatement = self.database.prepare("UPDATE media SET associatedMessageIds = ? WHERE namespace = ? AND id = ?")
|
|
|
|
let encoder = Encoder()
|
|
|
|
var messageIdsByMediaId: [MediaId : [MessageId]] = [:]
|
|
for (peerId, peerMessages) in Postbox.messagesGroupedByPeerId(messages) {
|
|
var maxTimestamp: Int32 = Int32.min
|
|
|
|
var messageIds: [MessageId] = []
|
|
var seenMessageIds = Set<MessageId>()
|
|
for message in peerMessages {
|
|
if !seenMessageIds.contains(message.id) {
|
|
seenMessageIds.insert(message.id)
|
|
messageIds.append(message.id)
|
|
}
|
|
}
|
|
|
|
var existingMessageIds = Set<MessageId>()
|
|
|
|
let messageIdsByNamespace = Postbox.messageIdsGroupedByNamespace(messageIds)
|
|
for (namespace, ids) in messageIdsByNamespace {
|
|
var queryString = "SELECT id FROM peer_messages WHERE peerId = ? AND namespace = ? AND id IN ("
|
|
var first = true
|
|
for id in ids {
|
|
if first {
|
|
first = false
|
|
} else {
|
|
queryString += ","
|
|
}
|
|
queryString += "\(id.id)"
|
|
}
|
|
queryString += ")"
|
|
|
|
let statement = self.database.prepare(queryString)
|
|
for row in statement.run(peerId.toInt64(), Int64(namespace)) {
|
|
existingMessageIds.insert(MessageId(peerId: peerId, namespace: namespace, id: Int32(row[0] as! Int64)))
|
|
}
|
|
}
|
|
|
|
var relatedViews = self.peerMessageViews[peerId] ?? Bag()
|
|
|
|
for message in peerMessages {
|
|
if existingMessageIds.contains(message.id) {
|
|
continue
|
|
}
|
|
existingMessageIds.insert(message.id)
|
|
|
|
for record in relatedViews.copyItems() {
|
|
record.0.add(message)
|
|
}
|
|
|
|
if maxTimestamp == Int32.min || message.timestamp > maxTimestamp {
|
|
maxTimestamp = message.timestamp
|
|
}
|
|
|
|
encoder.reset()
|
|
encoder.encodeRootObject(message)
|
|
let messageBlob = Blob(data: encoder.makeData())
|
|
|
|
let referencedMediaIdsMediaIdsBlob = Postbox.blobForMediaIds(message.referencedMediaIds)
|
|
for id in message.referencedMediaIds {
|
|
if messageIdsByMediaId[id] != nil {
|
|
messageIdsByMediaId[id]!.append(message.id)
|
|
} else {
|
|
messageIdsByMediaId[id] = [message.id]
|
|
}
|
|
}
|
|
|
|
messageInsertStatement.run(peerId.toInt64(), Int64(message.id.namespace), Int64(message.id.id), messageBlob, referencedMediaIdsMediaIdsBlob, Int64(message.timestamp))
|
|
|
|
for id in message.referencedMediaIds {
|
|
peerMediaInsertStatement.run(peerId.toInt64(), Int64(id.namespace), Int64(message.id.namespace), Int64(message.id.id))
|
|
}
|
|
}
|
|
|
|
for record in relatedViews.copyItems() {
|
|
record.1.putNext(MessageView(record.0))
|
|
}
|
|
|
|
var currentMaxTimestamp: Int32?
|
|
for row in self.database.prepare("SELECT timestamp FROM top_messages WHERE peerId = ?").run(peerId.toInt64()) {
|
|
currentMaxTimestamp = Int32(row[0] as! Int64)
|
|
}
|
|
|
|
if let currentMaxTimestamp = currentMaxTimestamp {
|
|
if maxTimestamp > currentMaxTimestamp {
|
|
self.database.prepare("UPDATE top_messages SET timestamp = ? WHERE peerId = ?").run(Int64(maxTimestamp), peerId.toInt64())
|
|
}
|
|
} else {
|
|
self.database.prepare("INSERT INTO top_messages(peerId, timestamp) VALUES(?, ?)").run(peerId.toInt64(), Int64(maxTimestamp))
|
|
}
|
|
}
|
|
|
|
var existingMediaIds = Set<MediaId>()
|
|
|
|
for (namespace, ids) in Postbox.mediaIdsGroupedByNamespaceFromMediaArray(medias) {
|
|
var queryString = "SELECT id FROM media WHERE namespace = ? AND id IN (";
|
|
var first = true
|
|
for id in ids {
|
|
if first {
|
|
first = false
|
|
} else {
|
|
queryString += ","
|
|
}
|
|
queryString += "\(id.id)"
|
|
}
|
|
queryString += ")"
|
|
for row in self.database.prepare(queryString).run(Int64(namespace)) {
|
|
existingMediaIds.insert(MediaId(namespace: namespace, id: row[0] as! Int64))
|
|
}
|
|
}
|
|
|
|
var processedMediaIdsForMessageIds = Set<MediaId>()
|
|
for media in medias {
|
|
if existingMediaIds.contains(media.id) {
|
|
continue
|
|
}
|
|
existingMediaIds.insert(media.id)
|
|
|
|
encoder.reset()
|
|
encoder.encodeRootObject(media)
|
|
let mediaBlob = Blob(data: encoder.makeData())
|
|
|
|
processedMediaIdsForMessageIds.insert(media.id)
|
|
if let messageIds = messageIdsByMediaId[media.id] {
|
|
let referencedMessageIdsBlob = Postbox.blobForMessageIds(messageIds)
|
|
mediaInsertStatement.run(Int64(media.id.namespace), media.id.id, mediaBlob, referencedMessageIdsBlob)
|
|
}
|
|
}
|
|
|
|
var updatedMessageIdsBlobByMediaId: [MediaId : Blob] = [:]
|
|
for (mediaId, messageIds) in messageIdsByMediaId {
|
|
if !processedMediaIdsForMessageIds.contains(mediaId) {
|
|
for row in referencedMessageIdsStatement.run(Int64(mediaId.namespace), mediaId.id) {
|
|
var currentMessageIds = Postbox.messageIdsForBlob(row[0] as! Blob)
|
|
currentMessageIds += messageIds
|
|
updatedMessageIdsBlobByMediaId[mediaId] = Postbox.blobForMessageIds(currentMessageIds)
|
|
}
|
|
}
|
|
}
|
|
|
|
for (mediaId, messageIds) in updatedMessageIdsBlobByMediaId {
|
|
updateReferencedMessageIdsStatement.run(messageIds, Int64(mediaId.namespace), mediaId.id)
|
|
}
|
|
}
|
|
|
|
private func loadMessageIdsByMediaIdForPeerId(peerId: PeerId, idsByNamespace: [MessageId.Namespace : [MessageId]]) -> [MediaId : [MessageId]] {
|
|
var grouped: [MediaId : [MessageId]] = [:]
|
|
|
|
for (namespace, ids) in idsByNamespace {
|
|
var messageIdByNamespaceAndIdQuery = "SELECT messageId FROM peer_media WHERE peerId = ? AND messageNamespace = ? AND messageId IN ("
|
|
var first = true
|
|
for id in ids {
|
|
if first {
|
|
first = false
|
|
} else {
|
|
messageIdByNamespaceAndIdQuery += ","
|
|
}
|
|
messageIdByNamespaceAndIdQuery += "\(id.id)"
|
|
}
|
|
messageIdByNamespaceAndIdQuery += ")"
|
|
|
|
var messageIdsWithMedia: [MessageId] = []
|
|
for row in self.database.prepare(messageIdByNamespaceAndIdQuery).run(peerId.toInt64(), Int64(namespace)) {
|
|
messageIdsWithMedia.append(MessageId(peerId: peerId, namespace: namespace, id:Int32(row[0] as! Int64)))
|
|
}
|
|
|
|
var associatedMediaIdsQueryString = "SELECT id, associatedMediaIds FROM peer_messages WHERE peerId = ? AND namespace = ? AND id IN ("
|
|
first = true
|
|
for id in messageIdsWithMedia {
|
|
if first {
|
|
first = false
|
|
} else {
|
|
associatedMediaIdsQueryString += ","
|
|
}
|
|
associatedMediaIdsQueryString += "\(id.id)"
|
|
}
|
|
associatedMediaIdsQueryString += ")"
|
|
|
|
for row in self.database.prepare(associatedMediaIdsQueryString).run(peerId.toInt64(), Int64(namespace)) {
|
|
let id = MessageId(peerId: peerId, namespace: namespace, id: Int32(row[0] as! Int64))
|
|
let referencedMediaIds = Postbox.mediaIdsForBlob(row[1] as! Blob)
|
|
for mediaId in referencedMediaIds {
|
|
if grouped[mediaId] != nil {
|
|
grouped[mediaId]!.append(id)
|
|
} else {
|
|
grouped[mediaId] = [id]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return grouped
|
|
}
|
|
|
|
private func updateTopMessageTimestampForPeerId(peerId: PeerId) {
|
|
let selectIdStatement = self.database.prepare("SELECT MAX(id) FROM peer_messages WHERE peerId = ? AND namespace = ?")
|
|
let selectTimestampStatement = self.database.prepare("SELECT timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id = ?")
|
|
let updateTimestampStatement = self.database.prepare("UPDATE top_messages SET timestamp = ? WHERE peerId = ?")
|
|
|
|
var maxTimestamp = Int32.min
|
|
for namespace in self.messageNamespaces {
|
|
for rowId in selectIdStatement.run(peerId.toInt64(), Int64(namespace)) {
|
|
for rowTimestamp in selectTimestampStatement.run(peerId.toInt64(), Int64(namespace), rowId[0]) {
|
|
let timestamp = Int32(rowTimestamp[0] as! Int64)
|
|
if timestamp > maxTimestamp {
|
|
maxTimestamp = timestamp
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if maxTimestamp != Int32.min {
|
|
updateTimestampStatement.run(Int64(maxTimestamp), peerId.toInt64())
|
|
}
|
|
}
|
|
|
|
private func deleteMessagesWithIds(ids: [MessageId]) {
|
|
for (peerId, messageIds) in Postbox.messageIdsGroupedByPeerId(ids) {
|
|
let messageIdsByNamespace = Postbox.messageIdsGroupedByNamespace(messageIds)
|
|
let messageIdsByMediaId = self.loadMessageIdsByMediaIdForPeerId(peerId, idsByNamespace: messageIdsByNamespace)
|
|
|
|
for (namespace, messageIds) in messageIdsByNamespace {
|
|
var queryString = "DELETE FROM peer_messages WHERE peerId = ? AND namespace = ? AND id IN ("
|
|
var first = true
|
|
for id in messageIds {
|
|
if first {
|
|
first = false
|
|
} else {
|
|
queryString += ","
|
|
}
|
|
queryString += "\(id.id)"
|
|
}
|
|
queryString += ")"
|
|
self.database.prepare(queryString).run(peerId.toInt64(), Int64(namespace))
|
|
}
|
|
|
|
for (namespace, messageIds) in messageIdsByNamespace {
|
|
var queryString = "DELETE FROM peer_media WHERE peerId = ? AND messageNamespace = ? AND messageId IN ("
|
|
var first = true
|
|
for id in messageIds {
|
|
if first {
|
|
first = false
|
|
} else {
|
|
queryString += ","
|
|
}
|
|
queryString += "\(id.id)"
|
|
}
|
|
queryString += ")"
|
|
self.database.prepare(queryString).run(peerId.toInt64(), Int64(namespace))
|
|
}
|
|
|
|
let mediaIdsByNamespace = Postbox.mediaIdsGroupedByNamespaceFromDictionaryKeys(messageIdsByMediaId)
|
|
var updatedMessageIdsByMediaId: [MediaId : [MessageId]] = [:]
|
|
|
|
for (namespace, mediaIds) in mediaIdsByNamespace {
|
|
var queryString = "SELECT id, data, associatedMessageIds FROM media WHERE namespace = ? AND id in ("
|
|
var first = true
|
|
for id in mediaIds {
|
|
if first {
|
|
first = false
|
|
} else {
|
|
queryString += ","
|
|
}
|
|
queryString += "\(id.id)"
|
|
}
|
|
queryString += ")"
|
|
for row in self.database.prepare(queryString).run(Int64(namespace)) {
|
|
let mediaId = MediaId(namespace: namespace, id: row[0] as! Int64)
|
|
if let removedMessageIds = messageIdsByMediaId[mediaId] {
|
|
var messageIds = Postbox.messageIdsForBlob(row[2] as! Blob).filter {
|
|
!contains(removedMessageIds, $0)
|
|
}
|
|
updatedMessageIdsByMediaId[mediaId] = messageIds
|
|
|
|
if messageIds.count == 0 {
|
|
self.database.prepare("INSERT OR IGNORE INTO media_cleanup (namespace, id, data) VALUES (?, ?, ?)").run(Int64(namespace), mediaId.id, row[1] as! Blob)
|
|
}
|
|
}
|
|
}
|
|
|
|
for (mediaId, messageIds) in updatedMessageIdsByMediaId {
|
|
if messageIds.count == 0 {
|
|
self.database.prepare("DELETE FROM media WHERE namespace = ? AND id = ?").run(Int64(mediaId.namespace), mediaId.id)
|
|
} else {
|
|
self.database.prepare("UPDATE media SET associatedMessageIds = ? WHERE namespace = ? AND id = ?").run(Postbox.blobForMessageIds(messageIds), Int64(mediaId.namespace), mediaId.id)
|
|
}
|
|
}
|
|
}
|
|
|
|
self.updateTopMessageTimestampForPeerId(peerId)
|
|
}
|
|
}
|
|
|
|
public func modify(f: Modifier -> Void) {
|
|
self.queue.dispatch {
|
|
self.database.transaction()
|
|
f(Modifier(postbox: self))
|
|
self.database.commit()
|
|
}
|
|
}
|
|
|
|
private struct FetchedMessage {
|
|
let id: MessageId
|
|
let timestamp: Int32
|
|
let data: NSData
|
|
}
|
|
|
|
private func findAdjacentMessageIds(peerId: PeerId, namespace: MessageId.Namespace, index: MessageIndex) -> (MessageId.Id?, MessageId.Id?) {
|
|
var minId: MessageId.Id?
|
|
var maxId: MessageId.Id?
|
|
for row in self.database.prepare("SELECT MIN(id), MAX(id) FROM peer_messages WHERE peerId = ? AND namespace = ?").run(peerId.toInt64(), Int64(namespace)) {
|
|
minId = MessageId.Id(row[0] as! Int64)
|
|
maxId = MessageId.Id(row[1] as! Int64)
|
|
}
|
|
|
|
if let minId = minId, maxId = maxId {
|
|
var minTimestamp: Int32!
|
|
var maxTimestamp: Int32!
|
|
for row in self.database.prepare("SELECT id, timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id IN (?, ?)").run(peerId.toInt64(), Int64(namespace), Int64(minId), Int64(maxId)) {
|
|
let id = Int32(row[0] as! Int64)
|
|
let timestamp = Int32(row[1] as! Int64)
|
|
if id == minId {
|
|
minTimestamp = timestamp
|
|
} else {
|
|
maxTimestamp = timestamp
|
|
}
|
|
}
|
|
|
|
let earlierMidStatement = self.database.prepare("SELECT id, timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id <= ? LIMIT 1")
|
|
let laterMidStatement = self.database.prepare("SELECT id, timestamp FROM peer_messages WHERE peerId = ? AND namespace = ? AND id >= ? LIMIT 1")
|
|
|
|
func lowerBound(timestamp: Int32) -> MessageId.Id? {
|
|
var leftId = minId
|
|
var leftTimestamp = minTimestamp
|
|
var rightId = maxId
|
|
var rightTimestamp = maxTimestamp
|
|
|
|
while leftTimestamp <= timestamp && rightTimestamp >= timestamp {
|
|
let approximateMiddleId = leftId + (rightId - leftId) / 2
|
|
if approximateMiddleId == leftId {
|
|
return rightId
|
|
}
|
|
var middleId: MessageId.Id?
|
|
var middleTimestamp: Int32?
|
|
for row in earlierMidStatement.run(peerId.toInt64(), Int64(namespace), Int64(approximateMiddleId)) {
|
|
middleId = MessageId.Id(row[0] as! Int64)
|
|
middleTimestamp = Int32(row[1] as! Int64)
|
|
break
|
|
}
|
|
if middleId == leftId {
|
|
return rightId
|
|
}
|
|
|
|
if let middleId = middleId, middleTimestamp = middleTimestamp {
|
|
if middleTimestamp >= timestamp {
|
|
rightId = middleId
|
|
rightTimestamp = middleTimestamp
|
|
} else {
|
|
leftId = middleId
|
|
leftTimestamp = middleTimestamp
|
|
}
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return leftId
|
|
}
|
|
|
|
func upperBound(timestamp: Int32) -> MessageId.Id? {
|
|
var leftId = minId
|
|
var leftTimestamp = minTimestamp
|
|
var rightId = maxId
|
|
var rightTimestamp = maxTimestamp
|
|
|
|
while leftTimestamp <= timestamp && rightTimestamp >= timestamp {
|
|
let approximateMiddleId = leftId + (rightId - leftId) / 2
|
|
if approximateMiddleId == leftId {
|
|
return leftId
|
|
}
|
|
var middleId: MessageId.Id?
|
|
var middleTimestamp: Int32?
|
|
for row in earlierMidStatement.run(peerId.toInt64(), Int64(namespace), Int64(approximateMiddleId)) {
|
|
middleId = MessageId.Id(row[0] as! Int64)
|
|
middleTimestamp = Int32(row[1] as! Int64)
|
|
break
|
|
}
|
|
if middleId == leftId {
|
|
return leftId
|
|
}
|
|
|
|
if let middleId = middleId, middleTimestamp = middleTimestamp {
|
|
if middleTimestamp <= timestamp {
|
|
rightId = middleId
|
|
rightTimestamp = middleTimestamp
|
|
} else {
|
|
leftId = middleId
|
|
leftTimestamp = middleTimestamp
|
|
}
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return rightTimestamp
|
|
}
|
|
|
|
if index.id.namespace < namespace {
|
|
let left = upperBound(index.timestamp - 1)
|
|
let right = lowerBound(index.timestamp)
|
|
return (left, right)
|
|
} else {
|
|
let left = upperBound(index.timestamp)
|
|
let right = lowerBound(index.timestamp + 1)
|
|
return (left, right)
|
|
}
|
|
} else {
|
|
return (nil, nil)
|
|
}
|
|
}
|
|
|
|
private func fetchMessagesAround(peerId: PeerId, anchorId: MessageId, count: Int) -> ([Message], [MessageId.Namespace : Message], [MessageId.Namespace : Message]) {
|
|
var messages: [Message] = []
|
|
|
|
messages += self.fetchMessagesRelative(peerId, earlier: true)(namespace: anchorId.namespace, id: anchorId.id, count: count + 1)
|
|
messages += self.fetchMessagesRelative(peerId, earlier: false)(namespace: anchorId.namespace, id: anchorId.id - 1, count: count + 1)
|
|
|
|
messages.sort({ MessageIndex($0) < MessageIndex($1) })
|
|
var i = messages.count - 1
|
|
while i >= 1 {
|
|
if messages[i].id == messages[i - 1].id {
|
|
messages.removeAtIndex(i)
|
|
}
|
|
i--
|
|
}
|
|
|
|
if messages.count == 0 {
|
|
return ([], [:], [:])
|
|
} else {
|
|
var index: MessageIndex!
|
|
for message in messages {
|
|
if message.id == anchorId {
|
|
index = MessageIndex(message)
|
|
break
|
|
}
|
|
}
|
|
if index == nil {
|
|
var closestId: MessageId.Id = messages[0].id.id
|
|
var closestDistance = abs(closestId - anchorId.id)
|
|
var closestTimestamp: Int32 = messages[0].timestamp
|
|
for message in messages {
|
|
if abs(message.id.id - anchorId.id) < closestDistance {
|
|
closestId = message.id.id
|
|
closestDistance = abs(message.id.id - anchorId.id)
|
|
}
|
|
}
|
|
index = MessageIndex(id: MessageId(peerId: peerId, namespace: anchorId.namespace, id: closestId), timestamp: closestTimestamp)
|
|
}
|
|
|
|
for namespace in self.messageNamespaces {
|
|
if namespace != anchorId.namespace {
|
|
let (left, right) = self.findAdjacentMessageIds(peerId, namespace: namespace, index: index)
|
|
if let left = left {
|
|
messages += self.fetchMessagesRelative(peerId, earlier: true)(namespace: namespace, id: left + 1, count: count + 1)
|
|
}
|
|
if let right = right {
|
|
messages += self.fetchMessagesRelative(peerId, earlier: false)(namespace: namespace, id: right - 1, count: count + 1)
|
|
}
|
|
}
|
|
}
|
|
|
|
messages.sort({ MessageIndex($0) < MessageIndex($1) })
|
|
var i = messages.count - 1
|
|
while i >= 1 {
|
|
if messages[i].id == messages[i - 1].id {
|
|
messages.removeAtIndex(i)
|
|
}
|
|
i--
|
|
}
|
|
|
|
var anchorIndex = messages.count / 2
|
|
i = 0
|
|
while i < messages.count {
|
|
if messages[i].id == index.id {
|
|
anchorIndex = i
|
|
break
|
|
}
|
|
i++
|
|
}
|
|
|
|
var filteredMessages: [Message] = []
|
|
var earlier: [MessageId.Namespace : Message] = [:]
|
|
var later: [MessageId.Namespace : Message] = [:]
|
|
|
|
i = anchorIndex
|
|
var j = anchorIndex - 1
|
|
var leftIndex = j
|
|
var rightIndex = i
|
|
|
|
while i < messages.count || j >= 0 {
|
|
if i < messages.count && filteredMessages.count < count {
|
|
filteredMessages.append(messages[i])
|
|
rightIndex = i
|
|
}
|
|
if j >= 0 && filteredMessages.count < count {
|
|
filteredMessages.append(messages[j])
|
|
leftIndex = j
|
|
}
|
|
|
|
i++
|
|
j--
|
|
}
|
|
|
|
i = leftIndex - 1
|
|
while i >= 0 {
|
|
if earlier[messages[i].id.namespace] == nil {
|
|
earlier[messages[i].id.namespace] = messages[i]
|
|
}
|
|
i--
|
|
}
|
|
|
|
i = rightIndex + 1
|
|
while i < messages.count {
|
|
if later[messages[i].id.namespace] == nil {
|
|
later[messages[i].id.namespace] = messages[i]
|
|
}
|
|
i++
|
|
}
|
|
|
|
filteredMessages.sort({ MessageIndex($0) < MessageIndex($1) })
|
|
|
|
return (filteredMessages, earlier, later)
|
|
}
|
|
}
|
|
|
|
private func fetchMessagesRelative(peerId: PeerId, earlier: Bool)(namespace: MessageId.Namespace, id: MessageId.Id?, count: Int) -> [Message] {
|
|
var messages: [Message] = []
|
|
|
|
let sign = earlier ? "<" : ">"
|
|
let order = earlier ? "DESC" : "ASC"
|
|
let statement = self.database.prepare("SELECT data, associatedMediaIds FROM peer_messages WHERE peerId = ? AND namespace = ? AND id \(sign) ? ORDER BY id \(order) LIMIT \(count)")
|
|
let bound: Int64
|
|
if let id = id {
|
|
bound = Int64(id)
|
|
} else if earlier {
|
|
bound = Int64(Int32.max)
|
|
} else {
|
|
bound = Int64(Int32.min)
|
|
}
|
|
|
|
for row in statement.run(Int64(peerId.toInt64()), Int64(namespace), bound) {
|
|
let data = (row[0] as! Blob).data
|
|
let decoder = Decoder(buffer: ReadBuffer(memory: UnsafeMutablePointer(data.bytes), length: data.length, freeWhenDone: false))
|
|
if let message = decoder.decodeRootObject() as? Message {
|
|
messages.append(message)
|
|
} else {
|
|
println("(PostBox: can't decode message)")
|
|
}
|
|
}
|
|
|
|
return messages
|
|
}
|
|
|
|
private func fetchMessagesTail(peerId: PeerId, count: Int) -> [Message] {
|
|
var messages: [Message] = []
|
|
|
|
for namespace in self.messageNamespaces {
|
|
messages += self.fetchMessagesRelative(peerId, earlier: true)(namespace: namespace, id: nil, count: count)
|
|
}
|
|
|
|
messages.sort({ MessageIndex($0) < MessageIndex($1)})
|
|
|
|
return messages
|
|
}
|
|
|
|
public func tailMessageViewForPeerId(peerId: PeerId, count: Int) -> Signal<MessageView, NoError> {
|
|
return Signal { subscriber in
|
|
let disposable = MetaDisposable()
|
|
|
|
self.queue.dispatch {
|
|
let tail = self.fetchMessagesTail(peerId, count: count + 1)
|
|
|
|
var messages: [Message] = []
|
|
var i = tail.count - 1
|
|
while i >= 0 && i >= tail.count - count {
|
|
messages.insert(tail[i], atIndex: 0)
|
|
i--
|
|
}
|
|
|
|
var earlier: [MessageId.Namespace : Message] = [:]
|
|
|
|
for namespace in self.messageNamespaces {
|
|
var i = tail.count - count - 1
|
|
while i >= 0 {
|
|
if tail[i].id.namespace == namespace {
|
|
earlier[namespace] = tail[i]
|
|
break
|
|
}
|
|
i--
|
|
}
|
|
}
|
|
|
|
let mutableView = MutableMessageView(namespaces: self.messageNamespaces, count: count, earlier: earlier, messages: messages, later: [:])
|
|
let record = (mutableView, Pipe<MessageView>())
|
|
|
|
let index: Bag<(MutableMessageView, Pipe<MessageView>)>.Index
|
|
if let bag = self.peerMessageViews[peerId] {
|
|
index = bag.add(record)
|
|
} else {
|
|
let bag = Bag<(MutableMessageView, Pipe<MessageView>)>()
|
|
index = bag.add(record)
|
|
self.peerMessageViews[peerId] = bag
|
|
}
|
|
|
|
subscriber.putNext(MessageView(mutableView))
|
|
|
|
let pipeDisposable = record.1.signal().start(next: { next in
|
|
subscriber.putNext(next)
|
|
})
|
|
|
|
disposable.set(ActionDisposable { [weak self] in
|
|
pipeDisposable.dispose()
|
|
|
|
if let strongSelf = self {
|
|
strongSelf.queue.dispatch {
|
|
if let bag = strongSelf.peerMessageViews[peerId] {
|
|
bag.remove(index)
|
|
}
|
|
}
|
|
}
|
|
return
|
|
})
|
|
}
|
|
|
|
return disposable
|
|
}
|
|
}
|
|
|
|
public func aroundMessageViewForPeerId(peerId: PeerId, id: MessageId, count: Int) -> Signal<MessageView, NoError> {
|
|
return Signal { subscriber in
|
|
let disposable = MetaDisposable()
|
|
|
|
self.queue.dispatch {
|
|
let mutableView: MutableMessageView
|
|
|
|
let around = self.fetchMessagesAround(peerId, anchorId: id, count: count)
|
|
if around.0.count == 0 {
|
|
let tail = self.fetchMessagesTail(peerId, count: count + 1)
|
|
|
|
var messages: [Message] = []
|
|
var i = tail.count - 1
|
|
while i >= 0 && i >= tail.count - count {
|
|
messages.insert(tail[i], atIndex: 0)
|
|
i--
|
|
}
|
|
|
|
var earlier: [MessageId.Namespace : Message] = [:]
|
|
|
|
for namespace in self.messageNamespaces {
|
|
var i = tail.count - count - 1
|
|
while i >= 0 {
|
|
if tail[i].id.namespace == namespace {
|
|
earlier[namespace] = tail[i]
|
|
break
|
|
}
|
|
i--
|
|
}
|
|
}
|
|
|
|
mutableView = MutableMessageView(namespaces: self.messageNamespaces, count: count, earlier: earlier, messages: messages, later: [:])
|
|
} else {
|
|
mutableView = MutableMessageView(namespaces: self.messageNamespaces, count: count, earlier: around.1, messages: around.0, later: around.2)
|
|
}
|
|
|
|
let record = (mutableView, Pipe<MessageView>())
|
|
|
|
let index: Bag<(MutableMessageView, Pipe<MessageView>)>.Index
|
|
if let bag = self.peerMessageViews[peerId] {
|
|
index = bag.add(record)
|
|
} else {
|
|
let bag = Bag<(MutableMessageView, Pipe<MessageView>)>()
|
|
index = bag.add(record)
|
|
self.peerMessageViews[peerId] = bag
|
|
}
|
|
|
|
subscriber.putNext(MessageView(mutableView))
|
|
|
|
let pipeDisposable = record.1.signal().start(next: { next in
|
|
subscriber.putNext(next)
|
|
})
|
|
|
|
disposable.set(ActionDisposable { [weak self] in
|
|
pipeDisposable.dispose()
|
|
|
|
if let strongSelf = self {
|
|
strongSelf.queue.dispatch {
|
|
if let bag = strongSelf.peerMessageViews[peerId] {
|
|
bag.remove(index)
|
|
}
|
|
}
|
|
}
|
|
return
|
|
})
|
|
}
|
|
|
|
return disposable
|
|
}
|
|
}
|
|
|
|
func printMessages(messages: [Message]) {
|
|
var string = ""
|
|
string += "["
|
|
var first = true
|
|
for message in messages {
|
|
if first {
|
|
first = false
|
|
} else {
|
|
string += ", "
|
|
}
|
|
string += "\(message.id.namespace): \(message.id.id)—\(message.timestamp)"
|
|
}
|
|
string += "]"
|
|
println(string)
|
|
}
|
|
|
|
public func _dumpTables() {
|
|
println("\n-----------")
|
|
println("top_messages")
|
|
println("------------")
|
|
for row in self.database.prepare("SELECT peerId, timestamp FROM top_messages ORDER BY timestamp DESC").run() {
|
|
println("\(PeerId(row[0] as! Int64)): \(row[1] as! Int64)")
|
|
}
|
|
|
|
println("\n------------")
|
|
println("peer_messages")
|
|
println("-------------")
|
|
for row in self.database.prepare("SELECT peerId, namespace, id, timestamp, associatedMediaIds FROM peer_messages").run() {
|
|
println("peer(\(PeerId(row[0] as! Int64))) id(\(MessageId(peerId: PeerId(row[0] as! Int64), namespace: Int32(row[1] as! Int64), id:(Int32(row[2] as! Int64))))) timestamp(\(row[3] as! Int64)) media(\(Postbox.mediaIdsForBlob(row[4] as! Blob)))")
|
|
}
|
|
|
|
println("\n---------")
|
|
println("peer_media")
|
|
println("----------")
|
|
for row in self.database.prepare("SELECT peerId, mediaNamespace, messageNamespace, messageId FROM peer_media").run() {
|
|
println("peer(\(PeerId(row[0] as! Int64))) namespace(\(row[1] as! Int64)) id(\(MessageId(peerId: PeerId(row[0] as! Int64), namespace: Int32(row[2] as! Int64), id:Int32(row[3] as! Int64))))")
|
|
}
|
|
|
|
println("\n----")
|
|
println("media")
|
|
println("-----")
|
|
for row in self.database.prepare("SELECT namespace, id, associatedMessageIds FROM media").run() {
|
|
println("id(\(MediaId(namespace: Int32(row[0] as! Int64), id:(row[1] as! Int64)))) messages(\(Postbox.messageIdsForBlob(row[2] as! Blob)))")
|
|
}
|
|
|
|
println("\n------------")
|
|
println("media_cleanup")
|
|
println("-------------")
|
|
for row in self.database.prepare("SELECT namespace, id FROM media_cleanup").run() {
|
|
println("id(\(MediaId(namespace: Int32(row[0] as! Int64), id:(row[1] as! Int64))))")
|
|
}
|
|
}
|
|
|
|
public func _sync() {
|
|
self.queue.sync {
|
|
}
|
|
}
|
|
}
|