From d7ff3e80faa9f1ad391ceba6816b197b9d1c7bfc Mon Sep 17 00:00:00 2001 From: Peter <> Date: Tue, 25 Sep 2018 01:36:14 +0100 Subject: [PATCH] no message --- TelegramCore/EnqueueMessage.swift | 2 +- TelegramCore/Network.swift | 68 ++++++++++++++++++- .../OutgoingMessageInfoAttribute.swift | 31 ++++++++- TelegramCore/PendingMessageManager.swift | 52 +++++++++++--- .../PendingMessageUploadedContent.swift | 4 +- 5 files changed, 143 insertions(+), 14 deletions(-) diff --git a/TelegramCore/EnqueueMessage.swift b/TelegramCore/EnqueueMessage.swift index e69435a656..b863b9ebe9 100644 --- a/TelegramCore/EnqueueMessage.swift +++ b/TelegramCore/EnqueueMessage.swift @@ -285,7 +285,7 @@ func enqueueMessages(transaction: Transaction, account: Account, peerId: PeerId, if transformedMedia { infoFlags.insert(.transformedMedia) } - attributes.append(OutgoingMessageInfoAttribute(uniqueId: randomId, flags: infoFlags)) + attributes.append(OutgoingMessageInfoAttribute(uniqueId: randomId, flags: infoFlags, acknowledged: false)) globallyUniqueIds.append(randomId) switch message { diff --git a/TelegramCore/Network.swift b/TelegramCore/Network.swift index e8fa18e5c0..ac54377524 100644 --- a/TelegramCore/Network.swift +++ b/TelegramCore/Network.swift @@ -490,6 +490,11 @@ private extension NetworkContextProxyId { } } +public enum NetworkRequestResult { + case result(T) + case acknowledged +} + public final class Network: NSObject, MTRequestMessageServiceDelegate { private let queue: Queue public let datacenterId: Int @@ -695,7 +700,68 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate { } } - public func request(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal { + public func requestWithAcknowledgement(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal, MTRpcError> { + let requestService = self.requestService + return Signal { subscriber in + let request = MTRequest() + + request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: tag), responseParser: { response in + if let result = data.2.parse(Buffer(data: response)) { + return BoxedMessage(result) + } + return nil + }) + + request.dependsOnPasswordEntry = false + + request.shouldContinueExecutionWithErrorContext = { errorContext in + guard let errorContext = errorContext else { + return true + } + if errorContext.floodWaitSeconds > 0 && !automaticFloodWait { + return false + } + return true + } + + request.acknowledgementReceived = { + subscriber.putNext(.acknowledged) + } + + request.completed = { (boxedResponse, timestamp, error) -> () in + if let error = error { + subscriber.putError(error) + } else { + if let result = (boxedResponse as! BoxedMessage).body as? T { + subscriber.putNext(.result(result)) + subscriber.putCompletion() + } + else { + subscriber.putError(MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR")) + } + } + } + + if let tag = tag { + request.shouldDependOnRequest = { other in + if let other = other, let metadata = other.metadata as? WrappedRequestMetadata, let otherTag = metadata.tag { + return tag.shouldDependOn(other: otherTag) + } + return false + } + } + + let internalId: Any! = request.internalId + + requestService.add(request) + + return ActionDisposable { [weak requestService] in + requestService?.removeRequest(byInternalId: internalId) + } + } + } + + public func request(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal { let requestService = self.requestService return Signal { subscriber in let request = MTRequest() diff --git a/TelegramCore/OutgoingMessageInfoAttribute.swift b/TelegramCore/OutgoingMessageInfoAttribute.swift index 27216a9fe6..123124fc26 100644 --- a/TelegramCore/OutgoingMessageInfoAttribute.swift +++ b/TelegramCore/OutgoingMessageInfoAttribute.swift @@ -22,23 +22,50 @@ public struct OutgoingMessageInfoFlags: OptionSet { public class OutgoingMessageInfoAttribute: MessageAttribute { public let uniqueId: Int64 public let flags: OutgoingMessageInfoFlags + public let acknowledged: Bool - init(uniqueId: Int64, flags: OutgoingMessageInfoFlags) { + init(uniqueId: Int64, flags: OutgoingMessageInfoFlags, acknowledged: Bool) { self.uniqueId = uniqueId self.flags = flags + self.acknowledged = acknowledged } required public init(decoder: PostboxDecoder) { self.uniqueId = decoder.decodeInt64ForKey("u", orElse: 0) self.flags = OutgoingMessageInfoFlags(rawValue: decoder.decodeInt32ForKey("f", orElse: 0)) + self.acknowledged = decoder.decodeInt32ForKey("ack", orElse: 0) != 0 } public func encode(_ encoder: PostboxEncoder) { encoder.encodeInt64(self.uniqueId, forKey: "u") encoder.encodeInt32(self.flags.rawValue, forKey: "f") + encoder.encodeInt32(self.acknowledged ? 1 : 0, forKey: "ack") } public func withUpdatedFlags(_ flags: OutgoingMessageInfoFlags) -> OutgoingMessageInfoAttribute { - return OutgoingMessageInfoAttribute(uniqueId: self.uniqueId, flags: flags) + return OutgoingMessageInfoAttribute(uniqueId: self.uniqueId, flags: flags, acknowledged: self.acknowledged) + } + + public func withUpdatedAcknowledged(_ acknowledged: Bool) -> OutgoingMessageInfoAttribute { + return OutgoingMessageInfoAttribute(uniqueId: self.uniqueId, flags: self.flags, acknowledged: acknowledged) + } +} + +public extension Message { + public var isSentOrAcknowledged: Bool { + if self.flags.contains(.Failed) { + return false + } else if self.flags.isSending { + for attribute in self.attributes { + if let attribute = attribute as? OutgoingMessageInfoAttribute { + if attribute.acknowledged { + return true + } + } + } + return false + } else { + return true + } } } diff --git a/TelegramCore/PendingMessageManager.swift b/TelegramCore/PendingMessageManager.swift index c90e137950..e16f309459 100644 --- a/TelegramCore/PendingMessageManager.swift +++ b/TelegramCore/PendingMessageManager.swift @@ -855,20 +855,23 @@ public final class PendingMessageManager { let dependencyTag = PendingMessageRequestDependencyTag(messageId: messageId) - let sendMessageRequest: Signal + let sendMessageRequest: Signal, MTRpcError> switch content.content { case .text: - sendMessageRequest = network.request(Api.functions.messages.sendMessage(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, message: message.text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag) + sendMessageRequest = network.requestWithAcknowledgement(Api.functions.messages.sendMessage(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, message: message.text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag) case let .media(inputMedia, text): sendMessageRequest = network.request(Api.functions.messages.sendMedia(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, media: inputMedia, message: text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag) + |> map(NetworkRequestResult.result) case let .forward(sourceInfo): if let forwardSourceInfoAttribute = forwardSourceInfoAttribute, let sourcePeer = transaction.getPeer(forwardSourceInfoAttribute.messageId.peerId), let sourceInputPeer = apiInputPeer(sourcePeer) { sendMessageRequest = network.request(Api.functions.messages.forwardMessages(flags: 0, fromPeer: sourceInputPeer, id: [sourceInfo.messageId.id], randomId: [uniqueId], toPeer: inputPeer), tag: dependencyTag) + |> map(NetworkRequestResult.result) } else { sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal")) } case let .chatContextResult(chatContextResult): sendMessageRequest = network.request(Api.functions.messages.sendInlineBotResult(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, randomId: uniqueId, queryId: chatContextResult.queryId, id: chatContextResult.id)) + |> map(NetworkRequestResult.result) case .secretMedia: assertionFailure() sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal")) @@ -877,14 +880,21 @@ public final class PendingMessageManager { return sendMessageRequest |> deliverOn(queue) |> mapToSignal { result -> Signal in - if let strongSelf = self { - return strongSelf.applySentMessage(postbox: postbox, stateManager: stateManager, message: message, result: result) - |> mapError { _ -> MTRpcError in - return MTRpcError(errorCode: 400, errorDescription: "internal") - } - } else { + guard let strongSelf = self else { return .never() } + switch result { + case .acknowledged: + return strongSelf.applyAcknowledgedMessage(postbox: postbox, message: message) + |> mapError { _ -> MTRpcError in + return MTRpcError(errorCode: 400, errorDescription: "internal") + } + case let .result(result): + return strongSelf.applySentMessage(postbox: postbox, stateManager: stateManager, message: message, result: result) + |> mapError { _ -> MTRpcError in + return MTRpcError(errorCode: 400, errorDescription: "internal") + } + } } |> `catch` { error -> Signal in queue.async { @@ -925,6 +935,32 @@ public final class PendingMessageManager { } |> switchToLatest } + private func applyAcknowledgedMessage(postbox: Postbox, message: Message) -> Signal { + return postbox.transaction { transaction -> Void in + transaction.updateMessage(message.id, update: { currentMessage in + var attributes = message.attributes + var found = false + for i in 0 ..< attributes.count { + if let attribute = attributes[i] as? OutgoingMessageInfoAttribute { + attributes[i] = attribute.withUpdatedAcknowledged(true) + found = true + break + } + } + + if !found { + return .skip + } + + var storeForwardInfo: StoreMessageForwardInfo? + if let forwardInfo = currentMessage.forwardInfo { + storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature) + } + return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media)) + }) + } + } + private func applySentMessage(postbox: Postbox, stateManager: AccountStateManager, message: Message, result: Api.Updates) -> Signal { return applyUpdateMessage(postbox: postbox, stateManager: stateManager, message: message, result: result) |> afterDisposed { [weak self] in if let strongSelf = self { diff --git a/TelegramCore/PendingMessageUploadedContent.swift b/TelegramCore/PendingMessageUploadedContent.swift index 8584153cbe..84840b3774 100644 --- a/TelegramCore/PendingMessageUploadedContent.swift +++ b/TelegramCore/PendingMessageUploadedContent.swift @@ -275,7 +275,7 @@ private func uploadedMediaImageContent(network: Network, postbox: Postbox, trans let attribute = updatedAttributes[index] as! OutgoingMessageInfoAttribute updatedAttributes[index] = attribute.withUpdatedFlags(attribute.flags.union([.transformedMedia])) } else { - updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia])) + updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia], acknowledged: false)) } return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media)) }) @@ -533,7 +533,7 @@ private func uploadedMediaFileContent(network: Network, postbox: Postbox, auxili let attribute = updatedAttributes[index] as! OutgoingMessageInfoAttribute updatedAttributes[index] = attribute.withUpdatedFlags(attribute.flags.union([.transformedMedia])) } else { - updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia])) + updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia], acknowledged: false)) } return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media)) })