Fix cdn file random access

This commit is contained in:
Ali
2021-08-25 14:31:00 +02:00
parent 231f5654ed
commit d1fc502ba2

View File

@@ -109,18 +109,46 @@ private func roundUp(_ value: Int, to multiple: Int) -> Int {
private let dataHashLength: Int32 = 128 * 1024
private final class MultipartCdnHashSource {
private final class ClusterContext {
final class Subscriber {
let completion: ([Int32: Data]) -> Void
let error: (MultipartFetchDownloadError) -> Void
init(completion: @escaping ([Int32: Data]) -> Void, error: @escaping (MultipartFetchDownloadError) -> Void) {
self.completion = completion
self.error = error
}
}
let disposable: Disposable
let subscribers = Bag<Subscriber>()
var result: [Int32: Data]?
var error: MultipartFetchDownloadError?
init(disposable: Disposable) {
self.disposable = disposable
}
deinit {
self.disposable.dispose()
}
}
private let queue: Queue
private let fileToken: Data
private let masterDownload: DownloadWrapper
private let continueInBackground: Bool
private var clusterContexts: [Int32: ClusterContext] = [:]
private var knownUpperBound: Int32
/*private var knownUpperBound: Int32
private var hashes: [Int32: Data] = [:]
private var requestOffsetAndDisposable: (Int32, Disposable)?
private var requestedUpperBound: Int32?
private var subscribers = Bag<(Int32, Int32, ([Int32: Data]) -> Void)>()
private var subscribers = Bag<(Int32, Int32, ([Int32: Data]) -> Void)>()*/
init(queue: Queue, fileToken: Data, hashes: [Int32: Data], masterDownload: DownloadWrapper, continueInBackground: Bool) {
assert(queue.isCurrent())
@@ -130,22 +158,17 @@ private final class MultipartCdnHashSource {
self.masterDownload = masterDownload
self.continueInBackground = continueInBackground
let knownUpperBound: Int32 = 0
/*self.hashes = hashes
for (offset, _) in hashes {
assert(offset % dataHashLength == 0)
knownUpperBound = max(knownUpperBound, offset + dataHashLength)
}*/
self.knownUpperBound = knownUpperBound
/*let knownUpperBound: Int32 = 0
self.knownUpperBound = knownUpperBound*/
}
deinit {
assert(self.queue.isCurrent())
self.requestOffsetAndDisposable?.1.dispose()
//self.requestOffsetAndDisposable?.1.dispose()
}
private func take(offset: Int32, limit: Int32) -> [Int32: Data]? {
/*private func take(offset: Int32, limit: Int32) -> [Int32: Data]? {
assert(offset % dataHashLength == 0)
assert(limit % dataHashLength == 0)
@@ -162,9 +185,9 @@ private final class MultipartCdnHashSource {
}
return result
}
}*/
func get(offset: Int32, limit: Int32) -> Signal<[Int32: Data], MultipartFetchDownloadError> {
/*func get(offset: Int32, limit: Int32) -> Signal<[Int32: Data], MultipartFetchDownloadError> {
assert(self.queue.isCurrent())
let queue = self.queue
@@ -262,6 +285,130 @@ private final class MultipartCdnHashSource {
}
}
}))
}*/
func getCluster(offset: Int32, completion: @escaping ([Int32: Data]) -> Void, error: @escaping (MultipartFetchDownloadError) -> Void) -> Disposable {
precondition(offset % (1 * 1024 * 1024) == 0)
let clusterContext: ClusterContext
if let current = self.clusterContexts[offset] {
clusterContext = current
} else {
let disposable = MetaDisposable()
clusterContext = ClusterContext(disposable: disposable)
self.clusterContexts[offset] = clusterContext
disposable.set((self.masterDownload.request(Api.functions.upload.getCdnFileHashes(fileToken: Buffer(data: self.fileToken), offset: offset), tag: nil, continueInBackground: self.continueInBackground)
|> map { partHashes -> [Int32: Data] in
var parsedPartHashes: [Int32: Data] = [:]
for part in partHashes {
switch part {
case let .fileHash(offset, limit, bytes):
assert(limit == 128 * 1024)
parsedPartHashes[offset] = bytes.makeData()
}
}
return parsedPartHashes
}
|> deliverOn(self.queue)).start(next: { [weak self, weak clusterContext] result in
guard let _ = self, let clusterContext = clusterContext else {
return
}
clusterContext.result = result
for subscriber in clusterContext.subscribers.copyItems() {
subscriber.completion(result)
}
}, error: { [weak self, weak clusterContext] _ in
guard let _ = self, let clusterContext = clusterContext else {
return
}
clusterContext.error = .generic
for subscriber in clusterContext.subscribers.copyItems() {
subscriber.error(.generic)
}
}))
}
if let result = clusterContext.result {
completion(result)
return EmptyDisposable
} else if let errorValue = clusterContext.error {
error(errorValue)
return EmptyDisposable
} else {
let index = clusterContext.subscribers.add(ClusterContext.Subscriber(completion: completion, error: error))
let queue = self.queue
return ActionDisposable { [weak self, weak clusterContext] in
queue.async {
guard let strongSelf = self, let clusterContext = clusterContext else {
return
}
clusterContext.subscribers.remove(index)
if clusterContext.subscribers.isEmpty {
if strongSelf.clusterContexts[offset] === clusterContext {
strongSelf.clusterContexts.removeValue(forKey: offset)
}
}
}
}
}
}
private func cluster(offset: Int32) -> Signal<[Int32: Data], MultipartFetchDownloadError> {
let queue = self.queue
return Signal { [weak self] subscriber in
let disposable = MetaDisposable()
queue.async {
guard let strongSelf = self else {
subscriber.putError(.generic)
return
}
disposable.set(strongSelf.getCluster(offset: offset, completion: { result in
subscriber.putNext(result)
subscriber.putCompletion()
}, error: { error in
subscriber.putError(error)
}))
}
return disposable
}
}
func get(offset: Int32, limit: Int32) -> Signal<[Int32: Data], MultipartFetchDownloadError> {
precondition(offset % dataHashLength == 0)
precondition((offset + limit) % dataHashLength == 0)
var clusterOffsets = Set<Int32>()
for partOffset in stride(from: offset, to: offset + limit, by: Int(dataHashLength)) {
clusterOffsets.insert(partOffset - (partOffset % (1 * 1024 * 1024)))
}
return combineLatest(clusterOffsets.map { clusterOffset in
return self.cluster(offset: clusterOffset)
})
|> mapToSignal { clusterResults -> Signal<[Int32: Data], MultipartFetchDownloadError> in
var result: [Int32: Data] = [:]
for partOffset in stride(from: offset, to: offset + limit, by: Int(dataHashLength)) {
var found = false
for cluster in clusterResults {
if let data = cluster[partOffset] {
result[partOffset] = data
found = true
}
}
if !found {
return .fail(.generic)
}
}
return .single(result)
}
}
}