Swiftgram/TelegramCore/AccountStateManagementUtils.swift
2018-10-06 01:18:17 +04:00

2493 lines
128 KiB
Swift

import Foundation
#if os(macOS)
import PostboxMac
import SwiftSignalKitMac
import MtProtoKitMac
#else
import Postbox
import SwiftSignalKit
import MtProtoKitDynamic
#endif
/// Collects every peer id referenced by the given update groups.
///
/// This is the union of `peerIds` of each update contained in each group, plus the
/// channel peer for every `.updateChannelPts` group.
private func peerIdsFromUpdateGroups(_ groups: [UpdateGroup]) -> Set<PeerId> {
    var result = Set<PeerId>()
    for group in groups {
        for update in group.updates {
            result.formUnion(update.peerIds)
        }
        // A bare channel pts group names its channel directly rather than through an update.
        if case let .updateChannelPts(channelId, _, _) = group {
            result.insert(PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId))
        }
    }
    return result
}
/// Returns the ids of the channels listed in the groups' chats in which the
/// account's participation status is `.member`.
private func activeChannelsFromUpdateGroups(_ groups: [UpdateGroup]) -> Set<PeerId> {
    var activeChannelIds = Set<PeerId>()
    for group in groups {
        for chat in group.chats {
            // Only `.channel` chats are considered; everything else is ignored.
            guard case .channel = chat else {
                continue
            }
            guard let channel = parseTelegramGroupOrChannel(chat: chat) as? TelegramChannel else {
                continue
            }
            if channel.participationStatus == .member {
                activeChannelIds.insert(channel.id)
            }
        }
    }
    return activeChannelIds
}
/// Collects the `associatedMessageIds` of every update in the given groups.
/// Updates that report no associated messages contribute nothing.
private func associatedMessageIdsFromUpdateGroups(_ groups: [UpdateGroup]) -> Set<MessageId> {
    var result = Set<MessageId>()
    for group in groups {
        for update in group.updates {
            if let associatedMessageIds = update.associatedMessageIds {
                result.formUnion(associatedMessageIds)
            }
        }
    }
    return result
}
/// Returns the peers that may have received new messages according to the given groups.
///
/// A peer is included when an update carries a `messageId` for it, or when the update
/// is `.updateChannelTooLong` for that channel.
private func peersWithNewMessagesFromUpdateGroups(_ groups: [UpdateGroup]) -> Set<PeerId> {
    var result = Set<PeerId>()
    for group in groups {
        for update in group.updates {
            if let messageId = update.messageId {
                result.insert(messageId.peerId)
            }
            if case let .updateChannelTooLong(_, channelId, _) = update {
                result.insert(PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId))
            }
        }
    }
    return result
}
/// Gathers the (namespace, timestamp) pairs of messages that will be generated locally
/// from the given groups.
///
/// Only `.updateServiceNotification` updates that carry a date contribute; each one adds a
/// `Namespaces.Message.Local` entry under the cloud user 777000.
private func locallyGeneratedMessageTimestampsFromUpdateGroups(_ groups: [UpdateGroup]) -> [PeerId: [(MessageId.Namespace, Int32)]] {
    var result: [PeerId: [(MessageId.Namespace, Int32)]] = [:]
    for group in groups {
        for update in group.updates {
            guard case let .updateServiceNotification(_, optionalDate, _, _, _, _) = update, let date = optionalDate else {
                continue
            }
            let peerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: 777000)
            result[peerId, default: []].append((Namespaces.Message.Local, date))
        }
    }
    return result
}
/// Collects every peer id referenced by the new messages and other updates of a difference.
///
/// `.differenceEmpty` yields an empty set. `.differenceTooLong` is not expected here and
/// triggers `assertionFailure()` before an empty set is returned.
private func peerIdsFromDifference(_ difference: Api.updates.Difference) -> Set<PeerId> {
    var result = Set<PeerId>()
    switch difference {
    case let .difference(newMessages, _, otherUpdates, _, _, _), let .differenceSlice(newMessages, _, otherUpdates, _, _, _):
        for message in newMessages {
            result.formUnion(apiMessagePeerIds(message))
        }
        for update in otherUpdates {
            result.formUnion(update.peerIds)
        }
    case .differenceEmpty:
        break
    case .differenceTooLong:
        assertionFailure()
    }
    return result
}
/// Returns the ids of the channels listed in the difference's chats in which the
/// account's participation status is `.member`.
///
/// `.differenceEmpty` and `.differenceTooLong` carry no chats and yield an empty set.
private func activeChannelsFromDifference(_ difference: Api.updates.Difference) -> Set<PeerId> {
    let chats: [Api.Chat]
    switch difference {
    case let .difference(_, _, _, apiChats, _, _), let .differenceSlice(_, _, _, apiChats, _, _):
        chats = apiChats
    case .differenceEmpty, .differenceTooLong:
        chats = []
    }

    var activeChannelIds = Set<PeerId>()
    for chat in chats {
        guard case .channel = chat else {
            continue
        }
        guard let channel = parseTelegramGroupOrChannel(chat: chat) as? TelegramChannel else {
            continue
        }
        if channel.participationStatus == .member {
            activeChannelIds.insert(channel.id)
        }
    }
    return activeChannelIds
}
/// Collects the message ids associated with the new messages and other updates of a difference.
///
/// `.differenceEmpty` and `.differenceTooLong` yield an empty set.
private func associatedMessageIdsFromDifference(_ difference: Api.updates.Difference) -> Set<MessageId> {
    var result = Set<MessageId>()
    switch difference {
    case let .difference(newMessages, _, otherUpdates, _, _, _), let .differenceSlice(newMessages, _, otherUpdates, _, _, _):
        for message in newMessages {
            if let associatedMessageIds = apiMessageAssociatedMessageIds(message) {
                result.formUnion(associatedMessageIds)
            }
        }
        for update in otherUpdates {
            if let associatedMessageIds = update.associatedMessageIds {
                result.formUnion(associatedMessageIds)
            }
        }
    case .differenceEmpty, .differenceTooLong:
        break
    }
    return result
}
/// Returns the peers that may have received new messages according to a difference.
///
/// A peer is included when one of the new messages has an id in it, when one of the other
/// updates carries a `messageId` for it, or when an `.updateChannelTooLong` names it.
/// `.differenceEmpty` and `.differenceTooLong` yield an empty set.
private func peersWithNewMessagesFromDifference(_ difference: Api.updates.Difference) -> Set<PeerId> {
    var result = Set<PeerId>()
    switch difference {
    case let .difference(newMessages, _, otherUpdates, _, _, _), let .differenceSlice(newMessages, _, otherUpdates, _, _, _):
        for message in newMessages {
            if let messageId = message.id {
                result.insert(messageId.peerId)
            }
        }
        for update in otherUpdates {
            if let messageId = update.messageId {
                result.insert(messageId.peerId)
            }
            if case let .updateChannelTooLong(_, channelId, _) = update {
                result.insert(PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId))
            }
        }
    case .differenceEmpty, .differenceTooLong:
        break
    }
    return result
}
/// Gathers the (namespace, timestamp) pairs of messages that will be generated locally
/// from a difference.
///
/// Only `.updateServiceNotification` updates that carry a date contribute; each one adds a
/// `Namespaces.Message.Local` entry under the cloud user 777000. `.differenceEmpty` and
/// `.differenceTooLong` have no updates and yield an empty dictionary.
private func locallyGeneratedMessageTimestampsFromDifference(_ difference: Api.updates.Difference) -> [PeerId: [(MessageId.Namespace, Int32)]] {
    let otherUpdates: [Api.Update]
    switch difference {
    case let .difference(_, _, apiOtherUpdates, _, _, _), let .differenceSlice(_, _, apiOtherUpdates, _, _, _):
        otherUpdates = apiOtherUpdates
    case .differenceEmpty, .differenceTooLong:
        otherUpdates = []
    }

    var result: [PeerId: [(MessageId.Namespace, Int32)]] = [:]
    for update in otherUpdates {
        guard case let .updateServiceNotification(_, optionalDate, _, _, _, _) = update, let date = optionalDate else {
            continue
        }
        let peerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: 777000)
        result[peerId, default: []].append((Namespaces.Message.Local, date))
    }
    return result
}
/// Builds the mutable state that an update batch starts from by reading everything it
/// needs out of the current transaction.
///
/// - Parameters:
///   - transaction: Postbox transaction that all lookups go through.
///   - peerIds: Peers whose stored `Peer` and chat state should be preloaded.
///   - activeChannelIds: Channels the account is a member of according to the incoming data.
///   - associatedMessageIds: Referenced message ids; those already stored are looked up.
///   - peerIdsWithNewMessages: Peers whose notification settings and read states are preloaded.
///   - locallyGeneratedMessageTimestamps: Per-peer (namespace, timestamp) pairs used to find
///     locally generated messages that are already stored.
/// - Returns: An `AccountMutableState` seeded with the loaded data.
///
/// Traps if the transaction's state is not an `AuthorizedAccountState` or has a nil `state`.
private func initialStateWithPeerIds(_ transaction: Transaction, peerIds: Set<PeerId>, activeChannelIds: Set<PeerId>, associatedMessageIds: Set<MessageId>, peerIdsWithNewMessages: Set<PeerId>, locallyGeneratedMessageTimestamps: [PeerId: [(MessageId.Namespace, Int32)]]) -> AccountMutableState {
    // Known peers and their chat states; the expected state type depends on the peer namespace.
    var peers: [PeerId: Peer] = [:]
    var chatStates: [PeerId: PeerChatState] = [:]
    for peerId in peerIds {
        if let peer = transaction.getPeer(peerId) {
            peers[peerId] = peer
        }
        switch peerId.namespace {
        case Namespaces.Peer.CloudChannel:
            if let channelState = transaction.getPeerChatState(peerId) as? ChannelState {
                chatStates[peerId] = channelState
            }
        case Namespaces.Peer.CloudUser, Namespaces.Peer.CloudGroup:
            if let chatState = transaction.getPeerChatState(peerId) as? RegularChatState {
                chatStates[peerId] = chatState
            }
        default:
            break
        }
    }

    // An active channel is polled explicitly when it has no stored cloud message yet, or when
    // the locally stored channel is not marked as `.member`.
    var channelsToPollExplicitely = Set<PeerId>()
    for peerId in activeChannelIds {
        if transaction.getTopPeerMessageIndex(peerId: peerId, namespace: Namespaces.Message.Cloud) == nil {
            channelsToPollExplicitely.insert(peerId)
            continue
        }
        if let channel = transaction.getPeer(peerId) as? TelegramChannel, channel.participationStatus != .member {
            channelsToPollExplicitely.insert(peerId)
        }
    }

    let storedMessages = transaction.filterStoredMessageIds(associatedMessageIds)

    // Index the already stored messages that match the given (namespace, timestamp) pairs.
    var storedMessagesByPeerIdAndTimestamp: [PeerId: Set<MessageIndex>] = [:]
    for (peerId, namespacesAndTimestamps) in locallyGeneratedMessageTimestamps {
        for (namespace, timestamp) in namespacesAndTimestamps {
            guard let messageId = transaction.storedMessageId(peerId: peerId, namespace: namespace, timestamp: timestamp) else {
                continue
            }
            storedMessagesByPeerIdAndTimestamp[peerId, default: Set<MessageIndex>()].insert(MessageIndex(id: messageId, timestamp: timestamp))
        }
    }

    var peerNotificationSettings: [PeerId: PeerNotificationSettings] = [:]
    var readInboxMaxIds: [PeerId: MessageId] = [:]
    var cloudReadStates: [PeerId: PeerReadState] = [:]
    for peerId in peerIdsWithNewMessages {
        if let notificationSettings = transaction.getPeerNotificationSettings(peerId) {
            peerNotificationSettings[peerId] = notificationSettings
        }
        guard let readStates = transaction.getPeerReadStates(peerId) else {
            continue
        }
        // Only the first read state in the cloud namespace is used.
        for (namespace, readState) in readStates where namespace == Namespaces.Message.Cloud {
            cloudReadStates[peerId] = readState
            if case let .idBased(maxIncomingReadId, _, _, _, _) = readState {
                readInboxMaxIds[peerId] = MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: maxIncomingReadId)
            }
            break
        }
    }

    // Deliberately force-unwrapped: a missing authorized state traps here.
    let accountState = (transaction.getState() as? AuthorizedAccountState)!.state!
    let initialState = AccountInitialState(
        state: accountState,
        peerIds: peerIds,
        peerIdsWithNewMessages: peerIdsWithNewMessages,
        chatStates: chatStates,
        peerNotificationSettings: peerNotificationSettings,
        locallyGeneratedMessageTimestamps: locallyGeneratedMessageTimestamps,
        cloudReadStates: cloudReadStates,
        channelsToPollExplicitely: channelsToPollExplicitely
    )
    return AccountMutableState(
        initialState: initialState,
        initialPeers: peers,
        initialReferencedMessageIds: associatedMessageIds,
        initialStoredMessages: storedMessages,
        initialReadInboxMaxIds: readInboxMaxIds,
        storedMessagesByPeerIdAndTimestamp: storedMessagesByPeerIdAndTimestamp
    )
}
/// Produces the initial mutable state for processing `groups`.
///
/// The inputs are derived from the groups inside a postbox transaction and handed to
/// `initialStateWithPeerIds`.
func initialStateWithUpdateGroups(_ account: Account, groups: [UpdateGroup]) -> Signal<AccountMutableState, NoError> {
    return account.postbox.transaction { transaction -> AccountMutableState in
        return initialStateWithPeerIds(
            transaction,
            peerIds: peerIdsFromUpdateGroups(groups),
            activeChannelIds: activeChannelsFromUpdateGroups(groups),
            associatedMessageIds: associatedMessageIdsFromUpdateGroups(groups),
            peerIdsWithNewMessages: peersWithNewMessagesFromUpdateGroups(groups),
            locallyGeneratedMessageTimestamps: locallyGeneratedMessageTimestampsFromUpdateGroups(groups)
        )
    }
}
/// Produces the initial mutable state for processing `difference`.
///
/// The inputs are derived from the difference inside a postbox transaction and handed to
/// `initialStateWithPeerIds`.
func initialStateWithDifference(_ account: Account, difference: Api.updates.Difference) -> Signal<AccountMutableState, NoError> {
    return account.postbox.transaction { transaction -> AccountMutableState in
        return initialStateWithPeerIds(
            transaction,
            peerIds: peerIdsFromDifference(difference),
            activeChannelIds: activeChannelsFromDifference(difference),
            associatedMessageIds: associatedMessageIdsFromDifference(difference),
            peerIdsWithNewMessages: peersWithNewMessagesFromDifference(difference),
            locallyGeneratedMessageTimestamps: locallyGeneratedMessageTimestampsFromDifference(difference)
        )
    }
}
/// Applies a batch of update groups on top of `state` and resolves the final account state.
///
/// pts, qts and seq updates are each sorted by their target counter value and applied only
/// while they continue the current counter without a gap. Everything from the first gap on is
/// set aside, and the presence of any such leftovers is passed to `finalStateWithUpdates` as
/// `missingUpdates`. Date-only groups are always applied, and every `.updateChannelPts` group
/// is converted into an empty `updateDeleteChannelMessages` update.
///
/// - Parameters:
///   - account: Account passed through to `finalStateWithUpdates`.
///   - state: Starting state; it is copied and the copy is mutated.
///   - groups: Update groups to apply.
/// - Returns: A signal with the final state computed by `finalStateWithUpdates`.
func finalStateWithUpdateGroups(_ account: Account, state: AccountMutableState, groups: [UpdateGroup]) -> Signal<AccountFinalState, NoError> {
    var updatedState = state
    var collectedUpdates: [Api.Update] = []

    // A `.reset` group anywhere in the batch is forwarded as `shouldPoll`.
    let hadReset = groups.contains(where: { group -> Bool in
        if case .reset = group {
            return true
        }
        return false
    })

    var currentPtsUpdates = ptsUpdates(groups)
    currentPtsUpdates.sort(by: { lhs, rhs in lhs.ptsRange.0 < rhs.ptsRange.0 })
    var currentQtsUpdates = qtsUpdates(groups)
    currentQtsUpdates.sort(by: { lhs, rhs in lhs.qtsRange.0 < rhs.qtsRange.0 })
    var currentSeqGroups = seqGroups(groups)
    currentSeqGroups.sort(by: { lhs, rhs in lhs.seqRange.0 < rhs.seqRange.0 })

    // pts updates: `ptsRange.0` is the resulting pts, `ptsRange.1` the amount it advances by.
    var ptsUpdatesAfterHole: [PtsUpdate] = []
    for update in currentPtsUpdates {
        if updatedState.state.pts >= update.ptsRange.0 {
            // Already covered by the current pts; only web page updates are still forwarded.
            if let webPageUpdate = update.update, case .updateWebPage = webPageUpdate {
                collectedUpdates.append(webPageUpdate)
            }
        } else if ptsUpdatesAfterHole.isEmpty && updatedState.state.pts == update.ptsRange.0 - update.ptsRange.1 {
            updatedState.mergeChats(update.chats)
            updatedState.mergeUsers(update.users)
            if let ptsUpdate = update.update {
                collectedUpdates.append(ptsUpdate)
            }
            updatedState.updateState(AuthorizedAccountState.State(pts: update.ptsRange.0, qts: updatedState.state.qts, date: updatedState.state.date, seq: updatedState.state.seq))
        } else {
            // Only the first gap is logged; later updates are collected silently.
            if ptsUpdatesAfterHole.isEmpty {
                Logger.shared.log("State", "update pts hole: \(update.ptsRange.0) != \(updatedState.state.pts) + \(update.ptsRange.1)")
            }
            ptsUpdatesAfterHole.append(update)
        }
    }

    // NOTE(review): unlike the pts check above, the "already applied" test for qts and seq
    // adds the range length to the target value. Confirm whether the difference is intended.
    var qtsUpdatesAfterHole: [QtsUpdate] = []
    for update in currentQtsUpdates {
        if updatedState.state.qts >= update.qtsRange.0 + update.qtsRange.1 {
            continue
        }
        if qtsUpdatesAfterHole.isEmpty && updatedState.state.qts == update.qtsRange.0 - update.qtsRange.1 {
            updatedState.mergeChats(update.chats)
            updatedState.mergeUsers(update.users)
            collectedUpdates.append(update.update)
            updatedState.updateState(AuthorizedAccountState.State(pts: updatedState.state.pts, qts: update.qtsRange.0, date: updatedState.state.date, seq: updatedState.state.seq))
        } else {
            if qtsUpdatesAfterHole.isEmpty {
                Logger.shared.log("State", "update qts hole: \(update.qtsRange.0) != \(updatedState.state.qts) + \(update.qtsRange.1)")
            }
            qtsUpdatesAfterHole.append(update)
        }
    }

    var seqGroupsAfterHole: [SeqUpdates] = []
    for group in currentSeqGroups {
        if updatedState.state.seq >= group.seqRange.0 + group.seqRange.1 {
            continue
        }
        if seqGroupsAfterHole.isEmpty && updatedState.state.seq == group.seqRange.0 - group.seqRange.1 {
            collectedUpdates.append(contentsOf: group.updates)
            updatedState.mergeChats(group.chats)
            updatedState.mergeUsers(group.users)
            updatedState.updateState(AuthorizedAccountState.State(pts: updatedState.state.pts, qts: updatedState.state.qts, date: group.date, seq: group.seqRange.0))
        } else {
            if seqGroupsAfterHole.isEmpty {
                Logger.shared.log("State", "update seq hole: \(group.seqRange.0) != \(updatedState.state.seq) + \(group.seqRange.1)")
            }
            seqGroupsAfterHole.append(group)
        }
    }

    // Date-only groups are ordered by date; any other kind of group never compares as smaller.
    var currentDateGroups = dateGroups(groups)
    currentDateGroups.sort(by: { lhs, rhs -> Bool in
        if case let .withDate(_, lhsDate, _, _) = lhs, case let .withDate(_, rhsDate, _, _) = rhs {
            return lhsDate < rhsDate
        }
        return false
    })

    // `updatesDate` keeps the date of the first date group after sorting.
    var updatesDate: Int32?
    for case let .withDate(updates, date, users, chats) in currentDateGroups {
        collectedUpdates.append(contentsOf: updates)
        updatedState.mergeChats(chats)
        updatedState.mergeUsers(users)
        if updatesDate == nil {
            updatesDate = date
        }
    }

    for case let .updateChannelPts(channelId, pts, ptsCount) in groups {
        collectedUpdates.append(Api.Update.updateDeleteChannelMessages(channelId: channelId, messages: [], pts: pts, ptsCount: ptsCount))
    }

    let missingUpdates = !ptsUpdatesAfterHole.isEmpty || !qtsUpdatesAfterHole.isEmpty || !seqGroupsAfterHole.isEmpty
    return finalStateWithUpdates(account: account, state: updatedState, updates: collectedUpdates, shouldPoll: hadReset, missingUpdates: missingUpdates, shouldResetChannels: true, updatesDate: updatesDate)
}
/// Applies a difference on top of `state` and resolves the final account state.
///
/// For `.difference` and `.differenceSlice` the account state is replaced by the state the
/// difference carries, chats and users are merged, new messages are added to the upper history
/// block, and the remaining updates go to `finalStateWithUpdates`. `.differenceEmpty` only
/// updates `date` and `seq`. `.differenceTooLong` is not expected here and triggers
/// `assertionFailure()`.
///
/// - Parameters:
///   - account: Account passed through to `finalStateWithUpdates`.
///   - state: Starting state; it is copied and the copy is mutated.
///   - difference: Difference to apply.
/// - Returns: A signal with the final state computed by `finalStateWithUpdates`.
func finalStateWithDifference(account: Account, state: AccountMutableState, difference: Api.updates.Difference) -> Signal<AccountFinalState, NoError> {
    var updatedState = state

    var messages: [Api.Message] = []
    var encryptedMessages: [Api.EncryptedMessage] = []
    var updates: [Api.Update] = []
    var chats: [Api.Chat] = []
    var users: [Api.User] = []

    switch difference {
    case let .difference(newMessages, newEncryptedMessages, otherUpdates, apiChats, apiUsers, apiState), let .differenceSlice(newMessages, newEncryptedMessages, otherUpdates, apiChats, apiUsers, apiState):
        messages = newMessages
        encryptedMessages = newEncryptedMessages
        updates = otherUpdates
        chats = apiChats
        users = apiUsers
        switch apiState {
        case let .state(pts, qts, date, seq, _):
            updatedState.updateState(AuthorizedAccountState.State(pts: pts, qts: qts, date: date, seq: seq))
        }
    case let .differenceEmpty(date, seq):
        // Nothing changed except the date and seq counters.
        updatedState.updateState(AuthorizedAccountState.State(pts: updatedState.state.pts, qts: updatedState.state.qts, date: date, seq: seq))
    case .differenceTooLong:
        assertionFailure()
    }

    updatedState.mergeChats(chats)
    updatedState.mergeUsers(users)

    for apiMessage in messages {
        if let preCachedResources = apiMessage.preCachedResources {
            for (resource, data) in preCachedResources {
                updatedState.addPreCachedResource(resource, data: data)
            }
        }
        // Messages that cannot be converted into a `StoreMessage` are dropped.
        if let storeMessage = StoreMessage(apiMessage: apiMessage) {
            updatedState.addMessages([storeMessage], location: .UpperHistoryBlock)
        }
    }

    if !encryptedMessages.isEmpty {
        updatedState.addSecretMessages(encryptedMessages)
    }

    return finalStateWithUpdates(account: account, state: updatedState, updates: updates, shouldPoll: false, missingUpdates: false, shouldResetChannels: true, updatesDate: nil)
}
/// Reorders updates so that channel-related updates are grouped per channel and sorted by pts.
///
/// Updates that are not tied to a channel (including channel message updates whose peer cannot
/// be determined) keep their relative order and come first. They are followed by the updates of
/// each channel; within a channel, updates that carry a pts are ordered by ascending pts and
/// precede the ones without a pts. The order in which channels appear follows dictionary
/// iteration and is therefore unspecified.
private func sortedUpdates(_ updates: [Api.Update]) -> [Api.Update] {
    /// The channel an update should be grouped under, or nil if it is not grouped by channel.
    func channelPeerId(for update: Api.Update) -> PeerId? {
        switch update {
        case let .updateChannelTooLong(_, channelId, _),
             let .updateDeleteChannelMessages(channelId, _, _, _),
             let .updateChannelWebPage(channelId, _, _, _),
             let .updateChannelAvailableMessages(channelId, _):
            return PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId)
        case let .updateNewChannelMessage(message, _, _), let .updateEditChannelMessage(message, _, _):
            return apiMessagePeerId(message)
        default:
            return nil
        }
    }

    /// The pts used for ordering a channel update, or nil for updates ordered last.
    func channelPts(of update: Api.Update) -> Int32? {
        switch update {
        case let .updateDeleteChannelMessages(_, _, pts, _), let .updateChannelWebPage(_, _, pts, _):
            return pts
        case let .updateNewChannelMessage(_, pts, _), let .updateEditChannelMessage(_, pts, _):
            return pts
        default:
            return nil
        }
    }

    var result: [Api.Update] = []
    var updatesByChannel: [PeerId: [Api.Update]] = [:]
    for update in updates {
        if let peerId = channelPeerId(for: update) {
            updatesByChannel[peerId, default: []].append(update)
        } else {
            result.append(update)
        }
    }

    for channelUpdates in updatesByChannel.values {
        let orderedUpdates = channelUpdates.sorted(by: { lhs, rhs -> Bool in
            switch (channelPts(of: lhs), channelPts(of: rhs)) {
            case let (lhsPts?, rhsPts?):
                return lhsPts < rhsPts
            case (_?, nil):
                // An update with a pts goes before one without.
                return true
            default:
                return false
            }
        })
        result.append(contentsOf: orderedUpdates)
    }
    return result
}
/// Takes a single value of the network's current global time and forwards all arguments,
/// together with that time converted to `Int32`, to `finalStateWithUpdatesAndServerTime`.
private func finalStateWithUpdates(account: Account, state: AccountMutableState, updates: [Api.Update], shouldPoll: Bool, missingUpdates: Bool, shouldResetChannels: Bool, updatesDate: Int32?) -> Signal<AccountFinalState, NoError> {
    let currentServerTime = account.network.currentGlobalTime |> take(1)
    return currentServerTime |> mapToSignal { serverTime -> Signal<AccountFinalState, NoError> in
        let timestamp = Int32(serverTime)
        return finalStateWithUpdatesAndServerTime(account: account, state: state, updates: updates, shouldPoll: shouldPoll, missingUpdates: missingUpdates, shouldResetChannels: shouldResetChannels, updatesDate: updatesDate, serverTime: timestamp)
    }
}
private func finalStateWithUpdatesAndServerTime(account: Account, state: AccountMutableState, updates: [Api.Update], shouldPoll: Bool, missingUpdates: Bool, shouldResetChannels: Bool, updatesDate: Int32?, serverTime: Int32) -> Signal<AccountFinalState, NoError> {
var updatedState = state
var channelsToPoll = Set<PeerId>()
if !updatedState.initialState.channelsToPollExplicitely.isEmpty {
channelsToPoll.formUnion(updatedState.initialState.channelsToPollExplicitely)
}
for update in sortedUpdates(updates) {
switch update {
case let .updateChannelTooLong(_, channelId, channelPts):
let peerId = PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId)
if !channelsToPoll.contains(peerId) {
if let channelPts = channelPts, let channelState = state.chatStates[peerId] as? ChannelState, channelState.pts >= channelPts {
Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) skip updateChannelTooLong by pts")
} else {
channelsToPoll.insert(peerId)
}
}
case let .updateDeleteChannelMessages(channelId, messages, pts: pts, ptsCount):
let peerId = PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId)
if let previousState = updatedState.chatStates[peerId] as? ChannelState {
if previousState.pts >= pts {
Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) skip old delete update")
} else if previousState.pts + ptsCount == pts {
updatedState.deleteMessages(messages.map({ MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: $0) }))
updatedState.updateChannelState(peerId, state: previousState.withUpdatedPts(pts))
} else {
if !channelsToPoll.contains(peerId) {
Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) delete pts hole")
channelsToPoll.insert(peerId)
//updatedMissingUpdates = true
}
}
} else {
if !channelsToPoll.contains(peerId) {
//Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) state unknown")
channelsToPoll.insert(peerId)
}
}
case let .updateEditChannelMessage(apiMessage, pts, ptsCount):
if let message = StoreMessage(apiMessage: apiMessage), case let .Id(messageId) = message.id {
let peerId = messageId.peerId
if let previousState = updatedState.chatStates[peerId] as? ChannelState {
if previousState.pts >= pts {
Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) skip old edit update")
} else if previousState.pts + ptsCount == pts {
if let preCachedResources = apiMessage.preCachedResources {
for (resource, data) in preCachedResources {
updatedState.addPreCachedResource(resource, data: data)
}
}
var attributes = message.attributes
attributes.append(ChannelMessageStateVersionAttribute(pts: pts))
updatedState.editMessage(messageId, message: message.withUpdatedAttributes(attributes))
updatedState.updateChannelState(peerId, state: previousState.withUpdatedPts(pts))
} else {
if !channelsToPoll.contains(peerId) {
Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) edit message pts hole")
channelsToPoll.insert(peerId)
//updatedMissingUpdates = true
}
}
} else {
if !channelsToPoll.contains(peerId) {
//Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) state unknown")
channelsToPoll.insert(peerId)
}
}
} else {
Logger.shared.log("State", "Invalid updateEditChannelMessage")
}
case let .updateChannelWebPage(channelId, apiWebpage, pts, ptsCount):
let peerId = PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId)
if let previousState = updatedState.chatStates[peerId] as? ChannelState {
if previousState.pts >= pts {
} else if previousState.pts + ptsCount == pts {
switch apiWebpage {
case let .webPageEmpty(id):
updatedState.updateMedia(MediaId(namespace: Namespaces.Media.CloudWebpage, id: id), media: nil)
default:
if let webpage = telegramMediaWebpageFromApiWebpage(apiWebpage, url: nil) {
updatedState.updateMedia(webpage.webpageId, media: webpage)
}
}
updatedState.updateChannelState(peerId, state: previousState.withUpdatedPts(pts))
} else {
if !channelsToPoll.contains(peerId) {
Logger.shared.log("State", "channel \(peerId) (\((updatedState.peers[peerId] as? TelegramChannel)?.title ?? "nil")) updateWebPage pts hole")
channelsToPoll.insert(peerId)
}
}
} else {
if !channelsToPoll.contains(peerId) {
channelsToPoll.insert(peerId)
}
}
case let .updateChannelAvailableMessages(channelId, minId):
let peerId = PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId)
updatedState.updateMinAvailableMessage(MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: minId))
case let .updateDeleteMessages(messages, _, _):
updatedState.deleteMessagesWithGlobalIds(messages)
case let .updateEditMessage(apiMessage, _, _):
if let message = StoreMessage(apiMessage: apiMessage), case let .Id(messageId) = message.id {
if let preCachedResources = apiMessage.preCachedResources {
for (resource, data) in preCachedResources {
updatedState.addPreCachedResource(resource, data: data)
}
}
updatedState.editMessage(messageId, message: message)
}
case let .updateNewChannelMessage(apiMessage, pts, ptsCount):
if let message = StoreMessage(apiMessage: apiMessage) {
if let previousState = updatedState.chatStates[message.id.peerId] as? ChannelState {
if previousState.pts >= pts {
let messageText: String
if Logger.shared.redactSensitiveData {
messageText = "[[redacted]]"
} else {
messageText = message.text
}
Logger.shared.log("State", "channel \(message.id.peerId) (\((updatedState.peers[message.id.peerId] as? TelegramChannel)?.title ?? "nil")) skip old message \(message.id) (\(messageText))")
} else if previousState.pts + ptsCount == pts {
if let preCachedResources = apiMessage.preCachedResources {
for (resource, data) in preCachedResources {
updatedState.addPreCachedResource(resource, data: data)
}
}
var attributes = message.attributes
attributes.append(ChannelMessageStateVersionAttribute(pts: pts))
updatedState.addMessages([message.withUpdatedAttributes(attributes)], location: .UpperHistoryBlock)
updatedState.updateChannelState(message.id.peerId, state: previousState.withUpdatedPts(pts))
} else {
if !channelsToPoll.contains(message.id.peerId) {
Logger.shared.log("State", "channel \(message.id.peerId) (\((updatedState.peers[message.id.peerId] as? TelegramChannel)?.title ?? "nil")) message pts hole")
;
channelsToPoll.insert(message.id.peerId)
//updatedMissingUpdates = true
}
}
} else {
if !channelsToPoll.contains(message.id.peerId) {
Logger.shared.log("State", "channel \(message.id.peerId) (\((updatedState.peers[message.id.peerId] as? TelegramChannel)?.title ?? "nil")) state unknown")
channelsToPoll.insert(message.id.peerId)
}
}
}
case let .updateNewMessage(apiMessage, _, _):
if let message = StoreMessage(apiMessage: apiMessage) {
if let preCachedResources = apiMessage.preCachedResources {
for (resource, data) in preCachedResources {
updatedState.addPreCachedResource(resource, data: data)
}
}
updatedState.addMessages([message], location: .UpperHistoryBlock)
}
case let .updateServiceNotification(_, date, type, text, media, entities):
if let date = date {
let peerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: 777000)
if updatedState.peers[peerId] == nil {
updatedState.updatePeer(peerId, { peer in
if peer == nil {
return TelegramUser(id: peerId, accessHash: nil, firstName: "Telegram Notifications", lastName: nil, username: nil, phone: nil, photo: [], botInfo: BotUserInfo(flags: [], inlinePlaceholder: nil), restrictionInfo: nil, flags: [.isVerified])
} else {
return peer
}
})
}
var alreadyStored = false
if let storedMessages = updatedState.storedMessagesByPeerIdAndTimestamp[peerId] {
for index in storedMessages {
if index.timestamp == date {
alreadyStored = true
break
}
}
}
if alreadyStored {
Logger.shared.log("State", "skipping message at \(date) for \(peerId): already stored")
} else {
var attributes: [MessageAttribute] = []
if !entities.isEmpty {
attributes.append(TextEntitiesMessageAttribute(entities: messageTextEntitiesFromApiEntities(entities)))
}
let messageText = text
var medias: [Media] = []
let (mediaValue, expirationTimer) = textMediaAndExpirationTimerFromApiMedia(media, peerId)
if let mediaValue = mediaValue {
medias.append(mediaValue)
}
if let expirationTimer = expirationTimer {
attributes.append(AutoremoveTimeoutMessageAttribute(timeout: expirationTimer, countdownBeginTime: nil))
}
let message = StoreMessage(peerId: peerId, namespace: Namespaces.Message.Local, globallyUniqueId: nil, groupingKey: nil, timestamp: date, flags: [.Incoming], tags: [], globalTags: [], localTags: [], forwardInfo: nil, authorId: peerId, text: messageText, attributes: attributes, media: [])
updatedState.addMessages([message], location: .UpperHistoryBlock)
}
} else {
updatedState.addDisplayAlert(text)
}
case let .updateReadChannelInbox(channelId, maxId):
updatedState.readInbox(MessageId(peerId: PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId), namespace: Namespaces.Message.Cloud, id: maxId))
case let .updateReadChannelOutbox(channelId, maxId):
updatedState.readOutbox(MessageId(peerId: PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId), namespace: Namespaces.Message.Cloud, id: maxId))
case let .updateReadHistoryInbox(peer, maxId, _, _):
updatedState.readInbox(MessageId(peerId: peer.peerId, namespace: Namespaces.Message.Cloud, id: maxId))
case let .updateReadHistoryOutbox(peer, maxId, _, _):
updatedState.readOutbox(MessageId(peerId: peer.peerId, namespace: Namespaces.Message.Cloud, id: maxId))
case let .updateDialogUnreadMark(flags, peer):
let peerId: PeerId
switch peer {
case let .dialogPeer(peer):
peerId = peer.peerId
}
updatedState.updatePeerChatUnreadMark(peerId, namespace: Namespaces.Message.Cloud, value: (flags & (1 << 0)) != 0)
/*feed*/
/*case let .updateReadFeed(_, feedId, maxPosition, unreadCount, unreadMutedCount):
switch maxPosition {
case let .feedPosition(date, peer, id):
let index = MessageIndex(id: MessageId(peerId: peer.peerId, namespace: Namespaces.Message.Cloud, id: id), timestamp: date)
updatedState.readGroupFeedInbox(groupId: PeerGroupId(rawValue: feedId), index: index)
}*/
case let .updateWebPage(apiWebpage, _, _):
switch apiWebpage {
case let .webPageEmpty(id):
updatedState.updateMedia(MediaId(namespace: Namespaces.Media.CloudWebpage, id: id), media: nil)
default:
if let webpage = telegramMediaWebpageFromApiWebpage(apiWebpage, url: nil) {
updatedState.updateMedia(webpage.webpageId, media: webpage)
}
}
case let .updateNotifySettings(apiPeer, apiNotificationSettings):
switch apiPeer {
case let .notifyPeer(peer):
let notificationSettings = TelegramPeerNotificationSettings(apiSettings: apiNotificationSettings)
updatedState.updateNotificationSettings(.peer(peer.peerId), notificationSettings: notificationSettings)
case .notifyUsers:
updatedState.updateGlobalNotificationSettings(.privateChats, notificationSettings: MessageNotificationSettings(apiSettings: apiNotificationSettings))
case .notifyChats:
updatedState.updateGlobalNotificationSettings(.groups, notificationSettings: MessageNotificationSettings(apiSettings: apiNotificationSettings))
default:
break
}
case let .updateChatParticipants(participants):
let groupPeerId: PeerId
switch participants {
case let .chatParticipants(chatId, _, _):
groupPeerId = PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId)
case let .chatParticipantsForbidden(_, chatId, _):
groupPeerId = PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId)
}
updatedState.updateCachedPeerData(groupPeerId, { current in
let previous: CachedGroupData
if let current = current as? CachedGroupData {
previous = current
} else {
previous = CachedGroupData()
}
return previous.withUpdatedParticipants(CachedGroupParticipants(apiParticipants: participants))
})
case let .updateChatParticipantAdd(chatId, userId, inviterId, date, _):
let groupPeerId = PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId)
let userPeerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: userId)
let inviterPeerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: inviterId)
updatedState.updateCachedPeerData(groupPeerId, { current in
if let current = current as? CachedGroupData, let participants = current.participants {
var updatedParticipants = participants.participants
if updatedParticipants.index(where: { $0.peerId == userPeerId }) == nil {
updatedParticipants.append(.member(id: userPeerId, invitedBy: inviterPeerId, invitedAt: date))
}
return current.withUpdatedParticipants(CachedGroupParticipants(participants: updatedParticipants, version: participants.version))
} else {
return current
}
})
case let .updateChatParticipantDelete(chatId, userId, _):
let groupPeerId = PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId)
let userPeerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: userId)
updatedState.updateCachedPeerData(groupPeerId, { current in
if let current = current as? CachedGroupData, let participants = current.participants {
var updatedParticipants = participants.participants
if let index = updatedParticipants.index(where: { $0.peerId == userPeerId }) {
updatedParticipants.remove(at: index)
}
return current.withUpdatedParticipants(CachedGroupParticipants(participants: updatedParticipants, version: participants.version))
} else {
return current
}
})
case let .updateChatParticipantAdmin(chatId, userId, isAdmin, _):
let groupPeerId = PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId)
let userPeerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: userId)
updatedState.updateCachedPeerData(groupPeerId, { current in
if let current = current as? CachedGroupData, let participants = current.participants {
var updatedParticipants = participants.participants
if let index = updatedParticipants.index(where: { $0.peerId == userPeerId }) {
if isAdmin == .boolTrue {
if case let .member(id, invitedBy, invitedAt) = updatedParticipants[index] {
updatedParticipants[index] = .admin(id: id, invitedBy: invitedBy, invitedAt: invitedAt)
}
} else {
if case let .admin(id, invitedBy, invitedAt) = updatedParticipants[index] {
updatedParticipants[index] = .member(id: id, invitedBy: invitedBy, invitedAt: invitedAt)
}
}
}
return current.withUpdatedParticipants(CachedGroupParticipants(participants: updatedParticipants, version: participants.version))
} else {
return current
}
})
case let .updateChatAdmins(chatId, enabled, version):
updatedState.updatePeer(PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId), { peer in
if let group = peer as? TelegramGroup {//, group.version == version - 1 {
var flags = group.flags
switch enabled {
case .boolTrue:
flags.insert(.adminsEnabled)
case .boolFalse:
let _ = flags.remove(.adminsEnabled)
}
return group.updateFlags(flags: flags, version: max(group.version, Int(version)))
} else {
return peer
}
})
case let .updateChannelPinnedMessage(channelId, id):
let channelPeerId = PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId)
updatedState.updateCachedPeerData(channelPeerId, { current in
let previous: CachedChannelData
if let current = current as? CachedChannelData {
previous = current
} else {
previous = CachedChannelData()
}
return previous.withUpdatedPinnedMessageId(id == 0 ? nil : MessageId(peerId: channelPeerId, namespace: Namespaces.Message.Cloud, id: id))
})
case let .updateUserBlocked(userId, blocked):
let userPeerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: userId)
updatedState.updateCachedPeerData(userPeerId, { current in
let previous: CachedUserData
if let current = current as? CachedUserData {
previous = current
} else {
previous = CachedUserData()
}
return previous.withUpdatedIsBlocked(blocked == .boolTrue)
})
case let .updateUserStatus(userId, status):
updatedState.mergePeerPresences([PeerId(namespace: Namespaces.Peer.CloudUser, id: userId): status], explicit: true)
case let .updateUserName(userId, firstName, lastName, username):
//TODO: check whether the user is a contact before applying firstName and lastName (only the username is applied for now)
updatedState.updatePeer(PeerId(namespace: Namespaces.Peer.CloudUser, id: userId), { peer in
if let user = peer as? TelegramUser {
return user.withUpdatedUsername(username)
} else {
return peer
}
})
case let .updateUserPhoto(userId, _, photo, _):
updatedState.updatePeer(PeerId(namespace: Namespaces.Peer.CloudUser, id: userId), { peer in
if let user = peer as? TelegramUser {
return user.withUpdatedPhoto(parsedTelegramProfilePhoto(photo))
} else {
return peer
}
})
case let .updateUserPhone(userId, phone):
updatedState.updatePeer(PeerId(namespace: Namespaces.Peer.CloudUser, id: userId), { peer in
if let user = peer as? TelegramUser {
return user.withUpdatedPhone(phone)
} else {
return peer
}
})
case let .updateContactLink(userId, myLink, foreignLink):
let isContact: Bool
switch myLink {
case .contactLinkContact:
isContact = true
default:
isContact = false
}
let userPeerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: userId)
updatedState.updatePeerIsContact(userPeerId, isContact: isContact)
updatedState.updateCachedPeerData(userPeerId, { current in
let previous: CachedUserData
if let current = current as? CachedUserData {
previous = current
} else {
previous = CachedUserData()
}
let hasPhone: Bool?
switch foreignLink {
case .contactLinkContact, .contactLinkHasPhone:
hasPhone = true
case .contactLinkNone:
hasPhone = false
case .contactLinkUnknown:
hasPhone = nil
}
return previous.withUpdatedHasAccountPeerPhone(hasPhone)
})
case let .updateEncryption(chat, date):
updatedState.updateSecretChat(chat: chat, timestamp: date)
case let .updateNewEncryptedMessage(message, _):
updatedState.addSecretMessages([message])
case let .updateEncryptedMessagesRead(chatId, maxDate, date):
updatedState.readSecretOutbox(peerId: PeerId(namespace: Namespaces.Peer.SecretChat, id: chatId), timestamp: maxDate, actionTimestamp: date)
case let .updateUserTyping(userId, type):
if let date = updatesDate, date + 60 > serverTime {
updatedState.addPeerInputActivity(chatPeerId: PeerId(namespace: Namespaces.Peer.CloudUser, id: userId), peerId: PeerId(namespace: Namespaces.Peer.CloudUser, id: userId), activity: PeerInputActivity(apiType: type))
}
case let .updateChatUserTyping(chatId, userId, type):
if let date = updatesDate, date + 60 > serverTime {
updatedState.addPeerInputActivity(chatPeerId: PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId), peerId: PeerId(namespace: Namespaces.Peer.CloudUser, id: userId), activity: PeerInputActivity(apiType: type))
updatedState.addPeerInputActivity(chatPeerId: PeerId(namespace: Namespaces.Peer.CloudChannel, id: chatId), peerId: PeerId(namespace: Namespaces.Peer.CloudUser, id: userId), activity: PeerInputActivity(apiType: type))
}
case let .updateEncryptedChatTyping(chatId):
if let date = updatesDate, date + 60 > serverTime {
updatedState.addPeerInputActivity(chatPeerId: PeerId(namespace: Namespaces.Peer.SecretChat, id: chatId), peerId: nil, activity: .typingText)
}
case let .updateDialogPinned(flags, peer):
let item: PinnedItemId
switch peer {
case let .dialogPeer(peer):
item = .peer(peer.peerId)
/*feed*/
/*case let .dialogPeerFeed(feedId):
item = .group(PeerGroupId(rawValue: feedId))*/
}
if (flags & (1 << 0)) != 0 {
updatedState.addUpdatePinnedItemIds(.pin(item))
} else {
updatedState.addUpdatePinnedItemIds(.unpin(item))
}
case let .updatePinnedDialogs(_, order):
if let order = order {
updatedState.addUpdatePinnedItemIds(.reorder(order.map {
let item: PinnedItemId
switch $0 {
case let .dialogPeer(peer):
item = .peer(peer.peerId)
/*feed*/
/*case let .dialogPeerFeed(feedId):
item = .group(PeerGroupId(rawValue: feedId))*/
}
return item
}))
} else {
updatedState.addUpdatePinnedItemIds(.sync)
}
case let .updateReadMessagesContents(messages, _, _):
updatedState.addReadMessagesContents((nil, messages))
case let .updateChannelReadMessagesContents(channelId, messages):
updatedState.addReadMessagesContents((PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId), messages))
case let .updateChannelMessageViews(channelId, id, views):
updatedState.addUpdateMessageImpressionCount(id: MessageId(peerId: PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId), namespace: Namespaces.Message.Cloud, id: id), count: views)
case let .updateNewStickerSet(stickerset):
updatedState.addUpdateInstalledStickerPacks(.add(stickerset))
case let .updateStickerSetsOrder(flags, order):
let namespace: SynchronizeInstalledStickerPacksOperationNamespace
if (flags & (1 << 0)) != 0 {
namespace = .masks
} else {
namespace = .stickers
}
updatedState.addUpdateInstalledStickerPacks(.reorder(namespace, order))
case .updateStickerSets:
updatedState.addUpdateInstalledStickerPacks(.sync)
case .updateSavedGifs:
updatedState.addUpdateRecentGifs()
case let .updateDraftMessage(peer, draft):
let inputState: SynchronizeableChatInputState?
switch draft {
case .draftMessageEmpty:
inputState = nil
case let .draftMessage(_, replyToMsgId, message, entities, date):
var replyToMessageId: MessageId?
if let replyToMsgId = replyToMsgId {
replyToMessageId = MessageId(peerId: peer.peerId, namespace: Namespaces.Message.Cloud, id: replyToMsgId)
}
inputState = SynchronizeableChatInputState(replyToMessageId: replyToMessageId, text: message, timestamp: date)
}
updatedState.addUpdateChatInputState(peerId: peer.peerId, state: inputState)
case let .updatePhoneCall(phoneCall):
updatedState.addUpdateCall(phoneCall)
case .updateLangPackTooLong:
updatedState.updateLangPack(nil)
case let .updateLangPack(difference):
updatedState.updateLangPack(difference)
default:
break
}
}
var pollChannelSignals: [Signal<(AccountMutableState, Bool, Int32?), NoError>] = []
if channelsToPoll.isEmpty {
pollChannelSignals = []
} else if shouldResetChannels {
var channelPeers: [Peer] = []
for peerId in channelsToPoll {
if let peer = updatedState.peers[peerId] {
channelPeers.append(peer)
} else {
Logger.shared.log("State", "can't reset channel \(peerId): no peer found")
}
}
if !channelPeers.isEmpty {
let resetSignal = resetChannels(account, peers: channelPeers, state: updatedState)
|> map { resultState -> (AccountMutableState, Bool, Int32?) in
return (resultState, true, nil)
}
pollChannelSignals = [resetSignal]
} else {
pollChannelSignals = []
}
} else {
for peerId in channelsToPoll {
if let peer = updatedState.peers[peerId] {
pollChannelSignals.append(pollChannel(account, peer: peer, state: updatedState.branch()))
} else {
Logger.shared.log("State", "can't poll channel \(peerId): no peer found")
}
}
}
return combineLatest(pollChannelSignals) |> mapToSignal { states -> Signal<AccountFinalState, NoError> in
var finalState: AccountMutableState = updatedState
var hadError = false
if shouldResetChannels && states.count != 0 {
assert(states.count == 1)
finalState = states[0].0
} else {
for (state, success, _) in states {
finalState.merge(state)
if !success {
hadError = true
}
}
}
return resolveAssociatedMessages(account: account, state: finalState)
|> mapToSignal { resultingState -> Signal<AccountFinalState, NoError> in
return resolveMissingPeerNotificationSettings(account: account, state: resultingState)
|> mapToSignal { resultingState -> Signal<AccountFinalState, NoError> in
return resolveMissingPeerCloudReadStates(account: account, state: resultingState)
|> map { resultingState -> AccountFinalState in
return AccountFinalState(state: resultingState, shouldPoll: shouldPoll || hadError, incomplete: missingUpdates)
}
}
}
}
}
/// Fetches the messages that `state` references but does not yet store, and merges them into a copy of `state`.
///
/// Missing message ids are grouped by peer. For user and group peers the ids are requested with
/// `messages.getMessages`; for channel peers with `channels.getMessages` (only when an input channel
/// can be built for the peer). Groups whose peer is not present in `state.peers`, or whose namespace is
/// none of the above, are skipped without a request.
///
/// - Parameters:
///   - account: Account whose network is used for the requests.
///   - state: State providing the referenced/stored message ids and the known peers.
/// - Returns: A signal with `state` itself when nothing is missing; otherwise a copy of `state` with the
///   fetched messages (added at `.Random`), chats and users merged in. A failed request contributes
///   empty results instead of failing the signal.
private func resolveAssociatedMessages(account: Account, state: AccountMutableState) -> Signal<AccountMutableState, NoError> {
    let missingMessageIds = state.referencedMessageIds.subtracting(state.storedMessages)
    if missingMessageIds.isEmpty {
        return .single(state)
    }
    
    var signals: [Signal<([Api.Message], [Api.Chat], [Api.User]), NoError>] = []
    for (peerId, messageIds) in messagesIdsGroupedByPeerId(missingMessageIds) {
        guard let peer = state.peers[peerId] else {
            // The peer is not part of this state, so no request can be built for its messages.
            continue
        }
        
        var signal: Signal<Api.messages.Messages, MTRpcError>?
        if peerId.namespace == Namespaces.Peer.CloudUser || peerId.namespace == Namespaces.Peer.CloudGroup {
            signal = account.network.request(Api.functions.messages.getMessages(id: messageIds.map({ Api.InputMessage.inputMessageID(id: $0.id) })))
        } else if peerId.namespace == Namespaces.Peer.CloudChannel {
            if let inputChannel = apiInputChannel(peer) {
                signal = account.network.request(Api.functions.channels.getMessages(channel: inputChannel, id: messageIds.map({ Api.InputMessage.inputMessageID(id: $0.id) })))
            }
        }
        
        if let signal = signal {
            signals.append(signal |> map { result in
                switch result {
                    case let .messages(messages, chats, users):
                        return (messages, chats, users)
                    case let .messagesSlice(_, messages, chats, users):
                        return (messages, chats, users)
                    case let .channelMessages(_, _, _, messages, chats, users):
                        return (messages, chats, users)
                    case .messagesNotModified:
                        return ([], [], [])
                }
            } |> `catch` { _ in
                // Best effort: a failed fetch simply contributes nothing.
                return Signal<([Api.Message], [Api.Chat], [Api.User]), NoError>.single(([], [], []))
            })
        }
    }
    
    let fetchMessages = combineLatest(signals)
    return fetchMessages |> map { results in
        var updatedState = state
        for (messages, chats, users) in results {
            if !messages.isEmpty {
                var storeMessages: [StoreMessage] = []
                for message in messages {
                    if let message = StoreMessage(apiMessage: message) {
                        storeMessages.append(message)
                    }
                }
                updatedState.addMessages(storeMessages, location: .Random)
            }
            if !chats.isEmpty {
                updatedState.mergeChats(chats)
            }
            if !users.isEmpty {
                updatedState.mergeUsers(users)
            }
        }
        return updatedState
    }
}
/// Fetches notification settings for peers that received new messages but have no settings in `state`.
///
/// Only peers listed in `state.initialState.peerIdsWithNewMessages` are considered. A peer is skipped
/// (with a log entry) when it is unknown to `state` or no input peer can be created for it.
///
/// - Parameters:
///   - account: Account whose network is used for the `account.getNotifySettings` requests.
///   - state: State to inspect and to copy the results into.
/// - Returns: A signal with `state` itself when nothing has to be fetched; otherwise a copy of `state`
///   updated with every successfully fetched setting. Failed requests are ignored.
private func resolveMissingPeerNotificationSettings(account: Account, state: AccountMutableState) -> Signal<AccountMutableState, NoError> {
    var inputPeersToFetch: [PeerId: Api.InputPeer] = [:]
    for peerId in state.initialState.peerIdsWithNewMessages where state.peerNotificationSettings[peerId] == nil {
        guard let peer = state.peers[peerId], let inputPeer = apiInputPeer(peer) else {
            Logger.shared.log("State", "can't fetch notification settings for peer \(peerId): can't create inputPeer")
            continue
        }
        inputPeersToFetch[peerId] = inputPeer
    }
    
    guard !inputPeersToFetch.isEmpty else {
        return .single(state)
    }
    
    Logger.shared.log("State", "will fetch notification settings for \(inputPeersToFetch.count) peers")
    
    var requests: [Signal<(PeerId, PeerNotificationSettings)?, NoError>] = []
    for (peerId, inputPeer) in inputPeersToFetch {
        let request = account.network.request(Api.functions.account.getNotifySettings(peer: .inputNotifyPeer(peer: inputPeer)))
        |> map { apiSettings -> (PeerId, PeerNotificationSettings)? in
            return (peerId, TelegramPeerNotificationSettings(apiSettings: apiSettings))
        }
        |> `catch` { _ -> Signal<(PeerId, PeerNotificationSettings)?, NoError> in
            // A failed request yields nil, which is skipped when applying the results.
            return .single(nil)
        }
        requests.append(request)
    }
    
    return combineLatest(requests)
    |> map { fetchedSettings -> AccountMutableState in
        var updatedState = state
        for case let (peerId, settings)? in fetchedSettings {
            updatedState.updateNotificationSettings(.peer(peerId), notificationSettings: settings)
        }
        return updatedState
    }
}
/// Fetches cloud read states for user and group peers that received new messages but have no entry in
/// `state.initialState.cloudReadStates`.
///
/// A peer is skipped (with a log entry) when it is unknown to `state` or no input peer can be created
/// for it.
///
/// - Parameters:
///   - account: Account whose network and postbox are passed to `fetchPeerCloudReadState`.
///   - state: State to inspect and to copy the results into.
/// - Returns: A signal with `state` itself when nothing has to be fetched; otherwise a copy of `state`
///   in which the read state of every peer with a fetched `.idBased` result has been reset.
private func resolveMissingPeerCloudReadStates(account: Account, state: AccountMutableState) -> Signal<AccountMutableState, NoError> {
    var missingPeers: [PeerId: Api.InputPeer] = [:]
    
    for peerId in state.initialState.peerIdsWithNewMessages {
        if state.initialState.cloudReadStates[peerId] == nil && (peerId.namespace == Namespaces.Peer.CloudUser || peerId.namespace == Namespaces.Peer.CloudGroup) {
            if let peer = state.peers[peerId], let inputPeer = apiInputPeer(peer) {
                missingPeers[peerId] = inputPeer
            } else {
                Logger.shared.log("State", "can't fetch cloud read state for peer \(peerId): can't create inputPeer")
            }
        }
    }
    
    if missingPeers.isEmpty {
        return .single(state)
    } else {
        Logger.shared.log("State", "will fetch cloud read states for \(missingPeers.count) peers")
        
        var signals: [Signal<(PeerId, PeerReadState)?, NoError>] = []
        for (peerId, inputPeer) in missingPeers {
            let fetchReadState = fetchPeerCloudReadState(network: account.network, postbox: account.postbox, peerId: peerId, inputPeer: inputPeer)
            |> map { readState -> (PeerId, PeerReadState)? in
                return readState.flatMap { (peerId, $0) }
            }
            signals.append(fetchReadState)
        }
        return combineLatest(signals)
        |> map { peersAndReadStates -> AccountMutableState in
            var updatedState = state
            for pair in peersAndReadStates {
                if let (peerId, readState) = pair {
                    // Only id-based read states are applied; other kinds are left untouched.
                    if case let .idBased(maxIncomingReadId, maxOutgoingReadId, maxKnownId, count, markedUnread) = readState {
                        updatedState.resetReadState(peerId, namespace: Namespaces.Message.Cloud, maxIncomingReadId: maxIncomingReadId, maxOutgoingReadId: maxOutgoingReadId, maxKnownId: maxKnownId, count: count, markedUnread: markedUnread)
                    }
                }
            }
            return updatedState
        }
    }
}
/// Builds a signal that polls a single channel, replays the result through `stateManager`, waits, and
/// is then passed through `restart`.
///
/// Each pass reads the account state, the peer, its channel state and its notification settings inside
/// a postbox transaction, polls the channel from a fresh `AccountMutableState`, resolves associated
/// messages and missing notification settings, and hands the final state to
/// `stateManager.addReplayAsynchronouslyBuiltFinalState`. The wait afterwards is the timeout returned
/// by the poll, or 30 seconds when none is returned. When the account state or the peer is unavailable
/// the pass only waits 30 seconds.
///
/// - Parameters:
///   - account: Account providing the postbox and network.
///   - peerId: Id of the channel peer to poll.
///   - stateManager: Receiver of the asynchronously built final state.
/// - Returns: A signal that emits no error; presumably it keeps re-running until disposed, since it ends in `restart`.
func keepPollingChannel(account: Account, peerId: PeerId, stateManager: AccountStateManager) -> Signal<Void, NoError> {
    let singlePass = account.postbox.transaction { transaction -> Signal<Void, NoError> in
        guard let accountState = (transaction.getState() as? AuthorizedAccountState)?.state, let peer = transaction.getPeer(peerId) else {
            return .complete() |> delay(30.0, queue: Queue.concurrentDefaultQueue())
        }
        
        var chatStates: [PeerId: PeerChatState] = [:]
        if let channelState = transaction.getPeerChatState(peerId) as? ChannelState {
            chatStates[peerId] = channelState
        }
        
        var peerNotificationSettings: [PeerId: TelegramPeerNotificationSettings] = [:]
        if let notificationSettings = transaction.getPeerNotificationSettings(peerId) as? TelegramPeerNotificationSettings {
            peerNotificationSettings[peerId] = notificationSettings
        }
        
        let initialPeers: [PeerId: Peer] = [peerId: peer]
        let initialState = AccountMutableState(initialState: AccountInitialState(state: accountState, peerIds: Set(), peerIdsWithNewMessages: Set(), chatStates: chatStates, peerNotificationSettings: peerNotificationSettings, locallyGeneratedMessageTimestamps: [:], cloudReadStates: [:], channelsToPollExplicitely: Set()), initialPeers: initialPeers, initialReferencedMessageIds: Set(), initialStoredMessages: Set(), initialReadInboxMaxIds: [:], storedMessagesByPeerIdAndTimestamp: [:])
        
        return pollChannel(account, peer: peer, state: initialState)
        |> mapToSignal { (polledState, _, timeout) -> Signal<Void, NoError> in
            // Wait for the poll-provided timeout before the next pass, falling back to 30 seconds.
            let nextPassDelay = Double(timeout ?? 30)
            return resolveAssociatedMessages(account: account, state: polledState)
            |> mapToSignal { resultingState -> Signal<AccountFinalState, NoError> in
                return resolveMissingPeerNotificationSettings(account: account, state: resultingState)
                |> map { resultingState -> AccountFinalState in
                    return AccountFinalState(state: resultingState, shouldPoll: false, incomplete: false)
                }
            }
            |> mapToSignal { finalState -> Signal<Void, NoError> in
                return stateManager.addReplayAsynchronouslyBuiltFinalState(finalState)
                |> mapToSignal { _ -> Signal<Void, NoError> in
                    return .complete() |> delay(nextPassDelay, queue: Queue.concurrentDefaultQueue())
                }
            }
        }
    }
    return singlePass |> switchToLatest |> restart
}
/// Resets the given peers from their dialog entries as returned by `messages.getPeerDialogs`.
///
/// For every returned dialog the cloud read state, the unread-mention tag summary (only when the
/// dialog has a top message), the channel state (only when the dialog carries a pts) and the
/// notification settings are written into a copy of `state`. Returned messages are added at
/// `.UpperHistoryBlock`; messages of a peer with a channel state additionally get a
/// `ChannelMessageStateVersionAttribute` carrying that pts, and every peer with a returned cloud
/// message is flagged as needing a hole from the previous state. Finally the associated messages of
/// the resulting state are resolved.
///
/// - Parameters:
///   - account: Account whose network is used for the request.
///   - peers: Peers to reset; peers for which no input peer can be created are left out of the request.
///   - state: State that the results are applied on top of.
/// - Returns: A signal with the updated state. When the request fails for any reason, nothing from
///   the server is applied and only associated-message resolution runs on the unchanged copy.
private func resetChannels(_ account: Account, peers: [Peer], state: AccountMutableState) -> Signal<AccountMutableState, NoError> {
    var inputPeers: [Api.InputDialogPeer] = []
    for peer in peers {
        if let inputPeer = apiInputPeer(peer) {
            inputPeers.append(.inputDialogPeer(peer: inputPeer))
        }
    }
    return account.network.request(Api.functions.messages.getPeerDialogs(peers: inputPeers))
    |> map(Optional.init)
    |> `catch` { _ -> Signal<Api.messages.PeerDialogs?, NoError> in
        // Every error, CHANNEL_PRIVATE included, is mapped to "no result".
        return .single(nil)
    }
    |> mapToSignal { result -> Signal<AccountMutableState, NoError> in
        var updatedState = state
        
        var dialogsChats: [Api.Chat] = []
        var dialogsUsers: [Api.User] = []
        
        var storeMessages: [StoreMessage] = []
        var readStates: [PeerId: [MessageId.Namespace: PeerReadState]] = [:]
        var mentionTagSummaries: [PeerId: MessageHistoryTagNamespaceSummary] = [:]
        var channelStates: [PeerId: ChannelState] = [:]
        var notificationSettings: [PeerId: PeerNotificationSettings] = [:]
        
        if let result = result {
            switch result {
                case let .peerDialogs(dialogs, messages, chats, users, _):
                    dialogsChats.append(contentsOf: chats)
                    dialogsUsers.append(contentsOf: users)
                    
                    loop: for dialog in dialogs {
                        let apiPeer: Api.Peer
                        let apiReadInboxMaxId: Int32
                        let apiReadOutboxMaxId: Int32
                        let apiTopMessage: Int32
                        let apiUnreadCount: Int32
                        let apiUnreadMentionsCount: Int32
                        let apiChannelPts: Int32?
                        let apiNotificationSettings: Api.PeerNotifySettings
                        let apiMarkedUnread: Bool
                        switch dialog {
                            case let .dialog(flags, peer, topMessage, readInboxMaxId, readOutboxMaxId, unreadCount, unreadMentionsCount, peerNotificationSettings, pts, _):
                                apiPeer = peer
                                apiTopMessage = topMessage
                                apiReadInboxMaxId = readInboxMaxId
                                apiReadOutboxMaxId = readOutboxMaxId
                                apiUnreadCount = unreadCount
                                apiMarkedUnread = (flags & (1 << 3)) != 0
                                apiUnreadMentionsCount = unreadMentionsCount
                                apiNotificationSettings = peerNotificationSettings
                                apiChannelPts = pts
                            /*feed*/
                            /*case .dialogFeed:
                                assertionFailure()
                                continue loop*/
                        }
                        
                        let peerId: PeerId
                        switch apiPeer {
                            case let .peerUser(userId):
                                peerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: userId)
                            case let .peerChat(chatId):
                                peerId = PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId)
                            case let .peerChannel(channelId):
                                peerId = PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId)
                        }
                        
                        // Update the per-peer namespace map without force-unwrapping the dictionary entry.
                        var peerReadStates = readStates[peerId] ?? [:]
                        peerReadStates[Namespaces.Message.Cloud] = .idBased(maxIncomingReadId: apiReadInboxMaxId, maxOutgoingReadId: apiReadOutboxMaxId, maxKnownId: apiTopMessage, count: apiUnreadCount, markedUnread: apiMarkedUnread)
                        readStates[peerId] = peerReadStates
                        
                        if apiTopMessage != 0 {
                            mentionTagSummaries[peerId] = MessageHistoryTagNamespaceSummary(version: 1, count: apiUnreadMentionsCount, range: MessageHistoryTagNamespaceCountValidityRange(maxId: apiTopMessage))
                        }
                        
                        if let apiChannelPts = apiChannelPts {
                            channelStates[peerId] = ChannelState(pts: apiChannelPts, invalidatedPts: apiChannelPts)
                        }
                        
                        notificationSettings[peerId] = TelegramPeerNotificationSettings(apiSettings: apiNotificationSettings)
                    }
                    
                    for message in messages {
                        if let storeMessage = StoreMessage(apiMessage: message) {
                            var updatedStoreMessage = storeMessage
                            if case let .Id(id) = storeMessage.id {
                                // Tag the message with the pts of the channel state it was fetched at.
                                if let channelState = channelStates[id.peerId] {
                                    var updatedAttributes = storeMessage.attributes
                                    updatedAttributes.append(ChannelMessageStateVersionAttribute(pts: channelState.pts))
                                    updatedStoreMessage = updatedStoreMessage.withUpdatedAttributes(updatedAttributes)
                                }
                            }
                            storeMessages.append(updatedStoreMessage)
                        }
                    }
            }
        }
        
        updatedState.mergeChats(dialogsChats)
        updatedState.mergeUsers(dialogsUsers)
        
        for message in storeMessages {
            if case let .Id(id) = message.id, id.namespace == Namespaces.Message.Cloud {
                updatedState.setNeedsHoleFromPreviousState(peerId: id.peerId, namespace: id.namespace)
            }
        }
        
        updatedState.addMessages(storeMessages, location: .UpperHistoryBlock)
        
        for (peerId, peerReadStates) in readStates {
            for (namespace, state) in peerReadStates {
                switch state {
                    case let .idBased(maxIncomingReadId, maxOutgoingReadId, maxKnownId, count, markedUnread):
                        updatedState.resetReadState(peerId, namespace: namespace, maxIncomingReadId: maxIncomingReadId, maxOutgoingReadId: maxOutgoingReadId, maxKnownId: maxKnownId, count: count, markedUnread: markedUnread)
                    default:
                        // Only id-based read states are ever stored in `readStates` above.
                        assertionFailure()
                        break
                }
            }
        }
        
        for (peerId, tagSummary) in mentionTagSummaries {
            updatedState.resetMessageTagSummary(peerId, namespace: Namespaces.Message.Cloud, count: tagSummary.count, range: tagSummary.range)
        }
        
        for (peerId, channelState) in channelStates {
            updatedState.updateChannelState(peerId, state: channelState)
        }
        
        for (peerId, settings) in notificationSettings {
            updatedState.updateNotificationSettings(.peer(peerId), notificationSettings: settings)
        }
        
        // TODO: delete messages later than top
        
        return resolveAssociatedMessages(account: account, state: updatedState)
    }
}
/// Requests the channel difference for `peer` and applies it to a copy of `state`.
///
/// The request starts from the pts of the `ChannelState` stored in `state.chatStates` for the peer,
/// or from 1 when there is none.
///
/// - Parameters:
///   - account: Account whose network is used for `updates.getChannelDifference`.
///   - peer: Channel peer to poll.
///   - state: State that the difference is applied on top of.
/// - Returns: A signal with a tuple of (updated state, success flag, timeout from the response).
///   The flag is `false` only when the request ended with `CHANNEL_PRIVATE`, in which case the state
///   is returned unmodified and the timeout is nil. When no input channel can be created for `peer`
///   the unchanged state is returned with the flag set to `true` and a nil timeout.
private func pollChannel(_ account: Account, peer: Peer, state: AccountMutableState) -> Signal<(AccountMutableState, Bool, Int32?), NoError> {
if let inputChannel = apiInputChannel(peer) {
let limit: Int32 = 20
let pollPts: Int32
if let channelState = state.chatStates[peer.id] as? ChannelState {
pollPts = channelState.pts
} else {
pollPts = 1
}
return (account.network.request(Api.functions.updates.getChannelDifference(flags: 0, channel: inputChannel, filter: .channelMessagesFilterEmpty, pts: pollPts, limit: limit))
|> map(Optional.init)
|> `catch` { error -> Signal<Api.updates.ChannelDifference?, MTRpcError> in
// CHANNEL_PRIVATE becomes a nil difference; every other error is passed on to `retryRequest`.
if error.errorDescription == "CHANNEL_PRIVATE" {
return .single(nil)
} else {
return .fail(error)
}
})
|> retryRequest
|> map { difference -> (AccountMutableState, Bool, Int32?) in
var updatedState = state
var apiTimeout: Int32?
if let difference = difference {
switch difference {
case let .channelDifference(_, pts, timeout, newMessages, otherUpdates, chats, users):
apiTimeout = timeout
// Advance the existing channel state to the new pts, or create one if the peer had none.
let channelState: ChannelState
if let previousState = updatedState.chatStates[peer.id] as? ChannelState {
channelState = previousState.withUpdatedPts(pts)
} else {
channelState = ChannelState(pts: pts, invalidatedPts: nil)
}
updatedState.updateChannelState(peer.id, state: channelState)
updatedState.mergeChats(chats)
updatedState.mergeUsers(users)
for apiMessage in newMessages {
if let message = StoreMessage(apiMessage: apiMessage) {
if let preCachedResources = apiMessage.preCachedResources {
for (resource, data) in preCachedResources {
updatedState.addPreCachedResource(resource, data: data)
}
}
updatedState.addMessages([message], location: .UpperHistoryBlock)
}
}
// Apply the remaining channel updates; update kinds not listed below are ignored.
for update in otherUpdates {
switch update {
case let .updateDeleteChannelMessages(_, messages, _, _):
let peerId = peer.id
updatedState.deleteMessages(messages.map({ MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: $0) }))
case let .updateEditChannelMessage(apiMessage, _, _):
// Edits are applied only when the parsed message has an id belonging to the polled peer.
if let message = StoreMessage(apiMessage: apiMessage), case let .Id(messageId) = message.id, messageId.peerId == peer.id {
if let preCachedResources = apiMessage.preCachedResources {
for (resource, data) in preCachedResources {
updatedState.addPreCachedResource(resource, data: data)
}
}
// The edited message is tagged with the pts of this difference.
var attributes = message.attributes
attributes.append(ChannelMessageStateVersionAttribute(pts: pts))
updatedState.editMessage(messageId, message: message.withUpdatedAttributes(attributes))
} else {
Logger.shared.log("State", "Invalid updateEditChannelMessage")
}
case let .updateChannelPinnedMessage(_, id):
updatedState.updateCachedPeerData(peer.id, { current in
let previous: CachedChannelData
if let current = current as? CachedChannelData {
previous = current
} else {
previous = CachedChannelData()
}
// An id of 0 clears the pinned message.
return previous.withUpdatedPinnedMessageId(id == 0 ? nil : MessageId(peerId: peer.id, namespace: Namespaces.Message.Cloud, id: id))
})
case let .updateChannelReadMessagesContents(_, messages):
updatedState.addReadMessagesContents((peer.id, messages))
case let .updateChannelMessageViews(_, id, views):
updatedState.addUpdateMessageImpressionCount(id: MessageId(peerId: peer.id, namespace: Namespaces.Message.Cloud, id: id), count: views)
case let .updateChannelWebPage(_, apiWebpage, _, _):
switch apiWebpage {
case let .webPageEmpty(id):
updatedState.updateMedia(MediaId(namespace: Namespaces.Media.CloudWebpage, id: id), media: nil)
default:
if let webpage = telegramMediaWebpageFromApiWebpage(apiWebpage, url: nil) {
updatedState.updateMedia(webpage.webpageId, media: webpage)
}
}
case let .updateChannelAvailableMessages(_, minId):
// Recorded both in the state and in the cached channel data.
let messageId = MessageId(peerId: peer.id, namespace: Namespaces.Message.Cloud, id: minId)
updatedState.updateMinAvailableMessage(messageId)
updatedState.updateCachedPeerData(peer.id, { current in
let previous: CachedChannelData
if let current = current as? CachedChannelData {
previous = current
} else {
previous = CachedChannelData()
}
return previous.withUpdatedMinAvailableMessageId(messageId)
})
default:
break
}
}
case let .channelDifferenceEmpty(_, pts, timeout):
// Nothing changed besides the pts: only the channel state is advanced.
apiTimeout = timeout
let channelState: ChannelState
if let previousState = updatedState.chatStates[peer.id] as? ChannelState {
channelState = previousState.withUpdatedPts(pts)
} else {
channelState = ChannelState(pts: pts, invalidatedPts: nil)
}
updatedState.updateChannelState(peer.id, state: channelState)
case let .channelDifferenceTooLong(_, pts, timeout, topMessage, readInboxMaxId, readOutboxMaxId, unreadCount, unreadMentionsCount, messages, chats, users):
// The channel state is replaced (with invalidatedPts set to the new pts), a hole from the
// previous state is requested, and the read state and mention summary are reset from the response.
apiTimeout = timeout
let channelState = ChannelState(pts: pts, invalidatedPts: pts)
updatedState.updateChannelState(peer.id, state: channelState)
updatedState.mergeChats(chats)
updatedState.mergeUsers(users)
updatedState.setNeedsHoleFromPreviousState(peerId: peer.id, namespace: Namespaces.Message.Cloud)
for apiMessage in messages {
if let message = StoreMessage(apiMessage: apiMessage) {
if let preCachedResources = apiMessage.preCachedResources {
for (resource, data) in preCachedResources {
updatedState.addPreCachedResource(resource, data: data)
}
}
// Only the message whose id equals topMessage goes to the upper history block; the rest are added at `.Random`.
let location: AddMessagesLocation
if case let .Id(id) = message.id, id.id == topMessage {
location = .UpperHistoryBlock
} else {
location = .Random
}
updatedState.addMessages([message], location: location)
}
}
updatedState.resetReadState(peer.id, namespace: Namespaces.Message.Cloud, maxIncomingReadId: readInboxMaxId, maxOutgoingReadId: readOutboxMaxId, maxKnownId: topMessage, count: unreadCount, markedUnread: nil)
updatedState.resetMessageTagSummary(peer.id, namespace: Namespaces.Message.Cloud, count: unreadMentionsCount, range: MessageHistoryTagNamespaceCountValidityRange(maxId: topMessage))
}
}
// A nil difference (CHANNEL_PRIVATE) is reported as an unsuccessful poll.
return (updatedState, difference != nil, apiTimeout)
}
} else {
Logger.shared.log("State", "can't poll channel \(peer.id): can't create inputChannel")
return single((state, true, nil), NoError.self)
}
}
/// Checks that `finalState` can still be applied on top of what `transaction` currently holds.
///
/// Verification fails when:
/// - any peer id listed in `finalState.initialState.peerIds` has no peer in `finalState.peers`
///   (checked first; returns immediately);
/// - the operations contain `.UpdateState` and the account state read from the transaction differs
///   from the state `finalState` was built from (or is unavailable);
/// - the operations contain `.UpdateChannelState` for a peer whose chat state in the transaction does
///   not equal the channel state `finalState` was built from (both being absent counts as a match).
///
/// Every mismatch is logged.
///
/// - Parameters:
///   - transaction: Transaction used to read the current account and chat states.
///   - finalState: State whose initial snapshot and operations are verified.
/// - Returns: `true` when no mismatch was found.
private func verifyTransaction(_ transaction: Transaction, finalState: AccountMutableState) -> Bool {
    let missingPeerIds: [PeerId] = finalState.initialState.peerIds.filter { finalState.peers[$0] == nil }
    guard missingPeerIds.isEmpty else {
        Logger.shared.log("State", "missing peers \(missingPeerIds)")
        return false
    }
    
    // Collect which kinds of state the operations are going to overwrite.
    var touchesAccountState = false
    var channelsWithUpdatedStates = Set<PeerId>()
    for operation in finalState.operations {
        if case let .UpdateChannelState(peerId, _) = operation {
            channelsWithUpdatedStates.insert(peerId)
        } else if case .UpdateState = operation {
            touchesAccountState = true
        }
    }
    
    var isConsistent = true
    
    if touchesAccountState {
        let currentState = (transaction.getState() as? AuthorizedAccountState)?.state
        let previousState = finalState.initialState.state
        let previousStateMatches = currentState.map { previousState == $0 } ?? false
        if !previousStateMatches {
            Logger.shared.log("State", ".UpdateState previous state \(previousState) doesn't match current state \(String(describing: currentState))")
            isConsistent = false
        }
    }
    
    for peerId in channelsWithUpdatedStates {
        let currentState = transaction.getPeerChatState(peerId)
        let previousState = finalState.initialState.chatStates[peerId] as? ChannelState
        
        let previousStateMatches: Bool
        switch (currentState, previousState) {
            case let (current?, previous?):
                previousStateMatches = current.equals(previous)
            case (nil, nil):
                previousStateMatches = true
            default:
                previousStateMatches = false
        }
        
        if !previousStateMatches {
            Logger.shared.log("State", ".UpdateChannelState for \(peerId), previous state \(String(describing: previousState)) doesn't match current state \(String(describing: currentState))")
            isConsistent = false
        }
    }
    
    return isConsistent
}
/// Reasons a final-state replay can be reported as incomplete
/// (carried by `ReplayFinalStateResult.Incomplete`).
private enum ReplayFinalStateIncomplete {
    case MoreDataNeeded, PollRequired
}
/// Outcome of replaying a final state: either completed, or incomplete for the attached reason.
private enum ReplayFinalStateResult {
    /// The replay finished.
    case Completed
    
    /// The replay did not finish; the payload says why.
    case Incomplete(ReplayFinalStateIncomplete)
}
/// Mutable accumulator used by `optimizedOperations` to merge consecutive `.AddMessages`
/// operations that share a location. It is a class so the batch can be appended to in place
/// through an optional binding.
private final class OptimizeAddMessagesState {
    /// Messages collected for the batch so far.
    var messages: [StoreMessage]
    
    /// Location shared by every message in the batch.
    var location: AddMessagesLocation
    
    init(messages: [StoreMessage], location: AddMessagesLocation) {
        self.location = location
        self.messages = messages
    }
}
/// Produces an equivalent but more compact list of operations.
///
/// - Runs of `.AddMessages` with the same location that are not separated by another emitted
///   operation are merged into a single `.AddMessages`.
/// - Only the last `.UpdateState` is kept, and it is emitted after all other operations.
/// - Only the last `.UpdateChannelState` per peer is kept; these are emitted at the very end.
///   `.UpdateState` and `.UpdateChannelState` do not interrupt a run of `.AddMessages`.
/// - Every other operation is passed through in its original position.
///
/// - Parameter operations: Operations in the order they were recorded.
/// - Returns: The optimized operation list.
private func optimizedOperations(_ operations: [AccountStateMutationOperation]) -> [AccountStateMutationOperation] {
    var optimized: [AccountStateMutationOperation] = []
    var latestState: AuthorizedAccountState.State?
    var latestChannelStates: [PeerId: ChannelState] = [:]
    var pendingBatch: OptimizeAddMessagesState?
    
    // Emits the batch collected so far (if it holds any messages) and clears it.
    func flushPendingBatch() {
        if let batch = pendingBatch, !batch.messages.isEmpty {
            optimized.append(.AddMessages(batch.messages, batch.location))
        }
        pendingBatch = nil
    }
    
    for operation in operations {
        switch operation {
            case let .AddMessages(messages, location):
                if let batch = pendingBatch, batch.location == location {
                    batch.messages.append(contentsOf: messages)
                } else {
                    flushPendingBatch()
                    pendingBatch = OptimizeAddMessagesState(messages: messages, location: location)
                }
            case let .UpdateState(state):
                latestState = state
            case let .UpdateChannelState(peerId, state):
                latestChannelStates[peerId] = state
            case .DeleteMessages, .DeleteMessagesWithGlobalIds, .EditMessage, .UpdateMedia,
                 .MergeApiChats, .MergeApiUsers, .MergePeerPresences, .UpdatePeer,
                 .ReadInbox, .ReadOutbox, .ReadGroupFeedInbox, .ResetReadState,
                 .UpdatePeerChatUnreadMark, .ResetMessageTagSummary,
                 .UpdateNotificationSettings, .UpdateGlobalNotificationSettings,
                 .UpdateSecretChat, .AddSecretMessages, .ReadSecretOutbox,
                 .AddPeerInputActivity, .UpdateCachedPeerData, .UpdatePinnedItemIds,
                 .ReadMessageContents, .UpdateMessageImpressionCount,
                 .UpdateInstalledStickerPacks, .UpdateRecentGifs, .UpdateChatInputState,
                 .UpdateCall, .UpdateLangPack, .UpdateMinAvailableMessage, .UpdateIsContact:
                // Order-sensitive operation: close the current batch before emitting it.
                flushPendingBatch()
                optimized.append(operation)
        }
    }
    flushPendingBatch()
    
    if let latestState = latestState {
        optimized.append(.UpdateState(latestState))
    }
    for (peerId, channelState) in latestChannelStates {
        optimized.append(.UpdateChannelState(peerId, channelState))
    }
    
    return optimized
}
// One-shot flag read and set by `replayFinalState`: while it is `false`, the next replay
// invalidates the stored state index of peer group 1 and flips the flag to `true`.
// NOTE(review): the name suggests a testing aid — confirm this is meant to ship.
private var testAddInvalidation = false
/// Verifies `finalState` against the database and, if it is consistent, applies all of its
/// recorded operations inside `transaction`.
///
/// The work is done in this order:
/// 1. `verifyTransaction` is run; on failure nothing is applied and `nil` is returned.
/// 2. Holes are added for every namespace listed in `namespacesWithHolesFromPreviousState`,
///    and the affected group feeds get a hole plus an invalidated state index.
/// 3. Incoming message ids added at `.UpperHistoryBlock` are collected, excluding the ids
///    returned by `transaction.filterStoredMessageIds` (treated here as already stored).
/// 4. The operations, compacted by `optimizedOperations`, are applied one by one.
/// 5. Deferred work is performed: chat-list inclusion, sticker pack changes, recent
///    stickers/GIFs, secret chat processing, typing activities and language pack updates.
///
/// - Parameters:
///   - accountPeerId: Peer id of the current account. Presence updates for this peer are not
///     stored; explicit ones only adjust `delayNotificatonsUntil`.
///   - mediaBox: Passed to message deletion and to secret chat decrypted-operation processing.
///   - transaction: Database transaction that receives all changes.
///   - auxiliaryMethods: Supplies `updatePeerChatInputState` for `.UpdateChatInputState`.
///   - finalState: State whose `state.operations` are replayed.
/// - Returns: `nil` when verification fails; otherwise a summary of what was applied.
func replayFinalState(accountPeerId: PeerId, mediaBox: MediaBox, transaction: Transaction, auxiliaryMethods: AccountAuxiliaryMethods, finalState: AccountFinalState) -> AccountReplayedFinalState? {
    let verified = verifyTransaction(transaction, finalState: finalState.state)
    if !verified {
        Logger.shared.log("State", "failed to verify final state")
        return nil
    }
    
    var peerIdsWithAddedSecretMessages = Set<PeerId>()
    
    var updatedTypingActivities: [PeerId: [PeerId: PeerInputActivity?]] = [:]
    var updatedSecretChatTypingActivities = Set<PeerId>()
    var updatedWebpages: [MediaId: TelegramMediaWebpage] = [:]
    var updatedCalls: [Api.PhoneCall] = []
    var isContactUpdates: [(PeerId, Bool)] = []
    var stickerPackOperations: [AccountStateUpdateStickerPacksOperation] = []
    var recentlyUsedStickers: [MediaId: (MessageIndex, TelegramMediaFile)] = [:]
    var recentlyUsedGifs: [MediaId: (MessageIndex, TelegramMediaFile)] = [:]
    var syncRecentGifs = false
    var langPackDifferences: [Api.LangPackDifference] = []
    var pollLangPack = false
    
    var delayNotificatonsUntil: Int32?
    
    // Add history holes carried over from the previous state and remember which group feeds
    // are affected.
    var addHolesToGroupFeedIds = Set<PeerGroupId>()
    
    for (peerId, namespaces) in finalState.state.namespacesWithHolesFromPreviousState {
        for namespace in namespaces {
            transaction.addHole(MessageId(peerId: peerId, namespace: namespace, id: Int32.max))
            
            if namespace == Namespaces.Message.Cloud {
                let peer: Peer? = finalState.state.peers[peerId] ?? transaction.getPeer(peerId)
                if let peer = peer {
                    var groupId: PeerGroupId?
                    if let channel = peer as? TelegramChannel {
                        groupId = channel.peerGroupId
                    }
                    if groupId == nil {
                        groupId = transaction.getPeerGroupId(peerId)
                    }
                    if let groupId = groupId {
                        addHolesToGroupFeedIds.insert(groupId)
                    }
                } else {
                    assertionFailure()
                }
            }
        }
    }
    
    for groupId in addHolesToGroupFeedIds {
        transaction.addFeedHoleFromLatestEntries(groupId: groupId)
        let groupState = (transaction.getPeerGroupState(groupId) as? TelegramPeerGroupState) ?? TelegramPeerGroupState()
        transaction.setPeerGroupState(groupId, state: groupState.withInvalidatedStateIndex())
    }
    
    // Runs once per process: invalidates the state index of peer group 1.
    // NOTE(review): looks like a debugging aid — confirm it is intended to stay.
    if !testAddInvalidation {
        testAddInvalidation = true
        let groupId = PeerGroupId(rawValue: 1)
        let groupState = (transaction.getPeerGroupState(groupId) as? TelegramPeerGroupState) ?? TelegramPeerGroupState()
        transaction.setPeerGroupState(groupId, state: groupState.withInvalidatedStateIndex())
    }
    
    // Collect incoming message ids before anything is written, so that the stored-id filter
    // below reflects the database as it was prior to this replay.
    var addedOperationIncomingMessageIds: [MessageId] = []
    for operation in finalState.state.operations {
        switch operation {
            case let .AddMessages(messages, location):
                if case .UpperHistoryBlock = location {
                    for message in messages {
                        if case let .Id(id) = message.id, message.flags.contains(.Incoming) {
                            addedOperationIncomingMessageIds.append(id)
                        }
                    }
                }
            default:
                break
        }
    }
    var addedIncomingMessageIds: [MessageId] = []
    if !addedOperationIncomingMessageIds.isEmpty {
        let existingIds = transaction.filterStoredMessageIds(Set(addedOperationIncomingMessageIds))
        for id in addedOperationIncomingMessageIds {
            if !existingIds.contains(id) {
                addedIncomingMessageIds.append(id)
            }
        }
    }
    
    for operation in optimizedOperations(finalState.state.operations) {
        switch operation {
            case let .AddMessages(messages, location):
                let _ = transaction.addMessages(messages, location: location)
                if case .UpperHistoryBlock = location {
                    for message in messages {
                        let chatPeerId = message.id.peerId
                        // A new message from an author clears that author's typing activity.
                        if let authorId = message.authorId {
                            let activityValue: PeerInputActivity? = nil
                            if updatedTypingActivities[chatPeerId] == nil {
                                updatedTypingActivities[chatPeerId] = [authorId: activityValue]
                            } else {
                                updatedTypingActivities[chatPeerId]![authorId] = activityValue
                            }
                        }
                        
                        // Outgoing, non-forwarded stickers and GIFs feed the "recently used"
                        // lists; only the first file media of a message is considered.
                        if !message.flags.contains(.Incoming), message.forwardInfo == nil {
                            inner: for media in message.media {
                                if let file = media as? TelegramMediaFile {
                                    for attribute in file.attributes {
                                        switch attribute {
                                            case .Sticker:
                                                if let index = message.index {
                                                    if let (currentIndex, _) = recentlyUsedStickers[file.fileId] {
                                                        if currentIndex < index {
                                                            recentlyUsedStickers[file.fileId] = (index, file)
                                                        }
                                                    } else {
                                                        recentlyUsedStickers[file.fileId] = (index, file)
                                                    }
                                                }
                                            case .Animated:
                                                if let index = message.index {
                                                    if let (currentIndex, _) = recentlyUsedGifs[file.fileId] {
                                                        if currentIndex < index {
                                                            recentlyUsedGifs[file.fileId] = (index, file)
                                                        }
                                                    } else {
                                                        recentlyUsedGifs[file.fileId] = (index, file)
                                                    }
                                                }
                                            default:
                                                break
                                        }
                                    }
                                    break inner
                                }
                            }
                        }
                    }
                }
            case let .DeleteMessagesWithGlobalIds(ids):
                transaction.deleteMessagesWithGlobalIds(ids)
            case let .DeleteMessages(ids):
                deleteMessages(transaction: transaction, mediaBox: mediaBox, ids: ids)
            case let .UpdateMinAvailableMessage(id):
                transaction.deleteMessagesInRange(peerId: id.peerId, namespace: id.namespace, minId: 1, maxId: id.id)
            case let .EditMessage(id, message):
                transaction.updateMessage(id, update: { previousMessage in
                    var updatedFlags = message.flags
                    var updatedLocalTags = message.localTags
                    if previousMessage.localTags.contains(.OutgoingLiveLocation) {
                        updatedLocalTags.insert(.OutgoingLiveLocation)
                    }
                    // Carry the `.Incoming` flag over from the stored message so that an edit
                    // cannot change the message's direction. Testing `message.flags` here
                    // would be a no-op, because `updatedFlags` starts out as `message.flags`.
                    if previousMessage.flags.contains(.Incoming) {
                        updatedFlags.insert(.Incoming)
                    } else {
                        updatedFlags.remove(.Incoming)
                    }
                    return .update(message.withUpdatedLocalTags(updatedLocalTags).withUpdatedFlags(updatedFlags))
                })
            case let .UpdateMedia(id, media):
                if let media = media as? TelegramMediaWebpage {
                    updatedWebpages[id] = media
                }
                updateMessageMedia(transaction: transaction, id: id, media: media)
            case let .ReadInbox(messageId):
                transaction.applyIncomingReadMaxId(messageId)
            case let .ReadOutbox(messageId):
                transaction.applyOutgoingReadMaxId(messageId)
            case let .ReadGroupFeedInbox(groupId, index):
                transaction.applyGroupFeedReadMaxIndex(groupId: groupId, index: index)
            case let .ResetReadState(peerId, namespace, maxIncomingReadId, maxOutgoingReadId, maxKnownId, count, markedUnread):
                // When the operation carries no explicit "marked unread" value, keep the one
                // currently stored for the same namespace (defaulting to `false`).
                var markedUnreadValue: Bool = false
                if let markedUnread = markedUnread {
                    markedUnreadValue = markedUnread
                } else if let states = transaction.getPeerReadStates(peerId) {
                    inner: for (stateNamespace, stateValue) in states {
                        if stateNamespace == namespace {
                            markedUnreadValue = stateValue.markedUnread
                            break inner
                        }
                    }
                }
                transaction.resetIncomingReadStates([peerId: [namespace: .idBased(maxIncomingReadId: maxIncomingReadId, maxOutgoingReadId: maxOutgoingReadId, maxKnownId: maxKnownId, count: count, markedUnread: markedUnreadValue)]])
            case let .UpdatePeerChatUnreadMark(peerId, namespace, value):
                transaction.applyMarkUnread(peerId: peerId, namespace: namespace, value: value, interactive: false)
            case let .ResetMessageTagSummary(peerId, namespace, count, range):
                transaction.replaceMessageTagSummary(peerId: peerId, tagMask: .unseenPersonalMessage, namespace: namespace, count: count, maxId: range.maxId)
                if count == 0 {
                    let ids = transaction.getMessageIndicesWithTag(peerId: peerId, tag: .unseenPersonalMessage).map({ $0.id })
                    for id in ids {
                        if id.namespace == namespace {
                            markUnseenPersonalMessage(transaction: transaction, id: id, addSynchronizeAction: false)
                        }
                    }
                }
            case let .UpdateState(state):
                // `verifyTransaction` has already rejected the replay if the stored state is
                // not an `AuthorizedAccountState` while `.UpdateState` is present.
                let currentState = transaction.getState() as! AuthorizedAccountState
                transaction.setState(currentState.changedState(state))
                Logger.shared.log("State", "apply state \(state)")
            case let .UpdateChannelState(peerId, channelState):
                transaction.setPeerChatState(peerId, state: channelState)
            case let .UpdateNotificationSettings(subject, notificationSettings):
                switch subject {
                    case let .peer(peerId):
                        transaction.updateCurrentPeerNotificationSettings([peerId: notificationSettings])
                }
            case let .UpdateGlobalNotificationSettings(subject, notificationSettings):
                switch subject {
                    case .privateChats:
                        transaction.updatePreferencesEntry(key: PreferencesKeys.globalNotifications, { current in
                            var previous: GlobalNotificationSettings
                            if let current = current as? GlobalNotificationSettings {
                                previous = current
                            } else {
                                previous = GlobalNotificationSettings.defaultSettings
                            }
                            return GlobalNotificationSettings(toBeSynchronized: previous.toBeSynchronized, remote: previous.remote.withUpdatedPrivateChats { _ in
                                return notificationSettings
                            })
                        })
                    case .groups:
                        transaction.updatePreferencesEntry(key: PreferencesKeys.globalNotifications, { current in
                            var previous: GlobalNotificationSettings
                            if let current = current as? GlobalNotificationSettings {
                                previous = current
                            } else {
                                previous = GlobalNotificationSettings.defaultSettings
                            }
                            return GlobalNotificationSettings(toBeSynchronized: previous.toBeSynchronized, remote: previous.remote.withUpdatedGroupChats { _ in
                                return notificationSettings
                            })
                        })
                }
            case let .MergeApiChats(chats):
                var peers: [Peer] = []
                for chat in chats {
                    if let groupOrChannel = mergeGroupOrChannel(lhs: transaction.getPeer(chat.peerId), rhs: chat) {
                        peers.append(groupOrChannel)
                    }
                }
                updatePeers(transaction: transaction, peers: peers, update: { _, updated in
                    return updated
                })
            case let .MergeApiUsers(users):
                var peers: [Peer] = []
                for user in users {
                    if let telegramUser = TelegramUser.merge(transaction.getPeer(user.peerId) as? TelegramUser, rhs: user) {
                        peers.append(telegramUser)
                    }
                }
                updatePeers(transaction: transaction, peers: peers, update: { _, updated in
                    return updated
                })
            case let .UpdatePeer(id, f):
                if let peer = f(transaction.getPeer(id)) {
                    updatePeers(transaction: transaction, peers: [peer], update: { _, updated in
                        return updated
                    })
                }
            case let .UpdateCachedPeerData(id, f):
                transaction.updatePeerCachedData(peerIds: Set([id]), update: { _, current in
                    return f(current)
                })
            case let .MergePeerPresences(statuses, explicit):
                var presences: [PeerId: PeerPresence] = [:]
                for (peerId, status) in statuses {
                    if peerId == accountPeerId {
                        // The account's own presence is not stored; an explicit update only
                        // adjusts the timestamp until which notifications are delayed.
                        if explicit {
                            switch status {
                                case let .userStatusOnline(timestamp):
                                    delayNotificatonsUntil = timestamp + 30
                                case let .userStatusOffline(timestamp):
                                    delayNotificatonsUntil = timestamp
                                default:
                                    break
                            }
                        }
                    } else {
                        let presence = TelegramUserPresence(apiStatus: status)
                        presences[peerId] = presence
                    }
                }
                transaction.updatePeerPresences(presences)
            case let .UpdateSecretChat(chat, _):
                updateSecretChat(accountPeerId: accountPeerId, transaction: transaction, chat: chat, requestData: nil)
            case let .AddSecretMessages(messages):
                for message in messages {
                    let peerId = message.peerId
                    transaction.operationLogAddEntry(peerId: peerId, tag: OperationLogTags.SecretIncomingEncrypted, tagLocalIndex: .automatic, tagMergedIndex: .none, contents: SecretChatIncomingEncryptedOperation(message: message))
                    peerIdsWithAddedSecretMessages.insert(peerId)
                }
            case let .ReadSecretOutbox(peerId, maxTimestamp, actionTimestamp):
                applyOutgoingReadMaxIndex(transaction: transaction, index: MessageIndex.upperBound(peerId: peerId, timestamp: maxTimestamp, namespace: Namespaces.Message.Local), beginCountdownAt: actionTimestamp)
            case let .AddPeerInputActivity(chatPeerId, peerId, activity):
                if let peerId = peerId {
                    if updatedTypingActivities[chatPeerId] == nil {
                        updatedTypingActivities[chatPeerId] = [peerId: activity]
                    } else {
                        updatedTypingActivities[chatPeerId]![peerId] = activity
                    }
                } else if chatPeerId.namespace == Namespaces.Peer.SecretChat {
                    // The author is resolved after the loop from the secret chat peer.
                    updatedSecretChatTypingActivities.insert(chatPeerId)
                }
            case let .UpdatePinnedItemIds(pinnedOperation):
                switch pinnedOperation {
                    case let .pin(itemId):
                        switch itemId {
                            case let .peer(peerId):
                                if transaction.getPeer(peerId) == nil || transaction.getPeerChatListInclusion(peerId) == .notSpecified {
                                    addSynchronizePinnedChatsOperation(transaction: transaction)
                                } else {
                                    var currentItemIds = transaction.getPinnedItemIds()
                                    if !currentItemIds.contains(.peer(peerId)) {
                                        currentItemIds.insert(.peer(peerId), at: 0)
                                        transaction.setPinnedItemIds(currentItemIds)
                                    }
                                }
                            case .group:
                                break
                        }
                    case let .unpin(itemId):
                        switch itemId {
                            case let .peer(peerId):
                                var currentItemIds = transaction.getPinnedItemIds()
                                if let index = currentItemIds.index(of: .peer(peerId)) {
                                    currentItemIds.remove(at: index)
                                    transaction.setPinnedItemIds(currentItemIds)
                                } else {
                                    addSynchronizePinnedChatsOperation(transaction: transaction)
                                }
                            case .group:
                                break
                        }
                    case let .reorder(itemIds):
                        let currentItemIds = transaction.getPinnedItemIds()
                        if Set(itemIds) == Set(currentItemIds) {
                            transaction.setPinnedItemIds(itemIds)
                        } else {
                            addSynchronizePinnedChatsOperation(transaction: transaction)
                        }
                    case .sync:
                        addSynchronizePinnedChatsOperation(transaction: transaction)
                }
            case let .ReadMessageContents(peerId, messageIds):
                if let peerId = peerId {
                    for id in messageIds {
                        markMessageContentAsConsumedRemotely(transaction: transaction, messageId: MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: id))
                    }
                } else {
                    for messageId in transaction.messageIdsForGlobalIds(messageIds) {
                        markMessageContentAsConsumedRemotely(transaction: transaction, messageId: messageId)
                    }
                }
            case let .UpdateMessageImpressionCount(id, count):
                transaction.updateMessage(id, update: { currentMessage in
                    var storeForwardInfo: StoreMessageForwardInfo?
                    if let forwardInfo = currentMessage.forwardInfo {
                        storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature)
                    }
                    var attributes = currentMessage.attributes
                    // The view count only ever grows: keep the larger of stored and reported.
                    loop: for j in 0 ..< attributes.count {
                        if let attribute = attributes[j] as? ViewCountMessageAttribute {
                            attributes[j] = ViewCountMessageAttribute(count: max(attribute.count, Int(count)))
                            break loop
                        }
                    }
                    return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media))
                })
            case let .UpdateInstalledStickerPacks(operation):
                stickerPackOperations.append(operation)
            case .UpdateRecentGifs:
                syncRecentGifs = true
            case let .UpdateChatInputState(peerId, inputState):
                transaction.updatePeerChatInterfaceState(peerId, update: { current in
                    return auxiliaryMethods.updatePeerChatInputState(current, inputState)
                })
            case let .UpdateCall(call):
                updatedCalls.append(call)
            case let .UpdateLangPack(difference):
                if let difference = difference {
                    langPackDifferences.append(difference)
                } else {
                    pollLangPack = true
                }
            case let .UpdateIsContact(peerId, value):
                isContactUpdates.append((peerId, value))
        }
    }
    
    for peerId in finalState.state.initialState.peerIdsWithNewMessages {
        updatePeerChatInclousionWithNewMessages(transaction: transaction, id: peerId)
    }
    
    // Sticker pack operations: a single `.sync` overrides everything else; otherwise apply
    // additions and reorders locally, falling back to a sync when local data disagrees.
    if !stickerPackOperations.isEmpty {
        if stickerPackOperations.contains(where: {
            if case .sync = $0 {
                return true
            } else {
                return false
            }
        }) {
            addSynchronizeInstalledStickerPacksOperation(transaction: transaction, namespace: .stickers, content: .sync)
            addSynchronizeInstalledStickerPacksOperation(transaction: transaction, namespace: .masks, content: .sync)
        } else {
            var syncStickers = false
            var syncMasks = false
            loop: for operation in stickerPackOperations {
                switch operation {
                    case let .add(apiSet):
                        let namespace: ItemCollectionId.Namespace
                        var items: [ItemCollectionItem] = []
                        let info: StickerPackCollectionInfo
                        switch apiSet {
                            case let .stickerSet(set, packs, documents):
                                var indexKeysByFile: [MediaId: [MemoryBuffer]] = [:]
                                for pack in packs {
                                    switch pack {
                                        case let .stickerPack(text, fileIds):
                                            let key = ValueBoxKey(text).toMemoryBuffer()
                                            for fileId in fileIds {
                                                let mediaId = MediaId(namespace: Namespaces.Media.CloudFile, id: fileId)
                                                if indexKeysByFile[mediaId] == nil {
                                                    indexKeysByFile[mediaId] = [key]
                                                } else {
                                                    indexKeysByFile[mediaId]!.append(key)
                                                }
                                            }
                                            break
                                    }
                                }
                                
                                for apiDocument in documents {
                                    if let file = telegramMediaFileFromApiDocument(apiDocument), let id = file.id {
                                        let fileIndexKeys: [MemoryBuffer]
                                        if let indexKeys = indexKeysByFile[id] {
                                            fileIndexKeys = indexKeys
                                        } else {
                                            fileIndexKeys = []
                                        }
                                        items.append(StickerPackItem(index: ItemCollectionItemIndex(index: Int32(items.count), id: id.id), file: file, indexKeys: fileIndexKeys))
                                    }
                                }
                                
                                switch set {
                                    case let .stickerSet(flags, _, _, _, _, _, _, _):
                                        // Bit 3 of the set flags selects the mask pack namespace.
                                        if (flags & (1 << 3)) != 0 {
                                            namespace = Namespaces.ItemCollection.CloudMaskPacks
                                        } else {
                                            namespace = Namespaces.ItemCollection.CloudStickerPacks
                                        }
                                }
                                
                                info = StickerPackCollectionInfo(apiSet: set, namespace: namespace)
                        }
                        
                        // A namespace already scheduled for a full sync needs no local update.
                        if namespace == Namespaces.ItemCollection.CloudMaskPacks && syncMasks {
                            continue loop
                        } else if namespace == Namespaces.ItemCollection.CloudStickerPacks && syncStickers {
                            continue loop
                        }
                        
                        var updatedInfos = transaction.getItemCollectionsInfos(namespace: info.id.namespace).map { $0.1 as! StickerPackCollectionInfo }
                        if let index = updatedInfos.index(where: { $0.id == info.id }) {
                            // Already installed: move the existing entry to the front.
                            let currentInfo = updatedInfos[index]
                            updatedInfos.remove(at: index)
                            updatedInfos.insert(currentInfo, at: 0)
                        } else {
                            updatedInfos.insert(info, at: 0)
                            transaction.replaceItemCollectionItems(collectionId: info.id, items: items)
                        }
                        transaction.replaceItemCollectionInfos(namespace: info.id.namespace, itemCollectionInfos: updatedInfos.map { ($0.id, $0) })
                    case let .reorder(namespace, ids):
                        let collectionNamespace: ItemCollectionId.Namespace
                        switch namespace {
                            case .stickers:
                                collectionNamespace = Namespaces.ItemCollection.CloudStickerPacks
                            case .masks:
                                collectionNamespace = Namespaces.ItemCollection.CloudMaskPacks
                        }
                        let currentInfos = transaction.getItemCollectionsInfos(namespace: collectionNamespace).map { $0.1 as! StickerPackCollectionInfo }
                        if Set(currentInfos.map { $0.id.id }) != Set(ids) {
                            // The local set of packs differs from the reordered one: resync.
                            switch namespace {
                                case .stickers:
                                    syncStickers = true
                                case .masks:
                                    syncMasks = true
                            }
                        } else {
                            var currentDict: [ItemCollectionId: StickerPackCollectionInfo] = [:]
                            for info in currentInfos {
                                currentDict[info.id] = info
                            }
                            var updatedInfos: [StickerPackCollectionInfo] = []
                            for id in ids {
                                // Safe: the set comparison above guarantees every id is present.
                                let currentInfo = currentDict[ItemCollectionId(namespace: collectionNamespace, id: id)]!
                                updatedInfos.append(currentInfo)
                            }
                            transaction.replaceItemCollectionInfos(namespace: collectionNamespace, itemCollectionInfos: updatedInfos.map { ($0.id, $0) })
                        }
                    case .sync:
                        syncStickers = true
                        syncMasks = true
                        break loop
                }
            }
            if syncStickers {
                addSynchronizeInstalledStickerPacksOperation(transaction: transaction, namespace: .stickers, content: .sync)
            }
            if syncMasks {
                addSynchronizeInstalledStickerPacksOperation(transaction: transaction, namespace: .masks, content: .sync)
            }
        }
    }
    
    // Recently used media is inserted in ascending message index order, so the item with the
    // highest index ends up in the first position.
    if !recentlyUsedStickers.isEmpty {
        let stickerFiles: [TelegramMediaFile] = recentlyUsedStickers.values.sorted(by: {
            return $0.0 < $1.0
        }).map({ $0.1 })
        for file in stickerFiles {
            transaction.addOrMoveToFirstPositionOrderedItemListItem(collectionId: Namespaces.OrderedItemList.CloudRecentStickers, item: OrderedItemListEntry(id: RecentMediaItemId(file.fileId).rawValue, contents: RecentMediaItem(file)), removeTailIfCountExceeds: 20)
        }
    }
    
    if syncRecentGifs {
        addSynchronizeSavedGifsOperation(transaction: transaction, operation: .sync)
    } else {
        let gifFiles: [TelegramMediaFile] = recentlyUsedGifs.values.sorted(by: {
            return $0.0 < $1.0
        }).map({ $0.1 })
        for file in gifFiles {
            transaction.addOrMoveToFirstPositionOrderedItemListItem(collectionId: Namespaces.OrderedItemList.CloudRecentGifs, item: OrderedItemListEntry(id: RecentMediaItemId(file.fileId).rawValue, contents: RecentMediaItem(file)), removeTailIfCountExceeds: 200)
        }
    }
    
    // Secret chat typing updates carry no author; use the chat's regular peer.
    for chatPeerId in updatedSecretChatTypingActivities {
        if let peer = transaction.getPeer(chatPeerId) as? TelegramSecretChat {
            let authorId = peer.regularPeerId
            let activityValue: PeerInputActivity? = .typingText
            if updatedTypingActivities[chatPeerId] == nil {
                updatedTypingActivities[chatPeerId] = [authorId: activityValue]
            } else {
                updatedTypingActivities[chatPeerId]![authorId] = activityValue
            }
        }
    }
    
    var addedSecretMessageIds: [MessageId] = []
    var addedSecretMessageAuthorIds: [PeerId: PeerId] = [:]
    
    for peerId in peerIdsWithAddedSecretMessages {
        // Repeat processing for as long as it changes the chat's keychain.
        while true {
            let keychain = (transaction.getPeerChatState(peerId) as? SecretChatState)?.keychain
            if processSecretChatIncomingEncryptedOperations(transaction: transaction, peerId: peerId) {
                let processResult = processSecretChatIncomingDecryptedOperations(mediaBox: mediaBox, transaction: transaction, peerId: peerId)
                if !processResult.addedMessages.isEmpty {
                    for message in processResult.addedMessages {
                        if case let .Id(id) = message.id {
                            addedSecretMessageIds.append(id)
                            if let authorId = message.authorId {
                                if addedSecretMessageAuthorIds[peerId] == nil {
                                    addedSecretMessageAuthorIds[peerId] = authorId
                                }
                            }
                        }
                    }
                }
            }
            let updatedKeychain = (transaction.getPeerChatState(peerId) as? SecretChatState)?.keychain
            if updatedKeychain == keychain {
                break
            }
        }
    }
    
    // A newly added secret message clears its author's typing activity in that chat.
    for (chatPeerId, authorId) in addedSecretMessageAuthorIds {
        let activityValue: PeerInputActivity? = nil
        if updatedTypingActivities[chatPeerId] == nil {
            updatedTypingActivities[chatPeerId] = [authorId: activityValue]
        } else {
            updatedTypingActivities[chatPeerId]![authorId] = activityValue
        }
    }
    
    if pollLangPack {
        addSynchronizeLocalizationUpdatesOperation(transaction: transaction)
    } else if !langPackDifferences.isEmpty {
        // Apply differences in ascending `fromVersion` order; on the first failure schedule a
        // full synchronization instead.
        langPackDifferences.sort(by: { lhs, rhs in
            let lhsVersion: Int32
            switch lhs {
                case let .langPackDifference(_, fromVersion, _, _):
                    lhsVersion = fromVersion
            }
            let rhsVersion: Int32
            switch rhs {
                case let .langPackDifference(_, fromVersion, _, _):
                    rhsVersion = fromVersion
            }
            return lhsVersion < rhsVersion
        })
        
        for difference in langPackDifferences {
            if !tryApplyingLanguageDifference(transaction: transaction, difference: difference) {
                addSynchronizeLocalizationUpdatesOperation(transaction: transaction)
                break
            }
        }
    }
    
    addedIncomingMessageIds.append(contentsOf: addedSecretMessageIds)
    
    return AccountReplayedFinalState(state: finalState, addedIncomingMessageIds: addedIncomingMessageIds, addedSecretMessageIds: addedSecretMessageIds, updatedTypingActivities: updatedTypingActivities, updatedWebpages: updatedWebpages, updatedCalls: updatedCalls, isContactUpdates: isContactUpdates, delayNotificatonsUntil: delayNotificatonsUntil)
}