no message

This commit is contained in:
Peter
2017-03-21 19:58:26 +03:00
parent 1ae775b2f4
commit 5278f451f1
5 changed files with 222 additions and 42 deletions

View File

@@ -15,8 +15,16 @@ public struct PendingMessageStatus: Equatable {
}
}
private enum PendingMessageState {
case none
case waitingForUploadToStart(Signal<PendingMessageUploadedContentResult, NoError>)
case uploading
case sending
}
private final class PendingMessageContext {
var disposable: MetaDisposable?
var state: PendingMessageState = .none
let disposable = MetaDisposable()
var status: PendingMessageStatus?
var statusSubscribers = Bag<(PendingMessageStatus?) -> Void>()
}
@@ -74,10 +82,12 @@ public final class PendingMessageManager {
let addedMessageIds = messageIds.subtracting(self.pendingMessageIds)
let removedMessageIds = self.pendingMessageIds.subtracting(messageIds)
var updateUploadingPeerIds = Set<PeerId>()
for id in removedMessageIds {
if let context = self.messageContexts[id] {
context.disposable?.dispose()
context.disposable = nil
context.state = .none
updateUploadingPeerIds.insert(id.peerId)
context.disposable.dispose()
if context.statusSubscribers.isEmpty {
self.messageContexts.removeValue(forKey: id)
}
@@ -89,6 +99,12 @@ public final class PendingMessageManager {
}
self.pendingMessageIds = messageIds
if !updateUploadingPeerIds.isEmpty {
for peerId in updateUploadingPeerIds {
self.updateWaitingUploads(peerId: peerId)
}
}
}
}
@@ -102,7 +118,6 @@ public final class PendingMessageManager {
messageContext = current
} else {
messageContext = PendingMessageContext()
messageContext.disposable = MetaDisposable()
self.messageContexts[id] = messageContext
}
@@ -116,7 +131,7 @@ public final class PendingMessageManager {
self.queue.async {
if let current = self.messageContexts[id] {
current.statusSubscribers.remove(index)
if current.statusSubscribers.isEmpty && current.disposable == nil {
if case .none = current.status, current.statusSubscribers.isEmpty {
self.messageContexts.removeValue(forKey: id)
}
}
@@ -128,6 +143,24 @@ public final class PendingMessageManager {
}
}
private func canBeginUploadingMessage(id: MessageId) -> Bool {
assert(self.queue.isCurrent())
let messageIdsForPeer: [MessageId] = self.messageContexts.keys.filter({ $0.peerId == id.peerId }).sorted()
for contextId in messageIdsForPeer {
if contextId < id {
let context = self.messageContexts[contextId]!
if case .uploading = context.state {
return false
}
} else {
break
}
}
return true
}
private func beginSendingMessages(_ ids: [MessageId]) {
assert(self.queue.isCurrent())
@@ -137,7 +170,6 @@ public final class PendingMessageManager {
messageContext = current
} else {
messageContext = PendingMessageContext()
messageContext.disposable = MetaDisposable()
self.messageContexts[id] = messageContext
}
@@ -163,7 +195,7 @@ public final class PendingMessageManager {
if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
for message in messages {
for message in messages.sorted(by: { $0.id < $1.id }) {
guard let messageContext = strongSelf.messageContexts[message.id] else {
continue
}
@@ -172,7 +204,20 @@ public final class PendingMessageManager {
continue
}
let uploadedContent = uploadedMessageContent(network: strongSelf.network, postbox: strongSelf.postbox, transformOutgoingMessageMedia: strongSelf.transformOutgoingMessageMedia, message: message)
let contentToUpload = messageContentToUpload(network: strongSelf.network, postbox: strongSelf.postbox, transformOutgoingMessageMedia: strongSelf.transformOutgoingMessageMedia, message: message)
switch contentToUpload {
case let .ready(message, content):
strongSelf.beginSendingMessage(messageContext: messageContext, message: message, content: content)
case let .upload(uploadSignal):
if strongSelf.canBeginUploadingMessage(id: message.id) {
strongSelf.beginUploadingMessage(messageContext: messageContext, id: message.id, uploadSignal: uploadSignal)
} else {
messageContext.state = .waitingForUploadToStart(uploadSignal)
}
}
/*let uploadedContent = uploadedMessageContent(network: strongSelf.network, postbox: strongSelf.postbox, transformOutgoingMessageMedia: strongSelf.transformOutgoingMessageMedia, message: message)
let sendMessage = uploadedContent
|> mapToSignal { contentResult -> Signal<PendingMessageResult, NoError> in
@@ -191,11 +236,10 @@ public final class PendingMessageManager {
}
}
messageContext.disposable?.set((sendMessage |> deliverOn(strongSelf.queue) |> afterDisposed {
messageContext.disposable.set((sendMessage |> deliverOn(strongSelf.queue) |> afterDisposed {
if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
if let current = strongSelf.messageContexts[message.id] {
current.disposable = nil
for subscriber in current.statusSubscribers.copyItems() {
subscriber(nil)
}
@@ -219,12 +263,130 @@ public final class PendingMessageManager {
}
}
}
}))
}))*/
}
}
}))
}
private func beginSendingMessage(messageContext: PendingMessageContext, message: Message, content: PendingMessageUploadedContent) {
messageContext.state = .sending
let sendMessage: Signal<PendingMessageResult, NoError> = self.sendMessageContent(network: self.network, postbox: self.postbox, stateManager: self.stateManager, message: message, content: content)
|> map { next -> PendingMessageResult in
return .progress(1.0)
}
messageContext.disposable.set((sendMessage |> deliverOn(self.queue) |> afterDisposed { [weak self] in
if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
if let current = strongSelf.messageContexts[message.id] {
current.status = .none
for subscriber in current.statusSubscribers.copyItems() {
subscriber(nil)
}
if current.statusSubscribers.isEmpty {
strongSelf.messageContexts.removeValue(forKey: message.id)
}
}
}
}).start(next: { [weak self] next in
if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
switch next {
case let .progress(progress):
if let current = strongSelf.messageContexts[message.id] {
let status = PendingMessageStatus(progress: progress)
current.status = status
for subscriber in current.statusSubscribers.copyItems() {
subscriber(status)
}
}
}
}
}))
}
private func beginUploadingMessage(messageContext: PendingMessageContext, id: MessageId, uploadSignal: Signal<PendingMessageUploadedContentResult, NoError>) {
messageContext.state = .uploading
messageContext.disposable.set((uploadSignal |> deliverOn(self.queue)).start(next: { [weak self] next in
if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
switch next {
case let .progress(progress):
if let current = strongSelf.messageContexts[id] {
let status = PendingMessageStatus(progress: progress)
current.status = status
for subscriber in current.statusSubscribers.copyItems() {
subscriber(status)
}
}
case let .content(message, content):
if let current = strongSelf.messageContexts[id] {
strongSelf.beginSendingMessage(messageContext: current, message: message, content: content)
strongSelf.updateWaitingUploads(peerId: id.peerId)
}
}
}
}))
}
private func updateWaitingUploads(peerId: PeerId) {
assert(self.queue.isCurrent())
let messageIdsForPeer: [MessageId] = self.messageContexts.keys.filter({ $0.peerId == peerId }).sorted()
loop: for contextId in messageIdsForPeer {
let context = self.messageContexts[contextId]!
if case let .waitingForUploadToStart(uploadSignal) = context.state {
if self.canBeginUploadingMessage(id: contextId) {
context.state = .uploading
context.disposable.set((uploadSignal |> deliverOn(self.queue)).start(next: { [weak self] next in
if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
switch next {
case let .progress(progress):
if let current = strongSelf.messageContexts[contextId] {
let status = PendingMessageStatus(progress: progress)
current.status = status
for subscriber in current.statusSubscribers.copyItems() {
subscriber(status)
}
}
case let .content(message, content):
if let current = strongSelf.messageContexts[message.id] {
current.state = .sending
current.disposable.set((strongSelf.sendMessageContent(network: strongSelf.network, postbox: strongSelf.postbox, stateManager: strongSelf.stateManager, message: message, content: content)
|> map { next -> PendingMessageResult in
return .progress(1.0)
}).start(next: { next in
if let strongSelf = self {
assert(strongSelf.queue.isCurrent())
switch next {
case let .progress(progress):
if let current = strongSelf.messageContexts[message.id] {
let status = PendingMessageStatus(progress: progress)
current.status = status
for subscriber in current.statusSubscribers.copyItems() {
subscriber(status)
}
}
}
}
}))
}
}
}
}))
}
break loop
}
}
}
private func sendMessageContent(network: Network, postbox: Postbox, stateManager: AccountStateManager, message: Message, content: PendingMessageUploadedContent) -> Signal<Void, NoError> {
return postbox.modify { [weak self] modifier -> Signal<Void, NoError> in
if message.id.peerId.namespace == Namespaces.Peer.SecretChat {