Merge branch 'master' of gitlab.com:peter-iakovlev/TelegramCore

This commit is contained in:
Ilya Laktyushin 2018-12-07 22:59:43 +04:00
commit bb12e8c99c
11 changed files with 156 additions and 79 deletions

View File

@ -812,6 +812,7 @@ public class Account {
public let shouldKeepOnlinePresence = Promise<Bool>()
public let autolockReportDeadline = Promise<Int32?>()
public let shouldExplicitelyKeepWorkerConnections = Promise<Bool>(false)
public let shouldKeepBackgroundDownloadConnections = Promise<Bool>(false)
private let networkStateValue = Promise<AccountNetworkState>(.waitingForNetwork)
public var networkState: Signal<AccountNetworkState, NoError> {
@ -1035,6 +1036,7 @@ public class Account {
self.network.shouldKeepConnection.set(shouldBeMaster)
self.network.shouldExplicitelyKeepWorkerConnections.set(self.shouldExplicitelyKeepWorkerConnections.get())
self.network.shouldKeepBackgroundDownloadConnections.set(self.shouldKeepBackgroundDownloadConnections.get())
let serviceTasksMaster = shouldBeMaster
|> deliverOn(self.serviceQueue)

View File

@ -23,8 +23,9 @@ public struct CacheUsageStats {
public let cacheSize: Int64
public let tempPaths: [String]
public let tempSize: Int64
public let immutableSize: Int64
public init(media: [PeerId: [PeerCacheUsageCategory: [MediaId: Int64]]], mediaResourceIds: [MediaId: [MediaResourceId]], peers: [PeerId: Peer], otherSize: Int64, otherPaths: [String], cacheSize: Int64, tempPaths: [String], tempSize: Int64) {
public init(media: [PeerId: [PeerCacheUsageCategory: [MediaId: Int64]]], mediaResourceIds: [MediaId: [MediaResourceId]], peers: [PeerId: Peer], otherSize: Int64, otherPaths: [String], cacheSize: Int64, tempPaths: [String], tempSize: Int64, immutableSize: Int64) {
self.media = media
self.mediaResourceIds = mediaResourceIds
self.peers = peers
@ -33,6 +34,7 @@ public struct CacheUsageStats {
self.cacheSize = cacheSize
self.tempPaths = tempPaths
self.tempSize = tempSize
self.immutableSize = immutableSize
}
}
@ -53,7 +55,7 @@ private final class CacheUsageStatsState {
var lowerBound: MessageIndex?
}
public func collectCacheUsageStats(account: Account) -> Signal<CacheUsageStatsResult, NoError> {
public func collectCacheUsageStats(account: Account, additionalCachePaths: [String], logFilesPath: String) -> Signal<CacheUsageStatsResult, NoError> {
let state = Atomic<CacheUsageStatsState>(value: CacheUsageStatsState())
let excludeResourceIds = account.postbox.transaction { transaction -> Set<WrappedMediaResourceId> in
@ -85,14 +87,32 @@ public func collectCacheUsageStats(account: Account) -> Signal<CacheUsageStatsRe
var resourceIds: [MediaResourceId] = []
for (id, media) in mediaRefs {
mediaResourceIds[id] = []
var parsedMedia: [Media] = []
switch media {
case let image as TelegramMediaImage:
parsedMedia.append(image)
case let file as TelegramMediaFile:
parsedMedia.append(file)
case let webpage as TelegramMediaWebpage:
if case let .Loaded(content) = webpage.content {
if let image = content.image {
parsedMedia.append(image)
}
if let file = content.file {
parsedMedia.append(file)
}
}
default:
break
}
for media in parsedMedia {
if let image = media as? TelegramMediaImage {
for representation in image.representations {
resourceIds.append(representation.resource.id)
resourceIdToMediaId[WrappedMediaResourceId(representation.resource.id)] = (id, .image)
mediaResourceIds[id]!.append(representation.resource.id)
}
case let file as TelegramMediaFile:
} else if let file = media as? TelegramMediaFile {
var category: PeerCacheUsageCategory = .file
loop: for attribute in file.attributes {
switch attribute {
@ -114,8 +134,7 @@ public func collectCacheUsageStats(account: Account) -> Signal<CacheUsageStatsRe
resourceIds.append(file.resource.id)
resourceIdToMediaId[WrappedMediaResourceId(file.resource.id)] = (id, category)
mediaResourceIds[id]!.append(file.resource.id)
default:
break
}
}
}
return account.postbox.mediaBox.collectResourceCacheUsage(resourceIds)
@ -172,6 +191,38 @@ public func collectCacheUsageStats(account: Account) -> Signal<CacheUsageStatsRe
}
#endif
var immutableSize: Int64 = 0
if let files = try? FileManager.default.contentsOfDirectory(at: URL(fileURLWithPath: account.basePath + "/postbox/db"), includingPropertiesForKeys: [URLResourceKey.fileSizeKey], options: []) {
for url in files {
if let fileSize = (try? url.resourceValues(forKeys: Set([.fileSizeKey])))?.fileSize {
immutableSize += Int64(fileSize)
}
}
}
if let files = try? FileManager.default.contentsOfDirectory(at: URL(fileURLWithPath: logFilesPath), includingPropertiesForKeys: [URLResourceKey.fileSizeKey], options: []) {
for url in files {
if let fileSize = (try? url.resourceValues(forKeys: Set([.fileSizeKey])))?.fileSize {
immutableSize += Int64(fileSize)
}
}
}
for additionalPath in additionalCachePaths {
if let enumerator = FileManager.default.enumerator(at: URL(fileURLWithPath: additionalPath), includingPropertiesForKeys: [.isDirectoryKey, .fileSizeKey]) {
for url in enumerator {
if let url = url as? URL {
if let isDirectoryValue = (try? url.resourceValues(forKeys: Set([.isDirectoryKey])))?.isDirectory, isDirectoryValue {
//tempPaths.append(url.path)
} else if let fileSizeValue = (try? url.resourceValues(forKeys: Set([.fileSizeKey])))?.fileSize {
tempPaths.append(url.path)
//print("\(url.path) \(fileSizeValue)")
tempSize += Int64(fileSizeValue)
}
}
}
}
}
return account.postbox.transaction { transaction -> CacheUsageStats in
var peers: [PeerId: Peer] = [:]
for peerId in finalMedia.keys {
@ -182,7 +233,7 @@ public func collectCacheUsageStats(account: Account) -> Signal<CacheUsageStatsRe
}
}
}
return CacheUsageStats(media: finalMedia, mediaResourceIds: finalMediaResourceIds, peers: peers, otherSize: otherSize, otherPaths: otherPaths, cacheSize: cacheSize, tempPaths: tempPaths, tempSize: tempSize)
return CacheUsageStats(media: finalMedia, mediaResourceIds: finalMediaResourceIds, peers: peers, otherSize: otherSize, otherPaths: otherPaths, cacheSize: cacheSize, tempPaths: tempPaths, tempSize: tempSize, immutableSize: immutableSize)
} |> mapError { _ -> CollectCacheUsageStatsError in preconditionFailure() }
|> mapToSignal { stats -> Signal<CacheUsageStatsResult, CollectCacheUsageStatsError> in
return .fail(.done(stats))

View File

@ -495,19 +495,21 @@ extension MediaResourceReference {
final class TelegramCloudMediaResourceFetchInfo: MediaResourceFetchInfo {
let reference: MediaResourceReference
let preferBackgroundReferenceRevalidation: Bool
let continueInBackground: Bool
init(reference: MediaResourceReference, preferBackgroundReferenceRevalidation: Bool) {
init(reference: MediaResourceReference, preferBackgroundReferenceRevalidation: Bool, continueInBackground: Bool) {
self.reference = reference
self.preferBackgroundReferenceRevalidation = preferBackgroundReferenceRevalidation
self.continueInBackground = continueInBackground
}
}
public func fetchedMediaResource(postbox: Postbox, reference: MediaResourceReference, range: (Range<Int>, MediaBoxFetchPriority)? = nil, statsCategory: MediaResourceStatsCategory = .generic, reportResultStatus: Bool = false, preferBackgroundReferenceRevalidation: Bool = false) -> Signal<FetchResourceSourceType, NoError> {
public func fetchedMediaResource(postbox: Postbox, reference: MediaResourceReference, range: (Range<Int>, MediaBoxFetchPriority)? = nil, statsCategory: MediaResourceStatsCategory = .generic, reportResultStatus: Bool = false, preferBackgroundReferenceRevalidation: Bool = false, continueInBackground: Bool = false) -> Signal<FetchResourceSourceType, NoError> {
if let (range, priority) = range {
return postbox.mediaBox.fetchedResourceData(reference.resource, in: range, priority: priority, parameters: MediaResourceFetchParameters(tag: TelegramMediaResourceFetchTag(statsCategory: statsCategory), info: TelegramCloudMediaResourceFetchInfo(reference: reference, preferBackgroundReferenceRevalidation: preferBackgroundReferenceRevalidation)))
return postbox.mediaBox.fetchedResourceData(reference.resource, in: range, priority: priority, parameters: MediaResourceFetchParameters(tag: TelegramMediaResourceFetchTag(statsCategory: statsCategory), info: TelegramCloudMediaResourceFetchInfo(reference: reference, preferBackgroundReferenceRevalidation: preferBackgroundReferenceRevalidation, continueInBackground: continueInBackground)))
|> map { _ in .local }
} else {
return postbox.mediaBox.fetchedResource(reference.resource, parameters: MediaResourceFetchParameters(tag: TelegramMediaResourceFetchTag(statsCategory: statsCategory), info: TelegramCloudMediaResourceFetchInfo(reference: reference, preferBackgroundReferenceRevalidation: preferBackgroundReferenceRevalidation)), implNext: reportResultStatus)
return postbox.mediaBox.fetchedResource(reference.resource, parameters: MediaResourceFetchParameters(tag: TelegramMediaResourceFetchTag(statsCategory: statsCategory), info: TelegramCloudMediaResourceFetchInfo(reference: reference, preferBackgroundReferenceRevalidation: preferBackgroundReferenceRevalidation, continueInBackground: continueInBackground)), implNext: reportResultStatus)
}
}

View File

@ -154,7 +154,7 @@ private func synchronizeRecentlyUsedMedia(transaction: Transaction, postbox: Pos
case .generic:
return .fail(.generic)
case .invalidReference:
return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false), resource: fileReference.media.resource)
return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false, continueInBackground: false), resource: fileReference.media.resource)
|> mapError { _ -> SaveRecentlyUsedMediaError in
return .generic
}

View File

@ -150,7 +150,7 @@ private func synchronizeSavedGifs(transaction: Transaction, postbox: Postbox, ne
case .generic:
return .fail(.generic)
case .invalidReference:
return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false), resource: fileReference.media.resource)
return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false, continueInBackground: false), resource: fileReference.media.resource)
|> mapError { _ -> SaveGifError in
return .generic
}

View File

@ -150,7 +150,7 @@ private func synchronizeSavedStickers(transaction: Transaction, postbox: Postbox
case .generic:
return .fail(.generic)
case .invalidReference:
return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false), resource: fileReference.media.resource)
return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: fileReference.resourceReference(fileReference.media.resource), preferBackgroundReferenceRevalidation: false, continueInBackground: false), resource: fileReference.media.resource)
|> mapError { _ -> SaveStickerError in
return .generic
}

View File

@ -84,14 +84,14 @@ private struct DownloadWrapper {
let isCdn: Bool
let network: Network
func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: MediaResourceFetchTag?) -> Signal<T, MTRpcError> {
func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: MediaResourceFetchTag?, continueInBackground: Bool) -> Signal<T, MTRpcError> {
let target: MultiplexedRequestTarget
if self.isCdn {
target = .cdn(Int(self.datacenterId))
} else {
target = .main(Int(self.datacenterId))
}
return network.multiplexedRequestManager.request(to: target, consumerId: self.consumerId, data: data, tag: tag)
return network.multiplexedRequestManager.request(to: target, consumerId: self.consumerId, data: data, tag: tag, continueInBackground: continueInBackground)
}
}
@ -115,6 +115,7 @@ private final class MultipartCdnHashSource {
private let fileToken: Data
private let masterDownload: DownloadWrapper
private let continueInBackground: Bool
private var knownUpperBound: Int32
private var hashes: [Int32: Data] = [:]
@ -123,14 +124,15 @@ private final class MultipartCdnHashSource {
private var subscribers = Bag<(Int32, Int32, ([Int32: Data]) -> Void)>()
init(queue: Queue, fileToken: Data, hashes: [Int32: Data], masterDownload: DownloadWrapper) {
init(queue: Queue, fileToken: Data, hashes: [Int32: Data], masterDownload: DownloadWrapper, continueInBackground: Bool) {
assert(queue.isCurrent())
self.queue = queue
self.fileToken = fileToken
self.masterDownload = masterDownload
self.continueInBackground = continueInBackground
var knownUpperBound: Int32 = 0
let knownUpperBound: Int32 = 0
/*self.hashes = hashes
for (offset, _) in hashes {
assert(offset % dataHashLength == 0)
@ -221,7 +223,7 @@ private final class MultipartCdnHashSource {
self.requestOffsetAndDisposable = (requestOffset, disposable)
let queue = self.queue
let fileToken = self.fileToken
disposable.set((self.masterDownload.request(Api.functions.upload.getCdnFileHashes(fileToken: Buffer(data: fileToken), offset: requestOffset), tag: nil)
disposable.set((self.masterDownload.request(Api.functions.upload.getCdnFileHashes(fileToken: Buffer(data: fileToken), offset: requestOffset), tag: nil, continueInBackground: self.continueInBackground)
|> map { partHashes -> [Int32: Data] in
var parsedPartHashes: [Int32: Data] = [:]
for part in partHashes {
@ -270,7 +272,7 @@ private enum MultipartFetchSource {
case master(location: MultipartFetchMasterLocation, download: DownloadWrapper)
case cdn(masterDatacenterId: Int32, fileToken: Data, key: Data, iv: Data, download: DownloadWrapper, masterDownload: DownloadWrapper, hashSource: MultipartCdnHashSource)
func request(offset: Int32, limit: Int32, tag: MediaResourceFetchTag?, fileReference: Data?) -> Signal<Data, MultipartFetchDownloadError> {
func request(offset: Int32, limit: Int32, tag: MediaResourceFetchTag?, fileReference: Data?, continueInBackground: Bool) -> Signal<Data, MultipartFetchDownloadError> {
switch self {
case .none:
return .never()
@ -283,7 +285,7 @@ private enum MultipartFetchSource {
switch location {
case let .generic(_, location):
if let parsedLocation = location(fileReference) {
return download.request(Api.functions.upload.getFile(location: parsedLocation, offset: offset, limit: Int32(updatedLength)), tag: tag)
return download.request(Api.functions.upload.getFile(location: parsedLocation, offset: offset, limit: Int32(updatedLength)), tag: tag, continueInBackground: continueInBackground)
|> mapError { error -> MultipartFetchDownloadError in
if error.errorDescription.hasPrefix("FILEREF_INVALID") || error.errorDescription.hasPrefix("FILE_REFERENCE_") {
return .revalidateMediaReference
@ -314,7 +316,7 @@ private enum MultipartFetchSource {
return .fail(.revalidateMediaReference)
}
case let .web(_, location):
return download.request(Api.functions.upload.getWebFile(location: location, offset: offset, limit: Int32(updatedLength)), tag: tag)
return download.request(Api.functions.upload.getWebFile(location: location, offset: offset, limit: Int32(updatedLength)), tag: tag, continueInBackground: continueInBackground)
|> mapError { error -> MultipartFetchDownloadError in
return .generic
}
@ -335,7 +337,7 @@ private enum MultipartFetchSource {
updatedLength += 1
}
let part = download.request(Api.functions.upload.getCdnFile(fileToken: Buffer(data: fileToken), offset: offset, limit: Int32(updatedLength)), tag: nil)
let part = download.request(Api.functions.upload.getCdnFile(fileToken: Buffer(data: fileToken), offset: offset, limit: Int32(updatedLength)), tag: nil, continueInBackground: continueInBackground)
|> mapError { _ -> MultipartFetchDownloadError in
return .generic
}
@ -402,6 +404,7 @@ private final class MultipartFetchManager {
let postbox: Postbox
let network: Network
let revalidationContext: MediaReferenceRevalidationContext
let continueInBackground: Bool
let partReady: (Int, Data) -> Void
let reportCompleteSize: (Int) -> Void
@ -437,6 +440,9 @@ private final class MultipartFetchManager {
if let info = parameters?.info as? TelegramCloudMediaResourceFetchInfo {
self.fileReference = info.reference.apiFileReference
self.continueInBackground = info.continueInBackground
} else {
self.continueInBackground = false
}
self.state = MultipartDownloadState(encryptionKey: encryptionKey, decryptedSize: decryptedSize)
@ -587,7 +593,7 @@ private final class MultipartFetchManager {
requestLimit = (requestLimit / self.partAlignment + 1) * self.partAlignment
}
let part = self.source.request(offset: Int32(downloadRange.lowerBound), limit: Int32(requestLimit), tag: self.parameters?.tag, fileReference: self.fileReference)
let part = self.source.request(offset: Int32(downloadRange.lowerBound), limit: Int32(requestLimit), tag: self.parameters?.tag, fileReference: self.fileReference, continueInBackground: self.continueInBackground)
|> deliverOn(self.queue)
let partDisposable = MetaDisposable()
self.fetchingParts[downloadRange.lowerBound] = (downloadRange.count, partDisposable)
@ -634,7 +640,7 @@ private final class MultipartFetchManager {
case let .switchToCdn(id, token, key, iv, partHashes):
switch strongSelf.source {
case let .master(location, download):
strongSelf.source = .cdn(masterDatacenterId: location.datacenterId, fileToken: token, key: key, iv: iv, download: DownloadWrapper(consumerId: strongSelf.consumerId, datacenterId: id, isCdn: true, network: strongSelf.network), masterDownload: download, hashSource: MultipartCdnHashSource(queue: strongSelf.queue, fileToken: token, hashes: partHashes, masterDownload: download))
strongSelf.source = .cdn(masterDatacenterId: location.datacenterId, fileToken: token, key: key, iv: iv, download: DownloadWrapper(consumerId: strongSelf.consumerId, datacenterId: id, isCdn: true, network: strongSelf.network), masterDownload: download, hashSource: MultipartCdnHashSource(queue: strongSelf.queue, fileToken: token, hashes: partHashes, masterDownload: download, continueInBackground: strongSelf.continueInBackground))
strongSelf.checkState()
case .cdn, .none:
break
@ -646,7 +652,7 @@ private final class MultipartFetchManager {
case let .cdn(_, fileToken, _, _, _, masterDownload, _):
if !strongSelf.reuploadingToCdn {
strongSelf.reuploadingToCdn = true
let reupload: Signal<[Api.FileHash], NoError> = masterDownload.request(Api.functions.upload.reuploadCdnFile(fileToken: Buffer(data: fileToken), requestToken: Buffer(data: token)), tag: nil)
let reupload: Signal<[Api.FileHash], NoError> = masterDownload.request(Api.functions.upload.reuploadCdnFile(fileToken: Buffer(data: fileToken), requestToken: Buffer(data: token)), tag: nil, continueInBackground: strongSelf.continueInBackground)
|> `catch` { _ -> Signal<[Api.FileHash], NoError> in
return .single([])
}
@ -666,7 +672,7 @@ private final class MultipartFetchManager {
}
}
func multipartFetch(account: Account, resource: TelegramMediaResource, datacenterId: Int, size: Int?, intervals: Signal<[(Range<Int>, MediaBoxFetchPriority)], NoError>, parameters: MediaResourceFetchParameters?, encryptionKey: SecretFileEncryptionKey? = nil, decryptedSize: Int32? = nil) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError> {
func multipartFetch(account: Account, resource: TelegramMediaResource, datacenterId: Int, size: Int?, intervals: Signal<[(Range<Int>, MediaBoxFetchPriority)], NoError>, parameters: MediaResourceFetchParameters?, encryptionKey: SecretFileEncryptionKey? = nil, decryptedSize: Int32? = nil, continueInBackground: Bool = false) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError> {
return Signal { subscriber in
let location: MultipartFetchMasterLocation
if let resource = resource as? TelegramCloudMediaResource {

View File

@ -14,6 +14,11 @@ enum MultiplexedRequestTarget: Equatable, Hashable {
case cdn(Int)
}
private struct MultiplexedRequestTargetKey: Equatable, Hashable {
let target: MultiplexedRequestTarget
let continueInBackground: Bool
}
private final class RequestData {
let id: Int32
let consumerId: Int64
@ -21,16 +26,18 @@ private final class RequestData {
let functionDescription: FunctionDescription
let payload: Buffer
let tag: MediaResourceFetchTag?
let continueInBackground: Bool
let deserializeResponse: (Buffer) -> Any?
let completed: (Any) -> Void
let error: (MTRpcError) -> Void
init(id: Int32, consumerId: Int64, target: MultiplexedRequestTarget, functionDescription: FunctionDescription, payload: Buffer, tag: MediaResourceFetchTag?, deserializeResponse: @escaping (Buffer) -> Any?, completed: @escaping (Any) -> Void, error: @escaping (MTRpcError) -> Void) {
init(id: Int32, consumerId: Int64, target: MultiplexedRequestTarget, functionDescription: FunctionDescription, payload: Buffer, tag: MediaResourceFetchTag?, continueInBackground: Bool, deserializeResponse: @escaping (Buffer) -> Any?, completed: @escaping (Any) -> Void, error: @escaping (MTRpcError) -> Void) {
self.id = id
self.consumerId = consumerId
self.target = target
self.functionDescription = functionDescription
self.tag = tag
self.continueInBackground = continueInBackground
self.payload = payload
self.deserializeResponse = deserializeResponse
self.completed = completed
@ -61,7 +68,7 @@ private final class RequestTargetContext {
}
private struct MultiplexedRequestTargetTimerKey: Equatable, Hashable {
let target: MultiplexedRequestTarget
let key: MultiplexedRequestTargetKey
let id: Int32
}
@ -73,21 +80,21 @@ private typealias SignalKitTimer = SwiftSignalKit.Timer
private final class MultiplexedRequestManagerContext {
private let queue: Queue
private let takeWorker: (MultiplexedRequestTarget, MediaResourceFetchTag?) -> Download?
private let takeWorker: (MultiplexedRequestTarget, MediaResourceFetchTag?, Bool) -> Download?
private var queuedRequests: [RequestData] = []
private var nextId: Int32 = 0
private var targetContexts: [MultiplexedRequestTarget: [RequestTargetContext]] = [:]
private var targetContexts: [MultiplexedRequestTargetKey: [RequestTargetContext]] = [:]
private var emptyTargetTimers: [MultiplexedRequestTargetTimerKey: SignalKitTimer] = [:]
init(queue: Queue, takeWorker: @escaping (MultiplexedRequestTarget, MediaResourceFetchTag?) -> Download?) {
init(queue: Queue, takeWorker: @escaping (MultiplexedRequestTarget, MediaResourceFetchTag?, Bool) -> Download?) {
self.queue = queue
self.takeWorker = takeWorker
}
deinit {
for targetContextList in targetContexts.values {
for targetContextList in self.targetContexts.values {
for targetContext in targetContextList {
for request in targetContext.requests {
request.disposable.dispose()
@ -99,10 +106,12 @@ private final class MultiplexedRequestManagerContext {
}
}
func request(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, (Buffer) -> Any?), tag: MediaResourceFetchTag?, completed: @escaping (Any) -> Void, error: @escaping (MTRpcError) -> Void) -> Disposable {
func request(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, (Buffer) -> Any?), tag: MediaResourceFetchTag?, continueInBackground: Bool, completed: @escaping (Any) -> Void, error: @escaping (MTRpcError) -> Void) -> Disposable {
let targetKey = MultiplexedRequestTargetKey(target: target, continueInBackground: continueInBackground)
let requestId = self.nextId
self.nextId += 1
self.queuedRequests.append(RequestData(id: requestId, consumerId: consumerId, target: target, functionDescription: data.0, payload: data.1, tag: tag, deserializeResponse: { buffer in
self.queuedRequests.append(RequestData(id: requestId, consumerId: consumerId, target: target, functionDescription: data.0, payload: data.1, tag: tag, continueInBackground: continueInBackground, deserializeResponse: { buffer in
return data.2(buffer)
}, completed: { result in
completed(result)
@ -125,8 +134,8 @@ private final class MultiplexedRequestManagerContext {
}
}
if strongSelf.targetContexts[target] != nil {
outer: for targetContext in strongSelf.targetContexts[target]! {
if strongSelf.targetContexts[targetKey] != nil {
outer: for targetContext in strongSelf.targetContexts[targetKey]! {
for i in 0 ..< targetContext.requests.count {
if targetContext.requests[i].requestId == requestId {
targetContext.requests[i].disposable.dispose()
@ -149,23 +158,24 @@ private final class MultiplexedRequestManagerContext {
var requestIndex = 0
while requestIndex < self.queuedRequests.count {
let request = self.queuedRequests[requestIndex]
let targetKey = MultiplexedRequestTargetKey(target: request.target, continueInBackground: request.continueInBackground)
if self.targetContexts[request.target] == nil {
self.targetContexts[request.target] = []
if self.targetContexts[targetKey] == nil {
self.targetContexts[targetKey] = []
}
var selectedContext: RequestTargetContext?
for targetContext in self.targetContexts[request.target]! {
for targetContext in self.targetContexts[targetKey]! {
if targetContext.requests.count < maxRequestsPerWorker {
selectedContext = targetContext
break
}
}
if selectedContext == nil && self.targetContexts[request.target]!.count < maxWorkersPerTarget {
if let worker = self.takeWorker(request.target, request.tag) {
if selectedContext == nil && self.targetContexts[targetKey]!.count < maxWorkersPerTarget {
if let worker = self.takeWorker(request.target, request.tag, request.continueInBackground) {
let contextId = self.nextId
self.nextId += 1
let targetContext = RequestTargetContext(id: contextId, worker: worker)
self.targetContexts[request.target]!.append(targetContext)
self.targetContexts[targetKey]!.append(targetContext)
selectedContext = targetContext
} else {
Logger.shared.log("MultiplexedRequestManager", "couldn't take worker")
@ -221,9 +231,9 @@ private final class MultiplexedRequestManagerContext {
}
private func checkEmptyContexts() {
for (target, contexts) in self.targetContexts {
for (targetKey, contexts) in self.targetContexts {
for context in contexts {
let key = MultiplexedRequestTargetTimerKey(target: target, id: context.id)
let key = MultiplexedRequestTargetTimerKey(key: targetKey, id: context.id)
if context.requests.isEmpty {
if self.emptyTargetTimers[key] == nil {
let timer = SignalKitTimer(timeout: 2.0, repeat: false, completion: { [weak self] in
@ -231,10 +241,10 @@ private final class MultiplexedRequestManagerContext {
return
}
strongSelf.emptyTargetTimers.removeValue(forKey: key)
if strongSelf.targetContexts[target] != nil {
for i in 0 ..< strongSelf.targetContexts[target]!.count {
if strongSelf.targetContexts[target]![i].id == key.id {
strongSelf.targetContexts[target]!.remove(at: i)
if strongSelf.targetContexts[targetKey] != nil {
for i in 0 ..< strongSelf.targetContexts[targetKey]!.count {
if strongSelf.targetContexts[targetKey]![i].id == key.id {
strongSelf.targetContexts[targetKey]!.remove(at: i)
break
}
}
@ -258,20 +268,20 @@ final class MultiplexedRequestManager {
private let queue = Queue()
private let context: QueueLocalObject<MultiplexedRequestManagerContext>
init(takeWorker: @escaping (MultiplexedRequestTarget, MediaResourceFetchTag?) -> Download?) {
init(takeWorker: @escaping (MultiplexedRequestTarget, MediaResourceFetchTag?, Bool) -> Download?) {
let queue = self.queue
self.context = QueueLocalObject(queue: self.queue, generate: {
return MultiplexedRequestManagerContext(queue: queue, takeWorker: takeWorker)
})
}
func request<T>(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: MediaResourceFetchTag?) -> Signal<T, MTRpcError> {
func request<T>(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: MediaResourceFetchTag?, continueInBackground: Bool) -> Signal<T, MTRpcError> {
return Signal { subscriber in
let disposable = MetaDisposable()
self.context.with { context in
disposable.set(context.request(to: target, consumerId: consumerId, data: (data.0, data.1, { buffer in
return data.2.parse(buffer)
}), tag: tag, completed: { result in
}), tag: tag, continueInBackground: continueInBackground, completed: { result in
if let result = result as? T {
subscriber.putNext(result)
subscriber.putCompletion()

View File

@ -583,6 +583,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate {
private let shouldKeepConnectionDisposable = MetaDisposable()
public let shouldExplicitelyKeepWorkerConnections = Promise<Bool>(false)
public let shouldKeepBackgroundDownloadConnections = Promise<Bool>(false)
public var mockConnectionStatus: ConnectionStatus? {
didSet {
@ -649,7 +650,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate {
}))
requestService.delegate = self
self._multiplexedRequestManager = MultiplexedRequestManager(takeWorker: { [weak self] target, tag in
self._multiplexedRequestManager = MultiplexedRequestManager(takeWorker: { [weak self] target, tag, continueInBackground in
if let strongSelf = self {
let datacenterId: Int
let isCdn: Bool
@ -662,7 +663,7 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate {
datacenterId = id
isCdn = true
}
return strongSelf.makeWorker(datacenterId: datacenterId, isCdn: isCdn, isMedia: isMedia, tag: tag)
return strongSelf.makeWorker(datacenterId: datacenterId, isCdn: isCdn, isMedia: isMedia, tag: tag, continueInBackground: continueInBackground)
}
return nil
})
@ -717,10 +718,11 @@ public final class Network: NSObject, MTRequestMessageServiceDelegate {
return self.worker(datacenterId: self.datacenterId, isCdn: false, isMedia: false, tag: nil)
}
private func makeWorker(datacenterId: Int, isCdn: Bool, isMedia: Bool, tag: MediaResourceFetchTag?) -> Download {
let shouldKeepWorkerConnection: Signal<Bool, NoError> = combineLatest(self.shouldKeepConnection.get(), self.shouldExplicitelyKeepWorkerConnections.get())
|> map { shouldKeepConnection, shouldExplicitelyKeepWorkerConnections -> Bool in
return shouldKeepConnection || shouldExplicitelyKeepWorkerConnections
private func makeWorker(datacenterId: Int, isCdn: Bool, isMedia: Bool, tag: MediaResourceFetchTag?, continueInBackground: Bool = false) -> Download {
let queue = Queue.mainQueue()
let shouldKeepWorkerConnection: Signal<Bool, NoError> = combineLatest(queue: queue, self.shouldKeepConnection.get(), self.shouldExplicitelyKeepWorkerConnections.get(), self.shouldKeepBackgroundDownloadConnections.get())
|> map { shouldKeepConnection, shouldExplicitelyKeepWorkerConnections, shouldKeepBackgroundDownloadConnections -> Bool in
return shouldKeepConnection || shouldExplicitelyKeepWorkerConnections || (continueInBackground && shouldKeepBackgroundDownloadConnections)
}
|> distinctUntilChanged
return Download(queue: self.queue, datacenterId: datacenterId, isMedia: isMedia, isCdn: isCdn, context: self.context, masterDatacenterId: self.datacenterId, usageInfo: usageCalculationInfo(basePath: self.basePath, category: (tag as? TelegramMediaResourceFetchTag)?.statsCategory), shouldKeepConnection: shouldKeepWorkerConnection)

View File

@ -103,7 +103,7 @@ func mediaContentToUpload(network: Network, postbox: Postbox, auxiliaryMethods:
} else {
mediaReference = .savedGif(media: file)
}
return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: mediaReference.resourceReference(file.resource), preferBackgroundReferenceRevalidation: false), resource: resource)
return revalidateMediaResourceReference(postbox: postbox, network: network, revalidationContext: revalidationContext, info: TelegramCloudMediaResourceFetchInfo(reference: mediaReference.resourceReference(file.resource), preferBackgroundReferenceRevalidation: false, continueInBackground: false), resource: resource)
|> mapError { _ -> PendingMessageUploadError in
return .generic
}

View File

@ -324,6 +324,10 @@ func fetchRemoteMessage(postbox: Postbox, source: FetchMessageHistoryHoleSource,
}
}
updatePeers(transaction: transaction, peers: Array(peers.values), update: { _, updated in
return updated
})
var renderedMessages: [Message] = []
for message in messages {
if let message = StoreMessage(apiMessage: message), case let .Id(updatedId) = message.id {