Swiftgram/submodules/Postbox/Sources/MessageHistoryReadStateTable.swift
2019-11-01 17:11:12 +04:00

573 lines
30 KiB
Swift

import Foundation
private let traceReadStates = false
enum ApplyInteractiveMaxReadIdResult {
case None
case Push(thenSync: Bool)
}
private final class InternalPeerReadStates {
var namespaces: [MessageId.Namespace: PeerReadState]
init(namespaces: [MessageId.Namespace: PeerReadState]) {
self.namespaces = namespaces
}
}
final class MessageHistoryReadStateTable: Table {
static func tableSpec(_ id: Int32) -> ValueBoxTable {
return ValueBoxTable(id: id, keyType: .int64, compactValuesOnCreation: false)
}
private let seedConfiguration: SeedConfiguration
private var cachedPeerReadStates: [PeerId: InternalPeerReadStates?] = [:]
private var updatedInitialPeerReadStates: [PeerId: [MessageId.Namespace: PeerReadState]] = [:]
private let sharedKey = ValueBoxKey(length: 8)
private func key(_ id: PeerId) -> ValueBoxKey {
self.sharedKey.setInt64(0, value: id.toInt64())
return self.sharedKey
}
init(valueBox: ValueBox, table: ValueBoxTable, seedConfiguration: SeedConfiguration) {
self.seedConfiguration = seedConfiguration
super.init(valueBox: valueBox, table: table)
}
private func get(_ id: PeerId) -> InternalPeerReadStates? {
if let states = self.cachedPeerReadStates[id] {
return states
} else {
if let value = self.valueBox.get(self.table, key: self.key(id)) {
var count: Int32 = 0
value.read(&count, offset: 0, length: 4)
var stateByNamespace: [MessageId.Namespace: PeerReadState] = [:]
for _ in 0 ..< count {
var namespaceId: Int32 = 0
value.read(&namespaceId, offset: 0, length: 4)
let state: PeerReadState
var kind: Int8 = 0
value.read(&kind, offset: 0, length: 1)
if kind == 0 {
var maxIncomingReadId: Int32 = 0
var maxOutgoingReadId: Int32 = 0
var maxKnownId: Int32 = 0
var count: Int32 = 0
value.read(&maxIncomingReadId, offset: 0, length: 4)
value.read(&maxOutgoingReadId, offset: 0, length: 4)
value.read(&maxKnownId, offset: 0, length: 4)
value.read(&count, offset: 0, length: 4)
var flags: Int32 = 0
value.read(&flags, offset: 0, length: 4)
let markedUnread = (flags & (1 << 0)) != 0
state = .idBased(maxIncomingReadId: maxIncomingReadId, maxOutgoingReadId: maxOutgoingReadId, maxKnownId: maxKnownId, count: count, markedUnread: markedUnread)
} else {
var maxIncomingReadTimestamp: Int32 = 0
var maxIncomingReadIdPeerId: Int64 = 0
var maxIncomingReadIdNamespace: Int32 = 0
var maxIncomingReadIdId: Int32 = 0
var maxOutgoingReadTimestamp: Int32 = 0
var maxOutgoingReadIdPeerId: Int64 = 0
var maxOutgoingReadIdNamespace: Int32 = 0
var maxOutgoingReadIdId: Int32 = 0
var count: Int32 = 0
value.read(&maxIncomingReadTimestamp, offset: 0, length: 4)
value.read(&maxIncomingReadIdPeerId, offset: 0, length: 8)
value.read(&maxIncomingReadIdNamespace, offset: 0, length: 4)
value.read(&maxIncomingReadIdId, offset: 0, length: 4)
value.read(&maxOutgoingReadTimestamp, offset: 0, length: 4)
value.read(&maxOutgoingReadIdPeerId, offset: 0, length: 8)
value.read(&maxOutgoingReadIdNamespace, offset: 0, length: 4)
value.read(&maxOutgoingReadIdId, offset: 0, length: 4)
value.read(&count, offset: 0, length: 4)
var flags: Int32 = 0
value.read(&flags, offset: 0, length: 4)
let markedUnread = (flags & (1 << 0)) != 0
state = .indexBased(maxIncomingReadIndex: MessageIndex(id: MessageId(peerId: PeerId(maxIncomingReadIdPeerId), namespace: maxIncomingReadIdNamespace, id: maxIncomingReadIdId), timestamp: maxIncomingReadTimestamp), maxOutgoingReadIndex: MessageIndex(id: MessageId(peerId: PeerId(maxOutgoingReadIdPeerId), namespace: maxOutgoingReadIdNamespace, id: maxOutgoingReadIdId), timestamp: maxOutgoingReadTimestamp), count: count, markedUnread: markedUnread)
}
stateByNamespace[namespaceId] = state
}
let states = InternalPeerReadStates(namespaces: stateByNamespace)
self.cachedPeerReadStates[id] = states
return states
} else {
self.cachedPeerReadStates[id] = nil
return nil
}
}
}
func getCombinedState(_ peerId: PeerId) -> CombinedPeerReadState? {
if let states = self.get(peerId) {
return CombinedPeerReadState(states: states.namespaces.map({$0}))
}
return nil
}
private func markReadStatesAsUpdated(_ peerId: PeerId, namespaces: [MessageId.Namespace: PeerReadState]) {
if self.updatedInitialPeerReadStates[peerId] == nil {
self.updatedInitialPeerReadStates[peerId] = namespaces
}
}
func resetStates(_ peerId: PeerId, namespaces: [MessageId.Namespace: PeerReadState]) -> CombinedPeerReadState? {
if traceReadStates {
print("[ReadStateTable] resetStates peerId: \(peerId), namespaces: \(namespaces)")
}
if let states = self.get(peerId) {
var updated = false
for (namespace, state) in namespaces {
if states.namespaces[namespace] == nil || states.namespaces[namespace]! != state {
self.markReadStatesAsUpdated(peerId, namespaces: states.namespaces)
updated = true
}
states.namespaces[namespace] = state
}
if updated {
return CombinedPeerReadState(states: states.namespaces.map({$0}))
} else {
return nil
}
} else {
self.markReadStatesAsUpdated(peerId, namespaces: [:])
let states = InternalPeerReadStates(namespaces: namespaces)
self.cachedPeerReadStates[peerId] = states
return CombinedPeerReadState(states: states.namespaces.map({$0}))
}
}
func addIncomingMessages(_ peerId: PeerId, indices: Set<MessageIndex>) -> (CombinedPeerReadState?, Bool) {
var indicesByNamespace: [MessageId.Namespace: [MessageIndex]] = [:]
for index in indices {
if indicesByNamespace[index.id.namespace] != nil {
indicesByNamespace[index.id.namespace]!.append(index)
} else {
indicesByNamespace[index.id.namespace] = [index]
}
}
if let states = self.get(peerId) {
if traceReadStates {
print("[ReadStateTable] addIncomingMessages peerId: \(peerId), indices: \(indices) (before: \(states.namespaces))")
}
var updated = false
let invalidated = false
for (namespace, namespaceIndices) in indicesByNamespace {
let currentState = states.namespaces[namespace] ?? self.seedConfiguration.defaultMessageNamespaceReadStates[namespace]
if let currentState = currentState {
var addedUnreadCount: Int32 = 0
for index in namespaceIndices {
switch currentState {
case let .idBased(maxIncomingReadId, _, maxKnownId, _, _):
if index.id.id > maxKnownId && index.id.id > maxIncomingReadId {
addedUnreadCount += 1
}
case let .indexBased(maxIncomingReadIndex, _, _, _):
if index > maxIncomingReadIndex {
addedUnreadCount += 1
}
}
}
if addedUnreadCount != 0 {
self.markReadStatesAsUpdated(peerId, namespaces: states.namespaces)
states.namespaces[namespace] = currentState.withAddedCount(addedUnreadCount)
updated = true
if traceReadStates {
print("[ReadStateTable] added \(addedUnreadCount)")
}
}
}
}
return (updated ? CombinedPeerReadState(states: states.namespaces.map({$0})) : nil, invalidated)
} else {
if traceReadStates {
print("[ReadStateTable] addIncomingMessages peerId: \(peerId), just invalidated)")
}
return (nil, true)
}
}
func deleteMessages(_ peerId: PeerId, indices: [MessageIndex], incomingStatsInIndices: (PeerId, MessageId.Namespace, [MessageIndex]) -> (Int, Bool)) -> (CombinedPeerReadState?, Bool) {
var indicesByNamespace: [MessageId.Namespace: [MessageIndex]] = [:]
for index in indices {
if indicesByNamespace[index.id.namespace] != nil {
indicesByNamespace[index.id.namespace]!.append(index)
} else {
indicesByNamespace[index.id.namespace] = [index]
}
}
if let states = self.get(peerId) {
if traceReadStates {
print("[ReadStateTable] deleteMessages peerId: \(peerId), ids: \(indices) (before: \(states.namespaces))")
}
var updated = false
var invalidate = false
for (namespace, namespaceIndices) in indicesByNamespace {
if let currentState = states.namespaces[namespace] {
var unreadIndices: [MessageIndex] = []
for index in namespaceIndices {
if !currentState.isIncomingMessageIndexRead(index) {
unreadIndices.append(index)
}
}
let (knownCount, holes) = incomingStatsInIndices(peerId, namespace, unreadIndices)
if holes {
invalidate = true
}
self.markReadStatesAsUpdated(peerId, namespaces: states.namespaces)
var updatedState = currentState.withAddedCount(Int32(-knownCount))
if updatedState.count < 0 {
invalidate = true
updatedState = currentState.withAddedCount(-updatedState.count)
}
states.namespaces[namespace] = updatedState
updated = true
} else {
invalidate = true
}
}
return (updated ? CombinedPeerReadState(states: states.namespaces.map({$0})) : nil, invalidate)
} else {
return (nil, true)
}
}
func applyIncomingMaxReadId(_ messageId: MessageId, incomingStatsInRange: (MessageId.Namespace, MessageId.Id, MessageId.Id) -> (count: Int, holes: Bool), topMessageId: (MessageId.Id, Bool)?) -> (CombinedPeerReadState?, Bool) {
if let states = self.get(messageId.peerId), let state = states.namespaces[messageId.namespace] {
if traceReadStates {
print("[ReadStateTable] applyMaxReadId peerId: \(messageId.peerId), maxReadId: \(messageId) (before: \(states.namespaces))")
}
switch state {
case let .idBased(maxIncomingReadId, maxOutgoingReadId, maxKnownId, count, markedUnread):
if maxIncomingReadId < messageId.id || (topMessageId != nil && (messageId.id == topMessageId!.0 || topMessageId!.1) && state.count != 0) || markedUnread {
var (deltaCount, holes) = incomingStatsInRange(messageId.namespace, maxIncomingReadId + 1, messageId.id)
if traceReadStates {
print("[ReadStateTable] applyMaxReadId after deltaCount: \(deltaCount), holes: \(holes)")
}
if let topMessageId = topMessageId, (messageId.id == topMessageId.0 || topMessageId.1) {
if deltaCount != Int(state.count) {
deltaCount = Int(state.count)
holes = true
}
}
self.markReadStatesAsUpdated(messageId.peerId, namespaces: states.namespaces)
states.namespaces[messageId.namespace] = .idBased(maxIncomingReadId: messageId.id, maxOutgoingReadId: maxOutgoingReadId, maxKnownId: maxKnownId, count: max(0, count - Int32(deltaCount)), markedUnread: false)
return (CombinedPeerReadState(states: states.namespaces.map({$0})), holes)
}
case .indexBased:
assertionFailure()
break
}
} else {
return (nil, true)
}
return (nil, false)
}
func applyIncomingMaxReadIndex(_ messageIndex: MessageIndex, topMessageIndex: MessageIndex?, incomingStatsInRange: (MessageIndex, MessageIndex) -> (count: Int, holes: Bool, readMesageIds: [MessageId])) -> (CombinedPeerReadState?, Bool, [MessageId]) {
if let states = self.get(messageIndex.id.peerId), let state = states.namespaces[messageIndex.id.namespace] {
if traceReadStates {
print("[ReadStateTable] applyIncomingMaxReadIndex peerId: \(messageIndex.id.peerId), maxReadIndex: \(messageIndex) (before: \(states.namespaces))")
}
switch state {
case .idBased:
assertionFailure()
case let .indexBased(maxIncomingReadIndex, maxOutgoingReadIndex, count, markedUnread):
var readPastTopIndex = false
if let topMessageIndex = topMessageIndex, messageIndex >= topMessageIndex && count != 0 {
readPastTopIndex = true
}
if maxIncomingReadIndex < messageIndex || markedUnread || readPastTopIndex {
let (realDeltaCount, holes, messageIds) = incomingStatsInRange(maxIncomingReadIndex.successor(), messageIndex)
var deltaCount = realDeltaCount
if readPastTopIndex {
deltaCount = max(Int(count), deltaCount)
}
if traceReadStates {
print("[ReadStateTable] applyIncomingMaxReadIndex after deltaCount: \(deltaCount), holes: \(holes)")
}
self.markReadStatesAsUpdated(messageIndex.id.peerId, namespaces: states.namespaces)
states.namespaces[messageIndex.id.namespace] = .indexBased(maxIncomingReadIndex: messageIndex, maxOutgoingReadIndex: maxOutgoingReadIndex, count: max(0, count - Int32(deltaCount)), markedUnread: false)
return (CombinedPeerReadState(states: states.namespaces.map({$0})), holes, messageIds)
}
}
} else {
return (nil, true, [])
}
return (nil, false, [])
}
func applyOutgoingMaxReadId(_ messageId: MessageId) -> (CombinedPeerReadState?, Bool) {
if let states = self.get(messageId.peerId), let state = states.namespaces[messageId.namespace] {
switch state {
case let .idBased(maxIncomingReadId, maxOutgoingReadId, maxKnownId, count, markedUnread):
if maxOutgoingReadId < messageId.id {
self.markReadStatesAsUpdated(messageId.peerId, namespaces: states.namespaces)
states.namespaces[messageId.namespace] = .idBased(maxIncomingReadId: maxIncomingReadId, maxOutgoingReadId: messageId.id, maxKnownId: maxKnownId, count: count, markedUnread: markedUnread)
return (CombinedPeerReadState(states: states.namespaces.map({$0})), false)
}
case .indexBased:
assertionFailure()
break
}
} else {
return (nil, true)
}
return (nil, false)
}
func applyOutgoingMaxReadIndex(_ messageIndex: MessageIndex, outgoingIndexStatsInRange: (MessageIndex, MessageIndex) -> [MessageId]) -> (CombinedPeerReadState?, Bool, [MessageId]) {
if let states = self.get(messageIndex.id.peerId), let state = states.namespaces[messageIndex.id.namespace] {
switch state {
case .idBased:
assertionFailure()
break
case let .indexBased(maxIncomingReadIndex, maxOutgoingReadIndex, count, markedUnread):
if maxOutgoingReadIndex < messageIndex {
let messageIds: [MessageId] = outgoingIndexStatsInRange(maxOutgoingReadIndex.successor(), messageIndex)
self.markReadStatesAsUpdated(messageIndex.id.peerId, namespaces: states.namespaces)
states.namespaces[messageIndex.id.namespace] = .indexBased(maxIncomingReadIndex: maxIncomingReadIndex, maxOutgoingReadIndex: messageIndex, count: count, markedUnread: markedUnread)
return (CombinedPeerReadState(states: states.namespaces.map({$0})), false, messageIds)
}
}
} else {
return (nil, true, [])
}
return (nil, false, [])
}
func applyInteractiveMaxReadIndex(postbox: Postbox, messageIndex: MessageIndex, incomingStatsInRange: (MessageId.Namespace, MessageId.Id, MessageId.Id) -> (count: Int, holes: Bool), incomingIndexStatsInRange: (MessageIndex, MessageIndex) -> (count: Int, holes: Bool, readMesageIds: [MessageId]), topMessageId: (MessageId.Id, Bool)?, topMessageIndexByNamespace: (MessageId.Namespace) -> MessageIndex?) -> (combinedState: CombinedPeerReadState?, ApplyInteractiveMaxReadIdResult, readMesageIds: [MessageId]) {
if let states = self.get(messageIndex.id.peerId) {
if let state = states.namespaces[messageIndex.id.namespace] {
switch state {
case .idBased:
let (combinedState, holes) = self.applyIncomingMaxReadId(messageIndex.id, incomingStatsInRange: incomingStatsInRange, topMessageId: topMessageId)
if let combinedState = combinedState {
return (combinedState, .Push(thenSync: holes), [])
}
return (combinedState, holes ? .Push(thenSync: true) : .None, [])
case .indexBased:
let topMessageIndex: MessageIndex? = topMessageIndexByNamespace(messageIndex.id.namespace)
let (combinedState, holes, messageIds) = self.applyIncomingMaxReadIndex(messageIndex, topMessageIndex: topMessageIndex, incomingStatsInRange: incomingIndexStatsInRange)
if let combinedState = combinedState {
return (combinedState, .Push(thenSync: holes), messageIds)
}
return (combinedState, holes ? .Push(thenSync: true) : .None, messageIds)
}
} else {
for (namespace, state) in states.namespaces {
if let topIndex = topMessageIndexByNamespace(namespace), topIndex <= messageIndex {
switch state {
case .idBased:
let (combinedState, holes) = self.applyIncomingMaxReadId(topIndex.id, incomingStatsInRange: incomingStatsInRange, topMessageId: nil)
if let combinedState = combinedState {
return (combinedState, .Push(thenSync: holes), [])
}
return (combinedState, holes ? .Push(thenSync: true) : .None, [])
case .indexBased:
let (combinedState, holes, messageIds) = self.applyIncomingMaxReadIndex(topIndex, topMessageIndex: topMessageIndexByNamespace(namespace), incomingStatsInRange: incomingIndexStatsInRange)
if let combinedState = combinedState {
return (combinedState, .Push(thenSync: holes), messageIds)
}
return (combinedState, holes ? .Push(thenSync: true) : .None, messageIds)
}
}
}
return (nil, .Push(thenSync: true), [])
}
} else {
return (nil, .Push(thenSync: true), [])
}
}
func applyInteractiveMarkUnread(peerId: PeerId, namespace: MessageId.Namespace, value: Bool) -> CombinedPeerReadState? {
if let states = self.get(peerId), let state = states.namespaces[namespace] {
switch state {
case let .idBased(maxIncomingReadId, maxOutgoingReadId, maxKnownId, count, markedUnread):
if markedUnread != value {
self.markReadStatesAsUpdated(peerId, namespaces: states.namespaces)
states.namespaces[namespace] = .idBased(maxIncomingReadId: maxIncomingReadId, maxOutgoingReadId: maxOutgoingReadId, maxKnownId: maxKnownId, count: count, markedUnread: value)
return CombinedPeerReadState(states: states.namespaces.map({$0}))
} else {
return nil
}
case let .indexBased(maxIncomingReadIndex, maxOutgoingReadIndex, count, markedUnread):
if markedUnread != value {
self.markReadStatesAsUpdated(peerId, namespaces: states.namespaces)
states.namespaces[namespace] = .indexBased(maxIncomingReadIndex: maxIncomingReadIndex, maxOutgoingReadIndex: maxOutgoingReadIndex, count: count, markedUnread: value)
return CombinedPeerReadState(states: states.namespaces.map({$0}))
} else {
return nil
}
}
} else {
return nil
}
}
func transactionUnreadCountDeltas() -> [PeerId: Int32] {
var deltas: [PeerId: Int32] = [:]
for (id, initialNamespaces) in self.updatedInitialPeerReadStates {
var initialCount: Int32 = 0
for (_, state) in initialNamespaces {
initialCount += state.count
}
var updatedCount: Int32 = 0
if let maybeStates = self.cachedPeerReadStates[id] {
if let states = maybeStates {
for (_, state) in states.namespaces {
updatedCount += state.count
}
}
} else {
assertionFailure()
}
if initialCount != updatedCount {
deltas[id] = updatedCount - initialCount
}
}
return deltas
}
func transactionAlteredInitialPeerCombinedReadStates() -> [PeerId: CombinedPeerReadState] {
var result: [PeerId: CombinedPeerReadState] = [:]
for (peerId, namespacesAndStates) in self.updatedInitialPeerReadStates {
var states: [(MessageId.Namespace, PeerReadState)] = []
for (namespace, state) in namespacesAndStates {
states.append((namespace, state))
}
result[peerId] = CombinedPeerReadState(states: states)
}
return result
}
override func clearMemoryCache() {
self.cachedPeerReadStates.removeAll()
assert(self.updatedInitialPeerReadStates.isEmpty)
}
override func beforeCommit() {
if !self.updatedInitialPeerReadStates.isEmpty {
let sharedBuffer = WriteBuffer()
for (id, initialNamespaces) in self.updatedInitialPeerReadStates {
if let wrappedStates = self.cachedPeerReadStates[id], let states = wrappedStates {
sharedBuffer.reset()
var count: Int32 = Int32(states.namespaces.count)
sharedBuffer.write(&count, offset: 0, length: 4)
for (namespace, state) in states.namespaces {
var namespaceId: Int32 = namespace
sharedBuffer.write(&namespaceId, offset: 0, length: 4)
switch state {
case .idBased(var maxIncomingReadId, var maxOutgoingReadId, var maxKnownId, var count, let markedUnread):
var kind: Int8 = 0
sharedBuffer.write(&kind, offset: 0, length: 1)
sharedBuffer.write(&maxIncomingReadId, offset: 0, length: 4)
sharedBuffer.write(&maxOutgoingReadId, offset: 0, length: 4)
sharedBuffer.write(&maxKnownId, offset: 0, length: 4)
sharedBuffer.write(&count, offset: 0, length: 4)
var flags: Int32 = 0
if markedUnread {
flags |= (1 << 0)
}
sharedBuffer.write(&flags, offset: 0, length: 4)
case .indexBased(let maxIncomingReadIndex, let maxOutgoingReadIndex, var count, let markedUnread):
var kind: Int8 = 1
sharedBuffer.write(&kind, offset: 0, length: 1)
var maxIncomingReadTimestamp: Int32 = maxIncomingReadIndex.timestamp
var maxIncomingReadIdPeerId: Int64 = maxIncomingReadIndex.id.peerId.toInt64()
var maxIncomingReadIdNamespace: Int32 = maxIncomingReadIndex.id.namespace
var maxIncomingReadIdId: Int32 = maxIncomingReadIndex.id.id
var maxOutgoingReadTimestamp: Int32 = maxOutgoingReadIndex.timestamp
var maxOutgoingReadIdPeerId: Int64 = maxOutgoingReadIndex.id.peerId.toInt64()
var maxOutgoingReadIdNamespace: Int32 = maxOutgoingReadIndex.id.namespace
var maxOutgoingReadIdId: Int32 = maxOutgoingReadIndex.id.id
sharedBuffer.write(&maxIncomingReadTimestamp, offset: 0, length: 4)
sharedBuffer.write(&maxIncomingReadIdPeerId, offset: 0, length: 8)
sharedBuffer.write(&maxIncomingReadIdNamespace, offset: 0, length: 4)
sharedBuffer.write(&maxIncomingReadIdId, offset: 0, length: 4)
sharedBuffer.write(&maxOutgoingReadTimestamp, offset: 0, length: 4)
sharedBuffer.write(&maxOutgoingReadIdPeerId, offset: 0, length: 8)
sharedBuffer.write(&maxOutgoingReadIdNamespace, offset: 0, length: 4)
sharedBuffer.write(&maxOutgoingReadIdId, offset: 0, length: 4)
sharedBuffer.write(&count, offset: 0, length: 4)
var flags: Int32 = 0
if markedUnread {
flags |= 1 << 0
}
sharedBuffer.write(&flags, offset: 0, length: 4)
}
}
self.valueBox.set(self.table, key: self.key(id), value: sharedBuffer)
} else {
self.valueBox.remove(self.table, key: self.key(id), secure: false)
}
}
self.updatedInitialPeerReadStates.removeAll()
}
}
}