diff --git a/submodules/Postbox/Sources/MediaBox.swift b/submodules/Postbox/Sources/MediaBox.swift index f1cbdb1419..c23ba27abd 100644 --- a/submodules/Postbox/Sources/MediaBox.swift +++ b/submodules/Postbox/Sources/MediaBox.swift @@ -140,6 +140,7 @@ public final class MediaBox { private let statusQueue = Queue() private let concurrentQueue = Queue.concurrentDefaultQueue() private let dataQueue = Queue() + private let dataFileManager: MediaBoxFileManager private let cacheQueue = Queue() private let timeBasedCleanup: TimeBasedCleanup @@ -194,6 +195,8 @@ public final class MediaBox { self.basePath + "/short-cache" ]) + self.dataFileManager = MediaBoxFileManager(queue: self.dataQueue) + let _ = self.ensureDirectoryCreated } @@ -540,7 +543,7 @@ public final class MediaBox { paths.partial, paths.partial + ".meta" ]) - if let fileContext = MediaBoxFileContext(queue: self.dataQueue, path: paths.complete, partialPath: paths.partial, metaPath: paths.partial + ".meta") { + if let fileContext = MediaBoxFileContext(queue: self.dataQueue, manager: self.dataFileManager, path: paths.complete, partialPath: paths.partial, metaPath: paths.partial + ".meta") { context = fileContext self.fileContexts[resourceId] = fileContext } else { @@ -633,7 +636,7 @@ public final class MediaBox { subscriber.putCompletion() return EmptyDisposable } else { - if let data = MediaBoxPartialFile.extractPartialData(path: paths.partial, metaPath: paths.partial + ".meta", range: range) { + if let data = MediaBoxPartialFile.extractPartialData(manager: MediaBoxFileManager(queue: nil), path: paths.partial, metaPath: paths.partial + ".meta", range: range) { subscriber.putNext((data, true)) subscriber.putCompletion() return EmptyDisposable diff --git a/submodules/Postbox/Sources/MediaBoxFile.swift b/submodules/Postbox/Sources/MediaBoxFile.swift index aeb69d5829..3cf588079b 100644 --- a/submodules/Postbox/Sources/MediaBoxFile.swift +++ b/submodules/Postbox/Sources/MediaBoxFile.swift @@ -4,7 +4,180 @@ import Crc32 import ManagedFile import RangeSet +final class MediaBoxFileManager { + enum Mode { + case read + case readwrite + } + + enum AccessError: Error { + case generic + } + + final class Item { + final class Accessor { + private let file: ManagedFile + + init(file: ManagedFile) { + self.file = file + } + + func write(_ data: UnsafeRawPointer, count: Int) -> Int { + return self.file.write(data, count: count) + } + + func read(_ data: UnsafeMutableRawPointer, _ count: Int) -> Int { + return self.file.read(data, count) + } + + func readData(count: Int) -> Data { + return self.file.readData(count: count) + } + + func seek(position: Int64) { + self.file.seek(position: position) + } + } + + weak var manager: MediaBoxFileManager? + let path: String + let mode: Mode + + weak var context: ItemContext? + + init(manager: MediaBoxFileManager, path: String, mode: Mode) { + self.manager = manager + self.path = path + self.mode = mode + } + + deinit { + if let manager = self.manager, let context = self.context { + manager.discardItemContext(context: context) + } + } + + func access(_ f: (Accessor) throws -> Void) throws { + if let context = self.context { + try f(Accessor(file: context.file)) + } else { + if let manager = self.manager { + if let context = manager.takeContext(path: self.path, mode: self.mode) { + self.context = context + try f(Accessor(file: context.file)) + } else { + throw AccessError.generic + } + } else { + throw AccessError.generic + } + } + } + + func sync() { + if let context = self.context { + context.sync() + } + } + } + + final class ItemContext { + let id: Int + let path: String + let mode: Mode + let file: ManagedFile + + private var isDisposed: Bool = false + + init?(id: Int, path: String, mode: Mode) { + let mappedMode: ManagedFile.Mode + switch mode { + case .read: + mappedMode = .read + case .readwrite: + mappedMode = .readwrite + } + + guard let file = ManagedFile(queue: nil, path: path, mode: mappedMode) else { + return nil + } + self.file = file + + self.id = id + self.path = path + self.mode = mode + } + + deinit { + assert(self.isDisposed) + } + + func dispose() { + if !self.isDisposed { + self.isDisposed = true + self.file._unsafeClose() + } else { + assertionFailure() + } + } + + func sync() { + self.file.sync() + } + } + + private let queue: Queue? + private var contexts: [Int: ItemContext] = [:] + private var nextItemId: Int = 0 + private let maxOpenFiles: Int + + init(queue: Queue?) { + self.queue = queue + self.maxOpenFiles = 16 + } + + func open(path: String, mode: Mode) -> Item? { + if let queue = self.queue { + assert(queue.isCurrent()) + } + + return Item(manager: self, path: path, mode: mode) + } + + private func takeContext(path: String, mode: Mode) -> ItemContext? { + if let queue = self.queue { + assert(queue.isCurrent()) + } + + if self.contexts.count > self.maxOpenFiles { + if let minKey = self.contexts.keys.min(), let context = self.contexts[minKey] { + self.discardItemContext(context: context) + } + } + + let id = self.nextItemId + self.nextItemId += 1 + let context = ItemContext(id: id, path: path, mode: mode) + self.contexts[id] = context + return context + } + + private func discardItemContext(context: ItemContext) { + if let queue = self.queue { + assert(queue.isCurrent()) + } + + if let context = self.contexts.removeValue(forKey: context.id) { + context.dispose() + } + } +} + private final class MediaBoxFileMap { + enum FileMapError: Error { + case generic + } + fileprivate(set) var sum: Int64 private(set) var ranges: RangeSet private(set) var truncationSize: Int64? @@ -17,173 +190,216 @@ private final class MediaBoxFileMap { self.progress = nil } - init?(fd: ManagedFile) { - guard let length = fd.getSize() else { - return nil - } - - var firstUInt32: UInt32 = 0 - guard fd.read(&firstUInt32, 4) == 4 else { - return nil - } - - if firstUInt32 == 0x7bac1487 { - var crc: UInt32 = 0 - guard fd.read(&crc, 4) == 4 else { - return nil - } - - var count: Int32 = 0 - var sum: Int64 = 0 - var ranges = RangeSet() - - guard fd.read(&count, 4) == 4 else { - return nil - } - - if count < 0 { - return nil - } - - if count < 0 || length < 4 + 4 + 4 + 8 + count * 2 * 8 { - return nil - } - - var truncationSizeValue: Int64 = 0 - - var data = Data(count: Int(8 + count * 2 * 8)) - let dataCount = data.count - if !(data.withUnsafeMutableBytes { rawBytes -> Bool in - let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self) - - guard fd.read(bytes, dataCount) == dataCount else { - return false - } - - memcpy(&truncationSizeValue, bytes, 8) - - let calculatedCrc = Crc32(bytes, Int32(dataCount)) - if calculatedCrc != crc { - return false - } - - var offset = 8 - for _ in 0 ..< count { - var intervalOffset: Int64 = 0 - var intervalLength: Int64 = 0 - memcpy(&intervalOffset, bytes.advanced(by: offset), 8) - memcpy(&intervalLength, bytes.advanced(by: offset + 8), 8) - offset += 8 * 2 - - ranges.insert(contentsOf: intervalOffset ..< (intervalOffset + intervalLength)) - - sum += intervalLength - } - - return true - }) { - return nil - } - - self.sum = sum - self.ranges = ranges - if truncationSizeValue == -1 { - self.truncationSize = nil - } else if truncationSizeValue < 0 { - self.truncationSize = nil - } else { - self.truncationSize = truncationSizeValue - } - } else { - let crc: UInt32 = firstUInt32 - var count: Int32 = 0 - var sum: Int32 = 0 - var ranges = RangeSet() - - guard fd.read(&count, 4) == 4 else { - return nil - } - - if count < 0 { - return nil - } - - if count < 0 || UInt64(length) < 4 + 4 + UInt64(count) * 2 * 4 { - return nil - } - - var truncationSizeValue: Int32 = 0 - - var data = Data(count: Int(4 + count * 2 * 4)) - let dataCount = data.count - if !(data.withUnsafeMutableBytes { rawBytes -> Bool in - let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self) - - guard fd.read(bytes, dataCount) == dataCount else { - return false - } - - memcpy(&truncationSizeValue, bytes, 4) - - let calculatedCrc = Crc32(bytes, Int32(dataCount)) - if calculatedCrc != crc { - return false - } - - var offset = 4 - for _ in 0 ..< count { - var intervalOffset: Int32 = 0 - var intervalLength: Int32 = 0 - memcpy(&intervalOffset, bytes.advanced(by: offset), 4) - memcpy(&intervalLength, bytes.advanced(by: offset + 4), 4) - offset += 8 - - ranges.insert(contentsOf: Int64(intervalOffset) ..< Int64(intervalOffset + intervalLength)) - - sum += intervalLength - } - - return true - }) { - return nil - } - - self.sum = Int64(sum) - self.ranges = ranges - if truncationSizeValue == -1 { - self.truncationSize = nil - } else { - self.truncationSize = Int64(truncationSizeValue) - } - } + private init( + sum: Int64, + ranges: RangeSet, + truncationSize: Int64?, + progress: Float? + ) { + self.sum = sum + self.ranges = ranges + self.truncationSize = truncationSize + self.progress = progress } - func serialize(to file: ManagedFile) { - file.seek(position: 0) - let buffer = WriteBuffer() - var magic: UInt32 = 0x7bac1487 - buffer.write(&magic, offset: 0, length: 4) - - var zero: Int32 = 0 - buffer.write(&zero, offset: 0, length: 4) - - let rangeView = self.ranges.ranges - var count: Int32 = Int32(rangeView.count) - buffer.write(&count, offset: 0, length: 4) - - var truncationSizeValue: Int64 = self.truncationSize ?? -1 - buffer.write(&truncationSizeValue, offset: 0, length: 8) - - for range in rangeView { - var intervalOffset = range.lowerBound - var intervalLength = range.upperBound - range.lowerBound - buffer.write(&intervalOffset, offset: 0, length: 8) - buffer.write(&intervalLength, offset: 0, length: 8) + static func read(manager: MediaBoxFileManager, path: String) throws -> MediaBoxFileMap { + guard let length = fileSize(path) else { + throw FileMapError.generic + } + guard let fileItem = manager.open(path: path, mode: .readwrite) else { + throw FileMapError.generic + } + + var result: MediaBoxFileMap? + + try fileItem.access { fd in + var firstUInt32: UInt32 = 0 + guard fd.read(&firstUInt32, 4) == 4 else { + throw FileMapError.generic + } + + if firstUInt32 == 0x7bac1487 { + var crc: UInt32 = 0 + guard fd.read(&crc, 4) == 4 else { + throw FileMapError.generic + } + + var count: Int32 = 0 + var sum: Int64 = 0 + var ranges = RangeSet() + + guard fd.read(&count, 4) == 4 else { + throw FileMapError.generic + } + + if count < 0 { + throw FileMapError.generic + } + + if count < 0 || length < 4 + 4 + 4 + 8 + count * 2 * 8 { + throw FileMapError.generic + } + + var truncationSizeValue: Int64 = 0 + + var data = Data(count: Int(8 + count * 2 * 8)) + let dataCount = data.count + if !(data.withUnsafeMutableBytes { rawBytes -> Bool in + let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self) + + guard fd.read(bytes, dataCount) == dataCount else { + return false + } + + memcpy(&truncationSizeValue, bytes, 8) + + let calculatedCrc = Crc32(bytes, Int32(dataCount)) + if calculatedCrc != crc { + return false + } + + var offset = 8 + for _ in 0 ..< count { + var intervalOffset: Int64 = 0 + var intervalLength: Int64 = 0 + memcpy(&intervalOffset, bytes.advanced(by: offset), 8) + memcpy(&intervalLength, bytes.advanced(by: offset + 8), 8) + offset += 8 * 2 + + ranges.insert(contentsOf: intervalOffset ..< (intervalOffset + intervalLength)) + + sum += intervalLength + } + + return true + }) { + throw FileMapError.generic + } + + let mappedTruncationSize: Int64? + if truncationSizeValue == -1 { + mappedTruncationSize = nil + } else if truncationSizeValue < 0 { + mappedTruncationSize = nil + } else { + mappedTruncationSize = truncationSizeValue + } + + result = MediaBoxFileMap( + sum: sum, + ranges: ranges, + truncationSize: mappedTruncationSize, + progress: nil + ) + } else { + let crc: UInt32 = firstUInt32 + var count: Int32 = 0 + var sum: Int32 = 0 + var ranges = RangeSet() + + guard fd.read(&count, 4) == 4 else { + throw FileMapError.generic + } + + if count < 0 { + throw FileMapError.generic + } + + if count < 0 || UInt64(length) < 4 + 4 + UInt64(count) * 2 * 4 { + throw FileMapError.generic + } + + var truncationSizeValue: Int32 = 0 + + var data = Data(count: Int(4 + count * 2 * 4)) + let dataCount = data.count + if !(data.withUnsafeMutableBytes { rawBytes -> Bool in + let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self) + + guard fd.read(bytes, dataCount) == dataCount else { + return false + } + + memcpy(&truncationSizeValue, bytes, 4) + + let calculatedCrc = Crc32(bytes, Int32(dataCount)) + if calculatedCrc != crc { + return false + } + + var offset = 4 + for _ in 0 ..< count { + var intervalOffset: Int32 = 0 + var intervalLength: Int32 = 0 + memcpy(&intervalOffset, bytes.advanced(by: offset), 4) + memcpy(&intervalLength, bytes.advanced(by: offset + 4), 4) + offset += 8 + + ranges.insert(contentsOf: Int64(intervalOffset) ..< Int64(intervalOffset + intervalLength)) + + sum += intervalLength + } + + return true + }) { + throw FileMapError.generic + } + + let mappedTruncationSize: Int64? + if truncationSizeValue == -1 { + mappedTruncationSize = nil + } else { + mappedTruncationSize = Int64(truncationSizeValue) + } + + result = MediaBoxFileMap( + sum: Int64(sum), + ranges: ranges, + truncationSize: mappedTruncationSize, + progress: nil + ) + } + } + + guard let result = result else { + throw FileMapError.generic + } + return result + } + + func serialize(manager: MediaBoxFileManager, to path: String) { + guard let fileItem = manager.open(path: path, mode: .readwrite) else { + postboxLog("MediaBoxFile: serialize: cannot open file") + return + } + + let _ = try? fileItem.access { file in + file.seek(position: 0) + let buffer = WriteBuffer() + var magic: UInt32 = 0x7bac1487 + buffer.write(&magic, offset: 0, length: 4) + + var zero: Int32 = 0 + buffer.write(&zero, offset: 0, length: 4) + + let rangeView = self.ranges.ranges + var count: Int32 = Int32(rangeView.count) + buffer.write(&count, offset: 0, length: 4) + + var truncationSizeValue: Int64 = self.truncationSize ?? -1 + buffer.write(&truncationSizeValue, offset: 0, length: 8) + + for range in rangeView { + var intervalOffset = range.lowerBound + var intervalLength = range.upperBound - range.lowerBound + buffer.write(&intervalOffset, offset: 0, length: 8) + buffer.write(&intervalLength, offset: 0, length: 8) + } + var crc: UInt32 = Crc32(buffer.memory.advanced(by: 4 + 4 + 4), Int32(buffer.length - (4 + 4 + 4))) + memcpy(buffer.memory.advanced(by: 4), &crc, 4) + let written = file.write(buffer.memory, count: buffer.length) + assert(written == buffer.length) } - var crc: UInt32 = Crc32(buffer.memory.advanced(by: 4 + 4 + 4), Int32(buffer.length - (4 + 4 + 4))) - memcpy(buffer.memory.advanced(by: 4), &crc, 4) - let written = file.write(buffer.memory, count: buffer.length) - assert(written == buffer.length) } fileprivate func fill(_ range: Range) { @@ -243,12 +459,12 @@ private class MediaBoxPartialFileDataRequest { final class MediaBoxPartialFile { private let queue: Queue + private let manager: MediaBoxFileManager private let path: String private let metaPath: String private let completePath: String private let completed: (Int64) -> Void - private let metadataFd: ManagedFile - private let fd: ManagedFile + private let fd: MediaBoxFileManager.Item fileprivate let fileMap: MediaBoxFileMap private var dataRequests = Bag() private let missingRanges: MediaBoxFileMissingRanges @@ -260,17 +476,17 @@ final class MediaBoxPartialFile { private var currentFetch: (Promise<[(Range, MediaBoxFetchPriority)]>, Disposable)? private var processedAtLeastOneFetch: Bool = false - init?(queue: Queue, path: String, metaPath: String, completePath: String, completed: @escaping (Int64) -> Void) { + init?(queue: Queue, manager: MediaBoxFileManager, path: String, metaPath: String, completePath: String, completed: @escaping (Int64) -> Void) { assert(queue.isCurrent()) - if let metadataFd = ManagedFile(queue: queue, path: metaPath, mode: .readwrite), let fd = ManagedFile(queue: queue, path: path, mode: .readwrite) { + self.manager = manager + if let fd = manager.open(path: path, mode: .readwrite) { self.queue = queue self.path = path self.metaPath = metaPath self.completePath = completePath self.completed = completed - self.metadataFd = metadataFd self.fd = fd - if let fileMap = MediaBoxFileMap(fd: self.metadataFd) { + if let fileMap = try? MediaBoxFileMap.read(manager: manager, path: self.metaPath) { if !fileMap.ranges.isEmpty { let upperBound = fileMap.ranges.ranges.last!.upperBound if let actualSize = fileSize(path, useTotalFileAllocatedSize: false) { @@ -298,14 +514,11 @@ final class MediaBoxPartialFile { self.currentFetch?.1.dispose() } - static func extractPartialData(path: String, metaPath: String, range: Range) -> Data? { - guard let metadataFd = ManagedFile(queue: nil, path: metaPath, mode: .read) else { - return nil - } + static func extractPartialData(manager: MediaBoxFileManager, path: String, metaPath: String, range: Range) -> Data? { guard let fd = ManagedFile(queue: nil, path: path, mode: .read) else { return nil } - guard let fileMap = MediaBoxFileMap(fd: metadataFd) else { + guard let fileMap = try? MediaBoxFileMap.read(manager: manager, path: metaPath) else { return nil } guard let clippedRange = fileMap.contains(range) else { @@ -324,7 +537,7 @@ final class MediaBoxPartialFile { assert(self.queue.isCurrent()) self.fileMap.reset() - self.fileMap.serialize(to: self.metadataFd) + self.fileMap.serialize(manager: self.manager, to: self.metaPath) for request in self.dataRequests.copyItems() { request.completion(MediaResourceData(path: self.path, offset: request.range.lowerBound, size: 0, complete: false)) @@ -428,7 +641,7 @@ final class MediaBoxPartialFile { let range: Range = size ..< Int64.max self.fileMap.truncate(size) - self.fileMap.serialize(to: self.metadataFd) + self.fileMap.serialize(manager: self.manager, to: self.metaPath) self.checkDataRequestsAfterFill(range: range) } @@ -443,16 +656,23 @@ final class MediaBoxPartialFile { func write(offset: Int64, data: Data, dataRange: Range) { assert(self.queue.isCurrent()) - self.fd.seek(position: offset) - let written = data.withUnsafeBytes { rawBytes -> Int in - let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self) + do { + try self.fd.access { fd in + fd.seek(position: offset) + let written = data.withUnsafeBytes { rawBytes -> Int in + let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self) - return self.fd.write(bytes.advanced(by: Int(dataRange.lowerBound)), count: dataRange.count) + return fd.write(bytes.advanced(by: Int(dataRange.lowerBound)), count: dataRange.count) + } + assert(written == dataRange.count) + } + } catch let e { + postboxLog("MediaBoxPartialFile.write error: \(e)") } - assert(written == dataRange.count) + let range: Range = offset ..< (offset + Int64(dataRange.count)) self.fileMap.fill(range) - self.fileMap.serialize(to: self.metadataFd) + self.fileMap.serialize(manager: self.manager, to: self.metaPath) self.checkDataRequestsAfterFill(range: range) } @@ -536,16 +756,25 @@ final class MediaBoxPartialFile { assert(self.queue.isCurrent()) if let actualRange = self.fileMap.contains(range) { - self.fd.seek(position: Int64(actualRange.lowerBound)) - var data = Data(count: actualRange.count) - let dataCount = data.count - let readBytes = data.withUnsafeMutableBytes { rawBytes -> Int in - let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: Int8.self) - return self.fd.read(bytes, dataCount) - } - if readBytes == data.count { - return data - } else { + do { + var result: Data? + try self.fd.access { fd in + fd.seek(position: Int64(actualRange.lowerBound)) + var data = Data(count: actualRange.count) + let dataCount = data.count + let readBytes = data.withUnsafeMutableBytes { rawBytes -> Int in + let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: Int8.self) + return fd.read(bytes, dataCount) + } + if readBytes == data.count { + result = data + } else { + result = nil + } + } + return result + } catch let e { + postboxLog("MediaBoxPartialFile.read error: \(e)") return nil } } else { @@ -954,7 +1183,7 @@ final class MediaBoxFileContext { return self.references.isEmpty } - init?(queue: Queue, path: String, partialPath: String, metaPath: String) { + init?(queue: Queue, manager: MediaBoxFileManager, path: String, partialPath: String, metaPath: String) { assert(queue.isCurrent()) self.queue = queue @@ -965,7 +1194,7 @@ final class MediaBoxFileContext { var completeImpl: ((Int64) -> Void)? if let size = fileSize(path) { self.content = .complete(path, size) - } else if let file = MediaBoxPartialFile(queue: queue, path: partialPath, metaPath: metaPath, completePath: path, completed: { size in + } else if let file = MediaBoxPartialFile(queue: queue, manager: manager, path: partialPath, metaPath: metaPath, completePath: path, completed: { size in completeImpl?(size) }) { self.content = .partial(file)