no message

This commit is contained in:
Peter
2017-03-23 21:26:23 +03:00
parent 7227f38311
commit 04d78772be
4 changed files with 241 additions and 63 deletions

View File

@@ -50,7 +50,14 @@ public struct MediaResourceData {
public let complete: Bool
}
public struct MediaResourceDataFetchResult {
public enum MediaResourceDataFetchResult {
case dataPart(data: Data, range: Range<Int>, complete: Bool)
case replaceHeader(data: Data, range: Range<Int>)
case moveLocalFile(path: String)
case reset
}
/*public struct MediaResourceDataFetchResult {
public let data: Data
public let complete: Bool
@@ -58,7 +65,7 @@ public struct MediaResourceDataFetchResult {
self.data = data
self.complete = complete
}
}
}*/
public struct CachedMediaResourceRepresentationResult {
public let temporaryPath: String
@@ -410,9 +417,14 @@ public final class MediaBox {
if let strongSelf = strongSelf {
strongSelf.dataQueue.async {
if let dataContext = strongSelf.randomAccessContexts[resourceId] {
let storeRange = RandomAccessResourceStoreRange(offset: range.lowerBound + offset, data: result.data)
offset += result.data.count
dataContext.storeRanges([storeRange])
switch result {
case let .dataPart(data, dataRange, _):
let storeRange = RandomAccessResourceStoreRange(offset: range.lowerBound + offset, data: data.subdata(in: dataRange))
offset += data.count
dataContext.storeRanges([storeRange])
default:
assertionFailure()
}
}
}
}
@@ -560,79 +572,233 @@ public final class MediaBox {
close(fd)
}
}
}).start(next: { result in
}).start(next: { resultOption in
self.dataQueue.async {
let _ = self.ensureDirectoryCreated
if fd == nil {
let handle = open(paths.partial, O_WRONLY | O_CREAT | O_APPEND, S_IRUSR | S_IWUSR)
if handle >= 0 {
fd = handle
}
}
if let thisFd = fd {
if !result.data.isEmpty {
let writeResult = result.data.withUnsafeBytes { bytes -> Int in
return write(thisFd, bytes, result.data.count)
switch resultOption {
case let .dataPart(data, dataRange, complete):
if fd == nil {
let handle = open(paths.partial, O_WRONLY | O_CREAT | O_APPEND, S_IRUSR | S_IWUSR)
if handle >= 0 {
fd = handle
}
}
if writeResult != result.data.count {
print("write error \(errno)")
if let thisFd = fd {
if !dataRange.isEmpty {
let writeResult = data.withUnsafeBytes { bytes -> Int in
return write(thisFd, bytes.advanced(by: dataRange.lowerBound), dataRange.count)
}
if writeResult != dataRange.count {
print("write error \(errno)")
}
}
offset += dataRange.count
let updatedSize = offset
let updatedData: MediaResourceData
if complete {
let linkResult = link(paths.partial, paths.complete)
assert(linkResult == 0)
updatedData = MediaResourceData(path: paths.complete, size: updatedSize, complete: true)
} else {
updatedData = MediaResourceData(path: paths.partial, size: updatedSize, complete: false)
}
dataContext.data = updatedData
let hadProcessedFetch = dataContext.processedFetch
dataContext.processedFetch = true
for (_, subscriber) in dataContext.progresiveDataSubscribers.copyItems() {
subscriber(updatedData)
}
if updatedData.complete {
for (_, subscriber) in dataContext.completeDataSubscribers.copyItems() {
subscriber(updatedData)
}
} else if !hadProcessedFetch {
for (waitUntilFetchStatus, subscriber) in dataContext.completeDataSubscribers.copyItems() {
if waitUntilFetchStatus {
subscriber(updatedData)
}
}
}
let status: MediaResourceStatus
if updatedData.complete {
status = .Local
} else {
if let resourceSize = resource.size {
status = .Fetching(progress: Float(updatedSize) / Float(resourceSize))
} else {
status = .Fetching(progress: 0.0)
}
}
self.statusQueue.async {
if let statusContext = self.statusContexts[resourceId] {
statusContext.status = status
for subscriber in statusContext.subscribers.copyItems() {
subscriber(status)
}
}
}
}
case let .replaceHeader(data, dataRange):
if let thisFd = fd {
close(thisFd)
fd = nil
}
if fd == nil {
let handle = open(paths.partial, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR)
if handle >= 0 {
fd = handle
}
}
if let thisFd = fd {
if !dataRange.isEmpty {
lseek(thisFd, 0, SEEK_SET)
let writeResult = data.withUnsafeBytes { bytes -> Int in
return write(thisFd, bytes.advanced(by: dataRange.lowerBound), dataRange.count)
}
lseek(thisFd, Int64(offset), SEEK_SET)
if writeResult != dataRange.count {
print("write error \(errno)")
}
close(thisFd)
fd = nil
}
}
case .reset:
if fd == nil {
let handle = open(paths.partial, O_WRONLY | O_CREAT | O_APPEND, S_IRUSR | S_IWUSR)
if handle >= 0 {
fd = handle
}
}
if let fd = fd {
ftruncate(fd, 0)
lseek(fd, 0, SEEK_SET)
} else {
assertionFailure()
}
}
offset += result.data.count
let updatedSize = offset
offset = 0
let updatedSize = offset
let updatedData: MediaResourceData
updatedData = MediaResourceData(path: paths.partial, size: updatedSize, complete: false)
dataContext.data = updatedData
let hadProcessedFetch = dataContext.processedFetch
dataContext.processedFetch = true
for (_, subscriber) in dataContext.progresiveDataSubscribers.copyItems() {
subscriber(updatedData)
}
if updatedData.complete {
for (_, subscriber) in dataContext.completeDataSubscribers.copyItems() {
subscriber(updatedData)
}
} else if !hadProcessedFetch {
for (waitUntilFetchStatus, subscriber) in dataContext.completeDataSubscribers.copyItems() {
if waitUntilFetchStatus {
subscriber(updatedData)
}
}
}
let status: MediaResourceStatus
if updatedData.complete {
status = .Local
} else {
if let resourceSize = resource.size {
status = .Fetching(progress: Float(updatedSize) / Float(resourceSize))
} else {
status = .Fetching(progress: 0.0)
}
}
self.statusQueue.async {
if let statusContext = self.statusContexts[resourceId] {
statusContext.status = status
for subscriber in statusContext.subscribers.copyItems() {
subscriber(status)
}
}
}
case let .moveLocalFile(tempPath):
if let fd = fd {
close(fd)
}
unlink(paths.partial)
do {
try FileManager.default.moveItem(atPath: tempPath, toPath: paths.partial)
} catch {
assertionFailure()
}
let updatedData: MediaResourceData
if result.complete {
guard let offset = fileSize(paths.partial) else {
assertionFailure()
return
}
let updatedSize = offset
let updatedData: MediaResourceData
let linkResult = link(paths.partial, paths.complete)
assert(linkResult == 0)
updatedData = MediaResourceData(path: paths.complete, size: updatedSize, complete: true)
} else {
updatedData = MediaResourceData(path: paths.partial, size: updatedSize, complete: false)
}
dataContext.data = updatedData
let hadProcessedFetch = dataContext.processedFetch
dataContext.processedFetch = true
for (_, subscriber) in dataContext.progresiveDataSubscribers.copyItems() {
subscriber(updatedData)
}
if updatedData.complete {
for (_, subscriber) in dataContext.completeDataSubscribers.copyItems() {
dataContext.data = updatedData
let hadProcessedFetch = dataContext.processedFetch
dataContext.processedFetch = true
for (_, subscriber) in dataContext.progresiveDataSubscribers.copyItems() {
subscriber(updatedData)
}
} else if !hadProcessedFetch {
for (waitUntilFetchStatus, subscriber) in dataContext.completeDataSubscribers.copyItems() {
if waitUntilFetchStatus {
if updatedData.complete {
for (_, subscriber) in dataContext.completeDataSubscribers.copyItems() {
subscriber(updatedData)
}
}
}
let status: MediaResourceStatus
if updatedData.complete {
status = .Local
} else {
if let resourceSize = resource.size {
status = .Fetching(progress: Float(updatedSize) / Float(resourceSize))
} else {
status = .Fetching(progress: 0.0)
}
}
self.statusQueue.async {
if let statusContext = self.statusContexts[resourceId] {
statusContext.status = status
for subscriber in statusContext.subscribers.copyItems() {
subscriber(status)
} else if !hadProcessedFetch {
for (waitUntilFetchStatus, subscriber) in dataContext.completeDataSubscribers.copyItems() {
if waitUntilFetchStatus {
subscriber(updatedData)
}
}
}
let status: MediaResourceStatus
if updatedData.complete {
status = .Local
} else {
if let resourceSize = resource.size {
status = .Fetching(progress: Float(updatedSize) / Float(resourceSize))
} else {
status = .Fetching(progress: 0.0)
}
}
self.statusQueue.async {
if let statusContext = self.statusContexts[resourceId] {
statusContext.status = status
for subscriber in statusContext.subscribers.copyItems() {
subscriber(status)
}
}
}
}
}
}
})

View File

@@ -26,6 +26,7 @@ public protocol MediaResource {
var id: MediaResourceId { get }
var size: Int? { get }
var streamable: Bool { get }
var headerSize: Int32 { get }
}
public extension MediaResource {
@@ -36,6 +37,10 @@ public extension MediaResource {
var streamable: Bool {
return false
}
var headerSize: Int32 {
return 0
}
}
public protocol CachedMediaResourceRepresentation {

View File

@@ -30,7 +30,9 @@ final class MutablePeerMergedOperationLogView {
}
} else {
updated = true
assert(self.entries.isEmpty)
if !self.entries.isEmpty {
assertionFailure("self.entries.isEmpty == false for tag \(self.tag)")
}
self.entries.append(entry)
self.tailIndex = entry.mergedIndex
}

View File

@@ -146,6 +146,11 @@ public final class Modifier {
self.postbox?.setPeerChatState(id, state: state)
}
public func getPeerChatInterfaceState(_ id: PeerId) -> PeerChatInterfaceState? {
assert(!self.disposed)
return self.postbox?.peerChatInterfaceStateTable.get(id)
}
public func updatePeerChatInterfaceState(_ id: PeerId, update: (PeerChatInterfaceState?) -> (PeerChatInterfaceState?)) {
assert(!self.disposed)
self.postbox?.updatePeerChatInterfaceState(id, update: update)