diff --git a/submodules/TelegramCore/Sources/AccountIntermediateState.swift b/submodules/TelegramCore/Sources/AccountIntermediateState.swift index 6bfa5208fc..53a8891e78 100644 --- a/submodules/TelegramCore/Sources/AccountIntermediateState.swift +++ b/submodules/TelegramCore/Sources/AccountIntermediateState.swift @@ -599,6 +599,7 @@ struct AccountFinalState { var state: AccountMutableState var shouldPoll: Bool var incomplete: Bool + var missingUpdatesFromChannels: Set var discard: Bool } diff --git a/submodules/TelegramCore/Sources/AccountStateManagementUtils.swift b/submodules/TelegramCore/Sources/AccountStateManagementUtils.swift index 6148122312..1f2e6e89d2 100644 --- a/submodules/TelegramCore/Sources/AccountStateManagementUtils.swift +++ b/submodules/TelegramCore/Sources/AccountStateManagementUtils.swift @@ -417,6 +417,19 @@ private func initialStateWithPeerIds(_ transaction: Transaction, peerIds: Set Signal { @@ -778,6 +791,8 @@ private func finalStateWithUpdatesAndServerTime(postbox: Postbox, network: Netwo channelsToPoll.formUnion(updatedState.initialState.channelsToPollExplicitely) } + var missingUpdatesFromChannels = Set() + for update in sortedUpdates(updates) { switch update { case let .updateChannelTooLong(_, channelId, channelPts): @@ -798,10 +813,9 @@ private func finalStateWithUpdatesAndServerTime(postbox: Postbox, network: Netwo updatedState.deleteMessages(messages.map({ MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: $0) })) updatedState.updateChannelState(peerId, pts: pts) } else { - if !channelsToPoll.contains(peerId) { - Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) delete pts hole") - channelsToPoll.insert(peerId) - //updatedMissingUpdates = true + if !missingUpdatesFromChannels.contains(peerId) { + Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) delete pts hole \(previousState.pts) + \(ptsCount) != \(pts)") + missingUpdatesFromChannels.insert(peerId) } } } else { @@ -827,10 +841,9 @@ private func finalStateWithUpdatesAndServerTime(postbox: Postbox, network: Netwo updatedState.editMessage(messageId, message: message.withUpdatedAttributes(attributes)) updatedState.updateChannelState(peerId, pts: pts) } else { - if !channelsToPoll.contains(peerId) { - Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) edit message pts hole") - channelsToPoll.insert(peerId) - //updatedMissingUpdates = true + if !missingUpdatesFromChannels.contains(peerId) { + Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) edit message pts hole \(previousState.pts) + \(ptsCount) != \(pts)") + missingUpdatesFromChannels.insert(peerId) } } } else { @@ -859,7 +872,7 @@ private func finalStateWithUpdatesAndServerTime(postbox: Postbox, network: Netwo updatedState.updateChannelState(peerId, pts: pts) } else { if !channelsToPoll.contains(peerId) { - Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) updateWebPage pts hole") + Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) updateWebPage pts hole \(previousState.pts) + \(ptsCount) != \(pts)") channelsToPoll.insert(peerId) } } @@ -919,11 +932,10 @@ private func finalStateWithUpdatesAndServerTime(postbox: Postbox, network: Netwo updatedState.updateChannelSynchronizedUntilMessage(id.peerId, id: id.id) } } else { - if !channelsToPoll.contains(message.id.peerId) { - Logger.shared.log("State", "channel \(message.id.peerId) (\((updatedState.peers[message.id.peerId] as? TelegramChannel)?.title ?? "nil")) message pts hole") + if !missingUpdatesFromChannels.contains(message.id.peerId) { + Logger.shared.log("State", "channel \(message.id.peerId) (\((updatedState.peers[message.id.peerId] as? TelegramChannel)?.title ?? "nil")) message pts hole \(previousState.pts) + \(ptsCount) != \(pts)") ; - channelsToPoll.insert(message.id.peerId) - //updatedMissingUpdates = true + missingUpdatesFromChannels.insert(message.id.peerId) } } } else { @@ -1134,10 +1146,9 @@ private func finalStateWithUpdatesAndServerTime(postbox: Postbox, network: Netwo }, pinned: (flags & (1 << 0)) != 0) updatedState.updateChannelState(peerId, pts: pts) } else { - if !channelsToPoll.contains(peerId) { - Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) pinned messages pts hole") - channelsToPoll.insert(peerId) - //updatedMissingUpdates = true + if !missingUpdatesFromChannels.contains(peerId) { + Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) pinned messages pts hole \(previousState.pts) + \(ptsCount) != \(pts)") + missingUpdatesFromChannels.insert(peerId) } } } else { @@ -1417,7 +1428,7 @@ private func finalStateWithUpdatesAndServerTime(postbox: Postbox, network: Netwo pollChannelSignals = [] } } else { - for peerId in channelsToPoll { + for peerId in channelsToPoll.union(missingUpdatesFromChannels) { if let peer = updatedState.peers[peerId] { pollChannelSignals.append(pollChannel(network: network, peer: peer, state: updatedState.branch())) } else { @@ -1445,7 +1456,7 @@ private func finalStateWithUpdatesAndServerTime(postbox: Postbox, network: Netwo |> mapToSignal { resultingState -> Signal in return resolveMissingPeerChatInfos(network: network, state: resultingState) |> map { resultingState, resolveError -> AccountFinalState in - return AccountFinalState(state: resultingState, shouldPoll: shouldPoll || hadError || resolveError, incomplete: missingUpdates, discard: resolveError) + return AccountFinalState(state: resultingState, shouldPoll: shouldPoll || hadError || resolveError, incomplete: missingUpdates, missingUpdatesFromChannels: Set(), discard: resolveError) } } } @@ -1671,7 +1682,7 @@ func keepPollingChannel(postbox: Postbox, network: Network, peerId: PeerId, stat |> mapToSignal { resultingState -> Signal in return resolveMissingPeerChatInfos(network: network, state: resultingState) |> map { resultingState, _ -> AccountFinalState in - return AccountFinalState(state: resultingState, shouldPoll: false, incomplete: false, discard: false) + return AccountFinalState(state: resultingState, shouldPoll: false, incomplete: false, missingUpdatesFromChannels: Set(), discard: false) } } |> mapToSignal { finalState -> Signal in @@ -1860,11 +1871,7 @@ private func resetChannels(network: Network, peers: [Peer], state: AccountMutabl private func pollChannel(network: Network, peer: Peer, state: AccountMutableState) -> Signal<(AccountMutableState, Bool, Int32?), NoError> { if let inputChannel = apiInputChannel(peer) { let limit: Int32 - #if DEBUG - limit = 1 - #else - limit = 20 - #endif + limit = 100 let pollPts: Int32 if let channelState = state.channelStates[peer.id] { @@ -2108,16 +2115,6 @@ private func verifyTransaction(_ transaction: Transaction, finalState: AccountMu return !failed } -private enum ReplayFinalStateIncomplete { - case MoreDataNeeded - case PollRequired -} - -private enum ReplayFinalStateResult { - case Completed - case Incomplete(ReplayFinalStateIncomplete) -} - private final class OptimizeAddMessagesState { var messages: [StoreMessage] var location: AddMessagesLocation diff --git a/submodules/TelegramCore/Sources/AccountStateManager.swift b/submodules/TelegramCore/Sources/AccountStateManager.swift index 00def9e4fb..94a7e191a0 100644 --- a/submodules/TelegramCore/Sources/AccountStateManager.swift +++ b/submodules/TelegramCore/Sources/AccountStateManager.swift @@ -605,7 +605,7 @@ public final class AccountStateManager { if !events.isEmpty { strongSelf.insertProcessEvents(events) } - if finalState.incomplete { + if finalState.incomplete || !finalState.missingUpdatesFromChannels.isEmpty { strongSelf.addOperation(.collectUpdateGroups(groups, 2.0), position: .last) } } else { @@ -622,9 +622,6 @@ public final class AccountStateManager { assertionFailure() } } - }, error: { _ in - assertionFailure() - Logger.shared.log("AccountStateManager", "processUpdateGroups signal completed with error") }) case let .custom(operationId, signal): self.operationTimer?.invalidate() diff --git a/submodules/TelegramCore/Sources/SynchronizePeerReadState.swift b/submodules/TelegramCore/Sources/SynchronizePeerReadState.swift index ec2773ead4..6d745cb599 100644 --- a/submodules/TelegramCore/Sources/SynchronizePeerReadState.swift +++ b/submodules/TelegramCore/Sources/SynchronizePeerReadState.swift @@ -60,7 +60,7 @@ private func dialogTopMessage(network: Network, postbox: Postbox, peerId: PeerId if let message = apiMessages.first, let timestamp = message.timestamp { return .single((message.rawId, timestamp)) } else { - return .fail(.retry) + return .single(nil) } } } @@ -333,7 +333,7 @@ private func pushPeerReadState(network: Network, postbox: Postbox, stateManager: return stateManager.addCustomOperation(postbox.transaction { transaction -> PeerReadStateValidationError? in if let readStates = transaction.getPeerReadStates(peerId) { for (namespace, currentReadState) in readStates where namespace == namespaceAndReadState.0 { - if currentReadState == namespaceAndReadState.1 { + if currentReadState.count == namespaceAndReadState.1.count { transaction.confirmSynchronizedIncomingReadState(peerId) return nil }