From 788b1b46b4c065eac712e86eba021db323dc2bb2 Mon Sep 17 00:00:00 2001 From: Ali <> Date: Fri, 17 Mar 2023 23:35:17 +0400 Subject: [PATCH] FetchV2 Improvements --- .../Sources/DebugController.swift | 20 ++- .../Sources/Network/FetchV2.swift | 150 ++++++++++++++---- .../Sources/Network/MultipartFetch.swift | 33 ++-- .../Sources/Network/Network.swift | 8 +- .../SyncCore/SyncCore_NetworkSettings.swift | 8 +- 5 files changed, 163 insertions(+), 56 deletions(-) diff --git a/submodules/DebugSettingsUI/Sources/DebugController.swift b/submodules/DebugSettingsUI/Sources/DebugController.swift index e84bdebc74..9aa30f9383 100644 --- a/submodules/DebugSettingsUI/Sources/DebugController.swift +++ b/submodules/DebugSettingsUI/Sources/DebugController.swift @@ -99,6 +99,7 @@ private enum DebugControllerEntry: ItemListNodeEntry { case preferredVideoCodec(Int, String, String?, Bool) case disableVideoAspectScaling(Bool) case enableNetworkFramework(Bool) + case enableNetworkExperiments(Bool) case restorePurchases(PresentationTheme) case logTranslationRecognition(Bool) case resetTranslationStates @@ -123,7 +124,7 @@ private enum DebugControllerEntry: ItemListNodeEntry { return DebugControllerSection.translation.rawValue case .preferredVideoCodec: return DebugControllerSection.videoExperiments.rawValue - case .disableVideoAspectScaling, .enableNetworkFramework: + case .disableVideoAspectScaling, .enableNetworkFramework, .enableNetworkExperiments: return DebugControllerSection.videoExperiments2.rawValue case .hostInfo, .versionInfo: return DebugControllerSection.info.rawValue @@ -226,10 +227,12 @@ private enum DebugControllerEntry: ItemListNodeEntry { return 100 case .enableNetworkFramework: return 101 - case .hostInfo: + case .enableNetworkExperiments: return 102 - case .versionInfo: + case .hostInfo: return 103 + case .versionInfo: + return 104 } } @@ -1283,6 +1286,16 @@ private enum DebugControllerEntry: ItemListNodeEntry { }).start() } }) + case let .enableNetworkExperiments(value): + return ItemListSwitchItem(presentationData: presentationData, title: "Download X [Restart App]", value: value, sectionId: self.section, style: .blocks, updated: { value in + if let context = arguments.context { + let _ = updateNetworkSettingsInteractively(postbox: context.account.postbox, network: context.account.network, { settings in + var settings = settings + settings.useExperimentalDownload = value + return settings + }).start() + } + }) case .restorePurchases: return ItemListActionItem(presentationData: presentationData, title: "Restore Purchases", kind: .generic, alignment: .natural, sectionId: self.section, style: .blocks, action: { arguments.context?.inAppPurchaseManager?.restorePurchases(completion: { state in @@ -1391,6 +1404,7 @@ private func debugControllerEntries(sharedContext: SharedAccountContext, present if isMainApp { entries.append(.disableVideoAspectScaling(experimentalSettings.disableVideoAspectScaling)) entries.append(.enableNetworkFramework(networkSettings?.useNetworkFramework ?? useBetaFeatures)) + entries.append(.enableNetworkExperiments(networkSettings?.useExperimentalDownload ?? false)) } if let backupHostOverride = networkSettings?.backupHostOverride { diff --git a/submodules/TelegramCore/Sources/Network/FetchV2.swift b/submodules/TelegramCore/Sources/Network/FetchV2.swift index 697fd9a68a..3cd53c9bf4 100644 --- a/submodules/TelegramCore/Sources/Network/FetchV2.swift +++ b/submodules/TelegramCore/Sources/Network/FetchV2.swift @@ -111,6 +111,7 @@ private final class FetchImpl { var pendingParts: [PendingPart] = [] var completedRanges = RangeSet() + var nextRangePriorityIndex: Int = 0 init( fetchLocation: FetchLocation, @@ -193,7 +194,7 @@ private final class FetchImpl { private let postbox: Postbox private let network: Network private let mediaReferenceRevalidationContext: MediaReferenceRevalidationContext? - private let resource: TelegramMediaResource + private var resource: TelegramMediaResource private let datacenterId: Int private let size: Int64? private let parameters: MediaResourceFetchParameters? @@ -207,6 +208,7 @@ private final class FetchImpl { private let consumerId: Int64 private var knownSize: Int64? + private var updatedFileReference: Data? private var requiredRangesDisposable: Disposable? private var requiredRanges: [RequiredRange] = [] @@ -253,6 +255,10 @@ private final class FetchImpl { self.knownSize = size + /*#if DEBUG + self.updatedFileReference = Data() + #endif*/ + if let resource = resource as? TelegramCloudMediaResource { if let apiInputLocation = resource.apiInputLocation(fileReference: Data()) { self.loggingIdentifier = "\(apiInputLocation)" @@ -301,16 +307,28 @@ private final class FetchImpl { switch state { case let .fetching(state): - var filteredRequiredRanges = RangeSet() + var filteredRequiredRanges: [RangeSet] = [] + for _ in 0 ..< 3 { + filteredRequiredRanges.append(RangeSet()) + } + for range in self.requiredRanges { - filteredRequiredRanges.formUnion(RangeSet(range.value)) + filteredRequiredRanges[Int(range.priority.rawValue)].formUnion(RangeSet(range.value)) } - if let knownSize = self.knownSize { - filteredRequiredRanges.remove(contentsOf: knownSize ..< Int64.max) - } - filteredRequiredRanges.subtract(state.completedRanges) - for pendingPart in state.pendingParts { - filteredRequiredRanges.remove(contentsOf: pendingPart.partRange) + var excludedInHigherPriorities = RangeSet() + for i in (0 ..< filteredRequiredRanges.count).reversed() { + if let knownSize = self.knownSize { + for i in 0 ..< filteredRequiredRanges.count { + filteredRequiredRanges[i].remove(contentsOf: knownSize ..< Int64.max) + } + } + filteredRequiredRanges[i].subtract(excludedInHigherPriorities) + filteredRequiredRanges[i].subtract(state.completedRanges) + for pendingPart in state.pendingParts { + filteredRequiredRanges[i].remove(contentsOf: pendingPart.partRange) + } + + excludedInHigherPriorities.subtract(filteredRequiredRanges[i]) } /*for _ in 0 ..< 1000000 { @@ -331,29 +349,43 @@ private final class FetchImpl { }*/ if state.pendingParts.count < state.maxPendingParts { - Logger.shared.log("FetchV2", "\(self.loggingIdentifier): will fetch \(filteredRequiredRanges.ranges)") + //let debugRanges = filteredRequiredRanges.ranges.map { "\($0.lowerBound)..<\($0.upperBound)" } + //Logger.shared.log("FetchV2", "\(self.loggingIdentifier): will fetch \(debugRanges)") while state.pendingParts.count < state.maxPendingParts { - guard let firstRange = filteredRequiredRanges.ranges.first else { + var found = false + inner: for i in 0 ..< filteredRequiredRanges.count { + let priorityIndex = (state.nextRangePriorityIndex + i) % filteredRequiredRanges.count + + guard let firstRange = filteredRequiredRanges[priorityIndex].ranges.first else { + continue + } + + state.nextRangePriorityIndex += 1 + + let (partRange, alignedRange) = alignPartFetchRange( + partRange: firstRange.lowerBound ..< min(firstRange.upperBound, firstRange.lowerBound + state.partSize), + minPartSize: state.minPartSize, + maxPartSize: state.maxPartSize, + alignment: state.partAlignment, + boundaryLimit: state.partDivision + ) + + Logger.shared.log("FetchV2", "\(self.loggingIdentifier): take part \(partRange) (aligned as \(alignedRange))") + + let pendingPart = PendingPart( + partRange: partRange, + fetchRange: alignedRange + ) + state.pendingParts.append(pendingPart) + filteredRequiredRanges[priorityIndex].remove(contentsOf: partRange) + + found = true + break inner + } + if !found { break } - - let (partRange, alignedRange) = alignPartFetchRange( - partRange: firstRange.lowerBound ..< min(firstRange.upperBound, firstRange.lowerBound + state.partSize), - minPartSize: state.minPartSize, - maxPartSize: state.maxPartSize, - alignment: state.partAlignment, - boundaryLimit: state.partDivision - ) - - Logger.shared.log("FetchV2", "\(self.loggingIdentifier): take part \(partRange) (aligned as \(alignedRange))") - - let pendingPart = PendingPart( - partRange: partRange, - fetchRange: alignedRange - ) - state.pendingParts.append(pendingPart) - filteredRequiredRanges.remove(contentsOf: partRange) } } @@ -393,6 +425,7 @@ private final class FetchImpl { partDivision: 1 * 1024 * 1024, maxPendingParts: 6 )) + self.update() }, error: { [weak self] error in guard let `self` = self else { return @@ -403,6 +436,52 @@ private final class FetchImpl { case let .refreshingFileReference(state): if state.disposable == nil { Logger.shared.log("FetchV2", "\(self.loggingIdentifier): refreshing file reference") + + if let info = self.parameters?.info as? TelegramCloudMediaResourceFetchInfo, let mediaReferenceRevalidationContext = self.mediaReferenceRevalidationContext { + let fetchLocation = state.fetchLocation + + state.disposable = (revalidateMediaResourceReference( + postbox: self.postbox, + network: self.network, + revalidationContext: mediaReferenceRevalidationContext, + info: info, + resource: self.resource + ) + |> deliverOn(self.queue)).start(next: { [weak self] validationResult in + guard let `self` = self else { + return + } + + if let validatedResource = validationResult.updatedResource as? TelegramCloudMediaResourceWithFileReference, let reference = validatedResource.fileReference { + self.updatedFileReference = reference + } + self.resource = validationResult.updatedResource + + /*if let reference = validationResult.updatedReference { + strongSelf.resourceReference = .reference(reference) + } else { + strongSelf.resourceReference = .empty + }*/ + + self.state = .fetching(FetchingState( + fetchLocation: fetchLocation, + partSize: 128 * 1024, + minPartSize: 4 * 1024, + maxPartSize: 128 * 1024, + partAlignment: 4 * 1024, + partDivision: 1 * 1024 * 1024, + maxPendingParts: 6 + )) + + self.update() + }, error: { [weak self] _ in + guard let `self` = self else { + return + } + self.state = .failed + self.update() + }) + } } case .failed: break @@ -467,7 +546,9 @@ private final class FetchImpl { case let .datacenter(sourceDatacenterId): if let cloudResource = self.resource as? TelegramCloudMediaResource { var fileReference: Data? - if let info = self.parameters?.info as? TelegramCloudMediaResourceFetchInfo { + if let updatedFileReference = self.updatedFileReference { + fileReference = updatedFileReference + } else if let info = self.parameters?.info as? TelegramCloudMediaResourceFetchInfo { fileReference = info.reference.apiFileReference } if let inputLocation = cloudResource.apiInputLocation(fileReference: fileReference) { @@ -477,7 +558,8 @@ private final class FetchImpl { data: Api.functions.upload.getFile( flags: 0, location: inputLocation, - offset: part.fetchRange.lowerBound, limit: Int32(requestedLength)), + offset: part.fetchRange.lowerBound, + limit: Int32(requestedLength)), tag: self.parameters?.tag, continueInBackground: self.continueInBackground ) @@ -496,8 +578,12 @@ private final class FetchImpl { )) } } - |> `catch` { _ -> Signal in - return .single(.failure) + |> `catch` { error -> Signal in + if error.errorDescription.hasPrefix("FILEREF_INVALID") || error.errorDescription.hasPrefix("FILE_REFERENCE_") { + return .single(.fileReferenceExpired) + } else { + return .single(.failure) + } } } } diff --git a/submodules/TelegramCore/Sources/Network/MultipartFetch.swift b/submodules/TelegramCore/Sources/Network/MultipartFetch.swift index 1726dbaf62..4f54b333ae 100644 --- a/submodules/TelegramCore/Sources/Network/MultipartFetch.swift +++ b/submodules/TelegramCore/Sources/Network/MultipartFetch.swift @@ -978,22 +978,22 @@ func multipartFetch( continueInBackground: Bool = false, useMainConnection: Bool = false ) -> Signal { - #if DEBUG && false - return multipartFetchV2( - postbox: postbox, - network: network, - mediaReferenceRevalidationContext: mediaReferenceRevalidationContext, - resource: resource, - datacenterId: datacenterId, - size: size, - intervals: intervals, - parameters: parameters, - encryptionKey: encryptionKey, - decryptedSize: decryptedSize, - continueInBackground: continueInBackground, - useMainConnection: useMainConnection - ) - #else + if network.useExperimentalFeatures, let _ = resource as? TelegramCloudMediaResource { + return multipartFetchV2( + postbox: postbox, + network: network, + mediaReferenceRevalidationContext: mediaReferenceRevalidationContext, + resource: resource, + datacenterId: datacenterId, + size: size, + intervals: intervals, + parameters: parameters, + encryptionKey: encryptionKey, + decryptedSize: decryptedSize, + continueInBackground: continueInBackground, + useMainConnection: useMainConnection + ) + } return multipartFetchV1( postbox: postbox, network: network, @@ -1008,5 +1008,4 @@ func multipartFetch( continueInBackground: continueInBackground, useMainConnection: useMainConnection ) - #endif } diff --git a/submodules/TelegramCore/Sources/Network/Network.swift b/submodules/TelegramCore/Sources/Network/Network.swift index a918255478..23b71f1ef2 100644 --- a/submodules/TelegramCore/Sources/Network/Network.swift +++ b/submodules/TelegramCore/Sources/Network/Network.swift @@ -603,7 +603,9 @@ func initializedNetwork(accountId: AccountRecordId, arguments: NetworkInitializa mtProto.delegate = connectionStatusDelegate mtProto.add(requestService) - let network = Network(queue: queue, datacenterId: datacenterId, context: context, mtProto: mtProto, requestService: requestService, connectionStatusDelegate: connectionStatusDelegate, _connectionStatus: connectionStatus, basePath: basePath, appDataDisposable: appDataDisposable, encryptionProvider: arguments.encryptionProvider, useRequestTimeoutTimers: useRequestTimeoutTimers, useBetaFeatures: arguments.useBetaFeatures) + let useExperimentalFeatures = networkSettings?.useExperimentalDownload ?? false + + let network = Network(queue: queue, datacenterId: datacenterId, context: context, mtProto: mtProto, requestService: requestService, connectionStatusDelegate: connectionStatusDelegate, _connectionStatus: connectionStatus, basePath: basePath, appDataDisposable: appDataDisposable, encryptionProvider: arguments.encryptionProvider, useRequestTimeoutTimers: useRequestTimeoutTimers, useBetaFeatures: arguments.useBetaFeatures, useExperimentalFeatures: useExperimentalFeatures) appDataUpdatedImpl = { [weak network] data in guard let data = data else { return @@ -735,6 +737,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate { private let connectionStatusDelegate: MTProtoConnectionStatusDelegate private let useRequestTimeoutTimers: Bool public let useBetaFeatures: Bool + public let useExperimentalFeatures: Bool private let appDataDisposable: Disposable @@ -778,7 +781,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate { return "Network context: \(self.context)" } - fileprivate init(queue: Queue, datacenterId: Int, context: MTContext, mtProto: MTProto, requestService: MTRequestMessageService, connectionStatusDelegate: MTProtoConnectionStatusDelegate, _connectionStatus: Promise, basePath: String, appDataDisposable: Disposable, encryptionProvider: EncryptionProvider, useRequestTimeoutTimers: Bool, useBetaFeatures: Bool) { + fileprivate init(queue: Queue, datacenterId: Int, context: MTContext, mtProto: MTProto, requestService: MTRequestMessageService, connectionStatusDelegate: MTProtoConnectionStatusDelegate, _connectionStatus: Promise, basePath: String, appDataDisposable: Disposable, encryptionProvider: EncryptionProvider, useRequestTimeoutTimers: Bool, useBetaFeatures: Bool, useExperimentalFeatures: Bool) { self.encryptionProvider = encryptionProvider self.queue = queue @@ -793,6 +796,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate { self.basePath = basePath self.useRequestTimeoutTimers = useRequestTimeoutTimers self.useBetaFeatures = useBetaFeatures + self.useExperimentalFeatures = useExperimentalFeatures super.init() diff --git a/submodules/TelegramCore/Sources/SyncCore/SyncCore_NetworkSettings.swift b/submodules/TelegramCore/Sources/SyncCore/SyncCore_NetworkSettings.swift index 14d8a988ba..420f7cda94 100644 --- a/submodules/TelegramCore/Sources/SyncCore/SyncCore_NetworkSettings.swift +++ b/submodules/TelegramCore/Sources/SyncCore/SyncCore_NetworkSettings.swift @@ -5,16 +5,18 @@ public struct NetworkSettings: Codable { public var applicationUpdateUrlPrefix: String? public var backupHostOverride: String? public var useNetworkFramework: Bool? + public var useExperimentalDownload: Bool? public static var defaultSettings: NetworkSettings { - return NetworkSettings(reducedBackupDiscoveryTimeout: false, applicationUpdateUrlPrefix: nil, backupHostOverride: nil, useNetworkFramework: nil) + return NetworkSettings(reducedBackupDiscoveryTimeout: false, applicationUpdateUrlPrefix: nil, backupHostOverride: nil, useNetworkFramework: nil, useExperimentalDownload: nil) } - public init(reducedBackupDiscoveryTimeout: Bool, applicationUpdateUrlPrefix: String?, backupHostOverride: String?, useNetworkFramework: Bool?) { + public init(reducedBackupDiscoveryTimeout: Bool, applicationUpdateUrlPrefix: String?, backupHostOverride: String?, useNetworkFramework: Bool?, useExperimentalDownload: Bool?) { self.reducedBackupDiscoveryTimeout = reducedBackupDiscoveryTimeout self.applicationUpdateUrlPrefix = applicationUpdateUrlPrefix self.backupHostOverride = backupHostOverride self.useNetworkFramework = useNetworkFramework + self.useExperimentalDownload = useExperimentalDownload } public init(from decoder: Decoder) throws { @@ -24,6 +26,7 @@ public struct NetworkSettings: Codable { self.applicationUpdateUrlPrefix = try? container.decodeIfPresent(String.self, forKey: "applicationUpdateUrlPrefix") self.backupHostOverride = try? container.decodeIfPresent(String.self, forKey: "backupHostOverride") self.useNetworkFramework = try container.decodeIfPresent(Bool.self, forKey: "useNetworkFramework_v2") + self.useExperimentalDownload = try container.decodeIfPresent(Bool.self, forKey: "useExperimentalDownload") } public func encode(to encoder: Encoder) throws { @@ -33,5 +36,6 @@ public struct NetworkSettings: Codable { try container.encodeIfPresent(self.applicationUpdateUrlPrefix, forKey: "applicationUpdateUrlPrefix") try container.encodeIfPresent(self.backupHostOverride, forKey: "backupHostOverride") try container.encodeIfPresent(self.useNetworkFramework, forKey: "useNetworkFramework_v2") + try container.encodeIfPresent(self.useExperimentalDownload, forKey: "useExperimentalDownload") } }