import Foundation import Postbox import SwiftSignalKit import TelegramApi import MtProtoKit import SyncCore private final class HistoryStateValidationBatch { private let disposable: Disposable let invalidatedState: HistoryState? var cancelledMessageIds = Set() init(disposable: Disposable, invalidatedState: HistoryState? = nil) { self.disposable = disposable self.invalidatedState = invalidatedState } deinit { self.disposable.dispose() } } private final class HistoryStateValidationContext { var batchReferences: [MessageId: HistoryStateValidationBatch] = [:] var batch: HistoryStateValidationBatch? } private enum HistoryState { case channel(PeerId, ChannelState) //case group(PeerGroupId, TelegramPeerGroupState) case scheduledMessages(PeerId) var hasInvalidationIndex: Bool { switch self { case let .channel(_, state): return state.invalidatedPts != nil /*case let .group(_, state): return state.invalidatedStateIndex != nil*/ case .scheduledMessages: return false } } func isMessageValid(_ message: Message) -> Bool { switch self { case let .channel(_, state): if let invalidatedPts = state.invalidatedPts { var messagePts: Int32? inner: for attribute in message.attributes { if let attribute = attribute as? ChannelMessageStateVersionAttribute { messagePts = attribute.pts break inner } } var requiresValidation = false if let messagePts = messagePts { if messagePts < invalidatedPts { requiresValidation = true } } else { requiresValidation = true } return !requiresValidation } else { return true } /*case let .group(_, state): if let invalidatedStateIndex = state.invalidatedStateIndex { var messageStateIndex: Int32? inner: for attribute in message.attributes { if let attribute = attribute as? PeerGroupMessageStateVersionAttribute { messageStateIndex = attribute.stateIndex break inner } } var requiresValidation = false if let messageStateIndex = messageStateIndex { if messageStateIndex < invalidatedStateIndex { requiresValidation = true } } else { requiresValidation = true } return !requiresValidation } else { return true }*/ case .scheduledMessages: return false } } func matchesPeerId(_ peerId: PeerId) -> Bool { switch self { case let .channel(statePeerId, _): return statePeerId == peerId /*case .group: return true*/ case let .scheduledMessages(statePeerId): return statePeerId == peerId } } } private func slicedForValidationMessages(_ messages: [MessageId]) -> [[MessageId]] { let block = 64 if messages.count <= block { return [messages] } else { var result: [[MessageId]] = [] var offset = 0 while offset < messages.count { result.append(Array(messages[offset ..< min(offset + block, messages.count)])) offset += block } return result } } final class HistoryViewStateValidationContexts { private let queue: Queue private let postbox: Postbox private let network: Network private let accountPeerId: PeerId private var contexts: [Int32: HistoryStateValidationContext] = [:] private var previousPeerValidationTimestamps: [PeerId: Double] = [:] init(queue: Queue, postbox: Postbox, network: Network, accountPeerId: PeerId) { self.queue = queue self.postbox = postbox self.network = network self.accountPeerId = accountPeerId } func updateView(id: Int32, view: MessageHistoryView?, location: ChatLocationInput? = nil) { assert(self.queue.isCurrent()) guard let view = view, view.tagMask == nil || view.tagMask == MessageTags.unseenPersonalMessage || view.tagMask == MessageTags.music || view.tagMask == MessageTags.pinned else { if self.contexts[id] != nil { self.contexts.removeValue(forKey: id) } return } var historyState: HistoryState? for entry in view.additionalData { if case let .peerChatState(peerId, chatState) = entry { if let chatState = chatState as? ChannelState { historyState = .channel(peerId, chatState) } break } } if let location = location, case let .external(peerId, threadId, _) = location { var rangesToInvalidate: [[MessageId]] = [] let addToRange: (MessageId, inout [[MessageId]]) -> Void = { id, ranges in if ranges.isEmpty { ranges = [[id]] } else { ranges[ranges.count - 1].append(id) } } let addRangeBreak: (inout [[MessageId]]) -> Void = { ranges in if ranges.last?.count != 0 { ranges.append([]) } } for entry in view.entries { if entry.message.id.peerId == peerId && entry.message.id.namespace == Namespaces.Message.Cloud { addToRange(entry.message.id, &rangesToInvalidate) } } if !rangesToInvalidate.isEmpty && rangesToInvalidate[rangesToInvalidate.count - 1].isEmpty { rangesToInvalidate.removeLast() } var invalidatedMessageIds = Set() if !rangesToInvalidate.isEmpty { let context: HistoryStateValidationContext if let current = self.contexts[id] { context = current } else { context = HistoryStateValidationContext() self.contexts[id] = context } var addedRanges: [[MessageId]] = [] for messages in rangesToInvalidate { for id in messages { invalidatedMessageIds.insert(id) if context.batchReferences[id] != nil { addRangeBreak(&addedRanges) } else { addToRange(id, &addedRanges) } } addRangeBreak(&addedRanges) } if !addedRanges.isEmpty && addedRanges[addedRanges.count - 1].isEmpty { addedRanges.removeLast() } for rangeMessages in addedRanges { for messages in slicedForValidationMessages(rangeMessages) { let disposable = MetaDisposable() let batch = HistoryStateValidationBatch(disposable: disposable, invalidatedState: historyState) for messageId in messages { context.batchReferences[messageId] = batch } disposable.set((validateReplyThreadMessagesBatch(postbox: self.postbox, network: self.network, accountPeerId: self.accountPeerId, peerId: peerId, threadMessageId: makeThreadIdMessageId(peerId: peerId, threadId: threadId).id, messageIds: messages) |> deliverOn(self.queue)).start(completed: { [weak self, weak batch] in if let strongSelf = self, let context = strongSelf.contexts[id], let batch = batch { var completedMessageIds: [MessageId] = [] for (messageId, messageBatch) in context.batchReferences { if messageBatch === batch { completedMessageIds.append(messageId) } } for messageId in completedMessageIds { //context.batchReferences.removeValue(forKey: messageId) } } })) } } } if let context = self.contexts[id] { var removeIds: [MessageId] = [] for batchMessageId in context.batchReferences.keys { if !invalidatedMessageIds.contains(batchMessageId) { removeIds.append(batchMessageId) } } for messageId in removeIds { context.batchReferences.removeValue(forKey: messageId) } } } else if let historyState = historyState, historyState.hasInvalidationIndex { var rangesToInvalidate: [[MessageId]] = [] let addToRange: (MessageId, inout [[MessageId]]) -> Void = { id, ranges in if ranges.isEmpty { ranges = [[id]] } else { ranges[ranges.count - 1].append(id) } } let addRangeBreak: (inout [[MessageId]]) -> Void = { ranges in if ranges.last?.count != 0 { ranges.append([]) } } for entry in view.entries { if historyState.matchesPeerId(entry.message.id.peerId) && entry.message.id.namespace == Namespaces.Message.Cloud { if let tag = view.tagMask { if !entry.message.tags.contains(tag) { continue } } if !historyState.isMessageValid(entry.message) { addToRange(entry.message.id, &rangesToInvalidate) } else { addRangeBreak(&rangesToInvalidate) } } } if !rangesToInvalidate.isEmpty && rangesToInvalidate[rangesToInvalidate.count - 1].isEmpty { rangesToInvalidate.removeLast() } var invalidatedMessageIds = Set() if !rangesToInvalidate.isEmpty { let context: HistoryStateValidationContext if let current = self.contexts[id] { context = current } else { context = HistoryStateValidationContext() self.contexts[id] = context } var addedRanges: [[MessageId]] = [] for messages in rangesToInvalidate { for id in messages { invalidatedMessageIds.insert(id) if context.batchReferences[id] != nil { addRangeBreak(&addedRanges) } else { addToRange(id, &addedRanges) } } addRangeBreak(&addedRanges) } if !addedRanges.isEmpty && addedRanges[addedRanges.count - 1].isEmpty { addedRanges.removeLast() } for rangeMessages in addedRanges { for messages in slicedForValidationMessages(rangeMessages) { let disposable = MetaDisposable() let batch = HistoryStateValidationBatch(disposable: disposable, invalidatedState: historyState) for messageId in messages { context.batchReferences[messageId] = batch } disposable.set((validateChannelMessagesBatch(postbox: self.postbox, network: self.network, accountPeerId: self.accountPeerId, tag: view.tagMask, messageIds: messages, historyState: historyState) |> deliverOn(self.queue)).start(completed: { [weak self, weak batch] in if let strongSelf = self, let context = strongSelf.contexts[id], let batch = batch { var completedMessageIds: [MessageId] = [] for (messageId, messageBatch) in context.batchReferences { if messageBatch === batch { completedMessageIds.append(messageId) } } for messageId in completedMessageIds { context.batchReferences.removeValue(forKey: messageId) } } })) } } } if let context = self.contexts[id] { var removeIds: [MessageId] = [] for batchMessageId in context.batchReferences.keys { if !invalidatedMessageIds.contains(batchMessageId) { removeIds.append(batchMessageId) } } for messageId in removeIds { context.batchReferences.removeValue(forKey: messageId) } } } else if view.namespaces.contains(Namespaces.Message.ScheduledCloud) { if let _ = self.contexts[id] { } else if let location = location, case let .peer(peerId) = location { let timestamp = self.network.context.globalTime() if let previousTimestamp = self.previousPeerValidationTimestamps[peerId], timestamp < previousTimestamp + 60 { } else { self.previousPeerValidationTimestamps[peerId] = timestamp let context = HistoryStateValidationContext() self.contexts[id] = context let disposable = MetaDisposable() let batch = HistoryStateValidationBatch(disposable: disposable) context.batch = batch let messages: [Message] = view.entries.map { $0.message }.filter { $0.id.namespace == Namespaces.Message.ScheduledCloud } disposable.set((validateScheduledMessagesBatch(postbox: self.postbox, network: self.network, accountPeerId: peerId, tag: nil, messages: messages, historyState: .scheduledMessages(peerId)) |> deliverOn(self.queue)).start(completed: { [weak self] in if let strongSelf = self, let context = strongSelf.contexts[id] { context.batch = nil } })) } } } } } private func hashForScheduledMessages(_ messages: [Message]) -> Int32 { var acc: UInt32 = 0 let sorted = messages.sorted(by: { $0.timestamp > $1.timestamp }) for message in sorted { acc = (acc &* 20261) &+ UInt32(message.id.id) var editTimestamp: Int32 = 0 inner: for attribute in message.attributes { if let attribute = attribute as? EditedMessageAttribute { editTimestamp = attribute.date break inner } } acc = (acc &* 20261) &+ UInt32(editTimestamp) acc = (acc &* 20261) &+ UInt32(message.timestamp) } return Int32(bitPattern: acc & UInt32(0x7FFFFFFF)) } private func hashForMessages(_ messages: [Message], withChannelIds: Bool) -> Int32 { var acc: UInt32 = 0 let sorted = messages.sorted(by: { $0.index > $1.index }) for message in sorted { if withChannelIds { acc = (acc &* 20261) &+ UInt32(message.id.peerId.id) } acc = (acc &* 20261) &+ UInt32(message.id.id) var timestamp = message.timestamp inner: for attribute in message.attributes { if let attribute = attribute as? EditedMessageAttribute { timestamp = attribute.date break inner } } if message.tags.contains(.pinned) { acc = (acc &* 20261) &+ UInt32(1) } acc = (acc &* 20261) &+ UInt32(timestamp) } return Int32(bitPattern: acc & UInt32(0x7FFFFFFF)) } private func hashForMessages(_ messages: [StoreMessage], withChannelIds: Bool) -> Int32 { var acc: UInt32 = 0 for message in messages { if case let .Id(id) = message.id { if withChannelIds { acc = (acc &* 20261) &+ UInt32(id.peerId.id) } acc = (acc &* 20261) &+ UInt32(id.id) var timestamp = message.timestamp inner: for attribute in message.attributes { if let attribute = attribute as? EditedMessageAttribute { timestamp = attribute.date break inner } } acc = (acc &* 20261) &+ UInt32(timestamp) } } return Int32(bitPattern: acc & UInt32(0x7FFFFFFF)) } private enum ValidatedMessages { case notModified case messages([Api.Message], [Api.Chat], [Api.User], Int32?) } private func validateChannelMessagesBatch(postbox: Postbox, network: Network, accountPeerId: PeerId, tag: MessageTags?, messageIds: [MessageId], historyState: HistoryState) -> Signal { return postbox.transaction { transaction -> Signal in var previousMessages: [Message] = [] var previous: [MessageId: Message] = [:] for messageId in messageIds { if let message = transaction.getMessage(messageId) { previousMessages.append(message) previous[message.id] = message } } var signal: Signal switch historyState { case let .channel(peerId, _): let hash = hashForMessages(previousMessages, withChannelIds: false) Logger.shared.log("HistoryValidation", "validate batch for \(peerId): \(previousMessages.map({ $0.id }))") if let peer = transaction.getPeer(peerId), let inputPeer = apiInputPeer(peer) { let requestSignal: Signal if let tag = tag { if tag == MessageTags.unseenPersonalMessage { requestSignal = network.request(Api.functions.messages.getUnreadMentions(peer: inputPeer, offsetId: messageIds[messageIds.count - 1].id + 1, addOffset: 0, limit: Int32(messageIds.count), maxId: messageIds[messageIds.count - 1].id + 1, minId: messageIds[0].id - 1)) } else if let filter = messageFilterForTagMask(tag) { requestSignal = network.request(Api.functions.messages.search(flags: 0, peer: inputPeer, q: "", fromId: nil, topMsgId: nil, filter: filter, minDate: 0, maxDate: 0, offsetId: messageIds[messageIds.count - 1].id + 1, addOffset: 0, limit: Int32(messageIds.count), maxId: messageIds[messageIds.count - 1].id + 1, minId: messageIds[0].id - 1, hash: hash)) } else { assertionFailure() requestSignal = .complete() } } else { requestSignal = network.request(Api.functions.messages.getHistory(peer: inputPeer, offsetId: messageIds[messageIds.count - 1].id + 1, offsetDate: 0, addOffset: 0, limit: Int32(messageIds.count), maxId: messageIds[messageIds.count - 1].id + 1, minId: messageIds[0].id - 1, hash: hash)) } signal = requestSignal |> map { result -> ValidatedMessages in let messages: [Api.Message] let chats: [Api.Chat] let users: [Api.User] var channelPts: Int32? switch result { case let .messages(messages: apiMessages, chats: apiChats, users: apiUsers): messages = apiMessages chats = apiChats users = apiUsers case let .messagesSlice(_, _, _, _, messages: apiMessages, chats: apiChats, users: apiUsers): messages = apiMessages chats = apiChats users = apiUsers case let .channelMessages(_, pts, _, _, apiMessages, apiChats, apiUsers): messages = apiMessages chats = apiChats users = apiUsers channelPts = pts case .messagesNotModified: return .notModified } return .messages(messages, chats, users, channelPts) } } else { return .complete() } default: signal = .complete() } return validateBatch(postbox: postbox, network: network, transaction: transaction, accountPeerId: accountPeerId, tag: tag, historyState: historyState, signal: signal, previous: previous, messageNamespace: Namespaces.Message.Cloud) } |> switchToLatest } private func validateReplyThreadMessagesBatch(postbox: Postbox, network: Network, accountPeerId: PeerId, peerId: PeerId, threadMessageId: Int32, messageIds: [MessageId]) -> Signal { return postbox.transaction { transaction -> Signal in var previousMessages: [Message] = [] var previous: [MessageId: Message] = [:] for messageId in messageIds { if let message = transaction.getMessage(messageId) { previousMessages.append(message) previous[message.id] = message } } var signal: Signal let hash = hashForMessages(previousMessages, withChannelIds: false) Logger.shared.log("HistoryValidation", "validate reply thread batch for \(peerId): \(previousMessages.map({ $0.id }))") if let peer = transaction.getPeer(peerId), let inputPeer = apiInputPeer(peer) { let requestSignal: Signal requestSignal = network.request(Api.functions.messages.getReplies(peer: inputPeer, msgId: threadMessageId, offsetId: messageIds[messageIds.count - 1].id, offsetDate: 0, addOffset: -1, limit: Int32(messageIds.count), maxId: messageIds[messageIds.count - 1].id + 1, minId: messageIds[0].id - 1, hash: hash)) signal = requestSignal |> map { result -> ValidatedMessages in let messages: [Api.Message] let chats: [Api.Chat] let users: [Api.User] var channelPts: Int32? switch result { case let .messages(messages: apiMessages, chats: apiChats, users: apiUsers): messages = apiMessages chats = apiChats users = apiUsers case let .messagesSlice(_, _, _, _, messages: apiMessages, chats: apiChats, users: apiUsers): messages = apiMessages chats = apiChats users = apiUsers case let .channelMessages(_, pts, _, _, apiMessages, apiChats, apiUsers): messages = apiMessages chats = apiChats users = apiUsers channelPts = pts case .messagesNotModified: return .notModified } return .messages(messages, chats, users, channelPts) } } else { return .complete() } return validateReplyThreadBatch(postbox: postbox, network: network, transaction: transaction, accountPeerId: accountPeerId, peerId: peerId, threadId: makeMessageThreadId(MessageId(peerId: peerId, namespace: Namespaces.Message.Cloud, id: threadMessageId)), signal: signal, previous: previous, messageNamespace: Namespaces.Message.Cloud) } |> switchToLatest } private func validateScheduledMessagesBatch(postbox: Postbox, network: Network, accountPeerId: PeerId, tag: MessageTags?, messages: [Message], historyState: HistoryState) -> Signal { return postbox.transaction { transaction -> Signal in var signal: Signal switch historyState { case let .scheduledMessages(peerId): if let peer = transaction.getPeer(peerId), let inputPeer = apiInputPeer(peer) { let hash = hashForScheduledMessages(messages) signal = network.request(Api.functions.messages.getScheduledHistory(peer: inputPeer, hash: hash)) |> map { result -> ValidatedMessages in let messages: [Api.Message] let chats: [Api.Chat] let users: [Api.User] switch result { case let .messages(messages: apiMessages, chats: apiChats, users: apiUsers): messages = apiMessages chats = apiChats users = apiUsers case let .messagesSlice(_, _, _, _, messages: apiMessages, chats: apiChats, users: apiUsers): messages = apiMessages chats = apiChats users = apiUsers case let .channelMessages(_, _, _, _, apiMessages, apiChats, apiUsers): messages = apiMessages chats = apiChats users = apiUsers case .messagesNotModified: return .notModified } return .messages(messages, chats, users, nil) } } else { signal = .complete() } default: signal = .complete() } var previous: [MessageId: Message] = [:] for message in messages { previous[message.id] = message } return validateBatch(postbox: postbox, network: network, transaction: transaction, accountPeerId: accountPeerId, tag: tag, historyState: historyState, signal: signal, previous: previous, messageNamespace: Namespaces.Message.ScheduledCloud) } |> switchToLatest } private func validateBatch(postbox: Postbox, network: Network, transaction: Transaction, accountPeerId: PeerId, tag: MessageTags?, historyState: HistoryState, signal: Signal, previous: [MessageId: Message], messageNamespace: MessageId.Namespace) -> Signal { return signal |> map(Optional.init) |> `catch` { _ -> Signal in return .single(nil) } |> mapToSignal { result -> Signal in guard let result = result else { return .complete() } switch result { case let .messages(messages, _, _, channelPts): var storeMessages: [StoreMessage] = [] for message in messages { if let storeMessage = StoreMessage(apiMessage: message, namespace: messageNamespace) { var attributes = storeMessage.attributes if let channelPts = channelPts { attributes.append(ChannelMessageStateVersionAttribute(pts: channelPts)) } storeMessages.append(storeMessage.withUpdatedAttributes(attributes)) } } var validMessageIds = Set() for message in storeMessages { if case let .Id(id) = message.id { validMessageIds.insert(id) } } var maybeRemovedMessageIds: [MessageId] = [] for id in previous.keys { if !validMessageIds.contains(id) { maybeRemovedMessageIds.append(id) } } let actuallyRemovedMessagesSignal: Signal, NoError> if maybeRemovedMessageIds.isEmpty { actuallyRemovedMessagesSignal = .single(Set()) } else { switch historyState { case let .channel(peerId, _): actuallyRemovedMessagesSignal = postbox.transaction { transaction -> Signal, NoError> in if let inputChannel = transaction.getPeer(peerId).flatMap(apiInputChannel) { return network.request(Api.functions.channels.getMessages(channel: inputChannel, id: maybeRemovedMessageIds.map({ Api.InputMessage.inputMessageID(id: $0.id) }))) |> map { result -> Set in let apiMessages: [Api.Message] switch result { case let .channelMessages(_, _, _, _, messages, _, _): apiMessages = messages case let .messages(messages, _, _): apiMessages = messages case let .messagesSlice(_, _, _, _, messages, _, _): apiMessages = messages case .messagesNotModified: return Set() } var ids = Set() for message in apiMessages { if let parsedMessage = StoreMessage(apiMessage: message, namespace: messageNamespace), case let .Id(id) = parsedMessage.id { if let tag = tag { if parsedMessage.tags.contains(tag) { ids.insert(id) } } else { ids.insert(id) } } } return Set(maybeRemovedMessageIds).subtracting(ids) } |> `catch` { _ -> Signal, NoError> in return .single(Set(maybeRemovedMessageIds)) } } return .single(Set(maybeRemovedMessageIds)) } |> switchToLatest default: actuallyRemovedMessagesSignal = .single(Set(maybeRemovedMessageIds)) } } return actuallyRemovedMessagesSignal |> mapToSignal { removedMessageIds -> Signal in return postbox.transaction { transaction -> Void in var validMessageIds = Set() for message in storeMessages { if case let .Id(id) = message.id { validMessageIds.insert(id) let previousMessage = previous[id] ?? transaction.getMessage(id) if let previousMessage = previousMessage { var updatedTimestamp = message.timestamp inner: for attribute in message.attributes { if let attribute = attribute as? EditedMessageAttribute { updatedTimestamp = attribute.date break inner } } var timestamp = previousMessage.timestamp inner: for attribute in previousMessage.attributes { if let attribute = attribute as? EditedMessageAttribute { timestamp = attribute.date break inner } } transaction.updateMessage(id, update: { currentMessage in if updatedTimestamp != timestamp { var updatedLocalTags = message.localTags if currentMessage.localTags.contains(.OutgoingLiveLocation) { updatedLocalTags.insert(.OutgoingLiveLocation) } return .update(message.withUpdatedLocalTags(updatedLocalTags)) } else { var storeForwardInfo: StoreMessageForwardInfo? if let forwardInfo = currentMessage.forwardInfo { storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType) } var attributes = currentMessage.attributes if let channelPts = channelPts { for i in (0 ..< attributes.count).reversed() { if let _ = attributes[i] as? ChannelMessageStateVersionAttribute { attributes.remove(at: i) } } attributes.append(ChannelMessageStateVersionAttribute(pts: channelPts)) } let updatedFlags = StoreMessageFlags(currentMessage.flags) return .update(StoreMessage(id: message.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: updatedFlags, tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media)) } }) if previous[id] == nil { print("\(id) missing") } } else { let _ = transaction.addMessages([message], location: .Random) } } } if let tag = tag { for (_, previousMessage) in previous { if !validMessageIds.contains(previousMessage.id) { transaction.updateMessage(previousMessage.id, update: { currentMessage in var updatedTags = currentMessage.tags updatedTags.remove(tag) var storeForwardInfo: StoreMessageForwardInfo? if let forwardInfo = currentMessage.forwardInfo { storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType) } var attributes = currentMessage.attributes for i in (0 ..< attributes.count).reversed() { switch historyState { case .channel: if let _ = attributes[i] as? ChannelMessageStateVersionAttribute { attributes.remove(at: i) } default: break } } switch historyState { case let .channel(_, channelState): attributes.append(ChannelMessageStateVersionAttribute(pts: channelState.pts)) default: break } return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: updatedTags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media)) }) } } } for id in removedMessageIds { if !validMessageIds.contains(id) { if let tag = tag { transaction.updateMessage(id, update: { currentMessage in var updatedTags = currentMessage.tags updatedTags.remove(tag) var storeForwardInfo: StoreMessageForwardInfo? if let forwardInfo = currentMessage.forwardInfo { storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType) } var attributes = currentMessage.attributes for i in (0 ..< attributes.count).reversed() { switch historyState { case .channel: if let _ = attributes[i] as? ChannelMessageStateVersionAttribute { attributes.remove(at: i) } default: break } } switch historyState { case let .channel(_, channelState): attributes.append(ChannelMessageStateVersionAttribute(pts: channelState.pts)) default: break } return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: updatedTags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media)) }) } else { deleteMessages(transaction: transaction, mediaBox: postbox.mediaBox, ids: [id]) Logger.shared.log("HistoryValidation", "deleting message \(id) in \(id.peerId)") } } } } } case .notModified: return postbox.transaction { transaction -> Void in for id in previous.keys { transaction.updateMessage(id, update: { currentMessage in var storeForwardInfo: StoreMessageForwardInfo? if let forwardInfo = currentMessage.forwardInfo { storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType) } var attributes = currentMessage.attributes for i in (0 ..< attributes.count).reversed() { switch historyState { case .channel: if let _ = attributes[i] as? ChannelMessageStateVersionAttribute { attributes.remove(at: i) } default: break } } switch historyState { case let .channel(_, channelState): attributes.append(ChannelMessageStateVersionAttribute(pts: channelState.pts)) default: break } return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media)) }) } } } } } private func validateReplyThreadBatch(postbox: Postbox, network: Network, transaction: Transaction, accountPeerId: PeerId, peerId: PeerId, threadId: Int64, signal: Signal, previous: [MessageId: Message], messageNamespace: MessageId.Namespace) -> Signal { return signal |> map(Optional.init) |> `catch` { _ -> Signal in return .single(nil) } |> mapToSignal { result -> Signal in guard let result = result else { return .complete() } switch result { case let .messages(messages, _, _, channelPts): var storeMessages: [StoreMessage] = [] for message in messages { if let storeMessage = StoreMessage(apiMessage: message, namespace: messageNamespace) { var attributes = storeMessage.attributes if let channelPts = channelPts { attributes.append(ChannelMessageStateVersionAttribute(pts: channelPts)) } storeMessages.append(storeMessage.withUpdatedAttributes(attributes)) } } var validMessageIds = Set() for message in storeMessages { if case let .Id(id) = message.id { validMessageIds.insert(id) } } var maybeRemovedMessageIds: [MessageId] = [] for id in previous.keys { if !validMessageIds.contains(id) { maybeRemovedMessageIds.append(id) } } let actuallyRemovedMessagesSignal: Signal, NoError> if maybeRemovedMessageIds.isEmpty { actuallyRemovedMessagesSignal = .single(Set()) } else { actuallyRemovedMessagesSignal = postbox.transaction { transaction -> Signal, NoError> in if let inputChannel = transaction.getPeer(peerId).flatMap(apiInputChannel) { return network.request(Api.functions.channels.getMessages(channel: inputChannel, id: maybeRemovedMessageIds.map({ Api.InputMessage.inputMessageID(id: $0.id) }))) |> map { result -> Set in let apiMessages: [Api.Message] switch result { case let .channelMessages(_, _, _, _, messages, _, _): apiMessages = messages case let .messages(messages, _, _): apiMessages = messages case let .messagesSlice(_, _, _, _, messages, _, _): apiMessages = messages case .messagesNotModified: return Set() } var ids = Set() for message in apiMessages { if let parsedMessage = StoreMessage(apiMessage: message, namespace: messageNamespace), case let .Id(id) = parsedMessage.id { ids.insert(id) } } return Set(maybeRemovedMessageIds).subtracting(ids) } |> `catch` { _ -> Signal, NoError> in return .single(Set(maybeRemovedMessageIds)) } } return .single(Set(maybeRemovedMessageIds)) } |> switchToLatest } return actuallyRemovedMessagesSignal |> mapToSignal { removedMessageIds -> Signal in return postbox.transaction { transaction -> Void in var validMessageIds = Set() for message in storeMessages { if case let .Id(id) = message.id { validMessageIds.insert(id) let previousMessage = previous[id] ?? transaction.getMessage(id) if let previousMessage = previousMessage { var updatedTimestamp = message.timestamp inner: for attribute in message.attributes { if let attribute = attribute as? EditedMessageAttribute { updatedTimestamp = attribute.date break inner } } var timestamp = previousMessage.timestamp inner: for attribute in previousMessage.attributes { if let attribute = attribute as? EditedMessageAttribute { timestamp = attribute.date break inner } } transaction.updateMessage(id, update: { currentMessage in if updatedTimestamp != timestamp { var updatedLocalTags = message.localTags if currentMessage.localTags.contains(.OutgoingLiveLocation) { updatedLocalTags.insert(.OutgoingLiveLocation) } return .update(message.withUpdatedLocalTags(updatedLocalTags)) } else { var storeForwardInfo: StoreMessageForwardInfo? if let forwardInfo = currentMessage.forwardInfo { storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType) } var attributes = currentMessage.attributes if let channelPts = channelPts { for i in (0 ..< attributes.count).reversed() { if let _ = attributes[i] as? ChannelMessageStateVersionAttribute { attributes.remove(at: i) } } attributes.append(ChannelMessageStateVersionAttribute(pts: channelPts)) } let updatedFlags = StoreMessageFlags(currentMessage.flags) return .update(StoreMessage(id: message.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: updatedFlags, tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media)) } }) if previous[id] == nil { print("\(id) missing") } } else { let _ = transaction.addMessages([message], location: .Random) } } } for id in removedMessageIds { if !validMessageIds.contains(id) { deleteMessages(transaction: transaction, mediaBox: postbox.mediaBox, ids: [id]) Logger.shared.log("HistoryValidation", "deleting thread message \(id) in \(id.peerId)") } } } } case .notModified: return .complete() } } }