no message

This commit is contained in:
Peter
2017-01-05 01:14:32 +04:00
parent 8fdd767461
commit 84c0a86d8f
24 changed files with 686 additions and 114 deletions

View File

@@ -21,7 +21,7 @@ final class MessageHistoryReadStateTable: Table {
}
private var cachedPeerReadStates: [PeerId: InternalPeerReadStates?] = [:]
private var updatedPeerIds = Set<PeerId>()
private var updatedInitialPeerReadStates: [PeerId: [MessageId.Namespace: PeerReadState]] = [:]
private let sharedKey = ValueBoxKey(length: 8)
@@ -74,29 +74,33 @@ final class MessageHistoryReadStateTable: Table {
return nil
}
private func markReadStatesAsUpdated(_ peerId: PeerId, namespaces: [MessageId.Namespace: PeerReadState]) {
if self.updatedInitialPeerReadStates[peerId] == nil {
self.updatedInitialPeerReadStates[peerId] = namespaces
}
}
func resetStates(_ peerId: PeerId, namespaces: [MessageId.Namespace: PeerReadState]) -> CombinedPeerReadState? {
if traceReadStates {
print("[ReadStateTable] resetStates peerId: \(peerId), namespaces: \(namespaces)")
}
self.updatedPeerIds.insert(peerId)
if let states = self.get(peerId) {
var updated = false
for (namespace, state) in namespaces {
if states.namespaces[namespace] == nil || states.namespaces[namespace]! != state {
self.markReadStatesAsUpdated(peerId, namespaces: states.namespaces)
updated = true
}
states.namespaces[namespace] = state
}
if updated {
self.updatedPeerIds.insert(peerId)
return CombinedPeerReadState(states: states.namespaces.map({$0}))
} else {
return nil
}
} else {
self.updatedPeerIds.insert(peerId)
self.markReadStatesAsUpdated(peerId, namespaces: [:])
let states = InternalPeerReadStates(namespaces: namespaces)
self.cachedPeerReadStates[peerId] = states
return CombinedPeerReadState(states: states.namespaces.map({$0}))
@@ -132,6 +136,8 @@ final class MessageHistoryReadStateTable: Table {
}
if addedUnreadCount != 0 {
self.markReadStatesAsUpdated(peerId, namespaces: states.namespaces)
states.namespaces[namespace] = PeerReadState(maxIncomingReadId: currentState.maxIncomingReadId, maxOutgoingReadId: currentState.maxOutgoingReadId, maxKnownId: currentState.maxKnownId, count: currentState.count + addedUnreadCount)
updated = true
@@ -142,10 +148,6 @@ final class MessageHistoryReadStateTable: Table {
}
}
if updated {
self.updatedPeerIds.insert(peerId)
}
return (updated ? CombinedPeerReadState(states: states.namespaces.map({$0})) : nil, invalidated)
} else {
if traceReadStates {
@@ -186,6 +188,8 @@ final class MessageHistoryReadStateTable: Table {
invalidate = true
}
self.markReadStatesAsUpdated(peerId, namespaces: states.namespaces)
states.namespaces[namespace] = PeerReadState(maxIncomingReadId: currentState.maxIncomingReadId, maxOutgoingReadId: currentState.maxOutgoingReadId, maxKnownId: currentState.maxKnownId, count: currentState.count - knownCount)
updated = true
} else {
@@ -193,10 +197,6 @@ final class MessageHistoryReadStateTable: Table {
}
}
if updated {
self.updatedPeerIds.insert(peerId)
}
return (updated ? CombinedPeerReadState(states: states.namespaces.map({$0})) : nil, invalidate)
} else {
return (nil, true)
@@ -223,8 +223,9 @@ final class MessageHistoryReadStateTable: Table {
}
}
self.markReadStatesAsUpdated(messageId.peerId, namespaces: states.namespaces)
states.namespaces[messageId.namespace] = PeerReadState(maxIncomingReadId: messageId.id, maxOutgoingReadId: state.maxOutgoingReadId, maxKnownId: state.maxKnownId, count: state.count - Int32(deltaCount))
self.updatedPeerIds.insert(messageId.peerId)
return (CombinedPeerReadState(states: states.namespaces.map({$0})), holes)
}
} else {
@@ -237,8 +238,9 @@ final class MessageHistoryReadStateTable: Table {
func applyOutgoingMaxReadId(_ messageId: MessageId) -> (CombinedPeerReadState?, Bool) {
if let states = self.get(messageId.peerId), let state = states.namespaces[messageId.namespace] {
if state.maxOutgoingReadId < messageId.id {
self.markReadStatesAsUpdated(messageId.peerId, namespaces: states.namespaces)
states.namespaces[messageId.namespace] = PeerReadState(maxIncomingReadId: state.maxIncomingReadId, maxOutgoingReadId: messageId.id, maxKnownId: state.maxKnownId, count: state.count)
self.updatedPeerIds.insert(messageId.peerId)
return (CombinedPeerReadState(states: states.namespaces.map({$0})), false)
}
} else {
@@ -258,35 +260,63 @@ final class MessageHistoryReadStateTable: Table {
return (combinedState, holes ? .Push(thenSync: true) : .None)
}
func transactionUnreadCountDeltas() -> [PeerId: Int32] {
var deltas: [PeerId: Int32] = [:]
for (id, initialNamespaces) in self.updatedInitialPeerReadStates {
var initialCount: Int32 = 0
for (_, state) in initialNamespaces {
initialCount += state.count
}
var updatedCount: Int32 = 0
if let maybeStates = self.cachedPeerReadStates[id] {
if let states = maybeStates {
for (_, state) in states.namespaces {
updatedCount += state.count
}
}
} else {
assertionFailure()
}
if initialCount != updatedCount {
deltas[id] = updatedCount - initialCount
}
}
return deltas
}
override func clearMemoryCache() {
self.cachedPeerReadStates.removeAll()
self.updatedPeerIds.removeAll()
assert(self.updatedInitialPeerReadStates.isEmpty)
}
override func beforeCommit() {
let sharedBuffer = WriteBuffer()
for id in self.updatedPeerIds {
if let wrappedStates = self.cachedPeerReadStates[id], let states = wrappedStates {
sharedBuffer.reset()
var count: Int32 = Int32(states.namespaces.count)
sharedBuffer.write(&count, offset: 0, length: 4)
for (namespace, state) in states.namespaces {
var namespaceId: Int32 = namespace
var maxIncomingReadId: Int32 = state.maxIncomingReadId
var maxOutgoingReadId: Int32 = state.maxOutgoingReadId
var maxKnownId: Int32 = state.maxKnownId
var count: Int32 = state.count
sharedBuffer.write(&namespaceId, offset: 0, length: 4)
sharedBuffer.write(&maxIncomingReadId, offset: 0, length: 4)
sharedBuffer.write(&maxOutgoingReadId, offset: 0, length: 4)
sharedBuffer.write(&maxKnownId, offset: 0, length: 4)
if !self.updatedInitialPeerReadStates.isEmpty {
let sharedBuffer = WriteBuffer()
for (id, initialNamespaces) in self.updatedInitialPeerReadStates {
if let wrappedStates = self.cachedPeerReadStates[id], let states = wrappedStates {
sharedBuffer.reset()
var count: Int32 = Int32(states.namespaces.count)
sharedBuffer.write(&count, offset: 0, length: 4)
for (namespace, state) in states.namespaces {
var namespaceId: Int32 = namespace
var maxIncomingReadId: Int32 = state.maxIncomingReadId
var maxOutgoingReadId: Int32 = state.maxOutgoingReadId
var maxKnownId: Int32 = state.maxKnownId
var count: Int32 = state.count
sharedBuffer.write(&namespaceId, offset: 0, length: 4)
sharedBuffer.write(&maxIncomingReadId, offset: 0, length: 4)
sharedBuffer.write(&maxOutgoingReadId, offset: 0, length: 4)
sharedBuffer.write(&maxKnownId, offset: 0, length: 4)
sharedBuffer.write(&count, offset: 0, length: 4)
}
self.valueBox.set(self.table, key: self.key(id), value: sharedBuffer)
} else {
self.valueBox.remove(self.table, key: self.key(id))
}
self.valueBox.set(self.table, key: self.key(id), value: sharedBuffer)
} else {
self.valueBox.remove(self.table, key: self.key(id))
}
self.updatedInitialPeerReadStates.removeAll()
}
self.updatedPeerIds.removeAll()
}
}