import Foundation
import Postbox
import SwiftSignalKit
import TelegramApi
import MtProtoKit

// NOTE(review): several generic types in this listing appear without type arguments
// (bare `Set`, `Signal`, `Bag`) — presumably lost in extraction; confirm against the original file.

/// Call list view variants: `all` and `missed`.
public enum CallListViewType {
    case all
    case missed
}

/// One entry of a call list view.
public enum CallListViewEntry {
    // A message together with an array of further messages.
    // NOTE(review): presumably the other calls grouped under that entry — confirm.
    case message(Message, [Message])
    // A hole identified by a message index.
    case hole(MessageIndex)
}

/// Container for call list entries plus optional `earlier` / `later` message indices.
/// All stored properties are `let`.
public final class CallListView {
    public let entries: [CallListViewEntry]
    public let earlier: MessageIndex?
    public let later: MessageIndex?

    init(entries: [CallListViewEntry], earlier: MessageIndex?, later: MessageIndex?) {
        self.entries = entries
        self.earlier = earlier
        self.later = later
    }
}

/// Scans history entries for messages whose webpage media is still pending.
///
/// - Parameter entries: The history entries to inspect.
/// - Returns: A pair of (ids of messages whose first `TelegramMediaWebpage` has `.Pending` content,
///   a map from message id to `(webpageId, url)` for those pending webpages that carry a URL and
///   whose webpage id is in the `LocalWebpage` media namespace).
private func pendingWebpages(entries: [MessageHistoryEntry]) -> (Set, [MessageId: (MediaId, String)]) {
    var messageIds = Set()
    var localWebpages: [MessageId: (MediaId, String)] = [:]
    for entry in entries {
        for media in entry.message.media {
            if let media = media as? TelegramMediaWebpage {
                if case let .Pending(_, url) = media.content {
                    messageIds.insert(entry.message.id)
                    if let url = url, media.webpageId.namespace == Namespaces.Media.LocalWebpage {
                        localWebpages[entry.message.id] = (media.webpageId, url)
                    }
                }
                // Only the first webpage media of a message is considered.
                break
            }
        }
    }
    return (messageIds, localWebpages)
}

/// Collects messages that contain an open cloud poll.
///
/// A message qualifies when one of its media is a `TelegramMediaPoll` whose poll id is in the
/// `CloudPoll` namespace, the message id is in the `Cloud` message namespace, and the poll is
/// not closed.
///
/// - Parameter entries: The history entries to inspect.
/// - Returns: The qualifying message ids and a map from those ids to their messages.
private func pollMessages(entries: [MessageHistoryEntry]) -> (Set, [MessageId: Message]) {
    var messageIds = Set()
    var messages: [MessageId: Message] = [:]
    for entry in entries {
        for media in entry.message.media {
            if let poll = media as? TelegramMediaPoll, poll.pollId.namespace == Namespaces.Media.CloudPoll, entry.message.id.namespace == Namespaces.Message.Cloud, !poll.isClosed {
                messageIds.insert(entry.message.id)
                messages[entry.message.id] = entry.message
                // Stop at the first matching poll media of this message.
                break
            }
        }
    }
    return (messageIds, messages)
}

/// Re-requests a single message from the server and applies the webpage media it carries to
/// the local store.
///
/// The peer for `messageId.peerId` is loaded first (one value is taken); when no API input peer
/// can be built from it, the returned signal completes immediately.
///
/// - Parameters:
///   - account: Supplies the postbox, the network and the account peer id.
///   - messageId: The message to re-request.
///   - threadId: Used only for quick-reply namespaces, where it is clamped to `Int32` and sent
///     as `shortcutId`; when it is `nil` in that case the request signal is `.never()`.
private func fetchWebpage(account: Account, messageId: MessageId, threadId: Int64?)
-> Signal {
    let accountPeerId = account.peerId
    return account.postbox.loadedPeerWithId(messageId.peerId)
    |> take(1)
    |> mapToSignal { peer in
        if let inputPeer = apiInputPeer(peer) {
            // Namespace under which the fetched API message is parsed, chosen from the
            // namespace family of the requested message id.
            let targetMessageNamespace: MessageId.Namespace
            if Namespaces.Message.allScheduled.contains(messageId.namespace) {
                targetMessageNamespace = Namespaces.Message.ScheduledCloud
            } else if Namespaces.Message.allQuickReply.contains(messageId.namespace) {
                targetMessageNamespace = Namespaces.Message.QuickReplyCloud
            } else {
                targetMessageNamespace = Namespaces.Message.Cloud
            }

            // Pick the API call that matches the namespace family (and, for regular messages,
            // whether the input peer is a channel).
            let messages: Signal
            if Namespaces.Message.allScheduled.contains(messageId.namespace) {
                messages = account.network.request(Api.functions.messages.getScheduledMessages(peer: inputPeer, id: [messageId.id]))
            } else if Namespaces.Message.allQuickReply.contains(messageId.namespace) {
                if let threadId {
                    messages = account.network.request(Api.functions.messages.getQuickReplyMessages(flags: 1 << 0, shortcutId: Int32(clamping: threadId), id: [messageId.id], hash: 0))
                } else {
                    messages = .never()
                }
            } else {
                switch inputPeer {
                case let .inputPeerChannel(channelId, accessHash):
                    messages = account.network.request(Api.functions.channels.getMessages(channel: Api.InputChannel.inputChannel(channelId: channelId, accessHash: accessHash), id: [Api.InputMessage.inputMessageID(id: messageId.id)]))
                default:
                    messages = account.network.request(Api.functions.messages.getMessages(id: [Api.InputMessage.inputMessageID(id: messageId.id)]))
                }
            }
            return messages
            |> retryRequest
            |> mapToSignal { result in
                // Flatten the response variants into plain message / chat / user arrays.
                let messages: [Api.Message]
                let chats: [Api.Chat]
                let users: [Api.User]
                switch result {
                case let .messages(messages: apiMessages, chats: apiChats, users: apiUsers):
                    messages = apiMessages
                    chats = apiChats
                    users = apiUsers
                case let .messagesSlice(_, _, _, _, messages: apiMessages, chats: apiChats, users: apiUsers):
                    messages = apiMessages
                    chats = apiChats
                    users = apiUsers
                case let .channelMessages(_, _, _, _, apiMessages, apiTopics, apiChats, apiUsers):
                    messages = apiMessages
                    // Topics are bound but intentionally unused.
                    let _ = apiTopics
                    chats = apiChats
                    users = apiUsers
                case .messagesNotModified:
                    messages = []
                    chats = []
                    users = []
                }

                return account.postbox.transaction { transaction -> Void in
                    let parsedPeers = AccumulatedPeers(transaction: transaction, chats: chats, users: users)

                    for message in messages {
                        if let storeMessage = StoreMessage(apiMessage: message, accountPeerId: accountPeerId, peerIsForum: peer.isForum, namespace: targetMessageNamespace) {
                            // No `break` in this loop: the last webpage media found wins.
                            var webpage: TelegramMediaWebpage?
                            for media in storeMessage.media {
                                if let media = media as? TelegramMediaWebpage {
                                    webpage = media
                                }
                            }

                            if let webpage = webpage {
                                updateMessageMedia(transaction: transaction, id: webpage.webpageId, media: webpage)
                            } else {
                                // The fetched message has no webpage: update the first webpage
                                // media of the locally stored message with `nil`.
                                if let previousMessage = transaction.getMessage(messageId) {
                                    for media in previousMessage.media {
                                        if let media = media as? TelegramMediaWebpage {
                                            updateMessageMedia(transaction: transaction, id: media.webpageId, media: nil)
                                            break
                                        }
                                    }
                                }
                            }
                            // Only the first API message that converts to a StoreMessage is processed.
                            break
                        }
                    }
                    updatePeers(transaction: transaction, accountPeerId: accountPeerId, peers: parsedPeers)
                }
            }
        } else {
            return .complete()
        }
    }
}

/// Requests the poll results for `messageId` and hands any returned updates to
/// `account.stateManager`.
///
/// Request errors are mapped to `nil` and ignored. The inner signal always returns
/// `.complete()`, so no values are emitted. When no API input peer can be built for the
/// message's peer, the signal completes immediately.
private func fetchPoll(account: Account, messageId: MessageId) -> Signal {
    return account.postbox.loadedPeerWithId(messageId.peerId)
    |> take(1)
    |> mapToSignal { peer -> Signal in
        guard let inputPeer = apiInputPeer(peer) else {
            return .complete()
        }
        return account.network.request(Api.functions.messages.getPollResults(peer: inputPeer, msgId: messageId.id))
        |> map(Optional.init)
        |> `catch` { _ -> Signal in
            return .single(nil)
        }
        |> mapToSignal { updates -> Signal in
            if let updates = updates {
                account.stateManager.addUpdates(updates)
            }
            return .complete()
        }
    }
}

/// Returns `additionalData`, with a `.peerChatState(peerId)` entry appended when the chat
/// location is a `.peer` or `.thread` whose peer id is in the `CloudChannel` namespace and the
/// array does not already contain a `.peerChatState` entry.
///
/// `.customChatContents` locations return the input unchanged.
private func wrappedHistoryViewAdditionalData(chatLocation: ChatLocationInput, additionalData: [AdditionalMessageHistoryViewData]) -> [AdditionalMessageHistoryViewData] {
    var result = additionalData
    switch chatLocation {
    case let .peer(peerId, _):
        if peerId.namespace == Namespaces.Peer.CloudChannel {
            if result.firstIndex(where: { if case
                .peerChatState = $0 { return true } else { return false } }) == nil {
                result.append(.peerChatState(peerId))
            }
        }
    case let .thread(peerId, _, _):
        // Same rule as the `.peer` case.
        if peerId.namespace == Namespaces.Peer.CloudChannel {
            if result.firstIndex(where: { if case .peerChatState = $0 { return true } else { return false } }) == nil {
                result.append(.peerChatState(peerId))
            }
        }
    case .customChatContents:
        break
    }
    return result
}

/// Per-peer bookkeeping for cached peer data: a set of view ids, an optional timestamp,
/// a `hasCachedData` flag and a disposable that is disposed on deinit.
private final class PeerCachedDataContext {
    var viewIds = Set()
    var timestamp: Double?
    var hasCachedData: Bool = false
    let disposable = MetaDisposable()

    deinit {
        self.disposable.dispose()
    }
}

/// Per-peer bookkeeping for cached channel participants: a bag of subscribers, an optional
/// timestamp and a disposable that is disposed on deinit.
private final class CachedChannelParticipantsContext {
    var subscribers = Bag()
    var timestamp: Double?
    let disposable = MetaDisposable()

    deinit {
        self.disposable.dispose()
    }
}

/// Per-peer bookkeeping for channel polling.
///
/// The latest value delivered by the `isUpdated` promise is mirrored into `isUpdatedValue`;
/// delivery happens on the queue passed to `init`.
private final class ChannelPollingContext {
    var subscribers = Bag()
    let disposable = MetaDisposable()
    let isUpdated = Promise(false)
    private(set) var isUpdatedValue: Bool = false
    private var isUpdatedDisposable: Disposable?

    init(queue: Queue) {
        // `[weak self]` so the subscription does not keep the context alive.
        self.isUpdatedDisposable = (self.isUpdated.get()
        |> deliverOn(queue)).start(next: { [weak self] value in
            self?.isUpdatedValue = value
        })
    }

    deinit {
        self.disposable.dispose()
        self.isUpdatedDisposable?.dispose()
    }
}

/// Bookkeeping for featured sticker packs: a bag of subscribers, an optional timestamp and
/// a disposable that is disposed on deinit.
private final class FeaturedStickerPacksContext {
    var subscribers = Bag()
    let disposable = MetaDisposable()
    var timestamp: Double?

    deinit {
        self.disposable.dispose()
    }
}

/// State remembered per message for view-count refreshes: when it was recorded, which client
/// recorded it, and optional reply info.
private struct ViewCountContextState {
    struct ReplyInfo {
        var commentsPeerId: PeerId?
        var maxReadIncomingMessageId: MessageId?
        var maxMessageId: MessageId?
    }

    var timestamp: Int32
    var clientId: Int32
    var result: ReplyInfo?

    /// Returns `false` when `other.timestamp` exceeds this state's timestamp by more than 30,
    /// or when `other.clientId` is greater than this state's client id; `true` otherwise.
    func isStillValidFor(_ other: ViewCountContextState) -> Bool {
        if other.timestamp > self.timestamp + 30 {
            return false
        }
        if other.clientId > self.clientId {
            return false
        }
        return true
    }
}

// The body of this class continues on the following lines of the file.
public final class AccountViewTracker {
// Held weakly; the update methods re-check it with `if let account = self.account`.
weak var account: Account?
// Stored properties of AccountViewTracker.
// NOTE(review): some declarations below (`Set` element types, `Promise>`, `ValuePromise>`,
// `Signal, NoError>`, `Bag`) look like they lost their generic arguments in extraction —
// confirm against the original file.

private let accountPeerId: PeerId
// Queue on which the update methods of this class mutate the bookkeeping state below
// (they wrap their work in `queue.async`).
private let queue = Queue()

private var nextViewId: Int32 = 0

// Pending-webpage and poll tracking: per-view id sets, per-message reference counts, and
// the running request disposable for each message.
private var viewPendingWebpageMessageIds: [Int32: Set] = [:]
private var viewPollMessageIds: [Int32: Set] = [:]
private var pendingWebpageMessageIds: [MessageId: Int] = [:]
private var pollMessageIds: [MessageId: Int] = [:]
private var webpageDisposables: [MessageId: Disposable] = [:]
private var pollDisposables: [MessageId: Disposable] = [:]

// Visible call-list holes: per-view hole sets, per-hole reference counts, and the running
// fetch disposable for each hole.
private var viewVisibleCallListHoleIds: [Int32: Set] = [:]
private var visibleCallListHoleIds: [MessageIndex: Int] = [:]
private var visibleCallListHoleDisposables: [MessageIndex: Disposable] = [:]

// View-count refresh state per message, the id counter for its requests, and their disposables.
private var updatedViewCountMessageIdsAndTimestamps: [MessageId: ViewCountContextState] = [:]
private var nextUpdatedViewCountDisposableId: Int32 = 0
private var updatedViewCountDisposables = DisposableDict()

// Reactions refresh: last request timestamp per message, request id counter, disposables.
private var updatedReactionsMessageIdsAndTimestamps: [MessageId: Int32] = [:]
private var nextUpdatedReactionsDisposableId: Int32 = 0
private var updatedReactionsDisposables = DisposableDict()

// Seen-live-location refresh: timestamp per message, request id counter, disposables.
private var updatedSeenLiveLocationMessageIdsAndTimestamps: [MessageId: Int32] = [:]
private var nextSeenLiveLocationDisposableId: Int32 = 0
private var seenLiveLocationDisposables = DisposableDict()

// NOTE(review): the groups below are not used in the visible part of the file; presumably
// they follow the same "timestamp map + id counter + disposables" pattern — confirm.
private var updatedExtendedMediaMessageIdsAndTimestamps: [MessageId: Int32] = [:]
private var nextUpdatedExtendedMediaDisposableId: Int32 = 0
private var updatedExtendedMediaDisposables = DisposableDict()

private var updatedUnsupportedMediaMessageIdsAndTimestamps: [MessageAndThreadId: Int32] = [:]
private var refreshSecretChatMediaMessageIdsAndTimestamps: [MessageId: Int32] = [:]
private var refreshStoriesForMessageIdsAndTimestamps: [MessageId: Int32] = [:]
private var nextUpdatedUnsupportedMediaDisposableId: Int32 = 0
private var updatedUnsupportedMediaDisposables = DisposableDict()

private var refreshStoriesForPeerIdsAndTimestamps: [PeerId: Int32] = [:]
private var refreshStoriesForPeerIdsDebounceDisposable: Disposable?
private var pendingRefreshStoriesForPeerIds: [PeerId] = []

private var refreshCanSendMessagesForPeerIdsAndTimestamps: [PeerId: Int32] = [:]
private var refreshCanSendMessagesForPeerIdsDebounceDisposable: Disposable?
private var pendingRefreshCanSendMessagesForPeerIds: [PeerId] = []

private var updatedSeenPersonalMessageIds = Set()
private var updatedReactionsSeenForMessageIds = Set()

// Per-peer contexts; `cachedDataContexts` is consulted in `init` when externally updated
// peer ids arrive and is cleared by `reset()`.
private var cachedDataContexts: [PeerId: PeerCachedDataContext] = [:]
private var cachedChannelParticipantsContexts: [PeerId: CachedChannelParticipantsContext] = [:]
private var channelPollingContexts: [PeerId: ChannelPollingContext] = [:]
private var featuredStickerPacksContext: FeaturedStickerPacksContext?
private var featuredEmojiPacksContext: FeaturedStickerPacksContext?

// Both are created in `init` from the account's postbox and network.
let chatHistoryPreloadManager: ChatHistoryPreloadManager
private let historyViewStateValidationContexts: HistoryViewStateValidationContexts

/// Forwards `chatHistoryPreloadManager.orderedMedia`.
public var orderedPreloadMedia: Signal<[ChatHistoryPreloadMediaItem], NoError> {
    return self.chatHistoryPreloadManager.orderedMedia
}

// Holds the subscription to `account.stateManager.externallyUpdatedPeerIds` made in `init`.
private let externallyUpdatedPeerIdDisposable = MetaDisposable()

// Its signal is passed to the preload manager as `preloadItemsSignal` in `init`.
public let chatListPreloadItems = Promise>([])

private let hiddenChatListFilterIdsValue = Atomic<[Int32: Bag]>(value: [:])
private let hiddenChatListFilterIdsPromise = ValuePromise>(Set())
/// Forwards the signal of `hiddenChatListFilterIdsPromise`.
public var hiddenChatListFilterIds: Signal, NoError> {
    return self.hiddenChatListFilterIdsPromise.get()
}

var resetPeerHoleManagement: ((PeerId) -> Void)?

private var quickRepliesUpdateDisposable: Disposable?
private var quickRepliesUpdateTimestamp: Double = 0.0

private var businessLinksUpdateDisposable: Disposable?
private var businessLinksUpdateTimestamp: Double = 0.0

/// Creates the tracker for `account`.
///
/// Builds the history-view validation contexts and the chat-history preload manager from the
/// account's postbox and network, and subscribes (on `queue`) to
/// `account.stateManager.externallyUpdatedPeerIds`, forcing a cached-data refresh for every
/// reported peer that currently has a cached data context.
init(account: Account) {
    self.account = account
    self.accountPeerId = account.peerId

    self.historyViewStateValidationContexts = HistoryViewStateValidationContexts(queue: self.queue, postbox: account.postbox, network: account.network, accountPeerId: account.peerId)

    self.chatHistoryPreloadManager = ChatHistoryPreloadManager(postbox: account.postbox, network: account.network, accountPeerId: account.peerId, networkState: account.networkState, preloadItemsSignal: self.chatListPreloadItems.get() |> distinctUntilChanged |> map { Array($0) })

    self.externallyUpdatedPeerIdDisposable.set((account.stateManager.externallyUpdatedPeerIds
    |> deliverOn(self.queue)).start(next: { [weak self] peerIds in
        guard let strongSelf = self else {
            return
        }
        for (peerId, _) in strongSelf.cachedDataContexts {
            if peerIds.contains(peerId) {
                strongSelf.forceUpdateCachedPeerData(peerId: peerId)
            }
        }
    }))
}

/// Disposes every subscription this tracker owns.
///
/// Besides the view-count and reactions request sets, this also tears down the other
/// `DisposableDict`s, the debounce disposables and the per-message / per-hole request
/// disposables, so that no request or polling signal started by this object outlives it.
deinit {
    self.updatedViewCountDisposables.dispose()
    self.updatedReactionsDisposables.dispose()
    self.seenLiveLocationDisposables.dispose()
    self.updatedExtendedMediaDisposables.dispose()
    self.updatedUnsupportedMediaDisposables.dispose()
    self.externallyUpdatedPeerIdDisposable.dispose()
    self.quickRepliesUpdateDisposable?.dispose()
    self.businessLinksUpdateDisposable?.dispose()
    self.refreshStoriesForPeerIdsDebounceDisposable?.dispose()
    self.refreshCanSendMessagesForPeerIdsDebounceDisposable?.dispose()

    // No other strong reference to `self` exists during deinit, so these dictionaries can be
    // read here without hopping onto `queue`.
    for disposable in self.webpageDisposables.values {
        disposable.dispose()
    }
    for disposable in self.pollDisposables.values {
        disposable.dispose()
    }
    for disposable in self.visibleCallListHoleDisposables.values {
        disposable.dispose()
    }
}

/// Asynchronously (on `queue`) removes all cached data contexts.
func reset() {
    self.queue.async {
        self.cachedDataContexts.removeAll()
    }
}

/// Reconciles the set of pending-webpage messages shown by one view with the shared,
/// reference-counted set, starting and cancelling fetches as needed.
///
/// - Parameters:
///   - viewId: The view whose visible set is being replaced.
///   - threadId: Forwarded to `fetchWebpage` for messages fetched from the server.
///   - messageIds: The complete new set for this view (an empty set removes the view's entry).
///   - localWebpages: For local pending webpages, the `(webpageId, url)` to preview.
private func updatePendingWebpages(viewId: Int32, threadId: Int64?, messageIds: Set, localWebpages: [MessageId: (MediaId, String)]) {
    self.queue.async {
        var addedMessageIds: [MessageId] = []
        var removedMessageIds: [MessageId] = []

        // Diff the view's previous set against the new one.
        let viewMessageIds: Set = self.viewPendingWebpageMessageIds[viewId] ??
            Set()

        let viewAddedMessageIds = messageIds.subtracting(viewMessageIds)
        let viewRemovedMessageIds = viewMessageIds.subtracting(messageIds)
        // Reference counting: a message is "added" only when its first view starts showing it,
        // and "removed" only when the last view stops.
        for messageId in viewAddedMessageIds {
            if let count = self.pendingWebpageMessageIds[messageId] {
                self.pendingWebpageMessageIds[messageId] = count + 1
            } else {
                self.pendingWebpageMessageIds[messageId] = 1
                addedMessageIds.append(messageId)
            }
        }
        for messageId in viewRemovedMessageIds {
            if let count = self.pendingWebpageMessageIds[messageId] {
                if count == 1 {
                    self.pendingWebpageMessageIds.removeValue(forKey: messageId)
                    removedMessageIds.append(messageId)
                } else {
                    self.pendingWebpageMessageIds[messageId] = count - 1
                }
            } else {
                assertionFailure()
            }
        }

        if messageIds.isEmpty {
            self.viewPendingWebpageMessageIds.removeValue(forKey: viewId)
        } else {
            self.viewPendingWebpageMessageIds[viewId] = messageIds
        }

        for messageId in removedMessageIds {
            if let disposable = self.webpageDisposables.removeValue(forKey: messageId) {
                disposable.dispose()
            }
        }

        if let account = self.account {
            for messageId in addedMessageIds {
                if self.webpageDisposables[messageId] == nil {
                    if let (_, url) = localWebpages[messageId] {
                        // Local pending webpage: resolve a preview for the URL and replace the
                        // first webpage media of the stored message with the result.
                        self.webpageDisposables[messageId] = (webpagePreview(account: account, urls: [url])
                        |> mapToSignal { result -> Signal in
                            guard case let .result(webpageResult) = result else {
                                return .complete()
                            }

                            return account.postbox.transaction { transaction -> Void in
                                if let webpageResult = webpageResult {
                                    transaction.updateMessage(messageId, update: { currentMessage in
                                        let storeForwardInfo = currentMessage.forwardInfo.flatMap(StoreMessageForwardInfo.init)
                                        var media = currentMessage.media
                                        for i in 0 ..< media.count {
                                            if let _ = media[i] as?
                                                TelegramMediaWebpage {
                                                media[i] = webpageResult.webpage
                                                break
                                            }
                                        }
                                        return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: media))
                                    })
                                }
                            }
                        }).start(completed: { [weak self] in
                            // Drop the bookkeeping entry once the request has finished.
                            if let strongSelf = self {
                                strongSelf.queue.async {
                                    strongSelf.webpageDisposables.removeValue(forKey: messageId)
                                }
                            }
                        })
                    } else if messageId.namespace == Namespaces.Message.Cloud {
                        // Cloud message: re-request the message itself from the server.
                        self.webpageDisposables[messageId] = fetchWebpage(account: account, messageId: messageId, threadId: threadId).start(completed: { [weak self] in
                            if let strongSelf = self {
                                strongSelf.queue.async {
                                    strongSelf.webpageDisposables.removeValue(forKey: messageId)
                                }
                            }
                        })
                    }
                } else {
                    assertionFailure()
                }
            }
        }
    }
}

/// Reconciles the set of poll messages shown by one view with the shared, reference-counted
/// set, and starts or cancels the repeating poll-results requests accordingly.
///
/// - Parameters:
///   - viewId: The view whose visible set is being replaced.
///   - messageIds: The complete new set for this view (an empty set removes the view's entry).
///   - messages: Messages by id, used to read the poll media and timestamps.
private func updatePolls(viewId: Int32, messageIds: Set, messages: [MessageId: Message]) {
    let queue = self.queue
    self.queue.async {
        var addedMessageIds: [MessageId] = []
        var removedMessageIds: [MessageId] = []

        let viewMessageIds: Set = self.viewPollMessageIds[viewId] ??
            Set()

        let viewAddedMessageIds = messageIds.subtracting(viewMessageIds)
        let viewRemovedMessageIds = viewMessageIds.subtracting(messageIds)
        // Same reference-counting scheme as for pending webpages.
        for messageId in viewAddedMessageIds {
            if let count = self.pollMessageIds[messageId] {
                self.pollMessageIds[messageId] = count + 1
            } else {
                self.pollMessageIds[messageId] = 1
                addedMessageIds.append(messageId)
            }
        }
        for messageId in viewRemovedMessageIds {
            if let count = self.pollMessageIds[messageId] {
                if count == 1 {
                    self.pollMessageIds.removeValue(forKey: messageId)
                    removedMessageIds.append(messageId)
                } else {
                    self.pollMessageIds[messageId] = count - 1
                }
            } else {
                assertionFailure()
            }
        }

        if messageIds.isEmpty {
            self.viewPollMessageIds.removeValue(forKey: viewId)
        } else {
            self.viewPollMessageIds[viewId] = messageIds
        }

        for messageId in removedMessageIds {
            if let disposable = self.pollDisposables.removeValue(forKey: messageId) {
                disposable.dispose()
            }
        }

        if let account = self.account {
            for messageId in addedMessageIds {
                if self.pollDisposables[messageId] == nil {
                    // Emits `false`, and optionally `true` later; `true` switches the request
                    // loop below from the 30 s interval to the 0.5 s interval.
                    var deadlineTimer: Signal = .single(false)

                    if let message = messages[messageId] {
                        for media in message.media {
                            if let poll = media as?
                                TelegramMediaPoll {
                                if let _ = poll.deadlineTimeout, message.id.namespace == Namespaces.Message.Cloud {
                                    let startDate: Int32
                                    if let forwardInfo = message.forwardInfo {
                                        startDate = forwardInfo.date
                                    } else {
                                        startDate = message.timestamp
                                    }

                                    // NOTE(review): `remainingTime` is derived from the elapsed
                                    // time only (`timestamp - startDate - 1`); the bound
                                    // `deadlineTimeout` value is discarded (`if let _`). Confirm
                                    // this is the intended delay before fast polling starts.
                                    let timestamp = Int32(CFAbsoluteTimeGetCurrent() + NSTimeIntervalSince1970)
                                    let remainingTime = timestamp - startDate - 1

                                    if remainingTime > 0 {
                                        deadlineTimer = .single(false)
                                        |> then(
                                            .single(true)
                                            |> suspendAwareDelay(Double(remainingTime), queue: queue)
                                        )
                                    } else {
                                        deadlineTimer = .single(true)
                                    }
                                }
                            }
                        }
                    }

                    let pollSignal: Signal = deadlineTimer
                    |> distinctUntilChanged
                    |> mapToSignal { reachedDeadline -> Signal in
                        if reachedDeadline {
                            // Fetch, wait 0.5 s, repeat.
                            var signal = fetchPoll(account: account, messageId: messageId)
                            |> ignoreValues

                            signal = (signal |> then(
                                .complete()
                                |> delay(0.5, queue: Queue.concurrentDefaultQueue())
                            ))
                            |> restart

                            return signal
                        } else {
                            // Fetch, wait 30 s, repeat.
                            var signal = fetchPoll(account: account, messageId: messageId)
                            |> ignoreValues

                            signal = (signal |> then(
                                .complete()
                                |> delay(30.0, queue: Queue.concurrentDefaultQueue())
                            ))
                            |> restart

                            return signal
                        }
                    }
                    self.pollDisposables[messageId] = pollSignal.start()
                } else {
                    assertionFailure()
                }
            }
        }
    }
}

/// Reconciles the set of call-list holes visible in one view with the shared,
/// reference-counted set, starting a hole fetch for newly visible holes and cancelling the
/// fetch for holes no view shows any more.
///
/// - Parameters:
///   - viewId: The view whose visible set is being replaced.
///   - holeIds: The complete new set for this view (an empty set removes the view's entry).
private func updateVisibleCallListHoles(viewId: Int32, holeIds: Set) {
    self.queue.async {
        var addedHoleIds: [MessageIndex] = []
        var removedHoleIds: [MessageIndex] = []

        let viewHoleIds: Set = self.viewVisibleCallListHoleIds[viewId] ??
            Set()

        let viewAddedHoleIds = holeIds.subtracting(viewHoleIds)
        let viewRemovedHoleIds = viewHoleIds.subtracting(holeIds)

        for holeId in viewAddedHoleIds {
            if let count = self.visibleCallListHoleIds[holeId] {
                self.visibleCallListHoleIds[holeId] = count + 1
            } else {
                self.visibleCallListHoleIds[holeId] = 1
                addedHoleIds.append(holeId)
            }
        }

        for holeId in viewRemovedHoleIds {
            if let count = self.visibleCallListHoleIds[holeId] {
                if count == 1 {
                    self.visibleCallListHoleIds.removeValue(forKey: holeId)
                    removedHoleIds.append(holeId)
                } else {
                    self.visibleCallListHoleIds[holeId] = count - 1
                }
            } else {
                assertionFailure()
            }
        }

        if holeIds.isEmpty {
            self.viewVisibleCallListHoleIds.removeValue(forKey: viewId)
        } else {
            self.viewVisibleCallListHoleIds[viewId] = holeIds
        }

        for holeId in removedHoleIds {
            if let disposable = self.visibleCallListHoleDisposables.removeValue(forKey: holeId) {
                disposable.dispose()
            }
        }

        if let account = self.account {
            for holeId in addedHoleIds {
                if self.visibleCallListHoleDisposables[holeId] == nil {
                    self.visibleCallListHoleDisposables[holeId] = fetchCallListHole(network: account.network, postbox: account.postbox, accountPeerId: account.peerId, holeIndex: holeId).start(completed: { [weak self] in
                        // Drop the bookkeeping entry once the fetch has finished.
                        if let strongSelf = self {
                            strongSelf.queue.async {
                                strongSelf.visibleCallListHoleDisposables.removeValue(forKey: holeId)
                            }
                        }
                    })
                } else {
                    assertionFailure()
                }
            }
        }
    }
}

/// Reply information reported for a message: when it was recorded, the comments peer, and
/// optional maximum read-incoming / maximum message ids.
// (The closing brace of this struct is on the next line of the file.)
public struct UpdatedMessageReplyInfo {
    var timestamp: Int32
    var commentsPeerId: PeerId
    var maxReadIncomingMessageId: MessageId?
    var maxMessageId: MessageId?
} func applyMaxReadIncomingMessageIdForReplyInfo(id: MessageId, maxReadIncomingMessageId: MessageId) { self.queue.async { if var state = self.updatedViewCountMessageIdsAndTimestamps[id], var result = state.result { result.maxReadIncomingMessageId = maxReadIncomingMessageId state.result = result self.updatedViewCountMessageIdsAndTimestamps[id] = state } } } public func replyInfoForMessageId(_ id: MessageId) -> Signal { return Signal { [weak self] subscriber in let state = self?.updatedViewCountMessageIdsAndTimestamps[id] let result = state?.result if let state = state, let result = result, let commentsPeerId = result.commentsPeerId { subscriber.putNext(UpdatedMessageReplyInfo(timestamp: state.timestamp, commentsPeerId: commentsPeerId, maxReadIncomingMessageId: result.maxReadIncomingMessageId, maxMessageId: result.maxMessageId)) } else { subscriber.putNext(nil) } subscriber.putCompletion() return EmptyDisposable } |> runOn(self.queue) } public func updateReplyInfoForMessageId(_ id: MessageId, info: UpdatedMessageReplyInfo) { self.queue.async { [weak self] in guard let strongSelf = self else { return } guard let current = strongSelf.updatedViewCountMessageIdsAndTimestamps[id] else { return } strongSelf.updatedViewCountMessageIdsAndTimestamps[id] = ViewCountContextState(timestamp: Int32(CFAbsoluteTimeGetCurrent()), clientId: current.clientId, result: ViewCountContextState.ReplyInfo(commentsPeerId: info.commentsPeerId, maxReadIncomingMessageId: info.maxReadIncomingMessageId, maxMessageId: info.maxMessageId)) } } public func updateViewCountForMessageIds(messageIds: Set, clientId: Int32) { self.queue.async { var addedMessageIds: [MessageId] = [] let updatedState = ViewCountContextState(timestamp: Int32(CFAbsoluteTimeGetCurrent()), clientId: clientId, result: nil) for messageId in messageIds { let messageTimestamp = self.updatedViewCountMessageIdsAndTimestamps[messageId] if messageTimestamp == nil || !messageTimestamp!.isStillValidFor(updatedState) { 
self.updatedViewCountMessageIdsAndTimestamps[messageId] = updatedState addedMessageIds.append(messageId) } } if !addedMessageIds.isEmpty { for (peerId, messageIds) in messagesIdsGroupedByPeerId(Set(addedMessageIds)) { let disposableId = self.nextUpdatedViewCountDisposableId self.nextUpdatedViewCountDisposableId += 1 if let account = self.account { let accountPeerId = account.peerId let signal: Signal<[MessageId: ViewCountContextState], NoError> = (account.postbox.transaction { transaction -> Signal<[MessageId: ViewCountContextState], NoError> in guard let peer = transaction.getPeer(peerId), let inputPeer = apiInputPeer(peer) else { return .complete() } return account.network.request(Api.functions.messages.getMessagesViews(peer: inputPeer, id: messageIds.map { $0.id }, increment: .boolTrue)) |> map(Optional.init) |> `catch` { _ -> Signal in return .single(nil) } |> mapToSignal { result -> Signal<[MessageId: ViewCountContextState], NoError> in guard case let .messageViews(viewCounts, chats, users)? = result else { return .complete() } return account.postbox.transaction { transaction -> [MessageId: ViewCountContextState] in var resultStates: [MessageId: ViewCountContextState] = [:] let parsedPeers = AccumulatedPeers(transaction: transaction, chats: chats, users: users) updatePeers(transaction: transaction, accountPeerId: accountPeerId, peers: parsedPeers) for i in 0 ..< messageIds.count { if i < viewCounts.count { if case let .messageViews(_, views, forwards, replies) = viewCounts[i] { transaction.updateMessage(messageIds[i], update: { currentMessage in let storeForwardInfo = currentMessage.forwardInfo.flatMap(StoreMessageForwardInfo.init) var attributes = currentMessage.attributes var foundReplies = false var commentsChannelId: PeerId? var recentRepliersPeerIds: [PeerId]? var repliesCount: Int32? var repliesMaxId: Int32? var repliesReadMaxId: Int32? 
if let replies = replies { switch replies { case let .messageReplies(_, repliesCountValue, _, recentRepliers, channelId, maxId, readMaxId): if let channelId = channelId { commentsChannelId = PeerId(namespace: Namespaces.Peer.CloudChannel, id: PeerId.Id._internalFromInt64Value(channelId)) } repliesCount = repliesCountValue if let recentRepliers = recentRepliers { recentRepliersPeerIds = recentRepliers.map { $0.peerId } } else { recentRepliersPeerIds = nil } repliesMaxId = maxId repliesReadMaxId = readMaxId } } var maxMessageId: MessageId? if let commentsChannelId = commentsChannelId { if let repliesMaxId = repliesMaxId { maxMessageId = MessageId(peerId: commentsChannelId, namespace: Namespaces.Message.Cloud, id: repliesMaxId) } } loop: for j in 0 ..< attributes.count { if let attribute = attributes[j] as? ViewCountMessageAttribute { if let views = views { attributes[j] = ViewCountMessageAttribute(count: max(attribute.count, Int(views))) } } else if let _ = attributes[j] as? ForwardCountMessageAttribute { if let forwards = forwards { attributes[j] = ForwardCountMessageAttribute(count: Int(forwards)) } } else if let attribute = attributes[j] as? ReplyThreadMessageAttribute { foundReplies = true if let repliesCount = repliesCount { var resolvedMaxReadMessageId: MessageId.Id? if let previousMaxReadMessageId = attribute.maxReadMessageId, let repliesReadMaxIdValue = repliesReadMaxId { resolvedMaxReadMessageId = max(previousMaxReadMessageId, repliesReadMaxIdValue) repliesReadMaxId = resolvedMaxReadMessageId } else if let repliesReadMaxIdValue = repliesReadMaxId { resolvedMaxReadMessageId = repliesReadMaxIdValue repliesReadMaxId = resolvedMaxReadMessageId } else { resolvedMaxReadMessageId = attribute.maxReadMessageId } attributes[j] = ReplyThreadMessageAttribute(count: repliesCount, latestUsers: recentRepliersPeerIds ?? 
[], commentsPeerId: commentsChannelId, maxMessageId: repliesMaxId, maxReadMessageId: resolvedMaxReadMessageId) } } } var maxReadIncomingMessageId: MessageId? if let commentsChannelId = commentsChannelId { if let repliesReadMaxId = repliesReadMaxId { maxReadIncomingMessageId = MessageId(peerId: commentsChannelId, namespace: Namespaces.Message.Cloud, id: repliesReadMaxId) } } resultStates[messageIds[i]] = ViewCountContextState(timestamp: Int32(CFAbsoluteTimeGetCurrent()), clientId: clientId, result: ViewCountContextState.ReplyInfo(commentsPeerId: commentsChannelId, maxReadIncomingMessageId: maxReadIncomingMessageId, maxMessageId: maxMessageId)) if !foundReplies, let repliesCount = repliesCount { attributes.append(ReplyThreadMessageAttribute(count: repliesCount, latestUsers: recentRepliersPeerIds ?? [], commentsPeerId: commentsChannelId, maxMessageId: repliesMaxId, maxReadMessageId: repliesReadMaxId)) } return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media)) }) } } } return resultStates } } } |> switchToLatest) |> afterDisposed { [weak self] in self?.queue.async { self?.updatedViewCountDisposables.set(nil, forKey: disposableId) } } |> deliverOn(self.queue) self.updatedViewCountDisposables.set(signal.start(next: { [weak self] updatedStates in guard let strongSelf = self else { return } for (id, state) in updatedStates { strongSelf.updatedViewCountMessageIdsAndTimestamps[id] = state } }), forKey: disposableId) } } } } }

    /**
     Requests current reaction data for the given messages and writes it into the stored messages.

     The work is dispatched asynchronously onto `self.queue`. A message id is requested only when it
     has no entry in `updatedReactionsMessageIdsAndTimestamps`, when its entry is more than 20 seconds
     older than the current timestamp, or when `force` is true. Accepted ids are grouped by
     `messagesIdsGroupedByPeerId` and one `messages.getMessagesReactions` request is started per group.
     A failed request is mapped to `nil` and ends without modifying any message.

     - Parameters:
       - messageIds: Ids of the messages whose reactions should be refreshed.
       - force: When true, the 20 second throttle is bypassed.
     */
    public func updateReactionsForMessageIds(messageIds: Set, force: Bool = false) {
        self.queue.async {
            var addedMessageIds: [MessageId] = []
            let timestamp = Int32(CFAbsoluteTimeGetCurrent())
            /* Throttle: record the request time per message and keep only ids that are due. */
            for messageId in messageIds {
                let messageTimestamp = self.updatedReactionsMessageIdsAndTimestamps[messageId]
                if messageTimestamp == nil || messageTimestamp! < timestamp - 1 * 20 || force {
                    self.updatedReactionsMessageIdsAndTimestamps[messageId] = timestamp
                    addedMessageIds.append(messageId)
                }
            }
            if !addedMessageIds.isEmpty {
                for (peerId, messageIds) in messagesIdsGroupedByPeerId(Set(addedMessageIds)) {
                    let disposableId = self.nextUpdatedReactionsDisposableId
                    self.nextUpdatedReactionsDisposableId += 1
                    if let account = self.account {
                        let signal = (account.postbox.transaction { transaction -> Signal in
                            if let peer = transaction.getPeer(peerId), let inputPeer = apiInputPeer(peer) {
                                return account.network.request(Api.functions.messages.getMessagesReactions(peer: inputPeer, id: messageIds.map { $0.id }))
                                |> map(Optional.init)
                                |> `catch` { _ -> Signal in
                                    return .single(nil)
                                }
                                |> mapToSignal { updates -> Signal in
                                    guard let updates = updates else {
                                        return .complete()
                                    }
                                    return account.postbox.transaction { transaction -> Void in
                                        /* Flatten the three handled container shapes into one list; any other shape yields no updates. */
                                        let updateList: [Api.Update]
                                        switch updates {
                                        case let .updates(updates, _, _, _, _):
                                            updateList = updates
                                        case let .updatesCombined(updates, _, _, _, _, _):
                                            updateList = updates
                                        case let .updateShort(update, _):
                                            updateList = [update]
                                        default:
                                            updateList = []
                                        }
                                        /* Only `updateMessageReactions` entries are applied; every other update kind is ignored. */
                                        for update in updateList {
                                            switch update {
                                            case let .updateMessageReactions(_, peer, msgId, _, reactions):
                                                transaction.updateMessage(MessageId(peerId: peer.peerId, namespace: Namespaces.Message.Cloud, id: msgId), update: { currentMessage in
                                                    var updatedReactions = ReactionsMessageAttribute(apiReactions: reactions)
                                                    let storeForwardInfo = currentMessage.forwardInfo.flatMap(StoreMessageForwardInfo.init)
                                                    var added = false
                                                    var attributes = currentMessage.attributes
                                                    loop: for j in 0 ..< attributes.count {
                                                        if let attribute = attributes[j] as?
ReactionsMessageAttribute {
                                                            added = true
                                                            /* Merge the server reactions into the existing attribute; skip the write when nothing changed. */
                                                            updatedReactions = attribute.withUpdatedResults(reactions)
                                                            if updatedReactions == attribute {
                                                                return .skip
                                                            }
                                                            attributes[j] = updatedReactions
                                                            break loop
                                                        }
                                                    }
                                                    if !added {
                                                        attributes.append(updatedReactions)
                                                    }
                                                    /* Keep the `.unseenReaction` tag in step with the attribute's `hasUnseen` value. */
                                                    var tags = currentMessage.tags
                                                    if updatedReactions.hasUnseen {
                                                        tags.insert(.unseenReaction)
                                                    } else {
                                                        tags.remove(.unseenReaction)
                                                    }
                                                    return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media))
                                                })
                                            default:
                                                break
                                            }
                                        }
                                    }
                                }
                            } else {
                                return .complete()
                            }
                        }
                        |> switchToLatest)
                        /* Drop the bookkeeping entry once the signal is disposed. */
                        |> afterDisposed { [weak self] in
                            self?.queue.async {
                                self?.updatedReactionsDisposables.set(nil, forKey: disposableId)
                            }
                        }
                        self.updatedReactionsDisposables.set(signal.start(), forKey: disposableId)
                    }
                }
            }
        }
    }

    /**
     Sends a read-message-contents request for the given messages.

     The work is dispatched asynchronously onto `self.queue`. A message id is skipped when its entry in
     `updatedSeenLiveLocationMessageIdsAndTimestamps` is not older than 60 seconds. Accepted ids are
     grouped by `messagesIdsGroupedByPeerId`; chat, self and user input peers use
     `messages.readMessageContents`, channel input peers use `channels.readMessageContents`, and any
     other input peer kind completes without a request. The response and any error are discarded.

     - Parameter messageIds: Ids of the messages to report.
     */
    public func updateSeenLiveLocationForMessageIds(messageIds: Set) {
        self.queue.async {
            var addedMessageIds: [MessageId] = []
            let timestamp = Int32(CFAbsoluteTimeGetCurrent())
            for messageId in messageIds {
                let messageTimestamp = self.updatedSeenLiveLocationMessageIdsAndTimestamps[messageId]
                if messageTimestamp == nil || messageTimestamp!
< timestamp - 1 * 60 {
                    self.updatedSeenLiveLocationMessageIdsAndTimestamps[messageId] = timestamp
                    addedMessageIds.append(messageId)
                }
            }
            if !addedMessageIds.isEmpty {
                for (peerId, messageIds) in messagesIdsGroupedByPeerId(Set(addedMessageIds)) {
                    let disposableId = self.nextSeenLiveLocationDisposableId
                    self.nextSeenLiveLocationDisposableId += 1
                    if let account = self.account {
                        let signal = (account.postbox.transaction { transaction -> Signal in
                            if let peer = transaction.getPeer(peerId), let inputPeer = apiInputPeer(peer) {
                                /* Pick the request by input peer kind; unsupported kinds finish immediately. */
                                let request: Signal
                                switch inputPeer {
                                case .inputPeerChat, .inputPeerSelf, .inputPeerUser:
                                    request = account.network.request(Api.functions.messages.readMessageContents(id: messageIds.map { $0.id }))
                                    |> map { _ in true }
                                case let .inputPeerChannel(channelId, accessHash):
                                    request = account.network.request(Api.functions.channels.readMessageContents(channel: .inputChannel(channelId: channelId, accessHash: accessHash), id: messageIds.map { $0.id }))
                                    |> map { _ in true }
                                default:
                                    return .complete()
                                }
                                /* Errors become `false`; the value is then dropped either way. */
                                return request
                                |> `catch` { _ -> Signal in
                                    return .single(false)
                                }
                                |> mapToSignal { _ -> Signal in
                                    return .complete()
                                }
                            } else {
                                return .complete()
                            }
                        }
                        |> switchToLatest)
                        |> afterDisposed { [weak self] in
                            self?.queue.async {
                                self?.seenLiveLocationDisposables.set(nil, forKey: disposableId)
                            }
                        }
                        self.seenLiveLocationDisposables.set(signal.start(), forKey: disposableId)
                    }
                }
            }
        }
    }

    /**
     Requests extended media for the given messages and forwards the returned updates to the state manager.

     The work is dispatched asynchronously onto `self.queue`. A message id is skipped when its entry in
     `updatedExtendedMediaMessageIdsAndTimestamps` is not older than 30 seconds. Accepted ids are grouped
     by `messagesIdsGroupedByPeerId` and one `messages.getExtendedMedia` request is started per group.
     A successful response is passed to `account.stateManager.addUpdates`; a failed request is ignored.

     - Parameter messageIds: Ids of the messages whose extended media should be requested.
     */
    public func updatedExtendedMediaForMessageIds(messageIds: Set) {
        self.queue.async {
            var addedMessageIds: [MessageId] = []
            let timestamp = Int32(CFAbsoluteTimeGetCurrent())
            for messageId in messageIds {
                let messageTimestamp = self.updatedExtendedMediaMessageIdsAndTimestamps[messageId]
                if messageTimestamp == nil || messageTimestamp!
< timestamp - 30 {
                    self.updatedExtendedMediaMessageIdsAndTimestamps[messageId] = timestamp
                    addedMessageIds.append(messageId)
                }
            }
            if !addedMessageIds.isEmpty {
                for (peerId, messageIds) in messagesIdsGroupedByPeerId(Set(addedMessageIds)) {
                    let disposableId = self.nextUpdatedExtendedMediaDisposableId
                    self.nextUpdatedExtendedMediaDisposableId += 1
                    if let account = self.account {
                        let signal = account.postbox.transaction { transaction -> Peer? in
                            if let peer = transaction.getPeer(peerId) {
                                return peer
                            } else {
                                return nil
                            }
                        }
                        |> mapToSignal { peer -> Signal in
                            guard let peer = peer, let inputPeer = apiInputPeer(peer) else {
                                return .complete()
                            }
                            return account.network.request(Api.functions.messages.getExtendedMedia(peer: inputPeer, id: messageIds.map { $0.id }))
                            |> map(Optional.init)
                            |> `catch` { _ -> Signal in
                                return .single(nil)
                            }
                            |> mapToSignal { updates -> Signal in
                                if let updates = updates {
                                    account.stateManager.addUpdates(updates)
                                }
                                return .complete()
                            }
                        }
                        |> afterDisposed { [weak self] in
                            self?.queue.async {
                                self?.updatedExtendedMediaDisposables.set(nil, forKey: disposableId)
                            }
                        }
                        self.updatedExtendedMediaDisposables.set(signal.start(), forKey: disposableId)
                    }
                }
            }
        }
    }

    /**
     Re-fetches the given messages from the server and replaces the stored copies with the fetched ones.

     The work is dispatched asynchronously onto `self.queue`. An id is skipped when its entry in
     `updatedUnsupportedMediaMessageIdsAndTimestamps` is not older than 10 * 60 * 60 seconds.
     Accepted ids are grouped by `messagesIdsGroupedByPeerId`; the group key exposes `peerId` and `threadId`.
     The request is chosen as follows:
     - first id in the `ScheduledCloud` namespace: `messages.getScheduledMessages`;
     - first id in the `QuickReplyCloud` namespace: `messages.getQuickReplyMessages`, using the group's
       `threadId` as the shortcut id;
     - peer in the `CloudUser` or `CloudGroup` namespace: `messages.getMessages`;
     - peer in the `CloudChannel` namespace: `channels.getMessages`.
     When no request can be built the group completes without doing anything.

     - Parameter messageIds: Ids (with thread information) of the messages to re-fetch.
     */
    public func updateUnsupportedMediaForMessageIds(messageIds: Set) {
        self.queue.async {
            var addedMessageIds: [MessageAndThreadId] = []
            let timestamp = Int32(CFAbsoluteTimeGetCurrent())
            for messageId in messageIds {
                let messageTimestamp = self.updatedUnsupportedMediaMessageIdsAndTimestamps[messageId]
                if messageTimestamp == nil || messageTimestamp! < timestamp - 10 * 60 * 60 {
                    self.updatedUnsupportedMediaMessageIdsAndTimestamps[messageId] = timestamp
                    addedMessageIds.append(messageId)
                }
            }
            if !addedMessageIds.isEmpty {
                for (peerIdAndThreadId, messageIds) in messagesIdsGroupedByPeerId(Set(addedMessageIds)) {
                    let disposableId = self.nextUpdatedUnsupportedMediaDisposableId
                    self.nextUpdatedUnsupportedMediaDisposableId += 1
                    if let account = self.account {
                        let accountPeerId = account.peerId
                        let signal = account.postbox.transaction { transaction -> Peer? in
                            if let peer = transaction.getPeer(peerIdAndThreadId.peerId) {
                                return peer
                            } else {
                                return nil
                            }
                        }
                        |> mapToSignal { peer -> Signal in
                            guard let peer = peer else {
                                return .complete()
                            }
                            var fetchSignal: Signal?
                            if let messageId = messageIds.first, messageId.namespace == Namespaces.Message.ScheduledCloud {
                                if let inputPeer = apiInputPeer(peer) {
                                    fetchSignal = account.network.request(Api.functions.messages.getScheduledMessages(peer: inputPeer, id: messageIds.map { $0.id }))
                                }
                            } else if let messageId = messageIds.first, messageId.namespace == Namespaces.Message.QuickReplyCloud {
                                if let threadId = peerIdAndThreadId.threadId {
                                    fetchSignal = account.network.request(Api.functions.messages.getQuickReplyMessages(flags: 1 << 0, shortcutId: Int32(clamping: threadId), id: messageIds.map { $0.id }, hash: 0))
                                } else {
                                    /* NOTE(review): `.never()` does not complete, so this group's disposable entry is only cleared if it is disposed externally. Confirm this is intended rather than `.complete()`. */
                                    fetchSignal = .never()
                                }
                            } else if peerIdAndThreadId.peerId.namespace == Namespaces.Peer.CloudUser || peerIdAndThreadId.peerId.namespace == Namespaces.Peer.CloudGroup {
                                fetchSignal = account.network.request(Api.functions.messages.getMessages(id: messageIds.map { Api.InputMessage.inputMessageID(id: $0.id) }))
                            } else if peerIdAndThreadId.peerId.namespace == Namespaces.Peer.CloudChannel {
                                if let inputChannel = apiInputChannel(peer) {
                                    fetchSignal = account.network.request(Api.functions.channels.getMessages(channel: inputChannel, id: messageIds.map { Api.InputMessage.inputMessageID(id: $0.id) }))
                                }
                            }
                            guard let signal = fetchSignal else {
                                return .complete()
                            }
                            return signal
|> map { result -> (Peer, [Api.Message], [Api.Chat], [Api.User]) in
                                /* Reduce every response shape to (peer, messages, chats, users). */
                                switch result {
                                case let .messages(messages, chats, users):
                                    return (peer, messages, chats, users)
                                case let .messagesSlice(_, _, _, _, messages, chats, users):
                                    return (peer, messages, chats, users)
                                case let .channelMessages(_, _, _, _, messages, _, chats, users):
                                    return (peer, messages, chats, users)
                                case .messagesNotModified:
                                    return (peer, [], [], [])
                                }
                            }
                            /* A failed request is treated as an empty response. */
                            |> `catch` { _ in
                                return Signal<(Peer, [Api.Message], [Api.Chat], [Api.User]), NoError>.single((peer, [], [], []))
                            }
                            |> mapToSignal { topPeer, messages, chats, users -> Signal in
                                return account.postbox.transaction { transaction -> Void in
                                    let parsedPeers = AccumulatedPeers(transaction: transaction, chats: chats, users: users)
                                    updatePeers(transaction: transaction, accountPeerId: accountPeerId, peers: parsedPeers)
                                    /* Each message that parses and carries a concrete id replaces the stored message wholesale. */
                                    for message in messages {
                                        guard let storeMessage = StoreMessage(apiMessage: message, accountPeerId: accountPeerId, peerIsForum: topPeer.isForum) else {
                                            continue
                                        }
                                        guard case let .Id(id) = storeMessage.id else {
                                            continue
                                        }
                                        transaction.updateMessage(id, update: { _ in
                                            return .update(storeMessage)
                                        })
                                    }
                                }
                            }
                        }
                        |> afterDisposed { [weak self] in
                            self?.queue.async {
                                self?.updatedUnsupportedMediaDisposables.set(nil, forKey: disposableId)
                            }
                        }
                        self.updatedUnsupportedMediaDisposables.set(signal.start(), forKey: disposableId)
                    }
                }
            }
        }
    }

    /**
     Refreshes the stored media of animated sticker files that appear in the given messages.

     The work is dispatched asynchronously onto `self.queue`. A message id is processed only if it has no
     entry in `refreshSecretChatMediaMessageIdsAndTimestamps`; entries are never expired here, so each id
     is handled at most once per tracker instance. For every sticker pack referenced by id from an
     animated sticker file, `messages.getStickerSet` is requested, and each returned document whose media
     id already exists in postbox overwrites the stored media.

     NOTE(review): this method uses `nextUpdatedUnsupportedMediaDisposableId` and
     `updatedUnsupportedMediaDisposables` rather than a dedicated counter and set.

     - Parameter messageIds: Ids of the messages to inspect.
     */
    public func refreshSecretMediaMediaForMessageIds(messageIds: Set) {
        self.queue.async {
            var addedMessageIds: [MessageId] = []
            let timestamp = Int32(CFAbsoluteTimeGetCurrent())
            for messageId in messageIds {
                let messageTimestamp = self.refreshSecretChatMediaMessageIdsAndTimestamps[messageId]
                if messageTimestamp == nil {
                    self.refreshSecretChatMediaMessageIdsAndTimestamps[messageId] = timestamp
                    addedMessageIds.append(messageId)
                }
            }
            if !addedMessageIds.isEmpty {
                for (_, messageIds) in messagesIdsGroupedByPeerId(Set(addedMessageIds)) {
                    let disposableId = self.nextUpdatedUnsupportedMediaDisposableId
self.nextUpdatedUnsupportedMediaDisposableId += 1
                    if let account = self.account {
                        /* Collect the animated sticker files attached to the requested messages. */
                        let signal = account.postbox.transaction { transaction -> [TelegramMediaFile] in
                            var result: [TelegramMediaFile] = []
                            for id in messageIds {
                                if let message = transaction.getMessage(id) {
                                    for media in message.media {
                                        if let file = media as? TelegramMediaFile, file.isAnimatedSticker {
                                            result.append(file)
                                        }
                                    }
                                }
                            }
                            return result
                        }
                        |> mapToSignal { files -> Signal in
                            guard !files.isEmpty else {
                                return .complete()
                            }
                            /* Only pack references of the `.id` form are kept. */
                            var stickerPacks = Set()
                            for file in files {
                                for attribute in file.attributes {
                                    if case let .Sticker(_, packReferenceValue, _) = attribute, let packReference = packReferenceValue {
                                        if case .id = packReference {
                                            stickerPacks.insert(packReference)
                                        }
                                    }
                                }
                            }
                            var requests: [Signal] = []
                            for reference in stickerPacks {
                                if case let .id(id, accessHash) = reference {
                                    requests.append(account.network.request(Api.functions.messages.getStickerSet(stickerset: .inputStickerSetID(id: id, accessHash: accessHash), hash: 0))
                                    |> map(Optional.init)
                                    |> `catch` { _ -> Signal in
                                        return .single(nil)
                                    })
                                }
                            }
                            if requests.isEmpty {
                                return .complete()
                            }
                            return combineLatest(requests)
                            |> mapToSignal { results -> Signal in
                                return account.postbox.transaction { transaction -> Void in
                                    for result in results {
                                        switch result {
                                        case let .stickerSet(_, _, _, documents)?:
                                            for document in documents {
                                                if let file = telegramMediaFileFromApiDocument(document, altDocuments: []) {
                                                    /* Overwrite only media that is already stored. */
                                                    if transaction.getMedia(file.fileId) != nil {
                                                        let _ = transaction.updateMedia(file.fileId, update: file)
                                                    }
                                                }
                                            }
                                        default:
                                            break
                                        }
                                    }
                                }
                            }
                        }
                        |> afterDisposed { [weak self] in
                            self?.queue.async {
                                self?.updatedUnsupportedMediaDisposables.set(nil, forKey: disposableId)
                            }
                        }
                        self.updatedUnsupportedMediaDisposables.set(signal.start(), forKey: disposableId)
                    }
                }
            }
        }
    }

    /**
     Refreshes the stories referenced by the given messages.

     The work is dispatched asynchronously onto `self.queue`. A message id is skipped when its entry in
     `refreshStoriesForMessageIdsAndTimestamps` is not older than 60 seconds. Story ids are collected
     from `TelegramMediaStory` media, from loaded webpage content that carries a story, and from
     `ReplyStoryAttribute` attributes. They are then grouped by peer and passed to
     `_internal_refreshStories`, one call per peer.

     NOTE(review): this method uses `nextUpdatedUnsupportedMediaDisposableId` and
     `updatedUnsupportedMediaDisposables` rather than a dedicated counter and set.

     - Parameter messageIds: Ids of the messages to inspect.
     */
    public func refreshStoriesForMessageIds(messageIds: Set) {
        self.queue.async {
            var addedMessageIds: [MessageId] = []
            let timestamp = Int32(CFAbsoluteTimeGetCurrent())
            for messageId in messageIds {
                let messageTimestamp = self.refreshStoriesForMessageIdsAndTimestamps[messageId]
                var refresh = false
                if let messageTimestamp = messageTimestamp {
                    refresh = messageTimestamp < timestamp - 60
                } else {
                    refresh = true
                }
                if refresh {
                    self.refreshStoriesForMessageIdsAndTimestamps[messageId] = timestamp
                    addedMessageIds.append(messageId)
                }
            }
            if !addedMessageIds.isEmpty {
                for (_, messageIds) in messagesIdsGroupedByPeerId(Set(addedMessageIds)) {
                    let disposableId = self.nextUpdatedUnsupportedMediaDisposableId
                    self.nextUpdatedUnsupportedMediaDisposableId += 1
                    if let account = self.account {
                        let signal = account.postbox.transaction { transaction -> Set in
                            var result = Set()
                            for id in messageIds {
                                if let message = transaction.getMessage(id) {
                                    for media in message.media {
                                        if let storyMedia = media as? TelegramMediaStory {
                                            result.insert(storyMedia.storyId)
                                        } else if let webpage = media as? TelegramMediaWebpage, case let .Loaded(content) = webpage.content, let story = content.story {
                                            result.insert(story.storyId)
                                        }
                                    }
                                    for attribute in message.attributes {
                                        if let attribute = attribute as?
ReplyStoryAttribute {
                                            result.insert(attribute.storyId)
                                        }
                                    }
                                }
                            }
                            return result
                        }
                        |> mapToSignal { ids -> Signal in
                            guard !ids.isEmpty else {
                                return .complete()
                            }
                            var requests: [Signal] = []
                            /* Bucket the story ids by owning peer so that each peer gets a single refresh call. */
                            var idsGroupedByPeerId: [PeerId: Set] = [:]
                            for id in ids {
                                if idsGroupedByPeerId[id.peerId] == nil {
                                    idsGroupedByPeerId[id.peerId] = Set([id.id])
                                } else {
                                    idsGroupedByPeerId[id.peerId]?.insert(id.id)
                                }
                            }
                            for (peerId, ids) in idsGroupedByPeerId {
                                requests.append(_internal_refreshStories(account: account, peerId: peerId, ids: Array(ids)))
                            }
                            return combineLatest(requests)
                            |> ignoreValues
                        }
                        |> afterDisposed { [weak self] in
                            self?.queue.async {
                                self?.updatedUnsupportedMediaDisposables.set(nil, forKey: disposableId)
                            }
                        }
                        self.updatedUnsupportedMediaDisposables.set(signal.start(), forKey: disposableId)
                    }
                }
            }
        }
    }

    /**
     Queues peers for a story-stats refresh and flushes the queue after a short delay.

     The peer ids are appended to `pendingRefreshStoriesForPeerIds` on `self.queue`. If no delay is
     currently armed, a 0.15 second delay is started; when it fires, the accumulated ids are removed
     from the pending list and handed to `internalRefreshStoryStatsForPeerIds` in one call.

     - Parameter peerIds: Peers whose story stats should be refreshed.
     */
    public func refreshStoryStatsForPeerIds(peerIds: [PeerId]) {
        self.queue.async {
            self.pendingRefreshStoriesForPeerIds.append(contentsOf: peerIds)
            if self.refreshStoriesForPeerIdsDebounceDisposable == nil {
                self.refreshStoriesForPeerIdsDebounceDisposable = (Signal.complete() |> delay(0.15, queue: self.queue)).start(completed: {
                    /* Clear the marker first so that later calls can arm a new delay. */
                    self.refreshStoriesForPeerIdsDebounceDisposable = nil
                    let pendingPeerIds = self.pendingRefreshStoriesForPeerIds
                    self.pendingRefreshStoriesForPeerIds.removeAll()
                    self.internalRefreshStoryStatsForPeerIds(peerIds: pendingPeerIds)
                })
            }
        }
    }

    /**
     Requests the maximum story id for the given peers and stores it as the peers' inexact max story id.

     The work is dispatched asynchronously onto `self.queue`. A peer is skipped when its entry in
     `refreshStoriesForPeerIdsAndTimestamps` is not older than 60 * 60 seconds. Peers that resolve to an
     input peer are sent to `stories.getPeerMaxIDs` in batches of 100. For each returned value, a value
     less than or equal to zero clears the stored inexact max id and any other value replaces it.
     A failed batch is treated as an empty result.

     NOTE(review): this method uses `nextUpdatedUnsupportedMediaDisposableId` and
     `updatedUnsupportedMediaDisposables` rather than a dedicated counter and set.

     - Parameter peerIds: Peers to refresh.
     */
    private func internalRefreshStoryStatsForPeerIds(peerIds: [PeerId]) {
        self.queue.async {
            var addedPeerIds: [PeerId] = []
            let timestamp = Int32(CFAbsoluteTimeGetCurrent())
            for peerId in peerIds {
                let messageTimestamp = self.refreshStoriesForPeerIdsAndTimestamps[peerId]
                var refresh = false
                if let messageTimestamp = messageTimestamp {
                    refresh = messageTimestamp < timestamp - 60 * 60
                } else {
                    refresh = true
                }
                if refresh {
                    self.refreshStoriesForPeerIdsAndTimestamps[peerId] = timestamp
                    addedPeerIds.append(peerId)
                }
            }
            if !addedPeerIds.isEmpty {
                let disposableId = self.nextUpdatedUnsupportedMediaDisposableId
self.nextUpdatedUnsupportedMediaDisposableId += 1
                if let account = self.account {
                    /* Resolve input peers inside a transaction; peers that cannot be resolved are dropped. */
                    let signal = account.postbox.transaction { transaction -> [(PeerId, Api.InputPeer)] in
                        return addedPeerIds.compactMap { id -> (PeerId, Api.InputPeer)? in
                            if let user = transaction.getPeer(id).flatMap(apiInputPeer) {
                                return (id, user)
                            } else {
                                return nil
                            }
                        }
                    }
                    |> mapToSignal { inputPeers -> Signal in
                        guard !inputPeers.isEmpty else {
                            return .complete()
                        }
                        var requests: [Signal] = []
                        let batchCount = 100
                        var startIndex = 0
                        while startIndex < inputPeers.count {
                            var slice: [(PeerId, Api.InputPeer)] = []
                            for i in startIndex ..< min(startIndex + batchCount, inputPeers.count) {
                                slice.append(inputPeers[i])
                            }
                            startIndex += batchCount
                            requests.append(account.network.request(Api.functions.stories.getPeerMaxIDs(id: slice.map(\.1)))
                            |> `catch` { _ -> Signal<[Int32], NoError> in
                                return .single([])
                            }
                            |> mapToSignal { result -> Signal in
                                return account.postbox.transaction { transaction in
                                    /* Results are matched to the request slice by index; surplus results are ignored. */
                                    for i in 0 ..< result.count {
                                        if i < slice.count {
                                            let value = result[i]
                                            if value <= 0 {
                                                transaction.clearStoryItemsInexactMaxId(peerId: slice[i].0)
                                            } else {
                                                transaction.setStoryItemsInexactMaxId(peerId: slice[i].0, id: value)
                                            }
                                        }
                                    }
                                }
                                |> ignoreValues
                            })
                        }
                        return combineLatest(requests)
                        |> ignoreValues
                    }
                    |> afterDisposed { [weak self] in
                        self?.queue.async {
                            self?.updatedUnsupportedMediaDisposables.set(nil, forKey: disposableId)
                        }
                    }
                    self.updatedUnsupportedMediaDisposables.set(signal.start(), forKey: disposableId)
                }
            }
        }
    }

    /**
     Queues peers for a "can send messages" refresh and flushes the queue after a short delay.

     The peer ids are appended to `pendingRefreshCanSendMessagesForPeerIds` on `self.queue`. If no delay
     is currently armed, a 0.15 second delay is started; when it fires, the accumulated ids are removed
     from the pending list and handed to `internalRefreshCanSendMessagesStatsForPeerIds` in one call.

     - Parameter peerIds: Peers to refresh.
     */
    public func refreshCanSendMessagesForPeerIds(peerIds: [PeerId]) {
        self.queue.async {
            self.pendingRefreshCanSendMessagesForPeerIds.append(contentsOf: peerIds)
            if self.refreshCanSendMessagesForPeerIdsDebounceDisposable == nil {
                self.refreshCanSendMessagesForPeerIdsDebounceDisposable = (Signal.complete() |> delay(0.15, queue: self.queue)).start(completed: {
                    /* Clear the marker first so that later calls can arm a new delay. */
                    self.refreshCanSendMessagesForPeerIdsDebounceDisposable = nil
                    let pendingPeerIds = self.pendingRefreshCanSendMessagesForPeerIds
self.pendingRefreshCanSendMessagesForPeerIds.removeAll()
                    self.internalRefreshCanSendMessagesStatsForPeerIds(peerIds: pendingPeerIds)
                })
            }
        }
    }

    /**
     Asks the server whether premium is required to contact the given users and records the answer.

     The work is dispatched asynchronously onto `self.queue`. A peer is skipped when its entry in
     `refreshCanSendMessagesForPeerIdsAndTimestamps` is not older than 60 * 60 seconds. Peers that resolve
     to an input user are sent to `users.getIsPremiumRequiredToContact` in batches of 100. For each
     returned value the `.premiumRequired` flag of the peer's `CachedUserData` is inserted on `boolTrue`
     and removed otherwise; when no `CachedUserData` exists yet, one is created with the literal defaults
     written in the update closure. A failed batch is treated as an empty result.

     NOTE(review): this method uses `nextUpdatedUnsupportedMediaDisposableId` and
     `updatedUnsupportedMediaDisposables` rather than a dedicated counter and set.

     - Parameter peerIds: Peers to refresh.
     */
    private func internalRefreshCanSendMessagesStatsForPeerIds(peerIds: [PeerId]) {
        self.queue.async {
            var addedPeerIds: [PeerId] = []
            let timestamp = Int32(CFAbsoluteTimeGetCurrent())
            for peerId in peerIds {
                let messageTimestamp = self.refreshCanSendMessagesForPeerIdsAndTimestamps[peerId]
                var refresh = false
                if let messageTimestamp = messageTimestamp {
                    refresh = messageTimestamp < timestamp - 60 * 60
                } else {
                    refresh = true
                }
                if refresh {
                    self.refreshCanSendMessagesForPeerIdsAndTimestamps[peerId] = timestamp
                    addedPeerIds.append(peerId)
                }
            }
            if !addedPeerIds.isEmpty {
                let disposableId = self.nextUpdatedUnsupportedMediaDisposableId
                self.nextUpdatedUnsupportedMediaDisposableId += 1
                if let account = self.account {
                    /* Resolve input users inside a transaction; peers that cannot be resolved are dropped. */
                    let signal = account.postbox.transaction { transaction -> [(PeerId, Api.InputUser)] in
                        return addedPeerIds.compactMap { id -> (PeerId, Api.InputUser)? in
                            if let user = transaction.getPeer(id).flatMap(apiInputUser) {
                                return (id, user)
                            } else {
                                return nil
                            }
                        }
                    }
                    |> mapToSignal { inputPeers -> Signal in
                        guard !inputPeers.isEmpty else {
                            return .complete()
                        }
                        var requests: [Signal] = []
                        let batchCount = 100
                        var startIndex = 0
                        while startIndex < inputPeers.count {
                            var slice: [(PeerId, Api.InputUser)] = []
                            for i in startIndex ..< min(startIndex + batchCount, inputPeers.count) {
                                slice.append(inputPeers[i])
                            }
                            startIndex += batchCount
                            requests.append(account.network.request(Api.functions.users.getIsPremiumRequiredToContact(id: slice.map(\.1)))
                            |> `catch` { _ -> Signal<[Api.Bool], NoError> in
                                return .single([])
                            }
                            |> mapToSignal { result -> Signal in
                                return account.postbox.transaction { transaction in
                                    /* Results are matched to the request slice by index; surplus results are ignored. */
                                    for i in 0 ..< result.count {
                                        if i < slice.count {
                                            let value = result[i]
                                            transaction.updatePeerCachedData(peerIds: Set([slice[i].0]), update: { _, cachedData in
                                                var cachedData = cachedData as? CachedUserData ??
CachedUserData(about: nil, botInfo: nil, editableBotInfo: nil, peerStatusSettings: nil, pinnedMessageId: nil, isBlocked: false, commonGroupCount: 0, voiceCallsAvailable: true, videoCallsAvailable: true, callsPrivate: true, canPinMessages: true, hasScheduledMessages: true, autoremoveTimeout: .unknown, themeEmoticon: nil, photo: .unknown, personalPhoto: .unknown, fallbackPhoto: .unknown, premiumGiftOptions: [], voiceMessagesAvailable: true, wallpaper: nil, flags: [], businessHours: nil, businessLocation: nil, greetingMessage: nil, awayMessage: nil, connectedBot: nil, businessIntro: .unknown, birthday: nil, personalChannel: .unknown, botPreview: nil)
                                                /* Mirror the server answer in the `.premiumRequired` flag. */
                                                var flags = cachedData.flags
                                                if case .boolTrue = value {
                                                    flags.insert(.premiumRequired)
                                                } else {
                                                    flags.remove(.premiumRequired)
                                                }
                                                cachedData = cachedData.withUpdatedFlags(flags)
                                                return cachedData
                                            })
                                        }
                                    }
                                }
                                |> ignoreValues
                            })
                        }
                        return combineLatest(requests)
                        |> ignoreValues
                    }
                    |> afterDisposed { [weak self] in
                        self?.queue.async {
                            self?.updatedUnsupportedMediaDisposables.set(nil, forKey: disposableId)
                        }
                    }
                    self.updatedUnsupportedMediaDisposables.set(signal.start(), forKey: disposableId)
                }
            }
        }
    }

    /**
     Marks every unseen personal mention in a peer (optionally in one thread) as seen.

     The work is dispatched asynchronously onto `self.queue` and does nothing when `self.account` is nil.
     In one transaction, every cloud message tagged `.unseenPersonalMessage` gets its first
     `ConsumablePersonalMentionMessageAttribute` replaced by a consumed copy and loses the tag.
     If the tag summary exists with a count above zero, the summary is replaced with a count of zero
     and a synchronize operation is enqueued with the summary's previous max id.

     - Parameters:
       - peerId: Peer whose mentions are marked.
       - threadId: Thread to restrict the operation to, or nil.
     */
    public func updateMarkAllMentionsSeen(peerId: PeerId, threadId: Int64?) {
        self.queue.async {
            guard let account = self.account else {
                return
            }
            let _ = (account.postbox.transaction { transaction -> Set in
                let ids = Set(transaction.getMessageIndicesWithTag(peerId: peerId, threadId: threadId, namespace: Namespaces.Message.Cloud, tag: .unseenPersonalMessage).map({ $0.id }))
                for id in ids {
                    transaction.updateMessage(id, update: { currentMessage in
                        let storeForwardInfo = currentMessage.forwardInfo.flatMap(StoreMessageForwardInfo.init)
                        var attributes = currentMessage.attributes
                        for i in 0 ..< attributes.count {
                            if let attribute = attributes[i] as?
ConsumablePersonalMentionMessageAttribute {
                                /* Mark as consumed while keeping the current pending state. */
                                attributes[i] = ConsumablePersonalMentionMessageAttribute(consumed: true, pending: attribute.pending)
                                break
                            }
                        }
                        var tags = currentMessage.tags
                        tags.remove(.unseenPersonalMessage)
                        return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media))
                    })
                }
                if let summary = transaction.getMessageTagSummary(peerId: peerId, threadId: threadId, tagMask: .unseenPersonalMessage, namespace: Namespaces.Message.Cloud, customTag: nil), summary.count > 0 {
                    /* The new summary max id is the peer's top cloud message id when one exists, otherwise the previous summary max id. */
                    var maxId: Int32 = summary.range.maxId
                    if let index = transaction.getTopPeerMessageIndex(peerId: peerId, namespace: Namespaces.Message.Cloud) {
                        maxId = index.id.id
                    }
                    transaction.replaceMessageTagSummary(peerId: peerId, threadId: threadId, tagMask: .unseenPersonalMessage, namespace: Namespaces.Message.Cloud, customTag: nil, count: 0, maxId: maxId)
                    addSynchronizeMarkAllUnseenPersonalMessagesOperation(transaction: transaction, peerId: peerId, maxId: summary.range.maxId)
                }
                return ids
            }
            |> deliverOn(self.queue)).start()
        }
    }

    /**
     Schedules individual personal mentions to be reported as seen.

     The work is dispatched asynchronously onto `self.queue`. Each message id is processed at most once
     per tracker instance, tracked in `updatedSeenPersonalMessageIds`. For a message that has a
     `ConsumablePersonalMentionMessageAttribute` which is neither consumed nor pending, the attribute is
     rewritten with `pending: true` and a `.consumeUnseenPersonalMessage` pending action is registered.

     - Parameter messageIds: Ids of the messages whose mentions were seen.
     */
    public func updateMarkMentionsSeenForMessageIds(messageIds: Set) {
        self.queue.async {
            var addedMessageIds: [MessageId] = []
            for messageId in messageIds {
                if !self.updatedSeenPersonalMessageIds.contains(messageId) {
                    self.updatedSeenPersonalMessageIds.insert(messageId)
                    addedMessageIds.append(messageId)
                }
            }
            if !addedMessageIds.isEmpty {
                if let account = self.account {
                    let _ = (account.postbox.transaction { transaction -> Void in
                        for id in addedMessageIds {
                            if let message = transaction.getMessage(id) {
                                var consume = false
                                inner: for attribute in
message.attributes {
                                    if let attribute = attribute as? ConsumablePersonalMentionMessageAttribute, !attribute.consumed, !attribute.pending {
                                        consume = true
                                        break inner
                                    }
                                }
                                if consume {
                                    transaction.updateMessage(id, update: { currentMessage in
                                        var attributes = currentMessage.attributes
                                        loop: for j in 0 ..< attributes.count {
                                            if let attribute = attributes[j] as? ConsumablePersonalMentionMessageAttribute {
                                                /* Only the pending state changes here; `consumed` is carried over unchanged. */
                                                attributes[j] = ConsumablePersonalMentionMessageAttribute(consumed: attribute.consumed, pending: true)
                                                break loop
                                            }
                                        }
                                        return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: currentMessage.forwardInfo.flatMap(StoreMessageForwardInfo.init), authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media))
                                    })
                                    transaction.setPendingMessageAction(type: .consumeUnseenPersonalMessage, id: id, action: ConsumePersonalMessageAction())
                                }
                            }
                        }
                    }).start()
                }
            }
        }
    }

    /**
     Marks every unseen reaction in a peer (optionally in one thread) as seen.

     The work is dispatched asynchronously onto `self.queue` and does nothing when `self.account` is nil.
     In one transaction, every cloud message tagged `.unseenReaction` gets its first
     `ReactionsMessageAttribute` replaced by `withAllSeen()` and loses the tag. If a tag summary exists,
     it is replaced with a count of zero and a synchronize operation is enqueued with the summary's
     previous max id.

     NOTE(review): unlike `updateMarkAllMentionsSeen`, the summary branch is not guarded by
     `summary.count > 0`.

     - Parameters:
       - peerId: Peer whose reactions are marked.
       - threadId: Thread to restrict the operation to, or nil.
     */
    public func updateMarkAllReactionsSeen(peerId: PeerId, threadId: Int64?) {
        self.queue.async {
            guard let account = self.account else {
                return
            }
            let _ = (account.postbox.transaction { transaction -> Set in
                let ids = Set(transaction.getMessageIndicesWithTag(peerId: peerId, threadId: threadId, namespace: Namespaces.Message.Cloud, tag: .unseenReaction).map({ $0.id }))
                for id in ids {
                    transaction.updateMessage(id, update: { currentMessage in
                        let storeForwardInfo = currentMessage.forwardInfo.flatMap(StoreMessageForwardInfo.init)
                        var attributes = currentMessage.attributes
                        for i in 0 ..< attributes.count {
                            if let attribute = attributes[i] as?
ReactionsMessageAttribute {
                                attributes[i] = attribute.withAllSeen()
                                break
                            }
                        }
                        var tags = currentMessage.tags
                        tags.remove(.unseenReaction)
                        return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media))
                    })
                }
                if let summary = transaction.getMessageTagSummary(peerId: peerId, threadId: threadId, tagMask: .unseenReaction, namespace: Namespaces.Message.Cloud, customTag: nil) {
                    /* The new summary max id is the peer's top cloud message id when one exists, otherwise the previous summary max id. */
                    var maxId: Int32 = summary.range.maxId
                    if let index = transaction.getTopPeerMessageIndex(peerId: peerId, namespace: Namespaces.Message.Cloud) {
                        maxId = index.id.id
                    }
                    transaction.replaceMessageTagSummary(peerId: peerId, threadId: threadId, tagMask: .unseenReaction, namespace: Namespaces.Message.Cloud, customTag: nil, count: 0, maxId: maxId)
                    addSynchronizeMarkAllUnseenReactionsOperation(transaction: transaction, peerId: peerId, maxId: summary.range.maxId)
                }
                return ids
            }
            |> deliverOn(self.queue)).start()
        }
    }

    /**
     Marks the reactions of individual messages as seen and schedules the read to be reported.

     The work is dispatched asynchronously onto `self.queue`. For each id that resolves to a stored
     message, the message is left untouched unless it carries the `.unseenReaction` tag; otherwise its
     first `ReactionsMessageAttribute` is replaced by `withAllSeen()` and the tag is removed.
     A `.readReaction` pending action is then registered for the id unless one already exists.

     - Parameter messageIds: Ids of the messages whose reactions were seen.
     */
    public func updateMarkReactionsSeenForMessageIds(messageIds: Set) {
        self.queue.async {
            let addedMessageIds: [MessageId] = Array(messageIds)
            if !addedMessageIds.isEmpty {
                if let account = self.account {
                    let _ = (account.postbox.transaction { transaction -> Void in
                        for id in addedMessageIds {
                            if let _ = transaction.getMessage(id) {
                                transaction.updateMessage(id, update: { currentMessage in
                                    if !currentMessage.tags.contains(.unseenReaction) {
                                        return .skip
                                    }
                                    var attributes = currentMessage.attributes
                                    loop: for j in 0 ..< attributes.count {
                                        if let attribute = attributes[j] as?
ReactionsMessageAttribute {
                                            attributes[j] = attribute.withAllSeen()
                                            break loop
                                        }
                                    }
                                    var tags = currentMessage.tags
                                    tags.remove(.unseenReaction)
                                    return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: currentMessage.forwardInfo.flatMap(StoreMessageForwardInfo.init), authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media))
                                })
                                /* Register the pending action once per message. */
                                if transaction.getPendingMessageAction(type: .readReaction, id: id) == nil {
                                    transaction.setPendingMessageAction(type: .readReaction, id: id, action: ReadReactionAction())
                                }
                            }
                        }
                    }).start()
                }
            }
        }
    }

    /**
     Starts a cached-data fetch for a peer regardless of when it was last fetched.

     The work is dispatched asynchronously onto `self.queue`. The peer's `PeerCachedDataContext` is
     looked up or created, its timestamp is set to the current time, and both
     `fetchAndUpdateSupplementalCachedPeerData` and `_internal_fetchAndUpdateCachedPeerData` are started,
     replacing whatever the context's disposable held before. If either fetch reports `false`, the
     context's timestamp is reset to nil. Nothing is fetched when `self.account` is nil, but the
     timestamp has already been updated at that point.

     - Parameter peerId: Peer whose cached data should be fetched.
     */
    public func forceUpdateCachedPeerData(peerId: PeerId) {
        self.queue.async {
            let context: PeerCachedDataContext
            if let existingContext = self.cachedDataContexts[peerId] {
                context = existingContext
            } else {
                context = PeerCachedDataContext()
                self.cachedDataContexts[peerId] = context
            }
            context.timestamp = CFAbsoluteTimeGetCurrent()
            guard let account = self.account else {
                return
            }
            let queue = self.queue
            context.disposable.set(combineLatest(fetchAndUpdateSupplementalCachedPeerData(peerId: peerId, accountPeerId: account.peerId, network: account.network, postbox: account.postbox), _internal_fetchAndUpdateCachedPeerData(accountPeerId: account.peerId, peerId: peerId, network: account.network, postbox: account.postbox)).start(next: { [weak self] supplementalStatus, cachedStatus in
                queue.async {
                    guard let strongSelf = self else {
                        return
                    }
                    /* Clearing the timestamp makes the staleness check in `updateCachedPeerData` pass on its next call for this peer. */
                    if !supplementalStatus || !cachedStatus {
                        if let existingContext = strongSelf.cachedDataContexts[peerId] {
                            existingContext.timestamp = nil
                        }
                    }
                }
            }))
        }
    }

    /**
     Registers a view for a peer's cached data and fetches the data when it is missing or stale.

     The work is dispatched asynchronously onto `self.queue`. For an existing context, a fetch is started
     when the context has no timestamp or the timestamp differs from the current time by more than
     60.0 * 5 seconds. For a newly created context the same check is applied, with `!hasCachedData` as an
     additional trigger. `viewId` is always added to the context's `viewIds`. If either fetch reports
     `false`, the context's timestamp is reset to nil.

     - Parameters:
       - peerId: Peer whose cached data is tracked.
       - accountPeerId: Passed to `fetchAndUpdateSupplementalCachedPeerData`.
       - viewId: Identifier of the view being registered.
       - hasCachedData: Stored on the context's `hasCachedData` property.
     */
    private func updateCachedPeerData(peerId: PeerId, accountPeerId: PeerId, viewId: Int32, hasCachedData: Bool) {
self.queue.async {
            let context: PeerCachedDataContext
            var dataUpdated = false
            if let existingContext = self.cachedDataContexts[peerId] {
                context = existingContext
                context.hasCachedData = hasCachedData
                if context.timestamp == nil || abs(CFAbsoluteTimeGetCurrent() - context.timestamp!) > 60.0 * 5 {
                    context.timestamp = CFAbsoluteTimeGetCurrent()
                    dataUpdated = true
                }
            } else {
                context = PeerCachedDataContext()
                context.hasCachedData = hasCachedData
                self.cachedDataContexts[peerId] = context
                if !context.hasCachedData || context.timestamp == nil || abs(CFAbsoluteTimeGetCurrent() - context.timestamp!) > 60.0 * 5 {
                    context.timestamp = CFAbsoluteTimeGetCurrent()
                    dataUpdated = true
                }
            }
            context.viewIds.insert(viewId)
            if dataUpdated {
                guard let account = self.account else {
                    return
                }
                let queue = self.queue
                context.disposable.set(combineLatest(fetchAndUpdateSupplementalCachedPeerData(peerId: peerId, accountPeerId: accountPeerId, network: account.network, postbox: account.postbox), _internal_fetchAndUpdateCachedPeerData(accountPeerId: account.peerId, peerId: peerId, network: account.network, postbox: account.postbox)).start(next: { [weak self] supplementalStatus, cachedStatus in
                    queue.async {
                        guard let strongSelf = self else {
                            return
                        }
                        /* A failed fetch clears the timestamp so the next call fetches again. */
                        if !supplementalStatus || !cachedStatus {
                            if let existingContext = strongSelf.cachedDataContexts[peerId] {
                                existingContext.timestamp = nil
                            }
                        }
                    }
                }))
            }
        }
    }

    /**
     Unregisters a view from a peer's cached-data context.

     The work is dispatched asynchronously onto `self.queue`. When the last view id is removed, the
     context's disposable is cleared and `hasCachedData` is reset to false. The context itself stays in
     `cachedDataContexts`.

     - Parameters:
       - peerId: Peer whose context is updated.
       - id: View identifier to remove.
     */
    private func removePeerView(peerId: PeerId, id: Int32) {
        self.queue.async {
            if let context = self.cachedDataContexts[peerId] {
                context.viewIds.remove(id)
                if context.viewIds.isEmpty {
                    context.disposable.set(nil)
                    context.hasCachedData = false
                }
            }
        }
    }

    /**
     Returns a signal that keeps polling for a channel running while at least one subscription is alive.

     Subscribing registers a subscriber on the peer's `ChannelPollingContext`, creating the context when
     needed. When the context had no subscribers and `self.account` is available, `keepPollingChannel` is
     started; each value it delivers sets `context.isUpdated` to true and schedules it to become false
     after that many seconds. Disposing the subscription removes the subscriber and, when none remain,
     disposes the polling. The `subscriber` argument of the returned signal is never used in this body,
     so the signal itself emits nothing.

     - Parameter peerId: Channel to poll.
     - Returns: A signal whose subscription lifetime controls the polling.
     */
    public func polledChannel(peerId: PeerId) -> Signal {
        return Signal { subscriber in
            let disposable = MetaDisposable()
            self.queue.async {
                let context: ChannelPollingContext
                if let current = self.channelPollingContexts[peerId] {
                    context = current
                } else {
                    context = ChannelPollingContext(queue: self.queue)
                    self.channelPollingContexts[peerId] = context
                }
                /* The first subscriber starts the polling. */
                if context.subscribers.isEmpty {
                    if let account = self.account {
                        let queue = self.queue
                        Logger.shared.log("AccountViewTracker", "polledChannel: \(peerId) add keep polling")
                        context.disposable.set(keepPollingChannel(accountPeerId: account.peerId, postbox: account.postbox, network: account.network, peerId: peerId, stateManager: account.stateManager).start(next: { [weak context] isValidForTimeout in
                            queue.async {
                                guard let context = context else {
                                    return
                                }
                                Logger.shared.log("AccountViewTracker", "polledChannel: \(peerId) set context isUpdated true for \(isValidForTimeout) seconds")
                                /* Report true immediately, then false after the delivered timeout. */
                                context.isUpdated.set(
                                    .single(true)
                                    |> then(
                                        .single(false)
                                        |> delay(Double(isValidForTimeout), queue: queue)
                                    )
                                )
                            }
                        }))
                    }
                }
                let index = context.subscribers.add(Void())
                disposable.set(ActionDisposable {
                    self.queue.async {
                        if let context = self.channelPollingContexts[peerId] {
                            context.subscribers.remove(index)
                            /* The last subscriber leaving stops the polling. */
                            if context.subscribers.isEmpty {
                                Logger.shared.log("AccountViewTracker", "polledChannel: \(peerId) remove keep polling")
                                context.disposable.set(nil)
                            }
                        }
                    }
                })
            }
            return disposable
        }
    }

    func wrappedMessageHistorySignal(chatLocation: ChatLocationInput, signal: Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError>, fixedCombinedReadStates: MessageHistoryViewReadState?, addHoleIfNeeded: Bool) -> Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError> { var signal = signal if let postbox = self.account?.postbox, let peerId = chatLocation.peerId, let threadId = chatLocation.threadId { let viewKey: PostboxViewKey = .messageHistoryThreadInfo(peerId: peerId, threadId: threadId) let fixedReadStates = Atomic(value: nil) signal = combineLatest(signal, postbox.combinedView(keys: [viewKey])) |> map { view, additionalViews -> (MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?) in var view = view if let threadInfo = additionalViews.views[viewKey] as?
MessageHistoryThreadInfoView, let data = threadInfo.info?.data.get(MessageHistoryThreadData.self) {
                    // Build a read state for the thread from the stored thread data.
                    let readState = CombinedPeerReadState(states: [(Namespaces.Message.Cloud, .idBased(maxIncomingReadId: data.maxIncomingReadId, maxOutgoingReadId: data.maxOutgoingReadId, maxKnownId: data.maxKnownMessageId, count: data.incomingUnreadCount, markedUnread: false))])

                    let fixed: MessageHistoryViewReadState?
                    if let fixedCombinedReadStates = fixedCombinedReadStates {
                        fixed = fixedCombinedReadStates
                    } else {
                        // An already-stored value is returned unchanged; otherwise the current read
                        // state is used. NOTE(review): presumably `modify` stores and returns the
                        // closure result, pinning the first read state — confirm against `Atomic`.
                        fixed = fixedReadStates.modify { current in
                            if let current = current {
                                return current
                            } else {
                                return .peer([peerId: readState])
                            }
                        }
                    }

                    view.0 = MessageHistoryView(
                        base: view.0,
                        fixed: fixed,
                        transient: .peer([peerId: readState])
                    )
                }
                return view
            }
        }

        // Allocate a view id per subscription, then keep webpage/poll/validation tracking in
        // step with each emitted view; everything registered is cleared on disposal.
        let history = withState(signal, { [weak self] () -> Int32 in
            if let strongSelf = self {
                return OSAtomicIncrement32(&strongSelf.nextViewId)
            } else {
                return -1
            }
        }, next: { [weak self] next, viewId in
            if let strongSelf = self {
                strongSelf.queue.async {
                    let (messageIds, localWebpages) = pendingWebpages(entries: next.0.entries)
                    strongSelf.updatePendingWebpages(viewId: viewId, threadId: chatLocation.threadId, messageIds: messageIds, localWebpages: localWebpages)
                    let (pollMessageIds, pollMessageDict) = pollMessages(entries: next.0.entries)
                    strongSelf.updatePolls(viewId: viewId, messageIds: pollMessageIds, messages: pollMessageDict)
                    // History validation is only registered for channel peers.
                    if case let .peer(peerId, _) = chatLocation, peerId.namespace == Namespaces.Peer.CloudChannel {
                        strongSelf.historyViewStateValidationContexts.updateView(id: viewId, view: next.0, location: chatLocation)
                    } else if case let .thread(peerId, _, _) = chatLocation, peerId.namespace == Namespaces.Peer.CloudChannel {
                        strongSelf.historyViewStateValidationContexts.updateView(id: viewId, view: next.0, location: chatLocation)
                    }
                }
            }
        }, disposed: { [weak self] viewId in
            if let strongSelf = self {
                strongSelf.queue.async {
                    strongSelf.updatePendingWebpages(viewId: viewId, threadId: chatLocation.threadId, messageIds: [], localWebpages: [:])
                    strongSelf.updatePolls(viewId: viewId, messageIds: [], messages: [:])
                    switch chatLocation {
                    case let .peer(peerId, _):
                        if peerId.namespace == Namespaces.Peer.CloudChannel {
                            strongSelf.historyViewStateValidationContexts.updateView(id: viewId, view: nil, location: chatLocation)
                        }
                    case let .thread(peerId, _, _):
                        if peerId.namespace == Namespaces.Peer.CloudChannel {
                            strongSelf.historyViewStateValidationContexts.updateView(id: viewId, view: nil, location: chatLocation)
                        }
                    case .customChatContents:
                        break
                    }
                }
            }
        })

        let peerId: PeerId?
        switch chatLocation {
        case let .peer(peerIdValue, _):
            peerId = peerIdValue
        case let .thread(peerIdValue, _, _):
            peerId = peerIdValue
        case .customChatContents:
            peerId = nil
        }
        if let peerId = peerId, peerId.namespace == Namespaces.Peer.CloudChannel {
            return Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError> { subscriber in
                let combinedDisposable = MetaDisposable()
                self.queue.async {
                    // Keep the channel polled for the lifetime of this subscription.
                    let polled = self.polledChannel(peerId: peerId).start()

                    var addHole = false
                    let pollingCompleted: Signal
                    if let context = self.channelPollingContexts[peerId] {
                        if !context.isUpdatedValue {
                            addHole = true
                        }
                        pollingCompleted = context.isUpdated.get()
                    } else {
                        addHole = true
                        pollingCompleted = .single(true)
                    }
                    let resetPeerHoleManagement = self.resetPeerHoleManagement
                    // A peer with a chat list index counts as automatically tracked. Otherwise,
                    // when `addHole` is set, a hole spanning ids 1...(Int32.max - 1) is added.
                    let isAutomaticallyTracked = self.account!.postbox.transaction { transaction -> Bool in
                        if transaction.getPeerChatListIndex(peerId) == nil {
                            if addHole {
                                resetPeerHoleManagement?(peerId)
                                transaction.addHole(peerId: peerId, threadId: nil, namespace: Namespaces.Message.Cloud, space: .everywhere, range: 1 ... (Int32.max - 1))
                            }
                            return false
                        } else {
                            return true
                        }
                    }

                    let historyIsValid = combineLatest(queue: self.queue,
                        pollingCompleted,
                        isAutomaticallyTracked
                    )
                    |> map { lhs, rhs -> Bool in
                        return lhs || rhs
                    }

                    var loaded = false
                    // Every value is passed through; the first `true` is marked as completing.
                    // While invalid, a placeholder loading view is emitted in place of `history`.
                    let validHistory = historyIsValid
                    |> distinctUntilChanged
                    |> take(until: { next in
                        if next {
                            return SignalTakeAction(passthrough: true, complete: true)
                        } else {
                            return SignalTakeAction(passthrough: true, complete: false)
                        }
                    })
                    |> mapToSignal { isValid -> Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError> in
                        if isValid {
                            assert(!loaded)
                            loaded = true
                            return history
                        } else {
                            let view = MessageHistoryView(tag: nil, namespaces: .all, entries: [], holeEarlier: true, holeLater: true, isLoading: true)
                            return .single((view, .Initial, nil))
                        }
                    }

                    let disposable = validHistory.start(next: { next in
                        subscriber.putNext(next)
                    }, completed: {
                        subscriber.putCompletion()
                    })

                    combinedDisposable.set(ActionDisposable {
                        disposable.dispose()
                        polled.dispose()
                    })
                }
                return combinedDisposable
            }
        } else {
            return history
        }
    }

    /// Returns a history view over the scheduled-message namespaces for `chatLocation`.
    ///
    /// The view is anchored at the upper bound with a count of 200. Each emission updates
    /// pending-webpage tracking and history validation for the view; both are cleared on
    /// disposal. Returns `.never()` when the account is unavailable.
    public func scheduledMessagesViewForLocation(_ chatLocation: ChatLocationInput, additionalData: [AdditionalMessageHistoryViewData] = []) -> Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError> {
        if let account = self.account {
            let signal = account.postbox.aroundMessageHistoryViewForLocation(chatLocation, anchor: .upperBound, ignoreMessagesInTimestampRange: nil, ignoreMessageIds: Set(), count: 200, fixedCombinedReadStates: nil, topTaggedMessageIdNamespaces: [], tag: nil, appendMessagesFromTheSameGroup: false, namespaces: .just(Namespaces.Message.allScheduled), orderStatistics: [], additionalData: additionalData)
            return withState(signal, { [weak self] () -> Int32 in
                if let strongSelf = self {
                    return OSAtomicIncrement32(&strongSelf.nextViewId)
                } else {
                    return -1
                }
            }, next: { [weak self] next, viewId in
                if let strongSelf = self {
                    strongSelf.queue.async {
                        let (messageIds, localWebpages) =
pendingWebpages(entries: next.0.entries)
                        strongSelf.updatePendingWebpages(viewId: viewId, threadId: chatLocation.threadId, messageIds: messageIds, localWebpages: localWebpages)
                        strongSelf.historyViewStateValidationContexts.updateView(id: viewId, view: next.0, location: chatLocation)
                    }
                }
            }, disposed: { [weak self] viewId in
                if let strongSelf = self {
                    strongSelf.queue.async {
                        // Drop everything that was registered for this view id.
                        strongSelf.updatePendingWebpages(viewId: viewId, threadId: chatLocation.threadId, messageIds: [], localWebpages: [:])
                        strongSelf.historyViewStateValidationContexts.updateView(id: viewId, view: nil, location: nil)
                    }
                }
            })
        } else {
            return .never()
        }
    }

    /// Returns a history view over the quick-reply namespaces for shortcut `quickReplyId`.
    ///
    /// The location is the account's own peer with `quickReplyId` as the thread id. The view
    /// is anchored at the upper bound with a count of 200. Each emission updates
    /// pending-webpage tracking and history validation; both are cleared on disposal.
    /// Returns `.never()` when the account is unavailable.
    public func quickReplyMessagesViewForLocation(quickReplyId: Int32, additionalData: [AdditionalMessageHistoryViewData] = []) -> Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError> {
        guard let account = self.account else {
            return .never()
        }
        let chatLocation: ChatLocationInput = .peer(peerId: account.peerId, threadId: Int64(quickReplyId))
        let signal = account.postbox.aroundMessageHistoryViewForLocation(chatLocation, anchor: .upperBound, ignoreMessagesInTimestampRange: nil, ignoreMessageIds: Set(), count: 200, fixedCombinedReadStates: nil, topTaggedMessageIdNamespaces: [], tag: nil, appendMessagesFromTheSameGroup: false, namespaces: .just(Namespaces.Message.allQuickReply), orderStatistics: [], additionalData: additionalData)
        return withState(signal, { [weak self] () -> Int32 in
            if let strongSelf = self {
                return OSAtomicIncrement32(&strongSelf.nextViewId)
            } else {
                return -1
            }
        }, next: { [weak self] next, viewId in
            if let strongSelf = self {
                strongSelf.queue.async {
                    let (messageIds, localWebpages) = pendingWebpages(entries: next.0.entries)
                    strongSelf.updatePendingWebpages(viewId: viewId, threadId: Int64(quickReplyId), messageIds: messageIds, localWebpages: localWebpages)
                    strongSelf.historyViewStateValidationContexts.updateView(id: viewId, view: next.0, location: chatLocation)
                }
            }
        }, disposed: { [weak self] viewId in
            if let strongSelf = self {
                strongSelf.queue.async {
                    strongSelf.updatePendingWebpages(viewId: viewId, threadId: Int64(quickReplyId), messageIds: [], localWebpages: [:])
                    strongSelf.historyViewStateValidationContexts.updateView(id: viewId, view: nil, location: nil)
                }
            }
        })
    }

    /// Returns a view of locally pending quick-reply messages for `shortcut`.
    ///
    /// Reads the `QuickReplyLocal` namespace of the account's own peer and keeps only the
    /// entries whose first `OutgoingQuickReplyMessageAttribute` has a matching shortcut.
    /// The mapped view reports no holes and is never loading. Returns `.never()` when the
    /// account is unavailable.
    public func pendingQuickReplyMessagesViewForLocation(shortcut: String) -> Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError> {
        guard let account = self.account else {
            return .never()
        }
        let chatLocation: ChatLocationInput = .peer(peerId: account.peerId, threadId: nil)
        let signal = account.postbox.aroundMessageHistoryViewForLocation(chatLocation, anchor: .upperBound, ignoreMessagesInTimestampRange: nil, ignoreMessageIds: Set(), count: 200, fixedCombinedReadStates: nil, topTaggedMessageIdNamespaces: [], tag: nil, appendMessagesFromTheSameGroup: false, namespaces: .just([Namespaces.Message.QuickReplyLocal]), orderStatistics: [], additionalData: [])
        |> map { view, update, initialData in
            var entries: [MessageHistoryEntry] = []
            for entry in view.entries {
                var matches = false
                // Only the first quick-reply attribute is inspected; the loop stops there.
                inner: for attribute in entry.message.attributes {
                    if let attribute = attribute as? OutgoingQuickReplyMessageAttribute {
                        if attribute.shortcut == shortcut {
                            matches = true
                        }
                        break inner
                    }
                }
                if matches {
                    entries.append(entry)
                }
            }
            let mappedView = MessageHistoryView(tag: nil, namespaces: .just([Namespaces.Message.QuickReplyLocal]), entries: entries, holeEarlier: false, holeLater: false, isLoading: false)

            return (mappedView, update, initialData)
        }
        return signal
    }

    /// Returns a history view positioned at the "message of interest" for `chatLocation`.
    ///
    /// For untagged thread locations the anchor comes from stored thread state: the saved
    /// scroll index when the peer is the account itself, otherwise the thread's read position.
    /// All other cases defer to the postbox's message-of-interest lookup. The result is passed
    /// through `wrappedMessageHistorySignal`. Returns `.never()` when the account is unavailable.
    public func aroundMessageOfInterestHistoryViewForLocation(_ chatLocation: ChatLocationInput, ignoreMessagesInTimestampRange: ClosedRange? = nil, ignoreMessageIds: Set = Set(), count: Int, tag: HistoryViewInputTag?
= nil, appendMessagesFromTheSameGroup: Bool = false, orderStatistics: MessageHistoryViewOrderStatistics = [], additionalData: [AdditionalMessageHistoryViewData] = [], useRootInterfaceStateForThread: Bool = false) -> Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError> {
        if let account = self.account {
            let signal: Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError>
            if let peerId = chatLocation.peerId, let threadId = chatLocation.threadId, tag == nil {
                // Untagged thread location: read the thread data and the saved scroll position
                // first, then choose the anchor from them.
                signal = account.postbox.transaction { transaction -> (MessageHistoryThreadData?, MessageIndex?) in
                    let interfaceState = transaction.getPeerChatThreadInterfaceState(peerId, threadId: threadId)

                    return (
                        transaction.getMessageHistoryThreadInfo(peerId: peerId, threadId: threadId)?.data.get(MessageHistoryThreadData.self),
                        interfaceState?.historyScrollMessageIndex
                    )
                }
                |> mapToSignal { threadInfo, scrollRestorationIndex -> Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError> in
                    if peerId == account.peerId {
                        // Threads of the account's own peer: restore the saved scroll index if any,
                        // and supply a fixed read state with all ids at Int32.max - 1 and count 0.
                        let anchor: HistoryViewInputAnchor
                        if let scrollRestorationIndex {
                            anchor = .index(scrollRestorationIndex)
                        } else {
                            anchor = .upperBound
                        }

                        return account.postbox.aroundMessageHistoryViewForLocation(
                            chatLocation,
                            anchor: anchor,
                            ignoreMessagesInTimestampRange: ignoreMessagesInTimestampRange,
                            ignoreMessageIds: ignoreMessageIds,
                            count: count,
                            fixedCombinedReadStates: .peer([peerId: CombinedPeerReadState(states: [
                                (Namespaces.Message.Cloud, PeerReadState.idBased(maxIncomingReadId: Int32.max - 1, maxOutgoingReadId: Int32.max - 1, maxKnownId: Int32.max - 1, count: 0, markedUnread: false))
                            ])]),
                            topTaggedMessageIdNamespaces: [],
                            tag: tag,
                            appendMessagesFromTheSameGroup: false,
                            namespaces: .not(Namespaces.Message.allNonRegular),
                            orderStatistics: orderStatistics,
                            additionalData: wrappedHistoryViewAdditionalData(chatLocation: chatLocation, additionalData: additionalData),
                            useRootInterfaceStateForThread: useRootInterfaceStateForThread
                        )
                    } else {
                        if let threadInfo = threadInfo {
                            // Anchor at message id 1 when the read id is at most 1, at the read
                            // position when there are unread messages, otherwise at the upper bound.
                            let anchor: HistoryViewInputAnchor
                            if threadInfo.maxIncomingReadId <= 1 {
                                anchor = .message(MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: 1))
                            } else if threadInfo.incomingUnreadCount > 0 && tag == nil {
                                let customUnreadMessageId = MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: threadInfo.maxIncomingReadId)
                                anchor = .message(customUnreadMessageId)
                            } else {
                                anchor = .upperBound
                            }

                            return account.postbox.aroundMessageHistoryViewForLocation(
                                chatLocation,
                                anchor: anchor,
                                ignoreMessagesInTimestampRange: ignoreMessagesInTimestampRange,
                                ignoreMessageIds: ignoreMessageIds,
                                count: count,
                                fixedCombinedReadStates: nil,
                                topTaggedMessageIdNamespaces: [],
                                tag: tag,
                                appendMessagesFromTheSameGroup: false,
                                namespaces: .not(Namespaces.Message.allNonRegular),
                                orderStatistics: orderStatistics,
                                additionalData: wrappedHistoryViewAdditionalData(chatLocation: chatLocation, additionalData: additionalData),
                                useRootInterfaceStateForThread: useRootInterfaceStateForThread
                            )
                        }
                    }

                    // No stored thread info: fall back to the generic message-of-interest lookup.
                    return account.postbox.aroundMessageOfInterestHistoryViewForChatLocation(chatLocation, ignoreMessagesInTimestampRange: ignoreMessagesInTimestampRange, ignoreMessageIds: ignoreMessageIds, count: count, topTaggedMessageIdNamespaces: [Namespaces.Message.Cloud], tag: tag, appendMessagesFromTheSameGroup: appendMessagesFromTheSameGroup, namespaces: .not(Namespaces.Message.allNonRegular), orderStatistics: orderStatistics, customUnreadMessageId: nil, additionalData: wrappedHistoryViewAdditionalData(chatLocation: chatLocation, additionalData: additionalData), useRootInterfaceStateForThread: useRootInterfaceStateForThread)
                }
            } else {
                signal = account.postbox.aroundMessageOfInterestHistoryViewForChatLocation(chatLocation, ignoreMessagesInTimestampRange: ignoreMessagesInTimestampRange, ignoreMessageIds: ignoreMessageIds, count: count, topTaggedMessageIdNamespaces: [Namespaces.Message.Cloud], tag: tag, appendMessagesFromTheSameGroup: appendMessagesFromTheSameGroup, namespaces: .not(Namespaces.Message.allNonRegular), orderStatistics: orderStatistics, customUnreadMessageId: nil, additionalData: wrappedHistoryViewAdditionalData(chatLocation: chatLocation, additionalData: additionalData), useRootInterfaceStateForThread: useRootInterfaceStateForThread)
            }
            return wrappedMessageHistorySignal(chatLocation: chatLocation, signal: signal, fixedCombinedReadStates: nil, addHoleIfNeeded: true)
        } else {
            return .never()
        }
    }

    /// Returns a history view centered on `messageId` for `chatLocation`.
    ///
    /// Forwards to the postbox with non-regular message namespaces excluded and passes the
    /// result through `wrappedMessageHistorySignal`. Returns `.never()` when the account is
    /// unavailable.
    public func aroundIdMessageHistoryViewForLocation(_ chatLocation: ChatLocationInput, ignoreMessagesInTimestampRange: ClosedRange? = nil, ignoreMessageIds: Set = Set(), count: Int, ignoreRelatedChats: Bool, messageId: MessageId, tag: HistoryViewInputTag? = nil, appendMessagesFromTheSameGroup: Bool = false, orderStatistics: MessageHistoryViewOrderStatistics = [], additionalData: [AdditionalMessageHistoryViewData] = [], useRootInterfaceStateForThread: Bool = false) -> Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError> {
        if let account = self.account {
            let signal = account.postbox.aroundIdMessageHistoryViewForLocation(chatLocation, ignoreMessagesInTimestampRange: ignoreMessagesInTimestampRange, ignoreMessageIds: ignoreMessageIds, count: count, ignoreRelatedChats: ignoreRelatedChats, messageId: messageId, topTaggedMessageIdNamespaces: [Namespaces.Message.Cloud], tag: tag, appendMessagesFromTheSameGroup: appendMessagesFromTheSameGroup, namespaces: .not(Namespaces.Message.allNonRegular), orderStatistics: orderStatistics, additionalData: wrappedHistoryViewAdditionalData(chatLocation: chatLocation, additionalData: additionalData), useRootInterfaceStateForThread: useRootInterfaceStateForThread)
            return wrappedMessageHistorySignal(chatLocation: chatLocation, signal: signal, fixedCombinedReadStates: nil, addHoleIfNeeded: false)
        } else {
            return .never()
        }
    }

    /// Returns a history view around an explicit anchor (`index`) for `chatLocation`.
    ///
    /// `index` is translated into the postbox's input anchor and the result is passed through
    /// `wrappedMessageHistorySignal` together with `fixedCombinedReadStates`. Returns
    /// `.never()` when the account is unavailable.
    ///
    /// NOTE(review): `anchorIndex` is not read anywhere in the function body.
    public func aroundMessageHistoryViewForLocation(_ chatLocation: ChatLocationInput, ignoreMessagesInTimestampRange:
ClosedRange? = nil, ignoreMessageIds: Set = Set(), index: MessageHistoryAnchorIndex, anchorIndex: MessageHistoryAnchorIndex, count: Int, clipHoles: Bool = true, ignoreRelatedChats: Bool = false, fixedCombinedReadStates: MessageHistoryViewReadState?, tag: HistoryViewInputTag? = nil, appendMessagesFromTheSameGroup: Bool = false, orderStatistics: MessageHistoryViewOrderStatistics = [], additionalData: [AdditionalMessageHistoryViewData] = [], useRootInterfaceStateForThread: Bool = false) -> Signal<(MessageHistoryView, ViewUpdateType, InitialMessageHistoryData?), NoError> {
        if let account = self.account {
            // Translate the anchor index into the postbox's input anchor.
            let inputAnchor: HistoryViewInputAnchor
            switch index {
            case .upperBound:
                inputAnchor = .upperBound
            case .lowerBound:
                inputAnchor = .lowerBound
            case let .message(index):
                inputAnchor = .index(index)
            }
            let signal = account.postbox.aroundMessageHistoryViewForLocation(chatLocation, anchor: inputAnchor, ignoreMessagesInTimestampRange: ignoreMessagesInTimestampRange, ignoreMessageIds: ignoreMessageIds, count: count, clipHoles: clipHoles, ignoreRelatedChats: ignoreRelatedChats, fixedCombinedReadStates: fixedCombinedReadStates, topTaggedMessageIdNamespaces: [Namespaces.Message.Cloud], tag: tag, appendMessagesFromTheSameGroup: appendMessagesFromTheSameGroup, namespaces: .not(Namespaces.Message.allNonRegular), orderStatistics: orderStatistics, additionalData: wrappedHistoryViewAdditionalData(chatLocation: chatLocation, additionalData: additionalData), useRootInterfaceStateForThread: useRootInterfaceStateForThread)
            return wrappedMessageHistorySignal(chatLocation: chatLocation, signal: signal, fixedCombinedReadStates: fixedCombinedReadStates, addHoleIfNeeded: false)
        } else {
            return .never()
        }
    }

    /// Wraps a peer view `signal` so that cached peer data is kept up to date while it is observed.
    ///
    /// When `updateData` is true, the stored timestamp for `peerId` is cleared first. Each
    /// emission calls `updateCachedPeerData`; disposal calls `removePeerView` for the view id.
    func wrappedPeerViewSignal(peerId: PeerId, signal: Signal, updateData: Bool) -> Signal {
        if updateData {
            self.queue.async {
                if let existingContext = self.cachedDataContexts[peerId] {
                    existingContext.timestamp = nil
                }
            }
        }
        return withState(signal, { [weak self] () -> Int32 in
            if let strongSelf = self {
                return OSAtomicIncrement32(&strongSelf.nextViewId)
            } else {
                return -1
            }
        }, next: { [weak self] next, viewId in
            if let strongSelf = self, let account = strongSelf.account {
                strongSelf.updateCachedPeerData(peerId: peerId, accountPeerId: account.peerId, viewId: viewId, hasCachedData: next.cachedData != nil)
            }
        }, disposed: { [weak self] viewId in
            if let strongSelf = self {
                strongSelf.removePeerView(peerId: peerId, id: viewId)
            }
        })
    }

    /// Returns the postbox peer view for `peerId`, wrapped by `wrappedPeerViewSignal`.
    ///
    /// Returns `.never()` when the account is unavailable.
    public func peerView(_ peerId: PeerId, updateData: Bool = false) -> Signal {
        if let account = self.account {
            return wrappedPeerViewSignal(peerId: peerId, signal: account.postbox.peerView(id: peerId), updateData: updateData)
        } else {
            return .never()
        }
    }

    /// Emits the stored featured sticker packs and refreshes them at most once per hour.
    ///
    /// Values come from the `CloudFeaturedStickerPacks` ordered item list; an empty array is
    /// emitted when that view is missing. Without an account, emits `[]` and completes.
    public func featuredStickerPacks() -> Signal<[FeaturedStickerPackItem], NoError> {
        return Signal { subscriber in
            if let account = self.account {
                let view = account.postbox.combinedView(keys: [.orderedItemList(id: Namespaces.OrderedItemList.CloudFeaturedStickerPacks)]).start(next: { next in
                    if let view = next.views[.orderedItemList(id: Namespaces.OrderedItemList.CloudFeaturedStickerPacks)] as? OrderedItemListView {
                        subscriber.putNext(view.items.map { $0.contents.get(FeaturedStickerPackItem.self)! })
                    } else {
                        subscriber.putNext([])
                    }
                }, completed: {
                    subscriber.putCompletion()
                })
                let disposable = MetaDisposable()
                self.queue.async {
                    let context: FeaturedStickerPacksContext
                    if let current = self.featuredStickerPacksContext {
                        context = current
                    } else {
                        context = FeaturedStickerPacksContext()
                        self.featuredStickerPacksContext = context
                    }

                    // Start a refresh when none was recorded or the last one is over an hour old.
                    let timestamp = CFAbsoluteTimeGetCurrent()
                    if context.timestamp == nil || abs(context.timestamp! - timestamp) > 60.0 * 60.0 {
                        context.timestamp = timestamp
                        context.disposable.set(updatedFeaturedStickerPacks(network: account.network, postbox: account.postbox, category: .stickerPacks).start())
                    }

                    let index = context.subscribers.add(Void())

                    disposable.set(ActionDisposable {
                        self.queue.async {
                            if let context = self.featuredStickerPacksContext {
                                context.subscribers.remove(index)
                            }
                        }
                    })
                }
                return ActionDisposable {
                    view.dispose()
                    disposable.dispose()
                }
            } else {
                subscriber.putNext([])
                subscriber.putCompletion()
                return EmptyDisposable
            }
        }
    }

    /// Emits the stored featured emoji packs and refreshes them at most once per hour.
    ///
    /// Values come from the `CloudFeaturedEmojiPacks` ordered item list; an empty array is
    /// emitted when that view is missing. Without an account, emits `[]` and completes.
    public func featuredEmojiPacks() -> Signal<[FeaturedStickerPackItem], NoError> {
        return Signal { subscriber in
            if let account = self.account {
                let view = account.postbox.combinedView(keys: [.orderedItemList(id: Namespaces.OrderedItemList.CloudFeaturedEmojiPacks)]).start(next: { next in
                    if let view = next.views[.orderedItemList(id: Namespaces.OrderedItemList.CloudFeaturedEmojiPacks)] as? OrderedItemListView {
                        subscriber.putNext(view.items.map { $0.contents.get(FeaturedStickerPackItem.self)! })
                    } else {
                        subscriber.putNext([])
                    }
                }, completed: {
                    subscriber.putCompletion()
                })
                let disposable = MetaDisposable()
                self.queue.async {
                    let context: FeaturedStickerPacksContext
                    if let current = self.featuredEmojiPacksContext {
                        context = current
                    } else {
                        context = FeaturedStickerPacksContext()
                        self.featuredEmojiPacksContext = context
                    }

                    // Start a refresh when none was recorded or the last one is over an hour old.
                    let timestamp = CFAbsoluteTimeGetCurrent()
                    if context.timestamp == nil || abs(context.timestamp!
- timestamp) > 60.0 * 60.0 {
                        context.timestamp = timestamp
                        context.disposable.set(updatedFeaturedStickerPacks(network: account.network, postbox: account.postbox, category: .emojiPacks).start())
                    }

                    let index = context.subscribers.add(Void())

                    disposable.set(ActionDisposable {
                        self.queue.async {
                            if let context = self.featuredEmojiPacksContext {
                                context.subscribers.remove(index)
                            }
                        }
                    })
                }
                return ActionDisposable {
                    view.dispose()
                    disposable.dispose()
                }
            } else {
                subscriber.putNext([])
                subscriber.putCompletion()
                return EmptyDisposable
            }
        }
    }

    /// Returns a grouped call list around `index`.
    ///
    /// Messages are read from the global `Calls` tag, or `MissedCalls` when `type` is not
    /// `.all`. Consecutive calls are grouped when they share a peer, a local calendar day,
    /// and the same missed/other/video classification. Visible holes are reported via
    /// `updateVisibleCallListHoles` while the view is alive. Returns `.never()` when the
    /// account is unavailable.
    public func callListView(type: CallListViewType, index: MessageIndex, count: Int) -> Signal {
        if let account = self.account {
            // Group by day: 24 hours expressed in seconds.
            let granularity: Int32 = 60 * 60 * 24
            // Current local offset from GMT, so day buckets follow local time.
            let timezoneOffset: Int32 = {
                let nowTimestamp = Int32(CFAbsoluteTimeGetCurrent() + NSTimeIntervalSince1970)
                var now: time_t = time_t(nowTimestamp)
                var timeinfoNow: tm = tm()
                localtime_r(&now, &timeinfoNow)
                return Int32(timeinfoNow.tm_gmtoff)
            }()

            let groupingPredicate: (Message, Message) -> Bool = { lhs, rhs in
                if lhs.id.peerId != rhs.id.peerId {
                    return false
                }
                let lhsTimestamp = ((lhs.timestamp + timezoneOffset) / (granularity)) * (granularity)
                let rhsTimestamp = ((rhs.timestamp + timezoneOffset) / (granularity)) * (granularity)
                if lhsTimestamp != rhsTimestamp {
                    return false
                }
                // Classify each side from its first phone-call action.
                var lhsVideo = false
                var lhsMissed = false
                var lhsOther = false
                inner: for media in lhs.media {
                    if let action = media as? TelegramMediaAction {
                        if case let .phoneCall(_, discardReason, _, video) = action.action {
                            lhsVideo = video
                            if lhs.flags.contains(.Incoming), let discardReason = discardReason, case .missed = discardReason {
                                lhsMissed = true
                            } else {
                                lhsOther = true
                            }
                            break inner
                        }
                    }
                }
                var rhsVideo = false
                var rhsMissed = false
                var rhsOther = false
                inner: for media in rhs.media {
                    if let action = media as? TelegramMediaAction {
                        if case let .phoneCall(_, discardReason, _, video) = action.action {
                            rhsVideo = video
                            if rhs.flags.contains(.Incoming), let discardReason = discardReason, case .missed = discardReason {
                                rhsMissed = true
                            } else {
                                rhsOther = true
                            }
                            break inner
                        }
                    }
                }
                if lhsMissed != rhsMissed || lhsOther != rhsOther || lhsVideo != rhsVideo {
                    return false
                }
                return true
            }

            let key = PostboxViewKey.globalMessageTags(globalTag: type == .all ? GlobalMessageTags.Calls : GlobalMessageTags.MissedCalls, position: index, count: count, groupingPredicate: groupingPredicate)
            let signal = account.postbox.combinedView(keys: [key]) |> map { view -> GlobalMessageTagsView in
                let messageView = view.views[key] as! GlobalMessageTagsView
                return messageView
            }

            let managed = withState(signal, { [weak self] () -> Int32 in
                if let strongSelf = self {
                    return OSAtomicIncrement32(&strongSelf.nextViewId)
                } else {
                    return -1
                }
            }, next: { [weak self] next, viewId in
                if let strongSelf = self {
                    var holes = Set()
                    for entry in next.entries {
                        if case let .hole(index) = entry {
                            holes.insert(index)
                        }
                    }
                    strongSelf.updateVisibleCallListHoles(viewId: viewId, holeIds: holes)
                }
            }, disposed: { [weak self] viewId in
                if let strongSelf = self {
                    strongSelf.updateVisibleCallListHoles(viewId: viewId, holeIds: Set())
                }
            })

            return managed
            |> map { view -> CallListView in
                var entries: [CallListViewEntry] = []
                if !view.entries.isEmpty {
                    var currentMessages: [Message] = []
                    for entry in view.entries {
                        switch entry {
                        case .hole:
                            // A hole ends the current group; hole entries themselves are not emitted.
                            if !currentMessages.isEmpty {
                                entries.append(.message(currentMessages[currentMessages.count - 1], currentMessages))
                                currentMessages.removeAll()
                            }
                            //entries.append(.hole(index))
                        case let .message(message):
                            if currentMessages.isEmpty || groupingPredicate(message, currentMessages[currentMessages.count - 1]) {
                                currentMessages.append(message)
                            } else {
                                if !currentMessages.isEmpty {
                                    entries.append(.message(currentMessages[currentMessages.count - 1], currentMessages))
                                    currentMessages.removeAll()
                                }
                                currentMessages.append(message)
                            }
                        }
                    }
                    if !currentMessages.isEmpty {
                        entries.append(.message(currentMessages[currentMessages.count - 1], currentMessages))
                        currentMessages.removeAll()
                    }
                }
                return CallListView(entries: entries, earlier: view.earlier, later: view.later)
            }
        } else {
            return .never()
        }
    }

    /// Emits the number of unseen mentions and unseen reactions for a peer (optionally a thread).
    ///
    /// Each count is the tag summary count minus the number of pending local actions of the
    /// matching type (`consumeUnseenPersonalMessage` for mentions, `readReaction` for
    /// reactions), clamped to zero. Consecutive identical pairs are suppressed. Returns
    /// `.never()` when the account is unavailable.
    public func unseenPersonalMessagesAndReactionCount(peerId: PeerId, threadId: Int64?) -> Signal<(mentionCount: Int32, reactionCount: Int32), NoError> {
        if let account = self.account {
            let pendingMentionsKey: PostboxViewKey = .pendingMessageActionsSummary(type: .consumeUnseenPersonalMessage, peerId: peerId, namespace: Namespaces.Message.Cloud)
            let summaryMentionsKey: PostboxViewKey = .historyTagSummaryView(tag: .unseenPersonalMessage, peerId: peerId, threadId: threadId, namespace: Namespaces.Message.Cloud, customTag: nil)

            let pendingReactionsKey: PostboxViewKey = .pendingMessageActionsSummary(type: .readReaction, peerId: peerId, namespace: Namespaces.Message.Cloud)
            let summaryReactionsKey: PostboxViewKey = .historyTagSummaryView(tag: .unseenReaction, peerId: peerId, threadId: threadId, namespace: Namespaces.Message.Cloud, customTag: nil)

            return account.postbox.combinedView(keys: [pendingMentionsKey, summaryMentionsKey, pendingReactionsKey, summaryReactionsKey])
            |> map { views -> (mentionCount: Int32, reactionCount: Int32) in
                var mentionCount: Int32 = 0
                if let view = views.views[pendingMentionsKey] as? PendingMessageActionsSummaryView {
                    mentionCount -= view.count
                }
                if let view = views.views[summaryMentionsKey] as? MessageHistoryTagSummaryView {
                    if let unseenCount = view.count {
                        mentionCount += unseenCount
                    }
                }

                var reactionCount: Int32 = 0
                // Subtract pending read-reaction actions, mirroring the mention path above.
                // Previously this view was requested but never read, so reactions that were
                // already being marked as read were still counted.
                if let view = views.views[pendingReactionsKey] as? PendingMessageActionsSummaryView {
                    reactionCount -= view.count
                }
                if let view = views.views[summaryReactionsKey] as? MessageHistoryTagSummaryView {
                    if let unseenCount = view.count {
                        reactionCount += unseenCount
                    }
                }

                // Pending actions may exceed the summary count; never report a negative value.
                return (max(0, mentionCount), max(0, reactionCount))
            }
            |> distinctUntilChanged(isEqual: { lhs, rhs in
                if lhs.mentionCount != rhs.mentionCount {
                    return false
                }
                if lhs.reactionCount != rhs.reactionCount {
                    return false
                }
                return true
            })
        } else {
            return .never()
        }
    }

    /// Wraps a chat list `signal` with a per-subscription view id.
    ///
    /// The `next` and `disposed` hooks currently dispatch empty closures onto `queue`.
    private func wrappedChatListView(signal: Signal<(ChatListView, ViewUpdateType), NoError>) -> Signal<(ChatListView, ViewUpdateType), NoError> {
        return withState(signal, { [weak self] () -> Int32 in
            if let strongSelf = self {
                return OSAtomicIncrement32(&strongSelf.nextViewId)
            } else {
                return -1
            }
        }, next: { [weak self] next, viewId in
            if let strongSelf = self {
                strongSelf.queue.async {
                }
            }
        }, disposed: { [weak self] viewId in
            if let strongSelf = self {
                strongSelf.queue.async {
                }
            }
        })
    }

    /// Returns the tail of the chat list for `groupId`, with unseen-mention and
    /// unseen-reaction summary components attached to every entry.
    ///
    /// When `shouldLoadCanMessagePeer` is true, the premium-required flag is extracted from
    /// cached data and the account peer id is supplied. Returns `.never()` when the account
    /// is unavailable.
    public func tailChatListView(groupId: PeerGroupId, filterPredicate: ChatListFilterPredicate? = nil, count: Int, shouldLoadCanMessagePeer: Bool = false) -> Signal<(ChatListView, ViewUpdateType), NoError> {
        if let account = self.account {
            return self.wrappedChatListView(signal: account.postbox.tailChatListView(
                groupId: groupId,
                filterPredicate: filterPredicate,
                count: count,
                summaryComponents: ChatListEntrySummaryComponents(
                    components: [
                        ChatListEntryMessageTagSummaryKey(
                            tag: .unseenPersonalMessage,
                            actionType: PendingMessageActionType.consumeUnseenPersonalMessage
                        ): ChatListEntrySummaryComponents.Component(
                            tagSummary: ChatListEntryMessageTagSummaryComponent(namespace: Namespaces.Message.Cloud),
                            actionsSummary: ChatListEntryPendingMessageActionsSummaryComponent(namespace: Namespaces.Message.Cloud)
                        ),
                        ChatListEntryMessageTagSummaryKey(
                            tag: .unseenReaction,
                            actionType: PendingMessageActionType.readReaction
                        ): ChatListEntrySummaryComponents.Component(
                            tagSummary: ChatListEntryMessageTagSummaryComponent(namespace: Namespaces.Message.Cloud),
                            actionsSummary: ChatListEntryPendingMessageActionsSummaryComponent(namespace:
Namespaces.Message.Cloud)
                        )
                    ]
                ),
                // The cached-data extractor and account peer id are supplied only on request.
                extractCachedData: shouldLoadCanMessagePeer ? extractCachedDataIsPremiumRequiredToMessage : nil,
                accountPeerId: shouldLoadCanMessagePeer ? account.peerId : nil
            ))
        } else {
            return .never()
        }
    }

    /// Returns the chat list for `groupId` around `index`, with unseen-mention and
    /// unseen-reaction summary components attached to every entry.
    ///
    /// When `shouldLoadCanMessagePeer` is true, the premium-required flag is extracted from
    /// cached data and the account peer id is supplied. Returns `.never()` when the account
    /// is unavailable.
    public func aroundChatListView(groupId: PeerGroupId, filterPredicate: ChatListFilterPredicate? = nil, index: ChatListIndex, count: Int, shouldLoadCanMessagePeer: Bool = false) -> Signal<(ChatListView, ViewUpdateType), NoError> {
        if let account = self.account {
            return self.wrappedChatListView(signal: account.postbox.aroundChatListView(
                groupId: groupId,
                filterPredicate: filterPredicate,
                index: index,
                count: count,
                summaryComponents: ChatListEntrySummaryComponents(
                    components: [
                        ChatListEntryMessageTagSummaryKey(
                            tag: .unseenPersonalMessage,
                            actionType: PendingMessageActionType.consumeUnseenPersonalMessage
                        ): ChatListEntrySummaryComponents.Component(
                            tagSummary: ChatListEntryMessageTagSummaryComponent(namespace: Namespaces.Message.Cloud),
                            actionsSummary: ChatListEntryPendingMessageActionsSummaryComponent(namespace: Namespaces.Message.Cloud)
                        ),
                        ChatListEntryMessageTagSummaryKey(
                            tag: .unseenReaction,
                            actionType: PendingMessageActionType.readReaction
                        ): ChatListEntrySummaryComponents.Component(
                            tagSummary: ChatListEntryMessageTagSummaryComponent(namespace: Namespaces.Message.Cloud),
                            actionsSummary: ChatListEntryPendingMessageActionsSummaryComponent(namespace: Namespaces.Message.Cloud)
                        )
                    ]
                ),
                // The cached-data extractor and account peer id are supplied only on request.
                extractCachedData: shouldLoadCanMessagePeer ? extractCachedDataIsPremiumRequiredToMessage : nil,
                accountPeerId: shouldLoadCanMessagePeer ?
account.peerId : nil
            ))
        } else {
            return .never()
        }
    }

    /// Marks the given chat list filter ids as hidden until the returned disposable is disposed.
    ///
    /// Each id keeps a bag of registrations; an id is published through
    /// `hiddenChatListFilterIdsPromise` while its bag is non-empty. Disposal removes this
    /// call's registrations on the main queue and republishes the set.
    public func addHiddenChatListFilterIds(_ ids: [Int32]) -> Disposable {
        // Remember which bag index belongs to this call so that only it is removed later.
        var indices: [Int32: Int] = [:]
        var updatedIds = Set()
        let _ = self.hiddenChatListFilterIdsValue.modify { value in
            var value = value
            for id in ids {
                let bag: Bag
                if let current = value[id] {
                    bag = current
                } else {
                    bag = Bag()
                    value[id] = bag
                }
                indices[id] = bag.add(Void())
            }
            for (id, bag) in value {
                if !bag.isEmpty {
                    updatedIds.insert(id)
                }
            }
            return value
        }
        self.hiddenChatListFilterIdsPromise.set(updatedIds)

        return ActionDisposable { [weak self] in
            DispatchQueue.main.async {
                guard let `self` = self else {
                    return
                }
                var updatedIds = Set()
                let _ = self.hiddenChatListFilterIdsValue.modify { value in
                    for id in ids {
                        if let bag = value[id], let index = indices[id] {
                            bag.remove(index)
                        }
                    }
                    for (id, bag) in value {
                        if !bag.isEmpty {
                            updatedIds.insert(id)
                        }
                    }
                    return value
                }
                self.hiddenChatListFilterIdsPromise.set(updatedIds)
            }
        }
    }

    /// Restarts the quick-reply shortcut refresh when the last one started more than
    /// 16 hours ago. The check and restart run on `queue`.
    public func keepQuickRepliesApproximatelyUpdated() {
        self.queue.async {
            guard let account = self.account else {
                return
            }
            let timestamp = CFAbsoluteTimeGetCurrent()
            if self.quickRepliesUpdateTimestamp + 16 * 60 * 60 < timestamp {
                self.quickRepliesUpdateTimestamp = timestamp
                self.quickRepliesUpdateDisposable?.dispose()
                self.quickRepliesUpdateDisposable = _internal_keepShortcutMessagesUpdated(account: account).startStrict()
            }
        }
    }

    /// Restarts the business chat link refresh when the last one started more than
    /// 16 hours ago. The check and restart run on `queue`.
    public func keepBusinessLinksApproximatelyUpdated() {
        self.queue.async {
            guard let account = self.account else {
                return
            }
            let timestamp = CFAbsoluteTimeGetCurrent()
            if self.businessLinksUpdateTimestamp + 16 * 60 * 60 < timestamp {
                self.businessLinksUpdateTimestamp = timestamp
                self.businessLinksUpdateDisposable?.dispose()
                self.businessLinksUpdateDisposable = _internal_refreshBusinessChatLinks(postbox: account.postbox, network: account.network, accountPeerId: account.peerId).startStrict()
            }
        }
    }
}

/// Chat-list-relevant data extracted from a peer's cached data.
///
/// Equality and hashing are both based on `isPremiumRequiredToMessage`, so two values are
/// equal exactly when their flags match.
public final class ExtractedChatListItemCachedData: Hashable {
    /// Whether messaging this peer requires premium.
    public let isPremiumRequiredToMessage: Bool

    public init(isPremiumRequiredToMessage: Bool) {
        self.isPremiumRequiredToMessage = isPremiumRequiredToMessage
    }

    public static func ==(lhs: ExtractedChatListItemCachedData, rhs: ExtractedChatListItemCachedData) -> Bool {
        // Compare the same field that `hash(into:)` feeds to the hasher. Returning `true`
        // unconditionally made values with different flags equal but differently hashed,
        // which breaks the Hashable contract.
        return lhs.isPremiumRequiredToMessage == rhs.isPremiumRequiredToMessage
    }

    public func hash(into hasher: inout Hasher) {
        hasher.combine(self.isPremiumRequiredToMessage)
    }
}

/// Extracts the chat list data from `cachedData`.
///
/// Returns an `ExtractedChatListItemCachedData` carrying the `.premiumRequired` flag when
/// `cachedData` is a `CachedUserData`, and `nil` for any other cached data type.
private func extractCachedDataIsPremiumRequiredToMessage(_ cachedData: CachedPeerData) -> AnyHashable? {
    if let cachedData = cachedData as? CachedUserData {
        return ExtractedChatListItemCachedData(isPremiumRequiredToMessage: cachedData.flags.contains(.premiumRequired))
    }
    return nil
}