Swiftgram/Postbox/MessageHistoryIndexTable.swift
2019-03-08 10:18:29 +00:00

330 lines
14 KiB
Swift

import Foundation
public enum AddMessagesLocation {
case Random
case UpperHistoryBlock
}
enum MessageHistoryIndexOperation {
case InsertMessage(InternalStoreMessage)
case InsertExistingMessage(InternalStoreMessage)
case Remove(index: MessageIndex)
case Update(MessageIndex, InternalStoreMessage)
case UpdateTimestamp(MessageIndex, Int32)
}
private let HistoryEntryTypeMask: Int8 = 1
private let HistoryEntryTypeMessage: Int8 = 0
private let HistoryEntryMessageFlagIncoming: Int8 = 1 << 1
private func readHistoryIndexEntry(_ peerId: PeerId, namespace: MessageId.Namespace, key: ValueBoxKey, value: ReadBuffer) -> MessageIndex {
var flags: Int8 = 0
value.read(&flags, offset: 0, length: 1)
var timestamp: Int32 = 0
value.read(&timestamp, offset: 0, length: 4)
let index = MessageIndex(id: MessageId(peerId: peerId, namespace: namespace, id: key.getInt32(8 + 4)), timestamp: timestamp)
return index
}
private func modifyHistoryIndexEntryTimestamp(value: ReadBuffer, timestamp: Int32) -> MemoryBuffer {
let buffer = WriteBuffer()
buffer.write(value.memory.advanced(by: 0), offset: 0, length: 1)
var varTimestamp: Int32 = timestamp
buffer.write(&varTimestamp, offset: 0, length: 4)
buffer.write(value.memory.advanced(by: 5), offset: 0, length: value.length - 5)
return buffer
}
final class MessageHistoryIndexTable: Table {
static func tableSpec(_ id: Int32) -> ValueBoxTable {
return ValueBoxTable(id: id, keyType: .binary)
}
let globalMessageIdsNamespace: Int32
let globalMessageIdsTable: GlobalMessageIdsTable
let metadataTable: MessageHistoryMetadataTable
let seedConfiguration: SeedConfiguration
init(valueBox: ValueBox, table: ValueBoxTable, globalMessageIdsTable: GlobalMessageIdsTable, metadataTable: MessageHistoryMetadataTable, seedConfiguration: SeedConfiguration) {
self.globalMessageIdsTable = globalMessageIdsTable
self.globalMessageIdsNamespace = globalMessageIdsTable.namespace
self.seedConfiguration = seedConfiguration
self.metadataTable = metadataTable
super.init(valueBox: valueBox, table: table)
}
private func key(_ id: MessageId) -> ValueBoxKey {
let key = ValueBoxKey(length: 8 + 4 + 4)
key.setInt64(0, value: id.peerId.toInt64())
key.setInt32(8, value: id.namespace)
key.setInt32(8 + 4, value: id.id)
return key
}
private func lowerBound(_ peerId: PeerId, namespace: MessageId.Namespace) -> ValueBoxKey {
let key = ValueBoxKey(length: 8 + 4)
key.setInt64(0, value: peerId.toInt64())
key.setInt32(8, value: namespace)
return key
}
private func upperBound(_ peerId: PeerId, namespace: MessageId.Namespace) -> ValueBoxKey {
let key = ValueBoxKey(length: 8 + 4)
key.setInt64(0, value: peerId.toInt64())
key.setInt32(8, value: namespace)
return key.successor
}
func addMessages(_ messages: [InternalStoreMessage], operations: inout [MessageHistoryIndexOperation]) {
if messages.count == 0 {
return
}
for message in messages {
let index = MessageIndex(id: message.id, timestamp: message.timestamp)
if self.valueBox.exists(self.table, key: self.key(index.id)) {
operations.append(.InsertExistingMessage(message))
} else {
self.justInsertMessage(message, operations: &operations)
}
}
}
func removeMessage(_ id: MessageId, operations: inout [MessageHistoryIndexOperation]) {
if let index = self.getIndex(id) {
self.justRemove(index, operations: &operations)
}
}
func removeMessagesInRange(peerId: PeerId, namespace: MessageId.Namespace, minId: MessageId.Id, maxId: MessageId.Id, operations: inout [MessageHistoryIndexOperation]) {
if minId > maxId {
assertionFailure()
return
}
var removeMessageIds: [MessageId] = []
self.valueBox.range(self.table, start: self.key(MessageId(peerId: peerId, namespace: namespace, id: minId)).predecessor, end: self.key(MessageId(peerId: peerId, namespace: namespace, id: maxId)).successor, values: { key, value in
let index = readHistoryIndexEntry(peerId, namespace: namespace, key: key, value: value)
removeMessageIds.append(index.id)
return true
}, limit: 0)
for id in removeMessageIds {
self.removeMessage(id, operations: &operations)
}
}
func updateMessage(_ id: MessageId, message: InternalStoreMessage, operations: inout [MessageHistoryIndexOperation]) {
if let previousIndex = self.getIndex(id) {
if previousIndex != MessageIndex(message) {
var intermediateOperations: [MessageHistoryIndexOperation] = []
self.removeMessage(id, operations: &intermediateOperations)
self.addMessages([message], operations: &intermediateOperations)
for operation in intermediateOperations {
switch operation {
case let .Remove(index) where index == previousIndex:
operations.append(.Update(previousIndex, message))
case let .InsertMessage(insertMessage) where MessageIndex(insertMessage) == MessageIndex(message):
break
default:
operations.append(operation)
}
}
} else {
operations.append(.Update(previousIndex, message))
}
}
}
func updateTimestamp(_ id: MessageId, timestamp: Int32, operations: inout [MessageHistoryIndexOperation]) {
if let previousData = self.valueBox.get(self.table, key: self.key(id)), let previousIndex = self.getIndex(id), previousIndex.timestamp != timestamp {
let updatedEntry = modifyHistoryIndexEntryTimestamp(value: previousData, timestamp: timestamp)
self.valueBox.remove(self.table, key: self.key(id))
self.valueBox.set(self.table, key: self.key(id), value: updatedEntry)
operations.append(.UpdateTimestamp(MessageIndex(id: id, timestamp: previousIndex.timestamp), timestamp))
}
}
private func justInsertMessage(_ message: InternalStoreMessage, operations: inout [MessageHistoryIndexOperation]) {
let index = MessageIndex(id: message.id, timestamp: message.timestamp)
let value = WriteBuffer()
var flags: Int8 = HistoryEntryTypeMessage
if message.flags.contains(.Incoming) {
flags |= HistoryEntryMessageFlagIncoming
}
var timestamp: Int32 = index.timestamp
value.write(&flags, offset: 0, length: 1)
value.write(&timestamp, offset: 0, length: 4)
self.valueBox.set(self.table, key: self.key(index.id), value: value)
operations.append(.InsertMessage(message))
if index.id.namespace == self.globalMessageIdsNamespace {
self.globalMessageIdsTable.set(index.id.id, id: index.id)
}
}
private func justRemove(_ index: MessageIndex, operations: inout [MessageHistoryIndexOperation]) {
self.valueBox.remove(self.table, key: self.key(index.id))
operations.append(.Remove(index: index))
if index.id.namespace == self.globalMessageIdsNamespace {
self.globalMessageIdsTable.remove(index.id.id)
}
}
func getIndex(_ id: MessageId) -> MessageIndex? {
let key = self.key(id)
if let value = self.valueBox.get(self.table, key: key) {
return readHistoryIndexEntry(id.peerId, namespace: id.namespace, key: key, value: value)
} else {
return nil
}
}
func top(_ peerId: PeerId, namespace: MessageId.Namespace) -> MessageIndex? {
var index: MessageIndex?
self.valueBox.range(self.table, start: self.upperBound(peerId, namespace: namespace), end: self.lowerBound(peerId, namespace: namespace), values: { key, value in
index = readHistoryIndexEntry(peerId, namespace: namespace, key: key, value: value)
return false
}, limit: 1)
return index
}
func exists(_ id: MessageId) -> Bool {
return self.valueBox.exists(self.table, key: self.key(id))
}
func incomingMessageCountInRange(_ peerId: PeerId, namespace: MessageId.Namespace, minId: MessageId.Id, maxId: MessageId.Id) -> (Int, Bool) {
var count = 0
var holes = false
if minId <= maxId {
self.valueBox.range(self.table, start: self.key(MessageId(peerId: peerId, namespace: namespace, id: minId)).predecessor, end: self.key(MessageId(peerId: peerId, namespace: namespace, id: maxId)).successor, values: { _, value in
var flags: Int8 = 0
value.read(&flags, offset: 0, length: 1)
if (flags & HistoryEntryMessageFlagIncoming) != 0 {
count += 1
}
return true
}, limit: 0)
}
return (count, holes)
}
func incomingMessageCountInIds(_ peerId: PeerId, namespace: MessageId.Namespace, ids: [MessageId.Id]) -> (Int, Bool) {
var count = 0
var holes = false
for id in ids {
if let value = self.valueBox.get(self.table, key: self.key(MessageId(peerId: peerId, namespace: namespace, id: id))) {
var flags: Int8 = 0
value.read(&flags, offset: 0, length: 1)
if (flags & HistoryEntryMessageFlagIncoming) != 0 {
count += 1
}
}
}
return (count, holes)
}
func entriesAround(id: MessageId, count: Int) -> ([MessageIndex], MessageIndex?, MessageIndex?) {
var lowerEntries: [MessageIndex] = []
var upperEntries: [MessageIndex] = []
var lower: MessageIndex?
var upper: MessageIndex?
self.valueBox.range(self.table, start: self.key(id), end: self.lowerBound(id.peerId, namespace: id.namespace), values: { key, value in
lowerEntries.append(readHistoryIndexEntry(id.peerId, namespace: id.namespace, key: key, value: value))
return true
}, limit: count / 2 + 1)
if lowerEntries.count >= count / 2 + 1 {
lower = lowerEntries.last
lowerEntries.removeLast()
}
self.valueBox.range(self.table, start: self.key(id).predecessor, end: self.upperBound(id.peerId, namespace: id.namespace), values: { key, value in
upperEntries.append(readHistoryIndexEntry(id.peerId, namespace: id.namespace, key: key, value: value))
return true
}, limit: count - lowerEntries.count + 1)
if upperEntries.count >= count - lowerEntries.count + 1 {
upper = upperEntries.last
upperEntries.removeLast()
}
if lowerEntries.count != 0 && lowerEntries.count + upperEntries.count < count {
var additionalLowerEntries: [MessageIndex] = []
self.valueBox.range(self.table, start: self.key(lowerEntries.last!.id), end: self.lowerBound(id.peerId, namespace: id.namespace), values: { key, value in
additionalLowerEntries.append(readHistoryIndexEntry(id.peerId, namespace: id.namespace, key: key, value: value))
return true
}, limit: count - lowerEntries.count - upperEntries.count + 1)
if additionalLowerEntries.count >= count - lowerEntries.count + upperEntries.count + 1 {
lower = additionalLowerEntries.last
additionalLowerEntries.removeLast()
}
lowerEntries.append(contentsOf: additionalLowerEntries)
}
var entries: [MessageIndex] = []
entries.append(contentsOf: lowerEntries.reversed())
entries.append(contentsOf: upperEntries)
return (entries: entries, lower: lower, upper: upper)
}
func earlierEntries(id: MessageId, count: Int) -> [MessageIndex] {
var entries: [MessageIndex] = []
let key = self.key(id)
self.valueBox.range(self.table, start: key, end: self.lowerBound(id.peerId, namespace: id.namespace), values: { key, value in
entries.append(readHistoryIndexEntry(id.peerId, namespace: id.namespace, key: key, value: value))
return true
}, limit: count)
return entries
}
func laterEntries(id: MessageId, count: Int) -> [MessageIndex] {
var entries: [MessageIndex] = []
let key = self.key(id)
self.valueBox.range(self.table, start: key, end: self.upperBound(id.peerId, namespace: id.namespace), values: { key, value in
entries.append(readHistoryIndexEntry(id.peerId, namespace: id.namespace, key: key, value: value))
return true
}, limit: count)
return entries
}
func debugList(_ peerId: PeerId, namespace: MessageId.Namespace) -> [MessageIndex] {
var list: [MessageIndex] = []
self.valueBox.range(self.table, start: self.lowerBound(peerId, namespace: namespace), end: self.upperBound(peerId, namespace: namespace), values: { key, value in
list.append(readHistoryIndexEntry(peerId, namespace: namespace, key: key, value: value))
return true
}, limit: 0)
return list
}
func closestIndex(id: MessageId) -> MessageIndex? {
if let index = self.getIndex(id) {
return index
} else {
var index: MessageIndex?
self.valueBox.range(self.table, start: self.key(id).successor, end: self.lowerBound(id.peerId, namespace: id.namespace), values: { key, value in
index = readHistoryIndexEntry(id.peerId, namespace: id.namespace, key: key, value: value)
return true
}, limit: 1)
if index == nil {
self.valueBox.range(self.table, start: self.key(id).predecessor, end: self.upperBound(id.peerId, namespace: id.namespace), values: { key, value in
index = readHistoryIndexEntry(id.peerId, namespace: id.namespace, key: key, value: value)
return true
}, limit: 1)
}
return index
}
}
}