diff --git a/TelegramCore/Account.swift b/TelegramCore/Account.swift index 5268e2879a..811cafcda5 100644 --- a/TelegramCore/Account.swift +++ b/TelegramCore/Account.swift @@ -251,7 +251,7 @@ let telegramPostboxSeedConfiguration: SeedConfiguration = { } else { return [.regularChatsAndPrivateGroups] } - }) + }, additionalChatListIndexNamespace: Namespaces.Message.Cloud) }() public func accountWithId(networkArguments: NetworkInitializationArguments, id: AccountRecordId, supplementary: Bool, rootPath: String, beginWithTestingEnvironment: Bool, auxiliaryMethods: AccountAuxiliaryMethods, shouldKeepAutoConnection: Bool = true) -> Signal { diff --git a/TelegramCore/AccountStateManager.swift b/TelegramCore/AccountStateManager.swift index 702bb249d6..228a466089 100644 --- a/TelegramCore/AccountStateManager.swift +++ b/TelegramCore/AccountStateManager.swift @@ -369,7 +369,11 @@ public final class AccountStateManager { |> take(1) |> mapToSignal { state -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in if let authorizedState = state.state { - let request = network.request(Api.functions.updates.getDifference(flags: 1 << 0, pts: authorizedState.pts, ptsTotalLimit: 1000, date: authorizedState.date, qts: authorizedState.qts)) + var ptsTotalLimit: Int32 = 10000 + #if DEBUG + ptsTotalLimit = 1000 + #endif + let request = network.request(Api.functions.updates.getDifference(flags: 1 << 0, pts: authorizedState.pts, ptsTotalLimit: ptsTotalLimit, date: authorizedState.date, qts: authorizedState.qts)) |> retryRequest return request diff --git a/TelegramCore/AccountStateReset.swift b/TelegramCore/AccountStateReset.swift index 8b4beb6372..c03c384d9a 100644 --- a/TelegramCore/AccountStateReset.swift +++ b/TelegramCore/AccountStateReset.swift @@ -9,254 +9,483 @@ import Foundation import MtProtoKitDynamic #endif +private struct LocalChatListEntryRange { + var upperBound: ChatListIndex? + var lowerBound: ChatListIndex + var count: Int32 + var hash: UInt32 + + var apiHash: Int32 { + return Int32(bitPattern: self.hash & UInt32(0x7FFFFFFF)) + } +} + +private func combineHash(_ value: Int32, into hash: inout UInt32) { + let low = UInt32(bitPattern: value) + hash = (hash &* 20261) &+ low +} + +private func localChatListEntryRanges(_ entries: [ChatListNamespaceEntry], limit: Int) -> [LocalChatListEntryRange] { + var result: [LocalChatListEntryRange] = [] + var currentRange: LocalChatListEntryRange? + for i in 0 ..< entries.count { + switch entries[i] { + case let .peer(index, readState, topMessageAttributes, tagSummary, interfaceState): + var updatedRange: LocalChatListEntryRange + if let current = currentRange { + updatedRange = current + } else { + updatedRange = LocalChatListEntryRange(upperBound: result.last?.lowerBound, lowerBound: index, count: 0, hash: 0) + } + updatedRange.lowerBound = index + updatedRange.count += 1 + + /* + dialog.pinned ? 1 : 0, + dialog.unread_mark ? 1 : 0, + dialog.peer.channel_id || dialog.peer.chat_id || dialog.peer.user_id, + dialog.top_message.id, + top_message.edit_date || top_message.date, + dialog.read_inbox_max_id, + dialog.read_outbox_max_id, + dialog.unread_count, + dialog.unread_mentions_count, + draft.draft.date || 0 + + */ + + combineHash(index.pinningIndex != nil ? 1 : 0, into: &updatedRange.hash) + if let readState = readState, readState.markedUnread { + combineHash(1, into: &updatedRange.hash) + } else { + combineHash(0, into: &updatedRange.hash) + } + combineHash(index.messageIndex.id.peerId.id, into: &updatedRange.hash) + combineHash(index.messageIndex.id.id, into: &updatedRange.hash) + var timestamp = index.messageIndex.timestamp + for attribute in topMessageAttributes { + if let attribute = attribute as? EditedMessageAttribute { + timestamp = max(timestamp, attribute.date) + } + } + combineHash(timestamp, into: &updatedRange.hash) + if let readState = readState, case let .idBased(maxIncomingReadId, maxOutgoingReadId, _, count, _) = readState { + combineHash(maxIncomingReadId, into: &updatedRange.hash) + combineHash(maxOutgoingReadId, into: &updatedRange.hash) + combineHash(count, into: &updatedRange.hash) + } else { + combineHash(0, into: &updatedRange.hash) + combineHash(0, into: &updatedRange.hash) + combineHash(0, into: &updatedRange.hash) + } + + if let tagSummary = tagSummary { + combineHash(tagSummary.count, into: &updatedRange.hash) + } else { + combineHash(0, into: &updatedRange.hash) + } + + if let embeddedState = interfaceState?.chatListEmbeddedState { + combineHash(embeddedState.timestamp, into: &updatedRange.hash) + } else { + combineHash(0, into: &updatedRange.hash) + } + + if Int(updatedRange.count) >= limit { + result.append(updatedRange) + currentRange = nil + } else { + currentRange = updatedRange + } + case .hole: + if let currentRangeValue = currentRange { + result.append(currentRangeValue) + currentRange = nil + } + } + } + if let currentRangeValue = currentRange { + result.append(currentRangeValue) + currentRange = nil + } + return result +} + +private struct ResolvedChatListResetRange { + let head: Bool + let local: LocalChatListEntryRange + let remote: FetchedChatList +} + func accountStateReset(postbox: Postbox, network: Network, accountPeerId: PeerId) -> Signal { let pinnedChats: Signal = network.request(Api.functions.messages.getPinnedDialogs()) - |> retryRequest - let state: Signal = - network.request(Api.functions.updates.getState()) - |> retryRequest + |> retryRequest + let state: Signal = network.request(Api.functions.updates.getState()) + |> retryRequest - return combineLatest(network.request(Api.functions.messages.getDialogs(flags: 0, /*feed*//*feedId: nil,*/ offsetDate: 0, offsetId: 0, offsetPeer: .inputPeerEmpty, limit: 100, hash: 0)) - |> retryRequest, pinnedChats, state) - |> mapToSignal { result, pinnedChats, state -> Signal in - var dialogsDialogs: [Api.Dialog] = [] - var dialogsMessages: [Api.Message] = [] - var dialogsChats: [Api.Chat] = [] - var dialogsUsers: [Api.User] = [] - - var holeExists = false - - switch result { - case let .dialogs(dialogs, messages, chats, users): - dialogsDialogs = dialogs - dialogsMessages = messages - dialogsChats = chats - dialogsUsers = users - case let .dialogsSlice(_, dialogs, messages, chats, users): - dialogsDialogs = dialogs - dialogsMessages = messages - dialogsChats = chats - dialogsUsers = users - holeExists = true - case .dialogsNotModified: - dialogsDialogs = [] - dialogsMessages = [] - dialogsChats = [] - dialogsUsers = [] - } - - let replacePinnedItemIds: [PinnedItemId] - switch pinnedChats { - case let .peerDialogs(apiDialogs, apiMessages, apiChats, apiUsers, _): - dialogsDialogs.append(contentsOf: apiDialogs) - dialogsMessages.append(contentsOf: apiMessages) - dialogsChats.append(contentsOf: apiChats) - dialogsUsers.append(contentsOf: apiUsers) - - var itemIds: [PinnedItemId] = [] - - loop: for dialog in apiDialogs { - switch dialog { - case let .dialog(_, peer, _, _, _, _, _, _, _, _): - itemIds.append(.peer(peer.peerId)) - /*feed*/ - /*case let .dialogFeed(_, _, _, feedId, _, _, _, _): - itemIds.append(.group(PeerGroupId(rawValue: feedId))) - continue loop*/ - } + return postbox.transaction { transaction -> [ChatListNamespaceEntry] in + return transaction.getChatListNamespaceEntries(groupId: nil, namespace: Namespaces.Message.Cloud, summaryTag: MessageTags.unseenPersonalMessage) + } + |> mapToSignal { localChatListEntries -> Signal in + let localRanges = localChatListEntryRanges(localChatListEntries, limit: 10) + var signal: Signal = .complete() + for i in 0 ..< localRanges.count { + let upperBound: MessageIndex + let head = i == 0 + let localRange = localRanges[i] + if let rangeUpperBound = localRange.upperBound { + upperBound = rangeUpperBound.messageIndex + } else { + upperBound = MessageIndex(id: MessageId(peerId: PeerId(namespace: Namespaces.Peer.Empty, id: 0), namespace: Namespaces.Message.Cloud, id: 0), timestamp: 0) } - replacePinnedItemIds = itemIds - } - - var replacementHole: ChatListHole? - var storeMessages: [StoreMessage] = [] - var readStates: [PeerId: [MessageId.Namespace: PeerReadState]] = [:] - var mentionTagSummaries: [PeerId: MessageHistoryTagNamespaceSummary] = [:] - var chatStates: [PeerId: PeerChatState] = [:] - var notificationSettings: [PeerId: PeerNotificationSettings] = [:] - - var topMesageIds: [PeerId: MessageId] = [:] - - loop: for dialog in dialogsDialogs { - let apiPeer: Api.Peer - let apiReadInboxMaxId: Int32 - let apiReadOutboxMaxId: Int32 - let apiTopMessage: Int32 - let apiUnreadCount: Int32 - let apiMarkedUnread: Bool - let apiUnreadMentionsCount: Int32 - var apiChannelPts: Int32? - let apiNotificationSettings: Api.PeerNotifySettings - switch dialog { - case let .dialog(flags, peer, topMessage, readInboxMaxId, readOutboxMaxId, unreadCount, unreadMentionsCount, peerNotificationSettings, pts, _): - apiPeer = peer - apiTopMessage = topMessage - apiReadInboxMaxId = readInboxMaxId - apiReadOutboxMaxId = readOutboxMaxId - apiUnreadCount = unreadCount - apiMarkedUnread = (flags & (1 << 3)) != 0 - apiUnreadMentionsCount = unreadMentionsCount - apiNotificationSettings = peerNotificationSettings - apiChannelPts = pts - /*feed*/ - /*case .dialogFeed: - //assertionFailure() - continue loop*/ - } - - let peerId: PeerId - switch apiPeer { - case let .peerUser(userId): - peerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: userId) - case let .peerChat(chatId): - peerId = PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId) - case let .peerChannel(channelId): - peerId = PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId) - } - - if readStates[peerId] == nil { - readStates[peerId] = [:] - } - readStates[peerId]![Namespaces.Message.Cloud] = .idBased(maxIncomingReadId: apiReadInboxMaxId, maxOutgoingReadId: apiReadOutboxMaxId, maxKnownId: apiTopMessage, count: apiUnreadCount, markedUnread: apiMarkedUnread) - - if apiTopMessage != 0 { - mentionTagSummaries[peerId] = MessageHistoryTagNamespaceSummary(version: 1, count: apiUnreadMentionsCount, range: MessageHistoryTagNamespaceCountValidityRange(maxId: apiTopMessage)) - topMesageIds[peerId] = MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: apiTopMessage) - } - - if let apiChannelPts = apiChannelPts { - chatStates[peerId] = ChannelState(pts: apiChannelPts, invalidatedPts: apiChannelPts) - } else if peerId.namespace == Namespaces.Peer.CloudGroup || peerId.namespace == Namespaces.Peer.CloudUser { - switch state { - case let .state(pts, _, _, _, _): - chatStates[peerId] = RegularChatState(invalidatedPts: pts) + let rangeSignal: Signal = fetchChatList(postbox: postbox, network: network, location: .general, upperBound: upperBound, hash: localRange.apiHash, limit: localRange.count) + |> map { remote -> ResolvedChatListResetRange? in + if let remote = remote { + return ResolvedChatListResetRange(head: head, local: localRange, remote: remote) + } else { + return nil } } - notificationSettings[peerId] = TelegramPeerNotificationSettings(apiSettings: apiNotificationSettings) + signal = signal + |> then(rangeSignal) } + let collectedResolvedRanges: Signal<[ResolvedChatListResetRange], NoError> = signal + |> map { next -> [ResolvedChatListResetRange] in + if let next = next { + return [next] + } else { + return [] + } + } + |> reduceLeft(value: [], f: { list, next in + var list = list + list.append(contentsOf: next) + return list + }) - for message in dialogsMessages { - if let storeMessage = StoreMessage(apiMessage: message) { - storeMessages.append(storeMessage) + return combineLatest(collectedResolvedRanges, state) + |> mapToSignal { collectedRanges, state -> Signal in + return postbox.transaction { transaction -> Void in + for range in collectedRanges { + let previousPeerIds = transaction.resetChatList(keepPeerNamespaces: [Namespaces.Peer.SecretChat], upperBound: range.local.upperBound ?? ChatListIndex.absoluteUpperBound, lowerBound: range.local.lowerBound) + + updatePeers(transaction: transaction, peers: range.remote.peers, update: { _, updated -> Peer in + return updated + }) + updatePeerPresences(transaction: transaction, accountPeerId: accountPeerId, peerPresences: range.remote.peerPresences) + + transaction.updateCurrentPeerNotificationSettings(range.remote.notificationSettings) + + var allPeersWithMessages = Set() + for message in range.remote.storeMessages { + allPeersWithMessages.insert(message.id.peerId) + } + + for (_, messageId) in range.remote.topMessageIds { + if messageId.id > 1 { + var skipHole = false + if let localTopId = transaction.getTopPeerMessageIndex(peerId: messageId.peerId, namespace: messageId.namespace)?.id { + if localTopId >= messageId { + skipHole = true + } + } + if !skipHole { + transaction.addHole(MessageId(peerId: messageId.peerId, namespace: messageId.namespace, id: messageId.id - 1)) + } + } + } + + let _ = transaction.addMessages(range.remote.storeMessages, location: .UpperHistoryBlock) + + transaction.resetIncomingReadStates(range.remote.readStates) + + for (peerId, chatState) in range.remote.chatStates { + if let chatState = chatState as? ChannelState { + if let current = transaction.getPeerChatState(peerId) as? ChannelState { + transaction.setPeerChatState(peerId, state: current.withUpdatedPts(chatState.pts)) + } else { + transaction.setPeerChatState(peerId, state: chatState) + } + } else { + transaction.setPeerChatState(peerId, state: chatState) + } + } + + for (peerId, summary) in range.remote.mentionTagSummaries { + transaction.replaceMessageTagSummary(peerId: peerId, tagMask: .unseenPersonalMessage, namespace: Namespaces.Message.Cloud, count: summary.count, maxId: summary.range.maxId) + } + + let namespacesWithHoles: [PeerId.Namespace: [MessageId.Namespace]] = [ + Namespaces.Peer.CloudUser: [Namespaces.Message.Cloud], + Namespaces.Peer.CloudGroup: [Namespaces.Message.Cloud], + Namespaces.Peer.CloudChannel: [Namespaces.Message.Cloud] + ] + for peerId in previousPeerIds { + if !allPeersWithMessages.contains(peerId), let namespaces = namespacesWithHoles[peerId.namespace] { + for namespace in namespaces { + transaction.addHole(MessageId(peerId: peerId, namespace: namespace, id: Int32.max - 1)) + } + } + } + + if range.head { + transaction.setPinnedItemIds(range.remote.pinnedItemIds ?? []) + } + } + + if let currentState = transaction.getState() as? AuthorizedAccountState, let embeddedState = currentState.state { + switch state { + case let .state(pts, _, _, seq, _): + transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: embeddedState.qts, date: embeddedState.date, seq: seq))) + } + } } } - if holeExists { - for dialog in dialogsDialogs { - switch dialog { - case let .dialog(flags, peer, topMessage, _, _, _, _, _, _, _): - let isPinned = (flags & (1 << 2)) != 0 - - if !isPinned { - var timestamp: Int32? - for message in storeMessages { - if case let .Id(id) = message.id, id.id == topMessage { - timestamp = message.timestamp - } - } - - if let timestamp = timestamp { - let index = MessageIndex(id: MessageId(peerId: peer.peerId, namespace: Namespaces.Message.Cloud, id: topMessage - 1), timestamp: timestamp) - if (replacementHole == nil || replacementHole!.index > index) { - replacementHole = ChatListHole(index: index) - } - } + return combineLatest(network.request(Api.functions.messages.getDialogs(flags: 0, offsetDate: 0, offsetId: 0, offsetPeer: .inputPeerEmpty, limit: 100, hash: 0)) + |> retryRequest, pinnedChats, state) + |> mapToSignal { result, pinnedChats, state -> Signal in + var dialogsDialogs: [Api.Dialog] = [] + var dialogsMessages: [Api.Message] = [] + var dialogsChats: [Api.Chat] = [] + var dialogsUsers: [Api.User] = [] + + var holeExists = false + + switch result { + case let .dialogs(dialogs, messages, chats, users): + dialogsDialogs = dialogs + dialogsMessages = messages + dialogsChats = chats + dialogsUsers = users + case let .dialogsSlice(_, dialogs, messages, chats, users): + dialogsDialogs = dialogs + dialogsMessages = messages + dialogsChats = chats + dialogsUsers = users + holeExists = true + case .dialogsNotModified: + dialogsDialogs = [] + dialogsMessages = [] + dialogsChats = [] + dialogsUsers = [] + } + + let replacePinnedItemIds: [PinnedItemId] + switch pinnedChats { + case let .peerDialogs(apiDialogs, apiMessages, apiChats, apiUsers, _): + dialogsDialogs.append(contentsOf: apiDialogs) + dialogsMessages.append(contentsOf: apiMessages) + dialogsChats.append(contentsOf: apiChats) + dialogsUsers.append(contentsOf: apiUsers) + + var itemIds: [PinnedItemId] = [] + + loop: for dialog in apiDialogs { + switch dialog { + case let .dialog(_, peer, _, _, _, _, _, _, _, _): + itemIds.append(.peer(peer.peerId)) + /*feed*/ + /*case let .dialogFeed(_, _, _, feedId, _, _, _, _): + itemIds.append(.group(PeerGroupId(rawValue: feedId))) + continue loop*/ } + } + + replacePinnedItemIds = itemIds + } + + var replacementHole: ChatListHole? + var storeMessages: [StoreMessage] = [] + var readStates: [PeerId: [MessageId.Namespace: PeerReadState]] = [:] + var mentionTagSummaries: [PeerId: MessageHistoryTagNamespaceSummary] = [:] + var chatStates: [PeerId: PeerChatState] = [:] + var notificationSettings: [PeerId: PeerNotificationSettings] = [:] + + var topMesageIds: [PeerId: MessageId] = [:] + + loop: for dialog in dialogsDialogs { + let apiPeer: Api.Peer + let apiReadInboxMaxId: Int32 + let apiReadOutboxMaxId: Int32 + let apiTopMessage: Int32 + let apiUnreadCount: Int32 + let apiMarkedUnread: Bool + let apiUnreadMentionsCount: Int32 + var apiChannelPts: Int32? + let apiNotificationSettings: Api.PeerNotifySettings + switch dialog { + case let .dialog(flags, peer, topMessage, readInboxMaxId, readOutboxMaxId, unreadCount, unreadMentionsCount, peerNotificationSettings, pts, _): + apiPeer = peer + apiTopMessage = topMessage + apiReadInboxMaxId = readInboxMaxId + apiReadOutboxMaxId = readOutboxMaxId + apiUnreadCount = unreadCount + apiMarkedUnread = (flags & (1 << 3)) != 0 + apiUnreadMentionsCount = unreadMentionsCount + apiNotificationSettings = peerNotificationSettings + apiChannelPts = pts /*feed*/ /*case .dialogFeed: //assertionFailure() - break*/ + continue loop*/ + } + + let peerId: PeerId + switch apiPeer { + case let .peerUser(userId): + peerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: userId) + case let .peerChat(chatId): + peerId = PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId) + case let .peerChannel(channelId): + peerId = PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId) + } + + if readStates[peerId] == nil { + readStates[peerId] = [:] + } + readStates[peerId]![Namespaces.Message.Cloud] = .idBased(maxIncomingReadId: apiReadInboxMaxId, maxOutgoingReadId: apiReadOutboxMaxId, maxKnownId: apiTopMessage, count: apiUnreadCount, markedUnread: apiMarkedUnread) + + if apiTopMessage != 0 { + mentionTagSummaries[peerId] = MessageHistoryTagNamespaceSummary(version: 1, count: apiUnreadMentionsCount, range: MessageHistoryTagNamespaceCountValidityRange(maxId: apiTopMessage)) + topMesageIds[peerId] = MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: apiTopMessage) + } + + if let apiChannelPts = apiChannelPts { + chatStates[peerId] = ChannelState(pts: apiChannelPts, invalidatedPts: apiChannelPts) + } else if peerId.namespace == Namespaces.Peer.CloudGroup || peerId.namespace == Namespaces.Peer.CloudUser { + switch state { + case let .state(pts, _, _, _, _): + chatStates[peerId] = RegularChatState(invalidatedPts: pts) + } + } + + notificationSettings[peerId] = TelegramPeerNotificationSettings(apiSettings: apiNotificationSettings) + } + + for message in dialogsMessages { + if let storeMessage = StoreMessage(apiMessage: message) { + storeMessages.append(storeMessage) } } - } - - var peers: [Peer] = [] - var peerPresences: [PeerId: PeerPresence] = [:] - for chat in dialogsChats { - if let groupOrChannel = parseTelegramGroupOrChannel(chat: chat) { - peers.append(groupOrChannel) - } - } - for user in dialogsUsers { - let telegramUser = TelegramUser(user: user) - peers.append(telegramUser) - if let presence = TelegramUserPresence(apiUser: user) { - peerPresences[telegramUser.id] = presence - } - } - - return withResolvedAssociatedMessages(postbox: postbox, source: .network(network), storeMessages: storeMessages, { transaction, additionalPeers, additionalMessages in - let previousPeerIds = transaction.resetChatList(keepPeerNamespaces: Set([Namespaces.Peer.SecretChat]), replacementHole: replacementHole) - updatePeers(transaction: transaction, peers: peers, update: { _, updated -> Peer in - return updated - }) - updatePeerPresences(transaction: transaction, accountPeerId: accountPeerId, peerPresences: peerPresences) - - transaction.updateCurrentPeerNotificationSettings(notificationSettings) - - var allPeersWithMessages = Set() - for message in storeMessages { - allPeersWithMessages.insert(message.id.peerId) + if holeExists { + for dialog in dialogsDialogs { + switch dialog { + case let .dialog(flags, peer, topMessage, _, _, _, _, _, _, _): + let isPinned = (flags & (1 << 2)) != 0 + + if !isPinned { + var timestamp: Int32? + for message in storeMessages { + if case let .Id(id) = message.id, id.id == topMessage { + timestamp = message.timestamp + } + } + + if let timestamp = timestamp { + let index = MessageIndex(id: MessageId(peerId: peer.peerId, namespace: Namespaces.Message.Cloud, id: topMessage - 1), timestamp: timestamp) + if (replacementHole == nil || replacementHole!.index > index) { + replacementHole = ChatListHole(index: index) + } + } + } + /*feed*/ + /*case .dialogFeed: + //assertionFailure() + break*/ + } + } } - for (_, messageId) in topMesageIds { - if messageId.id > 1 { - var skipHole = false - if let localTopId = transaction.getTopPeerMessageIndex(peerId: messageId.peerId, namespace: messageId.namespace)?.id { - if localTopId >= messageId { - skipHole = true + var peers: [Peer] = [] + var peerPresences: [PeerId: PeerPresence] = [:] + for chat in dialogsChats { + if let groupOrChannel = parseTelegramGroupOrChannel(chat: chat) { + peers.append(groupOrChannel) + } + } + for user in dialogsUsers { + let telegramUser = TelegramUser(user: user) + peers.append(telegramUser) + if let presence = TelegramUserPresence(apiUser: user) { + peerPresences[telegramUser.id] = presence + } + } + + return withResolvedAssociatedMessages(postbox: postbox, source: .network(network), storeMessages: storeMessages, { transaction, additionalPeers, additionalMessages in + let previousPeerIds = transaction.resetChatList(keepPeerNamespaces: Set([Namespaces.Peer.SecretChat]), replacementHole: replacementHole) + + updatePeers(transaction: transaction, peers: peers, update: { _, updated -> Peer in + return updated + }) + updatePeerPresences(transaction: transaction, accountPeerId: accountPeerId, peerPresences: peerPresences) + + transaction.updateCurrentPeerNotificationSettings(notificationSettings) + + var allPeersWithMessages = Set() + for message in storeMessages { + allPeersWithMessages.insert(message.id.peerId) + } + + for (_, messageId) in topMesageIds { + if messageId.id > 1 { + var skipHole = false + if let localTopId = transaction.getTopPeerMessageIndex(peerId: messageId.peerId, namespace: messageId.namespace)?.id { + if localTopId >= messageId { + skipHole = true + } + } + if !skipHole { + transaction.addHole(MessageId(peerId: messageId.peerId, namespace: messageId.namespace, id: messageId.id - 1)) } } - if !skipHole { - transaction.addHole(MessageId(peerId: messageId.peerId, namespace: messageId.namespace, id: messageId.id - 1)) - } } - } - - let _ = transaction.addMessages(storeMessages, location: .UpperHistoryBlock) - - transaction.resetIncomingReadStates(readStates) - - for (peerId, chatState) in chatStates { - if let chatState = chatState as? ChannelState { - if let current = transaction.getPeerChatState(peerId) as? ChannelState { - transaction.setPeerChatState(peerId, state: current.withUpdatedPts(chatState.pts)) + + let _ = transaction.addMessages(storeMessages, location: .UpperHistoryBlock) + + transaction.resetIncomingReadStates(readStates) + + for (peerId, chatState) in chatStates { + if let chatState = chatState as? ChannelState { + if let current = transaction.getPeerChatState(peerId) as? ChannelState { + transaction.setPeerChatState(peerId, state: current.withUpdatedPts(chatState.pts)) + } else { + transaction.setPeerChatState(peerId, state: chatState) + } } else { transaction.setPeerChatState(peerId, state: chatState) } - } else { - transaction.setPeerChatState(peerId, state: chatState) } - } - - transaction.setPinnedItemIds(replacePinnedItemIds) - - for (peerId, summary) in mentionTagSummaries { - transaction.replaceMessageTagSummary(peerId: peerId, tagMask: .unseenPersonalMessage, namespace: Namespaces.Message.Cloud, count: summary.count, maxId: summary.range.maxId) - } - - let namespacesWithHoles: [PeerId.Namespace: [MessageId.Namespace]] = [ - Namespaces.Peer.CloudUser: [Namespaces.Message.Cloud], - Namespaces.Peer.CloudGroup: [Namespaces.Message.Cloud], - Namespaces.Peer.CloudChannel: [Namespaces.Message.Cloud] - ] - for peerId in previousPeerIds { - if !allPeersWithMessages.contains(peerId), let namespaces = namespacesWithHoles[peerId.namespace] { - for namespace in namespaces { - transaction.addHole(MessageId(peerId: peerId, namespace: namespace, id: Int32.max - 1)) + + transaction.setPinnedItemIds(replacePinnedItemIds) + + for (peerId, summary) in mentionTagSummaries { + transaction.replaceMessageTagSummary(peerId: peerId, tagMask: .unseenPersonalMessage, namespace: Namespaces.Message.Cloud, count: summary.count, maxId: summary.range.maxId) + } + + let namespacesWithHoles: [PeerId.Namespace: [MessageId.Namespace]] = [ + Namespaces.Peer.CloudUser: [Namespaces.Message.Cloud], + Namespaces.Peer.CloudGroup: [Namespaces.Message.Cloud], + Namespaces.Peer.CloudChannel: [Namespaces.Message.Cloud] + ] + for peerId in previousPeerIds { + if !allPeersWithMessages.contains(peerId), let namespaces = namespacesWithHoles[peerId.namespace] { + for namespace in namespaces { + transaction.addHole(MessageId(peerId: peerId, namespace: namespace, id: Int32.max - 1)) + } } } - } - - if let currentState = transaction.getState() as? AuthorizedAccountState, let embeddedState = currentState.state { - switch state { - case let .state(pts, _, _, seq, _): - transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: embeddedState.qts, date: embeddedState.date, seq: seq))) + + if let currentState = transaction.getState() as? AuthorizedAccountState, let embeddedState = currentState.state { + switch state { + case let .state(pts, _, _, seq, _): + transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: embeddedState.qts, date: embeddedState.date, seq: seq))) + } } - } - }) + }) + } } } diff --git a/TelegramCore/ChannelBlacklist.swift b/TelegramCore/ChannelBlacklist.swift index 763571ecb0..dc8d0e28ee 100644 --- a/TelegramCore/ChannelBlacklist.swift +++ b/TelegramCore/ChannelBlacklist.swift @@ -132,10 +132,10 @@ public func channelBlacklistParticipants(account: Account, peerId: PeerId) -> Si } } -public func updateChannelMemberBannedRights(account: Account, peerId: PeerId, memberId: PeerId, rights: TelegramChannelBannedRights?) -> Signal<(ChannelParticipant?, RenderedChannelParticipant), NoError> { +public func updateChannelMemberBannedRights(account: Account, peerId: PeerId, memberId: PeerId, rights: TelegramChannelBannedRights?) -> Signal<(ChannelParticipant?, RenderedChannelParticipant?), NoError> { return fetchChannelParticipant(account: account, peerId: peerId, participantId: memberId) - |> mapToSignal { currentParticipant -> Signal<(ChannelParticipant?, RenderedChannelParticipant), NoError> in - return account.postbox.transaction { transaction -> Signal<(ChannelParticipant?, RenderedChannelParticipant), NoError> in + |> mapToSignal { currentParticipant -> Signal<(ChannelParticipant?, RenderedChannelParticipant?), NoError> in + return account.postbox.transaction { transaction -> Signal<(ChannelParticipant?, RenderedChannelParticipant?), NoError> in if let peer = transaction.getPeer(peerId), let inputChannel = apiInputChannel(peer), let _ = transaction.getPeer(account.peerId), let memberPeer = transaction.getPeer(memberId), let inputUser = apiInputUser(memberPeer) { let updatedParticipant: ChannelParticipant if let currentParticipant = currentParticipant, case let .member(_, invitedAt, _, currentBanInfo) = currentParticipant { @@ -159,100 +159,102 @@ public func updateChannelMemberBannedRights(account: Account, peerId: PeerId, me let effectiveRights: TelegramChannelBannedRights = rights ?? TelegramChannelBannedRights(flags: [], untilDate: 0) return account.network.request(Api.functions.channels.editBanned(channel: inputChannel, userId: inputUser, bannedRights: effectiveRights.apiBannedRights)) - |> retryRequest - |> mapToSignal { result -> Signal<(ChannelParticipant?, RenderedChannelParticipant), NoError> in - account.stateManager.addUpdates(result) - - return account.postbox.transaction { transaction -> (ChannelParticipant?, RenderedChannelParticipant) in - transaction.updatePeerCachedData(peerIds: Set([peerId]), update: { _, cachedData -> CachedPeerData? in - if let cachedData = cachedData as? CachedChannelData { - var updatedData = cachedData - var wasKicked = false - var wasBanned = false - var wasMember = false - var wasAdmin = false - if let currentParticipant = currentParticipant { - switch currentParticipant { - case .creator: - break - case let .member(_, _, adminInfo, banInfo): - if let adminInfo = adminInfo { - wasAdmin = true - } - if let banInfo = banInfo { - if banInfo.rights.flags.contains(.banReadMessages) { - wasKicked = true - } else if !banInfo.rights.flags.isEmpty { - wasBanned = true - } - } - wasMember = true - } + |> retryRequest + |> mapToSignal { result -> Signal<(ChannelParticipant?, RenderedChannelParticipant?), NoError> in + account.stateManager.addUpdates(result) + + var wasKicked = false + var wasBanned = false + var wasMember = false + var wasAdmin = false + if let currentParticipant = currentParticipant { + switch currentParticipant { + case .creator: + break + case let .member(_, _, adminInfo, banInfo): + if let _ = adminInfo { + wasAdmin = true + } + if let banInfo = banInfo { + if banInfo.rights.flags.contains(.banReadMessages) { + wasKicked = true + } else if !banInfo.rights.flags.isEmpty { + wasBanned = true + wasMember = true } - - - var isKicked = false - var isBanned = false - if effectiveRights.flags.contains(.banReadMessages) { - isKicked = true - } else if !effectiveRights.flags.isEmpty { - isBanned = true - } - - let isMember = !wasKicked && !effectiveRights.flags.contains(.banReadMessages) - - if isKicked != wasKicked { - if let kickedCount = updatedData.participantsSummary.kickedCount { - updatedData = updatedData.withUpdatedParticipantsSummary(updatedData.participantsSummary.withUpdatedKickedCount(max(0, kickedCount + (isKicked ? 1 : -1)))) - } - } - - if isBanned != wasBanned { - if let bannedCount = updatedData.participantsSummary.bannedCount { - updatedData = updatedData.withUpdatedParticipantsSummary(updatedData.participantsSummary.withUpdatedBannedCount(max(0, bannedCount + (isBanned ? 1 : -1)))) - } - } - - if wasAdmin { - if let adminCount = updatedData.participantsSummary.adminCount { - updatedData = updatedData.withUpdatedParticipantsSummary(updatedData.participantsSummary.withUpdatedAdminCount(max(0, adminCount - 1))) - } - } - - if isMember != wasMember { - if let memberCount = updatedData.participantsSummary.memberCount { - updatedData = updatedData.withUpdatedParticipantsSummary(updatedData.participantsSummary.withUpdatedMemberCount(max(0, memberCount + (isMember ? 1 : -1)))) - } - - if !isMember, let topParticipants = updatedData.topParticipants { - var updatedParticipants = topParticipants.participants - if let index = updatedParticipants.index(where: { $0.peerId == memberId }) { - updatedParticipants.remove(at: index) - - updatedData = updatedData.withUpdatedTopParticipants(CachedChannelParticipants(participants: updatedParticipants)) - } - } - } - - return updatedData } else { - return cachedData + wasMember = true } - }) - var peers: [PeerId: Peer] = [:] - var presences: [PeerId: PeerPresence] = [:] - peers[memberPeer.id] = memberPeer - if let presence = transaction.getPeerPresence(peerId: memberPeer.id) { - presences[memberPeer.id] = presence - } - if case let .member(_, _, _, maybeBanInfo) = updatedParticipant, let banInfo = maybeBanInfo { - if let peer = transaction.getPeer(banInfo.restrictedBy) { - peers[peer.id] = peer - } - } - return (currentParticipant, RenderedChannelParticipant(participant: updatedParticipant, peer: memberPeer, peers: peers, presences: presences)) } } + + var isKicked = false + var isBanned = false + if effectiveRights.flags.contains(.banReadMessages) { + isKicked = true + } else if !effectiveRights.flags.isEmpty { + isBanned = true + } + + let isMember = !wasKicked && !effectiveRights.flags.contains(.banReadMessages) + + return account.postbox.transaction { transaction -> (ChannelParticipant?, RenderedChannelParticipant?) in + transaction.updatePeerCachedData(peerIds: Set([peerId]), update: { _, cachedData -> CachedPeerData? in + if let cachedData = cachedData as? CachedChannelData { + var updatedData = cachedData + if isKicked != wasKicked { + if let kickedCount = updatedData.participantsSummary.kickedCount { + updatedData = updatedData.withUpdatedParticipantsSummary(updatedData.participantsSummary.withUpdatedKickedCount(max(0, kickedCount + (isKicked ? 1 : -1)))) + } + } + + if isBanned != wasBanned { + if let bannedCount = updatedData.participantsSummary.bannedCount { + updatedData = updatedData.withUpdatedParticipantsSummary(updatedData.participantsSummary.withUpdatedBannedCount(max(0, bannedCount + (isBanned ? 1 : -1)))) + } + } + + if wasAdmin { + if let adminCount = updatedData.participantsSummary.adminCount { + updatedData = updatedData.withUpdatedParticipantsSummary(updatedData.participantsSummary.withUpdatedAdminCount(max(0, adminCount - 1))) + } + } + + if isMember != wasMember { + if let memberCount = updatedData.participantsSummary.memberCount { + updatedData = updatedData.withUpdatedParticipantsSummary(updatedData.participantsSummary.withUpdatedMemberCount(max(0, memberCount + (isMember ? 1 : -1)))) + } + + if !isMember, let topParticipants = updatedData.topParticipants { + var updatedParticipants = topParticipants.participants + if let index = updatedParticipants.index(where: { $0.peerId == memberId }) { + updatedParticipants.remove(at: index) + + updatedData = updatedData.withUpdatedTopParticipants(CachedChannelParticipants(participants: updatedParticipants)) + } + } + } + + return updatedData + } else { + return cachedData + } + }) + var peers: [PeerId: Peer] = [:] + var presences: [PeerId: PeerPresence] = [:] + peers[memberPeer.id] = memberPeer + if let presence = transaction.getPeerPresence(peerId: memberPeer.id) { + presences[memberPeer.id] = presence + } + if case let .member(_, _, _, maybeBanInfo) = updatedParticipant, let banInfo = maybeBanInfo { + if let peer = transaction.getPeer(banInfo.restrictedBy) { + peers[peer.id] = peer + } + } + + return (currentParticipant, isMember ? RenderedChannelParticipant(participant: updatedParticipant, peer: memberPeer, peers: peers, presences: presences) : nil) + } + } } else { return .complete() } diff --git a/TelegramCore/FetchChatList.swift b/TelegramCore/FetchChatList.swift index 885c75063d..c95e55988d 100644 --- a/TelegramCore/FetchChatList.swift +++ b/TelegramCore/FetchChatList.swift @@ -23,7 +23,7 @@ struct ParsedDialogs { let readStates: [PeerId: [MessageId.Namespace: PeerReadState]] let mentionTagSummaries: [PeerId: MessageHistoryTagNamespaceSummary] let chatStates: [PeerId: PeerChatState] - + let topMessageIds: [PeerId: MessageId] let storeMessages: [StoreMessage] let lowerNonPinnedIndex: MessageIndex? @@ -54,6 +54,7 @@ private func parseDialogs(apiDialogs: [Api.Dialog], apiMessages: [Api.Message], var readStates: [PeerId: [MessageId.Namespace: PeerReadState]] = [:] var mentionTagSummaries: [PeerId: MessageHistoryTagNamespaceSummary] = [:] var chatStates: [PeerId: PeerChatState] = [:] + var topMessageIds: [PeerId: MessageId] = [:] var storeMessages: [StoreMessage] = [] var nonPinnedDialogsTopMessageIds = Set() @@ -104,6 +105,7 @@ private func parseDialogs(apiDialogs: [Api.Dialog], apiMessages: [Api.Message], if apiTopMessage != 0 { mentionTagSummaries[peerId] = MessageHistoryTagNamespaceSummary(version: 1, count: apiUnreadMentionsCount, range: MessageHistoryTagNamespaceCountValidityRange(maxId: apiTopMessage)) + topMessageIds[peerId] = MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: apiTopMessage) } if let apiChannelPts = apiChannelPts { @@ -165,7 +167,7 @@ private func parseDialogs(apiDialogs: [Api.Dialog], apiMessages: [Api.Message], readStates: readStates, mentionTagSummaries: mentionTagSummaries, chatStates: chatStates, - + topMessageIds: topMessageIds, storeMessages: storeMessages, lowerNonPinnedIndex: lowerNonPinnedIndex, @@ -181,6 +183,7 @@ struct FetchedChatList { let mentionTagSummaries: [PeerId: MessageHistoryTagNamespaceSummary] let chatStates: [PeerId: PeerChatState] let storeMessages: [StoreMessage] + let topMessageIds: [PeerId: MessageId] let lowerNonPinnedIndex: MessageIndex? @@ -188,7 +191,7 @@ struct FetchedChatList { let feeds: [(PeerGroupId, MessageIndex?)] } -func fetchChatList(postbox: Postbox, network: Network, location: FetchChatListLocation, upperBound: MessageIndex) -> Signal { +func fetchChatList(postbox: Postbox, network: Network, location: FetchChatListLocation, upperBound: MessageIndex, hash: Int32, limit: Int32) -> Signal { return postbox.stateView() |> mapToSignal { view -> Signal in if let state = view.state as? AuthorizedAccountState { @@ -198,25 +201,25 @@ func fetchChatList(postbox: Postbox, network: Network, location: FetchChatListLo } } |> take(1) - |> mapToSignal { _ -> Signal in + |> mapToSignal { _ -> Signal in let offset: Signal<(Int32, Int32, Api.InputPeer), NoError> if upperBound.id.peerId.namespace == Namespaces.Peer.Empty { offset = single((0, 0, Api.InputPeer.inputPeerEmpty), NoError.self) } else { offset = postbox.loadedPeerWithId(upperBound.id.peerId) - |> take(1) - |> map { peer in - return (upperBound.timestamp, upperBound.id.id + 1, apiInputPeer(peer) ?? .inputPeerEmpty) + |> take(1) + |> map { peer in + return (upperBound.timestamp, upperBound.id.id + 1, apiInputPeer(peer) ?? .inputPeerEmpty) } } return offset - |> mapToSignal { (timestamp, id, peer) -> Signal in + |> mapToSignal { (timestamp, id, peer) -> Signal in let additionalPinnedChats: Signal if case .general = location, case .inputPeerEmpty = peer, timestamp == 0 { additionalPinnedChats = network.request(Api.functions.messages.getPinnedDialogs()) - |> retryRequest - |> map(Optional.init) + |> retryRequest + |> map(Optional.init) } else { additionalPinnedChats = .single(nil) } @@ -233,11 +236,14 @@ func fetchChatList(postbox: Postbox, network: Network, location: FetchChatListLo flags |= 1 << 1*/ break } - let requestChats = network.request(Api.functions.messages.getDialogs(flags: flags/*feed*//*, feedId: requestFeedId*/, offsetDate: timestamp, offsetId: id, offsetPeer: peer, limit: 100, hash: 0)) - |> retryRequest + let requestChats = network.request(Api.functions.messages.getDialogs(flags: flags/*feed*//*, feedId: requestFeedId*/, offsetDate: timestamp, offsetId: id, offsetPeer: peer, limit: limit, hash: hash)) + |> retryRequest return combineLatest(requestChats, additionalPinnedChats) - |> mapToSignal { remoteChats, pinnedChats -> Signal in + |> mapToSignal { remoteChats, pinnedChats -> Signal in + if case .dialogsNotModified = remoteChats { + return .single(nil) + } let extractedRemoteDialogs = extractDialogsData(dialogs: remoteChats) let parsedRemoteChats = parseDialogs(apiDialogs: extractedRemoteDialogs.apiDialogs, apiMessages: extractedRemoteDialogs.apiMessages, apiChats: extractedRemoteDialogs.apiChats, apiUsers: extractedRemoteDialogs.apiUsers, apiIsAtLowestBoundary: extractedRemoteDialogs.apiIsAtLowestBoundary) var parsedPinnedChats: ParsedDialogs? @@ -269,7 +275,7 @@ func fetchChatList(postbox: Postbox, network: Network, location: FetchChatListLo } return combineLatest(feedSignals) - |> map { feeds -> FetchedChatList in + |> map { feeds -> FetchedChatList? in var peers: [Peer] = [] var peerPresences: [PeerId: PeerPresence] = [:] var notificationSettings: [PeerId: PeerNotificationSettings] = [:] @@ -277,6 +283,7 @@ func fetchChatList(postbox: Postbox, network: Network, location: FetchChatListLo var mentionTagSummaries: [PeerId: MessageHistoryTagNamespaceSummary] = [:] var chatStates: [PeerId: PeerChatState] = [:] var storeMessages: [StoreMessage] = [] + var topMessageIds: [PeerId: MessageId] = [:] peers.append(contentsOf: parsedRemoteChats.peers) peerPresences.merge(parsedRemoteChats.peerPresences, uniquingKeysWith: { _, updated in updated }) @@ -285,6 +292,7 @@ func fetchChatList(postbox: Postbox, network: Network, location: FetchChatListLo mentionTagSummaries.merge(parsedRemoteChats.mentionTagSummaries, uniquingKeysWith: { _, updated in updated }) chatStates.merge(parsedRemoteChats.chatStates, uniquingKeysWith: { _, updated in updated }) storeMessages.append(contentsOf: parsedRemoteChats.storeMessages) + topMessageIds.merge(parsedRemoteChats.topMessageIds, uniquingKeysWith: { _, updated in updated }) if let parsedPinnedChats = parsedPinnedChats { peers.append(contentsOf: parsedPinnedChats.peers) @@ -294,6 +302,7 @@ func fetchChatList(postbox: Postbox, network: Network, location: FetchChatListLo mentionTagSummaries.merge(parsedPinnedChats.mentionTagSummaries, uniquingKeysWith: { _, updated in updated }) chatStates.merge(parsedPinnedChats.chatStates, uniquingKeysWith: { _, updated in updated }) storeMessages.append(contentsOf: parsedPinnedChats.storeMessages) + topMessageIds.merge(parsedPinnedChats.topMessageIds, uniquingKeysWith: { _, updated in updated }) } for (_, feedChats) in feeds { @@ -314,6 +323,7 @@ func fetchChatList(postbox: Postbox, network: Network, location: FetchChatListLo mentionTagSummaries: mentionTagSummaries, chatStates: chatStates, storeMessages: storeMessages, + topMessageIds: topMessageIds, lowerNonPinnedIndex: parsedRemoteChats.lowerNonPinnedIndex, diff --git a/TelegramCore/Holes.swift b/TelegramCore/Holes.swift index 342d390962..9793ba0923 100644 --- a/TelegramCore/Holes.swift +++ b/TelegramCore/Holes.swift @@ -567,8 +567,13 @@ func fetchChatListHole(postbox: Postbox, network: Network, accountPeerId: PeerId } else { location = .general } - return fetchChatList(postbox: postbox, network: network, location: location, upperBound: hole.index) + return fetchChatList(postbox: postbox, network: network, location: location, upperBound: hole.index, hash: 0, limit: 100) |> mapToSignal { fetchedChats -> Signal in + guard let fetchedChats = fetchedChats else { + return postbox.transaction { transaction -> Void in + transaction.replaceChatListHole(groupId: groupId, index: hole.index, hole: nil) + } + } return withResolvedAssociatedMessages(postbox: postbox, source: .network(network), storeMessages: fetchedChats.storeMessages, { transaction, additionalPeers, additionalMessages in for peer in fetchedChats.peers { updatePeers(transaction: transaction, peers: [peer], update: { _, updated -> Peer in diff --git a/TelegramCore/MultipartUpload.swift b/TelegramCore/MultipartUpload.swift index 5530391974..0d6322de09 100644 --- a/TelegramCore/MultipartUpload.swift +++ b/TelegramCore/MultipartUpload.swift @@ -315,7 +315,7 @@ private final class MultipartUploadManager { case let .data(data): fileData = data } - if let fileData = fileData { + if let fileData = fileData, fileData.count >= partOffset + partSize { let partData = self.state.transform(data: fileData.subdata(in: partOffset ..< (partOffset + partSize))) var currentBigTotalParts = self.bigTotalParts if self.bigParts && resourceData.complete && partOffset + partSize == resourceData.size { diff --git a/TelegramCore/SynchronizePeerReadState.swift b/TelegramCore/SynchronizePeerReadState.swift index e9aa1460b9..7ae3996d1c 100644 --- a/TelegramCore/SynchronizePeerReadState.swift +++ b/TelegramCore/SynchronizePeerReadState.swift @@ -247,101 +247,101 @@ private func pushPeerReadState(network: Network, postbox: Postbox, stateManager: if peerId.namespace == Namespaces.Peer.SecretChat { return inputSecretChat(postbox: postbox, peerId: peerId) - |> mapToSignal { inputPeer -> Signal in - switch readState { - case .idBased: + |> mapToSignal { inputPeer -> Signal in + switch readState { + case .idBased: + return .single(readState) + case let .indexBased(maxIncomingReadIndex, _, _, _): + return network.request(Api.functions.messages.readEncryptedHistory(peer: inputPeer, maxDate: maxIncomingReadIndex.timestamp)) + |> retryRequest + |> mapToSignalPromotingError { _ -> Signal in return .single(readState) - case let .indexBased(maxIncomingReadIndex, _, _, _): - return network.request(Api.functions.messages.readEncryptedHistory(peer: inputPeer, maxDate: maxIncomingReadIndex.timestamp)) - |> retryRequest - |> mapToSignalPromotingError { _ -> Signal in - return .single(readState) - } - } + } } + } } else { return inputPeer(postbox: postbox, peerId: peerId) - |> mapToSignal { inputPeer -> Signal in - switch inputPeer { - case let .inputPeerChannel(channelId, accessHash): - switch readState { - case let .idBased(maxIncomingReadId, _, _, _, markedUnread): - var pushSignal: Signal = network.request(Api.functions.channels.readHistory(channel: Api.InputChannel.inputChannel(channelId: channelId, accessHash: accessHash), maxId: maxIncomingReadId)) + |> mapToSignal { inputPeer -> Signal in + switch inputPeer { + case let .inputPeerChannel(channelId, accessHash): + switch readState { + case let .idBased(maxIncomingReadId, _, _, _, markedUnread): + var pushSignal: Signal = network.request(Api.functions.channels.readHistory(channel: Api.InputChannel.inputChannel(channelId: channelId, accessHash: accessHash), maxId: maxIncomingReadId)) + |> `catch` { _ -> Signal in + return .complete() + } + |> mapToSignal { _ -> Signal in + return .complete() + } + if markedUnread { + pushSignal = pushSignal + |> then(network.request(Api.functions.messages.markDialogUnread(flags: 1 << 0, peer: .inputDialogPeer(peer: inputPeer))) |> `catch` { _ -> Signal in return .complete() } |> mapToSignal { _ -> Signal in return .complete() - } - if markedUnread { - pushSignal = pushSignal - |> then(network.request(Api.functions.messages.markDialogUnread(flags: 1 << 0, peer: .inputDialogPeer(peer: inputPeer))) - |> `catch` { _ -> Signal in - return .complete() + }) + } + return pushSignal + |> mapError { _ -> VerifyReadStateError in return VerifyReadStateError.Retry } + |> mapToSignal { _ -> Signal in + return .complete() + } + |> then(Signal.single(readState)) + case .indexBased: + return .single(readState) + } + + default: + switch readState { + case let .idBased(maxIncomingReadId, _, _, _, markedUnread): + var pushSignal: Signal = network.request(Api.functions.messages.readHistory(peer: inputPeer, maxId: maxIncomingReadId)) + |> map(Optional.init) + |> `catch` { _ -> Signal in + return .single(nil) + } + |> mapToSignal { result -> Signal in + if let result = result { + switch result { + case let .affectedMessages(pts, ptsCount): + stateManager.addUpdateGroups([.updatePts(pts: pts, ptsCount: ptsCount)]) } - |> mapToSignal { _ -> Signal in - return .complete() - }) } - return pushSignal - |> mapError { _ -> VerifyReadStateError in return VerifyReadStateError.Retry } - |> mapToSignal { _ -> Signal in + return .complete() + } + + if markedUnread { + pushSignal = pushSignal + |> then(network.request(Api.functions.messages.markDialogUnread(flags: 1 << 0, peer: .inputDialogPeer(peer: inputPeer))) + |> `catch` { _ -> Signal in return .complete() } - |> then(Signal.single(readState)) - case .indexBased: - return .single(readState) - } - - default: - switch readState { - case let .idBased(maxIncomingReadId, _, _, _, markedUnread): - var pushSignal: Signal = network.request(Api.functions.messages.readHistory(peer: inputPeer, maxId: maxIncomingReadId)) - |> map(Optional.init) - |> `catch` { _ -> Signal in - return .single(nil) - } - |> mapToSignal { result -> Signal in - if let result = result { - switch result { - case let .affectedMessages(pts, ptsCount): - stateManager.addUpdateGroups([.updatePts(pts: pts, ptsCount: ptsCount)]) - } - } + |> mapToSignal { _ -> Signal in return .complete() - } - - if markedUnread { - pushSignal = pushSignal - |> then(network.request(Api.functions.messages.markDialogUnread(flags: 1 << 0, peer: .inputDialogPeer(peer: inputPeer))) - |> `catch` { _ -> Signal in - return .complete() - } - |> mapToSignal { _ -> Signal in - return .complete() - }) - } - - return pushSignal - |> mapError { _ -> VerifyReadStateError in return VerifyReadStateError.Retry } - |> mapToSignal { _ -> Signal in - return .complete() - } - |> then(Signal.single(readState)) - case .indexBased: - return .single(readState) - } - } + }) + } + + return pushSignal + |> mapError { _ -> VerifyReadStateError in return VerifyReadStateError.Retry } + |> mapToSignal { _ -> Signal in + return .complete() + } + |> then(Signal.single(readState)) + case .indexBased: + return .single(readState) + } } + } } } private func pushPeerReadState(network: Network, postbox: Postbox, stateManager: AccountStateManager, peerId: PeerId) -> Signal { - let currentReadState = postbox.transaction { transaction -> PeerReadState? in + let currentReadState = postbox.transaction { transaction -> (MessageId.Namespace, PeerReadState)? in if let readStates = transaction.getPeerReadStates(peerId) { for (namespace, readState) in readStates { if namespace == Namespaces.Message.Cloud || namespace == Namespaces.Message.SecretIncoming { - return readState + return (namespace, readState) } } } @@ -349,20 +349,23 @@ private func pushPeerReadState(network: Network, postbox: Postbox, stateManager: } let pushedState = currentReadState - |> mapToSignalPromotingError { readState -> Signal in - if let readState = readState { + |> mapToSignalPromotingError { namespaceAndReadState -> Signal<(MessageId.Namespace, PeerReadState), VerifyReadStateError> in + if let (namespace, readState) = namespaceAndReadState { return pushPeerReadState(network: network, postbox: postbox, stateManager: stateManager, peerId: peerId, readState: readState) + |> map { updatedReadState -> (MessageId.Namespace, PeerReadState) in + return (namespace, updatedReadState) + } } else { return .complete() } } let verifiedState = pushedState - |> mapToSignal { readState -> Signal in + |> mapToSignal { namespaceAndReadState -> Signal in return stateManager.addCustomOperation(postbox.transaction { transaction -> VerifyReadStateError? in if let readStates = transaction.getPeerReadStates(peerId) { - for (namespace, currentReadState) in readStates where namespace == Namespaces.Message.Cloud { - if currentReadState == readState { + for (namespace, currentReadState) in readStates where namespace == namespaceAndReadState.0 { + if currentReadState == namespaceAndReadState.1 { transaction.confirmSynchronizedIncomingReadState(peerId) return nil }