import Foundation import SwiftSignalKit import ManagedFile import RangeSet private final class ResourceStatusContext { var status: MediaResourceStatus? let subscribers = Bag<(MediaResourceStatus) -> Void>() let disposable: Disposable init(disposable: Disposable) { self.disposable = disposable } } private final class ResourceDataContext { var data: MediaResourceData var processedFetch: Bool = false let progresiveDataSubscribers = Bag<(waitUntilFetchStatus: Bool, sink: (MediaResourceData) -> Void)>() let completeDataSubscribers = Bag<(waitUntilFetchStatus: Bool, sink: (MediaResourceData) -> Void)>() var fetchDisposable: Disposable? let fetchSubscribers = Bag() init(data: MediaResourceData) { self.data = data } } public enum ResourceDataRangeMode { case complete case incremental case partial } public enum FetchResourceSourceType { case local case remote } public enum FetchResourceError { case generic } public struct ResourceStorePaths { public let partial: String public let complete: String } public struct MediaResourceData: Equatable { public let path: String public let offset: Int64 public let size: Int64 public let complete: Bool public init(path: String, offset: Int64, size: Int64, complete: Bool) { self.path = path self.offset = offset self.size = size self.complete = complete } } public protocol MediaResourceDataFetchCopyLocalItem { func copyTo(url: URL) -> Bool } public enum MediaBoxFetchPriority: Int32 { case `default` = 0 case elevated = 1 case maximum = 2 } public enum MediaResourceDataFetchResult { case dataPart(resourceOffset: Int64, data: Data, range: Range, complete: Bool) case resourceSizeUpdated(Int64) case progressUpdated(Float) case replaceHeader(data: Data, range: Range) case moveLocalFile(path: String) case moveTempFile(file: TempBoxFile) case copyLocalItem(MediaResourceDataFetchCopyLocalItem) case reset } public enum MediaResourceDataFetchError { case generic } public enum CachedMediaResourceRepresentationResult { case reset case data(Data) case done case temporaryPath(String) case tempFile(TempBoxFile) } public enum CachedMediaRepresentationKeepDuration { case general case shortLived } private struct CachedMediaResourceRepresentationKey: Hashable { let resourceId: String? let representation: String } private final class CachedMediaResourceRepresentationSubscriber { let update: (MediaResourceData) -> Void let onlyComplete: Bool init(update: @escaping (MediaResourceData) -> Void, onlyComplete: Bool) { self.update = update self.onlyComplete = onlyComplete } } private final class CachedMediaResourceRepresentationContext { var currentData: MediaResourceData? let dataSubscribers = Bag() let disposable = MetaDisposable() var initialized = false } public enum ResourceDataRequestOption { case complete(waitUntilFetchStatus: Bool) case incremental(waitUntilFetchStatus: Bool) } private final class MediaBoxKeepResourceContext { let subscribers = Bag() var isEmpty: Bool { return self.subscribers.isEmpty } } public final class MediaBox { public let basePath: String private let statusQueue = Queue() private let concurrentQueue = Queue.concurrentDefaultQueue() private let dataQueue = Queue(name: "MediaBox-Data") private let dataFileManager: MediaBoxFileManager private let cacheQueue = Queue() private let timeBasedCleanup: TimeBasedCleanup public let storageBox: StorageBox public let cacheStorageBox: StorageBox private let didRemoveResourcesPipe = ValuePipe() public var didRemoveResources: Signal { return .single(Void()) |> then(self.didRemoveResourcesPipe.signal()) } private var statusContexts: [MediaResourceId: ResourceStatusContext] = [:] private var cachedRepresentationContexts: [CachedMediaResourceRepresentationKey: CachedMediaResourceRepresentationContext] = [:] private var fileContexts: [MediaResourceId: MediaBoxFileContext] = [:] private var keepResourceContexts: [MediaResourceId: MediaBoxKeepResourceContext] = [:] private var wrappedFetchResource = Promise<(MediaResource, Signal<[(Range, MediaBoxFetchPriority)], NoError>, MediaResourceFetchParameters?) -> Signal>() public var fetchResource: ((MediaResource, Signal<[(Range, MediaBoxFetchPriority)], NoError>, MediaResourceFetchParameters?) -> Signal)? { didSet { if let fetchResource = self.fetchResource { wrappedFetchResource.set(.single(fetchResource)) } else { wrappedFetchResource.set(.never()) } } } public var wrappedFetchCachedResourceRepresentation = Promise<(MediaResource, CachedMediaResourceRepresentation) -> Signal>() public var fetchCachedResourceRepresentation: ((MediaResource, CachedMediaResourceRepresentation) -> Signal)? { didSet { if let fetchCachedResourceRepresentation = self.fetchCachedResourceRepresentation { wrappedFetchCachedResourceRepresentation.set(.single(fetchCachedResourceRepresentation)) } else { wrappedFetchCachedResourceRepresentation.set(.never()) } } } lazy var ensureDirectoryCreated: Void = { let _ = try? FileManager.default.createDirectory(atPath: self.basePath, withIntermediateDirectories: true, attributes: nil) let _ = try? FileManager.default.createDirectory(atPath: self.basePath + "/cache", withIntermediateDirectories: true, attributes: nil) let _ = try? FileManager.default.createDirectory(atPath: self.basePath + "/short-cache", withIntermediateDirectories: true, attributes: nil) }() public init(basePath: String) { self.basePath = basePath self.storageBox = StorageBox(logger: StorageBox.Logger(impl: { string in postboxLog(string) }), basePath: basePath + "/storage") self.cacheStorageBox = StorageBox(logger: StorageBox.Logger(impl: { string in postboxLog(string) }), basePath: basePath + "/cache-storage") self.timeBasedCleanup = TimeBasedCleanup(storageBox: self.storageBox, generalPaths: [ self.basePath + "/cache", self.basePath + "/animation-cache" ], totalSizeBasedPath: self.basePath, shortLivedPaths: [ self.basePath + "/short-cache" ]) self.dataFileManager = MediaBoxFileManager(queue: self.dataQueue) let _ = self.ensureDirectoryCreated //self.updateResourceIndex() /*#if DEBUG self.dataQueue.async { for _ in 0 ..< 5 { let tempFile = TempBox.shared.tempFile(fileName: "file") print("MediaBox test: file \(tempFile.path)") let queue2 = Queue.concurrentDefaultQueue() if let fileContext = MediaBoxFileContextV2Impl(queue: self.dataQueue, manager: self.dataFileManager, storageBox: self.storageBox, resourceId: tempFile.path.data(using: .utf8)!, path: tempFile.path + "_complete", partialPath: tempFile.path + "_partial", metaPath: tempFile.path + "_partial" + ".meta") { let _ = fileContext.fetched( range: 0 ..< Int64.max, priority: .default, fetch: { ranges in return ranges |> filter { !$0.isEmpty } |> take(1) |> castError(MediaResourceDataFetchError.self) |> mapToSignal { _ in return Signal { subscriber in queue2.async { subscriber.putNext(.resourceSizeUpdated(524288)) } queue2.async { subscriber.putNext(.resourceSizeUpdated(393216)) } queue2.async { subscriber.putNext(.resourceSizeUpdated(655360)) } queue2.async { subscriber.putNext(.resourceSizeUpdated(169608)) } queue2.async { subscriber.putNext(.dataPart(resourceOffset: 131072, data: Data(repeating: 0xbb, count: 38536), range: 0 ..< 38536, complete: true)) } queue2.async { subscriber.putNext(.dataPart(resourceOffset: 0, data: Data(repeating: 0xaa, count: 131072), range: 0 ..< 131072, complete: false)) } return EmptyDisposable } } }, error: { _ in }, completed: { assert(try! Data(contentsOf: URL(fileURLWithPath: tempFile.path + "_complete")) == Data(repeating: 0xaa, count: 131072) + Data(repeating: 0xbb, count: 38536)) let _ = fileContext.addReference() } ) } } } #endif*/ } public func setMaxStoreTimes(general: Int32, shortLived: Int32, gigabytesLimit: Int32) { self.timeBasedCleanup.setMaxStoreTimes(general: general, shortLived: shortLived, gigabytesLimit: gigabytesLimit) } public static func idForFileName(name: String) -> String { if name.hasSuffix("_partial.meta") { return String(name[name.startIndex ..< name.index(name.endIndex, offsetBy: -13)]) } else if name.hasSuffix("_partial") { return String(name[name.startIndex ..< name.index(name.endIndex, offsetBy: -8)]) } else { return name } } private func fileNameForId(_ id: MediaResourceId) -> String { return "\(id.stringRepresentation)" } private func fileNameForId(_ id: String) -> String { return "\(id)" } private func pathForId(_ id: MediaResourceId) -> String { return "\(self.basePath)/\(fileNameForId(id))" } public func storePathsForId(_ id: MediaResourceId) -> ResourceStorePaths { return ResourceStorePaths(partial: "\(self.basePath)/\(fileNameForId(id))_partial", complete: "\(self.basePath)/\(fileNameForId(id))") } public func fileSizeForId(_ id: MediaResourceId) -> Int64 { let paths = self.storePathsForId(id) if let size = fileSize(paths.complete, useTotalFileAllocatedSize: false) { return size } else if let size = fileSize(paths.partial, useTotalFileAllocatedSize: true) { return size } else { return 0 } } private func fileNamesForId(_ id: MediaResourceId) -> ResourceStorePaths { return ResourceStorePaths(partial: "\(fileNameForId(id))_partial", complete: "\(fileNameForId(id))") } private func cachedRepresentationPathsForId(_ id: String, representationId: String, keepDuration: CachedMediaRepresentationKeepDuration) -> ResourceStorePaths { let cacheString: String switch keepDuration { case .general: cacheString = "cache" case .shortLived: cacheString = "short-cache" } return ResourceStorePaths(partial: "\(self.basePath)/\(cacheString)/\(fileNameForId(id))_partial:\(representationId)", complete: "\(self.basePath)/\(cacheString)/\(fileNameForId(id)):\(representationId)") } public func cachedRepresentationPathForId(_ id: String, representationId: String, keepDuration: CachedMediaRepresentationKeepDuration) -> String { let cacheString: String switch keepDuration { case .general: cacheString = "cache" case .shortLived: cacheString = "short-cache" } return "\(self.basePath)/\(cacheString)/\(fileNameForId(id)):\(representationId)" } public func cachedRepresentationCompletePath(_ id: MediaResourceId, representation: CachedMediaResourceRepresentation) -> String { let cacheString: String switch representation.keepDuration { case .general: cacheString = "cache" case .shortLived: cacheString = "short-cache" } return "\(self.basePath)/\(cacheString)/\(fileNameForId(id)):\(representation.uniqueId)" } public func shortLivedResourceCachePathPrefix(_ id: MediaResourceId) -> String { let cacheString = "short-cache" return "\(self.basePath)/\(cacheString)/\(fileNameForId(id))" } public func storeResourceData(_ id: MediaResourceId, data: Data, synchronous: Bool = false) { let begin = { let paths = self.storePathsForId(id) let _ = try? data.write(to: URL(fileURLWithPath: paths.complete), options: [.atomic]) } if synchronous { begin() } else { self.dataQueue.async(begin) } } public func moveResourceData(_ id: MediaResourceId, fromTempPath: String) { self.dataQueue.async { let paths = self.storePathsForId(id) let _ = try? FileManager.default.moveItem(at: URL(fileURLWithPath: fromTempPath), to: URL(fileURLWithPath: paths.complete)) } } public func copyResourceData(_ id: MediaResourceId, fromTempPath: String) { self.dataQueue.async { let paths = self.storePathsForId(id) let _ = try? FileManager.default.copyItem(at: URL(fileURLWithPath: fromTempPath), to: URL(fileURLWithPath: paths.complete)) } } public func moveResourceData(from: MediaResourceId, to: MediaResourceId) { if from == to { return } self.dataQueue.async { let pathsFrom = self.storePathsForId(from) let pathsTo = self.storePathsForId(to) link(pathsFrom.partial, pathsTo.partial) link(pathsFrom.complete, pathsTo.complete) unlink(pathsFrom.partial) unlink(pathsFrom.complete) } } public func copyResourceData(from: MediaResourceId, to: MediaResourceId, synchronous: Bool = false) { if from == to { return } let begin = { let pathsFrom = self.storePathsForId(from) let pathsTo = self.storePathsForId(to) let _ = try? FileManager.default.copyItem(atPath: pathsFrom.partial, toPath: pathsTo.partial) let _ = try? FileManager.default.copyItem(atPath: pathsFrom.complete, toPath: pathsTo.complete) } if synchronous { begin() } else { self.dataQueue.async(begin) } } public func resourceStatus(_ resource: MediaResource, approximateSynchronousValue: Bool = false) -> Signal { return self.resourceStatus(resource.id, resourceSize: resource.size, approximateSynchronousValue: approximateSynchronousValue) } public func resourceStatus(_ resourceId: MediaResourceId, resourceSize: Int64?, approximateSynchronousValue: Bool = false) -> Signal { let signal = Signal { subscriber in let disposable = MetaDisposable() self.concurrentQueue.async { let paths = self.storePathsForId(resourceId) if let _ = fileSize(paths.complete) { self.timeBasedCleanup.touch(paths: [ paths.complete ]) subscriber.putNext(.Local) subscriber.putCompletion() } else { self.statusQueue.async { let statusContext: ResourceStatusContext var statusUpdateDisposable: MetaDisposable? if let current = self.statusContexts[resourceId] { statusContext = current } else { let statusUpdateDisposableValue = MetaDisposable() statusContext = ResourceStatusContext(disposable: statusUpdateDisposableValue) self.statusContexts[resourceId] = statusContext statusUpdateDisposable = statusUpdateDisposableValue } let index = statusContext.subscribers.add({ status in subscriber.putNext(status) }) if let status = statusContext.status { subscriber.putNext(status) } if let statusUpdateDisposable = statusUpdateDisposable { let statusQueue = self.statusQueue self.dataQueue.async { if let (fileContext, releaseContext) = self.fileContext(for: resourceId) { let statusDisposable = fileContext.status(next: { [weak statusContext] value in statusQueue.async { if let current = self.statusContexts[resourceId], current === statusContext, current.status != value { current.status = value for subscriber in current.subscribers.copyItems() { subscriber(value) } } } }, completed: { [weak statusContext] in statusQueue.async { if let current = self.statusContexts[resourceId], current === statusContext { current.subscribers.remove(index) if current.subscribers.isEmpty { self.statusContexts.removeValue(forKey: resourceId) current.disposable.dispose() } } } }, size: resourceSize) statusUpdateDisposable.set(ActionDisposable { statusDisposable.dispose() releaseContext() }) } } } disposable.set(ActionDisposable { [weak statusContext] in self.statusQueue.async { if let current = self.statusContexts[resourceId], current === statusContext { current.subscribers.remove(index) if current.subscribers.isEmpty { self.statusContexts.removeValue(forKey: resourceId) current.disposable.dispose() } } } }) } } } return disposable } if approximateSynchronousValue { return Signal, NoError> { subscriber in let paths = self.storePathsForId(resourceId) if let _ = fileSize(paths.complete) { subscriber.putNext(.single(.Local)) } else if let size = fileSize(paths.partial), size == resourceSize { subscriber.putNext(.single(.Local)) } else { subscriber.putNext(.single(.Remote(progress: 0.0)) |> then(signal)) } subscriber.putCompletion() return EmptyDisposable } |> switchToLatest } else { return signal } } public func resourcePath(_ resource: MediaResource) -> String { let paths = self.storePathsForId(resource.id) return paths.complete } public func completedResourcePath(_ resource: MediaResource, pathExtension: String? = nil) -> String? { return self.completedResourcePath(id: resource.id, pathExtension: pathExtension) } public func completedResourcePath(id: MediaResourceId, pathExtension: String? = nil) -> String? { let paths = self.storePathsForId(id) if let _ = fileSize(paths.complete) { self.timeBasedCleanup.touch(paths: [ paths.complete ]) if let pathExtension = pathExtension { let symlinkPath = paths.complete + ".\(pathExtension)" if fileSize(symlinkPath) == nil { let _ = try? FileManager.default.linkItem(atPath: paths.complete, toPath: symlinkPath) } return symlinkPath } else { return paths.complete } } else { return nil } } public func resourceData(_ resource: MediaResource, pathExtension: String? = nil, option: ResourceDataRequestOption = .complete(waitUntilFetchStatus: false), attemptSynchronously: Bool = false) -> Signal { return self.resourceData(id: resource.id, pathExtension: pathExtension, option: option, attemptSynchronously: attemptSynchronously) } public func resourceData(id: MediaResourceId, pathExtension: String? = nil, option: ResourceDataRequestOption = .complete(waitUntilFetchStatus: false), attemptSynchronously: Bool = false) -> Signal { return Signal { subscriber in let disposable = MetaDisposable() let begin: () -> Void = { let paths = self.storePathsForId(id) if let completeSize = fileSize(paths.complete) { self.timeBasedCleanup.touch(paths: [ paths.complete ]) if let pathExtension = pathExtension { let symlinkPath = paths.complete + ".\(pathExtension)" if fileSize(symlinkPath) == nil { let _ = try? FileManager.default.linkItem(atPath: paths.complete, toPath: symlinkPath) } subscriber.putNext(MediaResourceData(path: symlinkPath, offset: 0, size: completeSize, complete: true)) subscriber.putCompletion() } else { subscriber.putNext(MediaResourceData(path: paths.complete, offset: 0, size: completeSize, complete: true)) subscriber.putCompletion() } } else { if attemptSynchronously, case .complete(false) = option { subscriber.putNext(MediaResourceData(path: paths.partial, offset: 0, size: fileSize(paths.partial) ?? 0, complete: false)) } self.dataQueue.async { if let (fileContext, releaseContext) = self.fileContext(for: id) { let waitUntilAfterInitialFetch: Bool switch option { case let .complete(waitUntilFetchStatus): waitUntilAfterInitialFetch = waitUntilFetchStatus case let .incremental(waitUntilFetchStatus): waitUntilAfterInitialFetch = waitUntilFetchStatus } let dataDisposable = fileContext.data(range: 0 ..< Int64.max, waitUntilAfterInitialFetch: waitUntilAfterInitialFetch, next: { value in self.dataQueue.async { if value.complete { if let pathExtension = pathExtension { let symlinkPath = paths.complete + ".\(pathExtension)" if fileSize(symlinkPath) == nil { let _ = try? FileManager.default.linkItem(atPath: paths.complete, toPath: symlinkPath) } subscriber.putNext(MediaResourceData(path: symlinkPath, offset: 0, size: value.size, complete: true)) } else { subscriber.putNext(value) } subscriber.putCompletion() } else { subscriber.putNext(value) } } }) disposable.set(ActionDisposable { dataDisposable.dispose() releaseContext() }) } } } } if attemptSynchronously { begin() } else { self.concurrentQueue.async(begin) } return disposable } } private func fileContext(for id: MediaResourceId) -> (MediaBoxFileContext, () -> Void)? { assert(self.dataQueue.isCurrent()) let resourceId = id var context: MediaBoxFileContext? if let current = self.fileContexts[resourceId] { context = current } else { let paths = self.storePathsForId(id) self.timeBasedCleanup.touch(paths: [ paths.complete, paths.partial, paths.partial + ".meta" ]) #if true if let fileContext = MediaBoxFileContextV2Impl(queue: self.dataQueue, manager: self.dataFileManager, storageBox: self.storageBox, resourceId: id.stringRepresentation.data(using: .utf8)!, path: paths.complete, partialPath: paths.partial, metaPath: paths.partial + ".meta") { context = fileContext self.fileContexts[resourceId] = fileContext } else { return nil } #else if let fileContext = MediaBoxFileContextImpl(queue: self.dataQueue, manager: self.dataFileManager, storageBox: self.storageBox, resourceId: id.stringRepresentation.data(using: .utf8)!, path: paths.complete, partialPath: paths.partial, metaPath: paths.partial + ".meta") { context = fileContext self.fileContexts[resourceId] = fileContext } else { return nil } #endif } if let context = context { let index = context.addReference() let queue = self.dataQueue return (context, { [weak self, weak context] in queue.async { guard let strongSelf = self, let previousContext = context, let context = strongSelf.fileContexts[resourceId], context === previousContext else { return } context.removeReference(index) if context.isEmpty { strongSelf.fileContexts.removeValue(forKey: resourceId) } } }) } else { return nil } } public func fetchedResourceData(_ resource: MediaResource, in range: Range, priority: MediaBoxFetchPriority = .default, parameters: MediaResourceFetchParameters?) -> Signal { return Signal { subscriber in let disposable = MetaDisposable() self.dataQueue.async { let paths = self.storePathsForId(resource.id) if let _ = fileSize(paths.complete) { subscriber.putCompletion() return } if let parameters = parameters, let location = parameters.location { var messageNamespace: Int32 = 0 var messageIdValue: Int32 = 0 if let messageId = location.messageId { messageNamespace = messageId.namespace messageIdValue = messageId.id } self.storageBox.add(reference: StorageBox.Reference(peerId: location.peerId.toInt64(), messageNamespace: UInt8(clamping: messageNamespace), messageId: messageIdValue), to: resource.id.stringRepresentation.data(using: .utf8)!, contentType: parameters.contentType.rawValue) } else { self.storageBox.add(reference: StorageBox.Reference(peerId: 0, messageNamespace: 0, messageId: 0), to: resource.id.stringRepresentation.data(using: .utf8)!, contentType: parameters?.contentType.rawValue ?? 0) } guard let (fileContext, releaseContext) = self.fileContext(for: resource.id) else { subscriber.putCompletion() return } var range = range if let parameters = parameters, !parameters.isRandomAccessAllowed { range = 0 ..< range.upperBound } let fetchResource = self.wrappedFetchResource.get() let fetchedDisposable = fileContext.fetched(range: range.lowerBound ..< range.upperBound, priority: priority, fetch: { intervals in return fetchResource |> castError(MediaResourceDataFetchError.self) |> mapToSignal { fetch in return fetch(resource, intervals, parameters) } }, error: { _ in subscriber.putCompletion() }, completed: { subscriber.putCompletion() }) disposable.set(ActionDisposable { fetchedDisposable.dispose() releaseContext() }) } return disposable } } public func resourceData(_ resource: MediaResource, size: Int64, in range: Range, mode: ResourceDataRangeMode = .complete, notifyAboutIncomplete: Bool = false, attemptSynchronously: Bool = false) -> Signal<(Data, Bool), NoError> { return self.resourceData(id: resource.id, size: size, in: range, mode: mode, notifyAboutIncomplete: notifyAboutIncomplete, attemptSynchronously: attemptSynchronously) } public func resourceData(id: MediaResourceId, size: Int64, in range: Range, mode: ResourceDataRangeMode = .complete, notifyAboutIncomplete: Bool = false, attemptSynchronously: Bool = false) -> Signal<(Data, Bool), NoError> { return Signal { subscriber in let disposable = MetaDisposable() if attemptSynchronously { let paths = self.storePathsForId(id) if let completeSize = fileSize(paths.complete) { self.timeBasedCleanup.touch(paths: [ paths.complete ]) if let file = ManagedFile(queue: nil, path: paths.complete, mode: .read) { let clippedLowerBound = min(completeSize, max(0, range.lowerBound)) let clippedUpperBound = min(completeSize, max(0, range.upperBound)) if clippedLowerBound < clippedUpperBound && (clippedUpperBound - clippedLowerBound) <= 64 * 1024 * 1024 { let _ = file.seek(position: clippedLowerBound) let data = file.readData(count: Int(clippedUpperBound - clippedLowerBound)) subscriber.putNext((data, true)) } else { subscriber.putNext((Data(), isComplete: true)) } subscriber.putCompletion() return EmptyDisposable } else { let tempManager = MediaBoxFileManager(queue: nil) let data = withExtendedLifetime(tempManager, { return MediaBoxPartialFile.extractPartialData(manager: tempManager, path: paths.partial, metaPath: paths.partial + ".meta", range: range) }) if let data = data { subscriber.putNext((data, true)) subscriber.putCompletion() return EmptyDisposable } } } } self.dataQueue.async { guard let (fileContext, releaseContext) = self.fileContext(for: id) else { subscriber.putCompletion() return } let dataDisposable = fileContext.data(range: range, waitUntilAfterInitialFetch: false, next: { result in if let file = ManagedFile(queue: self.dataQueue, path: result.path, mode: .read), let fileSize = file.getSize() { if result.complete { let clippedLowerBound = min(result.offset, fileSize) let clippedUpperBound = min(result.offset + result.size, fileSize) if clippedUpperBound == clippedLowerBound { subscriber.putNext((Data(), true)) subscriber.putCompletion() } else if clippedUpperBound <= fileSize && (clippedUpperBound - clippedLowerBound) <= 64 * 1024 * 1024 { let _ = file.seek(position: Int64(clippedLowerBound)) let resultData = file.readData(count: Int(clippedUpperBound - clippedLowerBound)) subscriber.putNext((resultData, true)) subscriber.putCompletion() } else { assertionFailure() } } else { switch mode { case .complete, .incremental: if notifyAboutIncomplete { subscriber.putNext((Data(), false)) } case .partial: subscriber.putNext((Data(), false)) } } } else { switch mode { case .complete, .incremental: if notifyAboutIncomplete { subscriber.putNext((Data(), false)) } case .partial: subscriber.putNext((Data(), false)) } } }) disposable.set(ActionDisposable { dataDisposable.dispose() releaseContext() }) } return disposable } } public func resourceRangesStatus(_ resource: MediaResource) -> Signal, NoError> { return Signal { subscriber in let disposable = MetaDisposable() self.dataQueue.async { guard let (fileContext, releaseContext) = self.fileContext(for: resource.id) else { subscriber.putCompletion() return } let statusDisposable = fileContext.rangeStatus(next: { result in subscriber.putNext(result) }, completed: { subscriber.putCompletion() }) disposable.set(ActionDisposable { statusDisposable.dispose() releaseContext() }) } return disposable } } public func fetchedResource(_ resource: MediaResource, parameters: MediaResourceFetchParameters?, implNext: Bool = false) -> Signal { return Signal { subscriber in let disposable = MetaDisposable() self.dataQueue.async { let paths = self.storePathsForId(resource.id) if let parameters = parameters, let location = parameters.location { var messageNamespace: Int32 = 0 var messageIdValue: Int32 = 0 if let messageId = location.messageId { messageNamespace = messageId.namespace messageIdValue = messageId.id } self.storageBox.add(reference: StorageBox.Reference(peerId: location.peerId.toInt64(), messageNamespace: UInt8(clamping: messageNamespace), messageId: messageIdValue), to: resource.id.stringRepresentation.data(using: .utf8)!, contentType: parameters.contentType.rawValue) } else { self.storageBox.add(reference: StorageBox.Reference(peerId: 0, messageNamespace: 0, messageId: 0), to: resource.id.stringRepresentation.data(using: .utf8)!, contentType: parameters?.contentType.rawValue ?? 0) } if let _ = fileSize(paths.complete) { if implNext { subscriber.putNext(.local) } subscriber.putCompletion() } else { if let (fileContext, releaseContext) = self.fileContext(for: resource.id) { let fetchResource = self.wrappedFetchResource.get() let fetchedDisposable = fileContext.fetchedFullRange(fetch: { ranges in return fetchResource |> castError(MediaResourceDataFetchError.self) |> mapToSignal { fetch in return fetch(resource, ranges, parameters) } }, error: { _ in subscriber.putError(.generic) }, completed: { if implNext { subscriber.putNext(.remote) } subscriber.putCompletion() }) disposable.set(ActionDisposable { fetchedDisposable.dispose() releaseContext() }) } } } return disposable } } public func keepResource(id: MediaResourceId) -> Signal { return Signal { subscriber in let disposable = MetaDisposable() let dataQueue = self.dataQueue self.dataQueue.async { let context: MediaBoxKeepResourceContext if let current = self.keepResourceContexts[id] { context = current } else { context = MediaBoxKeepResourceContext() self.keepResourceContexts[id] = context } let index = context.subscribers.add(Void()) disposable.set(ActionDisposable { [weak self, weak context] in dataQueue.async { guard let strongSelf = self, let context = context, let currentContext = strongSelf.keepResourceContexts[id], currentContext === context else { return } currentContext.subscribers.remove(index) if currentContext.isEmpty { strongSelf.keepResourceContexts.removeValue(forKey: id) } } }) } return disposable } } public func cancelInteractiveResourceFetch(_ resource: MediaResource) { self.cancelInteractiveResourceFetch(resourceId: resource.id) } public func cancelInteractiveResourceFetch(resourceId: MediaResourceId) { self.dataQueue.async { if let (fileContext, releaseContext) = self.fileContext(for: resourceId) { fileContext.cancelFullRangeFetches() releaseContext() } } } public func storeCachedResourceRepresentation(_ resource: MediaResource, representation: CachedMediaResourceRepresentation, data: Data) { self.dataQueue.async { let path = self.cachedRepresentationPathsForId(resource.id.stringRepresentation, representationId: representation.uniqueId, keepDuration: representation.keepDuration).complete if let pathData = path.data(using: .utf8) { self.cacheStorageBox.update(id: pathData, size: Int64(data.count)) } let _ = try? data.write(to: URL(fileURLWithPath: path)) } } public func storeCachedResourceRepresentation(_ resource: MediaResource, representationId: String, keepDuration: CachedMediaRepresentationKeepDuration, data: Data, completion: @escaping (String) -> Void = { _ in }) { self.dataQueue.async { let path = self.cachedRepresentationPathsForId(resource.id.stringRepresentation, representationId: representationId, keepDuration: keepDuration).complete if let pathData = path.data(using: .utf8) { self.cacheStorageBox.update(id: pathData, size: Int64(data.count)) } let _ = try? data.write(to: URL(fileURLWithPath: path)) completion(path) } } public func storeCachedResourceRepresentation(_ resourceId: String, representationId: String, keepDuration: CachedMediaRepresentationKeepDuration, data: Data, completion: @escaping (String) -> Void = { _ in }) { self.dataQueue.async { let path = self.cachedRepresentationPathsForId(resourceId, representationId: representationId, keepDuration: keepDuration).complete let _ = try? data.write(to: URL(fileURLWithPath: path)) if let pathData = path.data(using: .utf8) { self.cacheStorageBox.update(id: pathData, size: Int64(data.count)) } completion(path) } } public func storeCachedResourceRepresentation(_ resourceId: String, representationId: String, keepDuration: CachedMediaRepresentationKeepDuration, tempFile: TempBoxFile, completion: @escaping (String) -> Void = { _ in }) { self.dataQueue.async { let path = self.cachedRepresentationPathsForId(resourceId, representationId: representationId, keepDuration: keepDuration).complete let _ = try? FileManager.default.moveItem(atPath: tempFile.path, toPath: path) if let fileSize = fileSize(path), fileSize != 0, let pathData = path.data(using: .utf8) { self.cacheStorageBox.update(id: pathData, size: fileSize) } completion(path) } } public func cachedResourceRepresentation(_ resource: MediaResource, representation: CachedMediaResourceRepresentation, pathExtension: String? = nil, complete: Bool, fetch: Bool = true, attemptSynchronously: Bool = false) -> Signal { return Signal { subscriber in let disposable = MetaDisposable() let begin: () -> Void = { let paths = self.cachedRepresentationPathsForId(resource.id.stringRepresentation, representationId: representation.uniqueId, keepDuration: representation.keepDuration) if let size = fileSize(paths.complete) { self.timeBasedCleanup.touch(paths: [ paths.complete ]) if let pathExtension = pathExtension { let symlinkPath = paths.complete + ".\(pathExtension)" if fileSize(symlinkPath) == nil { let _ = try? FileManager.default.linkItem(atPath: paths.complete, toPath: symlinkPath) } subscriber.putNext(MediaResourceData(path: symlinkPath, offset: 0, size: size, complete: true)) subscriber.putCompletion() } else { subscriber.putNext(MediaResourceData(path: paths.complete, offset: 0, size: size, complete: true)) subscriber.putCompletion() } } else if fetch { if attemptSynchronously && complete { subscriber.putNext(MediaResourceData(path: paths.partial, offset: 0, size: 0, complete: false)) } self.dataQueue.async { let key = CachedMediaResourceRepresentationKey(resourceId: resource.id.stringRepresentation, representation: representation.uniqueId) let context: CachedMediaResourceRepresentationContext if let currentContext = self.cachedRepresentationContexts[key] { context = currentContext } else { context = CachedMediaResourceRepresentationContext() self.cachedRepresentationContexts[key] = context } let index = context.dataSubscribers.add(CachedMediaResourceRepresentationSubscriber(update: { data in if !complete || data.complete { if let pathExtension = pathExtension, data.complete { let symlinkPath = data.path + ".\(pathExtension)" if fileSize(symlinkPath) == nil { let _ = try? FileManager.default.linkItem(atPath: data.path, toPath: symlinkPath) } subscriber.putNext(MediaResourceData(path: symlinkPath, offset: data.offset, size: data.size, complete: data.complete)) } else { subscriber.putNext(data) } } if data.complete { subscriber.putCompletion() } }, onlyComplete: complete)) if let currentData = context.currentData { if !complete || currentData.complete { subscriber.putNext(currentData) } if currentData.complete { subscriber.putCompletion() } } else if !complete { subscriber.putNext(MediaResourceData(path: paths.partial, offset: 0, size: 0, complete: false)) } disposable.set(ActionDisposable { [weak context] in self.dataQueue.async { if let currentContext = self.cachedRepresentationContexts[key], currentContext === context { currentContext.dataSubscribers.remove(index) if currentContext.dataSubscribers.isEmpty { currentContext.disposable.dispose() self.cachedRepresentationContexts.removeValue(forKey: key) } } } }) if !context.initialized { context.initialized = true let cacheStorageBox = self.cacheStorageBox let signal = self.wrappedFetchCachedResourceRepresentation.get() |> take(1) |> mapToSignal { fetch in return fetch(resource, representation) |> map(Optional.init) } |> deliverOn(self.dataQueue) context.disposable.set(signal.start(next: { [weak self, weak context] next in guard let strongSelf = self else { return } if let next = next { var isDone = false switch next { case let .temporaryPath(temporaryPath): rename(temporaryPath, paths.complete) if let size = fileSize(paths.complete), let pathData = paths.complete.data(using: .utf8) { cacheStorageBox.update(id: pathData, size: size) } isDone = true case let .tempFile(tempFile): rename(tempFile.path, paths.complete) TempBox.shared.dispose(tempFile) if let size = fileSize(paths.complete), let pathData = paths.complete.data(using: .utf8) { cacheStorageBox.update(id: pathData, size: size) } isDone = true case .reset: let file = ManagedFile(queue: strongSelf.dataQueue, path: paths.partial, mode: .readwrite) file?.truncate(count: 0) unlink(paths.complete) if let pathData = paths.complete.data(using: .utf8) { cacheStorageBox.update(id: pathData, size: 0) } case let .data(dataPart): let file = ManagedFile(queue: strongSelf.dataQueue, path: paths.partial, mode: .append) let dataCount = dataPart.count dataPart.withUnsafeBytes { rawBytes -> Void in let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self) let _ = file?.write(bytes, count: dataCount) } if let file = file, let size = file.getSize(), let pathData = paths.complete.data(using: .utf8) { cacheStorageBox.update(id: pathData, size: size) } case .done: link(paths.partial, paths.complete) if let size = fileSize(paths.complete), let pathData = paths.complete.data(using: .utf8) { cacheStorageBox.update(id: pathData, size: size) } isDone = true } if let strongSelf = self, let currentContext = strongSelf.cachedRepresentationContexts[key], currentContext === context { if isDone { currentContext.disposable.dispose() strongSelf.cachedRepresentationContexts.removeValue(forKey: key) } if let size = fileSize(paths.complete) { let data = MediaResourceData(path: paths.complete, offset: 0, size: size, complete: isDone) currentContext.currentData = data for subscriber in currentContext.dataSubscribers.copyItems() { if !subscriber.onlyComplete || isDone { subscriber.update(data) } } } else if let size = fileSize(paths.partial) { let data = MediaResourceData(path: paths.partial, offset: 0, size: size, complete: isDone) currentContext.currentData = data for subscriber in currentContext.dataSubscribers.copyItems() { if !subscriber.onlyComplete || isDone { subscriber.update(data) } } } } } else { if let strongSelf = self, let context = strongSelf.cachedRepresentationContexts[key] { let data = MediaResourceData(path: paths.partial, offset: 0, size: 0, complete: false) context.currentData = data for subscriber in context.dataSubscribers.copyItems() { if !subscriber.onlyComplete { subscriber.update(data) } } } } })) } } } else { subscriber.putNext(MediaResourceData(path: paths.partial, offset: 0, size: 0, complete: false)) subscriber.putCompletion() } } if attemptSynchronously { begin() } else { self.concurrentQueue.async(begin) } return ActionDisposable { disposable.dispose() } } } public func customResourceData(id: String, baseResourceId: String?, pathExtension: String?, complete: Bool, fetch: (() -> Signal)?, keepDuration: CachedMediaRepresentationKeepDuration, attemptSynchronously: Bool) -> Signal { return Signal { subscriber in let disposable = MetaDisposable() let begin: () -> Void = { let paths: ResourceStorePaths if let baseResourceId = baseResourceId { paths = self.cachedRepresentationPathsForId(MediaResourceId(baseResourceId).stringRepresentation, representationId: id, keepDuration: keepDuration) } else { paths = self.storePathsForId(MediaResourceId(id)) } if let size = fileSize(paths.complete) { self.timeBasedCleanup.touch(paths: [ paths.complete ]) if let pathExtension = pathExtension { let symlinkPath = paths.complete + ".\(pathExtension)" if fileSize(symlinkPath) == nil { let _ = try? FileManager.default.linkItem(atPath: paths.complete, toPath: symlinkPath) } subscriber.putNext(MediaResourceData(path: symlinkPath, offset: 0, size: size, complete: true)) subscriber.putCompletion() } else { subscriber.putNext(MediaResourceData(path: paths.complete, offset: 0, size: size, complete: true)) subscriber.putCompletion() } } else if let fetch = fetch { if attemptSynchronously && complete { subscriber.putNext(MediaResourceData(path: paths.partial, offset: 0, size: 0, complete: false)) } self.dataQueue.async { let key = CachedMediaResourceRepresentationKey(resourceId: baseResourceId, representation: id) let context: CachedMediaResourceRepresentationContext if let currentContext = self.cachedRepresentationContexts[key] { context = currentContext } else { context = CachedMediaResourceRepresentationContext() self.cachedRepresentationContexts[key] = context } let index = context.dataSubscribers.add(CachedMediaResourceRepresentationSubscriber(update: { data in if !complete || data.complete { if let pathExtension = pathExtension, data.complete { let symlinkPath = data.path + ".\(pathExtension)" if fileSize(symlinkPath) == nil { let _ = try? FileManager.default.linkItem(atPath: data.path, toPath: symlinkPath) } subscriber.putNext(MediaResourceData(path: symlinkPath, offset: data.offset, size: data.size, complete: data.complete)) } else { subscriber.putNext(data) } } if data.complete { subscriber.putCompletion() } }, onlyComplete: complete)) if let currentData = context.currentData { if !complete || currentData.complete { subscriber.putNext(currentData) } if currentData.complete { subscriber.putCompletion() } } else if !complete { subscriber.putNext(MediaResourceData(path: paths.partial, offset: 0, size: 0, complete: false)) } disposable.set(ActionDisposable { [weak context] in self.dataQueue.async { if let currentContext = self.cachedRepresentationContexts[key], currentContext === context { currentContext.dataSubscribers.remove(index) if currentContext.dataSubscribers.isEmpty { currentContext.disposable.dispose() self.cachedRepresentationContexts.removeValue(forKey: key) } } } }) if !context.initialized { context.initialized = true let cacheStorageBox = self.cacheStorageBox let signal = fetch() |> deliverOn(self.dataQueue) context.disposable.set(signal.start(next: { [weak self, weak context] next in guard let strongSelf = self else { return } var isDone = false switch next { case let .temporaryPath(temporaryPath): rename(temporaryPath, paths.complete) if let size = fileSize(paths.complete), let pathData = paths.complete.data(using: .utf8) { cacheStorageBox.update(id: pathData, size: size) } isDone = true case let .tempFile(tempFile): rename(tempFile.path, paths.complete) TempBox.shared.dispose(tempFile) if let size = fileSize(paths.complete), let pathData = paths.complete.data(using: .utf8) { cacheStorageBox.update(id: pathData, size: size) } isDone = true case .reset: let file = ManagedFile(queue: strongSelf.dataQueue, path: paths.partial, mode: .readwrite) file?.truncate(count: 0) unlink(paths.complete) if let pathData = paths.complete.data(using: .utf8) { cacheStorageBox.update(id: pathData, size: 0) } case let .data(dataPart): let file = ManagedFile(queue: strongSelf.dataQueue, path: paths.partial, mode: .append) let dataCount = dataPart.count dataPart.withUnsafeBytes { rawBytes -> Void in let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self) let _ = file?.write(bytes, count: dataCount) } if let file = file, let size = file.getSize(), let pathData = paths.complete.data(using: .utf8) { cacheStorageBox.update(id: pathData, size: size) } case .done: link(paths.partial, paths.complete) isDone = true if let size = fileSize(paths.complete), let pathData = paths.complete.data(using: .utf8) { cacheStorageBox.update(id: pathData, size: size) } } if let strongSelf = self, let currentContext = strongSelf.cachedRepresentationContexts[key], currentContext === context { if isDone { currentContext.disposable.dispose() strongSelf.cachedRepresentationContexts.removeValue(forKey: key) } if let size = fileSize(paths.complete) { let data = MediaResourceData(path: paths.complete, offset: 0, size: size, complete: isDone) currentContext.currentData = data for subscriber in currentContext.dataSubscribers.copyItems() { if !subscriber.onlyComplete || isDone { subscriber.update(data) } } } else if let size = fileSize(paths.partial) { let data = MediaResourceData(path: paths.partial, offset: 0, size: size, complete: isDone) currentContext.currentData = data for subscriber in currentContext.dataSubscribers.copyItems() { if !subscriber.onlyComplete || isDone { subscriber.update(data) } } } } })) } } } else { subscriber.putNext(MediaResourceData(path: paths.partial, offset: 0, size: 0, complete: false)) subscriber.putCompletion() } } if attemptSynchronously { begin() } else { self.concurrentQueue.async(begin) } return ActionDisposable { disposable.dispose() } } } public func resourceUsage(id: MediaResourceId) -> Int64 { let paths = self.storePathsForId(id) if let size = fileSize(paths.complete) { return Int64(size) } else if let size = fileSize(paths.partial, useTotalFileAllocatedSize: true) { return Int64(size) } else { return 0 } } public func resourceUsageWithInfo(id: MediaResourceId) -> Int32 { let paths = self.storePathsForId(id) var value = stat() if stat(paths.complete, &value) == 0 { return Int32(value.st_mtimespec.tv_sec) } value = stat() if stat(paths.partial, &value) == 0 { return Int32(value.st_mtimespec.tv_sec) } return 0 } public func collectResourceCacheUsage(_ ids: [MediaResourceId]) -> Signal<[MediaResourceId: Int64], NoError> { return Signal { subscriber in self.dataQueue.async { var result: [MediaResourceId: Int64] = [:] for id in ids { let wrappedId = id let paths = self.storePathsForId(id) if let size = fileSize(paths.complete) { result[wrappedId] = Int64(size) } else if let size = fileSize(paths.partial, useTotalFileAllocatedSize: true) { result[wrappedId] = Int64(size) } } subscriber.putNext(result) subscriber.putCompletion() } return EmptyDisposable } } private func updateGeneralResourceIndex(lowImpact: Bool, completion: @escaping () -> Void) -> Disposable { let basePath = self.basePath let storageBox = self.storageBox var isCancelled: Bool = false let processQueue = Queue(name: "UpdateResourceIndex", qos: .background) processQueue.async { if isCancelled { return } let scanContext = ScanFilesContext(path: basePath) func processStale(nextId: Data?) { let _ = (storageBox.enumerateItems(startingWith: nextId, limit: 1000) |> deliverOn(processQueue)).start(next: { ids, realNextId in var staleIds: [Data] = [] for id in ids { if let name = String(data: id, encoding: .utf8) { if self.resourceUsage(id: MediaResourceId(name)) == 0 { staleIds.append(id) } } else { staleIds.append(id) } } if !staleIds.isEmpty { storageBox.remove(ids: staleIds) } if realNextId == nil { completion() } else { if lowImpact { processQueue.after(0.4, { processStale(nextId: realNextId) }) } else { processStale(nextId: realNextId) } } }) } func processNext() { processQueue.async { if isCancelled { return } let results = scanContext.nextBatch(count: 32000) if results.isEmpty { processStale(nextId: nil) return } storageBox.addEmptyReferencesIfNotReferenced(ids: results.map { name -> (id: Data, size: Int64) in let resourceId = MediaBox.idForFileName(name: name) let paths = self.storePathsForId(MediaResourceId(resourceId)) var size: Int64 = 0 if let value = fileSize(paths.complete) { size = value } else if let value = fileSize(paths.partial) { size = value } return (resourceId.data(using: .utf8)!, size) }, contentType: MediaResourceUserContentType.other.rawValue, completion: { addedCount in if addedCount != 0 { postboxLog("UpdateResourceIndex: added \(addedCount) unreferenced ids") } if lowImpact { processQueue.after(0.4, { processNext() }) } else { processNext() } }) } } processNext() } return ActionDisposable { isCancelled = true } } /*private func updateCacheResourceIndex(pathPrefix: String, lowImpact: Bool, completion: @escaping () -> Void) -> Disposable { let cacheStorageBox = self.cacheStorageBox var isCancelled: Bool = false let processQueue = Queue(name: "UpdateResourceIndex", qos: .background) processQueue.async { if isCancelled { return } let scanContext = ScanFilesContext(path: pathPrefix) func processStale(nextId: Data?) { let _ = (storageBox.enumerateItems(startingWith: nextId, limit: 1000) |> deliverOn(processQueue)).start(next: { ids, realNextId in var staleIds: [Data] = [] for id in ids { if let name = String(data: id, encoding: .utf8) { if self.resourceUsage(id: MediaResourceId(name)) == 0 { staleIds.append(id) } } else { staleIds.append(id) } } if !staleIds.isEmpty { storageBox.remove(ids: staleIds) } if realNextId == nil { completion() } else { if lowImpact { processQueue.after(0.4, { processStale(nextId: realNextId) }) } else { processStale(nextId: realNextId) } } }) } func processNext() { processQueue.async { if isCancelled { return } let results = scanContext.nextBatch(count: 32000) if results.isEmpty { processStale(nextId: nil) return } storageBox.addEmptyReferencesIfNotReferenced(ids: results.map { name -> (id: Data, size: Int64) in let resourceId = MediaBox.idForFileName(name: name) let paths = self.storePathsForId(MediaResourceId(resourceId)) var size: Int64 = 0 if let value = fileSize(paths.complete) { size = value } else if let value = fileSize(paths.partial) { size = value } return (resourceId.data(using: .utf8)!, size) }, contentType: MediaResourceUserContentType.other.rawValue, completion: { addedCount in if addedCount != 0 { postboxLog("UpdateResourceIndex: added \(addedCount) unreferenced ids") } if lowImpact { processQueue.after(0.4, { processNext() }) } else { processNext() } }) } } processNext() } return ActionDisposable { isCancelled = true } }*/ public func updateResourceIndex(lowImpact: Bool, completion: @escaping () -> Void) -> Disposable { return self.updateGeneralResourceIndex(lowImpact: lowImpact, completion: { completion() }) } public func collectAllResourceUsage() -> Signal<[(id: String?, path: String, size: Int64)], NoError> { return Signal { subscriber in self.dataQueue.async { var result: [(id: String?, path: String, size: Int64)] = [] var fileIds = Set() if let enumerator = FileManager.default.enumerator(at: URL(fileURLWithPath: self.basePath), includingPropertiesForKeys: [.fileSizeKey, .fileResourceIdentifierKey], options: [.skipsHiddenFiles, .skipsSubdirectoryDescendants], errorHandler: nil) { loop: for url in enumerator { if let url = url as? URL { if let fileId = (try? url.resourceValues(forKeys: Set([.fileResourceIdentifierKey])))?.fileResourceIdentifier as? Data { if fileIds.contains(fileId) { //paths.append(url.lastPathComponent) continue loop } if let value = (try? url.resourceValues(forKeys: Set([.fileSizeKey])))?.fileSize, value != 0 { fileIds.insert(fileId) result.append((id: MediaBox.idForFileName(name: url.lastPathComponent), path: url.lastPathComponent, size: Int64(value))) //paths.append(url.lastPathComponent) } } } } } subscriber.putNext(result) subscriber.putCompletion() } return EmptyDisposable } } public func collectOtherResourceUsage(excludeIds: Set, combinedExcludeIds: Set) -> Signal<(Int64, [String], Int64), NoError> { return Signal { subscriber in self.dataQueue.async { var result: Int64 = 0 var excludeNames = Set() for id in combinedExcludeIds { let partial = "\(self.fileNameForId(id))_partial" let meta = "\(self.fileNameForId(id))_meta" let complete = self.fileNameForId(id) excludeNames.insert(meta) excludeNames.insert(partial) excludeNames.insert(complete) } var fileIds = Set() var paths: [String] = [] if let enumerator = FileManager.default.enumerator(at: URL(fileURLWithPath: self.basePath), includingPropertiesForKeys: [.fileSizeKey, .fileResourceIdentifierKey], options: [.skipsHiddenFiles, .skipsSubdirectoryDescendants], errorHandler: nil) { loop: for url in enumerator { if let url = url as? URL { if excludeNames.contains(url.lastPathComponent) { continue loop } if let fileId = (try? url.resourceValues(forKeys: Set([.fileResourceIdentifierKey])))?.fileResourceIdentifier as? Data { if fileIds.contains(fileId) { paths.append(url.lastPathComponent) continue loop } if let value = (try? url.resourceValues(forKeys: Set([.fileSizeKey])))?.fileSize, value != 0 { fileIds.insert(fileId) paths.append(url.lastPathComponent) result += Int64(value) } } } } } var cacheResult: Int64 = 0 var excludePrefixes = Set() for id in excludeIds { let cachedRepresentationPrefix = self.fileNameForId(id) excludePrefixes.insert(cachedRepresentationPrefix) } if let enumerator = FileManager.default.enumerator(at: URL(fileURLWithPath: self.basePath + "/cache"), includingPropertiesForKeys: [.fileSizeKey], options: [.skipsHiddenFiles, .skipsSubdirectoryDescendants], errorHandler: nil) { loop: for url in enumerator { if let url = url as? URL { if let prefix = url.lastPathComponent.components(separatedBy: ":").first, excludePrefixes.contains(prefix) { continue loop } if let value = (try? url.resourceValues(forKeys: Set([.fileSizeKey])))?.fileSize, value != 0 { paths.append("cache/" + url.lastPathComponent) cacheResult += Int64(value) } } } } func processRecursive(directoryPath: String, subdirectoryPath: String) { if let enumerator = FileManager.default.enumerator(at: URL(fileURLWithPath: directoryPath), includingPropertiesForKeys: [.fileSizeKey, .isDirectoryKey], options: [.skipsHiddenFiles, .skipsSubdirectoryDescendants], errorHandler: nil) { loop: for url in enumerator { if let url = url as? URL { if let prefix = url.lastPathComponent.components(separatedBy: ":").first, excludePrefixes.contains(prefix) { continue loop } if let isDirectory = (try? url.resourceValues(forKeys: Set([.isDirectoryKey])))?.isDirectory, isDirectory { processRecursive(directoryPath: url.path, subdirectoryPath: subdirectoryPath + "/\(url.lastPathComponent)") } else if let value = (try? url.resourceValues(forKeys: Set([.fileSizeKey])))?.fileSize, value != 0 { paths.append("\(subdirectoryPath)/" + url.lastPathComponent) cacheResult += Int64(value) } } } } } processRecursive(directoryPath: self.basePath + "/animation-cache", subdirectoryPath: "animation-cache") if let enumerator = FileManager.default.enumerator(at: URL(fileURLWithPath: self.basePath + "/short-cache"), includingPropertiesForKeys: [.fileSizeKey], options: [.skipsHiddenFiles, .skipsSubdirectoryDescendants], errorHandler: nil) { loop: for url in enumerator { if let url = url as? URL { if let prefix = url.lastPathComponent.components(separatedBy: ":").first, excludePrefixes.contains(prefix) { continue loop } if let value = (try? url.resourceValues(forKeys: Set([.fileSizeKey])))?.fileSize, value != 0 { paths.append("short-cache/" + url.lastPathComponent) cacheResult += Int64(value) } } } } subscriber.putNext((result, paths, cacheResult)) subscriber.putCompletion() } return EmptyDisposable } } public func removeOtherCachedResources(paths: [String]) -> Signal { return Signal { subscriber in self.dataQueue.async { var keepPrefixes: [String] = [] for id in self.keepResourceContexts.keys { let resourcePaths = self.fileNamesForId(id) keepPrefixes.append(resourcePaths.partial) keepPrefixes.append(resourcePaths.complete) } var count: Int = 0 let totalCount = paths.count if totalCount == 0 { subscriber.putNext(1.0) subscriber.putCompletion() return } let reportProgress: (Int) -> Void = { count in Queue.mainQueue().async { subscriber.putNext(min(1.0, Float(count) / Float(totalCount))) } } outer: for path in paths { for prefix in keepPrefixes { if path.starts(with: prefix) { count += 1 continue outer } } count += 1 unlink(self.basePath + "/" + path) reportProgress(count) } subscriber.putCompletion() } return EmptyDisposable } } public func removeCachedResources(_ ids: [MediaResourceId], force: Bool = false, notify: Bool = false) -> Signal { return Signal { subscriber in self.dataQueue.async { //let uniqueIds = Set(ids.map { $0.stringRepresentation }) let pathsToDelete: [String] = [] /*for cacheType in ["cache", "short-cache"] { if let enumerator = FileManager.default.enumerator(at: URL(fileURLWithPath: "\(self.basePath)/\(cacheType)"), includingPropertiesForKeys: [], options: [.skipsSubdirectoryDescendants], errorHandler: nil) { while let item = enumerator.nextObject() { guard let url = item as? NSURL, let path = url.path, let fileName = url.lastPathComponent else { continue } if let range = fileName.range(of: ":") { let resourceId = String(fileName[fileName.startIndex ..< range.lowerBound]) if uniqueIds.contains(resourceId) { pathsToDelete.append(path) } } } } }*/ var count: Int = 0 let totalCount = ids.count * 3 + pathsToDelete.count if totalCount == 0 { subscriber.putNext(1.0) subscriber.putCompletion() return } var lastReportValue = 0 let reportProgress: (Int) -> Void = { count in let currentProgress = min(1.0, Float(count) / Float(totalCount)) let currentInteger = Int(currentProgress * 100.0) if lastReportValue != currentInteger { lastReportValue = currentInteger subscriber.putNext(currentProgress) } } for id in ids { if !force { if self.fileContexts[id] != nil { count += 3 reportProgress(count) continue } if self.keepResourceContexts[id] != nil { count += 3 reportProgress(count) continue } } let paths = self.storePathsForId(id) unlink(paths.complete) unlink(paths.partial) unlink(paths.partial + ".meta") self.fileContexts.removeValue(forKey: id) count += 3 reportProgress(count) } for path in pathsToDelete { unlink(path) count += 1 reportProgress(count) } if notify { for id in ids { if let context = self.statusContexts[id] { context.status = .Remote(progress: 0.0) for f in context.subscribers.copyItems() { f(.Remote(progress: 0.0)) } } } } self.dataQueue.justDispatch { self.didRemoveResourcesPipe.putNext(Void()) } subscriber.putNext(1.0) subscriber.putCompletion() } return EmptyDisposable } } public func removeCachedResourcesWithResult(_ ids: [MediaResourceId], force: Bool = false, notify: Bool = false) -> Signal<[MediaResourceId], NoError> { return Signal { subscriber in self.dataQueue.async { var removedIds: [MediaResourceId] = [] for id in ids { if !force { if self.fileContexts[id] != nil { continue } if self.keepResourceContexts[id] != nil { continue } } let paths = self.storePathsForId(id) unlink(paths.complete) unlink(paths.partial) unlink(paths.partial + ".meta") self.fileContexts.removeValue(forKey: id) removedIds.append(id) } if notify { for id in ids { if let context = self.statusContexts[id] { context.status = .Remote(progress: 0.0) for f in context.subscribers.copyItems() { f(.Remote(progress: 0.0)) } } } } self.dataQueue.justDispatch { self.didRemoveResourcesPipe.putNext(Void()) } subscriber.putNext(removedIds) subscriber.putCompletion() } return EmptyDisposable } } public func allFileContextResourceIds() -> Signal, NoError> { return Signal { subscriber in self.dataQueue.async { subscriber.putNext(Set(self.fileContexts.map({ $0.key }))) subscriber.putCompletion() } return EmptyDisposable } } public func allFileContexts() -> Signal<[(partial: String, complete: String)], NoError> { return Signal { subscriber in self.dataQueue.async { var result: [(partial: String, complete: String)] = [] for (id, _) in self.fileContexts { let paths = self.storePathsForId(id) result.append((partial: paths.partial, complete: paths.complete)) } subscriber.putNext(result) subscriber.putCompletion() } return EmptyDisposable } } } private final class ScanFilesContext { private let path: String private var dirHandle: UnsafeMutablePointer? private let pathBuffer: UnsafeMutablePointer init(path: String) { self.path = path self.dirHandle = opendir(path) self.pathBuffer = malloc(2048).assumingMemoryBound(to: Int8.self) } deinit { if let dirHandle = self.dirHandle { closedir(dirHandle) } free(self.pathBuffer) } func nextBatch(count: Int) -> [String] { guard let dirHandle = self.dirHandle else { return [] } var result: [String] = [] while true { guard let dirp = readdir(dirHandle) else { closedir(dirHandle) self.dirHandle = nil break } if dirp.pointee.d_type != DT_REG { continue } if strncmp(&dirp.pointee.d_name.0, ".", 1024) == 0 { continue } if strncmp(&dirp.pointee.d_name.0, "..", 1024) == 0 { continue } strncpy(self.pathBuffer, self.path, 1024) strncat(self.pathBuffer, "/", 1024) strncat(self.pathBuffer, &dirp.pointee.d_name.0, 1024) //puts(pathBuffer) //puts("\n") var value = stat() if stat(self.pathBuffer, &value) == 0 { if let itemPath = String(data: Data(bytes: &dirp.pointee.d_name.0, count: Int(dirp.pointee.d_namlen)), encoding: .utf8) { result.append(itemPath) } /*result.totalSize += UInt64(value.st_size) inodes.append(InodeInfo( inode: value.st_ino, timestamp: Int32(clamping: value.st_mtimespec.tv_sec), size: UInt32(clamping: value.st_size) ))*/ } } return result } } /*private func scanFiles(at path: String, inodes: inout [InodeInfo]) -> ScanFilesResult { var result = ScanFilesResult() if let dp = opendir(path) { let pathBuffer = malloc(2048).assumingMemoryBound(to: Int8.self) defer { free(pathBuffer) } while true { guard let dirp = readdir(dp) else { break } if strncmp(&dirp.pointee.d_name.0, ".", 1024) == 0 { continue } if strncmp(&dirp.pointee.d_name.0, "..", 1024) == 0 { continue } strncpy(pathBuffer, path, 1024) strncat(pathBuffer, "/", 1024) strncat(pathBuffer, &dirp.pointee.d_name.0, 1024) //puts(pathBuffer) //puts("\n") var value = stat() if stat(pathBuffer, &value) == 0 { if value.st_mtimespec.tv_sec < minTimestamp { unlink(pathBuffer) result.unlinkedCount += 1 } else { result.totalSize += UInt64(value.st_size) inodes.append(InodeInfo( inode: value.st_ino, timestamp: Int32(clamping: value.st_mtimespec.tv_sec), size: UInt32(clamping: value.st_size) )) } } } closedir(dp) } return result }*/