diff --git a/Postbox/MediaBox.swift b/Postbox/MediaBox.swift index fb5145ac2e..512cdd351c 100644 --- a/Postbox/MediaBox.swift +++ b/Postbox/MediaBox.swift @@ -50,7 +50,14 @@ public struct MediaResourceData { public let complete: Bool } -public struct MediaResourceDataFetchResult { +public enum MediaResourceDataFetchResult { + case dataPart(data: Data, range: Range, complete: Bool) + case replaceHeader(data: Data, range: Range) + case moveLocalFile(path: String) + case reset +} + +/*public struct MediaResourceDataFetchResult { public let data: Data public let complete: Bool @@ -58,7 +65,7 @@ public struct MediaResourceDataFetchResult { self.data = data self.complete = complete } -} +}*/ public struct CachedMediaResourceRepresentationResult { public let temporaryPath: String @@ -410,9 +417,14 @@ public final class MediaBox { if let strongSelf = strongSelf { strongSelf.dataQueue.async { if let dataContext = strongSelf.randomAccessContexts[resourceId] { - let storeRange = RandomAccessResourceStoreRange(offset: range.lowerBound + offset, data: result.data) - offset += result.data.count - dataContext.storeRanges([storeRange]) + switch result { + case let .dataPart(data, dataRange, _): + let storeRange = RandomAccessResourceStoreRange(offset: range.lowerBound + offset, data: data.subdata(in: dataRange)) + offset += data.count + dataContext.storeRanges([storeRange]) + default: + assertionFailure() + } } } } @@ -560,79 +572,233 @@ public final class MediaBox { close(fd) } } - }).start(next: { result in + }).start(next: { resultOption in self.dataQueue.async { let _ = self.ensureDirectoryCreated - if fd == nil { - let handle = open(paths.partial, O_WRONLY | O_CREAT | O_APPEND, S_IRUSR | S_IWUSR) - if handle >= 0 { - fd = handle - } - } - - if let thisFd = fd { - if !result.data.isEmpty { - let writeResult = result.data.withUnsafeBytes { bytes -> Int in - return write(thisFd, bytes, result.data.count) + switch resultOption { + case let .dataPart(data, dataRange, complete): + if fd == nil { + let handle = open(paths.partial, O_WRONLY | O_CREAT | O_APPEND, S_IRUSR | S_IWUSR) + if handle >= 0 { + fd = handle + } } - if writeResult != result.data.count { - print("write error \(errno)") + + if let thisFd = fd { + if !dataRange.isEmpty { + let writeResult = data.withUnsafeBytes { bytes -> Int in + return write(thisFd, bytes.advanced(by: dataRange.lowerBound), dataRange.count) + } + if writeResult != dataRange.count { + print("write error \(errno)") + } + } + + offset += dataRange.count + let updatedSize = offset + + let updatedData: MediaResourceData + if complete { + let linkResult = link(paths.partial, paths.complete) + assert(linkResult == 0) + updatedData = MediaResourceData(path: paths.complete, size: updatedSize, complete: true) + } else { + updatedData = MediaResourceData(path: paths.partial, size: updatedSize, complete: false) + } + + dataContext.data = updatedData + + let hadProcessedFetch = dataContext.processedFetch + dataContext.processedFetch = true + + for (_, subscriber) in dataContext.progresiveDataSubscribers.copyItems() { + subscriber(updatedData) + } + + if updatedData.complete { + for (_, subscriber) in dataContext.completeDataSubscribers.copyItems() { + subscriber(updatedData) + } + } else if !hadProcessedFetch { + for (waitUntilFetchStatus, subscriber) in dataContext.completeDataSubscribers.copyItems() { + if waitUntilFetchStatus { + subscriber(updatedData) + } + } + } + + let status: MediaResourceStatus + if updatedData.complete { + status = .Local + } else { + if let resourceSize = resource.size { + status = .Fetching(progress: Float(updatedSize) / Float(resourceSize)) + } else { + status = .Fetching(progress: 0.0) + } + } + + self.statusQueue.async { + if let statusContext = self.statusContexts[resourceId] { + statusContext.status = status + for subscriber in statusContext.subscribers.copyItems() { + subscriber(status) + } + } + } + } + case let .replaceHeader(data, dataRange): + if let thisFd = fd { + close(thisFd) + fd = nil + } + + if fd == nil { + let handle = open(paths.partial, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR) + if handle >= 0 { + fd = handle + } + } + + if let thisFd = fd { + if !dataRange.isEmpty { + lseek(thisFd, 0, SEEK_SET) + let writeResult = data.withUnsafeBytes { bytes -> Int in + return write(thisFd, bytes.advanced(by: dataRange.lowerBound), dataRange.count) + } + lseek(thisFd, Int64(offset), SEEK_SET) + if writeResult != dataRange.count { + print("write error \(errno)") + } + + close(thisFd) + fd = nil + } + } + case .reset: + if fd == nil { + let handle = open(paths.partial, O_WRONLY | O_CREAT | O_APPEND, S_IRUSR | S_IWUSR) + if handle >= 0 { + fd = handle + } + } + + if let fd = fd { + ftruncate(fd, 0) + lseek(fd, 0, SEEK_SET) + } else { + assertionFailure() } - } - offset += result.data.count - let updatedSize = offset + offset = 0 + let updatedSize = offset + + let updatedData: MediaResourceData + updatedData = MediaResourceData(path: paths.partial, size: updatedSize, complete: false) + + dataContext.data = updatedData + + let hadProcessedFetch = dataContext.processedFetch + dataContext.processedFetch = true + + for (_, subscriber) in dataContext.progresiveDataSubscribers.copyItems() { + subscriber(updatedData) + } + + if updatedData.complete { + for (_, subscriber) in dataContext.completeDataSubscribers.copyItems() { + subscriber(updatedData) + } + } else if !hadProcessedFetch { + for (waitUntilFetchStatus, subscriber) in dataContext.completeDataSubscribers.copyItems() { + if waitUntilFetchStatus { + subscriber(updatedData) + } + } + } + + let status: MediaResourceStatus + if updatedData.complete { + status = .Local + } else { + if let resourceSize = resource.size { + status = .Fetching(progress: Float(updatedSize) / Float(resourceSize)) + } else { + status = .Fetching(progress: 0.0) + } + } + + self.statusQueue.async { + if let statusContext = self.statusContexts[resourceId] { + statusContext.status = status + for subscriber in statusContext.subscribers.copyItems() { + subscriber(status) + } + } + } + case let .moveLocalFile(tempPath): + if let fd = fd { + close(fd) + } + unlink(paths.partial) + do { + try FileManager.default.moveItem(atPath: tempPath, toPath: paths.partial) + } catch { + assertionFailure() + } - let updatedData: MediaResourceData - if result.complete { + guard let offset = fileSize(paths.partial) else { + assertionFailure() + return + } + let updatedSize = offset + + let updatedData: MediaResourceData let linkResult = link(paths.partial, paths.complete) assert(linkResult == 0) updatedData = MediaResourceData(path: paths.complete, size: updatedSize, complete: true) - } else { - updatedData = MediaResourceData(path: paths.partial, size: updatedSize, complete: false) - } - - dataContext.data = updatedData - - let hadProcessedFetch = dataContext.processedFetch - dataContext.processedFetch = true - - for (_, subscriber) in dataContext.progresiveDataSubscribers.copyItems() { - subscriber(updatedData) - } - - if updatedData.complete { - for (_, subscriber) in dataContext.completeDataSubscribers.copyItems() { + + dataContext.data = updatedData + + let hadProcessedFetch = dataContext.processedFetch + dataContext.processedFetch = true + + for (_, subscriber) in dataContext.progresiveDataSubscribers.copyItems() { subscriber(updatedData) } - } else if !hadProcessedFetch { - for (waitUntilFetchStatus, subscriber) in dataContext.completeDataSubscribers.copyItems() { - if waitUntilFetchStatus { + + if updatedData.complete { + for (_, subscriber) in dataContext.completeDataSubscribers.copyItems() { subscriber(updatedData) } - } - } - - let status: MediaResourceStatus - if updatedData.complete { - status = .Local - } else { - if let resourceSize = resource.size { - status = .Fetching(progress: Float(updatedSize) / Float(resourceSize)) - } else { - status = .Fetching(progress: 0.0) - } - } - - self.statusQueue.async { - if let statusContext = self.statusContexts[resourceId] { - statusContext.status = status - for subscriber in statusContext.subscribers.copyItems() { - subscriber(status) + } else if !hadProcessedFetch { + for (waitUntilFetchStatus, subscriber) in dataContext.completeDataSubscribers.copyItems() { + if waitUntilFetchStatus { + subscriber(updatedData) + } + } + } + + let status: MediaResourceStatus + if updatedData.complete { + status = .Local + } else { + if let resourceSize = resource.size { + status = .Fetching(progress: Float(updatedSize) / Float(resourceSize)) + } else { + status = .Fetching(progress: 0.0) + } + } + + self.statusQueue.async { + if let statusContext = self.statusContexts[resourceId] { + statusContext.status = status + for subscriber in statusContext.subscribers.copyItems() { + subscriber(status) + } } } - } } } }) diff --git a/Postbox/MediaResource.swift b/Postbox/MediaResource.swift index f204bc2441..96c028cba1 100644 --- a/Postbox/MediaResource.swift +++ b/Postbox/MediaResource.swift @@ -26,6 +26,7 @@ public protocol MediaResource { var id: MediaResourceId { get } var size: Int? { get } var streamable: Bool { get } + var headerSize: Int32 { get } } public extension MediaResource { @@ -36,6 +37,10 @@ public extension MediaResource { var streamable: Bool { return false } + + var headerSize: Int32 { + return 0 + } } public protocol CachedMediaResourceRepresentation { diff --git a/Postbox/PeerMergedOperationLogView.swift b/Postbox/PeerMergedOperationLogView.swift index 73171c16a7..e07117b152 100644 --- a/Postbox/PeerMergedOperationLogView.swift +++ b/Postbox/PeerMergedOperationLogView.swift @@ -30,7 +30,9 @@ final class MutablePeerMergedOperationLogView { } } else { updated = true - assert(self.entries.isEmpty) + if !self.entries.isEmpty { + assertionFailure("self.entries.isEmpty == false for tag \(self.tag)") + } self.entries.append(entry) self.tailIndex = entry.mergedIndex } diff --git a/Postbox/Postbox.swift b/Postbox/Postbox.swift index 2d4b5c30a9..33b858a76f 100644 --- a/Postbox/Postbox.swift +++ b/Postbox/Postbox.swift @@ -146,6 +146,11 @@ public final class Modifier { self.postbox?.setPeerChatState(id, state: state) } + public func getPeerChatInterfaceState(_ id: PeerId) -> PeerChatInterfaceState? { + assert(!self.disposed) + return self.postbox?.peerChatInterfaceStateTable.get(id) + } + public func updatePeerChatInterfaceState(_ id: PeerId, update: (PeerChatInterfaceState?) -> (PeerChatInterfaceState?)) { assert(!self.disposed) self.postbox?.updatePeerChatInterfaceState(id, update: update)