no message

This commit is contained in:
Peter 2018-09-25 01:36:14 +01:00
parent 103d1719dc
commit d7ff3e80fa
5 changed files with 143 additions and 14 deletions

View File

@ -285,7 +285,7 @@ func enqueueMessages(transaction: Transaction, account: Account, peerId: PeerId,
if transformedMedia { if transformedMedia {
infoFlags.insert(.transformedMedia) infoFlags.insert(.transformedMedia)
} }
attributes.append(OutgoingMessageInfoAttribute(uniqueId: randomId, flags: infoFlags)) attributes.append(OutgoingMessageInfoAttribute(uniqueId: randomId, flags: infoFlags, acknowledged: false))
globallyUniqueIds.append(randomId) globallyUniqueIds.append(randomId)
switch message { switch message {

View File

@ -490,6 +490,11 @@ private extension NetworkContextProxyId {
} }
} }
public enum NetworkRequestResult<T> {
case result(T)
case acknowledged
}
public final class Network: NSObject, MTRequestMessageServiceDelegate { public final class Network: NSObject, MTRequestMessageServiceDelegate {
private let queue: Queue private let queue: Queue
public let datacenterId: Int public let datacenterId: Int
@ -695,6 +700,67 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate {
} }
} }
public func requestWithAcknowledgement<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<NetworkRequestResult<T>, MTRpcError> {
let requestService = self.requestService
return Signal { subscriber in
let request = MTRequest()
request.setPayload(data.1.makeData() as Data, metadata: WrappedRequestMetadata(metadata: WrappedFunctionDescription(data.0), tag: tag), responseParser: { response in
if let result = data.2.parse(Buffer(data: response)) {
return BoxedMessage(result)
}
return nil
})
request.dependsOnPasswordEntry = false
request.shouldContinueExecutionWithErrorContext = { errorContext in
guard let errorContext = errorContext else {
return true
}
if errorContext.floodWaitSeconds > 0 && !automaticFloodWait {
return false
}
return true
}
request.acknowledgementReceived = {
subscriber.putNext(.acknowledged)
}
request.completed = { (boxedResponse, timestamp, error) -> () in
if let error = error {
subscriber.putError(error)
} else {
if let result = (boxedResponse as! BoxedMessage).body as? T {
subscriber.putNext(.result(result))
subscriber.putCompletion()
}
else {
subscriber.putError(MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"))
}
}
}
if let tag = tag {
request.shouldDependOnRequest = { other in
if let other = other, let metadata = other.metadata as? WrappedRequestMetadata, let otherTag = metadata.tag {
return tag.shouldDependOn(other: otherTag)
}
return false
}
}
let internalId: Any! = request.internalId
requestService.add(request)
return ActionDisposable { [weak requestService] in
requestService?.removeRequest(byInternalId: internalId)
}
}
}
public func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<T, MTRpcError> { public func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<T, MTRpcError> {
let requestService = self.requestService let requestService = self.requestService
return Signal { subscriber in return Signal { subscriber in

View File

@ -22,23 +22,50 @@ public struct OutgoingMessageInfoFlags: OptionSet {
public class OutgoingMessageInfoAttribute: MessageAttribute { public class OutgoingMessageInfoAttribute: MessageAttribute {
public let uniqueId: Int64 public let uniqueId: Int64
public let flags: OutgoingMessageInfoFlags public let flags: OutgoingMessageInfoFlags
public let acknowledged: Bool
init(uniqueId: Int64, flags: OutgoingMessageInfoFlags) { init(uniqueId: Int64, flags: OutgoingMessageInfoFlags, acknowledged: Bool) {
self.uniqueId = uniqueId self.uniqueId = uniqueId
self.flags = flags self.flags = flags
self.acknowledged = acknowledged
} }
required public init(decoder: PostboxDecoder) { required public init(decoder: PostboxDecoder) {
self.uniqueId = decoder.decodeInt64ForKey("u", orElse: 0) self.uniqueId = decoder.decodeInt64ForKey("u", orElse: 0)
self.flags = OutgoingMessageInfoFlags(rawValue: decoder.decodeInt32ForKey("f", orElse: 0)) self.flags = OutgoingMessageInfoFlags(rawValue: decoder.decodeInt32ForKey("f", orElse: 0))
self.acknowledged = decoder.decodeInt32ForKey("ack", orElse: 0) != 0
} }
public func encode(_ encoder: PostboxEncoder) { public func encode(_ encoder: PostboxEncoder) {
encoder.encodeInt64(self.uniqueId, forKey: "u") encoder.encodeInt64(self.uniqueId, forKey: "u")
encoder.encodeInt32(self.flags.rawValue, forKey: "f") encoder.encodeInt32(self.flags.rawValue, forKey: "f")
encoder.encodeInt32(self.acknowledged ? 1 : 0, forKey: "ack")
} }
public func withUpdatedFlags(_ flags: OutgoingMessageInfoFlags) -> OutgoingMessageInfoAttribute { public func withUpdatedFlags(_ flags: OutgoingMessageInfoFlags) -> OutgoingMessageInfoAttribute {
return OutgoingMessageInfoAttribute(uniqueId: self.uniqueId, flags: flags) return OutgoingMessageInfoAttribute(uniqueId: self.uniqueId, flags: flags, acknowledged: self.acknowledged)
}
public func withUpdatedAcknowledged(_ acknowledged: Bool) -> OutgoingMessageInfoAttribute {
return OutgoingMessageInfoAttribute(uniqueId: self.uniqueId, flags: self.flags, acknowledged: acknowledged)
}
}
public extension Message {
public var isSentOrAcknowledged: Bool {
if self.flags.contains(.Failed) {
return false
} else if self.flags.isSending {
for attribute in self.attributes {
if let attribute = attribute as? OutgoingMessageInfoAttribute {
if attribute.acknowledged {
return true
}
}
}
return false
} else {
return true
}
} }
} }

View File

@ -855,20 +855,23 @@ public final class PendingMessageManager {
let dependencyTag = PendingMessageRequestDependencyTag(messageId: messageId) let dependencyTag = PendingMessageRequestDependencyTag(messageId: messageId)
let sendMessageRequest: Signal<Api.Updates, MTRpcError> let sendMessageRequest: Signal<NetworkRequestResult<Api.Updates>, MTRpcError>
switch content.content { switch content.content {
case .text: case .text:
sendMessageRequest = network.request(Api.functions.messages.sendMessage(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, message: message.text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag) sendMessageRequest = network.requestWithAcknowledgement(Api.functions.messages.sendMessage(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, message: message.text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag)
case let .media(inputMedia, text): case let .media(inputMedia, text):
sendMessageRequest = network.request(Api.functions.messages.sendMedia(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, media: inputMedia, message: text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag) sendMessageRequest = network.request(Api.functions.messages.sendMedia(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, media: inputMedia, message: text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag)
|> map(NetworkRequestResult.result)
case let .forward(sourceInfo): case let .forward(sourceInfo):
if let forwardSourceInfoAttribute = forwardSourceInfoAttribute, let sourcePeer = transaction.getPeer(forwardSourceInfoAttribute.messageId.peerId), let sourceInputPeer = apiInputPeer(sourcePeer) { if let forwardSourceInfoAttribute = forwardSourceInfoAttribute, let sourcePeer = transaction.getPeer(forwardSourceInfoAttribute.messageId.peerId), let sourceInputPeer = apiInputPeer(sourcePeer) {
sendMessageRequest = network.request(Api.functions.messages.forwardMessages(flags: 0, fromPeer: sourceInputPeer, id: [sourceInfo.messageId.id], randomId: [uniqueId], toPeer: inputPeer), tag: dependencyTag) sendMessageRequest = network.request(Api.functions.messages.forwardMessages(flags: 0, fromPeer: sourceInputPeer, id: [sourceInfo.messageId.id], randomId: [uniqueId], toPeer: inputPeer), tag: dependencyTag)
|> map(NetworkRequestResult.result)
} else { } else {
sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal")) sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal"))
} }
case let .chatContextResult(chatContextResult): case let .chatContextResult(chatContextResult):
sendMessageRequest = network.request(Api.functions.messages.sendInlineBotResult(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, randomId: uniqueId, queryId: chatContextResult.queryId, id: chatContextResult.id)) sendMessageRequest = network.request(Api.functions.messages.sendInlineBotResult(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, randomId: uniqueId, queryId: chatContextResult.queryId, id: chatContextResult.id))
|> map(NetworkRequestResult.result)
case .secretMedia: case .secretMedia:
assertionFailure() assertionFailure()
sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal")) sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal"))
@ -877,13 +880,20 @@ public final class PendingMessageManager {
return sendMessageRequest return sendMessageRequest
|> deliverOn(queue) |> deliverOn(queue)
|> mapToSignal { result -> Signal<Void, MTRpcError> in |> mapToSignal { result -> Signal<Void, MTRpcError> in
if let strongSelf = self { guard let strongSelf = self else {
return .never()
}
switch result {
case .acknowledged:
return strongSelf.applyAcknowledgedMessage(postbox: postbox, message: message)
|> mapError { _ -> MTRpcError in
return MTRpcError(errorCode: 400, errorDescription: "internal")
}
case let .result(result):
return strongSelf.applySentMessage(postbox: postbox, stateManager: stateManager, message: message, result: result) return strongSelf.applySentMessage(postbox: postbox, stateManager: stateManager, message: message, result: result)
|> mapError { _ -> MTRpcError in |> mapError { _ -> MTRpcError in
return MTRpcError(errorCode: 400, errorDescription: "internal") return MTRpcError(errorCode: 400, errorDescription: "internal")
} }
} else {
return .never()
} }
} }
|> `catch` { error -> Signal<Void, NoError> in |> `catch` { error -> Signal<Void, NoError> in
@ -925,6 +935,32 @@ public final class PendingMessageManager {
} |> switchToLatest } |> switchToLatest
} }
private func applyAcknowledgedMessage(postbox: Postbox, message: Message) -> Signal<Void, NoError> {
return postbox.transaction { transaction -> Void in
transaction.updateMessage(message.id, update: { currentMessage in
var attributes = message.attributes
var found = false
for i in 0 ..< attributes.count {
if let attribute = attributes[i] as? OutgoingMessageInfoAttribute {
attributes[i] = attribute.withUpdatedAcknowledged(true)
found = true
break
}
}
if !found {
return .skip
}
var storeForwardInfo: StoreMessageForwardInfo?
if let forwardInfo = currentMessage.forwardInfo {
storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature)
}
return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media))
})
}
}
private func applySentMessage(postbox: Postbox, stateManager: AccountStateManager, message: Message, result: Api.Updates) -> Signal<Void, NoError> { private func applySentMessage(postbox: Postbox, stateManager: AccountStateManager, message: Message, result: Api.Updates) -> Signal<Void, NoError> {
return applyUpdateMessage(postbox: postbox, stateManager: stateManager, message: message, result: result) |> afterDisposed { [weak self] in return applyUpdateMessage(postbox: postbox, stateManager: stateManager, message: message, result: result) |> afterDisposed { [weak self] in
if let strongSelf = self { if let strongSelf = self {

View File

@ -275,7 +275,7 @@ private func uploadedMediaImageContent(network: Network, postbox: Postbox, trans
let attribute = updatedAttributes[index] as! OutgoingMessageInfoAttribute let attribute = updatedAttributes[index] as! OutgoingMessageInfoAttribute
updatedAttributes[index] = attribute.withUpdatedFlags(attribute.flags.union([.transformedMedia])) updatedAttributes[index] = attribute.withUpdatedFlags(attribute.flags.union([.transformedMedia]))
} else { } else {
updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia])) updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia], acknowledged: false))
} }
return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media)) return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media))
}) })
@ -533,7 +533,7 @@ private func uploadedMediaFileContent(network: Network, postbox: Postbox, auxili
let attribute = updatedAttributes[index] as! OutgoingMessageInfoAttribute let attribute = updatedAttributes[index] as! OutgoingMessageInfoAttribute
updatedAttributes[index] = attribute.withUpdatedFlags(attribute.flags.union([.transformedMedia])) updatedAttributes[index] = attribute.withUpdatedFlags(attribute.flags.union([.transformedMedia]))
} else { } else {
updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia])) updatedAttributes.append(OutgoingMessageInfoAttribute(uniqueId: arc4random64(), flags: [.transformedMedia], acknowledged: false))
} }
return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media)) return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: updatedAttributes, media: currentMessage.media))
}) })