Support chat list state reset

This commit is contained in:
Ali 2023-02-03 20:36:35 +01:00
parent 25e57e15c9
commit f66213b63e
5 changed files with 226 additions and 20 deletions

View File

@ -199,6 +199,26 @@ final class ChatListTable: Table {
}
}
func getAllPeerIds() -> [PeerId] {
return self.indexTable.getAllPeerIds()
}
func removeAllEntries(groupId: PeerGroupId, exceptPeerNamespace: PeerId.Namespace, operations: inout [PeerGroupId: [ChatListOperation]]) {
loop: for entry in self.allEntries(groupId: groupId) {
switch entry {
case let .hole(hole):
addOperation(.RemoveHoles([ChatListIndex(pinningIndex: nil, messageIndex: hole.index)]), groupId: groupId, to: &operations)
self.justRemoveHole(groupId: groupId, index: hole.index)
case let .message(index, _):
if index.messageIndex.id.peerId.namespace == exceptPeerNamespace {
continue loop
}
addOperation(.RemoveEntry([index]), groupId: groupId, to: &operations)
self.justRemoveMessageIndex(groupId: groupId, index: index)
}
}
}
func getPinnedItemIds(groupId: PeerGroupId, messageHistoryTable: MessageHistoryTable, peerChatInterfaceStateTable: PeerChatInterfaceStateTable) -> [(id: PinnedItemId, rank: Int)] {
var itemIds: [(id: PinnedItemId, rank: Int)] = []
self.valueBox.range(self.table, start: self.upperBound(groupId: groupId), end: self.key(groupId: groupId, index: ChatListIndex(pinningIndex: UInt16.max - 1, messageIndex: MessageIndex.absoluteUpperBound()), type: .message).successor, values: { key, value in
@ -366,6 +386,21 @@ final class ChatListTable: Table {
}
}
func getHoles(groupId: PeerGroupId) -> [ChatListHole] {
var result: [ChatListHole] = []
self.valueBox.range(self.table, start: self.upperBound(groupId: groupId), end: self.lowerBound(groupId: groupId), values: { key, value in
let (keyGroupId, pinningIndex, messageIndex, type) = extractKey(key)
assert(groupId == keyGroupId)
let index = ChatListIndex(pinningIndex: pinningIndex, messageIndex: messageIndex)
if type == ChatListEntryType.hole.rawValue {
result.append(ChatListHole(index: index.messageIndex))
}
return true
}, limit: 0)
return result
}
func replaceHole(groupId: PeerGroupId, index: MessageIndex, hole: ChatListHole?, operations: inout [PeerGroupId: [ChatListOperation]]) {
self.ensureInitialized(groupId: groupId)

View File

@ -132,6 +132,16 @@ public final class Transaction {
self.postbox?.replaceChatListHole(groupId: groupId, index: index, hole: hole)
}
public func allChatListHoles(groupId: PeerGroupId) -> [ChatListHole] {
assert(!self.disposed)
return self.postbox?.chatListTable.getHoles(groupId: groupId) ?? []
}
public func addChatListHole(groupId: PeerGroupId, hole: ChatListHole) {
assert(!self.disposed)
self.postbox?.addChatListHole(groupId: groupId, hole: hole)
}
public func deleteMessages(_ messageIds: [MessageId], forEachMedia: ((Media) -> Void)?) {
assert(!self.disposed)
self.postbox?.deleteMessages(messageIds, forEachMedia: forEachMedia)
@ -393,6 +403,16 @@ public final class Transaction {
self.postbox?.updatePeerChatListInclusion(id, inclusion: inclusion)
}
public func removeAllChatListEntries(groupId: PeerGroupId, exceptPeerNamespace: PeerId.Namespace) {
assert(!self.disposed)
self.postbox?.removeAllChatListEntries(groupId: groupId, exceptPeerNamespace: exceptPeerNamespace)
}
public func chatListGetAllPeerIds() -> [PeerId] {
assert(!self.disposed)
return self.postbox?.chatListTable.getAllPeerIds() ?? []
}
public func updateCurrentPeerNotificationSettings(_ notificationSettings: [PeerId: PeerNotificationSettings]) {
assert(!self.disposed)
self.postbox?.updateCurrentPeerNotificationSettings(notificationSettings)
@ -1282,7 +1302,7 @@ public func openPostbox(basePath: String, seedConfiguration: SeedConfiguration,
#if DEBUG
//debugSaveState(basePath: basePath + "/db", name: "previous2")
//debugRestoreState(basePath: basePath + "/db", name: "previous2")
debugRestoreState(basePath: basePath + "/db", name: "previous2")
#endif
let startTime = CFAbsoluteTimeGetCurrent()
@ -1897,6 +1917,10 @@ final class PostboxImpl {
self.chatListTable.replaceHole(groupId: groupId, index: index, hole: hole, operations: &self.currentChatListOperations)
}
fileprivate func addChatListHole(groupId: PeerGroupId, hole: ChatListHole) {
self.chatListTable.addHole(groupId: groupId, hole: hole, operations: &self.currentChatListOperations)
}
fileprivate func deleteMessages(_ messageIds: [MessageId], forEachMedia: ((Media) -> Void)?) {
self.messageHistoryTable.removeMessages(messageIds, operationsByPeerId: &self.currentOperationsByPeerId, updatedMedia: &self.currentUpdatedMedia, unsentMessageOperations: &currentUnsentOperations, updatedPeerReadStateOperations: &self.currentUpdatedSynchronizeReadStateOperations, globalTagsOperations: &self.currentGlobalTagsOperations, pendingActionsOperations: &self.currentPendingMessageActionsOperations, updatedMessageActionsSummaries: &self.currentUpdatedMessageActionsSummaries, updatedMessageTagSummaries: &self.currentUpdatedMessageTagSummaries, invalidateMessageTagSummaries: &self.currentInvalidateMessageTagSummaries, localTagsOperations: &self.currentLocalTagsOperations, timestampBasedMessageAttributesOperations: &self.currentTimestampBasedMessageAttributesOperations, forEachMedia: forEachMedia)
}
@ -2226,6 +2250,10 @@ final class PostboxImpl {
})
}
fileprivate func removeAllChatListEntries(groupId: PeerGroupId, exceptPeerNamespace: PeerId.Namespace) {
self.chatListTable.removeAllEntries(groupId: groupId, exceptPeerNamespace: exceptPeerNamespace, operations: &self.currentChatListOperations)
}
fileprivate func getPinnedItemIds(groupId: PeerGroupId) -> [PinnedItemId] {
var itemIds = self.chatListTable.getPinnedItemIds(groupId: groupId, messageHistoryTable: self.messageHistoryTable, peerChatInterfaceStateTable: self.peerChatInterfaceStateTable)
for (peerId, inclusion) in self.currentUpdatedChatListInclusions {

View File

@ -677,10 +677,13 @@ public final class AccountStateManager {
return (state, invalidatedChannels, disableParallelChannelReset)
}
|> deliverOn(self.queue)
|> mapToSignal { [weak self] state, invalidatedChannels, disableParallelChannelReset -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|> mapToSignal { [weak self] state, invalidatedChannels, disableParallelChannelReset -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool), NoError> in
if let state = state, let authorizedState = state.state {
let flags: Int32 = 0
let ptsTotalLimit: Int32? = nil
let flags: Int32
let ptsTotalLimit: Int32?
flags = 1 << 0
ptsTotalLimit = 1000
if let strongSelf = self {
if !invalidatedChannels.isEmpty {
@ -700,19 +703,19 @@ public final class AccountStateManager {
|> retryRequest
return request
|> mapToSignal { difference -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|> mapToSignal { difference -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool), NoError> in
guard let difference = difference else {
return .single((nil, nil, true))
return .single((nil, nil, true, false))
}
switch difference {
case .differenceTooLong:
preconditionFailure()
return .single((nil, nil, false, true))
default:
return initialStateWithDifference(postbox: postbox, difference: difference)
|> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool), NoError> in
if state.initialState.state != authorizedState {
Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
return .single((nil, nil, false))
return .single((nil, nil, false, false))
} else {
return finalStateWithDifference(accountPeerId: accountPeerId, postbox: postbox, network: network, state: state, difference: difference, asyncResetChannels: disableParallelChannelReset ? nil : { peers in
queue.async {
@ -720,14 +723,14 @@ public final class AccountStateManager {
}
})
|> deliverOn(queue)
|> mapToSignal { finalState -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|> mapToSignal { finalState -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool), NoError> in
if !finalState.state.preCachedResources.isEmpty {
for (resource, data) in finalState.state.preCachedResources {
mediaBox.storeResourceData(resource.id, data: data)
}
}
let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool) in
let startTime = CFAbsoluteTimeGetCurrent()
let replayedState = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false, skipVerification: false)
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
@ -736,9 +739,9 @@ public final class AccountStateManager {
}
if let replayedState = replayedState {
return (difference, replayedState, false)
return (difference, replayedState, false, false)
} else {
return (nil, nil, false)
return (nil, nil, false, false)
}
}
}
@ -750,14 +753,14 @@ public final class AccountStateManager {
let appliedState = network.request(Api.functions.updates.getState())
|> retryRequest
|> mapToSignal { state in
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool) in
if let currentState = transaction.getState() as? AuthorizedAccountState {
switch state {
case let .state(pts, qts, date, seq, _):
transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: qts, date: date, seq: seq)))
}
}
return (nil, nil, false)
return (nil, nil, false, false)
}
}
return appliedState
@ -765,8 +768,21 @@ public final class AccountStateManager {
}
|> deliverOn(self.queue)
let _ = signal.start(next: { [weak self] difference, finalState, skipBecauseOfError in
if let strongSelf = self {
let _ = signal.start(next: { [weak self] difference, finalState, skipBecauseOfError, resetState in
guard let strongSelf = self else {
return
}
if resetState {
let _ = (_internal_resetAccountState(postbox: postbox, network: network, accountPeerId: accountPeerId)
|> deliverOn(strongSelf.queue)).start(completed: {
guard let strongSelf = self else {
return
}
if case .pollDifference = strongSelf.operations.removeFirst().content {
strongSelf.startFirstOperation()
}
})
} else {
if case .pollDifference = strongSelf.operations.removeFirst().content {
let events: AccountFinalStateEvents
if let finalState = finalState {
@ -803,8 +819,6 @@ public final class AccountStateManager {
strongSelf.replaceOperations(with: .pollDifference(strongSelf.getNextId(), AccountFinalStateEvents()))
}
strongSelf.startFirstOperation()
} else {
assertionFailure()
}
}
})

View File

@ -100,7 +100,7 @@ func resolveUnknownEmojiFiles<T>(postbox: Postbox, source: FetchMessageHistoryHo
}
}
private func withResolvedAssociatedMessages<T>(postbox: Postbox, source: FetchMessageHistoryHoleSource, peers: [PeerId: Peer], storeMessages: [StoreMessage], _ f: @escaping (Transaction, [Peer], [StoreMessage]) -> T) -> Signal<T, NoError> {
func withResolvedAssociatedMessages<T>(postbox: Postbox, source: FetchMessageHistoryHoleSource, peers: [PeerId: Peer], storeMessages: [StoreMessage], _ f: @escaping (Transaction, [Peer], [StoreMessage]) -> T) -> Signal<T, NoError> {
return postbox.transaction { transaction -> Signal<T, NoError> in
var storedIds = Set<MessageId>()
var referencedReplyIds = ReferencedReplyMessageIds()

View File

@ -0,0 +1,129 @@
import Foundation
import SwiftSignalKit
import Postbox
import TelegramApi
func _internal_resetAccountState(postbox: Postbox, network: Network, accountPeerId: PeerId) -> Signal<Never, NoError> {
return network.request(Api.functions.updates.getState())
|> retryRequest
|> mapToSignal { state -> Signal<Never, NoError> in
let chatList = fetchChatList(postbox: postbox, network: network, location: .general, upperBound: .absoluteUpperBound(), hash: 0, limit: 100)
return chatList
|> mapToSignal { fetchedChats -> Signal<Never, NoError> in
guard let fetchedChats = fetchedChats else {
return .never()
}
return withResolvedAssociatedMessages(postbox: postbox, source: .network(network), peers: Dictionary(fetchedChats.peers.map({ ($0.id, $0) }), uniquingKeysWith: { lhs, _ in lhs }), storeMessages: fetchedChats.storeMessages, { transaction, additionalPeers, additionalMessages -> Void in
transaction.removeAllChatListEntries(groupId: .root, exceptPeerNamespace: Namespaces.Peer.SecretChat)
transaction.removeAllChatListEntries(groupId: .group(1), exceptPeerNamespace: Namespaces.Peer.SecretChat)
for peerId in transaction.chatListGetAllPeerIds() {
if peerId.namespace != Namespaces.Peer.SecretChat {
transaction.updatePeerChatListInclusion(peerId, inclusion: .notIncluded)
}
if peerId.namespace != Namespaces.Peer.SecretChat {
transaction.addHole(peerId: peerId, threadId: nil, namespace: Namespaces.Message.Cloud, space: .everywhere, range: 1 ... (Int32.max - 1))
}
}
updatePeers(transaction: transaction, peers: fetchedChats.peers + additionalPeers, update: { _, updated -> Peer in
return updated
})
for (threadMessageId, data) in fetchedChats.threadInfos {
if let entry = StoredMessageHistoryThreadInfo(data.data) {
transaction.setMessageHistoryThreadInfo(peerId: threadMessageId.peerId, threadId: Int64(threadMessageId.id), info: entry)
}
transaction.replaceMessageTagSummary(peerId: threadMessageId.peerId, threadId: Int64(threadMessageId.id), tagMask: .unseenPersonalMessage, namespace: Namespaces.Message.Cloud, count: data.unreadMentionCount, maxId: data.topMessageId)
transaction.replaceMessageTagSummary(peerId: threadMessageId.peerId, threadId: Int64(threadMessageId.id), tagMask: .unseenReaction, namespace: Namespaces.Message.Cloud, count: data.unreadReactionCount, maxId: data.topMessageId)
}
updatePeerPresences(transaction: transaction, accountPeerId: accountPeerId, peerPresences: fetchedChats.peerPresences)
transaction.updateCurrentPeerNotificationSettings(fetchedChats.notificationSettings)
let _ = transaction.addMessages(fetchedChats.storeMessages, location: .UpperHistoryBlock)
let _ = transaction.addMessages(additionalMessages, location: .Random)
transaction.resetIncomingReadStates(fetchedChats.readStates)
for (peerId, autoremoveValue) in fetchedChats.ttlPeriods {
transaction.updatePeerCachedData(peerIds: Set([peerId]), update: { _, current in
if peerId.namespace == Namespaces.Peer.CloudUser {
let current = (current as? CachedUserData) ?? CachedUserData()
return current.withUpdatedAutoremoveTimeout(autoremoveValue)
} else if peerId.namespace == Namespaces.Peer.CloudChannel {
let current = (current as? CachedChannelData) ?? CachedChannelData()
return current.withUpdatedAutoremoveTimeout(autoremoveValue)
} else if peerId.namespace == Namespaces.Peer.CloudGroup {
let current = (current as? CachedGroupData) ?? CachedGroupData()
return current.withUpdatedAutoremoveTimeout(autoremoveValue)
} else {
return current
}
})
}
for hole in transaction.allChatListHoles(groupId: .root) {
transaction.replaceChatListHole(groupId: .root, index: hole.index, hole: nil)
}
for hole in transaction.allChatListHoles(groupId: .group(1)) {
transaction.replaceChatListHole(groupId: .group(1), index: hole.index, hole: nil)
}
if let hole = fetchedChats.lowerNonPinnedIndex.flatMap(ChatListHole.init) {
transaction.addChatListHole(groupId: .root, hole: hole)
}
transaction.addChatListHole(groupId: .group(1), hole: ChatListHole(index: MessageIndex(id: MessageId(peerId: PeerId(namespace: Namespaces.Peer.Empty, id: PeerId.Id._internalFromInt64Value(0)), namespace: Namespaces.Message.Cloud, id: 1), timestamp: Int32.max - 1)))
for peerId in fetchedChats.chatPeerIds {
if let peer = transaction.getPeer(peerId) {
transaction.updatePeerChatListInclusion(peerId, inclusion: .ifHasMessagesOrOneOf(groupId: .root, pinningIndex: transaction.getPeerChatListIndex(peerId)?.1.pinningIndex, minTimestamp: minTimestampForPeerInclusion(peer)))
} else {
assertionFailure()
}
}
for (peerId, peerGroupId) in fetchedChats.peerGroupIds {
if let peer = transaction.getPeer(peerId) {
transaction.updatePeerChatListInclusion(peerId, inclusion: .ifHasMessagesOrOneOf(groupId: peerGroupId, pinningIndex: nil, minTimestamp: minTimestampForPeerInclusion(peer)))
} else {
assertionFailure()
}
}
for (peerId, pts) in fetchedChats.channelStates {
if let current = transaction.getPeerChatState(peerId) as? ChannelState {
transaction.setPeerChatState(peerId, state: current.withUpdatedPts(pts))
} else {
transaction.setPeerChatState(peerId, state: ChannelState(pts: pts, invalidatedPts: nil, synchronizedUntilMessageId: nil))
}
}
if let replacePinnedItemIds = fetchedChats.pinnedItemIds {
transaction.setPinnedItemIds(groupId: .root, itemIds: replacePinnedItemIds.map(PinnedItemId.peer))
}
for (peerId, summary) in fetchedChats.mentionTagSummaries {
transaction.replaceMessageTagSummary(peerId: peerId, threadId: nil, tagMask: .unseenPersonalMessage, namespace: Namespaces.Message.Cloud, count: summary.count, maxId: summary.range.maxId)
}
for (peerId, summary) in fetchedChats.reactionTagSummaries {
transaction.replaceMessageTagSummary(peerId: peerId, threadId: nil, tagMask: .unseenReaction, namespace: Namespaces.Message.Cloud, count: summary.count, maxId: summary.range.maxId)
}
for (groupId, summary) in fetchedChats.folderSummaries {
transaction.resetPeerGroupSummary(groupId: groupId, namespace: Namespaces.Message.Cloud, summary: summary)
}
transaction.reindexUnreadCounters()
if let currentState = transaction.getState() as? AuthorizedAccountState {
switch state {
case let .state(pts, qts, date, seq, _):
transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: qts, date: date, seq: seq)))
}
}
})
|> ignoreValues
}
}
}