mirror of
https://github.com/Swiftgram/Telegram-iOS.git
synced 2025-06-16 05:55:20 +00:00
Make FetchV2 the default
This commit is contained in:
parent
518270cf18
commit
ad9a903f21
@ -1485,7 +1485,7 @@ private func debugControllerEntries(sharedContext: SharedAccountContext, present
|
|||||||
if isMainApp {
|
if isMainApp {
|
||||||
entries.append(.disableVideoAspectScaling(experimentalSettings.disableVideoAspectScaling))
|
entries.append(.disableVideoAspectScaling(experimentalSettings.disableVideoAspectScaling))
|
||||||
entries.append(.enableNetworkFramework(networkSettings?.useNetworkFramework ?? useBetaFeatures))
|
entries.append(.enableNetworkFramework(networkSettings?.useNetworkFramework ?? useBetaFeatures))
|
||||||
entries.append(.enableNetworkExperiments(networkSettings?.useExperimentalDownload ?? false))
|
entries.append(.enableNetworkExperiments(networkSettings?.useExperimentalDownload ?? true))
|
||||||
}
|
}
|
||||||
|
|
||||||
if let backupHostOverride = networkSettings?.backupHostOverride {
|
if let backupHostOverride = networkSettings?.backupHostOverride {
|
||||||
|
@ -73,6 +73,44 @@ private final class FetchImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class PendingReadyPart {
|
||||||
|
let partRange: Range<Int64>
|
||||||
|
let fetchRange: Range<Int64>
|
||||||
|
let fetchedData: Data
|
||||||
|
let decryptedData: Data
|
||||||
|
|
||||||
|
init(
|
||||||
|
partRange: Range<Int64>,
|
||||||
|
fetchRange: Range<Int64>,
|
||||||
|
fetchedData: Data,
|
||||||
|
decryptedData: Data
|
||||||
|
) {
|
||||||
|
self.partRange = partRange
|
||||||
|
self.fetchRange = fetchRange
|
||||||
|
self.fetchedData = fetchedData
|
||||||
|
self.decryptedData = decryptedData
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class PendingHashRange {
|
||||||
|
let range: Range<Int64>
|
||||||
|
var disposable: Disposable?
|
||||||
|
|
||||||
|
init(range: Range<Int64>) {
|
||||||
|
self.range = range
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class HashRangeData {
|
||||||
|
let range: Range<Int64>
|
||||||
|
let data: Data
|
||||||
|
|
||||||
|
init(range: Range<Int64>, data: Data) {
|
||||||
|
self.range = range
|
||||||
|
self.data = data
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private final class CdnData {
|
private final class CdnData {
|
||||||
let id: Int
|
let id: Int
|
||||||
let sourceDatacenterId: Int
|
let sourceDatacenterId: Int
|
||||||
@ -95,6 +133,16 @@ private final class FetchImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class VerifyPartHashData {
|
||||||
|
let fetchRange: Range<Int64>
|
||||||
|
let fetchedData: Data
|
||||||
|
|
||||||
|
init(fetchRange: Range<Int64>, fetchedData: Data) {
|
||||||
|
self.fetchRange = fetchRange
|
||||||
|
self.fetchedData = fetchedData
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private enum FetchLocation {
|
private enum FetchLocation {
|
||||||
case datacenter(Int)
|
case datacenter(Int)
|
||||||
case cdn(CdnData)
|
case cdn(CdnData)
|
||||||
@ -111,6 +159,12 @@ private final class FetchImpl {
|
|||||||
|
|
||||||
var pendingParts: [PendingPart] = []
|
var pendingParts: [PendingPart] = []
|
||||||
var completedRanges = RangeSet<Int64>()
|
var completedRanges = RangeSet<Int64>()
|
||||||
|
|
||||||
|
var pendingReadyParts: [PendingReadyPart] = []
|
||||||
|
var completedHashRanges = RangeSet<Int64>()
|
||||||
|
var pendingHashRanges: [PendingHashRange] = []
|
||||||
|
var hashRanges: [Int64: HashRangeData] = [:]
|
||||||
|
|
||||||
var nextRangePriorityIndex: Int = 0
|
var nextRangePriorityIndex: Int = 0
|
||||||
|
|
||||||
init(
|
init(
|
||||||
@ -132,8 +186,11 @@ private final class FetchImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
deinit {
|
||||||
for peindingPart in self.pendingParts {
|
for pendingPart in self.pendingParts {
|
||||||
peindingPart.disposable?.dispose()
|
pendingPart.disposable?.dispose()
|
||||||
|
}
|
||||||
|
for pendingHashRange in self.pendingHashRanges {
|
||||||
|
pendingHashRange.disposable?.dispose()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -216,6 +273,7 @@ private final class FetchImpl {
|
|||||||
private var requiredRanges: [RequiredRange] = []
|
private var requiredRanges: [RequiredRange] = []
|
||||||
|
|
||||||
private let defaultPartSize: Int64
|
private let defaultPartSize: Int64
|
||||||
|
private let cdnPartSize: Int64
|
||||||
private var state: State?
|
private var state: State?
|
||||||
|
|
||||||
private let loggingIdentifier: String
|
private let loggingIdentifier: String
|
||||||
@ -281,6 +339,7 @@ private final class FetchImpl {
|
|||||||
} else {
|
} else {
|
||||||
self.defaultPartSize = 128 * 1024
|
self.defaultPartSize = 128 * 1024
|
||||||
}
|
}
|
||||||
|
self.cdnPartSize = 128 * 1024
|
||||||
|
|
||||||
if let resource = resource as? TelegramCloudMediaResource {
|
if let resource = resource as? TelegramCloudMediaResource {
|
||||||
if let apiInputLocation = resource.apiInputLocation(fileReference: Data()) {
|
if let apiInputLocation = resource.apiInputLocation(fileReference: Data()) {
|
||||||
@ -335,6 +394,82 @@ private final class FetchImpl {
|
|||||||
self.onNext(.resourceSizeUpdated(knownSize))
|
self.onNext(.resourceSizeUpdated(knownSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
var removedPendingReadyPartIndices: [Int] = []
|
||||||
|
for i in 0 ..< state.pendingReadyParts.count {
|
||||||
|
let pendingReadyPart = state.pendingReadyParts[i]
|
||||||
|
if state.completedHashRanges.isSuperset(of: RangeSet<Int64>(pendingReadyPart.fetchRange)) {
|
||||||
|
removedPendingReadyPartIndices.append(i)
|
||||||
|
|
||||||
|
var checkOffset: Int64 = 0
|
||||||
|
var checkFailed = false
|
||||||
|
while checkOffset < pendingReadyPart.fetchedData.count {
|
||||||
|
if let hashRange = state.hashRanges[pendingReadyPart.fetchRange.lowerBound + checkOffset] {
|
||||||
|
var clippedHashRange = hashRange.range
|
||||||
|
|
||||||
|
if pendingReadyPart.fetchRange.lowerBound + Int64(pendingReadyPart.fetchedData.count) < clippedHashRange.lowerBound {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): unable to check \(pendingReadyPart.fetchRange): data range \(clippedHashRange) out of bounds (0 ..< \(pendingReadyPart.fetchedData.count))")
|
||||||
|
checkFailed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
clippedHashRange = clippedHashRange.lowerBound ..< min(clippedHashRange.upperBound, pendingReadyPart.fetchRange.lowerBound + Int64(pendingReadyPart.fetchedData.count))
|
||||||
|
|
||||||
|
let partLocalHashRange = (clippedHashRange.lowerBound - pendingReadyPart.fetchRange.lowerBound) ..< (clippedHashRange.upperBound - pendingReadyPart.fetchRange.lowerBound)
|
||||||
|
|
||||||
|
if partLocalHashRange.lowerBound < 0 || partLocalHashRange.upperBound > pendingReadyPart.fetchedData.count {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): unable to check \(pendingReadyPart.fetchRange): data range \(partLocalHashRange) out of bounds (0 ..< \(pendingReadyPart.fetchedData.count))")
|
||||||
|
checkFailed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
let dataToHash = pendingReadyPart.decryptedData.subdata(in: Int(partLocalHashRange.lowerBound) ..< Int(partLocalHashRange.upperBound))
|
||||||
|
let localHash = MTSha256(dataToHash)
|
||||||
|
if localHash != hashRange.data {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): failed to verify \(pendingReadyPart.fetchRange): hash mismatch")
|
||||||
|
checkFailed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
checkOffset += partLocalHashRange.upperBound - partLocalHashRange.lowerBound
|
||||||
|
} else {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): unable to find \(pendingReadyPart.fetchRange) hash range despite it being marked as ready")
|
||||||
|
checkFailed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !checkFailed {
|
||||||
|
self.commitPendingReadyPart(state: state, partRange: pendingReadyPart.partRange, fetchRange: pendingReadyPart.fetchRange, data: pendingReadyPart.decryptedData)
|
||||||
|
} else {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): unable to find \(pendingReadyPart.fetchRange) hash check failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for index in removedPendingReadyPartIndices.reversed() {
|
||||||
|
state.pendingReadyParts.remove(at: index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var requiredHashRanges = RangeSet<Int64>()
|
||||||
|
for pendingReadyPart in state.pendingReadyParts {
|
||||||
|
//TODO:check if already have hashes
|
||||||
|
requiredHashRanges.formUnion(RangeSet<Int64>(pendingReadyPart.fetchRange))
|
||||||
|
}
|
||||||
|
requiredHashRanges.subtract(state.completedHashRanges)
|
||||||
|
for pendingHashRange in state.pendingHashRanges {
|
||||||
|
requiredHashRanges.subtract(RangeSet<Int64>(pendingHashRange.range))
|
||||||
|
}
|
||||||
|
|
||||||
|
let expectedHashRangeLength: Int64 = 1 * 1024 * 1024
|
||||||
|
while state.pendingHashRanges.count < state.maxPendingParts {
|
||||||
|
guard let requiredHashRange = requiredHashRanges.ranges.first else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
let hashRange: Range<Int64> = requiredHashRange.lowerBound ..< (requiredHashRange.lowerBound + expectedHashRangeLength)
|
||||||
|
requiredHashRanges.subtract(RangeSet<Int64>(hashRange))
|
||||||
|
|
||||||
|
state.pendingHashRanges.append(FetchImpl.PendingHashRange(range: hashRange))
|
||||||
|
}
|
||||||
|
|
||||||
var filteredRequiredRanges: [RangeSet<Int64>] = []
|
var filteredRequiredRanges: [RangeSet<Int64>] = []
|
||||||
for _ in 0 ..< 3 {
|
for _ in 0 ..< 3 {
|
||||||
filteredRequiredRanges.append(RangeSet<Int64>())
|
filteredRequiredRanges.append(RangeSet<Int64>())
|
||||||
@ -355,28 +490,14 @@ private final class FetchImpl {
|
|||||||
for pendingPart in state.pendingParts {
|
for pendingPart in state.pendingParts {
|
||||||
filteredRequiredRanges[i].remove(contentsOf: pendingPart.partRange)
|
filteredRequiredRanges[i].remove(contentsOf: pendingPart.partRange)
|
||||||
}
|
}
|
||||||
|
for pendingReadyPart in state.pendingReadyParts {
|
||||||
|
filteredRequiredRanges[i].remove(contentsOf: pendingReadyPart.partRange)
|
||||||
|
}
|
||||||
|
|
||||||
excludedInHigherPriorities.subtract(filteredRequiredRanges[i])
|
excludedInHigherPriorities.subtract(filteredRequiredRanges[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
/*for _ in 0 ..< 1000000 {
|
if state.pendingParts.count < state.maxPendingParts && state.pendingReadyParts.count < state.maxPendingParts {
|
||||||
let i = Int64.random(in: 0 ..< 1024 * 1024 + 500 * 1024)
|
|
||||||
let j = Int64.random(in: 1 ... state.partSize)
|
|
||||||
|
|
||||||
let firstRange: Range<Int64> = Int64(i) ..< (Int64(i) + j)
|
|
||||||
|
|
||||||
let partRange = firstRange.lowerBound ..< min(firstRange.upperBound, firstRange.lowerBound + state.partSize)
|
|
||||||
|
|
||||||
let _ = alignPartFetchRange(
|
|
||||||
partRange: partRange,
|
|
||||||
minPartSize: state.minPartSize,
|
|
||||||
maxPartSize: state.maxPartSize,
|
|
||||||
alignment: state.partAlignment,
|
|
||||||
boundaryLimit: state.partDivision
|
|
||||||
)
|
|
||||||
}*/
|
|
||||||
|
|
||||||
if state.pendingParts.count < state.maxPendingParts {
|
|
||||||
var debugRangesString = ""
|
var debugRangesString = ""
|
||||||
for priorityIndex in 0 ..< 3 {
|
for priorityIndex in 0 ..< 3 {
|
||||||
if filteredRequiredRanges[priorityIndex].isEmpty {
|
if filteredRequiredRanges[priorityIndex].isEmpty {
|
||||||
@ -404,7 +525,7 @@ private final class FetchImpl {
|
|||||||
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): will fetch \(debugRangesString)")
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): will fetch \(debugRangesString)")
|
||||||
}
|
}
|
||||||
|
|
||||||
while state.pendingParts.count < state.maxPendingParts {
|
while state.pendingParts.count < state.maxPendingParts && state.pendingReadyParts.count < state.maxPendingParts {
|
||||||
var found = false
|
var found = false
|
||||||
inner: for i in 0 ..< filteredRequiredRanges.count {
|
inner: for i in 0 ..< filteredRequiredRanges.count {
|
||||||
let priorityIndex = (state.nextRangePriorityIndex + i) % filteredRequiredRanges.count
|
let priorityIndex = (state.nextRangePriorityIndex + i) % filteredRequiredRanges.count
|
||||||
@ -423,14 +544,24 @@ private final class FetchImpl {
|
|||||||
boundaryLimit: state.partDivision
|
boundaryLimit: state.partDivision
|
||||||
)
|
)
|
||||||
|
|
||||||
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): take part \(partRange) (aligned as \(alignedRange))")
|
var storePartRange = partRange
|
||||||
|
do {
|
||||||
|
storePartRange = alignedRange
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): take part \(partRange) (store aligned as \(storePartRange)")
|
||||||
|
}
|
||||||
|
/*if case .cdn = state.fetchLocation {
|
||||||
|
storePartRange = alignedRange
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): take part \(partRange) (store aligned as \(storePartRange)")
|
||||||
|
} else {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): take part \(partRange) (aligned as \(alignedRange))")
|
||||||
|
}*/
|
||||||
|
|
||||||
let pendingPart = PendingPart(
|
let pendingPart = PendingPart(
|
||||||
partRange: partRange,
|
partRange: storePartRange,
|
||||||
fetchRange: alignedRange
|
fetchRange: alignedRange
|
||||||
)
|
)
|
||||||
state.pendingParts.append(pendingPart)
|
state.pendingParts.append(pendingPart)
|
||||||
filteredRequiredRanges[priorityIndex].remove(contentsOf: partRange)
|
filteredRequiredRanges[priorityIndex].remove(contentsOf: storePartRange)
|
||||||
|
|
||||||
found = true
|
found = true
|
||||||
break inner
|
break inner
|
||||||
@ -446,6 +577,11 @@ private final class FetchImpl {
|
|||||||
self.fetchPart(state: state, part: pendingPart)
|
self.fetchPart(state: state, part: pendingPart)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for pendingHashRange in state.pendingHashRanges {
|
||||||
|
if pendingHashRange.disposable == nil {
|
||||||
|
self.fetchHashRange(state: state, hashRange: pendingHashRange)
|
||||||
|
}
|
||||||
|
}
|
||||||
case let .reuploadingToCdn(state):
|
case let .reuploadingToCdn(state):
|
||||||
if state.disposable == nil {
|
if state.disposable == nil {
|
||||||
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): refreshing CDN")
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): refreshing CDN")
|
||||||
@ -472,10 +608,10 @@ private final class FetchImpl {
|
|||||||
}
|
}
|
||||||
self.state = .fetching(FetchImpl.FetchingState(
|
self.state = .fetching(FetchImpl.FetchingState(
|
||||||
fetchLocation: .cdn(cdnData),
|
fetchLocation: .cdn(cdnData),
|
||||||
partSize: self.defaultPartSize,
|
partSize: self.cdnPartSize,
|
||||||
minPartSize: 4 * 1024,
|
minPartSize: self.cdnPartSize,
|
||||||
maxPartSize: self.defaultPartSize,
|
maxPartSize: self.cdnPartSize * 2,
|
||||||
partAlignment: 4 * 1024,
|
partAlignment: self.cdnPartSize,
|
||||||
partDivision: 1 * 1024 * 1024,
|
partDivision: 1 * 1024 * 1024,
|
||||||
maxPendingParts: 6
|
maxPendingParts: 6
|
||||||
))
|
))
|
||||||
@ -549,7 +685,7 @@ private final class FetchImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
enum FilePartResult {
|
enum FilePartResult {
|
||||||
case data(Data)
|
case data(data: Data, verifyPartHashData: VerifyPartHashData?)
|
||||||
case cdnRedirect(CdnData)
|
case cdnRedirect(CdnData)
|
||||||
case cdnRefresh(cdnData: CdnData, refreshToken: Data)
|
case cdnRefresh(cdnData: CdnData, refreshToken: Data)
|
||||||
case fileReferenceExpired
|
case fileReferenceExpired
|
||||||
@ -564,6 +700,7 @@ private final class FetchImpl {
|
|||||||
switch state.fetchLocation {
|
switch state.fetchLocation {
|
||||||
case let .cdn(cdnData):
|
case let .cdn(cdnData):
|
||||||
let requestedOffset = part.fetchRange.lowerBound
|
let requestedOffset = part.fetchRange.lowerBound
|
||||||
|
|
||||||
filePartRequest = self.network.multiplexedRequestManager.request(
|
filePartRequest = self.network.multiplexedRequestManager.request(
|
||||||
to: .cdn(cdnData.id),
|
to: .cdn(cdnData.id),
|
||||||
consumerId: self.consumerId,
|
consumerId: self.consumerId,
|
||||||
@ -581,7 +718,7 @@ private final class FetchImpl {
|
|||||||
switch result {
|
switch result {
|
||||||
case let .cdnFile(bytes):
|
case let .cdnFile(bytes):
|
||||||
if bytes.size == 0 {
|
if bytes.size == 0 {
|
||||||
return .data(Data())
|
return .data(data: Data(), verifyPartHashData: nil)
|
||||||
} else {
|
} else {
|
||||||
var partIv = cdnData.encryptionIv
|
var partIv = cdnData.encryptionIv
|
||||||
let partIvCount = partIv.count
|
let partIvCount = partIv.count
|
||||||
@ -590,8 +727,12 @@ private final class FetchImpl {
|
|||||||
var ivOffset: Int32 = Int32(clamping: (requestedOffset / 16)).bigEndian
|
var ivOffset: Int32 = Int32(clamping: (requestedOffset / 16)).bigEndian
|
||||||
memcpy(bytes.advanced(by: partIvCount - 4), &ivOffset, 4)
|
memcpy(bytes.advanced(by: partIvCount - 4), &ivOffset, 4)
|
||||||
}
|
}
|
||||||
//TODO:check hashes
|
|
||||||
return .data(MTAesCtrDecrypt(bytes.makeData(), cdnData.encryptionKey, partIv)!)
|
let fetchedData = bytes.makeData()
|
||||||
|
return .data(
|
||||||
|
data: MTAesCtrDecrypt(fetchedData, cdnData.encryptionKey, partIv)!,
|
||||||
|
verifyPartHashData: VerifyPartHashData(fetchRange: fetchRange, fetchedData: fetchedData)
|
||||||
|
)
|
||||||
}
|
}
|
||||||
case let .cdnFileReuploadNeeded(requestToken):
|
case let .cdnFileReuploadNeeded(requestToken):
|
||||||
return .cdnRefresh(cdnData: cdnData, refreshToken: requestToken.makeData())
|
return .cdnRefresh(cdnData: cdnData, refreshToken: requestToken.makeData())
|
||||||
@ -609,6 +750,7 @@ private final class FetchImpl {
|
|||||||
fileReference = info.reference.apiFileReference
|
fileReference = info.reference.apiFileReference
|
||||||
}
|
}
|
||||||
if let inputLocation = cloudResource.apiInputLocation(fileReference: fileReference) {
|
if let inputLocation = cloudResource.apiInputLocation(fileReference: fileReference) {
|
||||||
|
let queue = self.queue
|
||||||
filePartRequest = self.network.multiplexedRequestManager.request(
|
filePartRequest = self.network.multiplexedRequestManager.request(
|
||||||
to: .main(sourceDatacenterId),
|
to: .main(sourceDatacenterId),
|
||||||
consumerId: self.consumerId,
|
consumerId: self.consumerId,
|
||||||
@ -620,12 +762,19 @@ private final class FetchImpl {
|
|||||||
limit: Int32(requestedLength)),
|
limit: Int32(requestedLength)),
|
||||||
tag: self.parameters?.tag,
|
tag: self.parameters?.tag,
|
||||||
continueInBackground: self.continueInBackground,
|
continueInBackground: self.continueInBackground,
|
||||||
expectedResponseSize: Int32(requestedLength)
|
onFloodWaitError: { [weak self] error in
|
||||||
|
queue.async {
|
||||||
|
guard let self else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
self.processFloodWaitError(error: error)
|
||||||
|
}
|
||||||
|
}, expectedResponseSize: Int32(requestedLength)
|
||||||
)
|
)
|
||||||
|> map { result -> FilePartResult in
|
|> map { result -> FilePartResult in
|
||||||
switch result {
|
switch result {
|
||||||
case let .file(_, _, bytes):
|
case let .file(_, _, bytes):
|
||||||
return .data(bytes.makeData())
|
return .data(data: bytes.makeData(), verifyPartHashData: nil)
|
||||||
case let .fileCdnRedirect(dcId, fileToken, encryptionKey, encryptionIv, fileHashes):
|
case let .fileCdnRedirect(dcId, fileToken, encryptionKey, encryptionIv, fileHashes):
|
||||||
let _ = fileHashes
|
let _ = fileHashes
|
||||||
return .cdnRedirect(CdnData(
|
return .cdnRedirect(CdnData(
|
||||||
@ -648,73 +797,45 @@ private final class FetchImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let filePartRequest = filePartRequest {
|
if let filePartRequest {
|
||||||
part.disposable = (filePartRequest
|
part.disposable = (filePartRequest
|
||||||
|> deliverOn(self.queue)).start(next: { [weak self, weak state, weak part] result in
|
|> deliverOn(self.queue)).start(next: { [weak self, weak state, weak part] result in
|
||||||
guard let `self` = self, let state = state, case let .fetching(fetchingState) = self.state, fetchingState === state else {
|
guard let self, let state, case let .fetching(fetchingState) = self.state, fetchingState === state else {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if let part = part {
|
if let part {
|
||||||
if let index = state.pendingParts.firstIndex(where: { $0 === part }) {
|
if let index = state.pendingParts.firstIndex(where: { $0 === part }) {
|
||||||
state.pendingParts.remove(at: index)
|
state.pendingParts.remove(at: index)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch result {
|
switch result {
|
||||||
case let .data(data):
|
case let .data(data, verifyPartHashData):
|
||||||
let actualLength = Int64(data.count)
|
if let verifyPartHashData {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): stashing data part \(partRange) (aligned as \(fetchRange)) for hash verification")
|
||||||
if actualLength < requestedLength {
|
|
||||||
let resultingSize = fetchRange.lowerBound + actualLength
|
|
||||||
if let currentKnownSize = self.knownSize {
|
|
||||||
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): setting known size to min(\(currentKnownSize), \(resultingSize)) = \(min(currentKnownSize, resultingSize))")
|
|
||||||
self.knownSize = min(currentKnownSize, resultingSize)
|
|
||||||
} else {
|
|
||||||
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): setting known size to \(resultingSize)")
|
|
||||||
self.knownSize = resultingSize
|
|
||||||
}
|
|
||||||
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): reporting resource size \(resultingSize)")
|
|
||||||
self.onNext(.resourceSizeUpdated(resultingSize))
|
|
||||||
}
|
|
||||||
|
|
||||||
state.completedRanges.formUnion(RangeSet<Int64>(partRange))
|
|
||||||
|
|
||||||
var actualData = data
|
|
||||||
if partRange != fetchRange {
|
|
||||||
precondition(partRange.lowerBound >= fetchRange.lowerBound)
|
|
||||||
precondition(partRange.upperBound <= fetchRange.upperBound)
|
|
||||||
let innerOffset = partRange.lowerBound - fetchRange.lowerBound
|
|
||||||
var innerLength = partRange.upperBound - partRange.lowerBound
|
|
||||||
innerLength = min(innerLength, Int64(actualData.count - Int(innerOffset)))
|
|
||||||
if innerLength > 0 {
|
|
||||||
actualData = actualData.subdata(in: Int(innerOffset) ..< Int(innerOffset + innerLength))
|
|
||||||
} else {
|
|
||||||
actualData = Data()
|
|
||||||
}
|
|
||||||
|
|
||||||
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): extracting aligned part \(partRange) (\(fetchRange)): \(actualData.count)")
|
state.pendingReadyParts.append(FetchImpl.PendingReadyPart(
|
||||||
}
|
partRange: partRange,
|
||||||
|
fetchRange: fetchRange,
|
||||||
if !actualData.isEmpty {
|
fetchedData: verifyPartHashData.fetchedData,
|
||||||
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): emitting data part \(partRange) (aligned as \(fetchRange)): \(actualData.count)")
|
decryptedData: data
|
||||||
|
|
||||||
self.onNext(.dataPart(
|
|
||||||
resourceOffset: partRange.lowerBound,
|
|
||||||
data: actualData,
|
|
||||||
range: 0 ..< Int64(actualData.count),
|
|
||||||
complete: false
|
|
||||||
))
|
))
|
||||||
} else {
|
} else {
|
||||||
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): not emitting data part \(partRange) (aligned as \(fetchRange))")
|
self.commitPendingReadyPart(
|
||||||
|
state: state,
|
||||||
|
partRange: partRange,
|
||||||
|
fetchRange: fetchRange,
|
||||||
|
data: data
|
||||||
|
)
|
||||||
}
|
}
|
||||||
case let .cdnRedirect(cdnData):
|
case let .cdnRedirect(cdnData):
|
||||||
self.state = .fetching(FetchImpl.FetchingState(
|
self.state = .fetching(FetchImpl.FetchingState(
|
||||||
fetchLocation: .cdn(cdnData),
|
fetchLocation: .cdn(cdnData),
|
||||||
partSize: self.defaultPartSize,
|
partSize: self.cdnPartSize,
|
||||||
minPartSize: 4 * 1024,
|
minPartSize: self.cdnPartSize,
|
||||||
maxPartSize: self.defaultPartSize,
|
maxPartSize: self.cdnPartSize * 2,
|
||||||
partAlignment: 4 * 1024,
|
partAlignment: self.cdnPartSize,
|
||||||
partDivision: 1 * 1024 * 1024,
|
partDivision: 1 * 1024 * 1024,
|
||||||
maxPendingParts: 6
|
maxPendingParts: 6
|
||||||
))
|
))
|
||||||
@ -735,6 +856,129 @@ private final class FetchImpl {
|
|||||||
//assertionFailure()
|
//assertionFailure()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func fetchHashRange(state: FetchingState, hashRange: PendingHashRange) {
|
||||||
|
let fetchRequest: Signal<[Api.FileHash]?, NoError>
|
||||||
|
|
||||||
|
switch state.fetchLocation {
|
||||||
|
case let .cdn(cdnData):
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): will fetch hashes for \(hashRange.range)")
|
||||||
|
|
||||||
|
fetchRequest = self.network.multiplexedRequestManager.request(
|
||||||
|
to: .main(cdnData.sourceDatacenterId),
|
||||||
|
consumerId: self.consumerId,
|
||||||
|
resourceId: self.resource.id.stringRepresentation,
|
||||||
|
data: Api.functions.upload.getCdnFileHashes(fileToken: Buffer(data: cdnData.fileToken), offset: hashRange.range.lowerBound),
|
||||||
|
tag: self.parameters?.tag,
|
||||||
|
continueInBackground: self.continueInBackground,
|
||||||
|
expectedResponseSize: nil
|
||||||
|
)
|
||||||
|
|> map(Optional.init)
|
||||||
|
|> `catch` { _ -> Signal<[Api.FileHash]?, NoError> in
|
||||||
|
return .single(nil)
|
||||||
|
}
|
||||||
|
case .datacenter:
|
||||||
|
fetchRequest = .single(nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
let queue = self.queue
|
||||||
|
hashRange.disposable = (fetchRequest
|
||||||
|
|> deliverOn(self.queue)).start(next: { [weak self, weak state, weak hashRange] result in
|
||||||
|
queue.async {
|
||||||
|
guard let self, let state, case let .fetching(fetchingState) = self.state, fetchingState === state else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if let result {
|
||||||
|
if let hashRange {
|
||||||
|
if let index = state.pendingHashRanges.firstIndex(where: { $0 === hashRange }) {
|
||||||
|
state.pendingHashRanges.remove(at: index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var filledRange = RangeSet<Int64>()
|
||||||
|
for hashItem in result {
|
||||||
|
switch hashItem {
|
||||||
|
case let .fileHash(offset, limit, hash):
|
||||||
|
let rangeValue: Range<Int64> = offset ..< (offset + Int64(limit))
|
||||||
|
filledRange.formUnion(RangeSet<Int64>(rangeValue))
|
||||||
|
state.hashRanges[rangeValue.lowerBound] = HashRangeData(
|
||||||
|
range: rangeValue,
|
||||||
|
data: hash.makeData()
|
||||||
|
)
|
||||||
|
state.completedHashRanges.formUnion(RangeSet<Int64>(rangeValue))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): received hashes for \(filledRange)")
|
||||||
|
}
|
||||||
|
|
||||||
|
self.update()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
private func commitPendingReadyPart(state: FetchingState, partRange: Range<Int64>, fetchRange: Range<Int64>, data: Data) {
|
||||||
|
let requestedLength = fetchRange.upperBound - fetchRange.lowerBound
|
||||||
|
let actualLength = Int64(data.count)
|
||||||
|
|
||||||
|
if actualLength < requestedLength {
|
||||||
|
let resultingSize = fetchRange.lowerBound + actualLength
|
||||||
|
if let currentKnownSize = self.knownSize {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): setting known size to min(\(currentKnownSize), \(resultingSize)) = \(min(currentKnownSize, resultingSize))")
|
||||||
|
self.knownSize = min(currentKnownSize, resultingSize)
|
||||||
|
} else {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): setting known size to \(resultingSize)")
|
||||||
|
self.knownSize = resultingSize
|
||||||
|
}
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): reporting resource size \(resultingSize)")
|
||||||
|
self.onNext(.resourceSizeUpdated(resultingSize))
|
||||||
|
}
|
||||||
|
|
||||||
|
state.completedRanges.formUnion(RangeSet<Int64>(partRange))
|
||||||
|
|
||||||
|
var actualData = data
|
||||||
|
if partRange != fetchRange {
|
||||||
|
precondition(partRange.lowerBound >= fetchRange.lowerBound)
|
||||||
|
precondition(partRange.upperBound <= fetchRange.upperBound)
|
||||||
|
let innerOffset = partRange.lowerBound - fetchRange.lowerBound
|
||||||
|
var innerLength = partRange.upperBound - partRange.lowerBound
|
||||||
|
innerLength = min(innerLength, Int64(actualData.count - Int(innerOffset)))
|
||||||
|
if innerLength > 0 {
|
||||||
|
actualData = actualData.subdata(in: Int(innerOffset) ..< Int(innerOffset + innerLength))
|
||||||
|
} else {
|
||||||
|
actualData = Data()
|
||||||
|
}
|
||||||
|
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): extracting aligned part \(partRange) (\(fetchRange)): \(actualData.count)")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !actualData.isEmpty {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): emitting data part \(partRange) (aligned as \(fetchRange)): \(actualData.count)")
|
||||||
|
|
||||||
|
self.onNext(.dataPart(
|
||||||
|
resourceOffset: partRange.lowerBound,
|
||||||
|
data: actualData,
|
||||||
|
range: 0 ..< Int64(actualData.count),
|
||||||
|
complete: false
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
Logger.shared.log("FetchV2", "\(self.loggingIdentifier): not emitting data part \(partRange) (aligned as \(fetchRange))")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func processFloodWaitError(error: String) {
|
||||||
|
var networkSpeedLimitSubject: NetworkSpeedLimitedEvent.DownloadSubject?
|
||||||
|
if let location = self.parameters?.location {
|
||||||
|
if let messageId = location.messageId {
|
||||||
|
networkSpeedLimitSubject = .message(messageId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let subject = networkSpeedLimitSubject {
|
||||||
|
if error.hasPrefix("FLOOD_PREMIUM_WAIT") {
|
||||||
|
self.network.addNetworkSpeedLimitedEvent(event: .download(subject))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static let sharedQueue = Queue(name: "FetchImpl")
|
private static let sharedQueue = Queue(name: "FetchImpl")
|
||||||
|
@ -630,7 +630,10 @@ func initializedNetwork(accountId: AccountRecordId, arguments: NetworkInitializa
|
|||||||
mtProto.delegate = connectionStatusDelegate
|
mtProto.delegate = connectionStatusDelegate
|
||||||
mtProto.add(requestService)
|
mtProto.add(requestService)
|
||||||
|
|
||||||
let useExperimentalFeatures = networkSettings?.useExperimentalDownload ?? false
|
var useExperimentalFeatures = networkSettings?.useExperimentalDownload ?? true
|
||||||
|
if let data = appConfiguration.data, let _ = data["ios_killswitch_disable_downloadv2"] {
|
||||||
|
useExperimentalFeatures = false
|
||||||
|
}
|
||||||
|
|
||||||
let network = Network(queue: queue, datacenterId: datacenterId, context: context, mtProto: mtProto, requestService: requestService, connectionStatusDelegate: connectionStatusDelegate, _connectionStatus: connectionStatus, basePath: basePath, appDataDisposable: appDataDisposable, encryptionProvider: arguments.encryptionProvider, useRequestTimeoutTimers: useRequestTimeoutTimers, useBetaFeatures: arguments.useBetaFeatures, useExperimentalFeatures: useExperimentalFeatures)
|
let network = Network(queue: queue, datacenterId: datacenterId, context: context, mtProto: mtProto, requestService: requestService, connectionStatusDelegate: connectionStatusDelegate, _connectionStatus: connectionStatus, basePath: basePath, appDataDisposable: appDataDisposable, encryptionProvider: arguments.encryptionProvider, useRequestTimeoutTimers: useRequestTimeoutTimers, useBetaFeatures: arguments.useBetaFeatures, useExperimentalFeatures: useExperimentalFeatures)
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ public struct NetworkSettings: Codable {
|
|||||||
self.applicationUpdateUrlPrefix = try? container.decodeIfPresent(String.self, forKey: "applicationUpdateUrlPrefix")
|
self.applicationUpdateUrlPrefix = try? container.decodeIfPresent(String.self, forKey: "applicationUpdateUrlPrefix")
|
||||||
self.backupHostOverride = try? container.decodeIfPresent(String.self, forKey: "backupHostOverride")
|
self.backupHostOverride = try? container.decodeIfPresent(String.self, forKey: "backupHostOverride")
|
||||||
self.useNetworkFramework = try container.decodeIfPresent(Bool.self, forKey: "useNetworkFramework_v2")
|
self.useNetworkFramework = try container.decodeIfPresent(Bool.self, forKey: "useNetworkFramework_v2")
|
||||||
self.useExperimentalDownload = try container.decodeIfPresent(Bool.self, forKey: "useExperimentalDownload")
|
self.useExperimentalDownload = try container.decodeIfPresent(Bool.self, forKey: "useExperimentalDownload_v2")
|
||||||
}
|
}
|
||||||
|
|
||||||
public func encode(to encoder: Encoder) throws {
|
public func encode(to encoder: Encoder) throws {
|
||||||
@ -36,6 +36,6 @@ public struct NetworkSettings: Codable {
|
|||||||
try container.encodeIfPresent(self.applicationUpdateUrlPrefix, forKey: "applicationUpdateUrlPrefix")
|
try container.encodeIfPresent(self.applicationUpdateUrlPrefix, forKey: "applicationUpdateUrlPrefix")
|
||||||
try container.encodeIfPresent(self.backupHostOverride, forKey: "backupHostOverride")
|
try container.encodeIfPresent(self.backupHostOverride, forKey: "backupHostOverride")
|
||||||
try container.encodeIfPresent(self.useNetworkFramework, forKey: "useNetworkFramework_v2")
|
try container.encodeIfPresent(self.useNetworkFramework, forKey: "useNetworkFramework_v2")
|
||||||
try container.encodeIfPresent(self.useExperimentalDownload, forKey: "useExperimentalDownload")
|
try container.encodeIfPresent(self.useExperimentalDownload, forKey: "useExperimentalDownload_v2")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user