Separate regular and channel state sync

This commit is contained in:
Ali 2021-09-14 18:47:43 +04:00
parent 200d908724
commit d04f8798c6
3 changed files with 280 additions and 27 deletions

View File

@ -35,7 +35,11 @@ private struct NotificationContent {
var title: String?
var subtitle: String?
var body: String?
var threadId: String?
var sound: String?
var badge: Int?
var category: String?
var userInfo: [AnyHashable: Any] = [:]
func asNotificationContent() -> UNNotificationContent {
let content = UNMutableNotificationContent()
@ -44,10 +48,24 @@ private struct NotificationContent {
content.subtitle = self.subtitle ?? ""
content.body = self.body ?? ""
if let threadId = self.threadId {
content.threadIdentifier = threadId
}
if let sound = self.sound {
content.sound = UNNotificationSound(named: UNNotificationSoundName(rawValue: sound))
}
if let badge = self.badge {
content.badge = badge as NSNumber
}
if let category = self.category {
content.categoryIdentifier = category
}
content.userInfo = self.userInfo
return content
}
}
@ -174,11 +192,79 @@ private final class NotificationServiceHandler {
return
}
var peerId: PeerId?
var messageId: MessageId.Id?
if let messageIdString = payloadJson["msg_id"] as? String {
content.userInfo["msg_id"] = messageIdString
messageId = Int32(messageIdString)
}
if let fromIdString = payloadJson["from_id"] as? String {
content.userInfo["from_id"] = fromIdString
if let userIdValue = Int64(fromIdString) {
peerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: PeerId.Id._internalFromInt64Value(userIdValue))
}
} else if let chatIdString = payloadJson["chat_id"] as? String {
content.userInfo["chat_id"] = chatIdString
if let chatIdValue = Int64(chatIdString) {
peerId = PeerId(namespace: Namespaces.Peer.CloudGroup, id: PeerId.Id._internalFromInt64Value(chatIdValue))
}
} else if let channelIdString = payloadJson["channel_id"] as? String {
content.userInfo["channel_id"] = channelIdString
if let channelIdValue = Int64(channelIdString) {
peerId = PeerId(namespace: Namespaces.Peer.CloudChannel, id: PeerId.Id._internalFromInt64Value(channelIdValue))
}
}
if let silentString = payloadJson["silent"] as? String {
if let silentValue = Int(silentString), silentValue != 0 {
if let title = content.title {
content.title = "\(title) 🔕"
}
}
}
if let threadId = aps["thread-id"] as? String {
content.threadId = threadId
}
if let sound = aps["sound"] as? String {
content.sound = sound
}
if let category = aps["category"] as? String {
content.category = category
let _ = messageId
/*if (peerId != 0 && messageId != 0 && parsedAttachment != nil && attachmentData != nil) {
userInfo[@"peerId"] = @(peerId);
userInfo[@"messageId.namespace"] = @(0);
userInfo[@"messageId.id"] = @(messageId);
userInfo[@"media"] = [attachmentData base64EncodedStringWithOptions:0];
if (isExpandableMedia) {
if ([categoryString isEqualToString:@"r"]) {
_bestAttemptContent.categoryIdentifier = @"withReplyMedia";
} else if ([categoryString isEqualToString:@"m"]) {
_bestAttemptContent.categoryIdentifier = @"withMuteMedia";
}
}
}*/
}
/*if (accountInfos.accounts.count > 1) {
if (_bestAttemptContent.title.length != 0 && account.peerName.length != 0) {
_bestAttemptContent.title = [NSString stringWithFormat:@"%@ → %@", _bestAttemptContent.title, account.peerName];
}
}*/
updateCurrentContent(content.asNotificationContent())
if let stateManager = strongSelf.stateManager {
stateManager.network.shouldKeepConnection.set(.single(true))
strongSelf.pollDisposable.set(stateManager.pollStateUpdateCompletion().start(completed: {
if let stateManager = strongSelf.stateManager, let peerId = peerId {
let pollCompletion: () -> Void = {
queue.async {
guard let strongSelf = self, let stateManager = strongSelf.stateManager else {
completed()
@ -197,8 +283,38 @@ private final class NotificationServiceHandler {
completed()
})
}
}))
stateManager.reset()
}
stateManager.network.shouldKeepConnection.set(.single(true))
if peerId.namespace == Namespaces.Peer.CloudChannel {
strongSelf.pollDisposable.set(pollChannelOnce(
postbox: stateManager.postbox,
network: stateManager.network,
peerId: peerId,
stateManager: stateManager,
delayCompletion: false
).start(completed: {
pollCompletion()
}))
} else {
enum ControlError {
case restart
}
let signal = stateManager.standalonePollDifference()
|> castError(ControlError.self)
|> mapToSignal { result -> Signal<Never, ControlError> in
if result {
return .complete()
} else {
return .fail(.restart)
}
}
|> restartIfError
strongSelf.pollDisposable.set(signal.start(completed: {
pollCompletion()
}))
}
} else {
completed()
}

View File

@ -1706,8 +1706,8 @@ private func resolveMissingPeerChatInfos(network: Network, state: AccountMutable
}
}
func keepPollingChannel(postbox: Postbox, network: Network, peerId: PeerId, stateManager: AccountStateManager) -> Signal<Int32, NoError> {
let signal: Signal<Int32, NoError> = postbox.transaction { transaction -> Signal<Int32, NoError> in
public func pollChannelOnce(postbox: Postbox, network: Network, peerId: PeerId, stateManager: AccountStateManager, delayCompletion: Bool) -> Signal<Int32, NoError> {
return postbox.transaction { transaction -> Signal<Int32, NoError> in
guard let accountState = (transaction.getState() as? AuthorizedAccountState)?.state, let peer = transaction.getPeer(peerId) else {
return .complete()
|> delay(30.0, queue: Queue.concurrentDefaultQueue())
@ -1745,15 +1745,25 @@ func keepPollingChannel(postbox: Postbox, network: Network, peerId: PeerId, stat
|> mapToSignal { finalState -> Signal<Int32, NoError> in
return stateManager.addReplayAsynchronouslyBuiltFinalState(finalState)
|> mapToSignal { _ -> Signal<Int32, NoError> in
return .single(timeout ?? 30) |> then(.complete() |> delay(Double(timeout ?? 30), queue: Queue.concurrentDefaultQueue()))
if delayCompletion {
return .single(timeout ?? 30)
|> then(
.complete()
|> delay(Double(timeout ?? 30), queue: Queue.concurrentDefaultQueue())
)
} else {
return .single(timeout ?? 30)
}
}
}
}
}
|> switchToLatest
}
func keepPollingChannel(postbox: Postbox, network: Network, peerId: PeerId, stateManager: AccountStateManager) -> Signal<Int32, NoError> {
return pollChannelOnce(postbox: postbox, network: network, peerId: peerId, stateManager: stateManager, delayCompletion: true)
|> restart
return signal
|> delay(1.0, queue: .concurrentDefaultQueue())
}
@ -2254,7 +2264,18 @@ private func recordPeerActivityTimestamp(peerId: PeerId, timestamp: Int32, into
}
}
func replayFinalState(accountManager: AccountManager<TelegramAccountManagerTypes>, postbox: Postbox, accountPeerId: PeerId, mediaBox: MediaBox, encryptionProvider: EncryptionProvider, transaction: Transaction, auxiliaryMethods: AccountAuxiliaryMethods, finalState: AccountFinalState, removePossiblyDeliveredMessagesUniqueIds: [Int64: PeerId]) -> AccountReplayedFinalState? {
func replayFinalState(
accountManager: AccountManager<TelegramAccountManagerTypes>,
postbox: Postbox,
accountPeerId: PeerId,
mediaBox: MediaBox,
encryptionProvider: EncryptionProvider,
transaction: Transaction,
auxiliaryMethods: AccountAuxiliaryMethods,
finalState: AccountFinalState,
removePossiblyDeliveredMessagesUniqueIds: [Int64: PeerId],
ignoreDate: Bool
) -> AccountReplayedFinalState? {
let verified = verifyTransaction(transaction, finalState: finalState.state)
if !verified {
Logger.shared.log("State", "failed to verify final state")
@ -2780,10 +2801,14 @@ func replayFinalState(accountManager: AccountManager<TelegramAccountManagerTypes
markUnseenPersonalMessage(transaction: transaction, id: id, addSynchronizeAction: false)
}
}
case let .UpdateState(state):
case let .UpdateState(innerState):
let currentState = transaction.getState() as! AuthorizedAccountState
transaction.setState(currentState.changedState(state))
Logger.shared.log("State", "apply state \(state)")
var updatedInnerState = innerState
if ignoreDate, let previousInnerState = currentState.state {
updatedInnerState = AuthorizedAccountState.State(pts: updatedInnerState.pts, qts: updatedInnerState.qts, date: previousInnerState.date, seq: updatedInnerState.seq)
}
transaction.setState(currentState.changedState(updatedInnerState))
Logger.shared.log("State", "apply state \(updatedInnerState)")
case let .UpdateChannelState(peerId, pts):
var state = (transaction.getPeerChatState(peerId) as? ChannelState) ?? ChannelState(pts: pts, invalidatedPts: nil, synchronizedUntilMessageId: nil)
state = state.withUpdatedPts(pts)

View File

@ -166,7 +166,17 @@ public final class AccountStateManager {
private let appliedQtsPromise = Promise<Int32?>(nil)
private let appliedQtsDisposable = MetaDisposable()
init(accountPeerId: PeerId, accountManager: AccountManager<TelegramAccountManagerTypes>, postbox: Postbox, network: Network, callSessionManager: CallSessionManager?, addIsContactUpdates: @escaping ([(PeerId, Bool)]) -> Void, shouldKeepOnlinePresence: Signal<Bool, NoError>, peerInputActivityManager: PeerInputActivityManager?, auxiliaryMethods: AccountAuxiliaryMethods) {
init(
accountPeerId: PeerId,
accountManager: AccountManager<TelegramAccountManagerTypes>,
postbox: Postbox,
network: Network,
callSessionManager: CallSessionManager?,
addIsContactUpdates: @escaping ([(PeerId, Bool)]) -> Void,
shouldKeepOnlinePresence: Signal<Bool, NoError>,
peerInputActivityManager: PeerInputActivityManager?,
auxiliaryMethods: AccountAuxiliaryMethods
) {
self.accountPeerId = accountPeerId
self.accountManager = accountManager
self.postbox = postbox
@ -423,10 +433,7 @@ public final class AccountStateManager {
if let authorizedState = state.state {
let flags: Int32 = 0
let ptsTotalLimit: Int32? = nil
#if DEBUG
//flags = 1 << 0
//ptsTotalLimit = 1000
#endif
let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: authorizedState.date, qts: authorizedState.qts))
|> map(Optional.init)
|> `catch` { error -> Signal<Api.updates.Difference?, MTRpcError> in
@ -446,10 +453,6 @@ public final class AccountStateManager {
switch difference {
case .differenceTooLong:
preconditionFailure()
/*return accountStateReset(postbox: postbox, network: network, accountPeerId: accountPeerId) |> mapToSignal { _ -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
return .complete()
}
|> then(.single((nil, nil)))*/
default:
return initialStateWithDifference(postbox: postbox, difference: difference)
|> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
@ -468,7 +471,7 @@ public final class AccountStateManager {
let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
let startTime = CFAbsoluteTimeGetCurrent()
let replayedState = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds)
let replayedState = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
if deltaTime > 1.0 {
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
@ -515,7 +518,7 @@ public final class AccountStateManager {
if let difference = difference {
switch difference {
case .differenceSlice:
strongSelf.addOperation(.pollDifference(events), position: .first)
strongSelf.addOperation(.pollDifference(events), position: .first)
default:
if !events.isEmpty {
strongSelf.insertProcessEvents(events)
@ -586,7 +589,7 @@ public final class AccountStateManager {
return nil
} else {
let startTime = CFAbsoluteTimeGetCurrent()
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds)
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
if deltaTime > 1.0 {
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
@ -806,7 +809,7 @@ public final class AccountStateManager {
let removePossiblyDeliveredMessagesUniqueIds = self.removePossiblyDeliveredMessagesUniqueIds
let signal = self.postbox.transaction { transaction -> AccountReplayedFinalState? in
let startTime = CFAbsoluteTimeGetCurrent()
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds)
let result = replayFinalState(accountManager: accountManager, postbox: postbox, accountPeerId: accountPeerId, mediaBox: mediaBox, encryptionProvider: network.encryptionProvider, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState, removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds, ignoreDate: false)
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
if deltaTime > 1.0 {
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
@ -834,6 +837,115 @@ public final class AccountStateManager {
})
}
}
public func standalonePollDifference() -> Signal<Bool, NoError> {
let queue = self.queue
let accountManager = self.accountManager
let postbox = self.postbox
let network = self.network
let mediaBox = postbox.mediaBox
let accountPeerId = self.accountPeerId
let auxiliaryMethods = self.auxiliaryMethods
let signal = postbox.stateView()
|> mapToSignal { view -> Signal<AuthorizedAccountState, NoError> in
if let state = view.state as? AuthorizedAccountState {
return .single(state)
} else {
return .complete()
}
}
|> take(1)
|> mapToSignal { [weak self] state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
if let authorizedState = state.state {
let flags: Int32 = 0
let ptsTotalLimit: Int32? = nil
let request = network.request(Api.functions.updates.getDifference(flags: flags, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: Int32.max, qts: authorizedState.qts))
|> map(Optional.init)
|> `catch` { error -> Signal<Api.updates.Difference?, MTRpcError> in
if error.errorCode == 406 && error.errorDescription == "AUTH_KEY_DUPLICATED" {
return .single(nil)
} else {
return .fail(error)
}
}
|> retryRequest
return request
|> mapToSignal { difference -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
guard let difference = difference else {
return .single((nil, nil, true))
}
switch difference {
case .differenceTooLong:
preconditionFailure()
default:
return initialStateWithDifference(postbox: postbox, difference: difference)
|> mapToSignal { state -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
if state.initialState.state != authorizedState {
Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
return .single((nil, nil, false))
} else {
return finalStateWithDifference(postbox: postbox, network: network, state: state, difference: difference)
|> deliverOn(queue)
|> mapToSignal { finalState -> Signal<(difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool), NoError> in
if !finalState.state.preCachedResources.isEmpty {
for (resource, data) in finalState.state.preCachedResources {
mediaBox.storeResourceData(resource.id, data: data)
}
}
let removePossiblyDeliveredMessagesUniqueIds = self?.removePossiblyDeliveredMessagesUniqueIds ?? Dictionary()
return postbox.transaction { transaction -> (difference: Api.updates.Difference?, finalStatte: AccountReplayedFinalState?, skipBecauseOfError: Bool) in
let startTime = CFAbsoluteTimeGetCurrent()
let replayedState = replayFinalState(
accountManager: accountManager,
postbox: postbox,
accountPeerId: accountPeerId,
mediaBox: mediaBox,
encryptionProvider: network.encryptionProvider,
transaction: transaction,
auxiliaryMethods: auxiliaryMethods,
finalState: finalState,
removePossiblyDeliveredMessagesUniqueIds: removePossiblyDeliveredMessagesUniqueIds,
ignoreDate: true
)
let deltaTime = CFAbsoluteTimeGetCurrent() - startTime
if deltaTime > 1.0 {
Logger.shared.log("State", "replayFinalState took \(deltaTime)s")
}
if let replayedState = replayedState {
return (difference, replayedState, false)
} else {
return (nil, nil, false)
}
}
}
}
}
}
}
} else {
return .single((nil, nil, false))
}
}
|> deliverOn(self.queue)
return signal
|> mapToSignal { difference, _, _ -> Signal<Bool, NoError> in
if let difference = difference {
switch difference {
case .differenceSlice:
return .single(false)
default:
return .single(true)
}
} else {
return .single(true)
}
}
}
private func insertProcessEvents(_ events: AccountFinalStateEvents) {
if !events.isEmpty {