import Foundation import Postbox import SwiftSignalKit import TelegramApi import MtProtoKit private enum AccountStateManagerOperationContent { case pollDifference(Int32, AccountFinalStateEvents) case collectUpdateGroups([UpdateGroup], Double) case processUpdateGroups([UpdateGroup]) case custom(Int32, Signal) case pollCompletion(Int32, [MessageId], [(Int32, ([MessageId]) -> Void)]) case processEvents(Int32, AccountFinalStateEvents) case replayAsynchronouslyBuiltFinalState(AccountFinalState, () -> Void) } private final class AccountStateManagerOperation { var isRunning: Bool = false let content: AccountStateManagerOperationContent init(content: AccountStateManagerOperationContent) { self.content = content } } private enum AccountStateManagerAddOperationPosition { case first case last } private typealias SignalKitTimer = SwiftSignalKit.Timer private enum CustomOperationEvent { case Next(T) case Error(E) case Completion } private final class UpdatedWebpageSubscriberContext { let subscribers = Bag<(TelegramMediaWebpage) -> Void>() } private final class UpdatedPeersNearbySubscriberContext { let subscribers = Bag<([PeerNearby]) -> Void>() } public enum DeletedMessageId: Hashable { case global(Int32) case messageId(MessageId) } public final class AccountStateManager { public final class IncomingCallUpdate { public let callId: Int64 public let callAccessHash: Int64 public let timestamp: Int32 public let peer: EnginePeer init( callId: Int64, callAccessHash: Int64, timestamp: Int32, peer: EnginePeer ) { self.callId = callId self.callAccessHash = callAccessHash self.timestamp = timestamp self.peer = peer } } private final class Impl { private final class ChannelOperationData { let pts: Int32? let disposable: Disposable var isCompleted: Bool = false init(pts: Int32?, disposable: Disposable) { self.pts = pts self.disposable = disposable } deinit { self.disposable.dispose() } } private final class ChannelOperationsContext { let associatedDifferenceId: Int32 var pendingChannels: [PeerId: ChannelOperationData] = [:] var channelResults: [PeerId: AccountMutableState] = [:] var events: AccountFinalStateEvents = AccountFinalStateEvents() var canComplete: Bool = false var isInternallyComplete: Bool { return !self.pendingChannels.contains(where: { !$0.value.isCompleted }) } init(associatedDifferenceId: Int32) { self.associatedDifferenceId = associatedDifferenceId } } private let queue: Queue public let accountPeerId: PeerId private let accountManager: AccountManager public let postbox: Postbox public let network: Network private let callSessionManager: CallSessionManager? private let addIsContactUpdates: ([(PeerId, Bool)]) -> Void private let shouldKeepOnlinePresence: Signal private let peerInputActivityManager: PeerInputActivityManager? let auxiliaryMethods: AccountAuxiliaryMethods var transformOutgoingMessageMedia: TransformOutgoingMessageMedia? private var updateService: UpdateMessageService? private let updateServiceDisposable = MetaDisposable() private var operations_: [AccountStateManagerOperation] = [] private var operations: [AccountStateManagerOperation] { get { assert(self.queue.isCurrent()) return self.operations_ } set(value) { assert(self.queue.isCurrent()) self.operations_ = value } } private let operationDisposable = MetaDisposable() private var operationTimer: SignalKitTimer? private var currentChannelOperationsContext: ChannelOperationsContext? private var removePossiblyDeliveredMessagesUniqueIds: [Int64: PeerId] = [:] private let currentValidityMarker: Int64 = Int64.random(in: Int64.min ... Int64.max) private var nextId: Int32 = 0 private func getNextId() -> Int32 { self.nextId += 1 return self.nextId } private let isUpdatingValue = ValuePromise(true) private var currentIsUpdatingValue = true { didSet { if self.currentIsUpdatingValue != oldValue { self.isUpdatingValue.set(self.currentIsUpdatingValue) } } } public var isUpdating: Signal { return self.isUpdatingValue.get() } private let notificationMessagesPipe = ValuePipe<[([Message], PeerGroupId, Bool, MessageHistoryThreadData?)]>() public var notificationMessages: Signal<[([Message], PeerGroupId, Bool, MessageHistoryThreadData?)], NoError> { return self.notificationMessagesPipe.signal() } private let reactionNotificationsPipe = ValuePipe<[(reactionAuthor: Peer, reaction: MessageReaction.Reaction, message: Message, timestamp: Int32)]>() public var reactionNotifications: Signal<[(reactionAuthor: Peer, reaction: MessageReaction.Reaction, message: Message, timestamp: Int32)], NoError> { return self.reactionNotificationsPipe.signal() } private let displayAlertsPipe = ValuePipe<[(text: String, isDropAuth: Bool)]>() public var displayAlerts: Signal<[(text: String, isDropAuth: Bool)], NoError> { return self.displayAlertsPipe.signal() } private let dismissBotWebViewsPipe = ValuePipe<[Int64]>() public var dismissBotWebViews: Signal<[Int64], NoError> { return self.dismissBotWebViewsPipe.signal() } private let externallyUpdatedPeerIdsPipe = ValuePipe<[PeerId]>() var externallyUpdatedPeerIds: Signal<[PeerId], NoError> { return self.externallyUpdatedPeerIdsPipe.signal() } private let termsOfServiceUpdateValue = Atomic(value: nil) private let termsOfServiceUpdatePromise = Promise(nil) public var termsOfServiceUpdate: Signal { return self.termsOfServiceUpdatePromise.get() } private let appUpdateInfoValue = Atomic(value: nil) private let appUpdateInfoPromise = Promise(nil) public var appUpdateInfo: Signal { return self.appUpdateInfoPromise.get() } private let appliedIncomingReadMessagesPipe = ValuePipe<[MessageId]>() public var appliedIncomingReadMessages: Signal<[MessageId], NoError> { return self.appliedIncomingReadMessagesPipe.signal() } private let significantStateUpdateCompletedPipe = ValuePipe() var significantStateUpdateCompleted: Signal { return self.significantStateUpdateCompletedPipe.signal() } private let authorizationListUpdatesPipe = ValuePipe() var authorizationListUpdates: Signal { return self.authorizationListUpdatesPipe.signal() } private let threadReadStateUpdatesPipe = ValuePipe<(incoming: [MessageId: MessageId.Id], outgoing: [MessageId: MessageId.Id])>() var threadReadStateUpdates: Signal<(incoming: [MessageId: MessageId.Id], outgoing: [MessageId: MessageId.Id]), NoError> { return self.threadReadStateUpdatesPipe.signal() } private let groupCallParticipantUpdatesPipe = ValuePipe<[(Int64, GroupCallParticipantsContext.Update)]>() public var groupCallParticipantUpdates: Signal<[(Int64, GroupCallParticipantsContext.Update)], NoError> { return self.groupCallParticipantUpdatesPipe.signal() } private let deletedMessagesPipe = ValuePipe<[DeletedMessageId]>() public var deletedMessages: Signal<[DeletedMessageId], NoError> { return self.deletedMessagesPipe.signal() } private var updatedWebpageContexts: [MediaId: UpdatedWebpageSubscriberContext] = [:] private var updatedPeersNearbyContext = UpdatedPeersNearbySubscriberContext() private let delayNotificatonsUntil = Atomic(value: nil) private let appliedMaxMessageIdPromise = Promise(nil) private let appliedMaxMessageIdDisposable = MetaDisposable() private let appliedQtsPromise = Promise(nil) private let appliedQtsDisposable = MetaDisposable() let updateConfigRequested: (() -> Void)? let isPremiumUpdated: (() -> Void)? init( queue: Queue, accountPeerId: PeerId, accountManager: AccountManager, postbox: Postbox, network: Network, callSessionManager: CallSessionManager?, addIsContactUpdates: @escaping ([(PeerId, Bool)]) -> Void, shouldKeepOnlinePresence: Signal, peerInputActivityManager: PeerInputActivityManager?, auxiliaryMethods: AccountAuxiliaryMethods, updateConfigRequested: (() -> Void)?, isPremiumUpdated: (() -> Void)? ) { self.queue = queue self.accountPeerId = accountPeerId self.accountManager = accountManager self.postbox = postbox self.network = network self.callSessionManager = callSessionManager self.addIsContactUpdates = addIsContactUpdates self.shouldKeepOnlinePresence = shouldKeepOnlinePresence self.peerInputActivityManager = peerInputActivityManager self.auxiliaryMethods = auxiliaryMethods self.updateConfigRequested = updateConfigRequested self.isPremiumUpdated = isPremiumUpdated } deinit { self.updateServiceDisposable.dispose() self.operationDisposable.dispose() self.appliedMaxMessageIdDisposable.dispose() self.appliedQtsDisposable.dispose() } public func reset() { self.queue.async { if self.updateService == nil { self.updateService = UpdateMessageService(peerId: self.accountPeerId) self.updateServiceDisposable.set(self.updateService!.pipe.signal().start(next: { [weak self] groups in if let strongSelf = self { strongSelf.addUpdateGroups(groups) } })) self.network.mtProto.add(self.updateService) } self.operationDisposable.set(nil) self.replaceOperations(with: .pollDifference(self.getNextId(), AccountFinalStateEvents())) self.startFirstOperation() let appliedValues: [(MetaDisposable, Signal, Bool)] = [ (self.appliedMaxMessageIdDisposable, self.appliedMaxMessageIdPromise.get(), true), (self.appliedQtsDisposable, self.appliedQtsPromise.get(), false) ] for (disposable, value, isMaxMessageId) in appliedValues { let network = self.network disposable.set((combineLatest(queue: self.queue, self.shouldKeepOnlinePresence, value) |> mapToSignal { shouldKeepOnlinePresence, value -> Signal in guard let value = value else { return .complete() } if !shouldKeepOnlinePresence { return .complete() } return .single(value) } |> distinctUntilChanged |> mapToSignal { value -> Signal in if isMaxMessageId { return network.request(Api.functions.messages.receivedMessages(maxId: value)) |> ignoreValues |> `catch` { _ -> Signal in return .complete() } } else { if value == 0 { return .complete() } else { return network.request(Api.functions.messages.receivedQueue(maxQts: value)) |> ignoreValues |> `catch` { _ -> Signal in return .complete() } } } }).start()) } } } func addUpdates(_ updates: Api.Updates) { self.queue.async { self.updateService?.addUpdates(updates) } } func addUpdateGroups(_ groups: [UpdateGroup]) { self.queue.async { if let last = self.operations.last { switch last.content { case .pollDifference, .processUpdateGroups, .custom, .pollCompletion, .processEvents, .replayAsynchronouslyBuiltFinalState: self.addOperation(.collectUpdateGroups(groups, 0.0), position: .last) case let .collectUpdateGroups(currentGroups, timeout): let operation = AccountStateManagerOperation(content: .collectUpdateGroups(currentGroups + groups, timeout)) operation.isRunning = last.isRunning self.operations[self.operations.count - 1] = operation self.startFirstOperation() } } else { self.addOperation(.collectUpdateGroups(groups, 0.0), position: .last) } } } func addReplayAsynchronouslyBuiltFinalState(_ finalState: AccountFinalState) -> Signal { return Signal { subscriber in self.queue.async { self.addOperation(.replayAsynchronouslyBuiltFinalState(finalState, { subscriber.putNext(true) subscriber.putCompletion() }), position: .last) } return EmptyDisposable } } func addCustomOperation(_ f: Signal) -> Signal { let pipe = ValuePipe>() return Signal { subscriber in let disposable = pipe.signal().start(next: { event in switch event { case let .Next(next): subscriber.putNext(next) case let .Error(error): subscriber.putError(error) case .Completion: subscriber.putCompletion() } }) let signal = Signal { subscriber in return f.start(next: { next in pipe.putNext(.Next(next)) }, error: { error in pipe.putNext(.Error(error)) subscriber.putCompletion() }, completed: { pipe.putNext(.Completion) subscriber.putCompletion() }) } self.addOperation(.custom(self.getNextId(), signal), position: .last) return disposable } |> runOn(self.queue) } private func replaceOperations(with content: AccountStateManagerOperationContent) { var collectedProcessUpdateGroups: [AccountStateManagerOperationContent] = [] var collectedMessageIds: [MessageId] = [] var collectedPollCompletionSubscribers: [(Int32, ([MessageId]) -> Void)] = [] var collectedReplayAsynchronouslyBuiltFinalState: [(AccountFinalState, () -> Void)] = [] var processEvents: [(Int32, AccountFinalStateEvents)] = [] var customOperations: [(Int32, Signal)] = [] var replacedOperations: [AccountStateManagerOperation] = [] for i in 0 ..< self.operations.count { if self.operations[i].isRunning { replacedOperations.append(self.operations[i]) } else { switch self.operations[i].content { case .processUpdateGroups: collectedProcessUpdateGroups.append(self.operations[i].content) case let .pollCompletion(_, messageIds, subscribers): collectedMessageIds.append(contentsOf: messageIds) collectedPollCompletionSubscribers.append(contentsOf: subscribers) case let .replayAsynchronouslyBuiltFinalState(finalState, completion): collectedReplayAsynchronouslyBuiltFinalState.append((finalState, completion)) case let .processEvents(operationId, events): processEvents.append((operationId, events)) case let .custom(operationId, customSignal): customOperations.append((operationId, customSignal)) default: break } } } replacedOperations.append(contentsOf: collectedProcessUpdateGroups.map { AccountStateManagerOperation(content: $0) }) replacedOperations.append(AccountStateManagerOperation(content: content)) if !collectedPollCompletionSubscribers.isEmpty || !collectedMessageIds.isEmpty { replacedOperations.append(AccountStateManagerOperation(content: .pollCompletion(self.getNextId(), collectedMessageIds, collectedPollCompletionSubscribers))) } for (finalState, completion) in collectedReplayAsynchronouslyBuiltFinalState { replacedOperations.append(AccountStateManagerOperation(content: .replayAsynchronouslyBuiltFinalState(finalState, completion))) } for (operationId, events) in processEvents { replacedOperations.append(AccountStateManagerOperation(content: .processEvents(operationId, events))) } for (operationId, customSignal) in customOperations { replacedOperations.append(AccountStateManagerOperation(content: .custom(operationId, customSignal))) } self.operations.removeAll() self.operations.append(contentsOf: replacedOperations) } private func addOperation(_ content: AccountStateManagerOperationContent, position: AccountStateManagerAddOperationPosition) { self.queue.async { let operation = AccountStateManagerOperation(content: content) switch position { case .first: if self.operations.isEmpty || !self.operations[0].isRunning { self.operations.insert(operation, at: 0) self.startFirstOperation() } else { self.operations.insert(operation, at: 1) } case .last: let begin = self.operations.isEmpty self.operations.append(operation) if begin { self.startFirstOperation() } } } } private func addAsyncResetChannels(peers: [(peer: Peer, pts: Int32?)]) { //TODO:pts guard let operation = self.operations.first, case let .pollDifference(initialId, _) = operation.content else { assertionFailure("Invalid state") return } let channelOperationsContext: ChannelOperationsContext if let current = self.currentChannelOperationsContext { channelOperationsContext = current } else { channelOperationsContext = ChannelOperationsContext(associatedDifferenceId: initialId) self.currentChannelOperationsContext = channelOperationsContext } for peer in peers { let replaceChannelOperation: Bool if let current = channelOperationsContext.pendingChannels[peer.peer.id] { if let pts = peer.pts, let currentPts = current.pts { replaceChannelOperation = currentPts > pts } else { replaceChannelOperation = false } } else { replaceChannelOperation = true } if replaceChannelOperation { let currentValidityMarker = self.currentValidityMarker let _ = (self.postbox.transaction { transaction -> Void in if let state = transaction.getState() as? AuthorizedAccountState { var invalidatedChannels = state.invalidatedChannels.filter({ $0.peerId != peer.peer.id }) invalidatedChannels.append(AuthorizedAccountState.InvalidatedChannel( peerId: peer.peer.id, pts: peer.pts, validityMarker: currentValidityMarker )) transaction.setState(state.withInvalidatedChannels(invalidatedChannels)) } }).start() let channelDisposable = MetaDisposable() channelOperationsContext.pendingChannels[peer.peer.id] = ChannelOperationData(pts: peer.pts, disposable: channelDisposable) channelDisposable.set((self.postbox.transaction { transaction -> AccountMutableState in return initialStateWithPeerIds( transaction, peerIds: Set([peer.peer.id]), activeChannelIds: Set(), referencedReplyMessageIds: ReferencedReplyMessageIds(), referencedGeneralMessageIds: Set(), peerIdsRequiringLocalChatState: Set(), locallyGeneratedMessageTimestamps: [:] ) } |> mapToSignal { state -> Signal in return resetChannels( accountPeerId: self.accountPeerId, postbox: self.postbox, network: self.network, peers: [peer.peer], state: state ) } |> deliverOn(self.queue)).start(next: { [weak self] result in guard let self = self else { return } guard let channelOperationsContext = self.currentChannelOperationsContext else { assertionFailure("Invalid state") return } if let pendingChannelData = channelOperationsContext.pendingChannels[peer.peer.id] { pendingChannelData.isCompleted = true } else { assertionFailure("Invalid state") } channelOperationsContext.channelResults[peer.peer.id] = result self.checkChannelOperationsCompletion() })) } } } private func checkChannelOperationsCompletion() { guard let channelOperationsContext = self.currentChannelOperationsContext else { return } if !channelOperationsContext.canComplete { return } if !channelOperationsContext.isInternallyComplete { return } let states = channelOperationsContext.channelResults.sorted(by: { $0.key < $1.key }).map(\.value) if !states.isEmpty { var finalMutableState = states[0] if states.count > 1 { for i in 1 ..< states.count { finalMutableState.merge(states[i]) } } let accountManager = self.accountManager let postbox = self.postbox let accountPeerId = self.accountPeerId let mediaBox = self.postbox.mediaBox let network = self.network let auxiliaryMethods = self.auxiliaryMethods let events = channelOperationsContext.events let _ = (self.postbox.transaction { transaction -> AccountReplayedFinalState? in if let state = transaction.getState() as? AuthorizedAccountState { transaction.setState(state.withInvalidatedChannels([])) } return replayFinalState( accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: AccountFinalState( state: finalMutableState, shouldPoll: false, incomplete: false, missingUpdatesFromChannels: Set(), discard: false ), removePossiblyDeliveredMessagesUniqueIds: [:], ignoreDate: false, skipVerification: true ) } |> deliverOn(self.queue)).start(next: { [weak self] finalState in guard let strongSelf = self else { return } strongSelf.currentChannelOperationsContext = nil if let finalState = finalState { var mergedEvents = events mergedEvents = mergedEvents.union(with: AccountFinalStateEvents(state: finalState)) if !mergedEvents.isEmpty { strongSelf.insertProcessEvents(mergedEvents) } } strongSelf.currentIsUpdatingValue = false strongSelf.significantStateUpdateCompletedPipe.putNext(Void()) }) } else { self.currentChannelOperationsContext = nil let events = channelOperationsContext.events if !events.isEmpty { self.insertProcessEvents(events) } self.currentIsUpdatingValue = false self.significantStateUpdateCompletedPipe.putNext(Void()) } } private func startFirstOperation() { guard let operation = self.operations.first else { return } guard !operation.isRunning else { return } operation.isRunning = true let currentValidityMarker = self.currentValidityMarker switch operation.content { case let .pollDifference(_, currentEvents): self.operationTimer?.invalidate() self.currentIsUpdatingValue = true let queue = self.queue let accountManager = self.accountManager let postbox = self.postbox let network = self.network let mediaBox = postbox.mediaBox let accountPeerId = self.accountPeerId let auxiliaryMethods = self.auxiliaryMethods let signal = postbox.transaction { transaction -> (AuthorizedAccountState?, [(peer: Peer, pts: Int32?)], Bool) in let state = transaction.getState() as? AuthorizedAccountState var disableParallelChannelReset = false if let appConfig = transaction.getPreferencesEntry(key: PreferencesKeys.appConfiguration)?.get(AppConfiguration.self), let data = appConfig.data { if let _ = data["ios_disable_parallel_channel_reset_v2"] { disableParallelChannelReset = true } } var invalidatedChannels: [(peer: Peer, pts: Int32?)] = [] if let state = state, !disableParallelChannelReset { for record in state.invalidatedChannels { if record.validityMarker != currentValidityMarker { if let peer = transaction.getPeer(record.peerId) { invalidatedChannels.append((peer, record.pts)) } } } } return (state, invalidatedChannels, disableParallelChannelReset) } |> deliverOn(self.queue) |> mapToSignal { [weak self] state, invalidatedChannels, disableParallelChannelReset -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool), NoError> in if let state = state, let authorizedState = state.state { let flags: Int32 let ptsTotalLimit: Int32? flags = 1 << 0 ptsTotalLimit = 1000 if let strongSelf = self { if !invalidatedChannels.isEmpty { strongSelf.addAsyncResetChannels(peers: invalidatedChannels) } } let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: authorizedState.date, qts: authorizedState.qts)) |> map(Optional.init) |> `catch` { error -> Signal in if error.errorCode == 406 && error.errorDescription == "AUTH_KEY_DUPLICATED" { return .single(nil) } else { return .fail(error) } } |> retryRequest return request |> mapToSignal { difference -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool), NoError> in guard let difference = difference else { return .single((nil, nil, true, false)) } switch difference { case .differenceTooLong: return .single((nil, nil, false, true)) default: return initialStateWithDifference(postbox: postbox, difference: difference) |> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool), NoError> in if state.initialState.state != authorizedState { Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)") return .single((nil, nil, false, false)) } else { return finalStateWithDifference(accountPeerId: accountPeerId, postbox: postbox, network: network, state: state, difference: difference, asyncResetChannels: disableParallelChannelReset ? nil : { peers in queue.async { self?.addAsyncResetChannels(peers: peers) } }) |> deliverOn(queue) |> mapToSignal { finalState -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool), NoError> in if !finalState.state.preCachedResources.isEmpty { for (resource, data) in finalState.state.preCachedResources { mediaBox.storeResourceData(resource.id, data: data) } } let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary() return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool) in let startTime = CFAbsoluteTimeGetCurrent() let replayedState = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false, skipVerification: false) let deltaTime = CFAbsoluteTimeGetCurrent() - startTime if deltaTime > 1.0 { Logger.shared.log("State", "replayFinalState took \(deltaTime)s") } if let replayedState = replayedState { return (difference, replayedState, false, false) } else { return (nil, nil, false, false) } } } } } } } } else { let appliedState = network.request(Api.functions.updates.getState()) |> retryRequest |> mapToSignal { state in return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool, resetState: Bool) in if let currentState = transaction.getState() as? AuthorizedAccountState { switch state { case let .state(pts, qts, date, seq, _): transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: qts, date: date, seq: seq))) } } return (nil, nil, false, false) } } return appliedState } } |> deliverOn(self.queue) let _ = signal.start(next: { [weak self] difference, finalState, skipBecauseOfError, resetState in guard let strongSelf = self else { return } if resetState { let _ = (_internal_resetAccountState(postbox: postbox, network: network, accountPeerId: accountPeerId) |> deliverOn(strongSelf.queue)).start(completed: { guard let strongSelf = self else { return } if case .pollDifference = strongSelf.operations.removeFirst().content { strongSelf.startFirstOperation() } }) } else { if case .pollDifference = strongSelf.operations.removeFirst().content { let events: AccountFinalStateEvents if let finalState = finalState { events = currentEvents.union(with: AccountFinalStateEvents(state: finalState)) } else { events = currentEvents } if let difference = difference { switch difference { case .differenceSlice: strongSelf.addOperation(.pollDifference(strongSelf.getNextId(), events), position: .first) default: if let currentChannelOperationsContext = strongSelf.currentChannelOperationsContext { currentChannelOperationsContext.canComplete = true currentChannelOperationsContext.events = currentChannelOperationsContext.events.union(with: events) strongSelf.checkChannelOperationsCompletion() } else { if !events.isEmpty { strongSelf.insertProcessEvents(events) } strongSelf.currentIsUpdatingValue = false strongSelf.significantStateUpdateCompletedPipe.putNext(Void()) } } } else if skipBecauseOfError { if !events.isEmpty { strongSelf.insertProcessEvents(events) } } else { if !events.isEmpty { strongSelf.insertProcessEvents(events) } strongSelf.replaceOperations(with: .pollDifference(strongSelf.getNextId(), AccountFinalStateEvents())) } strongSelf.startFirstOperation() } } }) case let .collectUpdateGroups(_, timeout): self.operationTimer?.invalidate() let operationTimer = SignalKitTimer(timeout: timeout, repeat: false, completion: { [weak self] in if let strongSelf = self { let firstOperation = strongSelf.operations.removeFirst() if case let .collectUpdateGroups(groups, _) = firstOperation.content { if timeout.isEqual(to: 0.0) { strongSelf.addOperation(.processUpdateGroups(groups), position: .first) } else { Logger.shared.log("AccountStateManager", "timeout while waiting for updates") strongSelf.replaceOperations(with: .pollDifference(strongSelf.getNextId(), AccountFinalStateEvents())) } strongSelf.startFirstOperation() } else { assertionFailure() } } }, queue: self.queue) self.operationTimer = operationTimer operationTimer.start() case let .processUpdateGroups(groups): self.operationTimer?.invalidate() let accountManager = self.accountManager let postbox = self.postbox let network = self.network let auxiliaryMethods = self.auxiliaryMethods let accountPeerId = self.accountPeerId let mediaBox = postbox.mediaBox let queue = self.queue let signal = initialStateWithUpdateGroups(postbox: postbox, groups: groups) |> mapToSignal { [weak self] state -> Signal<(AccountReplayedFinalState?, AccountFinalState), NoError> in return finalStateWithUpdateGroups(accountPeerId: accountPeerId, postbox: postbox, network: network, state: state, groups: groups, asyncResetChannels: nil) |> deliverOn(queue) |> mapToSignal { finalState in if !finalState.discard && !finalState.state.preCachedResources.isEmpty { for (resource, data) in finalState.state.preCachedResources { postbox.mediaBox.storeResourceData(resource.id, data: data) } } let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary() return postbox.transaction { transaction -> AccountReplayedFinalState? in if finalState.discard { return nil } else { let startTime = CFAbsoluteTimeGetCurrent() let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false, skipVerification: false) let deltaTime = CFAbsoluteTimeGetCurrent() - startTime if deltaTime > 1.0 { Logger.shared.log("State", "replayFinalState took \(deltaTime)s") } return result } } |> map({ ($0, finalState) }) |> deliverOn(queue) } } let _ = signal.start(next: { [weak self] replayedState, finalState in if let strongSelf = self { if case let .processUpdateGroups(groups) = strongSelf.operations.removeFirst().content { if let replayedState = replayedState, !finalState.shouldPoll { let events = AccountFinalStateEvents(state: replayedState) if !events.isEmpty { strongSelf.insertProcessEvents(events) } if finalState.incomplete || !finalState.missingUpdatesFromChannels.isEmpty { strongSelf.addOperation(.collectUpdateGroups(groups, 2.0), position: .last) } } else { if let replayedState = replayedState { let events = AccountFinalStateEvents(state: replayedState) if !events.displayAlerts.isEmpty { strongSelf.insertProcessEvents(AccountFinalStateEvents(displayAlerts: events.displayAlerts)) } } strongSelf.replaceOperations(with: .pollDifference(strongSelf.getNextId(), AccountFinalStateEvents())) } strongSelf.startFirstOperation() } else { assertionFailure() } } }) case let .custom(operationId, signal): self.operationTimer?.invalidate() let completed: () -> Void = { [weak self] in if let strongSelf = self { let topOperation = strongSelf.operations.removeFirst() if case .custom(operationId, _) = topOperation.content { strongSelf.startFirstOperation() } else { assertionFailure() } } } let _ = (signal |> deliverOn(self.queue)).start(completed: { completed() }) case let .processEvents(operationId, events): self.operationTimer?.invalidate() let completed: () -> Void = { [weak self] in if let strongSelf = self { let topOperation = strongSelf.operations.removeFirst() if case .processEvents(operationId, _) = topOperation.content { if !events.updatedTypingActivities.isEmpty { strongSelf.peerInputActivityManager?.transaction { manager in for (chatPeerId, peerActivities) in events.updatedTypingActivities { for (peerId, activity) in peerActivities { if let activity = activity { manager.addActivity(chatPeerId: chatPeerId, peerId: peerId, activity: activity) } else { manager.removeAllActivities(chatPeerId: chatPeerId, peerId: peerId) } } } } } if !events.updatedWebpages.isEmpty { strongSelf.notifyUpdatedWebpages(events.updatedWebpages) } if let updatedPeersNearby = events.updatedPeersNearby { strongSelf.notifyUpdatedPeersNearby(updatedPeersNearby) } if !events.updatedCalls.isEmpty { for call in events.updatedCalls { strongSelf.callSessionManager?.updateSession(call, completion: { _ in }) } } if !events.addedCallSignalingData.isEmpty { for (id, data) in events.addedCallSignalingData { strongSelf.callSessionManager?.addCallSignalingData(id: id, data: data) } } if !events.updatedGroupCallParticipants.isEmpty { strongSelf.groupCallParticipantUpdatesPipe.putNext(events.updatedGroupCallParticipants) } if !events.updatedIncomingThreadReadStates.isEmpty || !events.updatedOutgoingThreadReadStates.isEmpty { strongSelf.threadReadStateUpdatesPipe.putNext((events.updatedIncomingThreadReadStates, events.updatedOutgoingThreadReadStates)) } if !events.isContactUpdates.isEmpty { strongSelf.addIsContactUpdates(events.isContactUpdates) } if let updatedMaxMessageId = events.updatedMaxMessageId { strongSelf.appliedMaxMessageIdPromise.set(.single(updatedMaxMessageId)) } if let updatedQts = events.updatedQts { strongSelf.appliedQtsPromise.set(.single(updatedQts)) } var pollCount = 0 for i in 0 ..< strongSelf.operations.count { if case let .pollCompletion(pollId, messageIds, subscribers) = strongSelf.operations[i].content { pollCount += 1 var updatedMessageIds = messageIds updatedMessageIds.append(contentsOf: events.addedIncomingMessageIds) let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, updatedMessageIds, subscribers)) operation.isRunning = strongSelf.operations[i].isRunning strongSelf.operations[i] = operation } } assert(pollCount <= 1) strongSelf.startFirstOperation() } else { assertionFailure() } } } if events.delayNotificatonsUntil != nil { let _ = self.delayNotificatonsUntil.swap(events.delayNotificatonsUntil) } let signal = self.postbox.transaction { transaction -> [([Message], PeerGroupId, Bool, MessageHistoryThreadData?)] in var messageList: [([Message], PeerGroupId, Bool, MessageHistoryThreadData?)] = [] for id in events.addedIncomingMessageIds { let (messages, notify, _, _, threadData) = messagesForNotification(transaction: transaction, id: id, alwaysReturnMessage: false) if !messages.isEmpty { messageList.append((messages, .root, notify, threadData)) } } var wasScheduledMessages: [Message] = [] for id in events.wasScheduledMessageIds { if let message = transaction.getMessage(id) { wasScheduledMessages.append(message) } } if !wasScheduledMessages.isEmpty { var threadData: MessageHistoryThreadData? let first = wasScheduledMessages[0] for attr in first.attributes { if let attribute = attr as? ReplyMessageAttribute { if let threadId = attribute.threadMessageId { threadData = transaction.getMessageHistoryThreadInfo(peerId: first.id.peerId, threadId: makeMessageThreadId(threadId))?.data.get(MessageHistoryThreadData.self) } } } messageList.append((wasScheduledMessages, .root, true, threadData)) } return messageList } let _ = (signal |> deliverOn(self.queue)).start(next: { [weak self] messages in if let strongSelf = self { strongSelf.notificationMessagesPipe.putNext(messages) } }, completed: { completed() }) let timestamp = Int32(Date().timeIntervalSince1970) let minReactionTimestamp = timestamp - 20 let reactionEvents = events.addedReactionEvents.compactMap { event -> (reactionAuthor: Peer, reaction: MessageReaction.Reaction, message: Message, timestamp: Int32)? in if event.timestamp >= minReactionTimestamp { return (event.reactionAuthor, event.reaction, event.message, event.timestamp) } else { return nil } } self.reactionNotificationsPipe.putNext(reactionEvents) if !events.displayAlerts.isEmpty { self.displayAlertsPipe.putNext(events.displayAlerts) } if !events.dismissBotWebViews.isEmpty { self.dismissBotWebViewsPipe.putNext(events.dismissBotWebViews) } if !events.externallyUpdatedPeerId.isEmpty { self.externallyUpdatedPeerIdsPipe.putNext(Array(events.externallyUpdatedPeerId)) } if events.authorizationListUpdated { self.authorizationListUpdatesPipe.putNext(Void()) } if !events.deletedMessageIds.isEmpty { self.deletedMessagesPipe.putNext(events.deletedMessageIds) } if events.updateConfig { self.updateConfigRequested?() } if events.isPremiumUpdated { self.isPremiumUpdated?() } case let .pollCompletion(pollId, preMessageIds, preSubscribers): if self.operations.count > 1 { self.operations.removeFirst() self.postponePollCompletionOperation(messageIds: preMessageIds, subscribers: preSubscribers) self.startFirstOperation() } else { self.operationTimer?.invalidate() let signal = self.network.request(Api.functions.help.test()) |> deliverOn(self.queue) let completed: () -> Void = { [weak self] in if let strongSelf = self { let topOperation = strongSelf.operations.removeFirst() if case let .pollCompletion(topPollId, messageIds, subscribers) = topOperation.content { assert(topPollId == pollId) if strongSelf.operations.isEmpty { for (_, f) in subscribers { f(messageIds) } } else { strongSelf.postponePollCompletionOperation(messageIds: messageIds, subscribers: subscribers) } strongSelf.startFirstOperation() } else { assertionFailure() } } } let _ = (signal |> deliverOn(self.queue)).start(error: { _ in completed() }, completed: { completed() }) } case let .replayAsynchronouslyBuiltFinalState(finalState, completion): if !finalState.state.preCachedResources.isEmpty { for (resource, data) in finalState.state.preCachedResources { self.postbox.mediaBox.storeResourceData(resource.id, data: data) } } let accountPeerId = self.accountPeerId let accountManager = self.accountManager let postbox = self.postbox let mediaBox = self.postbox.mediaBox let network = self.network let auxiliaryMethods = self.auxiliaryMethods let removePossiblyDeliveredMessagesUniqueIds = self.removePossiblyDeliveredMessagesUniqueIds let signal = self.postbox.transaction { transaction -> AccountReplayedFinalState? in let startTime = CFAbsoluteTimeGetCurrent() let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false, skipVerification: false) let deltaTime = CFAbsoluteTimeGetCurrent() - startTime if deltaTime > 1.0 { Logger.shared.log("State", "replayFinalState took \(deltaTime)s") } return result } |> map({ ($0, finalState) }) |> deliverOn(self.queue) let _ = signal.start(next: { [weak self] replayedState, finalState in if let strongSelf = self { if case .replayAsynchronouslyBuiltFinalState = strongSelf.operations.removeFirst().content { if let replayedState = replayedState { let events = AccountFinalStateEvents(state: replayedState) if !events.isEmpty { strongSelf.insertProcessEvents(events) } } strongSelf.startFirstOperation() } else { assertionFailure() } completion() } }) } } func standaloneReplayAsynchronouslyBuiltFinalState(finalState: AccountFinalState) -> Signal { if !finalState.state.preCachedResources.isEmpty { for (resource, data) in finalState.state.preCachedResources { self.postbox.mediaBox.storeResourceData(resource.id, data: data) } } let accountPeerId = self.accountPeerId let accountManager = self.accountManager let postbox = self.postbox let mediaBox = self.postbox.mediaBox let network = self.network let auxiliaryMethods = self.auxiliaryMethods let removePossiblyDeliveredMessagesUniqueIds = self.removePossiblyDeliveredMessagesUniqueIds let signal = self.postbox.transaction { transaction -> AccountReplayedFinalState? in let startTime = CFAbsoluteTimeGetCurrent() let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false, skipVerification: false) let deltaTime = CFAbsoluteTimeGetCurrent() - startTime if deltaTime > 1.0 { Logger.shared.log("State", "replayFinalState took \(deltaTime)s") } return result } |> map({ ($0, finalState) }) |> deliverOn(self.queue) return signal |> ignoreValues } public func standalonePollDifference() -> Signal { let queue = self.queue let accountManager = self.accountManager let postbox = self.postbox let network = self.network let mediaBox = postbox.mediaBox let accountPeerId = self.accountPeerId let auxiliaryMethods = self.auxiliaryMethods let signal = postbox.stateView() |> mapToSignal { view -> Signal in if let state = view.state as? AuthorizedAccountState { return .single(state) } else { return .complete() } } |> take(1) |> mapToSignal { [weak self] state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in if let authorizedState = state.state { let flags: Int32 = 0 let ptsTotalLimit: Int32? = nil let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: Int32.max, qts: authorizedState.qts)) |> map(Optional.init) |> `catch` { error -> Signal in if error.errorCode == 406 && error.errorDescription == "AUTH_KEY_DUPLICATED" { return .single(nil) } else { return .fail(error) } } |> retryRequest return request |> mapToSignal { difference -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in guard let difference = difference else { return .single((nil, nil, true)) } switch difference { case .differenceTooLong: preconditionFailure() default: return initialStateWithDifference(postbox: postbox, difference: difference) |> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in if state.initialState.state != authorizedState { Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)") return .single((nil, nil, false)) } else { return finalStateWithDifference(accountPeerId: accountPeerId, postbox: postbox, network: network, state: state, difference: difference, asyncResetChannels: nil) |> deliverOn(queue) |> mapToSignal { finalState -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in if !finalState.state.preCachedResources.isEmpty { for (resource, data) in finalState.state.preCachedResources { mediaBox.storeResourceData(resource.id, data: data) } } let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary() return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in let startTime = CFAbsoluteTimeGetCurrent() let replayedState = replayFinalState( accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: true, skipVerification: false ) let deltaTime = CFAbsoluteTimeGetCurrent() - startTime if deltaTime > 1.0 { Logger.shared.log("State", "replayFinalState took \(deltaTime)s") } if let replayedState = replayedState { return (difference, replayedState, false) } else { return (nil, nil, false) } } } } } } } } else { return .single((nil, nil, false)) } } |> deliverOn(self.queue) return signal |> mapToSignal { difference, _, _ -> Signal in if let difference = difference { switch difference { case .differenceSlice: return .single(false) default: return .single(true) } } else { return .single(true) } } } private func insertProcessEvents(_ events: AccountFinalStateEvents) { if !events.isEmpty { let operation = AccountStateManagerOperation(content: .processEvents(self.getNextId(), events)) var inserted = false for i in 0 ..< self.operations.count { if self.operations[i].isRunning { continue } if case .processEvents = self.operations[i].content { continue } self.operations.insert(operation, at: i) inserted = true break } if !inserted { self.operations.append(operation) } } } private func postponePollCompletionOperation(messageIds: [MessageId], subscribers: [(Int32, ([MessageId]) -> Void)]) { self.addOperation(.pollCompletion(self.getNextId(), messageIds, subscribers), position: .last) for i in 0 ..< self.operations.count { if case .pollCompletion = self.operations[i].content { if i != self.operations.count - 1 { assertionFailure() } } } } private func addPollCompletion(_ f: @escaping ([MessageId]) -> Void) -> Int32 { assert(self.queue.isCurrent()) let updatedId: Int32 = self.getNextId() for i in 0 ..< self.operations.count { if case let .pollCompletion(pollId, messageIds, subscribers) = self.operations[i].content { var subscribers = subscribers subscribers.append((updatedId, f)) let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, messageIds, subscribers)) operation.isRunning = self.operations[i].isRunning self.operations[i] = operation return updatedId } } self.addOperation(.pollCompletion(self.getNextId(), [], [(updatedId, f)]), position: .last) return updatedId } private func removePollCompletion(_ id: Int32) { for i in 0 ..< self.operations.count { if case let .pollCompletion(pollId, messages, subscribers) = self.operations[i].content { for j in 0 ..< subscribers.count { if subscribers[j].0 == id { var subscribers = subscribers subscribers.remove(at: j) let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, messages, subscribers)) operation.isRunning = self.operations[i].isRunning self.operations[i] = operation break } } } } } public func pollStateUpdateCompletion() -> Signal<[MessageId], NoError> { return Signal { [weak self] subscriber in let disposable = MetaDisposable() if let strongSelf = self { strongSelf.queue.async { let id = strongSelf.addPollCompletion({ messageIds in subscriber.putNext(messageIds) subscriber.putCompletion() }) disposable.set(ActionDisposable { if let strongSelf = self { strongSelf.queue.async { strongSelf.removePollCompletion(id) } } }) } } return disposable } } public func updatedWebpage(_ webpageId: MediaId) -> Signal { let queue = self.queue return Signal { [weak self] subscriber in let disposable = MetaDisposable() queue.async { if let strongSelf = self { let context: UpdatedWebpageSubscriberContext if let current = strongSelf.updatedWebpageContexts[webpageId] { context = current } else { context = UpdatedWebpageSubscriberContext() strongSelf.updatedWebpageContexts[webpageId] = context } let index = context.subscribers.add({ media in subscriber.putNext(media) }) disposable.set(ActionDisposable { if let strongSelf = self { if let context = strongSelf.updatedWebpageContexts[webpageId] { context.subscribers.remove(index) if context.subscribers.isEmpty { strongSelf.updatedWebpageContexts.removeValue(forKey: webpageId) } } } }) } } return disposable } } private func notifyUpdatedWebpages(_ updatedWebpages: [MediaId: TelegramMediaWebpage]) { for (id, context) in self.updatedWebpageContexts { if let media = updatedWebpages[id] { for subscriber in context.subscribers.copyItems() { subscriber(media) } } } } func notifyAppliedIncomingReadMessages(_ ids: [MessageId]) { self.appliedIncomingReadMessagesPipe.putNext(ids) } public func getDelayNotificatonsUntil() -> Int32? { return self.delayNotificatonsUntil.with { $0 } } func modifyTermsOfServiceUpdate(_ f: @escaping (TermsOfServiceUpdate?) -> (TermsOfServiceUpdate?)) { self.queue.async { let current = self.termsOfServiceUpdateValue.with { $0 } let updated = f(current) if (current != updated) { let _ = self.termsOfServiceUpdateValue.swap(updated) self.termsOfServiceUpdatePromise.set(.single(updated)) } } } func modifyAppUpdateInfo(_ f: @escaping (AppUpdateInfo?) -> (AppUpdateInfo?)) { self.queue.async { let current = self.appUpdateInfoValue.with { $0 } let updated = f(current) if (current != updated) { let _ = self.appUpdateInfoValue.swap(updated) self.appUpdateInfoPromise.set(.single(updated)) } } } public func updatedPeersNearby() -> Signal<[PeerNearby], NoError> { let queue = self.queue return Signal { [weak self] subscriber in let disposable = MetaDisposable() queue.async { if let strongSelf = self { let index = strongSelf.updatedPeersNearbyContext.subscribers.add({ peersNearby in subscriber.putNext(peersNearby) }) disposable.set(ActionDisposable { if let strongSelf = self { strongSelf.updatedPeersNearbyContext.subscribers.remove(index) } }) } } return disposable } } private func notifyUpdatedPeersNearby(_ updatedPeersNearby: [PeerNearby]) { for subscriber in self.updatedPeersNearbyContext.subscribers.copyItems() { subscriber(updatedPeersNearby) } } func notifyDeletedMessages(messageIds: [MessageId]) { self.deletedMessagesPipe.putNext(messageIds.map { .messageId($0) }) } public func processIncomingCallUpdate(data: Data, completion: @escaping ((CallSessionRingingState, CallSession)?) -> Void) { var rawData = data let reader = BufferReader(Buffer(data: data)) if let signature = reader.readInt32(), signature == 0x3072cfa1 { if let compressedData = parseBytes(reader) { if let decompressedData = MTGzip.decompress(compressedData.makeData()) { rawData = decompressedData } } } if let updates = Api.parse(Buffer(data: rawData)) as? Api.Updates { switch updates { case let .updates(updates, _, _, _, _): for update in updates { switch update { case let .updatePhoneCall(phoneCall): if let callSessionManager = self.callSessionManager { callSessionManager.updateSession(phoneCall, completion: { result in completion(result) }) } else { completion(nil) } return default: break } } default: break } } completion(nil) } func removePossiblyDeliveredMessages(uniqueIds: [Int64: PeerId]) { self.queue.async { self.removePossiblyDeliveredMessagesUniqueIds.merge(uniqueIds, uniquingKeysWith: { _, rhs in rhs }) } } } private let impl: QueueLocalObject public let accountPeerId: PeerId public let postbox: Postbox public let network: Network let auxiliaryMethods: AccountAuxiliaryMethods //var transformOutgoingMessageMedia: TransformOutgoingMessageMedia? public var isUpdating: Signal { return self.impl.signalWith { impl, subscriber in return impl.isUpdating.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public var notificationMessages: Signal<[([Message], PeerGroupId, Bool, MessageHistoryThreadData?)], NoError> { return self.impl.signalWith { impl, subscriber in return impl.notificationMessages.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public var reactionNotifications: Signal<[(reactionAuthor: Peer, reaction: MessageReaction.Reaction, message: Message, timestamp: Int32)], NoError> { return self.impl.signalWith { impl, subscriber in return impl.reactionNotifications.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public var displayAlerts: Signal<[(text: String, isDropAuth: Bool)], NoError> { return self.impl.signalWith { impl, subscriber in return impl.displayAlerts.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public var dismissBotWebViews: Signal<[Int64], NoError> { return self.impl.signalWith { impl, subscriber in return impl.dismissBotWebViews.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } var externallyUpdatedPeerIds: Signal<[PeerId], NoError> { return self.impl.signalWith { impl, subscriber in return impl.externallyUpdatedPeerIds.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public var termsOfServiceUpdate: Signal { return self.impl.signalWith { impl, subscriber in return impl.termsOfServiceUpdate.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public var appUpdateInfo: Signal { return self.impl.signalWith { impl, subscriber in return impl.appUpdateInfo.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public var appliedIncomingReadMessages: Signal<[MessageId], NoError> { return self.impl.signalWith { impl, subscriber in return impl.appliedIncomingReadMessages.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } var significantStateUpdateCompleted: Signal { return self.impl.signalWith { impl, subscriber in return impl.significantStateUpdateCompleted.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } var authorizationListUpdates: Signal { return self.impl.signalWith { impl, subscriber in return impl.authorizationListUpdates.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } var threadReadStateUpdates: Signal<(incoming: [MessageId: MessageId.Id], outgoing: [MessageId: MessageId.Id]), NoError> { return self.impl.signalWith { impl, subscriber in return impl.threadReadStateUpdates.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public var groupCallParticipantUpdates: Signal<[(Int64, GroupCallParticipantsContext.Update)], NoError> { return self.impl.signalWith { impl, subscriber in return impl.groupCallParticipantUpdates.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public var deletedMessages: Signal<[DeletedMessageId], NoError> { return self.impl.signalWith { impl, subscriber in return impl.deletedMessages.start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } var updateConfigRequested: (() -> Void)? var isPremiumUpdated: (() -> Void)? init( accountPeerId: PeerId, accountManager: AccountManager, postbox: Postbox, network: Network, callSessionManager: CallSessionManager?, addIsContactUpdates: @escaping ([(PeerId, Bool)]) -> Void, shouldKeepOnlinePresence: Signal, peerInputActivityManager: PeerInputActivityManager?, auxiliaryMethods: AccountAuxiliaryMethods ) { let queue = Queue(name: "AccountStateManager") self.accountPeerId = accountPeerId self.postbox = postbox self.network = network self.auxiliaryMethods = auxiliaryMethods var updateConfigRequestedImpl: (() -> Void)? var isPremiumUpdatedImpl: (() -> Void)? self.impl = QueueLocalObject(queue: queue, generate: { return Impl( queue: queue, accountPeerId: accountPeerId, accountManager: accountManager, postbox: postbox, network: network, callSessionManager: callSessionManager, addIsContactUpdates: addIsContactUpdates, shouldKeepOnlinePresence: shouldKeepOnlinePresence, peerInputActivityManager: peerInputActivityManager, auxiliaryMethods: auxiliaryMethods, updateConfigRequested: { updateConfigRequestedImpl?() }, isPremiumUpdated: { isPremiumUpdatedImpl?() } ) }) updateConfigRequestedImpl = { [weak self] in self?.updateConfigRequested?() } isPremiumUpdatedImpl = { [weak self] in self?.isPremiumUpdated?() } } func reset() { self.impl.with { impl in impl.reset() } } func addUpdates(_ updates: Api.Updates) { self.impl.with { impl in impl.addUpdates(updates) } } func addUpdateGroups(_ groups: [UpdateGroup]) { self.impl.with { impl in impl.addUpdateGroups(groups) } } func addReplayAsynchronouslyBuiltFinalState(_ finalState: AccountFinalState) -> Signal { return self.impl.signalWith { impl, subscriber in return impl.addReplayAsynchronouslyBuiltFinalState(finalState).start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } func standaloneReplayAsynchronouslyBuiltFinalState(finalState: AccountFinalState) -> Signal { return self.impl.signalWith { impl, subscriber in return impl.standaloneReplayAsynchronouslyBuiltFinalState(finalState: finalState).start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } func notifyAppliedIncomingReadMessages(_ ids: [MessageId]) { self.impl.with { impl in impl.notifyAppliedIncomingReadMessages(ids) } } func modifyAppUpdateInfo(_ f: @escaping (AppUpdateInfo?) -> (AppUpdateInfo?)) { self.impl.with { impl in impl.modifyAppUpdateInfo(f) } } public func pollStateUpdateCompletion() -> Signal<[MessageId], NoError> { return self.impl.signalWith { impl, subscriber in return impl.pollStateUpdateCompletion().start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } func notifyDeletedMessages(messageIds: [MessageId]) { self.impl.with { impl in impl.notifyDeletedMessages(messageIds: messageIds) } } func removePossiblyDeliveredMessages(uniqueIds: [Int64: PeerId]) { self.impl.with { impl in impl.removePossiblyDeliveredMessages(uniqueIds: uniqueIds) } } public func updatedPeersNearby() -> Signal<[PeerNearby], NoError> { return self.impl.signalWith { impl, subscriber in return impl.updatedPeersNearby().start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } func addCustomOperation(_ f: Signal) -> Signal { return self.impl.signalWith { impl, subscriber in return impl.addCustomOperation(f).start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } func modifyTermsOfServiceUpdate(_ f: @escaping (TermsOfServiceUpdate?) -> (TermsOfServiceUpdate?)) { self.impl.with { impl in impl.modifyTermsOfServiceUpdate(f) } } public func updatedWebpage(_ webpageId: MediaId) -> Signal { return self.impl.signalWith { impl, subscriber in return impl.updatedWebpage(webpageId).start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public func processIncomingCallUpdate(data: Data, completion: @escaping ((CallSessionRingingState, CallSession)?) -> Void) { self.impl.with { impl in impl.processIncomingCallUpdate(data: data, completion: completion) } } public func standalonePollDifference() -> Signal { return self.impl.signalWith { impl, subscriber in return impl.standalonePollDifference().start(next: subscriber.putNext, error: subscriber.putError, completed: subscriber.putCompletion) } } public static func extractIncomingCallUpdate(data: Data) -> IncomingCallUpdate? { var rawData = data let reader = BufferReader(Buffer(data: data)) if let signature = reader.readInt32(), signature == 0x3072cfa1 { if let compressedData = parseBytes(reader) { if let decompressedData = MTGzip.decompress(compressedData.makeData()) { rawData = decompressedData } } } guard let updates = Api.parse(Buffer(data: rawData)) as? Api.Updates else { return nil } switch updates { case let .updates(updates, users, _, _, _): var peers: [Peer] = [] for user in users { peers.append(TelegramUser(user: user)) } for update in updates { switch update { case let .updatePhoneCall(phoneCall): switch phoneCall { case let .phoneCallRequested(_, id, accessHash, date, adminId, _, _, _): guard let peer = peers.first(where: { $0.id == PeerId(namespace: Namespaces.Peer.CloudUser, id: PeerId.Id._internalFromInt64Value(adminId)) }) else { return nil } return IncomingCallUpdate( callId: id, callAccessHash: accessHash, timestamp: date, peer: EnginePeer(peer) ) default: break } default: break } } return nil default: return nil } } } func resolveNotificationSettings(list: [TelegramPeerNotificationSettings], defaultSettings: MessageNotificationSettings) -> (sound: PeerMessageSound, notify: Bool, displayContents: Bool) { var sound: PeerMessageSound = defaultSettings.sound var notify = defaultSettings.enabled var displayContents = defaultSettings.displayPreviews for item in list.reversed() { if case .default = item.messageSound { } else { sound = item.messageSound } switch item.displayPreviews { case .default: break case .show: displayContents = true case .hide: displayContents = false } switch item.muteState { case .default: break case .unmuted: notify = true case let .muted(deadline): let timestamp = Int32(CFAbsoluteTimeGetCurrent() + NSTimeIntervalSince1970) if deadline > timestamp { notify = false } else { notify = true } } } if case .default = sound { sound = defaultCloudPeerNotificationSound } return (sound, notify, displayContents) } public func messagesForNotification(transaction: Transaction, id: MessageId, alwaysReturnMessage: Bool) -> (messages: [Message], notify: Bool, sound: PeerMessageSound, displayContents: Bool, threadData: MessageHistoryThreadData?) { guard let message = transaction.getMessage(id) else { Logger.shared.log("AccountStateManager", "notification message doesn't exist") return ([], false, defaultCloudPeerNotificationSound, false, nil) } var notify = true var muted = false var displayContents = true var threadData: MessageHistoryThreadData? for attribute in message.attributes { if let attribute = attribute as? NotificationInfoMessageAttribute { if attribute.flags.contains(.muted) { muted = true } } if let attribute = attribute as? ReplyMessageAttribute { if let threadId = attribute.threadMessageId { threadData = transaction.getMessageHistoryThreadInfo(peerId: message.id.peerId, threadId: makeMessageThreadId(threadId))?.data.get(MessageHistoryThreadData.self) } } } if threadData == nil, let threadId = message.threadId { threadData = transaction.getMessageHistoryThreadInfo(peerId: message.id.peerId, threadId: threadId)?.data.get(MessageHistoryThreadData.self) } for media in message.media { if let action = media as? TelegramMediaAction { switch action.action { case .groupMigratedToChannel, .channelMigratedFromGroup: notify = false default: break } } } var notificationPeerId = id.peerId let peer = transaction.getPeer(id.peerId) if let peer = peer, let associatedPeerId = peer.associatedPeerId { notificationPeerId = associatedPeerId } if message.personal, let author = message.author { notificationPeerId = author.id } var notificationSettingsStack: [TelegramPeerNotificationSettings] = [] if let threadId = message.threadId, let threadData = transaction.getMessageHistoryThreadInfo(peerId: message.id.peerId, threadId: threadId)?.data.get(MessageHistoryThreadData.self) { notificationSettingsStack.append(threadData.notificationSettings) } if let notificationSettings = transaction.getPeerNotificationSettings(id: notificationPeerId) as? TelegramPeerNotificationSettings { notificationSettingsStack.append(notificationSettings) } let globalNotificationSettings = transaction.getPreferencesEntry(key: PreferencesKeys.globalNotifications)?.get(GlobalNotificationSettings.self) ?? GlobalNotificationSettings.defaultSettings let defaultNotificationSettings: MessageNotificationSettings if id.peerId.namespace == Namespaces.Peer.CloudUser { defaultNotificationSettings = globalNotificationSettings.effective.privateChats } else if id.peerId.namespace == Namespaces.Peer.SecretChat { defaultNotificationSettings = globalNotificationSettings.effective.privateChats displayContents = false } else if id.peerId.namespace == Namespaces.Peer.CloudChannel, let peer = peer as? TelegramChannel, case .broadcast = peer.info { defaultNotificationSettings = globalNotificationSettings.effective.channels } else { defaultNotificationSettings = globalNotificationSettings.effective.groupChats } let (resolvedSound, resolvedNotify, resolvedDisplayContents) = resolveNotificationSettings(list: notificationSettingsStack, defaultSettings: defaultNotificationSettings) var sound = resolvedSound if !resolvedNotify { notify = false } if !resolvedDisplayContents { displayContents = false } if muted { sound = .none } if let channel = message.peers[message.id.peerId] as? TelegramChannel { if !channel.flags.contains(.isForum) { threadData = nil } switch channel.participationStatus { case .kicked, .left: return ([], false, sound, false, threadData) case .member: break } } var foundReadState = false var isUnread = true if let readState = transaction.getCombinedPeerReadState(id.peerId) { if readState.isIncomingMessageIndexRead(message.index) { isUnread = false } foundReadState = true } if !foundReadState { Logger.shared.log("AccountStateManager", "read state for \(id.peerId) is undefined") } var resultMessages: [Message] = [message] var messageGroup: [Message]? if message.forwardInfo != nil && message.sourceReference == nil { messageGroup = transaction.getMessageForwardedGroup(message.id) } else if message.groupingKey != nil { messageGroup = transaction.getMessageGroup(message.id) } if let messageGroup = messageGroup { resultMessages.append(contentsOf: messageGroup.filter({ $0.id != message.id })) } if notify { return (resultMessages, isUnread, sound, displayContents, threadData) } else { return (alwaysReturnMessage ? resultMessages : [], false, sound, displayContents, threadData) } }