Added support for request progress reporting

This commit is contained in:
Peter 2018-11-17 02:54:57 +04:00
parent 8d7f1c6241
commit 4dcdfd8751
4 changed files with 177 additions and 141 deletions

View File

@ -532,9 +532,21 @@ private extension NetworkContextProxyId {
}
}
public struct NetworkRequestAdditionalInfo: OptionSet {
public var rawValue: Int32
public init(rawValue: Int32) {
self.rawValue = rawValue
}
public static let acknowledgement = NetworkRequestAdditionalInfo(rawValue: 1 << 0)
public static let progress = NetworkRequestAdditionalInfo(rawValue: 1 << 1)
}
public enum NetworkRequestResult<T> {
case result(T)
case acknowledged
case progress(Float, Int32)
}
public final class Network: NSObject, MTRequestMessageServiceDelegate {
@ -743,7 +755,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate {
}
}
public func requestWithAcknowledgement<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<NetworkRequestResult<T>, MTRpcError> {
public func requestWithAdditionalInfo<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), info: NetworkRequestAdditionalInfo, tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<NetworkRequestResult<T>, MTRpcError> {
let requestService = self.requestService
return Signal { subscriber in
let request = MTRequest()
@ -768,7 +780,15 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate {
}
request.acknowledgementReceived = {
subscriber.putNext(.acknowledged)
if info.contains(.acknowledgement) {
subscriber.putNext(.acknowledged)
}
}
request.progressUpdated = { progress, packetSize in
if info.contains(.progress) {
subscriber.putNext(.progress(progress, Int32(clamping: packetSize)))
}
}
request.completed = { (boxedResponse, timestamp, error) -> () in
@ -804,7 +824,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate {
}
}
public func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<T, MTRpcError> {
public func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: NetworkRequestDependencyTag? = nil, automaticFloodWait: Bool = true) -> Signal<T, MTRpcError> {
let requestService = self.requestService
return Signal { subscriber in
let request = MTRequest()

View File

@ -907,7 +907,7 @@ public final class PendingMessageManager {
let sendMessageRequest: Signal<NetworkRequestResult<Api.Updates>, MTRpcError>
switch content.content {
case .text:
sendMessageRequest = network.requestWithAcknowledgement(Api.functions.messages.sendMessage(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, message: message.text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag)
sendMessageRequest = network.requestWithAdditionalInfo(Api.functions.messages.sendMessage(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, message: message.text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), info: .acknowledgement, tag: dependencyTag)
case let .media(inputMedia, text):
sendMessageRequest = network.request(Api.functions.messages.sendMedia(flags: flags, peer: inputPeer, replyToMsgId: replyMessageId, media: inputMedia, message: text, randomId: uniqueId, replyMarkup: nil, entities: messageEntities), tag: dependencyTag)
|> map(NetworkRequestResult.result)
@ -936,6 +936,8 @@ public final class PendingMessageManager {
return .never()
}
switch result {
case .progress:
return .complete()
case .acknowledged:
return strongSelf.applyAcknowledgedMessage(postbox: postbox, message: message)
|> mapError { _ -> MTRpcError in

View File

@ -13,42 +13,20 @@ public enum SearchMessagesLocation: Equatable {
case general
case group(PeerGroupId)
case peer(peerId: PeerId, fromId: PeerId?, tags: MessageTags?)
public static func ==(lhs: SearchMessagesLocation, rhs: SearchMessagesLocation) -> Bool {
switch lhs {
case .general:
if case .general = rhs {
return true
} else {
return false
}
case let .group(groupId):
if case .group(groupId) = rhs {
return true
} else {
return false
}
case let .peer(lhsPeerId, lhsFromId, lhsTags):
if case let .peer(rhsPeerId, rhsFromId, rhsTags) = rhs, lhsPeerId == rhsPeerId, lhsFromId == rhsFromId, lhsTags == rhsTags {
return true
} else {
return false
}
}
}
}
public func searchMessages(account: Account, location: SearchMessagesLocation, query: String) -> Signal<([Message], [PeerId : CombinedPeerReadState]), NoError> {
public func searchMessages(account: Account, location: SearchMessagesLocation, query: String, lowerBound: MessageIndex? = nil, limit: Int32 = 100) -> Signal<([Message], [PeerId : CombinedPeerReadState], Int32), NoError> {
let remoteSearchResult: Signal<Api.messages.Messages?, NoError>
switch location {
case let .peer(peerId, fromId, tags):
if peerId.namespace == Namespaces.Peer.SecretChat {
return account.postbox.transaction { transaction -> ([Message], [PeerId : CombinedPeerReadState]) in
return account.postbox.transaction { transaction -> ([Message], [PeerId : CombinedPeerReadState], Int32) in
var readStates: [PeerId : CombinedPeerReadState] = [:]
if let readState = transaction.getCombinedPeerReadState(peerId) {
readStates[peerId] = readState
}
return (transaction.searchMessages(peerId: peerId, query: query, tags: tags), readStates)
let result = transaction.searchMessages(peerId: peerId, query: query, tags: tags)
return (result, readStates, Int32(result.count))
}
}
@ -69,22 +47,22 @@ public func searchMessages(account: Account, location: SearchMessagesLocation, q
}
remoteSearchResult = account.postbox.transaction { transaction -> (peer:Peer?, from: Peer?) in
if let fromId = fromId {
return (peer: transaction.getPeer(peerId), from: transaction.getPeer(fromId))
}
return (peer: transaction.getPeer(peerId), from: nil)
if let fromId = fromId {
return (peer: transaction.getPeer(peerId), from: transaction.getPeer(fromId))
}
return (peer: transaction.getPeer(peerId), from: nil)
}
|> mapToSignal { values -> Signal<Api.messages.Messages?, NoError> in
if let peer = values.peer, let inputPeer = apiInputPeer(peer) {
var fromInputUser:Api.InputUser? = nil
var flags:Int32 = 0
var fromInputUser: Api.InputUser? = nil
var flags: Int32 = 0
if let from = values.from {
fromInputUser = apiInputUser(from)
if let _ = fromInputUser {
flags |= (1 << 0)
}
}
return account.network.request(Api.functions.messages.search(flags: flags, peer: inputPeer, q: query, fromId: fromInputUser, filter: filter, minDate: 0, maxDate: Int32.max - 1, offsetId: 0, addOffset: 0, limit: 100, maxId: Int32.max - 1, minId: 0, hash: 0))
return account.network.request(Api.functions.messages.search(flags: flags, peer: inputPeer, q: query, fromId: fromInputUser, filter: filter, minDate: 0, maxDate: Int32.max - 1, offsetId: lowerBound?.id.id ?? 0, addOffset: 0, limit: limit, maxId: Int32.max - 1, minId: 0, hash: 0))
|> map(Optional.init)
|> `catch` { _ -> Signal<Api.messages.Messages?, NoError> in
return .single(nil)
@ -99,7 +77,7 @@ public func searchMessages(account: Account, location: SearchMessagesLocation, q
/*remoteSearchResult = account.network.request(Api.functions.channels.searchFeed(feedId: groupId.rawValue, q: query, offsetDate: 0, offsetPeer: Api.InputPeer.inputPeerEmpty, offsetId: 0, limit: 64), automaticFloodWait: false)
|> mapError { _ in } |> map(Optional.init)*/
case .general:
remoteSearchResult = account.network.request(Api.functions.messages.searchGlobal(q: query, offsetDate: 0, offsetPeer: Api.InputPeer.inputPeerEmpty, offsetId: 0, limit: 64), automaticFloodWait: false)
remoteSearchResult = account.network.request(Api.functions.messages.searchGlobal(q: query, offsetDate: 0, offsetPeer: Api.InputPeer.inputPeerEmpty, offsetId: 0, limit: limit), automaticFloodWait: false)
|> map(Optional.init)
|> `catch` { _ -> Signal<Api.messages.Messages?, NoError> in
return .single(nil)
@ -107,82 +85,84 @@ public func searchMessages(account: Account, location: SearchMessagesLocation, q
}
let processedSearchResult = remoteSearchResult
|> mapToSignal { result -> Signal<([Message], [PeerId : CombinedPeerReadState]), NoError> in
guard let result = result else {
return .single(([], [:]))
}
//assert(false)
let messages: [Api.Message]
let chats: [Api.Chat]
let users: [Api.User]
switch result {
case let .channelMessages(_, _, _, apiMessages, apiChats, apiUsers):
messages = apiMessages
chats = apiChats
users = apiUsers
case let .messages(apiMessages, apiChats, apiUsers):
messages = apiMessages
chats = apiChats
users = apiUsers
case let.messagesSlice(_, apiMessages, apiChats, apiUsers):
messages = apiMessages
chats = apiChats
users = apiUsers
case .messagesNotModified:
messages = []
chats = []
users = []
}
return account.postbox.transaction { transaction -> ([Message], [PeerId : CombinedPeerReadState]) in
var peers: [PeerId: Peer] = [:]
for user in users {
if let user = TelegramUser.merge(transaction.getPeer(user.peerId) as? TelegramUser, rhs: user) {
peers[user.id] = user
}
}
for chat in chats {
if let groupOrChannel = parseTelegramGroupOrChannel(chat: chat) {
peers[groupOrChannel.id] = groupOrChannel
}
}
var peerIdsSet: Set<PeerId> = Set()
var readStates:[PeerId : CombinedPeerReadState] = [:]
var renderedMessages: [Message] = []
for message in messages {
if let message = StoreMessage(apiMessage: message), let renderedMessage = locallyRenderedMessage(message: message, peers: peers) {
renderedMessages.append(renderedMessage)
peerIdsSet.insert(message.id.peerId)
}
}
for peerId in peerIdsSet {
if let readState = transaction.getCombinedPeerReadState(peerId) {
readStates[peerId] = readState
}
}
if case .general = location {
let secretMessages = transaction.searchMessages(peerId: nil, query: query, tags: nil)
renderedMessages.append(contentsOf: secretMessages)
}
renderedMessages.sort(by: { lhs, rhs in
return MessageIndex(lhs) > MessageIndex(rhs)
})
return (renderedMessages, readStates)
}
|> mapToSignal { result -> Signal<([Message], [PeerId : CombinedPeerReadState], Int32), NoError> in
guard let result = result else {
return .single(([], [:], 0))
}
//assert(false)
let messages: [Api.Message]
let chats: [Api.Chat]
let users: [Api.User]
let totalCount: Int32
switch result {
case let .channelMessages(_, _, count, apiMessages, apiChats, apiUsers):
messages = apiMessages
chats = apiChats
users = apiUsers
totalCount = count
case let .messages(apiMessages, apiChats, apiUsers):
messages = apiMessages
chats = apiChats
users = apiUsers
totalCount = Int32(messages.count)
case let.messagesSlice(count, apiMessages, apiChats, apiUsers):
messages = apiMessages
chats = apiChats
users = apiUsers
totalCount = count
case .messagesNotModified:
messages = []
chats = []
users = []
totalCount = 0
}
return account.postbox.transaction { transaction -> ([Message], [PeerId : CombinedPeerReadState], Int32) in
var peers: [PeerId: Peer] = [:]
for user in users {
if let user = TelegramUser.merge(transaction.getPeer(user.peerId) as? TelegramUser, rhs: user) {
peers[user.id] = user
}
}
for chat in chats {
if let groupOrChannel = parseTelegramGroupOrChannel(chat: chat) {
peers[groupOrChannel.id] = groupOrChannel
}
}
var peerIdsSet: Set<PeerId> = Set()
var readStates:[PeerId : CombinedPeerReadState] = [:]
var renderedMessages: [Message] = []
for message in messages {
if let message = StoreMessage(apiMessage: message), let renderedMessage = locallyRenderedMessage(message: message, peers: peers) {
renderedMessages.append(renderedMessage)
peerIdsSet.insert(message.id.peerId)
}
}
for peerId in peerIdsSet {
if let readState = transaction.getCombinedPeerReadState(peerId) {
readStates[peerId] = readState
}
}
if case .general = location {
let secretMessages = transaction.searchMessages(peerId: nil, query: query, tags: nil)
renderedMessages.append(contentsOf: secretMessages)
}
renderedMessages.sort(by: { lhs, rhs in
return MessageIndex(lhs) > MessageIndex(rhs)
})
return (renderedMessages, readStates, totalCount)
}
}
return processedSearchResult
}

View File

@ -10,37 +10,71 @@ import Foundation
#endif
public func webpagePreview(account: Account, url: String, webpageId: MediaId? = nil) -> Signal<TelegramMediaWebpage?, NoError> {
return account.postbox.transaction { transaction -> Signal<TelegramMediaWebpage?, NoError> in
if let webpageId = webpageId, let webpage = transaction.getMedia(webpageId) as? TelegramMediaWebpage {
return .single(webpage)
return webpagePreviewWithProgress(account: account, url: url)
|> mapToSignal { next -> Signal<TelegramMediaWebpage?, NoError> in
if case let .result(result) = next {
return .single(result)
} else {
return account.network.request(Api.functions.messages.getWebPagePreview(flags: 0, message: url, entities: nil))
|> `catch` { _ -> Signal<Api.MessageMedia, NoError> in
return .single(.messageMediaEmpty)
}
|> mapToSignal { result -> Signal<TelegramMediaWebpage?, NoError> in
if let preCachedResources = result.preCachedResources {
for (resource, data) in preCachedResources {
account.postbox.mediaBox.storeResourceData(resource.id, data: data)
}
}
switch result {
case let .messageMediaWebPage(webpage):
if let media = telegramMediaWebpageFromApiWebpage(webpage, url: url) {
if case .Loaded = media.content {
return .single(media)
} else {
return .single(media) |> then(account.stateManager.updatedWebpage(media.webpageId) |> map(Optional.init))
}
} else {
return .single(nil)
}
default:
return .single(nil)
}
}
return .complete()
}
} |> switchToLatest
}
}
public enum WebpagePreviewWithProgressResult {
case result(TelegramMediaWebpage?)
case progress(Float)
}
public func webpagePreviewWithProgress(account: Account, url: String, webpageId: MediaId? = nil) -> Signal<WebpagePreviewWithProgressResult, NoError> {
return account.postbox.transaction { transaction -> Signal<WebpagePreviewWithProgressResult, NoError> in
if let webpageId = webpageId, let webpage = transaction.getMedia(webpageId) as? TelegramMediaWebpage {
return .single(.result(webpage))
} else {
return account.network.requestWithAdditionalInfo(Api.functions.messages.getWebPagePreview(flags: 0, message: url, entities: nil), info: .progress)
|> `catch` { _ -> Signal<NetworkRequestResult<Api.MessageMedia>, NoError> in
return .single(.result(.messageMediaEmpty))
}
|> mapToSignal { result -> Signal<WebpagePreviewWithProgressResult, NoError> in
switch result {
case .acknowledged:
return .complete()
case let .progress(progress, packetSize):
if packetSize > 1024 {
return .single(.progress(progress))
} else {
return .complete()
}
case let .result(result):
if let preCachedResources = result.preCachedResources {
for (resource, data) in preCachedResources {
account.postbox.mediaBox.storeResourceData(resource.id, data: data)
}
}
switch result {
case let .messageMediaWebPage(webpage):
if let media = telegramMediaWebpageFromApiWebpage(webpage, url: url) {
if case .Loaded = media.content {
return .single(.result(media))
} else {
return .single(.result(media))
|> then(
account.stateManager.updatedWebpage(media.webpageId)
|> map { next -> WebpagePreviewWithProgressResult in
return .result(next)
}
)
}
} else {
return .single(.result(nil))
}
default:
return .single(.result(nil))
}
}
}
}
}
|> switchToLatest
}
public func actualizedWebpage(postbox: Postbox, network: Network, webpage: TelegramMediaWebpage) -> Signal<TelegramMediaWebpage, NoError> {