From d1fc502ba2cc673a0eb797b5dc8c262ec9bb8c00 Mon Sep 17 00:00:00 2001 From: Ali <> Date: Wed, 25 Aug 2021 14:31:00 +0200 Subject: [PATCH] Fix cdn file random access --- .../Sources/Network/MultipartFetch.swift | 173 ++++++++++++++++-- 1 file changed, 160 insertions(+), 13 deletions(-) diff --git a/submodules/TelegramCore/Sources/Network/MultipartFetch.swift b/submodules/TelegramCore/Sources/Network/MultipartFetch.swift index 6ec67cecc1..3d0ff61557 100644 --- a/submodules/TelegramCore/Sources/Network/MultipartFetch.swift +++ b/submodules/TelegramCore/Sources/Network/MultipartFetch.swift @@ -109,18 +109,46 @@ private func roundUp(_ value: Int, to multiple: Int) -> Int { private let dataHashLength: Int32 = 128 * 1024 private final class MultipartCdnHashSource { + private final class ClusterContext { + final class Subscriber { + let completion: ([Int32: Data]) -> Void + let error: (MultipartFetchDownloadError) -> Void + + init(completion: @escaping ([Int32: Data]) -> Void, error: @escaping (MultipartFetchDownloadError) -> Void) { + self.completion = completion + self.error = error + } + } + + let disposable: Disposable + let subscribers = Bag() + + var result: [Int32: Data]? + var error: MultipartFetchDownloadError? + + init(disposable: Disposable) { + self.disposable = disposable + } + + deinit { + self.disposable.dispose() + } + } + private let queue: Queue private let fileToken: Data private let masterDownload: DownloadWrapper private let continueInBackground: Bool + + private var clusterContexts: [Int32: ClusterContext] = [:] - private var knownUpperBound: Int32 + /*private var knownUpperBound: Int32 private var hashes: [Int32: Data] = [:] private var requestOffsetAndDisposable: (Int32, Disposable)? private var requestedUpperBound: Int32? - private var subscribers = Bag<(Int32, Int32, ([Int32: Data]) -> Void)>() + private var subscribers = Bag<(Int32, Int32, ([Int32: Data]) -> Void)>()*/ init(queue: Queue, fileToken: Data, hashes: [Int32: Data], masterDownload: DownloadWrapper, continueInBackground: Bool) { assert(queue.isCurrent()) @@ -130,22 +158,17 @@ private final class MultipartCdnHashSource { self.masterDownload = masterDownload self.continueInBackground = continueInBackground - let knownUpperBound: Int32 = 0 - /*self.hashes = hashes - for (offset, _) in hashes { - assert(offset % dataHashLength == 0) - knownUpperBound = max(knownUpperBound, offset + dataHashLength) - }*/ - self.knownUpperBound = knownUpperBound + /*let knownUpperBound: Int32 = 0 + self.knownUpperBound = knownUpperBound*/ } deinit { assert(self.queue.isCurrent()) - self.requestOffsetAndDisposable?.1.dispose() + //self.requestOffsetAndDisposable?.1.dispose() } - private func take(offset: Int32, limit: Int32) -> [Int32: Data]? { + /*private func take(offset: Int32, limit: Int32) -> [Int32: Data]? { assert(offset % dataHashLength == 0) assert(limit % dataHashLength == 0) @@ -162,9 +185,9 @@ private final class MultipartCdnHashSource { } return result - } + }*/ - func get(offset: Int32, limit: Int32) -> Signal<[Int32: Data], MultipartFetchDownloadError> { + /*func get(offset: Int32, limit: Int32) -> Signal<[Int32: Data], MultipartFetchDownloadError> { assert(self.queue.isCurrent()) let queue = self.queue @@ -262,6 +285,130 @@ private final class MultipartCdnHashSource { } } })) + }*/ + + func getCluster(offset: Int32, completion: @escaping ([Int32: Data]) -> Void, error: @escaping (MultipartFetchDownloadError) -> Void) -> Disposable { + precondition(offset % (1 * 1024 * 1024) == 0) + + let clusterContext: ClusterContext + if let current = self.clusterContexts[offset] { + clusterContext = current + } else { + let disposable = MetaDisposable() + clusterContext = ClusterContext(disposable: disposable) + self.clusterContexts[offset] = clusterContext + + disposable.set((self.masterDownload.request(Api.functions.upload.getCdnFileHashes(fileToken: Buffer(data: self.fileToken), offset: offset), tag: nil, continueInBackground: self.continueInBackground) + |> map { partHashes -> [Int32: Data] in + var parsedPartHashes: [Int32: Data] = [:] + for part in partHashes { + switch part { + case let .fileHash(offset, limit, bytes): + assert(limit == 128 * 1024) + parsedPartHashes[offset] = bytes.makeData() + } + } + return parsedPartHashes + } + |> deliverOn(self.queue)).start(next: { [weak self, weak clusterContext] result in + guard let _ = self, let clusterContext = clusterContext else { + return + } + clusterContext.result = result + for subscriber in clusterContext.subscribers.copyItems() { + subscriber.completion(result) + } + }, error: { [weak self, weak clusterContext] _ in + guard let _ = self, let clusterContext = clusterContext else { + return + } + clusterContext.error = .generic + for subscriber in clusterContext.subscribers.copyItems() { + subscriber.error(.generic) + } + })) + } + + if let result = clusterContext.result { + completion(result) + + return EmptyDisposable + } else if let errorValue = clusterContext.error { + error(errorValue) + + return EmptyDisposable + } else { + let index = clusterContext.subscribers.add(ClusterContext.Subscriber(completion: completion, error: error)) + let queue = self.queue + return ActionDisposable { [weak self, weak clusterContext] in + queue.async { + guard let strongSelf = self, let clusterContext = clusterContext else { + return + } + clusterContext.subscribers.remove(index) + if clusterContext.subscribers.isEmpty { + if strongSelf.clusterContexts[offset] === clusterContext { + strongSelf.clusterContexts.removeValue(forKey: offset) + } + } + } + } + } + } + + private func cluster(offset: Int32) -> Signal<[Int32: Data], MultipartFetchDownloadError> { + let queue = self.queue + return Signal { [weak self] subscriber in + let disposable = MetaDisposable() + + queue.async { + guard let strongSelf = self else { + subscriber.putError(.generic) + return + } + + disposable.set(strongSelf.getCluster(offset: offset, completion: { result in + subscriber.putNext(result) + subscriber.putCompletion() + }, error: { error in + subscriber.putError(error) + })) + } + + return disposable + } + } + + func get(offset: Int32, limit: Int32) -> Signal<[Int32: Data], MultipartFetchDownloadError> { + precondition(offset % dataHashLength == 0) + precondition((offset + limit) % dataHashLength == 0) + + var clusterOffsets = Set() + for partOffset in stride(from: offset, to: offset + limit, by: Int(dataHashLength)) { + clusterOffsets.insert(partOffset - (partOffset % (1 * 1024 * 1024))) + } + + return combineLatest(clusterOffsets.map { clusterOffset in + return self.cluster(offset: clusterOffset) + }) + |> mapToSignal { clusterResults -> Signal<[Int32: Data], MultipartFetchDownloadError> in + var result: [Int32: Data] = [:] + + for partOffset in stride(from: offset, to: offset + limit, by: Int(dataHashLength)) { + var found = false + for cluster in clusterResults { + if let data = cluster[partOffset] { + result[partOffset] = data + found = true + } + } + if !found { + return .fail(.generic) + } + } + + return .single(result) + } } }