mirror of
https://github.com/Swiftgram/Telegram-iOS.git
synced 2025-06-16 05:55:20 +00:00
1430 lines
68 KiB
Swift
1430 lines
68 KiB
Swift
import Foundation
|
|
import Postbox
|
|
import SwiftSignalKit
|
|
import TelegramApi
|
|
import MtProtoKit
|
|
|
|
|
|
private enum AccountStateManagerOperationContent {
|
|
case pollDifference(AccountFinalStateEvents)
|
|
case collectUpdateGroups([UpdateGroup], Double)
|
|
case processUpdateGroups([UpdateGroup])
|
|
case custom(Int32, Signal<Void, NoError>)
|
|
case pollCompletion(Int32, [MessageId], [(Int32, ([MessageId]) -> Void)])
|
|
case processEvents(Int32, AccountFinalStateEvents)
|
|
case replayAsynchronouslyBuiltFinalState(AccountFinalState, () -> Void)
|
|
}
|
|
|
|
private final class AccountStateManagerOperation {
|
|
var isRunning: Bool = false
|
|
let content: AccountStateManagerOperationContent
|
|
|
|
init(content: AccountStateManagerOperationContent) {
|
|
self.content = content
|
|
}
|
|
}
|
|
|
|
private enum AccountStateManagerAddOperationPosition {
|
|
case first
|
|
case last
|
|
}
|
|
|
|
private typealias SignalKitTimer = SwiftSignalKit.Timer
|
|
|
|
private enum CustomOperationEvent<T, E> {
|
|
case Next(T)
|
|
case Error(E)
|
|
case Completion
|
|
}
|
|
|
|
private final class UpdatedWebpageSubscriberContext {
|
|
let subscribers = Bag<(TelegramMediaWebpage) -> Void>()
|
|
}
|
|
|
|
private final class UpdatedPeersNearbySubscriberContext {
|
|
let subscribers = Bag<([PeerNearby]) -> Void>()
|
|
}
|
|
|
|
public enum DeletedMessageId: Hashable {
|
|
case global(Int32)
|
|
case messageId(MessageId)
|
|
}
|
|
|
|
public final class AccountStateManager {
|
|
private let queue = Queue()
|
|
public let accountPeerId: PeerId
|
|
private let accountManager: AccountManager<TelegramAccountManagerTypes>
|
|
public let postbox: Postbox
|
|
public let network: Network
|
|
private let callSessionManager: CallSessionManager?
|
|
private let addIsContactUpdates: ([(PeerId, Bool)]) -> Void
|
|
private let shouldKeepOnlinePresence: Signal<Bool, NoError>
|
|
|
|
private let peerInputActivityManager: PeerInputActivityManager?
|
|
let auxiliaryMethods: AccountAuxiliaryMethods
|
|
var transformOutgoingMessageMedia: TransformOutgoingMessageMedia?
|
|
|
|
private var updateService: UpdateMessageService?
|
|
private let updateServiceDisposable = MetaDisposable()
|
|
|
|
private var operations_: [AccountStateManagerOperation] = []
|
|
private var operations: [AccountStateManagerOperation] {
|
|
get {
|
|
assert(self.queue.isCurrent())
|
|
return self.operations_
|
|
} set(value) {
|
|
assert(self.queue.isCurrent())
|
|
self.operations_ = value
|
|
}
|
|
}
|
|
private let operationDisposable = MetaDisposable()
|
|
private var operationTimer: SignalKitTimer?
|
|
|
|
private var removePossiblyDeliveredMessagesUniqueIds: [Int64: PeerId] = [:]
|
|
|
|
private var nextId: Int32 = 0
|
|
private func getNextId() -> Int32 {
|
|
self.nextId += 1
|
|
return self.nextId
|
|
}
|
|
|
|
private let isUpdatingValue = ValuePromise<Bool>(false)
|
|
private var currentIsUpdatingValue = false {
|
|
didSet {
|
|
if self.currentIsUpdatingValue != oldValue {
|
|
self.isUpdatingValue.set(self.currentIsUpdatingValue)
|
|
}
|
|
}
|
|
}
|
|
public var isUpdating: Signal<Bool, NoError> {
|
|
return self.isUpdatingValue.get()
|
|
}
|
|
|
|
private let notificationMessagesPipe = ValuePipe<[([Message], PeerGroupId, Bool)]>()
|
|
public var notificationMessages: Signal<[([Message], PeerGroupId, Bool)], NoError> {
|
|
return self.notificationMessagesPipe.signal()
|
|
}
|
|
|
|
private let reactionNotificationsPipe = ValuePipe<[(reactionAuthor: Peer, message: Message)]>()
|
|
public var reactionNotifications: Signal<[(reactionAuthor: Peer, message: Message)], NoError> {
|
|
return self.reactionNotificationsPipe.signal()
|
|
}
|
|
|
|
private let displayAlertsPipe = ValuePipe<[(text: String, isDropAuth: Bool)]>()
|
|
public var displayAlerts: Signal<[(text: String, isDropAuth: Bool)], NoError> {
|
|
return self.displayAlertsPipe.signal()
|
|
}
|
|
|
|
private let externallyUpdatedPeerIdsPipe = ValuePipe<[PeerId]>()
|
|
var externallyUpdatedPeerIds: Signal<[PeerId], NoError> {
|
|
return self.externallyUpdatedPeerIdsPipe.signal()
|
|
}
|
|
|
|
private let termsOfServiceUpdateValue = Atomic<TermsOfServiceUpdate?>(value: nil)
|
|
private let termsOfServiceUpdatePromise = Promise<TermsOfServiceUpdate?>(nil)
|
|
public var termsOfServiceUpdate: Signal<TermsOfServiceUpdate?, NoError> {
|
|
return self.termsOfServiceUpdatePromise.get()
|
|
}
|
|
|
|
private let appUpdateInfoValue = Atomic<AppUpdateInfo?>(value: nil)
|
|
private let appUpdateInfoPromise = Promise<AppUpdateInfo?>(nil)
|
|
public var appUpdateInfo: Signal<AppUpdateInfo?, NoError> {
|
|
return self.appUpdateInfoPromise.get()
|
|
}
|
|
|
|
private let appliedIncomingReadMessagesPipe = ValuePipe<[MessageId]>()
|
|
public var appliedIncomingReadMessages: Signal<[MessageId], NoError> {
|
|
return self.appliedIncomingReadMessagesPipe.signal()
|
|
}
|
|
|
|
private let significantStateUpdateCompletedPipe = ValuePipe<Void>()
|
|
var significantStateUpdateCompleted: Signal<Void, NoError> {
|
|
return self.significantStateUpdateCompletedPipe.signal()
|
|
}
|
|
|
|
private let authorizationListUpdatesPipe = ValuePipe<Void>()
|
|
var authorizationListUpdates: Signal<Void, NoError> {
|
|
return self.authorizationListUpdatesPipe.signal()
|
|
}
|
|
|
|
private let threadReadStateUpdatesPipe = ValuePipe<(incoming: [MessageId: MessageId.Id], outgoing: [MessageId: MessageId.Id])>()
|
|
var threadReadStateUpdates: Signal<(incoming: [MessageId: MessageId.Id], outgoing: [MessageId: MessageId.Id]), NoError> {
|
|
return self.threadReadStateUpdatesPipe.signal()
|
|
}
|
|
|
|
private let groupCallParticipantUpdatesPipe = ValuePipe<[(Int64, GroupCallParticipantsContext.Update)]>()
|
|
public var groupCallParticipantUpdates: Signal<[(Int64, GroupCallParticipantsContext.Update)], NoError> {
|
|
return self.groupCallParticipantUpdatesPipe.signal()
|
|
}
|
|
|
|
private let deletedMessagesPipe = ValuePipe<[DeletedMessageId]>()
|
|
public var deletedMessages: Signal<[DeletedMessageId], NoError> {
|
|
return self.deletedMessagesPipe.signal()
|
|
}
|
|
|
|
private var updatedWebpageContexts: [MediaId: UpdatedWebpageSubscriberContext] = [:]
|
|
private var updatedPeersNearbyContext = UpdatedPeersNearbySubscriberContext()
|
|
|
|
private let delayNotificatonsUntil = Atomic<Int32?>(value: nil)
|
|
private let appliedMaxMessageIdPromise = Promise<Int32?>(nil)
|
|
private let appliedMaxMessageIdDisposable = MetaDisposable()
|
|
private let appliedQtsPromise = Promise<Int32?>(nil)
|
|
private let appliedQtsDisposable = MetaDisposable()
|
|
|
|
init(
|
|
accountPeerId: PeerId,
|
|
accountManager: AccountManager<TelegramAccountManagerTypes>,
|
|
postbox: Postbox,
|
|
network: Network,
|
|
callSessionManager: CallSessionManager?,
|
|
addIsContactUpdates: @escaping ([(PeerId, Bool)]) -> Void,
|
|
shouldKeepOnlinePresence: Signal<Bool, NoError>,
|
|
peerInputActivityManager: PeerInputActivityManager?,
|
|
auxiliaryMethods: AccountAuxiliaryMethods
|
|
) {
|
|
self.accountPeerId = accountPeerId
|
|
self.accountManager = accountManager
|
|
self.postbox = postbox
|
|
self.network = network
|
|
self.callSessionManager = callSessionManager
|
|
self.addIsContactUpdates = addIsContactUpdates
|
|
self.shouldKeepOnlinePresence = shouldKeepOnlinePresence
|
|
self.peerInputActivityManager = peerInputActivityManager
|
|
self.auxiliaryMethods = auxiliaryMethods
|
|
}
|
|
|
|
deinit {
|
|
self.updateServiceDisposable.dispose()
|
|
self.operationDisposable.dispose()
|
|
self.appliedMaxMessageIdDisposable.dispose()
|
|
self.appliedQtsDisposable.dispose()
|
|
}
|
|
|
|
public func reset() {
|
|
self.queue.async {
|
|
if self.updateService == nil {
|
|
self.updateService = UpdateMessageService(peerId: self.accountPeerId)
|
|
self.updateServiceDisposable.set(self.updateService!.pipe.signal().start(next: { [weak self] groups in
|
|
if let strongSelf = self {
|
|
strongSelf.addUpdateGroups(groups)
|
|
}
|
|
}))
|
|
self.network.mtProto.add(self.updateService)
|
|
}
|
|
self.operationDisposable.set(nil)
|
|
self.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
|
|
self.startFirstOperation()
|
|
|
|
let appliedValues: [(MetaDisposable, Signal<Int32?, NoError>, Bool)] = [
|
|
(self.appliedMaxMessageIdDisposable, self.appliedMaxMessageIdPromise.get(), true),
|
|
(self.appliedQtsDisposable, self.appliedQtsPromise.get(), false)
|
|
]
|
|
|
|
for (disposable, value, isMaxMessageId) in appliedValues {
|
|
let network = self.network
|
|
disposable.set((combineLatest(queue: self.queue, self.shouldKeepOnlinePresence, value)
|
|
|> mapToSignal { shouldKeepOnlinePresence, value -> Signal<Int32, NoError> in
|
|
guard let value = value else {
|
|
return .complete()
|
|
}
|
|
if !shouldKeepOnlinePresence {
|
|
return .complete()
|
|
}
|
|
return .single(value)
|
|
}
|
|
|> distinctUntilChanged
|
|
|> mapToSignal { value -> Signal<Never, NoError> in
|
|
if isMaxMessageId {
|
|
return network.request(Api.functions.messages.receivedMessages(maxId: value))
|
|
|> ignoreValues
|
|
|> `catch` { _ -> Signal<Never, NoError> in
|
|
return .complete()
|
|
}
|
|
} else {
|
|
if value == 0 {
|
|
return .complete()
|
|
} else {
|
|
return network.request(Api.functions.messages.receivedQueue(maxQts: value))
|
|
|> ignoreValues
|
|
|> `catch` { _ -> Signal<Never, NoError> in
|
|
return .complete()
|
|
}
|
|
}
|
|
}
|
|
}).start())
|
|
}
|
|
}
|
|
}
|
|
|
|
func addUpdates(_ updates: Api.Updates) {
|
|
self.queue.async {
|
|
self.updateService?.addUpdates(updates)
|
|
}
|
|
}
|
|
|
|
func addUpdateGroups(_ groups: [UpdateGroup]) {
|
|
self.queue.async {
|
|
if let last = self.operations.last {
|
|
switch last.content {
|
|
case .pollDifference, .processUpdateGroups, .custom, .pollCompletion, .processEvents, .replayAsynchronouslyBuiltFinalState:
|
|
self.addOperation(.collectUpdateGroups(groups, 0.0), position: .last)
|
|
case let .collectUpdateGroups(currentGroups, timeout):
|
|
let operation = AccountStateManagerOperation(content: .collectUpdateGroups(currentGroups + groups, timeout))
|
|
operation.isRunning = last.isRunning
|
|
self.operations[self.operations.count - 1] = operation
|
|
self.startFirstOperation()
|
|
}
|
|
} else {
|
|
self.addOperation(.collectUpdateGroups(groups, 0.0), position: .last)
|
|
}
|
|
}
|
|
}
|
|
|
|
func addReplayAsynchronouslyBuiltFinalState(_ finalState: AccountFinalState) -> Signal<Bool, NoError> {
|
|
return Signal { subscriber in
|
|
self.queue.async {
|
|
self.addOperation(.replayAsynchronouslyBuiltFinalState(finalState, {
|
|
subscriber.putNext(true)
|
|
subscriber.putCompletion()
|
|
}), position: .last)
|
|
}
|
|
return EmptyDisposable
|
|
}
|
|
}
|
|
|
|
func addCustomOperation<T, E>(_ f: Signal<T, E>) -> Signal<T, E> {
|
|
let pipe = ValuePipe<CustomOperationEvent<T, E>>()
|
|
return Signal<T, E> { subscriber in
|
|
let disposable = pipe.signal().start(next: { event in
|
|
switch event {
|
|
case let .Next(next):
|
|
subscriber.putNext(next)
|
|
case let .Error(error):
|
|
subscriber.putError(error)
|
|
case .Completion:
|
|
subscriber.putCompletion()
|
|
}
|
|
})
|
|
|
|
let signal = Signal<Void, NoError> { subscriber in
|
|
return f.start(next: { next in
|
|
pipe.putNext(.Next(next))
|
|
}, error: { error in
|
|
pipe.putNext(.Error(error))
|
|
subscriber.putCompletion()
|
|
}, completed: {
|
|
pipe.putNext(.Completion)
|
|
subscriber.putCompletion()
|
|
})
|
|
}
|
|
|
|
self.addOperation(.custom(self.getNextId(), signal), position: .last)
|
|
|
|
return disposable
|
|
} |> runOn(self.queue)
|
|
}
|
|
|
|
private func replaceOperations(with content: AccountStateManagerOperationContent) {
|
|
var collectedProcessUpdateGroups: [AccountStateManagerOperationContent] = []
|
|
var collectedMessageIds: [MessageId] = []
|
|
var collectedPollCompletionSubscribers: [(Int32, ([MessageId]) -> Void)] = []
|
|
var collectedReplayAsynchronouslyBuiltFinalState: [(AccountFinalState, () -> Void)] = []
|
|
var processEvents: [(Int32, AccountFinalStateEvents)] = []
|
|
var customOperations: [(Int32, Signal<Void, NoError>)] = []
|
|
|
|
var replacedOperations: [AccountStateManagerOperation] = []
|
|
|
|
for i in 0 ..< self.operations.count {
|
|
if self.operations[i].isRunning {
|
|
replacedOperations.append(self.operations[i])
|
|
} else {
|
|
switch self.operations[i].content {
|
|
case .processUpdateGroups:
|
|
collectedProcessUpdateGroups.append(self.operations[i].content)
|
|
case let .pollCompletion(_, messageIds, subscribers):
|
|
collectedMessageIds.append(contentsOf: messageIds)
|
|
collectedPollCompletionSubscribers.append(contentsOf: subscribers)
|
|
case let .replayAsynchronouslyBuiltFinalState(finalState, completion):
|
|
collectedReplayAsynchronouslyBuiltFinalState.append((finalState, completion))
|
|
case let .processEvents(operationId, events):
|
|
processEvents.append((operationId, events))
|
|
case let .custom(operationId, customSignal):
|
|
customOperations.append((operationId, customSignal))
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
replacedOperations.append(contentsOf: collectedProcessUpdateGroups.map { AccountStateManagerOperation(content: $0) })
|
|
|
|
replacedOperations.append(AccountStateManagerOperation(content: content))
|
|
|
|
if !collectedPollCompletionSubscribers.isEmpty || !collectedMessageIds.isEmpty {
|
|
replacedOperations.append(AccountStateManagerOperation(content: .pollCompletion(self.getNextId(), collectedMessageIds, collectedPollCompletionSubscribers)))
|
|
}
|
|
|
|
for (finalState, completion) in collectedReplayAsynchronouslyBuiltFinalState {
|
|
replacedOperations.append(AccountStateManagerOperation(content: .replayAsynchronouslyBuiltFinalState(finalState, completion)))
|
|
}
|
|
|
|
for (operationId, events) in processEvents {
|
|
replacedOperations.append(AccountStateManagerOperation(content: .processEvents(operationId, events)))
|
|
}
|
|
|
|
for (operationId, customSignal) in customOperations {
|
|
replacedOperations.append(AccountStateManagerOperation(content: .custom(operationId, customSignal)))
|
|
}
|
|
|
|
self.operations.removeAll()
|
|
self.operations.append(contentsOf: replacedOperations)
|
|
}
|
|
|
|
private func addOperation(_ content: AccountStateManagerOperationContent, position: AccountStateManagerAddOperationPosition) {
|
|
self.queue.async {
|
|
let operation = AccountStateManagerOperation(content: content)
|
|
switch position {
|
|
case .first:
|
|
if self.operations.isEmpty || !self.operations[0].isRunning {
|
|
self.operations.insert(operation, at: 0)
|
|
self.startFirstOperation()
|
|
} else {
|
|
self.operations.insert(operation, at: 1)
|
|
}
|
|
case .last:
|
|
let begin = self.operations.isEmpty
|
|
self.operations.append(operation)
|
|
if begin {
|
|
self.startFirstOperation()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private func startFirstOperation() {
|
|
guard let operation = self.operations.first else {
|
|
return
|
|
}
|
|
guard !operation.isRunning else {
|
|
return
|
|
}
|
|
operation.isRunning = true
|
|
switch operation.content {
|
|
case let .pollDifference(currentEvents):
|
|
self.operationTimer?.invalidate()
|
|
self.currentIsUpdatingValue = true
|
|
let queue = self.queue
|
|
let accountManager = self.accountManager
|
|
let postbox = self.postbox
|
|
let network = self.network
|
|
let mediaBox = postbox.mediaBox
|
|
let accountPeerId = self.accountPeerId
|
|
let auxiliaryMethods = self.auxiliaryMethods
|
|
let signal = postbox.stateView()
|
|
|> mapToSignal { view -> Signal<AuthorizedAccountState, NoError> in
|
|
if let state = view.state as? AuthorizedAccountState {
|
|
return .single(state)
|
|
} else {
|
|
return .complete()
|
|
}
|
|
}
|
|
|> take(1)
|
|
|> mapToSignal { [weak self] state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|
|
if let authorizedState = state.state {
|
|
let flags: Int32 = 0
|
|
let ptsTotalLimit: Int32? = nil
|
|
|
|
let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: authorizedState.date, qts: authorizedState.qts))
|
|
|> map(Optional.init)
|
|
|> `catch` { error -> Signal<Api.updates.Difference?, MTRpcError> in
|
|
if error.errorCode == 406 && error.errorDescription == "AUTH_KEY_DUPLICATED" {
|
|
return .single(nil)
|
|
} else {
|
|
return .fail(error)
|
|
}
|
|
}
|
|
|> retryRequest
|
|
|
|
return request
|
|
|> mapToSignal { difference -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|
|
guard let difference = difference else {
|
|
return .single((nil, nil, true))
|
|
}
|
|
switch difference {
|
|
case .differenceTooLong:
|
|
preconditionFailure()
|
|
default:
|
|
return initialStateWithDifference(postbox: postbox, difference: difference)
|
|
|> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|
|
if state.initialState.state != authorizedState {
|
|
Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
|
|
return .single((nil, nil, false))
|
|
} else {
|
|
return finalStateWithDifference(postbox: postbox, network: network, state: state, difference: difference)
|
|
|> deliverOn(queue)
|
|
|> mapToSignal { finalState -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|
|
if !finalState.state.preCachedResources.isEmpty {
|
|
for (resource, data) in finalState.state.preCachedResources {
|
|
mediaBox.storeResourceData(resource.id, data: data)
|
|
}
|
|
}
|
|
let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()
|
|
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
|
|
let startTime = CFAbsoluteTimeGetCurrent()
|
|
let replayedState = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
|
|
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
|
|
if deltaTime > 1.0 {
|
|
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
|
|
}
|
|
|
|
if let replayedState = replayedState {
|
|
return (difference, replayedState, false)
|
|
} else {
|
|
return (nil, nil, false)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
let appliedState = network.request(Api.functions.updates.getState())
|
|
|> retryRequest
|
|
|> mapToSignal { state in
|
|
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
|
|
if let currentState = transaction.getState() as? AuthorizedAccountState {
|
|
switch state {
|
|
case let .state(pts, qts, date, seq, _):
|
|
transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: qts, date: date, seq: seq)))
|
|
}
|
|
}
|
|
return (nil, nil, false)
|
|
}
|
|
}
|
|
return appliedState
|
|
}
|
|
}
|
|
|> deliverOn(self.queue)
|
|
let _ = signal.start(next: { [weak self] difference, finalState, skipBecauseOfError in
|
|
if let strongSelf = self {
|
|
if case .pollDifference = strongSelf.operations.removeFirst().content {
|
|
let events: AccountFinalStateEvents
|
|
if let finalState = finalState {
|
|
events = currentEvents.union(with: AccountFinalStateEvents(state: finalState))
|
|
} else {
|
|
events = currentEvents
|
|
}
|
|
if let difference = difference {
|
|
switch difference {
|
|
case .differenceSlice:
|
|
strongSelf.addOperation(.pollDifference(events), position: .first)
|
|
default:
|
|
if !events.isEmpty {
|
|
strongSelf.insertProcessEvents(events)
|
|
}
|
|
strongSelf.currentIsUpdatingValue = false
|
|
strongSelf.significantStateUpdateCompletedPipe.putNext(Void())
|
|
}
|
|
} else if skipBecauseOfError {
|
|
if !events.isEmpty {
|
|
strongSelf.insertProcessEvents(events)
|
|
}
|
|
} else {
|
|
if !events.isEmpty {
|
|
strongSelf.insertProcessEvents(events)
|
|
}
|
|
strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
|
|
}
|
|
strongSelf.startFirstOperation()
|
|
} else {
|
|
assertionFailure()
|
|
}
|
|
}
|
|
})
|
|
case let .collectUpdateGroups(_, timeout):
|
|
self.operationTimer?.invalidate()
|
|
let operationTimer = SignalKitTimer(timeout: timeout, repeat: false, completion: { [weak self] in
|
|
if let strongSelf = self {
|
|
let firstOperation = strongSelf.operations.removeFirst()
|
|
if case let .collectUpdateGroups(groups, _) = firstOperation.content {
|
|
if timeout.isEqual(to: 0.0) {
|
|
strongSelf.addOperation(.processUpdateGroups(groups), position: .first)
|
|
} else {
|
|
Logger.shared.log("AccountStateManager", "timeout while waiting for updates")
|
|
strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
|
|
}
|
|
strongSelf.startFirstOperation()
|
|
} else {
|
|
assertionFailure()
|
|
}
|
|
}
|
|
}, queue: self.queue)
|
|
self.operationTimer = operationTimer
|
|
operationTimer.start()
|
|
case let .processUpdateGroups(groups):
|
|
self.operationTimer?.invalidate()
|
|
let accountManager = self.accountManager
|
|
let postbox = self.postbox
|
|
let network = self.network
|
|
let auxiliaryMethods = self.auxiliaryMethods
|
|
let accountPeerId = self.accountPeerId
|
|
let mediaBox = postbox.mediaBox
|
|
let queue = self.queue
|
|
let signal = initialStateWithUpdateGroups(postbox: postbox, groups: groups)
|
|
|> mapToSignal { [weak self] state -> Signal<(AccountReplayedFinalState?, AccountFinalState), NoError> in
|
|
return finalStateWithUpdateGroups(postbox: postbox, network: network, state: state, groups: groups)
|
|
|> deliverOn(queue)
|
|
|> mapToSignal { finalState in
|
|
if !finalState.discard && !finalState.state.preCachedResources.isEmpty {
|
|
for (resource, data) in finalState.state.preCachedResources {
|
|
postbox.mediaBox.storeResourceData(resource.id, data: data)
|
|
}
|
|
}
|
|
|
|
let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()
|
|
|
|
return postbox.transaction { transaction -> AccountReplayedFinalState? in
|
|
if finalState.discard {
|
|
return nil
|
|
} else {
|
|
let startTime = CFAbsoluteTimeGetCurrent()
|
|
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
|
|
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
|
|
if deltaTime > 1.0 {
|
|
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
|
|
}
|
|
return result
|
|
}
|
|
}
|
|
|> map({ ($0, finalState) })
|
|
|> deliverOn(queue)
|
|
}
|
|
}
|
|
let _ = signal.start(next: { [weak self] replayedState, finalState in
|
|
if let strongSelf = self {
|
|
if case let .processUpdateGroups(groups) = strongSelf.operations.removeFirst().content {
|
|
if let replayedState = replayedState, !finalState.shouldPoll {
|
|
let events = AccountFinalStateEvents(state: replayedState)
|
|
if !events.isEmpty {
|
|
strongSelf.insertProcessEvents(events)
|
|
}
|
|
if finalState.incomplete || !finalState.missingUpdatesFromChannels.isEmpty {
|
|
strongSelf.addOperation(.collectUpdateGroups(groups, 2.0), position: .last)
|
|
}
|
|
} else {
|
|
if let replayedState = replayedState {
|
|
let events = AccountFinalStateEvents(state: replayedState)
|
|
if !events.displayAlerts.isEmpty {
|
|
strongSelf.insertProcessEvents(AccountFinalStateEvents(displayAlerts: events.displayAlerts))
|
|
}
|
|
}
|
|
strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
|
|
}
|
|
strongSelf.startFirstOperation()
|
|
} else {
|
|
assertionFailure()
|
|
}
|
|
}
|
|
})
|
|
case let .custom(operationId, signal):
|
|
self.operationTimer?.invalidate()
|
|
let completed: () -> Void = { [weak self] in
|
|
if let strongSelf = self {
|
|
let topOperation = strongSelf.operations.removeFirst()
|
|
if case .custom(operationId, _) = topOperation.content {
|
|
strongSelf.startFirstOperation()
|
|
} else {
|
|
assertionFailure()
|
|
}
|
|
}
|
|
}
|
|
let _ = (signal |> deliverOn(self.queue)).start(completed: {
|
|
completed()
|
|
})
|
|
case let .processEvents(operationId, events):
|
|
self.operationTimer?.invalidate()
|
|
let completed: () -> Void = { [weak self] in
|
|
if let strongSelf = self {
|
|
let topOperation = strongSelf.operations.removeFirst()
|
|
if case .processEvents(operationId, _) = topOperation.content {
|
|
if !events.updatedTypingActivities.isEmpty {
|
|
strongSelf.peerInputActivityManager?.transaction { manager in
|
|
for (chatPeerId, peerActivities) in events.updatedTypingActivities {
|
|
for (peerId, activity) in peerActivities {
|
|
if let activity = activity {
|
|
manager.addActivity(chatPeerId: chatPeerId, peerId: peerId, activity: activity)
|
|
} else {
|
|
manager.removeAllActivities(chatPeerId: chatPeerId, peerId: peerId)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if !events.updatedWebpages.isEmpty {
|
|
strongSelf.notifyUpdatedWebpages(events.updatedWebpages)
|
|
}
|
|
if let updatedPeersNearby = events.updatedPeersNearby {
|
|
strongSelf.notifyUpdatedPeersNearby(updatedPeersNearby)
|
|
}
|
|
if !events.updatedCalls.isEmpty {
|
|
for call in events.updatedCalls {
|
|
strongSelf.callSessionManager?.updateSession(call, completion: { _ in })
|
|
}
|
|
}
|
|
if !events.addedCallSignalingData.isEmpty {
|
|
for (id, data) in events.addedCallSignalingData {
|
|
strongSelf.callSessionManager?.addCallSignalingData(id: id, data: data)
|
|
}
|
|
}
|
|
if !events.updatedGroupCallParticipants.isEmpty {
|
|
strongSelf.groupCallParticipantUpdatesPipe.putNext(events.updatedGroupCallParticipants)
|
|
}
|
|
if !events.updatedIncomingThreadReadStates.isEmpty || !events.updatedOutgoingThreadReadStates.isEmpty {
|
|
strongSelf.threadReadStateUpdatesPipe.putNext((events.updatedIncomingThreadReadStates, events.updatedOutgoingThreadReadStates))
|
|
}
|
|
if !events.isContactUpdates.isEmpty {
|
|
strongSelf.addIsContactUpdates(events.isContactUpdates)
|
|
}
|
|
if let updatedMaxMessageId = events.updatedMaxMessageId {
|
|
strongSelf.appliedMaxMessageIdPromise.set(.single(updatedMaxMessageId))
|
|
}
|
|
if let updatedQts = events.updatedQts {
|
|
strongSelf.appliedQtsPromise.set(.single(updatedQts))
|
|
}
|
|
var pollCount = 0
|
|
for i in 0 ..< strongSelf.operations.count {
|
|
if case let .pollCompletion(pollId, messageIds, subscribers) = strongSelf.operations[i].content {
|
|
pollCount += 1
|
|
var updatedMessageIds = messageIds
|
|
updatedMessageIds.append(contentsOf: events.addedIncomingMessageIds)
|
|
let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, updatedMessageIds, subscribers))
|
|
operation.isRunning = strongSelf.operations[i].isRunning
|
|
strongSelf.operations[i] = operation
|
|
}
|
|
}
|
|
assert(pollCount <= 1)
|
|
strongSelf.startFirstOperation()
|
|
} else {
|
|
assertionFailure()
|
|
}
|
|
}
|
|
}
|
|
|
|
if events.delayNotificatonsUntil != nil {
|
|
let _ = self.delayNotificatonsUntil.swap(events.delayNotificatonsUntil)
|
|
}
|
|
|
|
let signal = self.postbox.transaction { transaction -> [([Message], PeerGroupId, Bool)] in
|
|
var messageList: [([Message], PeerGroupId, Bool)] = []
|
|
for id in events.addedIncomingMessageIds {
|
|
let (messages, notify, _, _) = messagesForNotification(transaction: transaction, id: id, alwaysReturnMessage: false)
|
|
if !messages.isEmpty {
|
|
messageList.append((messages, .root, notify))
|
|
}
|
|
}
|
|
var wasScheduledMessages: [Message] = []
|
|
for id in events.wasScheduledMessageIds {
|
|
if let message = transaction.getMessage(id) {
|
|
wasScheduledMessages.append(message)
|
|
}
|
|
}
|
|
if !wasScheduledMessages.isEmpty {
|
|
messageList.append((wasScheduledMessages, .root, true))
|
|
}
|
|
return messageList
|
|
}
|
|
|
|
let _ = (signal
|
|
|> deliverOn(self.queue)).start(next: { [weak self] messages in
|
|
if let strongSelf = self {
|
|
strongSelf.notificationMessagesPipe.putNext(messages)
|
|
}
|
|
}, completed: {
|
|
completed()
|
|
})
|
|
|
|
let timestamp = Int32(Date().timeIntervalSince1970)
|
|
let minReactionTimestamp = timestamp - 20
|
|
let reactionEvents = events.addedReactionEvents.compactMap { event -> (reactionAuthor: Peer, message: Message)? in
|
|
if event.timestamp >= minReactionTimestamp {
|
|
return (event.reactionAuthor, event.message)
|
|
} else {
|
|
return nil
|
|
}
|
|
}
|
|
if !reactionEvents.isEmpty {
|
|
self.reactionNotificationsPipe.putNext(reactionEvents)
|
|
}
|
|
|
|
if !events.displayAlerts.isEmpty {
|
|
self.displayAlertsPipe.putNext(events.displayAlerts)
|
|
}
|
|
|
|
if !events.externallyUpdatedPeerId.isEmpty {
|
|
self.externallyUpdatedPeerIdsPipe.putNext(Array(events.externallyUpdatedPeerId))
|
|
}
|
|
|
|
if events.authorizationListUpdated {
|
|
self.authorizationListUpdatesPipe.putNext(Void())
|
|
}
|
|
|
|
if !events.deletedMessageIds.isEmpty {
|
|
self.deletedMessagesPipe.putNext(events.deletedMessageIds)
|
|
}
|
|
case let .pollCompletion(pollId, preMessageIds, preSubscribers):
|
|
if self.operations.count > 1 {
|
|
self.operations.removeFirst()
|
|
self.postponePollCompletionOperation(messageIds: preMessageIds, subscribers: preSubscribers)
|
|
self.startFirstOperation()
|
|
} else {
|
|
self.operationTimer?.invalidate()
|
|
let signal = self.network.request(Api.functions.help.test())
|
|
|> deliverOn(self.queue)
|
|
let completed: () -> Void = { [weak self] in
|
|
if let strongSelf = self {
|
|
let topOperation = strongSelf.operations.removeFirst()
|
|
if case let .pollCompletion(topPollId, messageIds, subscribers) = topOperation.content {
|
|
assert(topPollId == pollId)
|
|
|
|
if strongSelf.operations.isEmpty {
|
|
for (_, f) in subscribers {
|
|
f(messageIds)
|
|
}
|
|
} else {
|
|
strongSelf.postponePollCompletionOperation(messageIds: messageIds, subscribers: subscribers)
|
|
}
|
|
strongSelf.startFirstOperation()
|
|
} else {
|
|
assertionFailure()
|
|
}
|
|
}
|
|
}
|
|
let _ = (signal |> deliverOn(self.queue)).start(error: { _ in
|
|
completed()
|
|
}, completed: {
|
|
completed()
|
|
})
|
|
}
|
|
case let .replayAsynchronouslyBuiltFinalState(finalState, completion):
|
|
if !finalState.state.preCachedResources.isEmpty {
|
|
for (resource, data) in finalState.state.preCachedResources {
|
|
self.postbox.mediaBox.storeResourceData(resource.id, data: data)
|
|
}
|
|
}
|
|
|
|
let accountPeerId = self.accountPeerId
|
|
let accountManager = self.accountManager
|
|
let postbox = self.postbox
|
|
let mediaBox = self.postbox.mediaBox
|
|
let network = self.network
|
|
let auxiliaryMethods = self.auxiliaryMethods
|
|
let removePossiblyDeliveredMessagesUniqueIds = self.removePossiblyDeliveredMessagesUniqueIds
|
|
let signal = self.postbox.transaction { transaction -> AccountReplayedFinalState? in
|
|
let startTime = CFAbsoluteTimeGetCurrent()
|
|
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
|
|
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
|
|
if deltaTime > 1.0 {
|
|
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
|
|
}
|
|
return result
|
|
}
|
|
|> map({ ($0, finalState) })
|
|
|> deliverOn(self.queue)
|
|
|
|
let _ = signal.start(next: { [weak self] replayedState, finalState in
|
|
if let strongSelf = self {
|
|
if case .replayAsynchronouslyBuiltFinalState = strongSelf.operations.removeFirst().content {
|
|
if let replayedState = replayedState {
|
|
let events = AccountFinalStateEvents(state: replayedState)
|
|
if !events.isEmpty {
|
|
strongSelf.insertProcessEvents(events)
|
|
}
|
|
}
|
|
strongSelf.startFirstOperation()
|
|
} else {
|
|
assertionFailure()
|
|
}
|
|
completion()
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func standaloneReplayAsynchronouslyBuiltFinalState(finalState: AccountFinalState) -> Signal<Never, NoError> {
|
|
if !finalState.state.preCachedResources.isEmpty {
|
|
for (resource, data) in finalState.state.preCachedResources {
|
|
self.postbox.mediaBox.storeResourceData(resource.id, data: data)
|
|
}
|
|
}
|
|
|
|
let accountPeerId = self.accountPeerId
|
|
let accountManager = self.accountManager
|
|
let postbox = self.postbox
|
|
let mediaBox = self.postbox.mediaBox
|
|
let network = self.network
|
|
let auxiliaryMethods = self.auxiliaryMethods
|
|
let removePossiblyDeliveredMessagesUniqueIds = self.removePossiblyDeliveredMessagesUniqueIds
|
|
let signal = self.postbox.transaction { transaction -> AccountReplayedFinalState? in
|
|
let startTime = CFAbsoluteTimeGetCurrent()
|
|
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
|
|
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
|
|
if deltaTime > 1.0 {
|
|
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
|
|
}
|
|
return result
|
|
}
|
|
|> map({ ($0, finalState) })
|
|
|> deliverOn(self.queue)
|
|
|
|
return signal
|
|
|> ignoreValues
|
|
}
|
|
|
|
public func standalonePollDifference() -> Signal<Bool, NoError> {
|
|
let queue = self.queue
|
|
let accountManager = self.accountManager
|
|
let postbox = self.postbox
|
|
let network = self.network
|
|
let mediaBox = postbox.mediaBox
|
|
let accountPeerId = self.accountPeerId
|
|
let auxiliaryMethods = self.auxiliaryMethods
|
|
|
|
let signal = postbox.stateView()
|
|
|> mapToSignal { view -> Signal<AuthorizedAccountState, NoError> in
|
|
if let state = view.state as? AuthorizedAccountState {
|
|
return .single(state)
|
|
} else {
|
|
return .complete()
|
|
}
|
|
}
|
|
|> take(1)
|
|
|> mapToSignal { [weak self] state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|
|
if let authorizedState = state.state {
|
|
let flags: Int32 = 0
|
|
let ptsTotalLimit: Int32? = nil
|
|
|
|
let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: Int32.max, qts: authorizedState.qts))
|
|
|> map(Optional.init)
|
|
|> `catch` { error -> Signal<Api.updates.Difference?, MTRpcError> in
|
|
if error.errorCode == 406 && error.errorDescription == "AUTH_KEY_DUPLICATED" {
|
|
return .single(nil)
|
|
} else {
|
|
return .fail(error)
|
|
}
|
|
}
|
|
|> retryRequest
|
|
|
|
return request
|
|
|> mapToSignal { difference -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|
|
guard let difference = difference else {
|
|
return .single((nil, nil, true))
|
|
}
|
|
switch difference {
|
|
case .differenceTooLong:
|
|
preconditionFailure()
|
|
default:
|
|
return initialStateWithDifference(postbox: postbox, difference: difference)
|
|
|> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|
|
if state.initialState.state != authorizedState {
|
|
Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
|
|
return .single((nil, nil, false))
|
|
} else {
|
|
return finalStateWithDifference(postbox: postbox, network: network, state: state, difference: difference)
|
|
|> deliverOn(queue)
|
|
|> mapToSignal { finalState -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
|
|
if !finalState.state.preCachedResources.isEmpty {
|
|
for (resource, data) in finalState.state.preCachedResources {
|
|
mediaBox.storeResourceData(resource.id, data: data)
|
|
}
|
|
}
|
|
let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()
|
|
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
|
|
let startTime = CFAbsoluteTimeGetCurrent()
|
|
let replayedState = replayFinalState(
|
|
accountManager: accountManager,
|
|
postbox: postbox,
|
|
accountPeerId: accountPeerId,
|
|
mediaBox: mediaBox,
|
|
encryptionProvider: network.encryptionProvider,
|
|
transaction: transaction,
|
|
auxiliaryMethods: auxiliaryMethods,
|
|
finalState: finalState,
|
|
removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds,
|
|
ignoreDate: true
|
|
)
|
|
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
|
|
if deltaTime > 1.0 {
|
|
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
|
|
}
|
|
|
|
if let replayedState = replayedState {
|
|
return (difference, replayedState, false)
|
|
} else {
|
|
return (nil, nil, false)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
return .single((nil, nil, false))
|
|
}
|
|
}
|
|
|> deliverOn(self.queue)
|
|
|
|
return signal
|
|
|> mapToSignal { difference, _, _ -> Signal<Bool, NoError> in
|
|
if let difference = difference {
|
|
switch difference {
|
|
case .differenceSlice:
|
|
return .single(false)
|
|
default:
|
|
return .single(true)
|
|
}
|
|
} else {
|
|
return .single(true)
|
|
}
|
|
}
|
|
}
|
|
|
|
private func insertProcessEvents(_ events: AccountFinalStateEvents) {
|
|
if !events.isEmpty {
|
|
let operation = AccountStateManagerOperation(content: .processEvents(self.getNextId(), events))
|
|
var inserted = false
|
|
for i in 0 ..< self.operations.count {
|
|
if self.operations[i].isRunning {
|
|
continue
|
|
}
|
|
if case .processEvents = self.operations[i].content {
|
|
continue
|
|
}
|
|
self.operations.insert(operation, at: i)
|
|
inserted = true
|
|
break
|
|
}
|
|
if !inserted {
|
|
self.operations.append(operation)
|
|
}
|
|
}
|
|
}
|
|
|
|
private func postponePollCompletionOperation(messageIds: [MessageId], subscribers: [(Int32, ([MessageId]) -> Void)]) {
|
|
self.addOperation(.pollCompletion(self.getNextId(), messageIds, subscribers), position: .last)
|
|
|
|
for i in 0 ..< self.operations.count {
|
|
if case .pollCompletion = self.operations[i].content {
|
|
if i != self.operations.count - 1 {
|
|
assertionFailure()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private func addPollCompletion(_ f: @escaping ([MessageId]) -> Void) -> Int32 {
|
|
assert(self.queue.isCurrent())
|
|
|
|
let updatedId: Int32 = self.getNextId()
|
|
|
|
for i in 0 ..< self.operations.count {
|
|
if case let .pollCompletion(pollId, messageIds, subscribers) = self.operations[i].content {
|
|
var subscribers = subscribers
|
|
subscribers.append((updatedId, f))
|
|
let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, messageIds, subscribers))
|
|
operation.isRunning = self.operations[i].isRunning
|
|
self.operations[i] = operation
|
|
return updatedId
|
|
}
|
|
}
|
|
|
|
self.addOperation(.pollCompletion(self.getNextId(), [], [(updatedId, f)]), position: .last)
|
|
|
|
return updatedId
|
|
}
|
|
|
|
private func removePollCompletion(_ id: Int32) {
|
|
for i in 0 ..< self.operations.count {
|
|
if case let .pollCompletion(pollId, messages, subscribers) = self.operations[i].content {
|
|
for j in 0 ..< subscribers.count {
|
|
if subscribers[j].0 == id {
|
|
var subscribers = subscribers
|
|
subscribers.remove(at: j)
|
|
let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, messages, subscribers))
|
|
operation.isRunning = self.operations[i].isRunning
|
|
self.operations[i] = operation
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
public func pollStateUpdateCompletion() -> Signal<[MessageId], NoError> {
|
|
return Signal { [weak self] subscriber in
|
|
let disposable = MetaDisposable()
|
|
if let strongSelf = self {
|
|
strongSelf.queue.async {
|
|
let id = strongSelf.addPollCompletion({ messageIds in
|
|
subscriber.putNext(messageIds)
|
|
subscriber.putCompletion()
|
|
})
|
|
|
|
disposable.set(ActionDisposable {
|
|
if let strongSelf = self {
|
|
strongSelf.queue.async {
|
|
strongSelf.removePollCompletion(id)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
return disposable
|
|
}
|
|
}
|
|
|
|
public func updatedWebpage(_ webpageId: MediaId) -> Signal<TelegramMediaWebpage, NoError> {
|
|
let queue = self.queue
|
|
return Signal { [weak self] subscriber in
|
|
let disposable = MetaDisposable()
|
|
queue.async {
|
|
if let strongSelf = self {
|
|
let context: UpdatedWebpageSubscriberContext
|
|
if let current = strongSelf.updatedWebpageContexts[webpageId] {
|
|
context = current
|
|
} else {
|
|
context = UpdatedWebpageSubscriberContext()
|
|
strongSelf.updatedWebpageContexts[webpageId] = context
|
|
}
|
|
|
|
let index = context.subscribers.add({ media in
|
|
subscriber.putNext(media)
|
|
})
|
|
|
|
disposable.set(ActionDisposable {
|
|
if let strongSelf = self {
|
|
if let context = strongSelf.updatedWebpageContexts[webpageId] {
|
|
context.subscribers.remove(index)
|
|
if context.subscribers.isEmpty {
|
|
strongSelf.updatedWebpageContexts.removeValue(forKey: webpageId)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
return disposable
|
|
}
|
|
}
|
|
|
|
private func notifyUpdatedWebpages(_ updatedWebpages: [MediaId: TelegramMediaWebpage]) {
|
|
for (id, context) in self.updatedWebpageContexts {
|
|
if let media = updatedWebpages[id] {
|
|
for subscriber in context.subscribers.copyItems() {
|
|
subscriber(media)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func notifyAppliedIncomingReadMessages(_ ids: [MessageId]) {
|
|
self.appliedIncomingReadMessagesPipe.putNext(ids)
|
|
}
|
|
|
|
public func getDelayNotificatonsUntil() -> Int32? {
|
|
return self.delayNotificatonsUntil.with { $0 }
|
|
}
|
|
|
|
func modifyTermsOfServiceUpdate(_ f: @escaping (TermsOfServiceUpdate?) -> (TermsOfServiceUpdate?)) {
|
|
self.queue.async {
|
|
let current = self.termsOfServiceUpdateValue.with { $0 }
|
|
let updated = f(current)
|
|
if (current != updated) {
|
|
let _ = self.termsOfServiceUpdateValue.swap(updated)
|
|
self.termsOfServiceUpdatePromise.set(.single(updated))
|
|
}
|
|
}
|
|
}
|
|
|
|
func modifyAppUpdateInfo(_ f: @escaping (AppUpdateInfo?) -> (AppUpdateInfo?)) {
|
|
self.queue.async {
|
|
let current = self.appUpdateInfoValue.with { $0 }
|
|
let updated = f(current)
|
|
if (current != updated) {
|
|
let _ = self.appUpdateInfoValue.swap(updated)
|
|
self.appUpdateInfoPromise.set(.single(updated))
|
|
}
|
|
}
|
|
}
|
|
|
|
public func updatedPeersNearby() -> Signal<[PeerNearby], NoError> {
|
|
let queue = self.queue
|
|
return Signal { [weak self] subscriber in
|
|
let disposable = MetaDisposable()
|
|
queue.async {
|
|
if let strongSelf = self {
|
|
let index = strongSelf.updatedPeersNearbyContext.subscribers.add({ peersNearby in
|
|
subscriber.putNext(peersNearby)
|
|
})
|
|
|
|
disposable.set(ActionDisposable {
|
|
if let strongSelf = self {
|
|
strongSelf.updatedPeersNearbyContext.subscribers.remove(index)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
return disposable
|
|
}
|
|
}
|
|
|
|
private func notifyUpdatedPeersNearby(_ updatedPeersNearby: [PeerNearby]) {
|
|
for subscriber in self.updatedPeersNearbyContext.subscribers.copyItems() {
|
|
subscriber(updatedPeersNearby)
|
|
}
|
|
}
|
|
|
|
func notifyDeletedMessages(messageIds: [MessageId]) {
|
|
self.deletedMessagesPipe.putNext(messageIds.map { .messageId($0) })
|
|
}
|
|
|
|
public final class IncomingCallUpdate {
|
|
public let callId: Int64
|
|
public let callAccessHash: Int64
|
|
public let timestamp: Int32
|
|
public let peer: EnginePeer
|
|
|
|
init(
|
|
callId: Int64,
|
|
callAccessHash: Int64,
|
|
timestamp: Int32,
|
|
peer: EnginePeer
|
|
) {
|
|
self.callId = callId
|
|
self.callAccessHash = callAccessHash
|
|
self.timestamp = timestamp
|
|
self.peer = peer
|
|
}
|
|
}
|
|
|
|
public static func extractIncomingCallUpdate(data: Data) -> IncomingCallUpdate? {
|
|
var rawData = data
|
|
let reader = BufferReader(Buffer(data: data))
|
|
if let signature = reader.readInt32(), signature == 0x3072cfa1 {
|
|
if let compressedData = parseBytes(reader) {
|
|
if let decompressedData = MTGzip.decompress(compressedData.makeData()) {
|
|
rawData = decompressedData
|
|
}
|
|
}
|
|
}
|
|
|
|
guard let updates = Api.parse(Buffer(data: rawData)) as? Api.Updates else {
|
|
return nil
|
|
}
|
|
switch updates {
|
|
case let .updates(updates, users, _, _, _):
|
|
var peers: [Peer] = []
|
|
for user in users {
|
|
peers.append(TelegramUser(user: user))
|
|
}
|
|
|
|
for update in updates {
|
|
switch update {
|
|
case let .updatePhoneCall(phoneCall):
|
|
switch phoneCall {
|
|
case let .phoneCallRequested(_, id, accessHash, date, adminId, _, _, _):
|
|
guard let peer = peers.first(where: { $0.id == PeerId(namespace: Namespaces.Peer.CloudUser, id: PeerId.Id._internalFromInt64Value(adminId)) }) else {
|
|
return nil
|
|
}
|
|
return IncomingCallUpdate(
|
|
callId: id,
|
|
callAccessHash: accessHash,
|
|
timestamp: date,
|
|
peer: EnginePeer(peer)
|
|
)
|
|
default:
|
|
break
|
|
}
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
public func processIncomingCallUpdate(data: Data, completion: @escaping ((CallSessionRingingState, CallSession)?) -> Void) {
|
|
var rawData = data
|
|
let reader = BufferReader(Buffer(data: data))
|
|
if let signature = reader.readInt32(), signature == 0x3072cfa1 {
|
|
if let compressedData = parseBytes(reader) {
|
|
if let decompressedData = MTGzip.decompress(compressedData.makeData()) {
|
|
rawData = decompressedData
|
|
}
|
|
}
|
|
}
|
|
|
|
if let updates = Api.parse(Buffer(data: rawData)) as? Api.Updates {
|
|
switch updates {
|
|
case let .updates(updates, _, _, _, _):
|
|
for update in updates {
|
|
switch update {
|
|
case let .updatePhoneCall(phoneCall):
|
|
if let callSessionManager = self.callSessionManager {
|
|
callSessionManager.updateSession(phoneCall, completion: { result in
|
|
completion(result)
|
|
})
|
|
} else {
|
|
completion(nil)
|
|
}
|
|
return
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
completion(nil)
|
|
}
|
|
|
|
func removePossiblyDeliveredMessages(uniqueIds: [Int64: PeerId]) {
|
|
self.queue.async {
|
|
self.removePossiblyDeliveredMessagesUniqueIds.merge(uniqueIds, uniquingKeysWith: { _, rhs in rhs })
|
|
}
|
|
}
|
|
}
|
|
|
|
public func messagesForNotification(transaction: Transaction, id: MessageId, alwaysReturnMessage: Bool) -> (messages: [Message], notify: Bool, sound: PeerMessageSound, displayContents: Bool) {
|
|
guard let message = transaction.getMessage(id) else {
|
|
Logger.shared.log("AccountStateManager", "notification message doesn't exist")
|
|
return ([], false, .bundledModern(id: 0), false)
|
|
}
|
|
|
|
var notify = true
|
|
var sound: PeerMessageSound = .bundledModern(id: 0)
|
|
var muted = false
|
|
var displayContents = true
|
|
|
|
for attribute in message.attributes {
|
|
if let attribute = attribute as? NotificationInfoMessageAttribute {
|
|
if attribute.flags.contains(.muted) {
|
|
muted = true
|
|
}
|
|
}
|
|
}
|
|
for media in message.media {
|
|
if let action = media as? TelegramMediaAction {
|
|
switch action.action {
|
|
case .groupMigratedToChannel, .channelMigratedFromGroup:
|
|
notify = false
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
let timestamp = Int32(CFAbsoluteTimeGetCurrent() + NSTimeIntervalSince1970)
|
|
|
|
var notificationPeerId = id.peerId
|
|
let peer = transaction.getPeer(id.peerId)
|
|
if let peer = peer, let associatedPeerId = peer.associatedPeerId {
|
|
notificationPeerId = associatedPeerId
|
|
}
|
|
if message.personal, let author = message.author {
|
|
notificationPeerId = author.id
|
|
}
|
|
|
|
if let notificationSettings = transaction.getPeerNotificationSettings(notificationPeerId) as? TelegramPeerNotificationSettings {
|
|
var defaultSound: PeerMessageSound = .bundledModern(id: 0)
|
|
var defaultNotify: Bool = true
|
|
if let globalNotificationSettings = transaction.getPreferencesEntry(key: PreferencesKeys.globalNotifications)?.get(GlobalNotificationSettings.self) {
|
|
if id.peerId.namespace == Namespaces.Peer.CloudUser {
|
|
defaultNotify = globalNotificationSettings.effective.privateChats.enabled
|
|
defaultSound = globalNotificationSettings.effective.privateChats.sound
|
|
displayContents = globalNotificationSettings.effective.privateChats.displayPreviews
|
|
} else if id.peerId.namespace == Namespaces.Peer.SecretChat {
|
|
defaultNotify = globalNotificationSettings.effective.privateChats.enabled
|
|
defaultSound = globalNotificationSettings.effective.privateChats.sound
|
|
displayContents = false
|
|
} else if id.peerId.namespace == Namespaces.Peer.CloudChannel, let peer = peer as? TelegramChannel, case .broadcast = peer.info {
|
|
defaultNotify = globalNotificationSettings.effective.channels.enabled
|
|
defaultSound = globalNotificationSettings.effective.channels.sound
|
|
displayContents = globalNotificationSettings.effective.channels.displayPreviews
|
|
} else {
|
|
defaultNotify = globalNotificationSettings.effective.groupChats.enabled
|
|
defaultSound = globalNotificationSettings.effective.groupChats.sound
|
|
displayContents = globalNotificationSettings.effective.groupChats.displayPreviews
|
|
}
|
|
}
|
|
switch notificationSettings.muteState {
|
|
case .default:
|
|
if !defaultNotify {
|
|
notify = false
|
|
}
|
|
case let .muted(until):
|
|
if until >= timestamp {
|
|
notify = false
|
|
}
|
|
case .unmuted:
|
|
break
|
|
}
|
|
if case .default = notificationSettings.messageSound {
|
|
sound = defaultSound
|
|
} else {
|
|
sound = notificationSettings.messageSound
|
|
}
|
|
} else {
|
|
Logger.shared.log("AccountStateManager", "notification settings for \(notificationPeerId) are undefined")
|
|
}
|
|
|
|
if muted {
|
|
sound = .none
|
|
}
|
|
|
|
if let channel = message.peers[message.id.peerId] as? TelegramChannel {
|
|
switch channel.participationStatus {
|
|
case .kicked, .left:
|
|
return ([], false, sound, false)
|
|
case .member:
|
|
break
|
|
}
|
|
}
|
|
|
|
var foundReadState = false
|
|
var isUnread = true
|
|
if let readState = transaction.getCombinedPeerReadState(id.peerId) {
|
|
if readState.isIncomingMessageIndexRead(message.index) {
|
|
isUnread = false
|
|
}
|
|
foundReadState = true
|
|
}
|
|
|
|
if !foundReadState {
|
|
Logger.shared.log("AccountStateManager", "read state for \(id.peerId) is undefined")
|
|
}
|
|
|
|
var resultMessages: [Message] = [message]
|
|
|
|
var messageGroup: [Message]?
|
|
if message.forwardInfo != nil && message.sourceReference == nil {
|
|
messageGroup = transaction.getMessageForwardedGroup(message.id)
|
|
} else if message.groupingKey != nil {
|
|
messageGroup = transaction.getMessageGroup(message.id)
|
|
}
|
|
if let messageGroup = messageGroup {
|
|
resultMessages.append(contentsOf: messageGroup.filter({ $0.id != message.id }))
|
|
}
|
|
|
|
if notify {
|
|
return (resultMessages, isUnread, sound, displayContents)
|
|
} else {
|
|
return (alwaysReturnMessage ? resultMessages : [], false, sound, displayContents)
|
|
}
|
|
}
|