Merge branch 'experimental-2'

This commit is contained in:
Ali
2021-09-22 02:23:51 +03:00
1718 changed files with 56606 additions and 24940 deletions

View File

@@ -52,15 +52,15 @@ public enum DeletedMessageId: Hashable {
public final class AccountStateManager {
private let queue = Queue()
private let accountPeerId: PeerId
public let accountPeerId: PeerId
private let accountManager: AccountManager<TelegramAccountManagerTypes>
private let postbox: Postbox
private let network: Network
private let callSessionManager: CallSessionManager
public let postbox: Postbox
public let network: Network
private let callSessionManager: CallSessionManager?
private let addIsContactUpdates: ([(PeerId, Bool)]) -> Void
private let shouldKeepOnlinePresence: Signal<Bool, NoError>
private let peerInputActivityManager: PeerInputActivityManager
private let peerInputActivityManager: PeerInputActivityManager?
let auxiliaryMethods: AccountAuxiliaryMethods
var transformOutgoingMessageMedia: TransformOutgoingMessageMedia?
@@ -166,7 +166,17 @@ public final class AccountStateManager {
private let appliedQtsPromise = Promise<Int32?>(nil)
private let appliedQtsDisposable = MetaDisposable()
init(accountPeerId: PeerId, accountManager: AccountManager<TelegramAccountManagerTypes>, postbox: Postbox, network: Network, callSessionManager: CallSessionManager, addIsContactUpdates: @escaping ([(PeerId, Bool)]) -> Void, shouldKeepOnlinePresence: Signal<Bool, NoError>, peerInputActivityManager: PeerInputActivityManager, auxiliaryMethods: AccountAuxiliaryMethods) {
init(
accountPeerId: PeerId,
accountManager: AccountManager<TelegramAccountManagerTypes>,
postbox: Postbox,
network: Network,
callSessionManager: CallSessionManager?,
addIsContactUpdates: @escaping ([(PeerId, Bool)]) -> Void,
shouldKeepOnlinePresence: Signal<Bool, NoError>,
peerInputActivityManager: PeerInputActivityManager?,
auxiliaryMethods: AccountAuxiliaryMethods
) {
self.accountPeerId = accountPeerId
self.accountManager = accountManager
self.postbox = postbox
@@ -185,7 +195,7 @@ public final class AccountStateManager {
self.appliedQtsDisposable.dispose()
}
func reset() {
public func reset() {
self.queue.async {
if self.updateService == nil {
self.updateService = UpdateMessageService(peerId: self.accountPeerId)
@@ -416,12 +426,9 @@ public final class AccountStateManager {
|> take(1)
|> mapToSignal { [weak self] state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
if let authorizedState = state.state {
var flags: Int32 = 0
var ptsTotalLimit: Int32? = nil
#if DEBUG
//flags = 1 << 0
//ptsTotalLimit = 1000
#endif
let flags: Int32 = 0
let ptsTotalLimit: Int32? = nil
let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: authorizedState.date, qts: authorizedState.qts))
|> map(Optional.init)
|> `catch` { error -> Signal<Api.updates.Difference?, MTRpcError> in
@@ -441,10 +448,6 @@ public final class AccountStateManager {
switch difference {
case .differenceTooLong:
preconditionFailure()
/*return accountStateReset(postbox: postbox, network: network, accountPeerId: accountPeerId) |> mapToSignal { _ -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
return .complete()
}
|> then(.single((nil, nil)))*/
default:
return initialStateWithDifference(postbox: postbox, difference: difference)
|> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
@@ -463,7 +466,7 @@ public final class AccountStateManager {
let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
let startTime = CFAbsoluteTimeGetCurrent()
let replayedState = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds)
let replayedState = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
if deltaTime > 1.0 {
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
@@ -510,7 +513,7 @@ public final class AccountStateManager {
if let difference = difference {
switch difference {
case .differenceSlice:
strongSelf.addOperation(.pollDifference(events), position: .first)
strongSelf.addOperation(.pollDifference(events), position: .first)
default:
if !events.isEmpty {
strongSelf.insertProcessEvents(events)
@@ -533,9 +536,6 @@ public final class AccountStateManager {
assertionFailure()
}
}
}, error: { _ in
assertionFailure()
Logger.shared.log("AccountStateManager", "processUpdateGroups signal completed with error")
})
case let .collectUpdateGroups(_, timeout):
self.operationTimer?.invalidate()
@@ -584,7 +584,7 @@ public final class AccountStateManager {
return nil
} else {
let startTime = CFAbsoluteTimeGetCurrent()
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds)
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
if deltaTime > 1.0 {
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
@@ -634,9 +634,7 @@ public final class AccountStateManager {
}
}
}
let _ = (signal |> deliverOn(self.queue)).start(error: { _ in
completed()
}, completed: {
let _ = (signal |> deliverOn(self.queue)).start(completed: {
completed()
})
case let .processEvents(operationId, events):
@@ -646,7 +644,7 @@ public final class AccountStateManager {
let topOperation = strongSelf.operations.removeFirst()
if case .processEvents(operationId, _) = topOperation.content {
if !events.updatedTypingActivities.isEmpty {
strongSelf.peerInputActivityManager.transaction { manager in
strongSelf.peerInputActivityManager?.transaction { manager in
for (chatPeerId, peerActivities) in events.updatedTypingActivities {
for (peerId, activity) in peerActivities {
if let activity = activity {
@@ -666,12 +664,12 @@ public final class AccountStateManager {
}
if !events.updatedCalls.isEmpty {
for call in events.updatedCalls {
strongSelf.callSessionManager.updateSession(call, completion: { _ in })
strongSelf.callSessionManager?.updateSession(call, completion: { _ in })
}
}
if !events.addedCallSignalingData.isEmpty {
for (id, data) in events.addedCallSignalingData {
strongSelf.callSessionManager.addCallSignalingData(id: id, data: data)
strongSelf.callSessionManager?.addCallSignalingData(id: id, data: data)
}
}
if !events.updatedGroupCallParticipants.isEmpty {
@@ -737,8 +735,6 @@ public final class AccountStateManager {
if let strongSelf = self {
strongSelf.notificationMessagesPipe.putNext(messages)
}
}, error: { _ in
completed()
}, completed: {
completed()
})
@@ -808,7 +804,7 @@ public final class AccountStateManager {
let removePossiblyDeliveredMessagesUniqueIds = self.removePossiblyDeliveredMessagesUniqueIds
let signal = self.postbox.transaction { transaction -> AccountReplayedFinalState? in
let startTime = CFAbsoluteTimeGetCurrent()
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds)
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
if deltaTime > 1.0 {
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
@@ -833,13 +829,148 @@ public final class AccountStateManager {
}
completion()
}
}, error: { _ in
assertionFailure()
Logger.shared.log("AccountStateManager", "processUpdateGroups signal completed with error")
completion()
})
}
}
func standaloneReplayAsynchronouslyBuiltFinalState(finalState: AccountFinalState) -> Signal<Never, NoError> {
if !finalState.state.preCachedResources.isEmpty {
for (resource, data) in finalState.state.preCachedResources {
self.postbox.mediaBox.storeResourceData(resource.id, data: data)
}
}
let accountPeerId = self.accountPeerId
let accountManager = self.accountManager
let postbox = self.postbox
let mediaBox = self.postbox.mediaBox
let network = self.network
let auxiliaryMethods = self.auxiliaryMethods
let removePossiblyDeliveredMessagesUniqueIds = self.removePossiblyDeliveredMessagesUniqueIds
let signal = self.postbox.transaction { transaction -> AccountReplayedFinalState? in
let startTime = CFAbsoluteTimeGetCurrent()
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
if deltaTime > 1.0 {
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
}
return result
}
|> map({ ($0, finalState) })
|> deliverOn(self.queue)
return signal
|> ignoreValues
}
public func standalonePollDifference() -> Signal<Bool, NoError> {
let queue = self.queue
let accountManager = self.accountManager
let postbox = self.postbox
let network = self.network
let mediaBox = postbox.mediaBox
let accountPeerId = self.accountPeerId
let auxiliaryMethods = self.auxiliaryMethods
let signal = postbox.stateView()
|> mapToSignal { view -> Signal<AuthorizedAccountState, NoError> in
if let state = view.state as? AuthorizedAccountState {
return .single(state)
} else {
return .complete()
}
}
|> take(1)
|> mapToSignal { [weak self] state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
if let authorizedState = state.state {
let flags: Int32 = 0
let ptsTotalLimit: Int32? = nil
let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: Int32.max, qts: authorizedState.qts))
|> map(Optional.init)
|> `catch` { error -> Signal<Api.updates.Difference?, MTRpcError> in
if error.errorCode == 406 && error.errorDescription == "AUTH_KEY_DUPLICATED" {
return .single(nil)
} else {
return .fail(error)
}
}
|> retryRequest
return request
|> mapToSignal { difference -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
guard let difference = difference else {
return .single((nil, nil, true))
}
switch difference {
case .differenceTooLong:
preconditionFailure()
default:
return initialStateWithDifference(postbox: postbox, difference: difference)
|> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
if state.initialState.state != authorizedState {
Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
return .single((nil, nil, false))
} else {
return finalStateWithDifference(postbox: postbox, network: network, state: state, difference: difference)
|> deliverOn(queue)
|> mapToSignal { finalState -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
if !finalState.state.preCachedResources.isEmpty {
for (resource, data) in finalState.state.preCachedResources {
mediaBox.storeResourceData(resource.id, data: data)
}
}
let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
let startTime = CFAbsoluteTimeGetCurrent()
let replayedState = replayFinalState(
accountManager: accountManager,
postbox: postbox,
accountPeerId: accountPeerId,
mediaBox: mediaBox,
encryptionProvider: network.encryptionProvider,
transaction: transaction,
auxiliaryMethods: auxiliaryMethods,
finalState: finalState,
removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds,
ignoreDate: true
)
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
if deltaTime > 1.0 {
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
}
if let replayedState = replayedState {
return (difference, replayedState, false)
} else {
return (nil, nil, false)
}
}
}
}
}
}
}
} else {
return .single((nil, nil, false))
}
}
|> deliverOn(self.queue)
return signal
|> mapToSignal { difference, _, _ -> Signal<Bool, NoError> in
if let difference = difference {
switch difference {
case .differenceSlice:
return .single(false)
default:
return .single(true)
}
} else {
return .single(true)
}
}
}
private func insertProcessEvents(_ events: AccountFinalStateEvents) {
if !events.isEmpty {
@@ -1053,13 +1184,17 @@ public final class AccountStateManager {
if let updates = Api.parse(Buffer(data: rawData)) as? Api.Updates {
switch updates {
case let .updates(updates, users, chats, date, seq):
case let .updates(updates, _, _, _, _):
for update in updates {
switch update {
case let .updatePhoneCall(phoneCall):
self.callSessionManager.updateSession(phoneCall, completion: { result in
completion(result)
})
if let callSessionManager = self.callSessionManager {
callSessionManager.updateSession(phoneCall, completion: { result in
completion(result)
})
} else {
completion(nil)
}
return
default:
break
@@ -1122,7 +1257,7 @@ public func messagesForNotification(transaction: Transaction, id: MessageId, alw
if let notificationSettings = transaction.getPeerNotificationSettings(notificationPeerId) as? TelegramPeerNotificationSettings {
var defaultSound: PeerMessageSound = .bundledModern(id: 0)
var defaultNotify: Bool = true
if let globalNotificationSettings = transaction.getPreferencesEntry(key: PreferencesKeys.globalNotifications) as? GlobalNotificationSettings {
if let globalNotificationSettings = transaction.getPreferencesEntry(key: PreferencesKeys.globalNotifications)?.get(GlobalNotificationSettings.self) {
if id.peerId.namespace == Namespaces.Peer.CloudUser {
defaultNotify = globalNotificationSettings.effective.privateChats.enabled
defaultSound = globalNotificationSettings.effective.privateChats.sound