import Foundation
import Postbox
import SwiftSignalKit
import TelegramApi
import MtProtoKit


/// Payload of a single entry in the `AccountStateManager` operation list.
private enum AccountStateManagerOperationContent {
    /// Request the state difference from the server (`updates.getDifference`). The associated events are the ones
    /// accumulated so far; they are merged with the events produced by replaying the fetched difference.
    case pollDifference(AccountFinalStateEvents)
    /// Accumulate incoming update groups. The `Double` is the timeout given to the operation timer: when the timer
    /// fires with a zero timeout the groups are processed, otherwise a timeout is logged and the difference is polled.
    case collectUpdateGroups([UpdateGroup], Double)
    /// Build a final state from the given update groups and replay it.
    case processUpdateGroups([UpdateGroup])
    /// Caller-supplied work: an operation id plus a signal; the operation is finished when the signal completes.
    case custom(Int32, Signal<Void, NoError>)
    /// Operation id, collected message ids and `(id, callback)` subscriber pairs. Pending entries of this kind are
    /// merged into one when the operation list is rebuilt.
    case pollCompletion(Int32, [MessageId], [(Int32, ([MessageId]) -> Void)])
    /// Operation id plus the events to deliver.
    case processEvents(Int32, AccountFinalStateEvents)
    /// A final state built outside the operation list, plus a callback used to report back to the requester.
    case replayAsynchronouslyBuiltFinalState(AccountFinalState, () -> Void)
}

/// An entry of the operation list: immutable content plus a mutable "has been started" flag.
private final class AccountStateManagerOperation {
    /// What the operation does; fixed at creation time.
    let content: AccountStateManagerOperationContent
    /// `false` for a freshly created operation; flipped to `true` when the operation is started.
    var isRunning = false
    
    init(content: AccountStateManagerOperationContent) {
        self.content = content
    }
}

/// Where a new operation is placed in the operation list: `first` puts it at the front (or directly behind a
/// running head operation), `last` appends it.
private enum AccountStateManagerAddOperationPosition {
    case first, last
}

// Module-qualified alias for SwiftSignalKit's `Timer`; presumably here to avoid ambiguity with Foundation's `Timer` (TODO confirm).
private typealias SignalKitTimer = SwiftSignalKit.Timer

/// Event relayed through the pipe used by `addCustomOperation`, mirroring the three callbacks of a signal
/// (next value, error, completion).
private enum CustomOperationEvent<T, E> {
    /// A value produced by the wrapped signal.
    case Next(T)
    /// The wrapped signal failed with the given error.
    case Error(E)
    /// The wrapped signal completed.
    case Completion
}

/// Holds a bag of callbacks that take a `TelegramMediaWebpage`.
private final class UpdatedWebpageSubscriberContext {
    let subscribers: Bag<(TelegramMediaWebpage) -> Void> = Bag()
}

/// Holds a bag of callbacks that take a list of `PeerNearby` values.
private final class UpdatedPeersNearbySubscriberContext {
    let subscribers: Bag<([PeerNearby]) -> Void> = Bag()
}

/// Identifies a deleted message, either by a bare `Int32` id or by a full `MessageId`.
public enum DeletedMessageId: Hashable {
    /// A bare numeric id with no peer attached. NOTE(review): presumably an id from the global message id space — confirm.
    case global(Int32)
    /// A fully qualified message id.
    case messageId(MessageId)
}

public final class AccountStateManager {
    // Queue that owns the operation list: the `operations` accessors assert `queue.isCurrent()`,
    // and the entry points below hop onto it with `queue.async`.
    private let queue = Queue()
    public let accountPeerId: PeerId
    private let accountManager: AccountManager<TelegramAccountManagerTypes>
    public let postbox: Postbox
    public let network: Network
    private let callSessionManager: CallSessionManager?
    private let addIsContactUpdates: ([(PeerId, Bool)]) -> Void
    // Gates the "received" acknowledgement requests set up in `reset()`.
    private let shouldKeepOnlinePresence: Signal<Bool, NoError>
    
    private let peerInputActivityManager: PeerInputActivityManager?
    let auxiliaryMethods: AccountAuxiliaryMethods
    var transformOutgoingMessageMedia: TransformOutgoingMessageMedia?
    
    // Created on the first `reset()` and registered with `network.mtProto`; its pipe feeds `addUpdateGroups`.
    private var updateService: UpdateMessageService?
    // Holds the subscription to `updateService`'s pipe; disposed in `deinit`.
    private let updateServiceDisposable = MetaDisposable()
    
    /// Backing storage for `operations`. Access it through `operations` so the queue assertion runs.
    private var operations_: [AccountStateManagerOperation] = []
    /// The operation list; the first element is the one that gets started.
    /// Both accessors assert that the caller is running on `queue`.
    private var operations: [AccountStateManagerOperation] {
        get {
            assert(self.queue.isCurrent())
            return self.operations_
        }
        set {
            assert(self.queue.isCurrent())
            self.operations_ = newValue
        }
    }
    // Cleared in `reset()` and disposed in `deinit`.
    private let operationDisposable = MetaDisposable()
    // Timer created when a `.collectUpdateGroups` operation starts; invalidated whenever an operation is started.
    private var operationTimer: SignalKitTimer?
    
    // Snapshot of this dictionary is handed to `replayFinalState` as `removePossiblyDeliveredMessagesUniqueIds`.
    private var removePossiblyDeliveredMessagesUniqueIds: [Int64: PeerId] = [:]
    
    /// The identifier most recently returned by `getNextId()`; starts at 0.
    private var nextId: Int32 = 0
    /// Advances the counter by one and returns the new value, so the first identifier handed out is 1.
    private func getNextId() -> Int32 {
        let allocated = self.nextId + 1
        self.nextId = allocated
        return allocated
    }
    
    /// Backs the public `isUpdating` signal.
    private let isUpdatingValue = ValuePromise<Bool>(false)
    /// Current "updating" flag. It is raised when a difference poll starts and lowered once a difference other
    /// than a slice has been handled. Only actual changes are forwarded to `isUpdatingValue`.
    private var currentIsUpdatingValue = false {
        didSet {
            guard oldValue != self.currentIsUpdatingValue else {
                return
            }
            self.isUpdatingValue.set(self.currentIsUpdatingValue)
        }
    }
    /// Signal view of the "updating" flag.
    public var isUpdating: Signal<Bool, NoError> {
        let updating = self.isUpdatingValue.get()
        return updating
    }
    
    // Each private pipe/promise below is paired with a read-only signal property that exposes it to consumers.
    
    private let notificationMessagesPipe = ValuePipe<[([Message], PeerGroupId, Bool)]>()
    /// Signal backed by `notificationMessagesPipe`.
    public var notificationMessages: Signal<[([Message], PeerGroupId, Bool)], NoError> {
        return self.notificationMessagesPipe.signal()
    }
    
    private let reactionNotificationsPipe = ValuePipe<[(reactionAuthor: Peer, message: Message)]>()
    /// Signal backed by `reactionNotificationsPipe`.
    public var reactionNotifications: Signal<[(reactionAuthor: Peer, message: Message)], NoError> {
        return self.reactionNotificationsPipe.signal()
    }
    
    private let displayAlertsPipe = ValuePipe<[(text: String, isDropAuth: Bool)]>()
    /// Signal backed by `displayAlertsPipe`.
    public var displayAlerts: Signal<[(text: String, isDropAuth: Bool)], NoError> {
        return self.displayAlertsPipe.signal()
    }
    
    private let externallyUpdatedPeerIdsPipe = ValuePipe<[PeerId]>()
    /// Signal backed by `externallyUpdatedPeerIdsPipe`.
    var externallyUpdatedPeerIds: Signal<[PeerId], NoError> {
        return self.externallyUpdatedPeerIdsPipe.signal()
    }
    
    private let termsOfServiceUpdateValue = Atomic<TermsOfServiceUpdate?>(value: nil)
    private let termsOfServiceUpdatePromise = Promise<TermsOfServiceUpdate?>(nil)
    /// Signal backed by `termsOfServiceUpdatePromise`, which is initialised with `nil`.
    public var termsOfServiceUpdate: Signal<TermsOfServiceUpdate?, NoError> {
        return self.termsOfServiceUpdatePromise.get()
    }
    
    private let appUpdateInfoValue = Atomic<AppUpdateInfo?>(value: nil)
    private let appUpdateInfoPromise = Promise<AppUpdateInfo?>(nil)
    /// Signal backed by `appUpdateInfoPromise`, which is initialised with `nil`.
    public var appUpdateInfo: Signal<AppUpdateInfo?, NoError> {
        return self.appUpdateInfoPromise.get()
    }
    
    private let appliedIncomingReadMessagesPipe = ValuePipe<[MessageId]>()
    /// Signal backed by `appliedIncomingReadMessagesPipe`.
    public var appliedIncomingReadMessages: Signal<[MessageId], NoError> {
        return self.appliedIncomingReadMessagesPipe.signal()
    }
    
    // Receives a value after a poll-difference result other than `.differenceSlice` has been handled.
    private let significantStateUpdateCompletedPipe = ValuePipe<Void>()
    /// Signal backed by `significantStateUpdateCompletedPipe`.
    var significantStateUpdateCompleted: Signal<Void, NoError> {
        return self.significantStateUpdateCompletedPipe.signal()
    }
    
    private let authorizationListUpdatesPipe = ValuePipe<Void>()
    /// Signal backed by `authorizationListUpdatesPipe`.
    var authorizationListUpdates: Signal<Void, NoError> {
        return self.authorizationListUpdatesPipe.signal()
    }
    
    private let threadReadStateUpdatesPipe = ValuePipe<(incoming: [MessageId: MessageId.Id], outgoing: [MessageId: MessageId.Id])>()
    /// Signal backed by `threadReadStateUpdatesPipe`.
    var threadReadStateUpdates: Signal<(incoming: [MessageId: MessageId.Id], outgoing: [MessageId: MessageId.Id]), NoError> {
        return self.threadReadStateUpdatesPipe.signal()
    }
    
    private let groupCallParticipantUpdatesPipe = ValuePipe<[(Int64, GroupCallParticipantsContext.Update)]>()
    /// Signal backed by `groupCallParticipantUpdatesPipe`.
    public var groupCallParticipantUpdates: Signal<[(Int64, GroupCallParticipantsContext.Update)], NoError> {
        return self.groupCallParticipantUpdatesPipe.signal()
    }
    
    private let deletedMessagesPipe = ValuePipe<[DeletedMessageId]>()
    /// Signal backed by `deletedMessagesPipe`.
    public var deletedMessages: Signal<[DeletedMessageId], NoError> {
        return self.deletedMessagesPipe.signal()
    }
    
    // Subscriber contexts for webpage updates, keyed by media id.
    private var updatedWebpageContexts: [MediaId: UpdatedWebpageSubscriberContext] = [:]
    private var updatedPeersNearbyContext = UpdatedPeersNearbySubscriberContext()
    
    private let delayNotificatonsUntil = Atomic<Int32?>(value: nil)
    // Observed by the pipeline set up in `reset()`: non-nil values are reported with `messages.receivedMessages(maxId:)`.
    private let appliedMaxMessageIdPromise = Promise<Int32?>(nil)
    // Subscription of that pipeline; replaced on every `reset()` and disposed in `deinit`.
    private let appliedMaxMessageIdDisposable = MetaDisposable()
    // Observed by the pipeline set up in `reset()`: non-nil, non-zero values are reported with `messages.receivedQueue(maxQts:)`.
    private let appliedQtsPromise = Promise<Int32?>(nil)
    // Subscription of that pipeline; replaced on every `reset()` and disposed in `deinit`.
    private let appliedQtsDisposable = MetaDisposable()
    
    /// Stores the supplied dependencies. Nothing else happens here; every other stored property
    /// keeps its declared default value.
    init(
        accountPeerId: PeerId,
        accountManager: AccountManager<TelegramAccountManagerTypes>,
        postbox: Postbox,
        network: Network,
        callSessionManager: CallSessionManager?,
        addIsContactUpdates: @escaping ([(PeerId, Bool)]) -> Void,
        shouldKeepOnlinePresence: Signal<Bool, NoError>,
        peerInputActivityManager: PeerInputActivityManager?,
        auxiliaryMethods: AccountAuxiliaryMethods
    ) {
        // Account identity and storage.
        self.accountPeerId = accountPeerId
        self.accountManager = accountManager
        self.postbox = postbox
        
        // Networking and presence.
        self.network = network
        self.shouldKeepOnlinePresence = shouldKeepOnlinePresence
        
        // Optional collaborators and callbacks.
        self.callSessionManager = callSessionManager
        self.peerInputActivityManager = peerInputActivityManager
        self.addIsContactUpdates = addIsContactUpdates
        self.auxiliaryMethods = auxiliaryMethods
    }
    
    /// Disposes every subscription holder owned by the manager.
    deinit {
        let ownedDisposables: [MetaDisposable] = [
            self.updateServiceDisposable,
            self.operationDisposable,
            self.appliedMaxMessageIdDisposable,
            self.appliedQtsDisposable
        ]
        for disposable in ownedDisposables {
            disposable.dispose()
        }
    }
    
    /// Restarts state synchronisation. All work is dispatched asynchronously onto `queue`.
    ///
    /// On the first call the update service is created, subscribed to, and registered with `network.mtProto`.
    /// On every call `operationDisposable` is cleared, the operation list is rebuilt around a fresh
    /// `.pollDifference` operation, and the two acknowledgement subscriptions are replaced. Those subscriptions
    /// send `messages.receivedMessages` / `messages.receivedQueue` for each distinct applied value while
    /// `shouldKeepOnlinePresence` is `true`; request errors are swallowed.
    public func reset() {
        self.queue.async {
            if self.updateService == nil {
                let service = UpdateMessageService(peerId: self.accountPeerId)
                self.updateService = service
                let groupsSubscription = service.pipe.signal().start(next: { [weak self] groups in
                    self?.addUpdateGroups(groups)
                })
                self.updateServiceDisposable.set(groupsSubscription)
                self.network.mtProto.add(service)
            }
            self.operationDisposable.set(nil)
            self.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
            self.startFirstOperation()
            
            let network = self.network
            // (subscription holder, applied value, whether the value is a max message id rather than a qts)
            let acknowledgements: [(MetaDisposable, Signal<Int32?, NoError>, Bool)] = [
                (self.appliedMaxMessageIdDisposable, self.appliedMaxMessageIdPromise.get(), true),
                (self.appliedQtsDisposable, self.appliedQtsPromise.get(), false)
            ]
            
            for (holder, appliedValue, isMaxMessageId) in acknowledgements {
                // Only let a value through when it is known and the account should appear online.
                let reportableValue = combineLatest(queue: self.queue, self.shouldKeepOnlinePresence, appliedValue)
                |> mapToSignal { keepOnline, current -> Signal<Int32, NoError> in
                    guard let current = current, keepOnline else {
                        return .complete()
                    }
                    return .single(current)
                }
                |> distinctUntilChanged
                
                let acknowledge = reportableValue
                |> mapToSignal { current -> Signal<Never, NoError> in
                    if isMaxMessageId {
                        return network.request(Api.functions.messages.receivedMessages(maxId: current))
                        |> ignoreValues
                        |> `catch` { _ -> Signal<Never, NoError> in
                            return .complete()
                        }
                    }
                    // A qts of zero is never reported.
                    guard current != 0 else {
                        return .complete()
                    }
                    return network.request(Api.functions.messages.receivedQueue(maxQts: current))
                    |> ignoreValues
                    |> `catch` { _ -> Signal<Never, NoError> in
                        return .complete()
                    }
                }
                
                holder.set(acknowledge.start())
            }
        }
    }
    
    /// Hands `updates` to the update service on `queue`. Does nothing if the service has not been created yet.
    func addUpdates(_ updates: Api.Updates) {
        self.queue.async {
            guard let service = self.updateService else {
                return
            }
            service.addUpdates(updates)
        }
    }
    
    /// Queues `groups` for processing (on `queue`).
    ///
    /// If the last operation in the list is already a `.collectUpdateGroups`, the new groups are merged into it,
    /// keeping its timeout and running flag. In every other case, including an empty list, a new
    /// `.collectUpdateGroups` operation with a zero timeout is appended.
    func addUpdateGroups(_ groups: [UpdateGroup]) {
        self.queue.async {
            guard let tail = self.operations.last, case let .collectUpdateGroups(pendingGroups, timeout) = tail.content else {
                self.addOperation(.collectUpdateGroups(groups, 0.0), position: .last)
                return
            }
            
            let merged = AccountStateManagerOperation(content: .collectUpdateGroups(pendingGroups + groups, timeout))
            merged.isRunning = tail.isRunning
            self.operations[self.operations.count - 1] = merged
            self.startFirstOperation()
        }
    }
    
    /// Appends an operation that replays an externally built final state.
    ///
    /// The returned signal emits `true` and completes when the operation's completion callback is invoked.
    /// Disposing the subscription does not remove the operation (an empty disposable is returned).
    func addReplayAsynchronouslyBuiltFinalState(_ finalState: AccountFinalState) -> Signal<Bool, NoError> {
        return Signal { subscriber in
            let reportReplayed: () -> Void = {
                subscriber.putNext(true)
                subscriber.putCompletion()
            }
            self.queue.async {
                self.addOperation(.replayAsynchronouslyBuiltFinalState(finalState, reportReplayed), position: .last)
            }
            return EmptyDisposable
        }
    }
    
    /// Wraps `f` so that it runs as a `.custom` entry of the operation list.
    ///
    /// Subscribing to the returned signal (on `queue`) appends the operation. `f` itself is started only when
    /// that operation is started, and its values, error, or completion are relayed to the subscriber through
    /// a pipe. The operation is finished when `f` either fails or completes. Disposing the subscription stops
    /// the relaying but does not remove the operation from the list.
    func addCustomOperation<T, E>(_ f: Signal<T, E>) -> Signal<T, E> {
        let events = ValuePipe<CustomOperationEvent<T, E>>()
        let relayed = Signal<T, E> { subscriber in
            // Listen first, so nothing emitted by the queued work is missed.
            let forwarding = events.signal().start(next: { event in
                switch event {
                    case let .Next(value):
                        subscriber.putNext(value)
                    case let .Error(failure):
                        subscriber.putError(failure)
                    case .Completion:
                        subscriber.putCompletion()
                }
            })
            
            // The work as seen by the operation list: it never fails and only reports when it is done.
            let queuedWork = Signal<Void, NoError> { operationSubscriber in
                return f.start(next: { value in
                    events.putNext(.Next(value))
                }, error: { failure in
                    events.putNext(.Error(failure))
                    operationSubscriber.putCompletion()
                }, completed: {
                    events.putNext(.Completion)
                    operationSubscriber.putCompletion()
                })
            }
            
            self.addOperation(.custom(self.getNextId(), queuedWork), position: .last)
            
            return forwarding
        }
        return relayed |> runOn(self.queue)
    }
    
    /// Rebuilds the operation list around `content`.
    ///
    /// Resulting order: running operations (original order), pending `.processUpdateGroups`, the new `content`,
    /// a single merged `.pollCompletion` (only if any message ids or subscribers were pending, with a fresh id),
    /// pending `.replayAsynchronouslyBuiltFinalState`, pending `.processEvents`, pending `.custom`.
    /// Pending `.pollDifference` and `.collectUpdateGroups` operations are dropped.
    private func replaceOperations(with content: AccountStateManagerOperationContent) {
        var running: [AccountStateManagerOperation] = []
        var pendingProcessUpdateGroups: [AccountStateManagerOperationContent] = []
        var pendingMessageIds: [MessageId] = []
        var pendingPollSubscribers: [(Int32, ([MessageId]) -> Void)] = []
        var pendingReplays: [(AccountFinalState, () -> Void)] = []
        var pendingEvents: [(Int32, AccountFinalStateEvents)] = []
        var pendingCustom: [(Int32, Signal<Void, NoError>)] = []
        
        for operation in self.operations {
            if operation.isRunning {
                // Running operations are kept as they are (same instances).
                running.append(operation)
                continue
            }
            switch operation.content {
                case .processUpdateGroups:
                    pendingProcessUpdateGroups.append(operation.content)
                case let .pollCompletion(_, messageIds, subscribers):
                    pendingMessageIds.append(contentsOf: messageIds)
                    pendingPollSubscribers.append(contentsOf: subscribers)
                case let .replayAsynchronouslyBuiltFinalState(finalState, completion):
                    pendingReplays.append((finalState, completion))
                case let .processEvents(operationId, events):
                    pendingEvents.append((operationId, events))
                case let .custom(operationId, customSignal):
                    pendingCustom.append((operationId, customSignal))
                case .pollDifference, .collectUpdateGroups:
                    break
            }
        }
        
        var rebuilt = running
        rebuilt.append(contentsOf: pendingProcessUpdateGroups.map { AccountStateManagerOperation(content: $0) })
        rebuilt.append(AccountStateManagerOperation(content: content))
        
        if !(pendingPollSubscribers.isEmpty && pendingMessageIds.isEmpty) {
            let mergedCompletion: AccountStateManagerOperationContent = .pollCompletion(self.getNextId(), pendingMessageIds, pendingPollSubscribers)
            rebuilt.append(AccountStateManagerOperation(content: mergedCompletion))
        }
        
        rebuilt.append(contentsOf: pendingReplays.map { AccountStateManagerOperation(content: .replayAsynchronouslyBuiltFinalState($0.0, $0.1)) })
        rebuilt.append(contentsOf: pendingEvents.map { AccountStateManagerOperation(content: .processEvents($0.0, $0.1)) })
        rebuilt.append(contentsOf: pendingCustom.map { AccountStateManagerOperation(content: .custom($0.0, $0.1)) })
        
        self.operations = rebuilt
    }
    
    /// Creates an operation for `content` and inserts it into the operation list. The insertion itself is
    /// dispatched asynchronously onto `queue`.
    ///
    /// - `.first`: goes to the front and is started, unless the current head is running, in which case it is
    ///   placed directly behind the head.
    /// - `.last`: is appended, and started only if the list was empty beforehand.
    private func addOperation(_ content: AccountStateManagerOperationContent, position: AccountStateManagerAddOperationPosition) {
        self.queue.async {
            let operation = AccountStateManagerOperation(content: content)
            switch position {
                case .first:
                    if let head = self.operations.first, head.isRunning {
                        self.operations.insert(operation, at: 1)
                    } else {
                        self.operations.insert(operation, at: 0)
                        self.startFirstOperation()
                    }
                case .last:
                    let wasIdle = self.operations.isEmpty
                    self.operations.append(operation)
                    if wasIdle {
                        self.startFirstOperation()
                    }
            }
        }
    }
    
    private func startFirstOperation() {
        guard let operation = self.operations.first else {
            return
        }
        guard !operation.isRunning else {
            return
        }
        operation.isRunning = true
        switch operation.content {
            case let .pollDifference(currentEvents):
                self.operationTimer?.invalidate()
                self.currentIsUpdatingValue = true
                let queue = self.queue
                let accountManager = self.accountManager
                let postbox = self.postbox
                let network = self.network
                let mediaBox = postbox.mediaBox
                let accountPeerId = self.accountPeerId
                let auxiliaryMethods = self.auxiliaryMethods
                let signal = postbox.stateView()
                |> mapToSignal { view -> Signal<AuthorizedAccountState, NoError> in
                    if let state = view.state as? AuthorizedAccountState {
                        return .single(state)
                    } else {
                        return .complete()
                    }
                }
                |> take(1)
                |> mapToSignal { [weak self] state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
                    if let authorizedState = state.state {
                        let flags: Int32 = 0
                        let ptsTotalLimit: Int32? = nil
                        
                        let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: authorizedState.date, qts: authorizedState.qts))
                        |> map(Optional.init)
                        |> `catch` { error -> Signal<Api.updates.Difference?, MTRpcError> in
                            if error.errorCode == 406 && error.errorDescription == "AUTH_KEY_DUPLICATED" {
                                return .single(nil)
                            } else {
                                return .fail(error)
                            }
                        }
                        |> retryRequest
                        
                        return request
                        |> mapToSignal { difference -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
                            guard let difference = difference else {
                                return .single((nil, nil, true))
                            }
                            switch difference {
                                case .differenceTooLong:
                                    preconditionFailure()
                                default:
                                    return initialStateWithDifference(postbox: postbox, difference: difference)
                                    |> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
                                        if state.initialState.state != authorizedState {
                                            Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
                                            return .single((nil, nil, false))
                                        } else {
                                            return finalStateWithDifference(postbox: postbox, network: network, state: state, difference: difference)
                                                |> deliverOn(queue)
                                                |> mapToSignal { finalState -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
                                                    if !finalState.state.preCachedResources.isEmpty {
                                                        for (resource, data) in finalState.state.preCachedResources {
                                                            mediaBox.storeResourceData(resource.id, data: data)
                                                        }
                                                    }
                                                    let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()
                                                    return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
                                                        let startTime = CFAbsoluteTimeGetCurrent()
                                                        let replayedState = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
                                                        let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
                                                        if deltaTime > 1.0 {
                                                            Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
                                                        }
                                                        
                                                        if let replayedState = replayedState {
                                                            return (difference, replayedState, false)
                                                        } else {
                                                            return (nil, nil, false)
                                                        }
                                                    }
                                            }
                                        }
                                    }
                            }
                        }
                    } else {
                        let appliedState = network.request(Api.functions.updates.getState())
                        |> retryRequest
                        |> mapToSignal { state in
                            return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
                                if let currentState = transaction.getState() as? AuthorizedAccountState {
                                    switch state {
                                        case let .state(pts, qts, date, seq, _):
                                            transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: qts, date: date, seq: seq)))
                                    }
                                }
                                return (nil, nil, false)
                            }
                        }
                        return appliedState
                    }
                }
                |> deliverOn(self.queue)
                let _ = signal.start(next: { [weak self] difference, finalState, skipBecauseOfError in
                    if let strongSelf = self {
                        if case .pollDifference = strongSelf.operations.removeFirst().content {
                            let events: AccountFinalStateEvents
                            if let finalState = finalState {
                                events = currentEvents.union(with: AccountFinalStateEvents(state: finalState))
                            } else {
                                events = currentEvents
                            }
                            if let difference = difference {
                                switch difference {
                                    case .differenceSlice:
                                        strongSelf.addOperation(.pollDifference(events), position: .first)
                                    default:
                                        if !events.isEmpty {
                                            strongSelf.insertProcessEvents(events)
                                        }
                                        strongSelf.currentIsUpdatingValue = false
                                    strongSelf.significantStateUpdateCompletedPipe.putNext(Void())
                                }
                            } else if skipBecauseOfError {
                                if !events.isEmpty {
                                    strongSelf.insertProcessEvents(events)
                                }
                            } else {
                                if !events.isEmpty {
                                    strongSelf.insertProcessEvents(events)
                                }
                                strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
                            }
                            strongSelf.startFirstOperation()
                        } else {
                            assertionFailure()
                        }
                    }
                })
            case let .collectUpdateGroups(_, timeout):
                self.operationTimer?.invalidate()
                let operationTimer = SignalKitTimer(timeout: timeout, repeat: false, completion: { [weak self] in
                    if let strongSelf = self {
                        let firstOperation = strongSelf.operations.removeFirst()
                        if case let .collectUpdateGroups(groups, _) = firstOperation.content {
                            if timeout.isEqual(to: 0.0) {
                                strongSelf.addOperation(.processUpdateGroups(groups), position: .first)
                            } else {
                                Logger.shared.log("AccountStateManager", "timeout while waiting for updates")
                                strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
                            }
                            strongSelf.startFirstOperation()
                        } else {
                            assertionFailure()
                        }
                    }
                }, queue: self.queue)
                self.operationTimer = operationTimer
                operationTimer.start()
            case let .processUpdateGroups(groups):
                self.operationTimer?.invalidate()
                let accountManager = self.accountManager
                let postbox = self.postbox
                let network = self.network
                let auxiliaryMethods = self.auxiliaryMethods
                let accountPeerId = self.accountPeerId
                let mediaBox = postbox.mediaBox
                let queue = self.queue
                let signal = initialStateWithUpdateGroups(postbox: postbox, groups: groups)
                |> mapToSignal { [weak self] state -> Signal<(AccountReplayedFinalState?, AccountFinalState), NoError> in
                    return finalStateWithUpdateGroups(postbox: postbox, network: network, state: state, groups: groups)
                    |> deliverOn(queue)
                    |> mapToSignal { finalState in
                        if !finalState.discard && !finalState.state.preCachedResources.isEmpty {
                            for (resource, data) in finalState.state.preCachedResources {
                                postbox.mediaBox.storeResourceData(resource.id, data: data)
                            }
                        }
                        
                        let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()
                        
                        return postbox.transaction { transaction -> AccountReplayedFinalState? in
                            if finalState.discard {
                                return nil
                            } else {
                                let startTime = CFAbsoluteTimeGetCurrent()
                                let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
                                let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
                                if deltaTime > 1.0 {
                                    Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
                                }
                                return result
                            }
                        }
                        |> map({ ($0, finalState) })
                        |> deliverOn(queue)
                    }
                }
                let _ = signal.start(next: { [weak self] replayedState, finalState in
                    if let strongSelf = self {
                        if case let .processUpdateGroups(groups) = strongSelf.operations.removeFirst().content {
                            if let replayedState = replayedState, !finalState.shouldPoll {
                                let events = AccountFinalStateEvents(state: replayedState)
                                if !events.isEmpty {
                                    strongSelf.insertProcessEvents(events)
                                }
                                if finalState.incomplete || !finalState.missingUpdatesFromChannels.isEmpty {
                                    strongSelf.addOperation(.collectUpdateGroups(groups, 2.0), position: .last)
                                }
                            } else {
                                if let replayedState = replayedState {
                                    let events = AccountFinalStateEvents(state: replayedState)
                                    if !events.displayAlerts.isEmpty {
                                        strongSelf.insertProcessEvents(AccountFinalStateEvents(displayAlerts: events.displayAlerts))
                                    }
                                }
                                strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
                            }
                            strongSelf.startFirstOperation()
                        } else {
                            assertionFailure()
                        }
                    }
                })
            case let .custom(operationId, signal):
                self.operationTimer?.invalidate()
                let completed: () -> Void = { [weak self] in
                    if let strongSelf = self {
                        let topOperation = strongSelf.operations.removeFirst()
                        if case .custom(operationId, _) = topOperation.content {
                            strongSelf.startFirstOperation()
                        } else {
                            assertionFailure()
                        }
                    }
                }
                let _ = (signal |> deliverOn(self.queue)).start(completed: {
                    completed()
                })
            case let .processEvents(operationId, events):
                self.operationTimer?.invalidate()
                let completed: () -> Void = { [weak self] in
                    if let strongSelf = self {
                        let topOperation = strongSelf.operations.removeFirst()
                        if case .processEvents(operationId, _) = topOperation.content {
                            if !events.updatedTypingActivities.isEmpty {
                                strongSelf.peerInputActivityManager?.transaction { manager in
                                    for (chatPeerId, peerActivities) in events.updatedTypingActivities {
                                        for (peerId, activity) in peerActivities {
                                            if let activity = activity {
                                                manager.addActivity(chatPeerId: chatPeerId, peerId: peerId, activity: activity)
                                            } else {
                                                manager.removeAllActivities(chatPeerId: chatPeerId, peerId: peerId)
                                            }
                                        }
                                    }
                                }
                            }
                            if !events.updatedWebpages.isEmpty {
                                strongSelf.notifyUpdatedWebpages(events.updatedWebpages)
                            }
                            if let updatedPeersNearby = events.updatedPeersNearby {
                                strongSelf.notifyUpdatedPeersNearby(updatedPeersNearby)
                            }
                            if !events.updatedCalls.isEmpty {
                                for call in events.updatedCalls {
                                    strongSelf.callSessionManager?.updateSession(call, completion: { _ in })
                                }
                            }
                            if !events.addedCallSignalingData.isEmpty {
                                for (id, data) in events.addedCallSignalingData {
                                    strongSelf.callSessionManager?.addCallSignalingData(id: id, data: data)
                                }
                            }
                            if !events.updatedGroupCallParticipants.isEmpty {
                                strongSelf.groupCallParticipantUpdatesPipe.putNext(events.updatedGroupCallParticipants)
                            }
                            if !events.updatedIncomingThreadReadStates.isEmpty || !events.updatedOutgoingThreadReadStates.isEmpty {
                                strongSelf.threadReadStateUpdatesPipe.putNext((events.updatedIncomingThreadReadStates, events.updatedOutgoingThreadReadStates))
                            }
                            if !events.isContactUpdates.isEmpty {
                                strongSelf.addIsContactUpdates(events.isContactUpdates)
                            }
                            if let updatedMaxMessageId = events.updatedMaxMessageId {
                                strongSelf.appliedMaxMessageIdPromise.set(.single(updatedMaxMessageId))
                            }
                            if let updatedQts = events.updatedQts {
                                strongSelf.appliedQtsPromise.set(.single(updatedQts))
                            }
                            var pollCount = 0
                            for i in 0 ..< strongSelf.operations.count {
                                if case let .pollCompletion(pollId, messageIds, subscribers) = strongSelf.operations[i].content {
                                    pollCount += 1
                                    var updatedMessageIds = messageIds
                                    updatedMessageIds.append(contentsOf: events.addedIncomingMessageIds)
                                    let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, updatedMessageIds, subscribers))
                                    operation.isRunning = strongSelf.operations[i].isRunning
                                    strongSelf.operations[i] = operation
                                }
                            }
                            assert(pollCount <= 1)
                            strongSelf.startFirstOperation()
                        } else {
                            assertionFailure()
                        }
                    }
                }
                
                if events.delayNotificatonsUntil != nil {
                    let _ = self.delayNotificatonsUntil.swap(events.delayNotificatonsUntil)
                }
                
                let signal = self.postbox.transaction { transaction -> [([Message], PeerGroupId, Bool)] in
                    var messageList: [([Message], PeerGroupId, Bool)] = []
                    for id in events.addedIncomingMessageIds {
                        let (messages, notify, _, _) = messagesForNotification(transaction: transaction, id: id, alwaysReturnMessage: false)
                        if !messages.isEmpty {
                            messageList.append((messages, .root, notify))
                        }
                    }
                    var wasScheduledMessages: [Message] = []
                    for id in events.wasScheduledMessageIds {
                        if let message = transaction.getMessage(id) {
                             wasScheduledMessages.append(message)
                        }
                    }
                    if !wasScheduledMessages.isEmpty {
                        messageList.append((wasScheduledMessages, .root, true))
                    }
                    return messageList
                }
                
                let _ = (signal
                |> deliverOn(self.queue)).start(next: { [weak self] messages in
                    if let strongSelf = self {
                        strongSelf.notificationMessagesPipe.putNext(messages)
                    }
                }, completed: {
                    completed()
                })
            
                let timestamp = Int32(Date().timeIntervalSince1970)
                let minReactionTimestamp = timestamp - 20
                let reactionEvents = events.addedReactionEvents.compactMap { event -> (reactionAuthor: Peer, message: Message)? in
                    if event.timestamp >= minReactionTimestamp {
                        return (event.reactionAuthor, event.message)
                    } else {
                        return nil
                    }
                }
                if !reactionEvents.isEmpty {
                    self.reactionNotificationsPipe.putNext(reactionEvents)
                }
            
                if !events.displayAlerts.isEmpty {
                    self.displayAlertsPipe.putNext(events.displayAlerts)
                }
            
                if !events.externallyUpdatedPeerId.isEmpty {
                    self.externallyUpdatedPeerIdsPipe.putNext(Array(events.externallyUpdatedPeerId))
                }
            
                if events.authorizationListUpdated {
                    self.authorizationListUpdatesPipe.putNext(Void())
                }
                
                if !events.deletedMessageIds.isEmpty {
                    self.deletedMessagesPipe.putNext(events.deletedMessageIds)
                }
            case let .pollCompletion(pollId, preMessageIds, preSubscribers):
                if self.operations.count > 1 {
                    self.operations.removeFirst()
                    self.postponePollCompletionOperation(messageIds: preMessageIds, subscribers: preSubscribers)
                    self.startFirstOperation()
                } else {
                    self.operationTimer?.invalidate()
                    let signal = self.network.request(Api.functions.help.test())
                    |> deliverOn(self.queue)
                    let completed: () -> Void = { [weak self] in
                        if let strongSelf = self {
                            let topOperation = strongSelf.operations.removeFirst()
                            if case let .pollCompletion(topPollId, messageIds, subscribers) = topOperation.content {
                                assert(topPollId == pollId)
                                
                                if strongSelf.operations.isEmpty {
                                    for (_, f) in subscribers {
                                        f(messageIds)
                                    }
                                } else {
                                    strongSelf.postponePollCompletionOperation(messageIds: messageIds, subscribers: subscribers)
                                }
                                strongSelf.startFirstOperation()
                            } else {
                                assertionFailure()
                            }
                        }
                    }
                    let _ = (signal |> deliverOn(self.queue)).start(error: { _ in
                        completed()
                    }, completed: {
                        completed()
                    })
                }
            case let .replayAsynchronouslyBuiltFinalState(finalState, completion):
                if !finalState.state.preCachedResources.isEmpty {
                    for (resource, data) in finalState.state.preCachedResources {
                        self.postbox.mediaBox.storeResourceData(resource.id, data: data)
                    }
                }
                
                let accountPeerId = self.accountPeerId
                let accountManager = self.accountManager
                let postbox = self.postbox
                let mediaBox = self.postbox.mediaBox
                let network = self.network
                let auxiliaryMethods = self.auxiliaryMethods
                let removePossiblyDeliveredMessagesUniqueIds = self.removePossiblyDeliveredMessagesUniqueIds
                let signal = self.postbox.transaction { transaction -> AccountReplayedFinalState? in
                    let startTime = CFAbsoluteTimeGetCurrent()
                    let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
                    let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
                    if deltaTime > 1.0 {
                        Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
                    }
                    return result
                }
                |> map({ ($0, finalState) })
                |> deliverOn(self.queue)
                
                let _ = signal.start(next: { [weak self] replayedState, finalState in
                    if let strongSelf = self {
                        if case .replayAsynchronouslyBuiltFinalState = strongSelf.operations.removeFirst().content {
                            if let replayedState = replayedState {
                                let events = AccountFinalStateEvents(state: replayedState)
                                if !events.isEmpty {
                                    strongSelf.insertProcessEvents(events)
                                }
                            }
                            strongSelf.startFirstOperation()
                        } else {
                            assertionFailure()
                        }
                        completion()
                    }
                })
        }
    }

    /// Replays an already-built final state in a single postbox transaction, without going through the operation list.
    ///
    /// Pre-cached resources carried by `finalState` are stored into the media box synchronously at call time,
    /// i.e. before the returned signal is subscribed to.
    ///
    /// - Parameter finalState: The final state handed to `replayFinalState`.
    /// - Returns: A signal that emits no values (the replay result is discarded); its events are delivered on the manager's queue.
    func standaloneReplayAsynchronouslyBuiltFinalState(finalState: AccountFinalState) -> Signal<Never, NoError> {
        let postbox = self.postbox
        let mediaBox = postbox.mediaBox

        // Iterating an empty collection does nothing, so no separate emptiness check is required.
        for (resource, data) in finalState.state.preCachedResources {
            mediaBox.storeResourceData(resource.id, data: data)
        }

        // Capture everything the transaction needs up front so the closure does not reference `self`.
        let network = self.network
        let accountManager = self.accountManager
        let accountPeerId = self.accountPeerId
        let auxiliaryMethods = self.auxiliaryMethods
        let removePossiblyDeliveredMessagesUniqueIds = self.removePossiblyDeliveredMessagesUniqueIds

        let replay = postbox.transaction { transaction -> AccountReplayedFinalState? in
            let startTime = CFAbsoluteTimeGetCurrent()
            let replayedState = replayFinalState(
                accountManager: accountManager,
                postbox: postbox,
                accountPeerId: accountPeerId,
                mediaBox: mediaBox,
                encryptionProvider: network.encryptionProvider,
                transaction: transaction,
                auxiliaryMethods: auxiliaryMethods,
                finalState: finalState,
                removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds,
                ignoreDate: false
            )
            // Only unusually slow replays (over one second) are logged.
            let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
            if deltaTime > 1.0 {
                Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
            }
            return replayedState
        }

        return replay
        |> map({ ($0, finalState) })
        |> deliverOn(self.queue)
        |> ignoreValues
    }

    /// Requests the account difference once and replays it directly in a postbox transaction,
    /// without going through the operation list.
    ///
    /// The current `AuthorizedAccountState` is read from the postbox state view (first matching value only).
    /// An `AUTH_KEY_DUPLICATED` (406) request error is treated as "no difference"; other errors are retried.
    ///
    /// - Returns: A signal delivering `false` when the server answered with `.differenceSlice`, and `true` in every
    ///   other case (including a skipped or failed replay).
    ///   NOTE(review): presumably `false` means more slices remain to be fetched — confirm against callers.
    public func standalonePollDifference() -> Signal<Bool, NoError> {
        // Intermediate result: the received difference, the replayed state and whether the poll was skipped due to an error.
        typealias PollOutcome = (difference: Api.updates.Difference?, replayedState: AccountReplayedFinalState?, skipBecauseOfError: Bool)

        let queue = self.queue
        let postbox = self.postbox
        let mediaBox = postbox.mediaBox
        let network = self.network
        let accountManager = self.accountManager
        let accountPeerId = self.accountPeerId
        let auxiliaryMethods = self.auxiliaryMethods

        let signal = postbox.stateView()
        |> mapToSignal { view -> Signal<AuthorizedAccountState, NoError> in
            guard let state = view.state as? AuthorizedAccountState else {
                return .complete()
            }
            return .single(state)
        }
        |> take(1)
        |> mapToSignal { [weak self] state -> Signal<PollOutcome, NoError> in
            guard let authorizedState = state.state else {
                return .single((nil, nil, false))
            }

            let flags: Int32 = 0
            let ptsTotalLimit: Int32? = nil

            let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: Int32.max, qts: authorizedState.qts))
            |> map(Optional.init)
            |> `catch` { error -> Signal<Api.updates.Difference?, MTRpcError> in
                // A duplicated auth key is reported as "no difference" instead of being retried.
                if error.errorCode == 406 && error.errorDescription == "AUTH_KEY_DUPLICATED" {
                    return .single(nil)
                }
                return .fail(error)
            }
            |> retryRequest

            return request
            |> mapToSignal { difference -> Signal<PollOutcome, NoError> in
                guard let difference = difference else {
                    return .single((nil, nil, true))
                }
                // This standalone path does not handle `.differenceTooLong`; it traps.
                if case .differenceTooLong = difference {
                    preconditionFailure()
                }

                return initialStateWithDifference(postbox: postbox, difference: difference)
                |> mapToSignal { state -> Signal<PollOutcome, NoError> in
                    // The stored state changed while the request was in flight: drop this difference.
                    if state.initialState.state != authorizedState {
                        Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
                        return .single((nil, nil, false))
                    }

                    return finalStateWithDifference(postbox: postbox, network: network, state: state, difference: difference)
                    |> deliverOn(queue)
                    |> mapToSignal { finalState -> Signal<PollOutcome, NoError> in
                        for (resource, data) in finalState.state.preCachedResources {
                            mediaBox.storeResourceData(resource.id, data: data)
                        }

                        // Read on the manager's queue (see `deliverOn(queue)` above); falls back to an empty map if the manager is gone.
                        let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()

                        return postbox.transaction { transaction -> PollOutcome in
                            let startTime = CFAbsoluteTimeGetCurrent()
                            let replayedState = replayFinalState(
                                accountManager: accountManager,
                                postbox: postbox,
                                accountPeerId: accountPeerId,
                                mediaBox: mediaBox,
                                encryptionProvider: network.encryptionProvider,
                                transaction: transaction,
                                auxiliaryMethods: auxiliaryMethods,
                                finalState: finalState,
                                removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds,
                                ignoreDate: true
                            )
                            let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
                            if deltaTime > 1.0 {
                                Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
                            }

                            guard let replayedState = replayedState else {
                                return (nil, nil, false)
                            }
                            return (difference, replayedState, false)
                        }
                    }
                }
            }
        }
        |> deliverOn(self.queue)

        return signal
        |> mapToSignal { difference, _, _ -> Signal<Bool, NoError> in
            // Only a `.differenceSlice` answer yields `false`; a missing difference or any other case yields `true`.
            if let difference = difference, case .differenceSlice = difference {
                return .single(false)
            }
            return .single(true)
        }
    }
    
    /// Queues a `.processEvents` operation for the given events; does nothing when `events` is empty.
    ///
    /// The operation is placed in front of the first operation that is neither running nor itself a
    /// `.processEvents` operation, and appended at the end when no such operation exists.
    private func insertProcessEvents(_ events: AccountFinalStateEvents) {
        guard !events.isEmpty else {
            return
        }

        let operation = AccountStateManagerOperation(content: .processEvents(self.getNextId(), events))

        let insertionIndex = self.operations.firstIndex(where: { candidate in
            if candidate.isRunning {
                return false
            }
            if case .processEvents = candidate.content {
                return false
            }
            return true
        })

        if let insertionIndex = insertionIndex {
            self.operations.insert(operation, at: insertionIndex)
        } else {
            self.operations.append(operation)
        }
    }
    
    /// Re-queues a poll completion (with a fresh operation id) at the end of the operation list.
    ///
    /// - Parameters:
    ///   - messageIds: Message ids collected so far for the poll completion.
    ///   - subscribers: Callbacks, keyed by subscriber id, carried over to the new operation.
    private func postponePollCompletionOperation(messageIds: [MessageId], subscribers: [(Int32, ([MessageId]) -> Void)]) {
        self.addOperation(.pollCompletion(self.getNextId(), messageIds, subscribers), position: .last)
        
        // Debug-only sanity check: a poll completion is only expected in the last slot.
        let lastIndex = self.operations.count - 1
        for (index, operation) in self.operations.enumerated() {
            if case .pollCompletion = operation.content, index != lastIndex {
                assertionFailure()
            }
        }
    }
    
    /// Registers `f` as a poll-completion subscriber and returns the id assigned to it.
    ///
    /// The subscriber is attached to the first queued `.pollCompletion` operation when there is one;
    /// otherwise a new `.pollCompletion` operation is added at the end of the list.
    /// Must be called on the manager's queue (asserted).
    private func addPollCompletion(_ f: @escaping ([MessageId]) -> Void) -> Int32 {
        assert(self.queue.isCurrent())
        
        let updatedId: Int32 = self.getNextId()
        
        for index in self.operations.indices {
            let existing = self.operations[index]
            guard case let .pollCompletion(pollId, messageIds, currentSubscribers) = existing.content else {
                continue
            }
            
            // Operation content is immutable, so the operation is rebuilt with the extended subscriber list.
            var extendedSubscribers = currentSubscribers
            extendedSubscribers.append((updatedId, f))
            let replacement = AccountStateManagerOperation(content: .pollCompletion(pollId, messageIds, extendedSubscribers))
            replacement.isRunning = existing.isRunning
            self.operations[index] = replacement
            return updatedId
        }
        
        self.addOperation(.pollCompletion(self.getNextId(), [], [(updatedId, f)]), position: .last)
        
        return updatedId
    }
    
    /// Removes the subscriber with the given id from every queued `.pollCompletion` operation that holds it.
    ///
    /// At most one matching entry (the first) is removed per operation; the operation is rebuilt with the
    /// remaining subscribers and keeps its running flag.
    private func removePollCompletion(_ id: Int32) {
        for index in self.operations.indices {
            let existing = self.operations[index]
            guard case let .pollCompletion(pollId, messages, subscribers) = existing.content else {
                continue
            }
            guard let position = subscribers.firstIndex(where: { $0.0 == id }) else {
                continue
            }
            
            var remainingSubscribers = subscribers
            remainingSubscribers.remove(at: position)
            let replacement = AccountStateManagerOperation(content: .pollCompletion(pollId, messages, remainingSubscribers))
            replacement.isRunning = existing.isRunning
            self.operations[index] = replacement
        }
    }
    
    /// Returns a signal that emits the message ids passed to the poll-completion callback once, then completes.
    ///
    /// Subscribing registers a poll-completion subscriber on the manager's queue; disposing removes it again,
    /// also on the manager's queue. If the manager is already deallocated at subscription time, nothing is registered.
    public func pollStateUpdateCompletion() -> Signal<[MessageId], NoError> {
        return Signal { [weak self] subscriber in
            let disposable = MetaDisposable()
            guard let strongSelf = self else {
                return disposable
            }
            
            strongSelf.queue.async {
                let completionId = strongSelf.addPollCompletion { messageIds in
                    subscriber.putNext(messageIds)
                    subscriber.putCompletion()
                }
                
                disposable.set(ActionDisposable {
                    guard let strongSelf = self else {
                        return
                    }
                    strongSelf.queue.async {
                        strongSelf.removePollCompletion(completionId)
                    }
                })
            }
            return disposable
        }
    }
    
    /// Returns a signal that emits each updated version of the webpage with the given id
    /// that is passed to `notifyUpdatedWebpages`.
    ///
    /// Subscribers are tracked per webpage id in `updatedWebpageContexts`; a context is created on first
    /// subscription and dropped again once its last subscriber is removed.
    ///
    /// - Parameter webpageId: Id of the webpage to observe.
    /// - Returns: A signal of updated webpages. It does not complete on its own.
    public func updatedWebpage(_ webpageId: MediaId) -> Signal<TelegramMediaWebpage, NoError> {
        let queue = self.queue
        return Signal { [weak self] subscriber in
            let disposable = MetaDisposable()
            queue.async {
                guard let strongSelf = self else {
                    return
                }
                
                let context: UpdatedWebpageSubscriberContext
                if let current = strongSelf.updatedWebpageContexts[webpageId] {
                    context = current
                } else {
                    context = UpdatedWebpageSubscriberContext()
                    strongSelf.updatedWebpageContexts[webpageId] = context
                }
                
                let index = context.subscribers.add({ media in
                    subscriber.putNext(media)
                })
                
                disposable.set(ActionDisposable {
                    // Disposal may happen on any thread, while the contexts and their subscriber bags are
                    // mutated on `queue`. Hop onto the queue so the removal cannot race with registration
                    // or notification (same approach as `pollStateUpdateCompletion`).
                    queue.async {
                        guard let strongSelf = self, let context = strongSelf.updatedWebpageContexts[webpageId] else {
                            return
                        }
                        context.subscribers.remove(index)
                        if context.subscribers.isEmpty {
                            strongSelf.updatedWebpageContexts.removeValue(forKey: webpageId)
                        }
                    }
                })
            }
            return disposable
        }
    }
    
    /// Delivers updated webpages to the subscribers registered for the matching webpage ids.
    ///
    /// - Parameter updatedWebpages: Updated webpages keyed by their media id; ids without a registered context are ignored.
    private func notifyUpdatedWebpages(_ updatedWebpages: [MediaId: TelegramMediaWebpage]) {
        for (id, context) in self.updatedWebpageContexts {
            guard let media = updatedWebpages[id] else {
                continue
            }
            // Call a copy of the subscriber list rather than the live bag.
            context.subscribers.copyItems().forEach { notify in
                notify(media)
            }
        }
    }
    
    /// Forwards the given message ids to `appliedIncomingReadMessagesPipe`.
    ///
    /// - Parameter ids: Message ids to publish on the pipe.
    func notifyAppliedIncomingReadMessages(_ ids: [MessageId]) {
        self.appliedIncomingReadMessagesPipe.putNext(ids)
    }
    
    /// Returns the value currently stored in `delayNotificatonsUntil`, or `nil` when none is stored.
    public func getDelayNotificatonsUntil() -> Int32? {
        let currentValue: Int32? = self.delayNotificatonsUntil.with { stored in
            return stored
        }
        return currentValue
    }
    
    /// Applies `f` to the current terms-of-service update on the manager's queue and publishes the result if it changed.
    ///
    /// - Parameter f: Maps the current value to the new value; returning an equal value is a no-op.
    func modifyTermsOfServiceUpdate(_ f: @escaping (TermsOfServiceUpdate?) -> (TermsOfServiceUpdate?)) {
        self.queue.async {
            let previous = self.termsOfServiceUpdateValue.with { $0 }
            let next = f(previous)
            guard previous != next else {
                return
            }
            let _ = self.termsOfServiceUpdateValue.swap(next)
            self.termsOfServiceUpdatePromise.set(.single(next))
        }
    }
    
    /// Applies `f` to the current app update info on the manager's queue and publishes the result if it changed.
    ///
    /// - Parameter f: Maps the current value to the new value; returning an equal value is a no-op.
    func modifyAppUpdateInfo(_ f: @escaping (AppUpdateInfo?) -> (AppUpdateInfo?)) {
        self.queue.async {
            let previous = self.appUpdateInfoValue.with { $0 }
            let next = f(previous)
            guard previous != next else {
                return
            }
            let _ = self.appUpdateInfoValue.swap(next)
            self.appUpdateInfoPromise.set(.single(next))
        }
    }
    
    /// Returns a signal that emits every peers-nearby list passed to `notifyUpdatedPeersNearby`.
    ///
    /// The subscriber is registered in `updatedPeersNearbyContext` on the manager's queue and removed
    /// again, also on that queue, when the subscription is disposed.
    ///
    /// - Returns: A signal of peers-nearby lists. It does not complete on its own.
    public func updatedPeersNearby() -> Signal<[PeerNearby], NoError> {
        let queue = self.queue
        return Signal { [weak self] subscriber in
            let disposable = MetaDisposable()
            queue.async {
                guard let strongSelf = self else {
                    return
                }
                
                let index = strongSelf.updatedPeersNearbyContext.subscribers.add({ peersNearby in
                    subscriber.putNext(peersNearby)
                })
                
                disposable.set(ActionDisposable {
                    // Disposal may happen on any thread, while the subscriber bag is mutated and read on
                    // `queue`. Hop onto the queue so the removal cannot race with registration or
                    // notification (same approach as `pollStateUpdateCompletion`).
                    queue.async {
                        guard let strongSelf = self else {
                            return
                        }
                        strongSelf.updatedPeersNearbyContext.subscribers.remove(index)
                    }
                })
            }
            return disposable
        }
    }
    
    /// Delivers the given peers-nearby list to every subscriber registered via `updatedPeersNearby()`.
    private func notifyUpdatedPeersNearby(_ updatedPeersNearby: [PeerNearby]) {
        // Call a copy of the subscriber list rather than the live bag.
        let currentSubscribers = self.updatedPeersNearbyContext.subscribers.copyItems()
        currentSubscribers.forEach { notify in
            notify(updatedPeersNearby)
        }
    }
    
    /// Publishes the given message ids on `deletedMessagesPipe`, each wrapped as `DeletedMessageId.messageId`.
    func notifyDeletedMessages(messageIds: [MessageId]) {
        let deletedIds: [DeletedMessageId] = messageIds.map { id in
            return .messageId(id)
        }
        self.deletedMessagesPipe.putNext(deletedIds)
    }

    /// Immutable description of an incoming call request, as produced by `extractIncomingCallUpdate(data:)`.
    public final class IncomingCallUpdate {
        /// Identifier of the requested call.
        public let callId: Int64
        /// Access hash accompanying `callId`.
        public let callAccessHash: Int64
        /// Date value carried by the call request.
        public let timestamp: Int32
        /// Peer associated with the call request.
        public let peer: EnginePeer

        init(callId: Int64, callAccessHash: Int64, timestamp: Int32, peer: EnginePeer) {
            self.peer = peer
            self.timestamp = timestamp
            self.callAccessHash = callAccessHash
            self.callId = callId
        }
    }

    /// Parses serialized updates and extracts the first incoming call request, if any.
    ///
    /// When the payload starts with the signature `0x3072cfa1`, the bytes that follow are gunzipped first;
    /// if decompression fails the original data is parsed as-is.
    ///
    /// - Parameter data: Serialized `Api.Updates` payload, optionally gzip-wrapped.
    /// - Returns: The call described by the first `.phoneCallRequested` update, or `nil` when the payload
    ///   cannot be parsed, is not an `.updates` container, contains no such update, or the requesting
    ///   user is not among the users shipped with the updates.
    public static func extractIncomingCallUpdate(data: Data) -> IncomingCallUpdate? {
        var payload = data
        let reader = BufferReader(Buffer(data: data))
        if let signature = reader.readInt32(), signature == 0x3072cfa1, let compressedData = parseBytes(reader), let decompressedData = MTGzip.decompress(compressedData.makeData()) {
            payload = decompressedData
        }

        guard let parsed = Api.parse(Buffer(data: payload)) as? Api.Updates else {
            return nil
        }
        guard case let .updates(updates, users, _, _, _) = parsed else {
            return nil
        }

        let peers: [Peer] = users.map { user -> Peer in
            return TelegramUser(user: user)
        }

        for update in updates {
            guard case let .updatePhoneCall(phoneCall) = update else {
                continue
            }
            guard case let .phoneCallRequested(_, id, accessHash, date, adminId, _, _, _) = phoneCall else {
                continue
            }
            // The first call request decides the outcome: an unknown requesting user ends the search.
            guard let peer = peers.first(where: { $0.id == PeerId(namespace: Namespaces.Peer.CloudUser, id: PeerId.Id._internalFromInt64Value(adminId)) }) else {
                return nil
            }
            return IncomingCallUpdate(
                callId: id,
                callAccessHash: accessHash,
                timestamp: date,
                peer: EnginePeer(peer)
            )
        }

        return nil
    }
    
    /// Parses serialized updates and forwards the first phone call update to the call session manager.
    ///
    /// When the payload starts with the signature `0x3072cfa1`, the bytes that follow are gunzipped first;
    /// if decompression fails the original data is parsed as-is.
    ///
    /// - Parameters:
    ///   - data: Serialized `Api.Updates` payload, optionally gzip-wrapped.
    ///   - completion: Receives the result of `updateSession` for the first `.updatePhoneCall` update, or `nil`
    ///     when there is no call session manager, no such update, or the payload cannot be parsed.
    public func processIncomingCallUpdate(data: Data, completion: @escaping ((CallSessionRingingState, CallSession)?) -> Void) {
        var payload = data
        let reader = BufferReader(Buffer(data: data))
        if let signature = reader.readInt32(), signature == 0x3072cfa1, let compressedData = parseBytes(reader), let decompressedData = MTGzip.decompress(compressedData.makeData()) {
            payload = decompressedData
        }
        
        if let parsed = Api.parse(Buffer(data: payload)) as? Api.Updates, case let .updates(updates, _, _, _, _) = parsed {
            for update in updates {
                guard case let .updatePhoneCall(phoneCall) = update else {
                    continue
                }
                // Only the first phone call update is handled; every path below returns.
                guard let callSessionManager = self.callSessionManager else {
                    completion(nil)
                    return
                }
                callSessionManager.updateSession(phoneCall, completion: { result in
                    completion(result)
                })
                return
            }
        }
        
        completion(nil)
    }
    
    /// Merges the given unique ids into `removePossiblyDeliveredMessagesUniqueIds` on the manager's queue.
    ///
    /// - Parameter uniqueIds: Peer ids keyed by unique message id; for keys already present, the new value replaces the stored one.
    func removePossiblyDeliveredMessages(uniqueIds: [Int64: PeerId]) {
        self.queue.async {
            self.removePossiblyDeliveredMessagesUniqueIds.merge(uniqueIds) { _, incoming in
                return incoming
            }
        }
    }
}

/// Resolves which messages, sound and preview visibility apply to a notification for the
/// message with the given id.
///
/// - Parameters:
///   - transaction: Transaction used to read the message, peer, notification settings,
///     preferences and read state.
///   - id: Identifier of the message to evaluate.
///   - alwaysReturnMessage: When `true`, the message (and its group) is still returned even
///     if notifying is suppressed by settings or by the message's media action.
/// - Returns: A tuple of
///   - `messages`: the message followed by the other members of its forwarded/grouped set;
///     empty if the message is missing, the channel was left/kicked from, or notifying is
///     suppressed and `alwaysReturnMessage` is `false`;
///   - `notify`: `true` only when notifying is allowed and the message is not marked as read
///     by the peer's combined read state;
///   - `sound`: the resolved sound (`.none` when the message carries the muted flag);
///   - `displayContents`: whether previews should be displayed.
public func messagesForNotification(transaction: Transaction, id: MessageId, alwaysReturnMessage: Bool) -> (messages: [Message], notify: Bool, sound: PeerMessageSound, displayContents: Bool) {
    guard let message = transaction.getMessage(id) else {
        Logger.shared.log("AccountStateManager", "notification message doesn't exist")
        return ([], false, .bundledModern(id: 0), false)
    }

    var shouldNotify = true
    var resolvedSound: PeerMessageSound = .bundledModern(id: 0)
    var showContents = true
    
    // The message itself may carry a flag requesting silent delivery.
    let isSilent = message.attributes.contains(where: { attribute in
        guard let info = attribute as? NotificationInfoMessageAttribute else {
            return false
        }
        return info.flags.contains(.muted)
    })
    
    // Migration service actions never produce a notification.
    for case let action as TelegramMediaAction in message.media {
        switch action.action {
            case .groupMigratedToChannel, .channelMigratedFromGroup:
                shouldNotify = false
            default:
                break
        }
    }
    
    let now = Int32(CFAbsoluteTimeGetCurrent() + NSTimeIntervalSince1970)
    
    // Settings are looked up for the associated peer when there is one, and for the author
    // when the message is personal.
    let peer = transaction.getPeer(id.peerId)
    var settingsPeerId = id.peerId
    if let associatedPeerId = peer?.associatedPeerId {
        settingsPeerId = associatedPeerId
    }
    if message.personal, let author = message.author {
        settingsPeerId = author.id
    }
    
    if let peerSettings = transaction.getPeerNotificationSettings(settingsPeerId) as? TelegramPeerNotificationSettings {
        var fallbackSound: PeerMessageSound = .bundledModern(id: 0)
        var fallbackNotify: Bool = true
        if let globalSettings = transaction.getPreferencesEntry(key: PreferencesKeys.globalNotifications)?.get(GlobalNotificationSettings.self) {
            let effective = globalSettings.effective
            let namespace = id.peerId.namespace
            let isCloudUser = namespace == Namespaces.Peer.CloudUser
            if isCloudUser || namespace == Namespaces.Peer.SecretChat {
                fallbackNotify = effective.privateChats.enabled
                fallbackSound = effective.privateChats.sound
                // Secret chats share the private chat defaults but never display previews.
                if isCloudUser {
                    showContents = effective.privateChats.displayPreviews
                } else {
                    showContents = false
                }
            } else if namespace == Namespaces.Peer.CloudChannel, let channel = peer as? TelegramChannel, case .broadcast = channel.info {
                fallbackNotify = effective.channels.enabled
                fallbackSound = effective.channels.sound
                showContents = effective.channels.displayPreviews
            } else {
                fallbackNotify = effective.groupChats.enabled
                fallbackSound = effective.groupChats.sound
                showContents = effective.groupChats.displayPreviews
            }
        }
        
        switch peerSettings.muteState {
            case .default:
                shouldNotify = shouldNotify && fallbackNotify
            case let .muted(until):
                // Still muted while the expiry has not passed yet.
                shouldNotify = shouldNotify && until < now
            case .unmuted:
                break
        }
        
        switch peerSettings.messageSound {
            case .default:
                resolvedSound = fallbackSound
            default:
                resolvedSound = peerSettings.messageSound
        }
    } else {
        Logger.shared.log("AccountStateManager", "notification settings for \(settingsPeerId) are undefined")
    }
    
    if isSilent {
        resolvedSound = .none
    }
    
    // Nothing is returned for channels the account has left or been kicked from.
    if let channel = message.peers[message.id.peerId] as? TelegramChannel {
        switch channel.participationStatus {
            case .member:
                break
            case .kicked, .left:
                return ([], false, resolvedSound, false)
        }
    }
    
    // Without a read state the message is treated as unread.
    var isUnread = true
    if let readState = transaction.getCombinedPeerReadState(id.peerId) {
        isUnread = !readState.isIncomingMessageIndexRead(message.index)
    } else {
        Logger.shared.log("AccountStateManager", "read state for \(id.peerId) is undefined")
    }
    
    // Collect the remaining members of the forwarded group or grouped set, if any.
    let relatedMessages: [Message]?
    if message.forwardInfo != nil && message.sourceReference == nil {
        relatedMessages = transaction.getMessageForwardedGroup(message.id)
    } else if message.groupingKey != nil {
        relatedMessages = transaction.getMessageGroup(message.id)
    } else {
        relatedMessages = nil
    }
    
    var collectedMessages: [Message] = [message]
    if let relatedMessages = relatedMessages {
        for related in relatedMessages where related.id != message.id {
            collectedMessages.append(related)
        }
    }
    
    guard shouldNotify else {
        return (alwaysReturnMessage ? collectedMessages : [], false, resolvedSound, showContents)
    }
    return (collectedMessages, isUnread, resolvedSound, showContents)
}