no message

This commit is contained in:
Peter
2018-07-20 13:09:56 +03:00
parent c192cd39e4
commit cbe8943bc7
43 changed files with 2064 additions and 724 deletions

View File

@@ -2,9 +2,11 @@ import Foundation
#if os(macOS)
import PostboxMac
import SwiftSignalKitMac
import MtProtoKitMac
#else
import Postbox
import SwiftSignalKit
import MtProtoKitDynamic
#endif
public struct PendingMessageStatus: Equatable {
@@ -20,7 +22,7 @@ private enum PendingMessageState {
case none
case waitingForUploadToStart(groupId: Int64?, upload: Signal<PendingMessageUploadedContentResult, PendingMessageUploadError>)
case uploading(groupId: Int64?)
case waitingToBeSent(groupId: Int64?, content: PendingMessageUploadedContent)
case waitingToBeSent(groupId: Int64?, content: PendingMessageUploadedContentAndReuploadInfo)
case sending(groupId: Int64?)
var groupId: Int64? {
@@ -41,9 +43,11 @@ private enum PendingMessageState {
private final class PendingMessageContext {
var state: PendingMessageState = .none
let disposable = MetaDisposable()
let uploadDisposable = MetaDisposable()
let sendDisposable = MetaDisposable()
var status: PendingMessageStatus?
var statusSubscribers = Bag<(PendingMessageStatus?) -> Void>()
var forcedReuploadOnce: Bool = false
}
private final class PeerPendingMessagesSummaryContext {
@@ -91,6 +95,7 @@ public final class PendingMessageManager {
private let auxiliaryMethods: AccountAuxiliaryMethods
private let stateManager: AccountStateManager
private let messageMediaPreuploadManager: MessageMediaPreuploadManager
private let revalidationContext: MediaReferenceRevalidationContext
private let queue = Queue()
@@ -107,12 +112,13 @@ public final class PendingMessageManager {
var transformOutgoingMessageMedia: TransformOutgoingMessageMedia?
init(network: Network, postbox: Postbox, auxiliaryMethods: AccountAuxiliaryMethods, stateManager: AccountStateManager, messageMediaPreuploadManager: MessageMediaPreuploadManager) {
init(network: Network, postbox: Postbox, auxiliaryMethods: AccountAuxiliaryMethods, stateManager: AccountStateManager, messageMediaPreuploadManager: MessageMediaPreuploadManager, revalidationContext: MediaReferenceRevalidationContext) {
self.network = network
self.postbox = postbox
self.auxiliaryMethods = auxiliaryMethods
self.stateManager = stateManager
self.messageMediaPreuploadManager = messageMediaPreuploadManager
self.revalidationContext = revalidationContext
}
deinit {
@@ -133,7 +139,8 @@ public final class PendingMessageManager {
}
context.state = .none
updateUploadingPeerIds.insert(id.peerId)
context.disposable.dispose()
context.sendDisposable.dispose()
context.uploadDisposable.dispose()
if context.statusSubscribers.isEmpty {
self.messageContexts.removeValue(forKey: id)
}
@@ -252,29 +259,12 @@ public final class PendingMessageManager {
continue
}
let contentToUpload = messageContentToUpload(network: strongSelf.network, postbox: strongSelf.postbox, auxiliaryMethods: strongSelf.auxiliaryMethods, transformOutgoingMessageMedia: strongSelf.transformOutgoingMessageMedia, messageMediaPreuploadManager: strongSelf.messageMediaPreuploadManager, message: message)
let contentUploadSignal = messageContentToUpload(network: strongSelf.network, postbox: strongSelf.postbox, auxiliaryMethods: strongSelf.auxiliaryMethods, transformOutgoingMessageMedia: strongSelf.transformOutgoingMessageMedia, messageMediaPreuploadManager: strongSelf.messageMediaPreuploadManager, revalidationContext: strongSelf.revalidationContext, forceReupload: messageContext.forcedReuploadOnce, message: message)
switch contentToUpload {
case let .ready(content):
if let groupingKey = message.groupingKey {
strongSelf.beginSendingMessage(messageContext: messageContext, messageId: message.id, groupId: message.groupingKey, content: content)
if let current = currentGroupId, current != groupingKey {
strongSelf.beginSendingGroupIfPossible(groupId: current)
}
} else {
if let currentGroupId = currentGroupId {
strongSelf.beginSendingGroupIfPossible(groupId: currentGroupId)
}
strongSelf.beginSendingMessage(messageContext: messageContext, messageId: message.id, groupId: message.groupingKey, content: content)
}
currentGroupId = message.groupingKey
case let .upload(uploadSignal):
if strongSelf.canBeginUploadingMessage(id: message.id) {
strongSelf.beginUploadingMessage(messageContext: messageContext, id: message.id, groupId: message.groupingKey, uploadSignal: uploadSignal)
} else {
messageContext.state = .waitingForUploadToStart(groupId: message.groupingKey, upload: uploadSignal)
}
if strongSelf.canBeginUploadingMessage(id: message.id) {
strongSelf.beginUploadingMessage(messageContext: messageContext, id: message.id, groupId: message.groupingKey, uploadSignal: contentUploadSignal)
} else {
messageContext.state = .waitingForUploadToStart(groupId: message.groupingKey, upload: contentUploadSignal)
}
}
@@ -285,7 +275,7 @@ public final class PendingMessageManager {
}))
}
private func beginSendingMessage(messageContext: PendingMessageContext, messageId: MessageId, groupId: Int64?, content: PendingMessageUploadedContent) {
private func beginSendingMessage(messageContext: PendingMessageContext, messageId: MessageId, groupId: Int64?, content: PendingMessageUploadedContentAndReuploadInfo) {
if let groupId = groupId {
messageContext.state = .waitingToBeSent(groupId: groupId, content: content)
} else {
@@ -299,8 +289,8 @@ public final class PendingMessageManager {
}
}
private func dataForPendingMessageGroup(_ groupId: Int64) -> [(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContent)]? {
var result: [(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContent)] = []
private func dataForPendingMessageGroup(_ groupId: Int64) -> [(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo)]? {
var result: [(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo)] = []
loop: for (id, context) in self.messageContexts {
switch context.state {
@@ -332,16 +322,18 @@ public final class PendingMessageManager {
}
}
private func commitSendingMessageGroup(groupId: Int64, messages: [(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContent)]) {
private func commitSendingMessageGroup(groupId: Int64, messages: [(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo)]) {
for (context, _, _) in messages {
context.state = .sending(groupId: groupId)
}
let sendMessage: Signal<PendingMessageResult, NoError> = self.sendGroupMessagesContent(network: self.network, postbox: self.postbox, stateManager: self.stateManager, group: messages.map { ($0.1, $0.2) })
|> map { next -> PendingMessageResult in
return .progress(1.0)
}
messages[0].0.disposable.set((sendMessage |> deliverOn(self.queue) |> afterDisposed { [weak self] in
if let strongSelf = self {
|> map { next -> PendingMessageResult in
return .progress(1.0)
}
messages[0].0.sendDisposable.set((sendMessage
|> deliverOn(self.queue)
|> afterDisposed { [weak self] in
/*if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
for (_, id, _) in messages {
if let current = strongSelf.messageContexts[id] {
@@ -354,18 +346,20 @@ public final class PendingMessageManager {
}
}
}
}
}*/
}).start())
}
private func commitSendingSingleMessage(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContent) {
private func commitSendingSingleMessage(messageContext: PendingMessageContext, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo) {
messageContext.state = .sending(groupId: nil)
let sendMessage: Signal<PendingMessageResult, NoError> = self.sendMessageContent(network: self.network, postbox: self.postbox, stateManager: self.stateManager, messageId: messageId, content: content)
|> map { next -> PendingMessageResult in
return .progress(1.0)
}
messageContext.disposable.set((sendMessage |> deliverOn(self.queue) |> afterDisposed { [weak self] in
if let strongSelf = self {
|> map { next -> PendingMessageResult in
return .progress(1.0)
}
messageContext.sendDisposable.set((sendMessage
|> deliverOn(self.queue)
|> afterDisposed { [weak self] in
/*if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
if let current = strongSelf.messageContexts[messageId] {
current.status = .none
@@ -376,7 +370,7 @@ public final class PendingMessageManager {
strongSelf.messageContexts.removeValue(forKey: messageId)
}
}
}
}*/
}).start(next: { [weak self] next in
if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
@@ -404,7 +398,9 @@ public final class PendingMessageManager {
subscriber(status)
}
messageContext.disposable.set((uploadSignal |> deliverOn(self.queue) |> `catch` { [weak self] _ -> Signal<PendingMessageUploadedContentResult, NoError> in
messageContext.uploadDisposable.set((uploadSignal
|> deliverOn(self.queue)
|> `catch` { [weak self] _ -> Signal<PendingMessageUploadedContentResult, NoError> in
if let strongSelf = self {
let modify = strongSelf.postbox.transaction { transaction -> Void in
transaction.updateMessage(id, update: { currentMessage in
@@ -415,7 +411,8 @@ public final class PendingMessageManager {
return .update(StoreMessage(id: id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: [.Failed], tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
})
}
return modify |> mapToSignal { _ in return .complete() }
return modify
|> mapToSignal { _ in return .complete() }
}
return .fail(Void())
}).start(next: { [weak self] next in
@@ -459,7 +456,7 @@ public final class PendingMessageManager {
subscriber(status)
}
context.disposable.set((uploadSignal |> deliverOn(self.queue)).start(next: { [weak self] next in
context.uploadDisposable.set((uploadSignal |> deliverOn(self.queue)).start(next: { [weak self] next in
if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
@@ -489,7 +486,7 @@ public final class PendingMessageManager {
}
}
private func sendGroupMessagesContent(network: Network, postbox: Postbox, stateManager: AccountStateManager, group: [(messageId: MessageId, content: PendingMessageUploadedContent)]) -> Signal<Void, NoError> {
private func sendGroupMessagesContent(network: Network, postbox: Postbox, stateManager: AccountStateManager, group: [(messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo)]) -> Signal<Void, NoError> {
return postbox.transaction { [weak self] transaction -> Signal<Void, NoError> in
if group.isEmpty {
return .complete()
@@ -497,7 +494,7 @@ public final class PendingMessageManager {
let peerId = group[0].messageId.peerId
var messages: [(Message, PendingMessageUploadedContent)] = []
var messages: [(Message, PendingMessageUploadedContentAndReuploadInfo)] = []
for (id, content) in group {
if let message = transaction.getMessage(id) {
messages.append((message, content))
@@ -532,7 +529,7 @@ public final class PendingMessageManager {
}
}
let sendMessageRequest: Signal<Api.Updates, NoError>
let sendMessageRequest: Signal<Api.Updates, MTRpcError>
if isForward {
flags |= (1 << 9)
@@ -547,7 +544,7 @@ public final class PendingMessageManager {
}
if let uniqueId = uniqueId {
switch content {
switch content.content {
case let .forward(forwardAttribute):
forwardIds.append((forwardAttribute.messageId, uniqueId))
default:
@@ -561,27 +558,21 @@ public final class PendingMessageManager {
let forwardPeerIds = Set(forwardIds.map { $0.0.peerId })
if forwardPeerIds.count != 1 {
assertionFailure()
sendMessageRequest = .fail(NoError())
sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "Invalid forward peer ids"))
} else if let inputSourcePeerId = forwardPeerIds.first, let inputSourcePeer = transaction.getPeer(inputSourcePeerId).flatMap(apiInputPeer) {
let dependencyTag = PendingMessageRequestDependencyTag(messageId: messages[0].0.id)
sendMessageRequest = network.request(Api.functions.messages.forwardMessages(flags: flags, fromPeer: inputSourcePeer, id: forwardIds.map { $0.0.id }, randomId: forwardIds.map { $0.1 }, toPeer: inputPeer), tag: dependencyTag)
|> mapError { _ -> NoError in
return NoError()
}
} else {
assertionFailure()
sendMessageRequest = .fail(NoError())
sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "Invalid forward source"))
}
//messages.forwardMessages flags:# silent:flags.5?true background:flags.6?true with_my_score:flags.8?true from_peer:InputPeer id:Vector<int> random_id:Vector<long> to_peer:InputPeer grouped:flags.9?true = Updates;
} else {
flags |= (1 << 7)
if let _ = replyMessageId {
flags |= Int32(1 << 0)
}
//messages.sendMultiMedia flags:# silent:flags.5?true background:flags.6?true clear_draft:flags.7?true peer:InputPeer reply_to_msg_id:flags.0?int multi_media:Vector<InputSingleMedia> = Updates;
var singleMedias: [Api.InputSingleMedia] = []
for (message, content) in messages {
var uniqueId: Int64?
@@ -592,7 +583,7 @@ public final class PendingMessageManager {
}
}
if let uniqueId = uniqueId {
switch content {
switch content.content {
case let .media(inputMedia, text):
var messageEntities: [Api.MessageEntity]?
for attribute in message.attributes {
@@ -616,32 +607,36 @@ public final class PendingMessageManager {
}
sendMessageRequest = network.request(Api.functions.messages.sendMultiMedia(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, multiMedia: singleMedias))
|> mapError { _ -> NoError in
return NoError()
}
}
return sendMessageRequest
|> mapToSignal { result -> Signal<Void, NoError> in
if let strongSelf = self {
return strongSelf.applySentGroupMessages(postbox: postbox, stateManager: stateManager, messages: messages.map { $0.0 }, result: result)
} else {
return .never()
|> mapToSignal { result -> Signal<Void, MTRpcError> in
if let strongSelf = self {
return strongSelf.applySentGroupMessages(postbox: postbox, stateManager: stateManager, messages: messages.map { $0.0 }, result: result)
|> mapError { _ -> MTRpcError in
return MTRpcError(errorCode: 400, errorDescription: "empty")
}
} else {
return .never()
}
|> `catch` { _ -> Signal<Void, NoError> in
return failMessages(postbox: postbox, ids: group.map { $0.0 })
}
|> `catch` { error -> Signal<Void, NoError> in
if error.errorDescription.hasPrefix("FILEREF_INVALID") || error.errorDescription.hasPrefix("FILE_REFERENCE_") {
}
return failMessages(postbox: postbox, ids: group.map { $0.0 })
}
} else {
assertionFailure()
return failMessages(postbox: postbox, ids: group.map { $0.0 })
}
} |> switchToLatest
}
|> switchToLatest
}
private static func sendSecretMessageContent(transaction: Transaction, message: Message, content: PendingMessageUploadedContent) {
private static func sendSecretMessageContent(transaction: Transaction, message: Message, content: PendingMessageUploadedContentAndReuploadInfo) {
var secretFile: SecretChatOutgoingFile?
switch content {
switch content.content {
case let .secretMedia(file, size, key):
if let fileReference = SecretChatOutgoingFileReference(file) {
secretFile = SecretChatOutgoingFile(reference: fileReference, size: size, key: key)
@@ -724,7 +719,8 @@ public final class PendingMessageManager {
}
}
private func sendMessageContent(network: Network, postbox: Postbox, stateManager: AccountStateManager, messageId: MessageId, content: PendingMessageUploadedContent) -> Signal<Void, NoError> {
private func sendMessageContent(network: Network, postbox: Postbox, stateManager: AccountStateManager, messageId: MessageId, content: PendingMessageUploadedContentAndReuploadInfo) -> Signal<Void, NoError> {
let queue = self.queue
return postbox.transaction { [weak self] transaction -> Signal<Void, NoError> in
guard let message = transaction.getMessage(messageId) else {
return .complete()
@@ -772,47 +768,49 @@ public final class PendingMessageManager {
let dependencyTag = PendingMessageRequestDependencyTag(messageId: messageId)
let sendMessageRequest: Signal<Api.Updates, NoError>
switch content {
let sendMessageRequest: Signal<Api.Updates, MTRpcError>
switch content.content {
case .text:
sendMessageRequest = network.request(Api.functions.messages.sendMessage(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, message: message.text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag)
|> mapError { _ -> NoError in
return NoError()
}
case let .media(inputMedia, text):
sendMessageRequest = network.request(Api.functions.messages.sendMedia(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, media: inputMedia, message: text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag)
|> mapError { _ -> NoError in
return NoError()
}
case let .forward(sourceInfo):
if let forwardSourceInfoAttribute = forwardSourceInfoAttribute, let sourcePeer = transaction.getPeer(forwardSourceInfoAttribute.messageId.peerId), let sourceInputPeer = apiInputPeer(sourcePeer) {
sendMessageRequest = network.request(Api.functions.messages.forwardMessages(flags: 0, fromPeer: sourceInputPeer, id: [sourceInfo.messageId.id], randomId: [uniqueId], toPeer: inputPeer), tag: dependencyTag)
|> mapError { _ -> NoError in
return NoError()
}
} else {
sendMessageRequest = .fail(NoError())
sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal"))
}
case let .chatContextResult(chatContextResult):
sendMessageRequest = network.request(Api.functions.messages.sendInlineBotResult(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, randomId: uniqueId, queryId: chatContextResult.queryId, id: chatContextResult.id))
|> mapError { _ -> NoError in
return NoError()
}
case .secretMedia:
assertionFailure()
sendMessageRequest = .fail(NoError())
sendMessageRequest = .fail(MTRpcError(errorCode: 400, errorDescription: "internal"))
}
return sendMessageRequest
|> mapToSignal { result -> Signal<Void, NoError> in
if let strongSelf = self {
return strongSelf.applySentMessage(postbox: postbox, stateManager: stateManager, message: message, result: result)
} else {
return .never()
|> mapToSignal { result -> Signal<Void, MTRpcError> in
if let strongSelf = self {
return strongSelf.applySentMessage(postbox: postbox, stateManager: stateManager, message: message, result: result)
|> mapError { _ -> MTRpcError in
return MTRpcError(errorCode: 400, errorDescription: "internal")
}
} else {
return .never()
}
|> `catch` { _ -> Signal<Void, NoError> in
let modify = postbox.transaction { transaction -> Void in
}
|> `catch` { error -> Signal<Void, NoError> in
queue.async {
guard let strongSelf = self, let context = strongSelf.messageContexts[messageId] else {
return
}
if error.errorDescription.hasPrefix("FILEREF_INVALID") || error.errorDescription.hasPrefix("FILE_REFERENCE_") {
if !context.forcedReuploadOnce {
context.forcedReuploadOnce = true
strongSelf.beginSendingMessages([messageId])
return
}
}
let _ = (postbox.transaction { transaction -> Void in
transaction.updateMessage(message.id, update: { currentMessage in
var storeForwardInfo: StoreMessageForwardInfo?
if let forwardInfo = currentMessage.forwardInfo {
@@ -820,9 +818,10 @@ public final class PendingMessageManager {
}
return .update(StoreMessage(id: message.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: [.Failed], tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: currentMessage.attributes, media: currentMessage.media))
})
}
return modify
}).start()
}
return .complete()
}
} else {
return postbox.transaction { transaction -> Void in