import Foundation import Postbox import SwiftSignalKit import TelegramApi import MtProtoKit import SyncCore private enum AccountStateManagerOperationContent { case pollDifference(AccountFinalStateEvents) case collectUpdateGroups([UpdateGroup], Double) case processUpdateGroups([UpdateGroup]) case custom(Int32, Signal) case pollCompletion(Int32, [MessageId], [(Int32, ([MessageId]) -> Void)]) case processEvents(Int32, AccountFinalStateEvents) case replayAsynchronouslyBuiltFinalState(AccountFinalState, () -> Void) } private final class AccountStateManagerOperation { var isRunning: Bool = false let content: AccountStateManagerOperationContent init(content: AccountStateManagerOperationContent) { self.content = content } } private enum AccountStateManagerAddOperationPosition { case first case last } private typealias SignalKitTimer = SwiftSignalKit.Timer private enum CustomOperationEvent { case Next(T) case Error(E) case Completion } private final class UpdatedWebpageSubscriberContext { let subscribers = Bag<(TelegramMediaWebpage) -> Void>() } private final class UpdatedPeersNearbySubscriberContext { let subscribers = Bag<([PeerNearby]) -> Void>() } public enum DeletedMessageId: Hashable { case global(Int32) case messageId(MessageId) } public final class AccountStateManager { private let queue = Queue() private let accountPeerId: PeerId private let accountManager: AccountManager private let postbox: Postbox private let network: Network private let callSessionManager: CallSessionManager private let addIsContactUpdates: ([(PeerId, Bool)]) -> Void private let shouldKeepOnlinePresence: Signal private let peerInputActivityManager: PeerInputActivityManager let auxiliaryMethods: AccountAuxiliaryMethods var transformOutgoingMessageMedia: TransformOutgoingMessageMedia? private var updateService: UpdateMessageService? private let updateServiceDisposable = MetaDisposable() private var operations_: [AccountStateManagerOperation] = [] private var operations: [AccountStateManagerOperation] { get { assert(self.queue.isCurrent()) return self.operations_ } set(value) { assert(self.queue.isCurrent()) self.operations_ = value } } private let operationDisposable = MetaDisposable() private var operationTimer: SignalKitTimer? private var removePossiblyDeliveredMessagesUniqueIds: [Int64: PeerId] = [:] private var nextId: Int32 = 0 private func getNextId() -> Int32 { self.nextId += 1 return self.nextId } private let isUpdatingValue = ValuePromise(false) private var currentIsUpdatingValue = false { didSet { if self.currentIsUpdatingValue != oldValue { self.isUpdatingValue.set(self.currentIsUpdatingValue) } } } public var isUpdating: Signal { return self.isUpdatingValue.get() } private let notificationMessagesPipe = ValuePipe<[([Message], PeerGroupId, Bool)]>() public var notificationMessages: Signal<[([Message], PeerGroupId, Bool)], NoError> { return self.notificationMessagesPipe.signal() } private let displayAlertsPipe = ValuePipe<[(text: String, isDropAuth: Bool)]>() public var displayAlerts: Signal<[(text: String, isDropAuth: Bool)], NoError> { return self.displayAlertsPipe.signal() } private let externallyUpdatedPeerIdsPipe = ValuePipe<[PeerId]>() var externallyUpdatedPeerIds: Signal<[PeerId], NoError> { return self.externallyUpdatedPeerIdsPipe.signal() } private let termsOfServiceUpdateValue = Atomic(value: nil) private let termsOfServiceUpdatePromise = Promise(nil) public var termsOfServiceUpdate: Signal { return self.termsOfServiceUpdatePromise.get() } private let appUpdateInfoValue = Atomic(value: nil) private let appUpdateInfoPromise = Promise(nil) public var appUpdateInfo: Signal { return self.appUpdateInfoPromise.get() } private let appliedIncomingReadMessagesPipe = ValuePipe<[MessageId]>() public var appliedIncomingReadMessages: Signal<[MessageId], NoError> { return self.appliedIncomingReadMessagesPipe.signal() } private let significantStateUpdateCompletedPipe = ValuePipe() var significantStateUpdateCompleted: Signal { return self.significantStateUpdateCompletedPipe.signal() } private let authorizationListUpdatesPipe = ValuePipe() var authorizationListUpdates: Signal { return self.authorizationListUpdatesPipe.signal() } private let threadReadStateUpdatesPipe = ValuePipe<(incoming: [MessageId: MessageId.Id], outgoing: [MessageId: MessageId.Id])>() var threadReadStateUpdates: Signal<(incoming: [MessageId: MessageId.Id], outgoing: [MessageId: MessageId.Id]), NoError> { return self.threadReadStateUpdatesPipe.signal() } private let groupCallParticipantUpdatesPipe = ValuePipe<[(Int64, GroupCallParticipantsContext.Update)]>() public var groupCallParticipantUpdates: Signal<[(Int64, GroupCallParticipantsContext.Update)], NoError> { return self.groupCallParticipantUpdatesPipe.signal() } private let deletedMessagesPipe = ValuePipe<[DeletedMessageId]>() public var deletedMessages: Signal<[DeletedMessageId], NoError> { return self.deletedMessagesPipe.signal() } private var updatedWebpageContexts: [MediaId: UpdatedWebpageSubscriberContext] = [:] private var updatedPeersNearbyContext = UpdatedPeersNearbySubscriberContext() private let delayNotificatonsUntil = Atomic(value: nil) private let appliedMaxMessageIdPromise = Promise(nil) private let appliedMaxMessageIdDisposable = MetaDisposable() private let appliedQtsPromise = Promise(nil) private let appliedQtsDisposable = MetaDisposable() init(accountPeerId: PeerId, accountManager: AccountManager, postbox: Postbox, network: Network, callSessionManager: CallSessionManager, addIsContactUpdates: @escaping ([(PeerId, Bool)]) -> Void, shouldKeepOnlinePresence: Signal, peerInputActivityManager: PeerInputActivityManager, auxiliaryMethods: AccountAuxiliaryMethods) { self.accountPeerId = accountPeerId self.accountManager = accountManager self.postbox = postbox self.network = network self.callSessionManager = callSessionManager self.addIsContactUpdates = addIsContactUpdates self.shouldKeepOnlinePresence = shouldKeepOnlinePresence self.peerInputActivityManager = peerInputActivityManager self.auxiliaryMethods = auxiliaryMethods } deinit { self.updateServiceDisposable.dispose() self.operationDisposable.dispose() self.appliedMaxMessageIdDisposable.dispose() self.appliedQtsDisposable.dispose() } func reset() { self.queue.async { if self.updateService == nil { self.updateService = UpdateMessageService(peerId: self.accountPeerId) self.updateServiceDisposable.set(self.updateService!.pipe.signal().start(next: { [weak self] groups in if let strongSelf = self { strongSelf.addUpdateGroups(groups) } })) self.network.mtProto.add(self.updateService) } self.operationDisposable.set(nil) self.replaceOperations(with: .pollDifference(AccountFinalStateEvents())) self.startFirstOperation() let appliedValues: [(MetaDisposable, Signal, Bool)] = [ (self.appliedMaxMessageIdDisposable, self.appliedMaxMessageIdPromise.get(), true), (self.appliedQtsDisposable, self.appliedQtsPromise.get(), false) ] for (disposable, value, isMaxMessageId) in appliedValues { let network = self.network disposable.set((combineLatest(queue: self.queue, self.shouldKeepOnlinePresence, value) |> mapToSignal { shouldKeepOnlinePresence, value -> Signal in guard let value = value else { return .complete() } if !shouldKeepOnlinePresence { return .complete() } return .single(value) } |> distinctUntilChanged |> mapToSignal { value -> Signal in if isMaxMessageId { return network.request(Api.functions.messages.receivedMessages(maxId: value)) |> ignoreValues |> `catch` { _ -> Signal in return .complete() } } else { if value == 0 { return .complete() } else { return network.request(Api.functions.messages.receivedQueue(maxQts: value)) |> ignoreValues |> `catch` { _ -> Signal in return .complete() } } } }).start()) } } } func addUpdates(_ updates: Api.Updates) { self.queue.async { self.updateService?.addUpdates(updates) } } func addUpdateGroups(_ groups: [UpdateGroup]) { self.queue.async { if let last = self.operations.last { switch last.content { case .pollDifference, .processUpdateGroups, .custom, .pollCompletion, .processEvents, .replayAsynchronouslyBuiltFinalState: self.addOperation(.collectUpdateGroups(groups, 0.0), position: .last) case let .collectUpdateGroups(currentGroups, timeout): let operation = AccountStateManagerOperation(content: .collectUpdateGroups(currentGroups + groups, timeout)) operation.isRunning = last.isRunning self.operations[self.operations.count - 1] = operation self.startFirstOperation() } } else { self.addOperation(.collectUpdateGroups(groups, 0.0), position: .last) } } } func addReplayAsynchronouslyBuiltFinalState(_ finalState: AccountFinalState) -> Signal { return Signal { subscriber in self.queue.async { self.addOperation(.replayAsynchronouslyBuiltFinalState(finalState, { subscriber.putNext(true) subscriber.putCompletion() }), position: .last) } return EmptyDisposable } } func addCustomOperation(_ f: Signal) -> Signal { let pipe = ValuePipe>() return Signal { subscriber in let disposable = pipe.signal().start(next: { event in switch event { case let .Next(next): subscriber.putNext(next) case let .Error(error): subscriber.putError(error) case .Completion: subscriber.putCompletion() } }) let signal = Signal { subscriber in return f.start(next: { next in pipe.putNext(.Next(next)) }, error: { error in pipe.putNext(.Error(error)) subscriber.putCompletion() }, completed: { pipe.putNext(.Completion) subscriber.putCompletion() }) } self.addOperation(.custom(self.getNextId(), signal), position: .last) return disposable } |> runOn(self.queue) } private func replaceOperations(with content: AccountStateManagerOperationContent) { var collectedProcessUpdateGroups: [AccountStateManagerOperationContent] = [] var collectedMessageIds: [MessageId] = [] var collectedPollCompletionSubscribers: [(Int32, ([MessageId]) -> Void)] = [] var collectedReplayAsynchronouslyBuiltFinalState: [(AccountFinalState, () -> Void)] = [] var processEvents: [(Int32, AccountFinalStateEvents)] = [] var customOperations: [(Int32, Signal)] = [] var replacedOperations: [AccountStateManagerOperation] = [] for i in 0 ..< self.operations.count { if self.operations[i].isRunning { replacedOperations.append(self.operations[i]) } else { switch self.operations[i].content { case .processUpdateGroups: collectedProcessUpdateGroups.append(self.operations[i].content) case let .pollCompletion(_, messageIds, subscribers): collectedMessageIds.append(contentsOf: messageIds) collectedPollCompletionSubscribers.append(contentsOf: subscribers) case let .replayAsynchronouslyBuiltFinalState(finalState, completion): collectedReplayAsynchronouslyBuiltFinalState.append((finalState, completion)) case let .processEvents(operationId, events): processEvents.append((operationId, events)) case let .custom(operationId, customSignal): customOperations.append((operationId, customSignal)) default: break } } } replacedOperations.append(contentsOf: collectedProcessUpdateGroups.map { AccountStateManagerOperation(content: $0) }) replacedOperations.append(AccountStateManagerOperation(content: content)) if !collectedPollCompletionSubscribers.isEmpty || !collectedMessageIds.isEmpty { replacedOperations.append(AccountStateManagerOperation(content: .pollCompletion(self.getNextId(), collectedMessageIds, collectedPollCompletionSubscribers))) } for (finalState, completion) in collectedReplayAsynchronouslyBuiltFinalState { replacedOperations.append(AccountStateManagerOperation(content: .replayAsynchronouslyBuiltFinalState(finalState, completion))) } for (operationId, events) in processEvents { replacedOperations.append(AccountStateManagerOperation(content: .processEvents(operationId, events))) } for (operationId, customSignal) in customOperations { replacedOperations.append(AccountStateManagerOperation(content: .custom(operationId, customSignal))) } self.operations.removeAll() self.operations.append(contentsOf: replacedOperations) } private func addOperation(_ content: AccountStateManagerOperationContent, position: AccountStateManagerAddOperationPosition) { self.queue.async { let operation = AccountStateManagerOperation(content: content) switch position { case .first: if self.operations.isEmpty || !self.operations[0].isRunning { self.operations.insert(operation, at: 0) self.startFirstOperation() } else { self.operations.insert(operation, at: 1) } case .last: let begin = self.operations.isEmpty self.operations.append(operation) if begin { self.startFirstOperation() } } } } private func startFirstOperation() { guard let operation = self.operations.first else { return } guard !operation.isRunning else { return } operation.isRunning = true switch operation.content { case let .pollDifference(currentEvents): self.operationTimer?.invalidate() self.currentIsUpdatingValue = true let queue = self.queue let accountManager = self.accountManager let postbox = self.postbox let network = self.network let mediaBox = postbox.mediaBox let accountPeerId = self.accountPeerId let auxiliaryMethods = self.auxiliaryMethods let signal = postbox.stateView() |> mapToSignal { view -> Signal in if let state = view.state as? AuthorizedAccountState { return .single(state) } else { return .complete() } } |> take(1) |> mapToSignal { [weak self] state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in if let authorizedState = state.state { var flags: Int32 = 0 var ptsTotalLimit: Int32? = nil #if DEBUG //flags = 1 << 0 //ptsTotalLimit = 1000 #endif let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: authorizedState.date, qts: authorizedState.qts)) |> map(Optional.init) |> `catch` { error -> Signal in if error.errorCode == 406 && error.errorDescription == "AUTH_KEY_DUPLICATED" { return .single(nil) } else { return .fail(error) } } |> retryRequest return request |> mapToSignal { difference -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in guard let difference = difference else { return .single((nil, nil, true)) } switch difference { case .differenceTooLong: preconditionFailure() /*return accountStateReset(postbox: postbox, network: network, accountPeerId: accountPeerId) |> mapToSignal { _ -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in return .complete() } |> then(.single((nil, nil)))*/ default: return initialStateWithDifference(postbox: postbox, difference: difference) |> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in if state.initialState.state != authorizedState { Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)") return .single((nil, nil, false)) } else { return finalStateWithDifference(postbox: postbox, network: network, state: state, difference: difference) |> deliverOn(queue) |> mapToSignal { finalState -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in if !finalState.state.preCachedResources.isEmpty { for (resource, data) in finalState.state.preCachedResources { mediaBox.storeResourceData(resource.id, data: data) } } let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary() return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in let startTime = CFAbsoluteTimeGetCurrent() let replayedState = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds) let deltaTime = CFAbsoluteTimeGetCurrent() - startTime if deltaTime > 1.0 { Logger.shared.log("State", "replayFinalState took \(deltaTime)s") } if let replayedState = replayedState { return (difference, replayedState, false) } else { return (nil, nil, false) } } } } } } } } else { let appliedState = network.request(Api.functions.updates.getState()) |> retryRequest |> mapToSignal { state in return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in if let currentState = transaction.getState() as? AuthorizedAccountState { switch state { case let .state(pts, qts, date, seq, _): transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: qts, date: date, seq: seq))) } } return (nil, nil, false) } } return appliedState } } |> deliverOn(self.queue) let _ = signal.start(next: { [weak self] difference, finalState, skipBecauseOfError in if let strongSelf = self { if case .pollDifference = strongSelf.operations.removeFirst().content { let events: AccountFinalStateEvents if let finalState = finalState { events = currentEvents.union(with: AccountFinalStateEvents(state: finalState)) } else { events = currentEvents } if let difference = difference { switch difference { case .differenceSlice: strongSelf.addOperation(.pollDifference(events), position: .first) default: if !events.isEmpty { strongSelf.insertProcessEvents(events) } strongSelf.currentIsUpdatingValue = false strongSelf.significantStateUpdateCompletedPipe.putNext(Void()) } } else if skipBecauseOfError { if !events.isEmpty { strongSelf.insertProcessEvents(events) } } else { if !events.isEmpty { strongSelf.insertProcessEvents(events) } strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents())) } strongSelf.startFirstOperation() } else { assertionFailure() } } }, error: { _ in assertionFailure() Logger.shared.log("AccountStateManager", "processUpdateGroups signal completed with error") }) case let .collectUpdateGroups(_, timeout): self.operationTimer?.invalidate() let operationTimer = SignalKitTimer(timeout: timeout, repeat: false, completion: { [weak self] in if let strongSelf = self { let firstOperation = strongSelf.operations.removeFirst() if case let .collectUpdateGroups(groups, _) = firstOperation.content { if timeout.isEqual(to: 0.0) { strongSelf.addOperation(.processUpdateGroups(groups), position: .first) } else { Logger.shared.log("AccountStateManager", "timeout while waiting for updates") strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents())) } strongSelf.startFirstOperation() } else { assertionFailure() } } }, queue: self.queue) self.operationTimer = operationTimer operationTimer.start() case let .processUpdateGroups(groups): self.operationTimer?.invalidate() let accountManager = self.accountManager let postbox = self.postbox let network = self.network let auxiliaryMethods = self.auxiliaryMethods let accountPeerId = self.accountPeerId let mediaBox = postbox.mediaBox let queue = self.queue let signal = initialStateWithUpdateGroups(postbox: postbox, groups: groups) |> mapToSignal { [weak self] state -> Signal<(AccountReplayedFinalState?, AccountFinalState), NoError> in return finalStateWithUpdateGroups(postbox: postbox, network: network, state: state, groups: groups) |> deliverOn(queue) |> mapToSignal { finalState in if !finalState.discard && !finalState.state.preCachedResources.isEmpty { for (resource, data) in finalState.state.preCachedResources { postbox.mediaBox.storeResourceData(resource.id, data: data) } } let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary() return postbox.transaction { transaction -> AccountReplayedFinalState? in if finalState.discard { return nil } else { let startTime = CFAbsoluteTimeGetCurrent() let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds) let deltaTime = CFAbsoluteTimeGetCurrent() - startTime if deltaTime > 1.0 { Logger.shared.log("State", "replayFinalState took \(deltaTime)s") } return result } } |> map({ ($0, finalState) }) |> deliverOn(queue) } } let _ = signal.start(next: { [weak self] replayedState, finalState in if let strongSelf = self { if case let .processUpdateGroups(groups) = strongSelf.operations.removeFirst().content { if let replayedState = replayedState, !finalState.shouldPoll { let events = AccountFinalStateEvents(state: replayedState) if !events.isEmpty { strongSelf.insertProcessEvents(events) } if finalState.incomplete { strongSelf.addOperation(.collectUpdateGroups(groups, 2.0), position: .last) } } else { if let replayedState = replayedState { let events = AccountFinalStateEvents(state: replayedState) if !events.displayAlerts.isEmpty { strongSelf.insertProcessEvents(AccountFinalStateEvents(displayAlerts: events.displayAlerts)) } } strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents())) } strongSelf.startFirstOperation() } else { assertionFailure() } } }, error: { _ in assertionFailure() Logger.shared.log("AccountStateManager", "processUpdateGroups signal completed with error") }) case let .custom(operationId, signal): self.operationTimer?.invalidate() let completed: () -> Void = { [weak self] in if let strongSelf = self { let topOperation = strongSelf.operations.removeFirst() if case .custom(operationId, _) = topOperation.content { strongSelf.startFirstOperation() } else { assertionFailure() } } } let _ = (signal |> deliverOn(self.queue)).start(error: { _ in completed() }, completed: { completed() }) case let .processEvents(operationId, events): self.operationTimer?.invalidate() let completed: () -> Void = { [weak self] in if let strongSelf = self { let topOperation = strongSelf.operations.removeFirst() if case .processEvents(operationId, _) = topOperation.content { if !events.updatedTypingActivities.isEmpty { strongSelf.peerInputActivityManager.transaction { manager in for (chatPeerId, peerActivities) in events.updatedTypingActivities { for (peerId, activity) in peerActivities { if let activity = activity { manager.addActivity(chatPeerId: chatPeerId, peerId: peerId, activity: activity) } else { manager.removeAllActivities(chatPeerId: chatPeerId, peerId: peerId) } } } } } if !events.updatedWebpages.isEmpty { strongSelf.notifyUpdatedWebpages(events.updatedWebpages) } if let updatedPeersNearby = events.updatedPeersNearby { strongSelf.notifyUpdatedPeersNearby(updatedPeersNearby) } if !events.updatedCalls.isEmpty { for call in events.updatedCalls { strongSelf.callSessionManager.updateSession(call, completion: { _ in }) } } if !events.addedCallSignalingData.isEmpty { for (id, data) in events.addedCallSignalingData { strongSelf.callSessionManager.addCallSignalingData(id: id, data: data) } } if !events.updatedGroupCallParticipants.isEmpty { strongSelf.groupCallParticipantUpdatesPipe.putNext(events.updatedGroupCallParticipants) } if !events.updatedIncomingThreadReadStates.isEmpty || !events.updatedOutgoingThreadReadStates.isEmpty { strongSelf.threadReadStateUpdatesPipe.putNext((events.updatedIncomingThreadReadStates, events.updatedOutgoingThreadReadStates)) } if !events.isContactUpdates.isEmpty { strongSelf.addIsContactUpdates(events.isContactUpdates) } if let updatedMaxMessageId = events.updatedMaxMessageId { strongSelf.appliedMaxMessageIdPromise.set(.single(updatedMaxMessageId)) } if let updatedQts = events.updatedQts { strongSelf.appliedQtsPromise.set(.single(updatedQts)) } var pollCount = 0 for i in 0 ..< strongSelf.operations.count { if case let .pollCompletion(pollId, messageIds, subscribers) = strongSelf.operations[i].content { pollCount += 1 var updatedMessageIds = messageIds updatedMessageIds.append(contentsOf: events.addedIncomingMessageIds) let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, updatedMessageIds, subscribers)) operation.isRunning = strongSelf.operations[i].isRunning strongSelf.operations[i] = operation } } assert(pollCount <= 1) strongSelf.startFirstOperation() } else { assertionFailure() } } } if events.delayNotificatonsUntil != nil { let _ = self.delayNotificatonsUntil.swap(events.delayNotificatonsUntil) } let signal = self.postbox.transaction { transaction -> [([Message], PeerGroupId, Bool)] in var messageList: [([Message], PeerGroupId, Bool)] = [] for id in events.addedIncomingMessageIds { let (messages, notify, _, _) = messagesForNotification(transaction: transaction, id: id, alwaysReturnMessage: false) if !messages.isEmpty { messageList.append((messages, .root, notify)) } } var wasScheduledMessages: [Message] = [] for id in events.wasScheduledMessageIds { if let message = transaction.getMessage(id) { wasScheduledMessages.append(message) } } if !wasScheduledMessages.isEmpty { messageList.append((wasScheduledMessages, .root, true)) } return messageList } let _ = (signal |> deliverOn(self.queue)).start(next: { [weak self] messages in if let strongSelf = self { strongSelf.notificationMessagesPipe.putNext(messages) } }, error: { _ in completed() }, completed: { completed() }) if !events.displayAlerts.isEmpty { self.displayAlertsPipe.putNext(events.displayAlerts) } if !events.externallyUpdatedPeerId.isEmpty { self.externallyUpdatedPeerIdsPipe.putNext(Array(events.externallyUpdatedPeerId)) } if events.authorizationListUpdated { self.authorizationListUpdatesPipe.putNext(Void()) } if !events.deletedMessageIds.isEmpty { self.deletedMessagesPipe.putNext(events.deletedMessageIds) } case let .pollCompletion(pollId, preMessageIds, preSubscribers): if self.operations.count > 1 { self.operations.removeFirst() self.postponePollCompletionOperation(messageIds: preMessageIds, subscribers: preSubscribers) self.startFirstOperation() } else { self.operationTimer?.invalidate() let signal = self.network.request(Api.functions.help.test()) |> deliverOn(self.queue) let completed: () -> Void = { [weak self] in if let strongSelf = self { let topOperation = strongSelf.operations.removeFirst() if case let .pollCompletion(topPollId, messageIds, subscribers) = topOperation.content { assert(topPollId == pollId) if strongSelf.operations.isEmpty { for (_, f) in subscribers { f(messageIds) } } else { strongSelf.postponePollCompletionOperation(messageIds: messageIds, subscribers: subscribers) } strongSelf.startFirstOperation() } else { assertionFailure() } } } let _ = (signal |> deliverOn(self.queue)).start(error: { _ in completed() }, completed: { completed() }) } case let .replayAsynchronouslyBuiltFinalState(finalState, completion): if !finalState.state.preCachedResources.isEmpty { for (resource, data) in finalState.state.preCachedResources { self.postbox.mediaBox.storeResourceData(resource.id, data: data) } } let accountPeerId = self.accountPeerId let accountManager = self.accountManager let postbox = self.postbox let mediaBox = self.postbox.mediaBox let network = self.network let auxiliaryMethods = self.auxiliaryMethods let removePossiblyDeliveredMessagesUniqueIds = self.removePossiblyDeliveredMessagesUniqueIds let signal = self.postbox.transaction { transaction -> AccountReplayedFinalState? in let startTime = CFAbsoluteTimeGetCurrent() let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds) let deltaTime = CFAbsoluteTimeGetCurrent() - startTime if deltaTime > 1.0 { Logger.shared.log("State", "replayFinalState took \(deltaTime)s") } return result } |> map({ ($0, finalState) }) |> deliverOn(self.queue) let _ = signal.start(next: { [weak self] replayedState, finalState in if let strongSelf = self { if case .replayAsynchronouslyBuiltFinalState = strongSelf.operations.removeFirst().content { if let replayedState = replayedState { let events = AccountFinalStateEvents(state: replayedState) if !events.isEmpty { strongSelf.insertProcessEvents(events) } } strongSelf.startFirstOperation() } else { assertionFailure() } completion() } }, error: { _ in assertionFailure() Logger.shared.log("AccountStateManager", "processUpdateGroups signal completed with error") completion() }) } } private func insertProcessEvents(_ events: AccountFinalStateEvents) { if !events.isEmpty { let operation = AccountStateManagerOperation(content: .processEvents(self.getNextId(), events)) var inserted = false for i in 0 ..< self.operations.count { if self.operations[i].isRunning { continue } if case .processEvents = self.operations[i].content { continue } self.operations.insert(operation, at: i) inserted = true break } if !inserted { self.operations.append(operation) } } } private func postponePollCompletionOperation(messageIds: [MessageId], subscribers: [(Int32, ([MessageId]) -> Void)]) { self.addOperation(.pollCompletion(self.getNextId(), messageIds, subscribers), position: .last) for i in 0 ..< self.operations.count { if case .pollCompletion = self.operations[i].content { if i != self.operations.count - 1 { assertionFailure() } } } } private func addPollCompletion(_ f: @escaping ([MessageId]) -> Void) -> Int32 { assert(self.queue.isCurrent()) let updatedId: Int32 = self.getNextId() for i in 0 ..< self.operations.count { if case let .pollCompletion(pollId, messageIds, subscribers) = self.operations[i].content { var subscribers = subscribers subscribers.append((updatedId, f)) let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, messageIds, subscribers)) operation.isRunning = self.operations[i].isRunning self.operations[i] = operation return updatedId } } self.addOperation(.pollCompletion(self.getNextId(), [], [(updatedId, f)]), position: .last) return updatedId } private func removePollCompletion(_ id: Int32) { for i in 0 ..< self.operations.count { if case let .pollCompletion(pollId, messages, subscribers) = self.operations[i].content { for j in 0 ..< subscribers.count { if subscribers[j].0 == id { var subscribers = subscribers subscribers.remove(at: j) let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, messages, subscribers)) operation.isRunning = self.operations[i].isRunning self.operations[i] = operation break } } } } } public func pollStateUpdateCompletion() -> Signal<[MessageId], NoError> { return Signal { [weak self] subscriber in let disposable = MetaDisposable() if let strongSelf = self { strongSelf.queue.async { let id = strongSelf.addPollCompletion({ messageIds in subscriber.putNext(messageIds) subscriber.putCompletion() }) disposable.set(ActionDisposable { if let strongSelf = self { strongSelf.queue.async { strongSelf.removePollCompletion(id) } } }) } } return disposable } } public func updatedWebpage(_ webpageId: MediaId) -> Signal { let queue = self.queue return Signal { [weak self] subscriber in let disposable = MetaDisposable() queue.async { if let strongSelf = self { let context: UpdatedWebpageSubscriberContext if let current = strongSelf.updatedWebpageContexts[webpageId] { context = current } else { context = UpdatedWebpageSubscriberContext() strongSelf.updatedWebpageContexts[webpageId] = context } let index = context.subscribers.add({ media in subscriber.putNext(media) }) disposable.set(ActionDisposable { if let strongSelf = self { if let context = strongSelf.updatedWebpageContexts[webpageId] { context.subscribers.remove(index) if context.subscribers.isEmpty { strongSelf.updatedWebpageContexts.removeValue(forKey: webpageId) } } } }) } } return disposable } } private func notifyUpdatedWebpages(_ updatedWebpages: [MediaId: TelegramMediaWebpage]) { for (id, context) in self.updatedWebpageContexts { if let media = updatedWebpages[id] { for subscriber in context.subscribers.copyItems() { subscriber(media) } } } } func notifyAppliedIncomingReadMessages(_ ids: [MessageId]) { self.appliedIncomingReadMessagesPipe.putNext(ids) } public func getDelayNotificatonsUntil() -> Int32? { return self.delayNotificatonsUntil.with { $0 } } func modifyTermsOfServiceUpdate(_ f: @escaping (TermsOfServiceUpdate?) -> (TermsOfServiceUpdate?)) { self.queue.async { let current = self.termsOfServiceUpdateValue.with { $0 } let updated = f(current) if (current != updated) { let _ = self.termsOfServiceUpdateValue.swap(updated) self.termsOfServiceUpdatePromise.set(.single(updated)) } } } func modifyAppUpdateInfo(_ f: @escaping (AppUpdateInfo?) -> (AppUpdateInfo?)) { self.queue.async { let current = self.appUpdateInfoValue.with { $0 } let updated = f(current) if (current != updated) { let _ = self.appUpdateInfoValue.swap(updated) self.appUpdateInfoPromise.set(.single(updated)) } } } public func updatedPeersNearby() -> Signal<[PeerNearby], NoError> { let queue = self.queue return Signal { [weak self] subscriber in let disposable = MetaDisposable() queue.async { if let strongSelf = self { let index = strongSelf.updatedPeersNearbyContext.subscribers.add({ peersNearby in subscriber.putNext(peersNearby) }) disposable.set(ActionDisposable { if let strongSelf = self { strongSelf.updatedPeersNearbyContext.subscribers.remove(index) } }) } } return disposable } } private func notifyUpdatedPeersNearby(_ updatedPeersNearby: [PeerNearby]) { for subscriber in self.updatedPeersNearbyContext.subscribers.copyItems() { subscriber(updatedPeersNearby) } } func notifyDeletedMessages(messageIds: [MessageId]) { self.deletedMessagesPipe.putNext(messageIds.map { .messageId($0) }) } public func processIncomingCallUpdate(data: Data, completion: @escaping ((CallSessionRingingState, CallSession)?) -> Void) { var rawData = data let reader = BufferReader(Buffer(data: data)) if let signature = reader.readInt32(), signature == 0x3072cfa1 { if let compressedData = parseBytes(reader) { if let decompressedData = MTGzip.decompress(compressedData.makeData()) { rawData = decompressedData } } } if let updates = Api.parse(Buffer(data: rawData)) as? Api.Updates { switch updates { case let .updates(updates, users, chats, date, seq): for update in updates { switch update { case let .updatePhoneCall(phoneCall): self.callSessionManager.updateSession(phoneCall, completion: { result in completion(result) }) return default: break } } default: break } } completion(nil) } func removePossiblyDeliveredMessages(uniqueIds: [Int64: PeerId]) { self.queue.async { self.removePossiblyDeliveredMessagesUniqueIds.merge(uniqueIds, uniquingKeysWith: { _, rhs in rhs }) } } } public func messagesForNotification(transaction: Transaction, id: MessageId, alwaysReturnMessage: Bool) -> (messages: [Message], notify: Bool, sound: PeerMessageSound, displayContents: Bool) { guard let message = transaction.getMessage(id) else { Logger.shared.log("AccountStateManager", "notification message doesn't exist") return ([], false, .bundledModern(id: 0), false) } var notify = true var sound: PeerMessageSound = .bundledModern(id: 0) var muted = false var displayContents = true for attribute in message.attributes { if let attribute = attribute as? NotificationInfoMessageAttribute { if attribute.flags.contains(.muted) { muted = true } } } for media in message.media { if let action = media as? TelegramMediaAction { switch action.action { case .groupMigratedToChannel, .channelMigratedFromGroup: notify = false default: break } } } let timestamp = Int32(CFAbsoluteTimeGetCurrent() + NSTimeIntervalSince1970) var notificationPeerId = id.peerId let peer = transaction.getPeer(id.peerId) if let peer = peer, let associatedPeerId = peer.associatedPeerId { notificationPeerId = associatedPeerId } if message.personal, let author = message.author { notificationPeerId = author.id } if let notificationSettings = transaction.getPeerNotificationSettings(notificationPeerId) as? TelegramPeerNotificationSettings { var defaultSound: PeerMessageSound = .bundledModern(id: 0) var defaultNotify: Bool = true if let globalNotificationSettings = transaction.getPreferencesEntry(key: PreferencesKeys.globalNotifications) as? GlobalNotificationSettings { if id.peerId.namespace == Namespaces.Peer.CloudUser { defaultNotify = globalNotificationSettings.effective.privateChats.enabled defaultSound = globalNotificationSettings.effective.privateChats.sound displayContents = globalNotificationSettings.effective.privateChats.displayPreviews } else if id.peerId.namespace == Namespaces.Peer.SecretChat { defaultNotify = globalNotificationSettings.effective.privateChats.enabled defaultSound = globalNotificationSettings.effective.privateChats.sound displayContents = false } else if id.peerId.namespace == Namespaces.Peer.CloudChannel, let peer = peer as? TelegramChannel, case .broadcast = peer.info { defaultNotify = globalNotificationSettings.effective.channels.enabled defaultSound = globalNotificationSettings.effective.channels.sound displayContents = globalNotificationSettings.effective.channels.displayPreviews } else { defaultNotify = globalNotificationSettings.effective.groupChats.enabled defaultSound = globalNotificationSettings.effective.groupChats.sound displayContents = globalNotificationSettings.effective.groupChats.displayPreviews } } switch notificationSettings.muteState { case .default: if !defaultNotify { notify = false } case let .muted(until): if until >= timestamp { notify = false } case .unmuted: break } if case .default = notificationSettings.messageSound { sound = defaultSound } else { sound = notificationSettings.messageSound } } else { Logger.shared.log("AccountStateManager", "notification settings for \(notificationPeerId) are undefined") } if muted { sound = .none } if let channel = message.peers[message.id.peerId] as? TelegramChannel { switch channel.participationStatus { case .kicked, .left: return ([], false, sound, false) case .member: break } } var foundReadState = false var isUnread = true if let readState = transaction.getCombinedPeerReadState(id.peerId) { if readState.isIncomingMessageIndexRead(message.index) { isUnread = false } foundReadState = true } if !foundReadState { Logger.shared.log("AccountStateManager", "read state for \(id.peerId) is undefined") } var resultMessages: [Message] = [message] var messageGroup: [Message]? if message.forwardInfo != nil && message.sourceReference == nil { messageGroup = transaction.getMessageForwardedGroup(message.id) } else if message.groupingKey != nil { messageGroup = transaction.getMessageGroup(message.id) } if let messageGroup = messageGroup { resultMessages.append(contentsOf: messageGroup.filter({ $0.id != message.id })) } if notify { return (resultMessages, isUnread, sound, displayContents) } else { return (alwaysReturnMessage ? resultMessages : [], false, sound, displayContents) } }