From 7fe5501b0618a29c081a447e9c93472426f90e0b Mon Sep 17 00:00:00 2001 From: Peter Iakovlev Date: Fri, 7 Dec 2018 17:25:52 +0400 Subject: [PATCH] Improved cache stats Added support for background downloads --- TelegramCore/Account.swift | 2 + TelegramCore/CollectCacheUsageStats.swift | 63 +++++++++++++++++-- TelegramCore/FetchedMediaResource.swift | 10 +-- ...nchronizeRecentlyUsedMediaOperations.swift | 2 +- ...anagedSynchronizeSavedGifsOperations.swift | 2 +- ...edSynchronizeSavedStickersOperations.swift | 2 +- TelegramCore/MultipartFetch.swift | 32 ++++++---- TelegramCore/MultiplexedRequestManager.swift | 60 ++++++++++-------- TelegramCore/Network.swift | 56 +++++++++-------- .../PendingMessageUploadedContent.swift | 2 +- TelegramCore/SearchMessages.swift | 4 ++ 11 files changed, 156 insertions(+), 79 deletions(-) diff --git a/TelegramCore/Account.swift b/TelegramCore/Account.swift index f5fd59b781..d35c3ac8a7 100644 --- a/TelegramCore/Account.swift +++ b/TelegramCore/Account.swift @@ -812,6 +812,7 @@ public class Account { public let shouldKeepOnlinePresence = Promise() public let autolockReportDeadline = Promise() public let shouldExplicitelyKeepWorkerConnections = Promise(false) + public let shouldKeepBackgroundDownloadConnections = Promise(false) private let networkStateValue = Promise(.waitingForNetwork) public var networkState: Signal { @@ -1035,6 +1036,7 @@ public class Account { self.network.shouldKeepConnection.set(shouldBeMaster) self.network.shouldExplicitelyKeepWorkerConnections.set(self.shouldExplicitelyKeepWorkerConnections.get()) + self.network.shouldKeepBackgroundDownloadConnections.set(self.shouldKeepBackgroundDownloadConnections.get()) let serviceTasksMaster = shouldBeMaster |> deliverOn(self.serviceQueue) diff --git a/TelegramCore/CollectCacheUsageStats.swift b/TelegramCore/CollectCacheUsageStats.swift index 0e524a50f5..04d9555eae 100644 --- a/TelegramCore/CollectCacheUsageStats.swift +++ b/TelegramCore/CollectCacheUsageStats.swift @@ -23,8 +23,9 @@ public struct CacheUsageStats { public let cacheSize: Int64 public let tempPaths: [String] public let tempSize: Int64 + public let immutableSize: Int64 - public init(media: [PeerId: [PeerCacheUsageCategory: [MediaId: Int64]]], mediaResourceIds: [MediaId: [MediaResourceId]], peers: [PeerId: Peer], otherSize: Int64, otherPaths: [String], cacheSize: Int64, tempPaths: [String], tempSize: Int64) { + public init(media: [PeerId: [PeerCacheUsageCategory: [MediaId: Int64]]], mediaResourceIds: [MediaId: [MediaResourceId]], peers: [PeerId: Peer], otherSize: Int64, otherPaths: [String], cacheSize: Int64, tempPaths: [String], tempSize: Int64, immutableSize: Int64) { self.media = media self.mediaResourceIds = mediaResourceIds self.peers = peers @@ -33,6 +34,7 @@ public struct CacheUsageStats { self.cacheSize = cacheSize self.tempPaths = tempPaths self.tempSize = tempSize + self.immutableSize = immutableSize } } @@ -53,7 +55,7 @@ private final class CacheUsageStatsState { var lowerBound: MessageIndex? } -public func collectCacheUsageStats(account: Account) -> Signal { +public func collectCacheUsageStats(account: Account, additionalCachePaths: [String], logFilesPath: String) -> Signal { let state = Atomic(value: CacheUsageStatsState()) let excludeResourceIds = account.postbox.transaction { transaction -> Set in @@ -85,14 +87,32 @@ public func collectCacheUsageStats(account: Account) -> Signal Signal Signal CacheUsageStats in var peers: [PeerId: Peer] = [:] for peerId in finalMedia.keys { @@ -182,7 +233,7 @@ public func collectCacheUsageStats(account: Account) -> Signal mapError { _ -> CollectCacheUsageStatsError in preconditionFailure() } |> mapToSignal { stats -> Signal in return .fail(.done(stats)) diff --git a/TelegramCore/FetchedMediaResource.swift b/TelegramCore/FetchedMediaResource.swift index 7a8a987485..27e171c6ff 100644 --- a/TelegramCore/FetchedMediaResource.swift +++ b/TelegramCore/FetchedMediaResource.swift @@ -495,19 +495,21 @@ extension MediaResourceReference { final class TelegramCloudMediaResourceFetchInfo: MediaResourceFetchInfo { let reference: MediaResourceReference let preferBackgroundReferenceRevalidation: Bool + let continueInBackground: Bool - init(reference: MediaResourceReference, preferBackgroundReferenceRevalidation: Bool) { + init(reference: MediaResourceReference, preferBackgroundReferenceRevalidation: Bool, continueInBackground: Bool) { self.reference = reference self.preferBackgroundReferenceRevalidation = preferBackgroundReferenceRevalidation + self.continueInBackground = continueInBackground } } -public func fetchedMediaResource(postbox: Postbox, reference: MediaResourceReference, range: (Range, MediaBoxFetchPriority)? = nil, statsCategory: MediaResourceStatsCategory = .generic, reportResultStatus: Bool = false, preferBackgroundReferenceRevalidation: Bool = false) -> Signal { +public func fetchedMediaResource(postbox: Postbox, reference: MediaResourceReference, range: (Range, MediaBoxFetchPriority)? = nil, statsCategory: MediaResourceStatsCategory = .generic, reportResultStatus: Bool = false, preferBackgroundReferenceRevalidation: Bool = false, continueInBackground: Bool = false) -> Signal { if let (range, priority) = range { - return postbox.mediaBox.fetchedResourceData(reference.resource, in: range, priority: priority, parameters: MediaResourceFetchParameters(tag: TelegramMediaResourceFetchTag(statsCategory: statsCategory), info: TelegramCloudMediaResourceFetchInfo(reference: reference, preferBackgroundReferenceRevalidation: preferBackgroundReferenceRevalidation))) + return postbox.mediaBox.fetchedResourceData(reference.resource, in: range, priority: priority, parameters: MediaResourceFetchParameters(tag: TelegramMediaResourceFetchTag(statsCategory: statsCategory), info: TelegramCloudMediaResourceFetchInfo(reference: reference, preferBackgroundReferenceRevalidation: preferBackgroundReferenceRevalidation, continueInBackground: continueInBackground))) |> map { _ in .local } } else { - return postbox.mediaBox.fetchedResource(reference.resource, parameters: MediaResourceFetchParameters(tag: TelegramMediaResourceFetchTag(statsCategory: statsCategory), info: TelegramCloudMediaResourceFetchInfo(reference: reference, preferBackgroundReferenceRevalidation: preferBackgroundReferenceRevalidation)), implNext: reportResultStatus) + return postbox.mediaBox.fetchedResource(reference.resource, parameters: MediaResourceFetchParameters(tag: TelegramMediaResourceFetchTag(statsCategory: statsCategory), info: TelegramCloudMediaResourceFetchInfo(reference: reference, preferBackgroundReferenceRevalidation: preferBackgroundReferenceRevalidation, continueInBackground: continueInBackground)), implNext: reportResultStatus) } } diff --git a/TelegramCore/ManagedSynchronizeRecentlyUsedMediaOperations.swift b/TelegramCore/ManagedSynchronizeRecentlyUsedMediaOperations.swift index 7ef8ed8245..bcd91d557c 100644 --- a/TelegramCore/ManagedSynchronizeRecentlyUsedMediaOperations.swift +++ b/TelegramCore/ManagedSynchronizeRecentlyUsedMediaOperations.swift @@ -154,7 +154,7 @@ private func synchronizeRecentlyUsedMedia(transaction: Transaction, postbox: Pos case .generic: return .fail(.generic) case .invalidReference: - return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false), resource: fileReference.media.resource) + return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false, continueInBackground: false), resource: fileReference.media.resource) |> mapError { _ -> SaveRecentlyUsedMediaError in return .generic } diff --git a/TelegramCore/ManagedSynchronizeSavedGifsOperations.swift b/TelegramCore/ManagedSynchronizeSavedGifsOperations.swift index cd24082656..e45367120d 100644 --- a/TelegramCore/ManagedSynchronizeSavedGifsOperations.swift +++ b/TelegramCore/ManagedSynchronizeSavedGifsOperations.swift @@ -150,7 +150,7 @@ private func synchronizeSavedGifs(transaction: Transaction, postbox: Postbox, ne case .generic: return .fail(.generic) case .invalidReference: - return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false), resource: fileReference.media.resource) + return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false, continueInBackground: false), resource: fileReference.media.resource) |> mapError { _ -> SaveGifError in return .generic } diff --git a/TelegramCore/ManagedSynchronizeSavedStickersOperations.swift b/TelegramCore/ManagedSynchronizeSavedStickersOperations.swift index 3ceb816cd8..af20154f39 100644 --- a/TelegramCore/ManagedSynchronizeSavedStickersOperations.swift +++ b/TelegramCore/ManagedSynchronizeSavedStickersOperations.swift @@ -150,7 +150,7 @@ private func synchronizeSavedStickers(transaction: Transaction, postbox: Postbox case .generic: return .fail(.generic) case .invalidReference: - return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false), resource: fileReference.media.resource) + return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false, continueInBackground: false), resource: fileReference.media.resource) |> mapError { _ -> SaveStickerError in return .generic } diff --git a/TelegramCore/MultipartFetch.swift b/TelegramCore/MultipartFetch.swift index a9bc70b17a..0d272e87a4 100644 --- a/TelegramCore/MultipartFetch.swift +++ b/TelegramCore/MultipartFetch.swift @@ -84,14 +84,14 @@ private struct DownloadWrapper { let isCdn: Bool let network: Network - func request(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: MediaResourceFetchTag?) -> Signal { + func request(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: MediaResourceFetchTag?, continueInBackground: Bool) -> Signal { let target: MultiplexedRequestTarget if self.isCdn { target = .cdn(Int(self.datacenterId)) } else { target = .main(Int(self.datacenterId)) } - return network.multiplexedRequestManager.request(to: target, consumerId: self.consumerId, data: data, tag: tag) + return network.multiplexedRequestManager.request(to: target, consumerId: self.consumerId, data: data, tag: tag, continueInBackground: continueInBackground) } } @@ -115,6 +115,7 @@ private final class MultipartCdnHashSource { private let fileToken: Data private let masterDownload: DownloadWrapper + private let continueInBackground: Bool private var knownUpperBound: Int32 private var hashes: [Int32: Data] = [:] @@ -123,14 +124,15 @@ private final class MultipartCdnHashSource { private var subscribers = Bag<(Int32, Int32, ([Int32: Data]) -> Void)>() - init(queue: Queue, fileToken: Data, hashes: [Int32: Data], masterDownload: DownloadWrapper) { + init(queue: Queue, fileToken: Data, hashes: [Int32: Data], masterDownload: DownloadWrapper, continueInBackground: Bool) { assert(queue.isCurrent()) self.queue = queue self.fileToken = fileToken self.masterDownload = masterDownload + self.continueInBackground = continueInBackground - var knownUpperBound: Int32 = 0 + let knownUpperBound: Int32 = 0 /*self.hashes = hashes for (offset, _) in hashes { assert(offset % dataHashLength == 0) @@ -221,7 +223,7 @@ private final class MultipartCdnHashSource { self.requestOffsetAndDisposable = (requestOffset, disposable) let queue = self.queue let fileToken = self.fileToken - disposable.set((self.masterDownload.request(Api.functions.upload.getCdnFileHashes(fileToken: Buffer(data: fileToken), offset: requestOffset), tag: nil) + disposable.set((self.masterDownload.request(Api.functions.upload.getCdnFileHashes(fileToken: Buffer(data: fileToken), offset: requestOffset), tag: nil, continueInBackground: self.continueInBackground) |> map { partHashes -> [Int32: Data] in var parsedPartHashes: [Int32: Data] = [:] for part in partHashes { @@ -270,7 +272,7 @@ private enum MultipartFetchSource { case master(location: MultipartFetchMasterLocation, download: DownloadWrapper) case cdn(masterDatacenterId: Int32, fileToken: Data, key: Data, iv: Data, download: DownloadWrapper, masterDownload: DownloadWrapper, hashSource: MultipartCdnHashSource) - func request(offset: Int32, limit: Int32, tag: MediaResourceFetchTag?, fileReference: Data?) -> Signal { + func request(offset: Int32, limit: Int32, tag: MediaResourceFetchTag?, fileReference: Data?, continueInBackground: Bool) -> Signal { switch self { case .none: return .never() @@ -283,7 +285,7 @@ private enum MultipartFetchSource { switch location { case let .generic(_, location): if let parsedLocation = location(fileReference) { - return download.request(Api.functions.upload.getFile(location: parsedLocation, offset: offset, limit: Int32(updatedLength)), tag: tag) + return download.request(Api.functions.upload.getFile(location: parsedLocation, offset: offset, limit: Int32(updatedLength)), tag: tag, continueInBackground: continueInBackground) |> mapError { error -> MultipartFetchDownloadError in if error.errorDescription.hasPrefix("FILEREF_INVALID") || error.errorDescription.hasPrefix("FILE_REFERENCE_") { return .revalidateMediaReference @@ -314,7 +316,7 @@ private enum MultipartFetchSource { return .fail(.revalidateMediaReference) } case let .web(_, location): - return download.request(Api.functions.upload.getWebFile(location: location, offset: offset, limit: Int32(updatedLength)), tag: tag) + return download.request(Api.functions.upload.getWebFile(location: location, offset: offset, limit: Int32(updatedLength)), tag: tag, continueInBackground: continueInBackground) |> mapError { error -> MultipartFetchDownloadError in return .generic } @@ -335,7 +337,7 @@ private enum MultipartFetchSource { updatedLength += 1 } - let part = download.request(Api.functions.upload.getCdnFile(fileToken: Buffer(data: fileToken), offset: offset, limit: Int32(updatedLength)), tag: nil) + let part = download.request(Api.functions.upload.getCdnFile(fileToken: Buffer(data: fileToken), offset: offset, limit: Int32(updatedLength)), tag: nil, continueInBackground: continueInBackground) |> mapError { _ -> MultipartFetchDownloadError in return .generic } @@ -402,6 +404,7 @@ private final class MultipartFetchManager { let postbox: Postbox let network: Network let revalidationContext: MediaReferenceRevalidationContext + let continueInBackground: Bool let partReady: (Int, Data) -> Void let reportCompleteSize: (Int) -> Void @@ -437,6 +440,9 @@ private final class MultipartFetchManager { if let info = parameters?.info as? TelegramCloudMediaResourceFetchInfo { self.fileReference = info.reference.apiFileReference + self.continueInBackground = info.continueInBackground + } else { + self.continueInBackground = false } self.state = MultipartDownloadState(encryptionKey: encryptionKey, decryptedSize: decryptedSize) @@ -587,7 +593,7 @@ private final class MultipartFetchManager { requestLimit = (requestLimit / self.partAlignment + 1) * self.partAlignment } - let part = self.source.request(offset: Int32(downloadRange.lowerBound), limit: Int32(requestLimit), tag: self.parameters?.tag, fileReference: self.fileReference) + let part = self.source.request(offset: Int32(downloadRange.lowerBound), limit: Int32(requestLimit), tag: self.parameters?.tag, fileReference: self.fileReference, continueInBackground: self.continueInBackground) |> deliverOn(self.queue) let partDisposable = MetaDisposable() self.fetchingParts[downloadRange.lowerBound] = (downloadRange.count, partDisposable) @@ -634,7 +640,7 @@ private final class MultipartFetchManager { case let .switchToCdn(id, token, key, iv, partHashes): switch strongSelf.source { case let .master(location, download): - strongSelf.source = .cdn(masterDatacenterId: location.datacenterId, fileToken: token, key: key, iv: iv, download: DownloadWrapper(consumerId: strongSelf.consumerId, datacenterId: id, isCdn: true, network: strongSelf.network), masterDownload: download, hashSource: MultipartCdnHashSource(queue: strongSelf.queue, fileToken: token, hashes: partHashes, masterDownload: download)) + strongSelf.source = .cdn(masterDatacenterId: location.datacenterId, fileToken: token, key: key, iv: iv, download: DownloadWrapper(consumerId: strongSelf.consumerId, datacenterId: id, isCdn: true, network: strongSelf.network), masterDownload: download, hashSource: MultipartCdnHashSource(queue: strongSelf.queue, fileToken: token, hashes: partHashes, masterDownload: download, continueInBackground: strongSelf.continueInBackground)) strongSelf.checkState() case .cdn, .none: break @@ -646,7 +652,7 @@ private final class MultipartFetchManager { case let .cdn(_, fileToken, _, _, _, masterDownload, _): if !strongSelf.reuploadingToCdn { strongSelf.reuploadingToCdn = true - let reupload: Signal<[Api.FileHash], NoError> = masterDownload.request(Api.functions.upload.reuploadCdnFile(fileToken: Buffer(data: fileToken), requestToken: Buffer(data: token)), tag: nil) + let reupload: Signal<[Api.FileHash], NoError> = masterDownload.request(Api.functions.upload.reuploadCdnFile(fileToken: Buffer(data: fileToken), requestToken: Buffer(data: token)), tag: nil, continueInBackground: strongSelf.continueInBackground) |> `catch` { _ -> Signal<[Api.FileHash], NoError> in return .single([]) } @@ -666,7 +672,7 @@ private final class MultipartFetchManager { } } -func multipartFetch(account: Account, resource: TelegramMediaResource, datacenterId: Int, size: Int?, intervals: Signal<[(Range, MediaBoxFetchPriority)], NoError>, parameters: MediaResourceFetchParameters?, encryptionKey: SecretFileEncryptionKey? = nil, decryptedSize: Int32? = nil) -> Signal { +func multipartFetch(account: Account, resource: TelegramMediaResource, datacenterId: Int, size: Int?, intervals: Signal<[(Range, MediaBoxFetchPriority)], NoError>, parameters: MediaResourceFetchParameters?, encryptionKey: SecretFileEncryptionKey? = nil, decryptedSize: Int32? = nil, continueInBackground: Bool = false) -> Signal { return Signal { subscriber in let location: MultipartFetchMasterLocation if let resource = resource as? TelegramCloudMediaResource { diff --git a/TelegramCore/MultiplexedRequestManager.swift b/TelegramCore/MultiplexedRequestManager.swift index 6d1000cda1..6fa6bb76fe 100644 --- a/TelegramCore/MultiplexedRequestManager.swift +++ b/TelegramCore/MultiplexedRequestManager.swift @@ -14,6 +14,11 @@ enum MultiplexedRequestTarget: Equatable, Hashable { case cdn(Int) } +private struct MultiplexedRequestTargetKey: Equatable, Hashable { + let target: MultiplexedRequestTarget + let continueInBackground: Bool +} + private final class RequestData { let id: Int32 let consumerId: Int64 @@ -21,16 +26,18 @@ private final class RequestData { let functionDescription: FunctionDescription let payload: Buffer let tag: MediaResourceFetchTag? + let continueInBackground: Bool let deserializeResponse: (Buffer) -> Any? let completed: (Any) -> Void let error: (MTRpcError) -> Void - init(id: Int32, consumerId: Int64, target: MultiplexedRequestTarget, functionDescription: FunctionDescription, payload: Buffer, tag: MediaResourceFetchTag?, deserializeResponse: @escaping (Buffer) -> Any?, completed: @escaping (Any) -> Void, error: @escaping (MTRpcError) -> Void) { + init(id: Int32, consumerId: Int64, target: MultiplexedRequestTarget, functionDescription: FunctionDescription, payload: Buffer, tag: MediaResourceFetchTag?, continueInBackground: Bool, deserializeResponse: @escaping (Buffer) -> Any?, completed: @escaping (Any) -> Void, error: @escaping (MTRpcError) -> Void) { self.id = id self.consumerId = consumerId self.target = target self.functionDescription = functionDescription self.tag = tag + self.continueInBackground = continueInBackground self.payload = payload self.deserializeResponse = deserializeResponse self.completed = completed @@ -61,7 +68,7 @@ private final class RequestTargetContext { } private struct MultiplexedRequestTargetTimerKey: Equatable, Hashable { - let target: MultiplexedRequestTarget + let key: MultiplexedRequestTargetKey let id: Int32 } @@ -73,21 +80,21 @@ private typealias SignalKitTimer = SwiftSignalKit.Timer private final class MultiplexedRequestManagerContext { private let queue: Queue - private let takeWorker: (MultiplexedRequestTarget, MediaResourceFetchTag?) -> Download? + private let takeWorker: (MultiplexedRequestTarget, MediaResourceFetchTag?, Bool) -> Download? private var queuedRequests: [RequestData] = [] private var nextId: Int32 = 0 - private var targetContexts: [MultiplexedRequestTarget: [RequestTargetContext]] = [:] + private var targetContexts: [MultiplexedRequestTargetKey: [RequestTargetContext]] = [:] private var emptyTargetTimers: [MultiplexedRequestTargetTimerKey: SignalKitTimer] = [:] - init(queue: Queue, takeWorker: @escaping (MultiplexedRequestTarget, MediaResourceFetchTag?) -> Download?) { + init(queue: Queue, takeWorker: @escaping (MultiplexedRequestTarget, MediaResourceFetchTag?, Bool) -> Download?) { self.queue = queue self.takeWorker = takeWorker } deinit { - for targetContextList in targetContexts.values { + for targetContextList in self.targetContexts.values { for targetContext in targetContextList { for request in targetContext.requests { request.disposable.dispose() @@ -99,10 +106,12 @@ private final class MultiplexedRequestManagerContext { } } - func request(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, (Buffer) -> Any?), tag: MediaResourceFetchTag?, completed: @escaping (Any) -> Void, error: @escaping (MTRpcError) -> Void) -> Disposable { + func request(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, (Buffer) -> Any?), tag: MediaResourceFetchTag?, continueInBackground: Bool, completed: @escaping (Any) -> Void, error: @escaping (MTRpcError) -> Void) -> Disposable { + let targetKey = MultiplexedRequestTargetKey(target: target, continueInBackground: continueInBackground) + let requestId = self.nextId self.nextId += 1 - self.queuedRequests.append(RequestData(id: requestId, consumerId: consumerId, target: target, functionDescription: data.0, payload: data.1, tag: tag, deserializeResponse: { buffer in + self.queuedRequests.append(RequestData(id: requestId, consumerId: consumerId, target: target, functionDescription: data.0, payload: data.1, tag: tag, continueInBackground: continueInBackground, deserializeResponse: { buffer in return data.2(buffer) }, completed: { result in completed(result) @@ -125,8 +134,8 @@ private final class MultiplexedRequestManagerContext { } } - if strongSelf.targetContexts[target] != nil { - outer: for targetContext in strongSelf.targetContexts[target]! { + if strongSelf.targetContexts[targetKey] != nil { + outer: for targetContext in strongSelf.targetContexts[targetKey]! { for i in 0 ..< targetContext.requests.count { if targetContext.requests[i].requestId == requestId { targetContext.requests[i].disposable.dispose() @@ -149,23 +158,24 @@ private final class MultiplexedRequestManagerContext { var requestIndex = 0 while requestIndex < self.queuedRequests.count { let request = self.queuedRequests[requestIndex] + let targetKey = MultiplexedRequestTargetKey(target: request.target, continueInBackground: request.continueInBackground) - if self.targetContexts[request.target] == nil { - self.targetContexts[request.target] = [] + if self.targetContexts[targetKey] == nil { + self.targetContexts[targetKey] = [] } var selectedContext: RequestTargetContext? - for targetContext in self.targetContexts[request.target]! { + for targetContext in self.targetContexts[targetKey]! { if targetContext.requests.count < maxRequestsPerWorker { selectedContext = targetContext break } } - if selectedContext == nil && self.targetContexts[request.target]!.count < maxWorkersPerTarget { - if let worker = self.takeWorker(request.target, request.tag) { + if selectedContext == nil && self.targetContexts[targetKey]!.count < maxWorkersPerTarget { + if let worker = self.takeWorker(request.target, request.tag, request.continueInBackground) { let contextId = self.nextId self.nextId += 1 let targetContext = RequestTargetContext(id: contextId, worker: worker) - self.targetContexts[request.target]!.append(targetContext) + self.targetContexts[targetKey]!.append(targetContext) selectedContext = targetContext } else { Logger.shared.log("MultiplexedRequestManager", "couldn't take worker") @@ -221,9 +231,9 @@ private final class MultiplexedRequestManagerContext { } private func checkEmptyContexts() { - for (target, contexts) in self.targetContexts { + for (targetKey, contexts) in self.targetContexts { for context in contexts { - let key = MultiplexedRequestTargetTimerKey(target: target, id: context.id) + let key = MultiplexedRequestTargetTimerKey(key: targetKey, id: context.id) if context.requests.isEmpty { if self.emptyTargetTimers[key] == nil { let timer = SignalKitTimer(timeout: 2.0, repeat: false, completion: { [weak self] in @@ -231,10 +241,10 @@ private final class MultiplexedRequestManagerContext { return } strongSelf.emptyTargetTimers.removeValue(forKey: key) - if strongSelf.targetContexts[target] != nil { - for i in 0 ..< strongSelf.targetContexts[target]!.count { - if strongSelf.targetContexts[target]![i].id == key.id { - strongSelf.targetContexts[target]!.remove(at: i) + if strongSelf.targetContexts[targetKey] != nil { + for i in 0 ..< strongSelf.targetContexts[targetKey]!.count { + if strongSelf.targetContexts[targetKey]![i].id == key.id { + strongSelf.targetContexts[targetKey]!.remove(at: i) break } } @@ -258,20 +268,20 @@ final class MultiplexedRequestManager { private let queue = Queue() private let context: QueueLocalObject - init(takeWorker: @escaping (MultiplexedRequestTarget, MediaResourceFetchTag?) -> Download?) { + init(takeWorker: @escaping (MultiplexedRequestTarget, MediaResourceFetchTag?, Bool) -> Download?) { let queue = self.queue self.context = QueueLocalObject(queue: self.queue, generate: { return MultiplexedRequestManagerContext(queue: queue, takeWorker: takeWorker) }) } - func request(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: MediaResourceFetchTag?) -> Signal { + func request(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: MediaResourceFetchTag?, continueInBackground: Bool) -> Signal { return Signal { subscriber in let disposable = MetaDisposable() self.context.with { context in disposable.set(context.request(to: target, consumerId: consumerId, data: (data.0, data.1, { buffer in return data.2.parse(buffer) - }), tag: tag, completed: { result in + }), tag: tag, continueInBackground: continueInBackground, completed: { result in if let result = result as? T { subscriber.putNext(result) subscriber.putCompletion() diff --git a/TelegramCore/Network.swift b/TelegramCore/Network.swift index 5802d3478f..015f9451fd 100644 --- a/TelegramCore/Network.swift +++ b/TelegramCore/Network.swift @@ -583,6 +583,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate { private let shouldKeepConnectionDisposable = MetaDisposable() public let shouldExplicitelyKeepWorkerConnections = Promise(false) + public let shouldKeepBackgroundDownloadConnections = Promise(false) public var mockConnectionStatus: ConnectionStatus? { didSet { @@ -611,30 +612,30 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate { context.add(NetworkHelper(requestPublicKeys: { [weak self] id in if let strongSelf = self { return strongSelf.request(Api.functions.help.getCdnConfig()) - |> map(Optional.init) - |> `catch` { _ -> Signal in - return .single(nil) - } - |> map { result -> NSArray in - let array = NSMutableArray() - if let result = result { - switch result { - case let .cdnConfig(publicKeys): - for key in publicKeys { - switch key { - case let .cdnPublicKey(dcId, publicKey): - if id == Int(dcId) { - let dict = NSMutableDictionary() - dict["key"] = publicKey - dict["fingerprint"] = MTRsaFingerprint(publicKey) - array.add(dict) - } - } + |> map(Optional.init) + |> `catch` { _ -> Signal in + return .single(nil) + } + |> map { result -> NSArray in + let array = NSMutableArray() + if let result = result { + switch result { + case let .cdnConfig(publicKeys): + for key in publicKeys { + switch key { + case let .cdnPublicKey(dcId, publicKey): + if id == Int(dcId) { + let dict = NSMutableDictionary() + dict["key"] = publicKey + dict["fingerprint"] = MTRsaFingerprint(publicKey) + array.add(dict) + } } - } + } } - return array } + return array + } } else { return .never() } @@ -649,7 +650,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate { })) requestService.delegate = self - self._multiplexedRequestManager = MultiplexedRequestManager(takeWorker: { [weak self] target, tag in + self._multiplexedRequestManager = MultiplexedRequestManager(takeWorker: { [weak self] target, tag, continueInBackground in if let strongSelf = self { let datacenterId: Int let isCdn: Bool @@ -662,7 +663,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate { datacenterId = id isCdn = true } - return strongSelf.makeWorker(datacenterId: datacenterId, isCdn: isCdn, isMedia: isMedia, tag: tag) + return strongSelf.makeWorker(datacenterId: datacenterId, isCdn: isCdn, isMedia: isMedia, tag: tag, continueInBackground: continueInBackground) } return nil }) @@ -717,10 +718,11 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate { return self.worker(datacenterId: self.datacenterId, isCdn: false, isMedia: false, tag: nil) } - private func makeWorker(datacenterId: Int, isCdn: Bool, isMedia: Bool, tag: MediaResourceFetchTag?) -> Download { - let shouldKeepWorkerConnection: Signal = combineLatest(self.shouldKeepConnection.get(), self.shouldExplicitelyKeepWorkerConnections.get()) - |> map { shouldKeepConnection, shouldExplicitelyKeepWorkerConnections -> Bool in - return shouldKeepConnection || shouldExplicitelyKeepWorkerConnections + private func makeWorker(datacenterId: Int, isCdn: Bool, isMedia: Bool, tag: MediaResourceFetchTag?, continueInBackground: Bool = false) -> Download { + let queue = Queue.mainQueue() + let shouldKeepWorkerConnection: Signal = combineLatest(queue: queue, self.shouldKeepConnection.get(), self.shouldExplicitelyKeepWorkerConnections.get(), self.shouldKeepBackgroundDownloadConnections.get()) + |> map { shouldKeepConnection, shouldExplicitelyKeepWorkerConnections, shouldKeepBackgroundDownloadConnections -> Bool in + return shouldKeepConnection || shouldExplicitelyKeepWorkerConnections || (continueInBackground && shouldKeepBackgroundDownloadConnections) } |> distinctUntilChanged return Download(queue: self.queue, datacenterId: datacenterId, isMedia: isMedia, isCdn: isCdn, context: self.context, masterDatacenterId: self.datacenterId, usageInfo: usageCalculationInfo(basePath: self.basePath, category: (tag as? TelegramMediaResourceFetchTag)?.statsCategory), shouldKeepConnection: shouldKeepWorkerConnection) diff --git a/TelegramCore/PendingMessageUploadedContent.swift b/TelegramCore/PendingMessageUploadedContent.swift index 15c7acb24d..13fa16df66 100644 --- a/TelegramCore/PendingMessageUploadedContent.swift +++ b/TelegramCore/PendingMessageUploadedContent.swift @@ -103,7 +103,7 @@ func mediaContentToUpload(network: Network, postbox: Postbox, auxiliaryMethods: } else { mediaReference = .savedGif(media: file) } - return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: mediaReference.resourceReference(file.resource), preferBackgroundReferenceRevalidation: false), resource: resource) + return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: mediaReference.resourceReference(file.resource), preferBackgroundReferenceRevalidation: false, continueInBackground: false), resource: resource) |> mapError { _ -> PendingMessageUploadError in return .generic } diff --git a/TelegramCore/SearchMessages.swift b/TelegramCore/SearchMessages.swift index b24e55c4c4..4265e35ebd 100644 --- a/TelegramCore/SearchMessages.swift +++ b/TelegramCore/SearchMessages.swift @@ -324,6 +324,10 @@ func fetchRemoteMessage(postbox: Postbox, source: FetchMessageHistoryHoleSource, } } + updatePeers(transaction: transaction, peers: Array(peers.values), update: { _, updated in + return updated + }) + var renderedMessages: [Message] = [] for message in messages { if let message = StoreMessage(apiMessage: message), case let .Id(updatedId) = message.id {