Add MediaFileContextV2Impl

This commit is contained in:
Ali 2023-03-14 19:15:39 +04:00
parent 09afcfca7e
commit eed4f9801c
6 changed files with 1239 additions and 445 deletions

View File

@ -47,7 +47,7 @@ public struct ResourceStorePaths {
public let complete: String
}
public struct MediaResourceData {
public struct MediaResourceData: Equatable {
public let path: String
public let offset: Int64
public let size: Int64
@ -575,12 +575,22 @@ public final class MediaBox {
paths.partial,
paths.partial + ".meta"
])
if let fileContext = MediaBoxFileContext(queue: self.dataQueue, manager: self.dataFileManager, storageBox: self.storageBox, resourceId: id.stringRepresentation.data(using: .utf8)!, path: paths.complete, partialPath: paths.partial, metaPath: paths.partial + ".meta") {
#if DEBUG
if let fileContext = MediaBoxFileContextV2Impl(queue: self.dataQueue, manager: self.dataFileManager, storageBox: self.storageBox, resourceId: id.stringRepresentation.data(using: .utf8)!, path: paths.complete, partialPath: paths.partial, metaPath: paths.partial + ".meta") {
context = fileContext
self.fileContexts[resourceId] = fileContext
} else {
return nil
}
#else
if let fileContext = MediaBoxFileContextImpl(queue: self.dataQueue, manager: self.dataFileManager, storageBox: self.storageBox, resourceId: id.stringRepresentation.data(using: .utf8)!, path: paths.complete, partialPath: paths.partial, metaPath: paths.partial + ".meta") {
context = fileContext
self.fileContexts[resourceId] = fileContext
} else {
return nil
}
#endif
}
if let context = context {
let index = context.addReference()

View File

@ -1,450 +1,8 @@
import Foundation
import SwiftSignalKit
import Crc32
import ManagedFile
import RangeSet
final class MediaBoxFileManager {
enum Mode {
case read
case readwrite
}
enum AccessError: Error {
case generic
}
final class Item {
final class Accessor {
private let file: ManagedFile
init(file: ManagedFile) {
self.file = file
}
func write(_ data: UnsafeRawPointer, count: Int) -> Int {
return self.file.write(data, count: count)
}
func read(_ data: UnsafeMutableRawPointer, _ count: Int) -> Int {
return self.file.read(data, count)
}
func readData(count: Int) -> Data {
return self.file.readData(count: count)
}
func seek(position: Int64) {
self.file.seek(position: position)
}
}
weak var manager: MediaBoxFileManager?
let path: String
let mode: Mode
weak var context: ItemContext?
init(manager: MediaBoxFileManager, path: String, mode: Mode) {
self.manager = manager
self.path = path
self.mode = mode
}
deinit {
if let manager = self.manager, let context = self.context {
manager.discardItemContext(context: context)
}
}
func access(_ f: (Accessor) throws -> Void) throws {
if let context = self.context {
try f(Accessor(file: context.file))
} else {
if let manager = self.manager {
if let context = manager.takeContext(path: self.path, mode: self.mode) {
self.context = context
try f(Accessor(file: context.file))
} else {
throw AccessError.generic
}
} else {
throw AccessError.generic
}
}
}
func sync() {
if let context = self.context {
context.sync()
}
}
}
final class ItemContext {
let id: Int
let path: String
let mode: Mode
let file: ManagedFile
private var isDisposed: Bool = false
init?(id: Int, path: String, mode: Mode) {
let mappedMode: ManagedFile.Mode
switch mode {
case .read:
mappedMode = .read
case .readwrite:
mappedMode = .readwrite
}
guard let file = ManagedFile(queue: nil, path: path, mode: mappedMode) else {
return nil
}
self.file = file
self.id = id
self.path = path
self.mode = mode
}
deinit {
assert(self.isDisposed)
}
func dispose() {
if !self.isDisposed {
self.isDisposed = true
self.file._unsafeClose()
} else {
assertionFailure()
}
}
func sync() {
self.file.sync()
}
}
private let queue: Queue?
private var contexts: [Int: ItemContext] = [:]
private var nextItemId: Int = 0
private let maxOpenFiles: Int
init(queue: Queue?) {
self.queue = queue
self.maxOpenFiles = 16
}
func open(path: String, mode: Mode) -> Item? {
if let queue = self.queue {
assert(queue.isCurrent())
}
return Item(manager: self, path: path, mode: mode)
}
private func takeContext(path: String, mode: Mode) -> ItemContext? {
if let queue = self.queue {
assert(queue.isCurrent())
}
if self.contexts.count > self.maxOpenFiles {
if let minKey = self.contexts.keys.min(), let context = self.contexts[minKey] {
self.discardItemContext(context: context)
}
}
let id = self.nextItemId
self.nextItemId += 1
let context = ItemContext(id: id, path: path, mode: mode)
self.contexts[id] = context
return context
}
private func discardItemContext(context: ItemContext) {
if let queue = self.queue {
assert(queue.isCurrent())
}
if let context = self.contexts.removeValue(forKey: context.id) {
context.dispose()
}
}
}
private final class MediaBoxFileMap {
enum FileMapError: Error {
case generic
}
fileprivate(set) var sum: Int64
private(set) var ranges: RangeSet<Int64>
private(set) var truncationSize: Int64?
private(set) var progress: Float?
init() {
self.sum = 0
self.ranges = RangeSet<Int64>()
self.truncationSize = nil
self.progress = nil
}
private init(
sum: Int64,
ranges: RangeSet<Int64>,
truncationSize: Int64?,
progress: Float?
) {
self.sum = sum
self.ranges = ranges
self.truncationSize = truncationSize
self.progress = progress
}
static func read(manager: MediaBoxFileManager, path: String) throws -> MediaBoxFileMap {
guard let length = fileSize(path) else {
throw FileMapError.generic
}
guard let fileItem = manager.open(path: path, mode: .readwrite) else {
throw FileMapError.generic
}
var result: MediaBoxFileMap?
try fileItem.access { fd in
var firstUInt32: UInt32 = 0
guard fd.read(&firstUInt32, 4) == 4 else {
throw FileMapError.generic
}
if firstUInt32 == 0x7bac1487 {
var crc: UInt32 = 0
guard fd.read(&crc, 4) == 4 else {
throw FileMapError.generic
}
var count: Int32 = 0
var sum: Int64 = 0
var ranges = RangeSet<Int64>()
guard fd.read(&count, 4) == 4 else {
throw FileMapError.generic
}
if count < 0 {
throw FileMapError.generic
}
if count < 0 || length < 4 + 4 + 4 + 8 + count * 2 * 8 {
throw FileMapError.generic
}
var truncationSizeValue: Int64 = 0
var data = Data(count: Int(8 + count * 2 * 8))
let dataCount = data.count
if !(data.withUnsafeMutableBytes { rawBytes -> Bool in
let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self)
guard fd.read(bytes, dataCount) == dataCount else {
return false
}
memcpy(&truncationSizeValue, bytes, 8)
let calculatedCrc = Crc32(bytes, Int32(dataCount))
if calculatedCrc != crc {
return false
}
var offset = 8
for _ in 0 ..< count {
var intervalOffset: Int64 = 0
var intervalLength: Int64 = 0
memcpy(&intervalOffset, bytes.advanced(by: offset), 8)
memcpy(&intervalLength, bytes.advanced(by: offset + 8), 8)
offset += 8 * 2
ranges.insert(contentsOf: intervalOffset ..< (intervalOffset + intervalLength))
sum += intervalLength
}
return true
}) {
throw FileMapError.generic
}
let mappedTruncationSize: Int64?
if truncationSizeValue == -1 {
mappedTruncationSize = nil
} else if truncationSizeValue < 0 {
mappedTruncationSize = nil
} else {
mappedTruncationSize = truncationSizeValue
}
result = MediaBoxFileMap(
sum: sum,
ranges: ranges,
truncationSize: mappedTruncationSize,
progress: nil
)
} else {
let crc: UInt32 = firstUInt32
var count: Int32 = 0
var sum: Int32 = 0
var ranges = RangeSet<Int64>()
guard fd.read(&count, 4) == 4 else {
throw FileMapError.generic
}
if count < 0 {
throw FileMapError.generic
}
if count < 0 || UInt64(length) < 4 + 4 + UInt64(count) * 2 * 4 {
throw FileMapError.generic
}
var truncationSizeValue: Int32 = 0
var data = Data(count: Int(4 + count * 2 * 4))
let dataCount = data.count
if !(data.withUnsafeMutableBytes { rawBytes -> Bool in
let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self)
guard fd.read(bytes, dataCount) == dataCount else {
return false
}
memcpy(&truncationSizeValue, bytes, 4)
let calculatedCrc = Crc32(bytes, Int32(dataCount))
if calculatedCrc != crc {
return false
}
var offset = 4
for _ in 0 ..< count {
var intervalOffset: Int32 = 0
var intervalLength: Int32 = 0
memcpy(&intervalOffset, bytes.advanced(by: offset), 4)
memcpy(&intervalLength, bytes.advanced(by: offset + 4), 4)
offset += 8
ranges.insert(contentsOf: Int64(intervalOffset) ..< Int64(intervalOffset + intervalLength))
sum += intervalLength
}
return true
}) {
throw FileMapError.generic
}
let mappedTruncationSize: Int64?
if truncationSizeValue == -1 {
mappedTruncationSize = nil
} else {
mappedTruncationSize = Int64(truncationSizeValue)
}
result = MediaBoxFileMap(
sum: Int64(sum),
ranges: ranges,
truncationSize: mappedTruncationSize,
progress: nil
)
}
}
guard let result = result else {
throw FileMapError.generic
}
return result
}
func serialize(manager: MediaBoxFileManager, to path: String) {
guard let fileItem = manager.open(path: path, mode: .readwrite) else {
postboxLog("MediaBoxFile: serialize: cannot open file")
return
}
let _ = try? fileItem.access { file in
file.seek(position: 0)
let buffer = WriteBuffer()
var magic: UInt32 = 0x7bac1487
buffer.write(&magic, offset: 0, length: 4)
var zero: Int32 = 0
buffer.write(&zero, offset: 0, length: 4)
let rangeView = self.ranges.ranges
var count: Int32 = Int32(rangeView.count)
buffer.write(&count, offset: 0, length: 4)
var truncationSizeValue: Int64 = self.truncationSize ?? -1
buffer.write(&truncationSizeValue, offset: 0, length: 8)
for range in rangeView {
var intervalOffset = range.lowerBound
var intervalLength = range.upperBound - range.lowerBound
buffer.write(&intervalOffset, offset: 0, length: 8)
buffer.write(&intervalLength, offset: 0, length: 8)
}
var crc: UInt32 = Crc32(buffer.memory.advanced(by: 4 + 4 + 4), Int32(buffer.length - (4 + 4 + 4)))
memcpy(buffer.memory.advanced(by: 4), &crc, 4)
let written = file.write(buffer.memory, count: buffer.length)
assert(written == buffer.length)
}
}
fileprivate func fill(_ range: Range<Int64>) {
var previousCount: Int64 = 0
for intersectionRange in self.ranges.intersection(RangeSet<Int64>(range)).ranges {
previousCount += intersectionRange.upperBound - intersectionRange.lowerBound
}
self.ranges.insert(contentsOf: range)
self.sum += (range.upperBound - range.lowerBound) - previousCount
}
fileprivate func truncate(_ size: Int64) {
self.truncationSize = size
}
fileprivate func progressUpdated(_ progress: Float) {
self.progress = progress
}
fileprivate func reset() {
self.truncationSize = nil
self.ranges = RangeSet<Int64>()
self.sum = 0
self.progress = nil
}
fileprivate func contains(_ range: Range<Int64>) -> Range<Int64>? {
let maxValue: Int64
if let truncationSize = self.truncationSize {
maxValue = truncationSize
} else {
maxValue = Int64.max
}
let clippedUpperBound = min(maxValue, range.upperBound)
let clippedRange: Range<Int64> = min(range.lowerBound, clippedUpperBound) ..< clippedUpperBound
let clippedRangeSet = RangeSet<Int64>(clippedRange)
if self.ranges.isSuperset(of: clippedRangeSet) {
return clippedRange
} else {
return nil
}
}
}
private class MediaBoxPartialFileDataRequest {
let range: Range<Int64>
var waitingUntilAfterInitialFetch: Bool
@ -1182,7 +740,7 @@ private enum MediaBoxFileContent {
case partial(MediaBoxPartialFile)
}
final class MediaBoxFileContext {
final class MediaBoxFileContextImpl: MediaBoxFileContext {
private let queue: Queue
private let path: String
private let partialPath: String

View File

@ -0,0 +1,18 @@
import Foundation
import SwiftSignalKit
import RangeSet
protocol MediaBoxFileContext: AnyObject {
var isEmpty: Bool { get }
func addReference() -> Int
func removeReference(_ index: Int)
func data(range: Range<Int64>, waitUntilAfterInitialFetch: Bool, next: @escaping (MediaResourceData) -> Void) -> Disposable
func fetched(range: Range<Int64>, priority: MediaBoxFetchPriority, fetch: @escaping (Signal<[(Range<Int64>, MediaBoxFetchPriority)], NoError>) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError>, error: @escaping (MediaResourceDataFetchError) -> Void, completed: @escaping () -> Void) -> Disposable
func fetchedFullRange(fetch: @escaping (Signal<[(Range<Int64>, MediaBoxFetchPriority)], NoError>) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError>, error: @escaping (MediaResourceDataFetchError) -> Void, completed: @escaping () -> Void) -> Disposable
func cancelFullRangeFetches()
func rangeStatus(next: @escaping (RangeSet<Int64>) -> Void, completed: @escaping () -> Void) -> Disposable
func status(next: @escaping (MediaResourceStatus) -> Void, completed: @escaping () -> Void, size: Int64?) -> Disposable
}

View File

@ -0,0 +1,760 @@
import Foundation
import RangeSet
import SwiftSignalKit
final class MediaBoxFileContextV2Impl: MediaBoxFileContext {
private final class RangeRequest {
let value: Range<Int64>
let priority: MediaBoxFetchPriority
let isFullRange: Bool
let error: (MediaResourceDataFetchError) -> Void
let completed: () -> Void
init(
value: Range<Int64>,
priority: MediaBoxFetchPriority,
isFullRange: Bool,
error: @escaping (MediaResourceDataFetchError) -> Void,
completed: @escaping () -> Void
) {
self.value = value
self.priority = priority
self.isFullRange = isFullRange
self.error = error
self.completed = completed
}
}
private final class StatusRequest {
let size: Int64?
let next: (MediaResourceStatus) -> Void
let completed: () -> Void
var reportedStatus: MediaResourceStatus?
init(size: Int64?, next: @escaping (MediaResourceStatus) -> Void, completed: @escaping () -> Void) {
self.size = size
self.next = next
self.completed = completed
}
}
private final class PartialDataRequest {
let range: Range<Int64>
let next: (MediaResourceData) -> Void
var waitingUntilAfterInitialFetch: Bool
var reportedStatus: MediaResourceData?
init(
range: Range<Int64>,
waitUntilAfterInitialFetch: Bool,
next: @escaping (MediaResourceData) -> Void
) {
self.range = range
self.waitingUntilAfterInitialFetch = waitUntilAfterInitialFetch
self.next = next
}
}
private final class RangeStatusRequest {
let next: (RangeSet<Int64>) -> Void
let completed: () -> Void
var reportedStatus: RangeSet<Int64>?
init(
next: @escaping (RangeSet<Int64>) -> Void,
completed: @escaping () -> Void
) {
self.next = next
self.completed = completed
}
}
private struct MaterializedRangeRequest: Equatable {
let range: Range<Int64>
let priority: MediaBoxFetchPriority
init(
range: Range<Int64>,
priority: MediaBoxFetchPriority
) {
self.range = range
self.priority = priority
}
}
private final class PendingFetch {
let initialFilterRanges: RangeSet<Int64>
let ranges = Promise<[(Range<Int64>, MediaBoxFetchPriority)]>()
let disposable: Disposable
init(initialFilterRanges: RangeSet<Int64>, disposable: Disposable) {
self.initialFilterRanges = initialFilterRanges
self.disposable = disposable
}
}
private final class PartialState {
private let queue: Queue
private let manager: MediaBoxFileManager
private let storageBox: StorageBox
private let resourceId: Data
private let partialPath: String
private let fullPath: String
private let metaPath: String
private let destinationFile: MediaBoxFileManager.Item?
private let fileMap: MediaBoxFileMap
private var rangeRequests = Bag<RangeRequest>()
private var statusRequests = Bag<StatusRequest>()
private var rangeStatusRequests = Bag<RangeStatusRequest>()
private var partialDataRequests = Bag<PartialDataRequest>()
private var fetchImpl: ((Signal<[(Range<Int64>, MediaBoxFetchPriority)], NoError>) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError>)?
private var materializedRangeRequests: [MaterializedRangeRequest] = []
private var pendingFetch: PendingFetch?
private var hasPerformedAnyFetch: Bool = false
private var isComplete: Bool = false
init(
queue: Queue,
manager: MediaBoxFileManager,
storageBox: StorageBox,
resourceId: Data,
partialPath: String,
fullPath: String,
metaPath: String
) {
self.queue = queue
self.manager = manager
self.storageBox = storageBox
self.resourceId = resourceId
self.partialPath = partialPath
self.fullPath = fullPath
self.metaPath = metaPath
do {
self.fileMap = try MediaBoxFileMap.read(manager: self.manager, path: self.metaPath)
} catch {
let _ = try? FileManager.default.removeItem(atPath: self.metaPath)
self.fileMap = MediaBoxFileMap()
}
self.destinationFile = self.manager.open(path: self.partialPath, mode: .readwrite)
if FileManager.default.fileExists(atPath: self.fullPath) {
self.isComplete = true
}
}
func request(
range: Range<Int64>,
isFullRange: Bool,
priority: MediaBoxFetchPriority,
fetch: @escaping (Signal<[(Range<Int64>, MediaBoxFetchPriority)], NoError>) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError>,
error: @escaping (MediaResourceDataFetchError) -> Void,
completed: @escaping () -> Void
) -> Disposable {
assert(self.queue.isCurrent())
self.fetchImpl = fetch
let request = RangeRequest(
value: range,
priority: priority,
isFullRange: isFullRange,
error: error,
completed: completed
)
if self.updateRangeRequest(request: request) {
return EmptyDisposable
} else {
let index = self.rangeRequests.add(request)
self.updateRequests()
let queue = self.queue
return ActionDisposable { [weak self] in
queue.async {
guard let `self` = self else {
return
}
self.rangeRequests.remove(index)
self.updateRequests()
}
}
}
}
func cancelFullRangeFetches() {
for (index, rangeRequest) in self.rangeRequests.copyItemsWithIndices() {
if rangeRequest.isFullRange {
self.rangeRequests.remove(index)
}
}
self.updateRequests()
}
func status(next: @escaping (MediaResourceStatus) -> Void, completed: @escaping () -> Void, size: Int64?) -> Disposable {
assert(self.queue.isCurrent())
let request = StatusRequest(
size: size,
next: next,
completed: completed
)
if self.updateStatusRequest(request: request) {
return EmptyDisposable
} else {
let index = self.statusRequests.add(request)
let queue = self.queue
return ActionDisposable { [weak self] in
queue.async {
guard let `self` = self else {
return
}
self.statusRequests.remove(index)
}
}
}
}
func partialData(range: Range<Int64>, waitUntilAfterInitialFetch: Bool, next: @escaping (MediaResourceData) -> Void) -> Disposable {
let request = PartialDataRequest(
range: range,
waitUntilAfterInitialFetch: waitUntilAfterInitialFetch && !self.hasPerformedAnyFetch,
next: next
)
if self.updatePartialDataRequest(request: request) {
return EmptyDisposable
} else {
let index = self.partialDataRequests.add(request)
let queue = self.queue
return ActionDisposable { [weak self] in
queue.async {
guard let `self` = self else {
return
}
self.partialDataRequests.remove(index)
}
}
}
}
func rangeStatus(
next: @escaping (RangeSet<Int64>) -> Void,
completed: @escaping () -> Void
) -> Disposable {
assert(self.queue.isCurrent())
let request = RangeStatusRequest(
next: next,
completed: completed
)
if self.updateRangeStatusRequest(request: request) {
return EmptyDisposable
} else {
let index = self.rangeStatusRequests.add(request)
let queue = self.queue
return ActionDisposable { [weak self] in
queue.async {
guard let `self` = self else {
return
}
self.rangeStatusRequests.remove(index)
}
}
}
}
private func updateRequests() {
var rangesByPriority: [MediaBoxFetchPriority: RangeSet<Int64>] = [:]
for (index, rangeRequest) in self.rangeRequests.copyItemsWithIndices() {
if self.updateRangeRequest(request: rangeRequest) {
self.rangeRequests.remove(index)
continue
}
if rangesByPriority[rangeRequest.priority] == nil {
rangesByPriority[rangeRequest.priority] = RangeSet()
}
rangesByPriority[rangeRequest.priority]?.formUnion(RangeSet<Int64>(rangeRequest.value))
}
let initialFilterRanges: RangeSet<Int64>
if let current = self.pendingFetch {
initialFilterRanges = current.initialFilterRanges
} else {
initialFilterRanges = self.fileMap.ranges
}
var materializedRangeRequests: [MaterializedRangeRequest] = []
for (priority, ranges) in rangesByPriority.sorted(by: { $0.key.rawValue < $1.key.rawValue }) {
let filteredRanges = ranges.subtracting(initialFilterRanges)
for range in filteredRanges.ranges {
materializedRangeRequests.append(MaterializedRangeRequest(range: range, priority: priority))
}
}
if self.materializedRangeRequests != materializedRangeRequests {
self.materializedRangeRequests = materializedRangeRequests
if !materializedRangeRequests.isEmpty {
if let fetchImpl = self.fetchImpl {
let pendingFetch: PendingFetch
if let current = self.pendingFetch {
pendingFetch = current
} else {
let disposable = MetaDisposable()
pendingFetch = PendingFetch(initialFilterRanges: initialFilterRanges, disposable: disposable)
self.pendingFetch = pendingFetch
self.hasPerformedAnyFetch = true
let queue = self.queue
disposable.set(fetchImpl(pendingFetch.ranges.get()).start(next: { [weak self] result in
queue.async {
guard let `self` = self else {
return
}
self.processFetchResult(result: result)
}
}, error: { [weak self] error in
queue.async {
guard let `self` = self else {
return
}
self.processFetchError(error: error)
}
}))
}
pendingFetch.ranges.set(.single(materializedRangeRequests.map { request -> (Range<Int64>, MediaBoxFetchPriority) in
return (request.range, request.priority)
}))
}
} else {
if let pendingFetch = self.pendingFetch {
self.pendingFetch = nil
pendingFetch.disposable.dispose()
}
}
}
self.updateStatusRequests()
}
private func processFetchResult(result: MediaResourceDataFetchResult) {
assert(self.queue.isCurrent())
switch result {
case let .dataPart(resourceOffset, data, dataRange, complete):
self.processWrite(resourceOffset: resourceOffset, data: data, dataRange: dataRange)
if complete {
if let maxOffset = self.fileMap.ranges.ranges.reversed().first?.upperBound {
let maxValue = max(resourceOffset + Int64(dataRange.count), Int64(maxOffset))
self.fileMap.truncate(maxValue)
}
}
case let .resourceSizeUpdated(size):
self.fileMap.truncate(size)
self.fileMap.serialize(manager: self.manager, to: self.metaPath)
case let .progressUpdated(progress):
let _ = progress
case let .replaceHeader(data, range):
self.processWrite(resourceOffset: 0, data: data, dataRange: range)
case let .moveLocalFile(path):
do {
try FileManager.default.moveItem(atPath: path, toPath: self.fullPath)
self.processMovedFile()
} catch let e {
postboxLog("MediaBoxFileContextV2Impl: error moving temp file at \(self.fullPath): \(e)")
}
case let .moveTempFile(file):
do {
try FileManager.default.moveItem(atPath: file.path, toPath: self.fullPath)
self.processMovedFile()
} catch let e {
postboxLog("MediaBoxFileContextV2Impl: error moving temp file at \(self.fullPath): \(e)")
}
TempBox.shared.dispose(file)
case let .copyLocalItem(localItem):
do {
if localItem.copyTo(url: URL(fileURLWithPath: self.fullPath)) {
unlink(self.partialPath)
unlink(self.metaPath)
}
self.processMovedFile()
}
case .reset:
if !self.fileMap.ranges.isEmpty {
self.fileMap.reset()
self.fileMap.serialize(manager: self.manager, to: self.metaPath)
}
}
if !self.isComplete, let truncationSize = self.fileMap.truncationSize, truncationSize == self.fileMap.sum {
self.isComplete = true
let linkResult = link(self.partialPath, self.fullPath)
if linkResult != 0 {
postboxLog("MediaBoxFileContextV2Impl: error while linking \(self.partialPath): \(linkResult)")
}
}
self.updateRequests()
}
private func processWrite(resourceOffset: Int64, data: Data, dataRange: Range<Int64>) {
if let destinationFile = self.destinationFile {
do {
try destinationFile.access { fd in
fd.seek(position: resourceOffset)
let written = data.withUnsafeBytes { rawBytes -> Int in
let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self)
return fd.write(bytes.advanced(by: Int(dataRange.lowerBound)), count: dataRange.count)
}
assert(written == dataRange.count)
}
let range: Range<Int64> = resourceOffset ..< (resourceOffset + Int64(dataRange.count))
self.fileMap.fill(range)
self.fileMap.serialize(manager: self.manager, to: self.metaPath)
self.storageBox.update(id: self.resourceId, size: self.fileMap.sum)
} catch let e {
postboxLog("MediaBoxFileContextV2Impl: error writing file at \(self.partialPath): \(e)")
}
}
}
private func processMovedFile() {
if let size = fileSize(self.fullPath) {
self.isComplete = true
self.storageBox.update(id: self.resourceId, size: size)
}
}
private func processFetchError(error: MediaResourceDataFetchError) {
assert(self.queue.isCurrent())
let rangeRequests = self.rangeRequests.copyItems()
self.rangeRequests.removeAll()
self.statusRequests.removeAll()
self.rangeStatusRequests.removeAll()
//TODO:set status to .remote?
for rangeRequest in rangeRequests {
rangeRequest.error(error)
}
}
private func updateRangeRequest(request: RangeRequest) -> Bool {
assert(self.queue.isCurrent())
if self.fileMap.contains(request.value) != nil {
request.completed()
return true
} else {
return false
}
}
private func updateStatusRequests() {
for (index, partialDataRequest) in self.partialDataRequests.copyItemsWithIndices() {
if self.updatePartialDataRequest(request: partialDataRequest) {
self.partialDataRequests.remove(index)
}
}
for (index, statusRequest) in self.statusRequests.copyItemsWithIndices() {
if self.updateStatusRequest(request: statusRequest) {
self.statusRequests.remove(index)
}
}
for (index, rangeStatusRequest) in self.rangeStatusRequests.copyItemsWithIndices() {
if self.updateRangeStatusRequest(request: rangeStatusRequest) {
self.rangeStatusRequests.remove(index)
}
}
}
private func updatePartialDataRequest(request: PartialDataRequest) -> Bool {
assert(self.queue.isCurrent())
if self.isComplete, let size = fileSize(self.fullPath) {
var clampedLowerBound = request.range.lowerBound
var clampedUpperBound = request.range.upperBound
if clampedUpperBound > size {
clampedUpperBound = size
}
if clampedLowerBound > clampedUpperBound {
clampedLowerBound = clampedUpperBound
}
let updatedStatus = MediaResourceData(path: self.fullPath, offset: clampedLowerBound, size: clampedUpperBound - clampedLowerBound, complete: true)
if request.reportedStatus != updatedStatus {
request.reportedStatus = updatedStatus
request.next(updatedStatus)
}
return true
} else if self.fileMap.contains(request.range) != nil {
let updatedStatus = MediaResourceData(path: self.partialPath, offset: request.range.lowerBound, size: request.range.upperBound - request.range.lowerBound, complete: true)
if request.reportedStatus != updatedStatus {
request.reportedStatus = updatedStatus
request.next(updatedStatus)
}
return true
} else {
let updatedStatus = MediaResourceData(path: self.partialPath, offset: request.range.lowerBound, size: 0, complete: false)
if request.reportedStatus != updatedStatus {
if request.waitingUntilAfterInitialFetch {
if self.hasPerformedAnyFetch {
request.waitingUntilAfterInitialFetch = false
request.reportedStatus = updatedStatus
request.next(updatedStatus)
}
} else {
request.reportedStatus = updatedStatus
request.next(updatedStatus)
}
}
return false
}
}
private func updateStatusRequest(request: StatusRequest) -> Bool {
assert(self.queue.isCurrent())
let updatedStatus: MediaResourceStatus
if self.isComplete {
updatedStatus = .Local
} else if let totalSize = self.fileMap.truncationSize ?? request.size {
let progress = Float(self.fileMap.sum) / Float(totalSize)
if self.pendingFetch != nil {
updatedStatus = .Fetching(isActive: true, progress: progress)
} else {
updatedStatus = .Remote(progress: progress)
}
} else if self.pendingFetch != nil {
updatedStatus = .Fetching(isActive: true, progress: 0.0)
} else {
updatedStatus = .Remote(progress: 0.0)
}
if request.reportedStatus != updatedStatus {
request.reportedStatus = updatedStatus
request.next(updatedStatus)
}
return false
}
private func updateRangeStatusRequest(request: RangeStatusRequest) -> Bool {
assert(self.queue.isCurrent())
let status: RangeSet<Int64>
if self.isComplete, let size = fileSize(self.fullPath) {
status = RangeSet(0 ..< size)
} else {
status = self.fileMap.ranges
}
if request.reportedStatus != status {
request.reportedStatus = status
request.next(status)
if let truncationSize = self.fileMap.truncationSize, self.fileMap.sum == truncationSize {
request.completed()
return true
}
}
return false
}
}
private let queue: Queue
private let manager: MediaBoxFileManager
private let storageBox: StorageBox
private let resourceId: Data
private let path: String
private let partialPath: String
private let metaPath: String
private let references = Bag<Void>()
private var partialState: PartialState?
var isEmpty: Bool {
return self.references.isEmpty
}
init?(
queue: Queue,
manager: MediaBoxFileManager,
storageBox: StorageBox,
resourceId: Data,
path: String,
partialPath: String,
metaPath: String
) {
self.queue = queue
self.manager = manager
self.storageBox = storageBox
self.resourceId = resourceId
self.path = path
self.partialPath = partialPath
self.metaPath = metaPath
}
func addReference() -> Int {
assert(self.queue.isCurrent())
return self.references.add(Void())
}
func removeReference(_ index: Int) {
assert(self.queue.isCurrent())
return self.references.remove(index)
}
private func withPartialState<T>(_ f: (PartialState) -> T) -> T {
if let partialState = self.partialState {
return f(partialState)
} else {
let partialState = PartialState(
queue: self.queue,
manager: self.manager,
storageBox: self.storageBox,
resourceId: self.resourceId,
partialPath: self.partialPath,
fullPath: self.path,
metaPath: self.metaPath
)
self.partialState = partialState
return f(partialState)
}
}
func data(range: Range<Int64>, waitUntilAfterInitialFetch: Bool, next: @escaping (MediaResourceData) -> Void) -> Disposable {
assert(self.queue.isCurrent())
if let size = fileSize(self.path) {
var clampedLowerBound = range.lowerBound
var clampedUpperBound = range.upperBound
if clampedUpperBound > size {
clampedUpperBound = size
}
if clampedLowerBound > clampedUpperBound {
clampedLowerBound = clampedUpperBound
}
next(MediaResourceData(path: self.path, offset: clampedLowerBound, size: clampedUpperBound - clampedLowerBound, complete: true))
return EmptyDisposable
} else {
return self.withPartialState { partialState in
return partialState.partialData(
range: range,
waitUntilAfterInitialFetch: waitUntilAfterInitialFetch,
next: next
)
}
}
}
func fetched(
range: Range<Int64>,
priority: MediaBoxFetchPriority,
fetch: @escaping (Signal<[(Range<Int64>, MediaBoxFetchPriority)], NoError>) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError>,
error: @escaping (MediaResourceDataFetchError) -> Void,
completed: @escaping () -> Void
) -> Disposable {
assert(self.queue.isCurrent())
if FileManager.default.fileExists(atPath: self.path) {
completed()
return EmptyDisposable
} else {
return self.withPartialState { partialState in
return partialState.request(
range: range,
isFullRange: false,
priority: priority,
fetch: fetch,
error: error,
completed: completed
)
}
}
}
func fetchedFullRange(
fetch: @escaping (Signal<[(Range<Int64>, MediaBoxFetchPriority)], NoError>) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError>,
error: @escaping (MediaResourceDataFetchError) -> Void,
completed: @escaping () -> Void
) -> Disposable {
assert(self.queue.isCurrent())
if FileManager.default.fileExists(atPath: self.path) {
completed()
return EmptyDisposable
} else {
return self.withPartialState { partialState in
return partialState.request(
range: 0 ..< Int64.max,
isFullRange: true,
priority: .default,
fetch: fetch,
error: error,
completed: completed
)
}
}
}
func cancelFullRangeFetches() {
assert(self.queue.isCurrent())
if let partialState = self.partialState {
partialState.cancelFullRangeFetches()
}
}
func rangeStatus(next: @escaping (RangeSet<Int64>) -> Void, completed: @escaping () -> Void) -> Disposable {
assert(self.queue.isCurrent())
if let size = fileSize(self.path) {
next(RangeSet<Int64>([0 ..< Int64(size) as Range<Int64>]))
completed()
return EmptyDisposable
} else {
return self.withPartialState { partialState in
return partialState.rangeStatus(next: next, completed: completed)
}
}
}
func status(next: @escaping (MediaResourceStatus) -> Void, completed: @escaping () -> Void, size: Int64?) -> Disposable {
assert(self.queue.isCurrent())
if let _ = fileSize(self.path) {
next(.Local)
completed()
return EmptyDisposable
} else {
return self.withPartialState { partialState in
return partialState.status(next: next, completed: completed, size: size)
}
}
}
}

View File

@ -0,0 +1,173 @@
import Foundation
import SwiftSignalKit
import ManagedFile
final class MediaBoxFileManager {
enum Mode {
case read
case readwrite
}
enum AccessError: Error {
case generic
}
final class Item {
final class Accessor {
private let file: ManagedFile
init(file: ManagedFile) {
self.file = file
}
func write(_ data: UnsafeRawPointer, count: Int) -> Int {
return self.file.write(data, count: count)
}
func read(_ data: UnsafeMutableRawPointer, _ count: Int) -> Int {
return self.file.read(data, count)
}
func readData(count: Int) -> Data {
return self.file.readData(count: count)
}
func seek(position: Int64) {
self.file.seek(position: position)
}
}
weak var manager: MediaBoxFileManager?
let path: String
let mode: Mode
weak var context: ItemContext?
init(manager: MediaBoxFileManager, path: String, mode: Mode) {
self.manager = manager
self.path = path
self.mode = mode
}
deinit {
if let manager = self.manager, let context = self.context {
manager.discardItemContext(context: context)
}
}
func access(_ f: (Accessor) throws -> Void) throws {
if let context = self.context {
try f(Accessor(file: context.file))
} else {
if let manager = self.manager {
if let context = manager.takeContext(path: self.path, mode: self.mode) {
self.context = context
try f(Accessor(file: context.file))
} else {
throw AccessError.generic
}
} else {
throw AccessError.generic
}
}
}
func sync() {
if let context = self.context {
context.sync()
}
}
}
final class ItemContext {
let id: Int
let path: String
let mode: Mode
let file: ManagedFile
private var isDisposed: Bool = false
init?(id: Int, path: String, mode: Mode) {
let mappedMode: ManagedFile.Mode
switch mode {
case .read:
mappedMode = .read
case .readwrite:
mappedMode = .readwrite
}
guard let file = ManagedFile(queue: nil, path: path, mode: mappedMode) else {
return nil
}
self.file = file
self.id = id
self.path = path
self.mode = mode
}
deinit {
assert(self.isDisposed)
}
func dispose() {
if !self.isDisposed {
self.isDisposed = true
self.file._unsafeClose()
} else {
assertionFailure()
}
}
func sync() {
self.file.sync()
}
}
private let queue: Queue?
private var contexts: [Int: ItemContext] = [:]
private var nextItemId: Int = 0
private let maxOpenFiles: Int
init(queue: Queue?) {
self.queue = queue
self.maxOpenFiles = 16
}
func open(path: String, mode: Mode) -> Item? {
if let queue = self.queue {
assert(queue.isCurrent())
}
return Item(manager: self, path: path, mode: mode)
}
private func takeContext(path: String, mode: Mode) -> ItemContext? {
if let queue = self.queue {
assert(queue.isCurrent())
}
if self.contexts.count > self.maxOpenFiles {
if let minKey = self.contexts.keys.min(), let context = self.contexts[minKey] {
self.discardItemContext(context: context)
}
}
let id = self.nextItemId
self.nextItemId += 1
let context = ItemContext(id: id, path: path, mode: mode)
self.contexts[id] = context
return context
}
private func discardItemContext(context: ItemContext) {
if let queue = self.queue {
assert(queue.isCurrent())
}
if let context = self.contexts.removeValue(forKey: context.id) {
context.dispose()
}
}
}

View File

@ -0,0 +1,275 @@
import Foundation
import RangeSet
import Crc32
final class MediaBoxFileMap {
enum FileMapError: Error {
case generic
}
private(set) var sum: Int64
private(set) var ranges: RangeSet<Int64>
private(set) var truncationSize: Int64?
private(set) var progress: Float?
init() {
self.sum = 0
self.ranges = RangeSet<Int64>()
self.truncationSize = nil
self.progress = nil
}
private init(
sum: Int64,
ranges: RangeSet<Int64>,
truncationSize: Int64?,
progress: Float?
) {
self.sum = sum
self.ranges = ranges
self.truncationSize = truncationSize
self.progress = progress
}
static func read(manager: MediaBoxFileManager, path: String) throws -> MediaBoxFileMap {
guard let length = fileSize(path) else {
throw FileMapError.generic
}
guard let fileItem = manager.open(path: path, mode: .readwrite) else {
throw FileMapError.generic
}
var result: MediaBoxFileMap?
try fileItem.access { fd in
var firstUInt32: UInt32 = 0
guard fd.read(&firstUInt32, 4) == 4 else {
throw FileMapError.generic
}
if firstUInt32 == 0x7bac1487 {
var crc: UInt32 = 0
guard fd.read(&crc, 4) == 4 else {
throw FileMapError.generic
}
var count: Int32 = 0
var sum: Int64 = 0
var ranges = RangeSet<Int64>()
guard fd.read(&count, 4) == 4 else {
throw FileMapError.generic
}
if count < 0 {
throw FileMapError.generic
}
if count < 0 || length < 4 + 4 + 4 + 8 + count * 2 * 8 {
throw FileMapError.generic
}
var truncationSizeValue: Int64 = 0
var data = Data(count: Int(8 + count * 2 * 8))
let dataCount = data.count
if !(data.withUnsafeMutableBytes { rawBytes -> Bool in
let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self)
guard fd.read(bytes, dataCount) == dataCount else {
return false
}
memcpy(&truncationSizeValue, bytes, 8)
let calculatedCrc = Crc32(bytes, Int32(dataCount))
if calculatedCrc != crc {
return false
}
var offset = 8
for _ in 0 ..< count {
var intervalOffset: Int64 = 0
var intervalLength: Int64 = 0
memcpy(&intervalOffset, bytes.advanced(by: offset), 8)
memcpy(&intervalLength, bytes.advanced(by: offset + 8), 8)
offset += 8 * 2
ranges.insert(contentsOf: intervalOffset ..< (intervalOffset + intervalLength))
sum += intervalLength
}
return true
}) {
throw FileMapError.generic
}
let mappedTruncationSize: Int64?
if truncationSizeValue == -1 {
mappedTruncationSize = nil
} else if truncationSizeValue < 0 {
mappedTruncationSize = nil
} else {
mappedTruncationSize = truncationSizeValue
}
result = MediaBoxFileMap(
sum: sum,
ranges: ranges,
truncationSize: mappedTruncationSize,
progress: nil
)
} else {
let crc: UInt32 = firstUInt32
var count: Int32 = 0
var sum: Int32 = 0
var ranges = RangeSet<Int64>()
guard fd.read(&count, 4) == 4 else {
throw FileMapError.generic
}
if count < 0 {
throw FileMapError.generic
}
if count < 0 || UInt64(length) < 4 + 4 + UInt64(count) * 2 * 4 {
throw FileMapError.generic
}
var truncationSizeValue: Int32 = 0
var data = Data(count: Int(4 + count * 2 * 4))
let dataCount = data.count
if !(data.withUnsafeMutableBytes { rawBytes -> Bool in
let bytes = rawBytes.baseAddress!.assumingMemoryBound(to: UInt8.self)
guard fd.read(bytes, dataCount) == dataCount else {
return false
}
memcpy(&truncationSizeValue, bytes, 4)
let calculatedCrc = Crc32(bytes, Int32(dataCount))
if calculatedCrc != crc {
return false
}
var offset = 4
for _ in 0 ..< count {
var intervalOffset: Int32 = 0
var intervalLength: Int32 = 0
memcpy(&intervalOffset, bytes.advanced(by: offset), 4)
memcpy(&intervalLength, bytes.advanced(by: offset + 4), 4)
offset += 8
ranges.insert(contentsOf: Int64(intervalOffset) ..< Int64(intervalOffset + intervalLength))
sum += intervalLength
}
return true
}) {
throw FileMapError.generic
}
let mappedTruncationSize: Int64?
if truncationSizeValue == -1 {
mappedTruncationSize = nil
} else {
mappedTruncationSize = Int64(truncationSizeValue)
}
result = MediaBoxFileMap(
sum: Int64(sum),
ranges: ranges,
truncationSize: mappedTruncationSize,
progress: nil
)
}
}
guard let result = result else {
throw FileMapError.generic
}
return result
}
func serialize(manager: MediaBoxFileManager, to path: String) {
guard let fileItem = manager.open(path: path, mode: .readwrite) else {
postboxLog("MediaBoxFile: serialize: cannot open file")
return
}
let _ = try? fileItem.access { file in
file.seek(position: 0)
let buffer = WriteBuffer()
var magic: UInt32 = 0x7bac1487
buffer.write(&magic, offset: 0, length: 4)
var zero: Int32 = 0
buffer.write(&zero, offset: 0, length: 4)
let rangeView = self.ranges.ranges
var count: Int32 = Int32(rangeView.count)
buffer.write(&count, offset: 0, length: 4)
var truncationSizeValue: Int64 = self.truncationSize ?? -1
buffer.write(&truncationSizeValue, offset: 0, length: 8)
for range in rangeView {
var intervalOffset = range.lowerBound
var intervalLength = range.upperBound - range.lowerBound
buffer.write(&intervalOffset, offset: 0, length: 8)
buffer.write(&intervalLength, offset: 0, length: 8)
}
var crc: UInt32 = Crc32(buffer.memory.advanced(by: 4 + 4 + 4), Int32(buffer.length - (4 + 4 + 4)))
memcpy(buffer.memory.advanced(by: 4), &crc, 4)
let written = file.write(buffer.memory, count: buffer.length)
assert(written == buffer.length)
}
}
func fill(_ range: Range<Int64>) {
var previousCount: Int64 = 0
for intersectionRange in self.ranges.intersection(RangeSet<Int64>(range)).ranges {
previousCount += intersectionRange.upperBound - intersectionRange.lowerBound
}
self.ranges.insert(contentsOf: range)
self.sum += (range.upperBound - range.lowerBound) - previousCount
}
func truncate(_ size: Int64) {
self.truncationSize = size
}
func progressUpdated(_ progress: Float) {
self.progress = progress
}
func reset() {
self.truncationSize = nil
self.ranges = RangeSet<Int64>()
self.sum = 0
self.progress = nil
}
func contains(_ range: Range<Int64>) -> Range<Int64>? {
let maxValue: Int64
if let truncationSize = self.truncationSize {
maxValue = truncationSize
} else {
maxValue = Int64.max
}
let clippedUpperBound = min(maxValue, range.upperBound)
let clippedRange: Range<Int64> = min(range.lowerBound, clippedUpperBound) ..< clippedUpperBound
let clippedRangeSet = RangeSet<Int64>(clippedRange)
if self.ranges.isSuperset(of: clippedRangeSet) {
return clippedRange
} else {
return nil
}
}
}