diff --git a/Postbox/MessageHistoryTable.swift b/Postbox/MessageHistoryTable.swift index 38fdf2891c..9b0809116d 100644 --- a/Postbox/MessageHistoryTable.swift +++ b/Postbox/MessageHistoryTable.swift @@ -5,6 +5,7 @@ enum MessageHistoryOperation { case InsertHole(MessageHistoryHole) case Remove([MessageIndex]) case UpdateReadState(CombinedPeerReadState) + case UpdateEmbeddedMedia(MessageIndex, ReadBuffer) } struct MessageHistoryAnchorIndex { @@ -280,6 +281,32 @@ final class MessageHistoryTable: Table { self.processIndexOperations(id.peerId, operations: operations, processedOperationsByPeerId: &operationsByPeerId, unsentMessageOperations: &unsentMessageOperations, updatedPeerReadStateOperations: &updatedPeerReadStateOperations) } + func updateMedia(id: MediaId, media: Media?, inout operationsByPeerId: [PeerId: [MessageHistoryOperation]], inout updatedMedia: [MediaId: Media?]) { + if let previousMedia = self.messageMediaTable.get(id, embedded: { index, id in + return self.embeddedMediaForIndex(index, id: id) + }) { + if let media = media { + if !previousMedia.isEqual(media) { + self.messageMediaTable.update(id, media: media, messageHistoryTable: self, operationsByPeerId: &operationsByPeerId) + updatedMedia[id] = media + } + } else { + updatedMedia[id] = nil + if case let .Embedded(index) = self.messageMediaTable.removeReference(id) { + self.updateEmbeddedMedia(index, operationsByPeerId: &operationsByPeerId, update: { previousMedia in + var updated: [Media] = [] + for previous in previousMedia { + if previous.id != id { + updated.append(previous) + } + } + return updated + }) + } + } + } + } + func resetIncomingReadStates(states: [PeerId: [MessageId.Namespace: PeerReadState]], inout operationsByPeerId: [PeerId: [MessageHistoryOperation]], inout updatedPeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?]) { for (peerId, namespaces) in states { if let combinedState = self.readStateTable.resetStates(peerId, namespaces: namespaces) { @@ -591,16 +618,102 @@ final class MessageHistoryTable: Table { } } - private func updateMedia(from from: [Media], to: [Media]) { - if from.count != 0 { - assertionFailure() - } - - for media in from { - if let id = media.id { - + func embeddedMediaForIndex(index: MessageIndex, id: MediaId) -> Media? { + if let message = self.getMessage(index) where message.embeddedMediaData.length > 4 { + var embeddedMediaCount: Int32 = 0 + message.embeddedMediaData.read(&embeddedMediaCount, offset: 0, length: 4) + + for _ in 0 ..< embeddedMediaCount { + let mediaOffset = message.embeddedMediaData.offset + var mediaLength: Int32 = 0 + var copyMedia = true + message.embeddedMediaData.read(&mediaLength, offset: 0, length: 4) + if let readMedia = Decoder(buffer: MemoryBuffer(memory: message.embeddedMediaData.memory + message.embeddedMediaData.offset, capacity: Int(mediaLength), length: Int(mediaLength), freeWhenDone: false)).decodeRootObject() as? Media { + + if let readMediaId = readMedia.id where readMediaId == id { + return readMedia + } + } + message.embeddedMediaData.skip(Int(mediaLength)) } } + + return nil + } + + func updateEmbeddedMedia(index: MessageIndex, inout operationsByPeerId: [PeerId: [MessageHistoryOperation]], @noescape update: ([Media]) -> [Media]) { + if let message = self.getMessage(index) { + var embeddedMediaCount: Int32 = 0 + message.embeddedMediaData.read(&embeddedMediaCount, offset: 0, length: 4) + + var previousMedia: [Media] = [] + for _ in 0 ..< embeddedMediaCount { + let mediaOffset = message.embeddedMediaData.offset + var mediaLength: Int32 = 0 + var copyMedia = true + message.embeddedMediaData.read(&mediaLength, offset: 0, length: 4) + if let readMedia = Decoder(buffer: MemoryBuffer(memory: message.embeddedMediaData.memory + message.embeddedMediaData.offset, capacity: Int(mediaLength), length: Int(mediaLength), freeWhenDone: false)).decodeRootObject() as? Media { + previousMedia.append(readMedia) + } + message.embeddedMediaData.skip(Int(mediaLength)) + } + + let updatedMedia = update(previousMedia) + var updated = false + if updatedMedia.count != previousMedia.count { + updated = true + } else { + outer: for i in 0 ..< previousMedia.count { + if !previousMedia[i].isEqual(updatedMedia[i]) { + updated = true + break outer + } + } + } + + if updated { + var updatedEmbeddedMediaCount: Int32 = Int32(updatedMedia.count) + + let updatedEmbeddedMediaBuffer = WriteBuffer() + updatedEmbeddedMediaBuffer.write(&updatedEmbeddedMediaCount, offset: 0, length: 4) + + let encoder = Encoder() + + for media in updatedMedia { + encoder.reset() + encoder.encodeRootObject(media) + let encodedBuffer = encoder.readBufferNoCopy() + var encodedLength: Int32 = Int32(encodedBuffer.length) + updatedEmbeddedMediaBuffer.write(&encodedLength, offset: 0, length: 4) + updatedEmbeddedMediaBuffer.write(encodedBuffer.memory, offset: 0, length: encodedBuffer.length) + } + + self.storeIntermediateMessage(IntermediateMessage(stableId: message.stableId, id: message.id, timestamp: message.timestamp, flags: message.flags, tags: message.tags, forwardInfo: message.forwardInfo, authorId: message.authorId, text: message.text, attributesData: message.attributesData, embeddedMediaData: updatedEmbeddedMediaBuffer.readBufferNoCopy(), referencedMedia: message.referencedMedia), sharedKey: self.key(index)) + + let operation: MessageHistoryOperation = .UpdateEmbeddedMedia(index, updatedEmbeddedMediaBuffer.makeReadBufferAndReset()) + if operationsByPeerId[index.id.peerId] == nil { + operationsByPeerId[index.id.peerId] = [operation] + } else { + operationsByPeerId[index.id.peerId]!.append(operation) + } + } + } + } + + func updateEmbeddedMedia(index: MessageIndex, mediaId: MediaId, media: Media?, inout operationsByPeerId: [PeerId: [MessageHistoryOperation]]) { + self.updateEmbeddedMedia(index, operationsByPeerId: &operationsByPeerId, update: { previousMedia in + var updatedMedia: [Media] = [] + for previous in previousMedia { + if previous.id == mediaId { + if let media = media { + updatedMedia.append(media) + } + } else { + updatedMedia.append(previous) + } + } + return updatedMedia + }) } private func justUpdate(index: MessageIndex, message: InternalStoreMessage, sharedKey: ValueBoxKey, sharedBuffer: WriteBuffer, sharedEncoder: Encoder, inout unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation]) -> IntermediateMessage? { @@ -642,14 +755,16 @@ final class MessageHistoryTable: Table { var previousReferencedMedia: [Media] = [] for mediaId in previousMessage.referencedMedia { - if let media = self.messageMediaTable.get(mediaId) { + if let media = self.messageMediaTable.get(mediaId, embedded: { _ in + return nil + }) { previousMedia.append(media) } } var removedMediaIds: [MediaId] = [] - self.updateMedia(from: previousMedia, to: message.media) + //self.updateMedia(from: previousMedia, to: message.media) sharedBuffer.reset() @@ -1041,7 +1156,9 @@ final class MessageHistoryTable: Table { } for mediaId in message.referencedMedia { - if let media = self.messageMediaTable.get(mediaId) { + if let media = self.messageMediaTable.get(mediaId, embedded: { _ in + return nil + }) { parsedMedia.append(media) } } diff --git a/Postbox/MessageHistoryView.swift b/Postbox/MessageHistoryView.swift index a6403034f7..c2b8c41722 100644 --- a/Postbox/MessageHistoryView.swift +++ b/Postbox/MessageHistoryView.swift @@ -190,7 +190,7 @@ final class MutableMessageHistoryView { return false } - func replay(operations: [MessageHistoryOperation], holeFillDirections: [MessageIndex: HoleFillDirection], context: MutableMessageHistoryViewReplayContext) -> Bool { + func replay(operations: [MessageHistoryOperation], holeFillDirections: [MessageIndex: HoleFillDirection], updatedMedia: [MediaId: Media?], context: MutableMessageHistoryViewReplayContext) -> Bool { let tagMask = self.tagMask let unwrappedTagMask: UInt32 = tagMask?.rawValue ?? 0 @@ -216,6 +216,68 @@ final class MutableMessageHistoryView { case let .UpdateReadState(combinedReadState): hasChanges = true //self.combinedReadState = combinedReadState + case let .UpdateEmbeddedMedia(index, embeddedMediaData): + for i in 0 ..< self.entries.count { + if case let .IntermediateMessageEntry(message) = self.entries[i] where MessageIndex(message) == index { + self.entries[i] = .IntermediateMessageEntry(IntermediateMessage(stableId: message.stableId, id: message.id, timestamp: message.timestamp, flags: message.flags, tags: message.tags, forwardInfo: message.forwardInfo, authorId: message.authorId, text: message.text, attributesData: message.attributesData, embeddedMediaData: embeddedMediaData, referencedMedia: message.referencedMedia)) + hasChanges = true + break + } + } + } + } + + if !updatedMedia.isEmpty { + for i in 0 ..< self.entries.count { + switch self.entries[i] { + case let .MessageEntry(message): + var rebuild = false + for media in message.media { + if let mediaId = media.id, updated = updatedMedia[mediaId] { + rebuild = true + break + } + } + + if rebuild { + var messageMedia: [Media] = [] + for media in message.media { + if let mediaId = media.id, updated = updatedMedia[mediaId] { + if let updated = updated { + messageMedia.append(updated) + } + } else { + messageMedia.append(media) + } + } + var updatedMessage = Message(stableId: message.stableId, id: message.id, timestamp: message.timestamp, flags: message.flags, tags: message.tags, forwardInfo: message.forwardInfo, author: message.author, text: message.text, attributes: message.attributes, media: messageMedia, peers: message.peers, associatedMessages: message.associatedMessages) + self.entries[i] = .MessageEntry(updatedMessage) + hasChanges = true + } + case let .IntermediateMessageEntry(message): + var rebuild = false + for mediaId in message.referencedMedia { + if let media = updatedMedia[mediaId] where media?.id != mediaId { + rebuild = true + break + } + } + if rebuild { + var referencedMedia: [MediaId] = [] + for mediaId in message.referencedMedia { + if let media = updatedMedia[mediaId] where media?.id != mediaId { + if let id = media?.id { + referencedMedia.append(id) + } + } else { + referencedMedia.append(mediaId) + } + } + hasChanges = true + } + default: + break + } } } diff --git a/Postbox/MessageMediaTable.swift b/Postbox/MessageMediaTable.swift index 18b8973907..b9f4069b70 100644 --- a/Postbox/MessageMediaTable.swift +++ b/Postbox/MessageMediaTable.swift @@ -10,6 +10,11 @@ enum InsertMediaResult { case Embed(Media) } +enum RemoveMediaResult { + case Reference + case Embedded(MessageIndex) +} + enum DebugMediaEntry { case Direct(Media, Int) case MessageReference(MessageIndex) @@ -30,7 +35,7 @@ final class MessageMediaTable: Table { return key } - func get(id: MediaId) -> Media? { + func get(id: MediaId, @noescape embedded: (MessageIndex, MediaId) -> Media?) -> Media? { if let value = self.valueBox.get(self.tableId, key: self.key(id)) { var type: Int8 = 0 value.read(&type, offset: 0, length: 1) @@ -40,6 +45,19 @@ final class MessageMediaTable: Table { if let media = Decoder(buffer: MemoryBuffer(memory: value.memory + value.offset, capacity: Int(dataLength), length: Int(dataLength), freeWhenDone: false)).decodeRootObject() as? Media { return media } + } else if type == MediaEntryType.MessageReference.rawValue { + var idPeerId: Int64 = 0 + var idNamespace: Int32 = 0 + var idId: Int32 = 0 + var idTimestamp: Int32 = 0 + value.read(&idPeerId, offset: 0, length: 8) + value.read(&idNamespace, offset: 0, length: 4) + value.read(&idId, offset: 0, length: 4) + value.read(&idTimestamp, offset: 0, length: 4) + + let referencedMessageIndex = MessageIndex(id: MessageId(peerId: PeerId(idPeerId), namespace: idNamespace, id: idId), timestamp: idTimestamp) + + return embedded(referencedMessageIndex, id) } } return nil @@ -125,7 +143,7 @@ final class MessageMediaTable: Table { } } - func removeReference(id: MediaId, sharedWriteBuffer: WriteBuffer = WriteBuffer()) { + func removeReference(id: MediaId, sharedWriteBuffer: WriteBuffer = WriteBuffer()) -> RemoveMediaResult { if let value = self.valueBox.get(self.tableId, key: self.key(id)) { var type: Int8 = 0 value.read(&type, offset: 0, length: 1) @@ -151,10 +169,28 @@ final class MessageMediaTable: Table { } else { self.valueBox.set(self.tableId, key: self.key(id), value: sharedWriteBuffer.readBufferNoCopy()) } + + return .Reference } else if type == MediaEntryType.MessageReference.rawValue { + var idPeerId: Int64 = 0 + var idNamespace: Int32 = 0 + var idId: Int32 = 0 + var idTimestamp: Int32 = 0 + value.read(&idPeerId, offset: 0, length: 8) + value.read(&idNamespace, offset: 0, length: 4) + value.read(&idId, offset: 0, length: 4) + value.read(&idTimestamp, offset: 0, length: 4) + + let referencedMessageIndex = MessageIndex(id: MessageId(peerId: PeerId(idPeerId), namespace: idNamespace, id: idId), timestamp: idTimestamp) + self.valueBox.remove(self.tableId, key: self.key(id)) + + return .Embedded(referencedMessageIndex) + } else { + assertionFailure() } } + return .Reference } func removeEmbeddedMedia(media: Media) { @@ -164,6 +200,54 @@ final class MessageMediaTable: Table { self.mediaCleanupTable.add(media) } + func update(id: MediaId, media: Media, messageHistoryTable: MessageHistoryTable, inout operationsByPeerId: [PeerId: [MessageHistoryOperation]], sharedWriteBuffer: WriteBuffer = WriteBuffer(), sharedEncoder: Encoder = Encoder()) { + if let updatedId = media.id { + if let value = self.valueBox.get(self.tableId, key: self.key(id)) { + var type: Int8 = 0 + value.read(&type, offset: 0, length: 1) + if type == MediaEntryType.Direct.rawValue { + var dataLength: Int32 = 0 + value.read(&dataLength, offset: 0, length: 4) + let mediaOffset = value.offset + value.skip(Int(dataLength)) + + var messageReferenceCount: Int32 = 0 + value.read(&messageReferenceCount, offset: 0, length: 4) + + sharedWriteBuffer.reset() + var directType: Int8 = MediaEntryType.Direct.rawValue + sharedWriteBuffer.write(&directType, offset: 0, length: 1) + + sharedEncoder.reset() + sharedEncoder.encodeRootObject(media) + let mediaBuffer = sharedEncoder.memoryBuffer() + var mediaBufferLength = Int32(mediaBuffer.length) + sharedWriteBuffer.write(&mediaBufferLength, offset: 0, length: 4) + sharedWriteBuffer.write(mediaBuffer.memory, offset: 0, length: mediaBuffer.length) + + sharedWriteBuffer.write(&messageReferenceCount, offset: 0, length: 4) + + if id != updatedId { + self.valueBox.remove(self.tableId, key: self.key(id)) + } + self.valueBox.set(self.tableId, key: self.key(updatedId), value: sharedWriteBuffer.readBufferNoCopy()) + } else if type == MediaEntryType.MessageReference.rawValue { + var idPeerId: Int64 = 0 + var idNamespace: Int32 = 0 + var idId: Int32 = 0 + var idTimestamp: Int32 = 0 + value.read(&idPeerId, offset: 0, length: 8) + value.read(&idNamespace, offset: 0, length: 4) + value.read(&idId, offset: 0, length: 4) + value.read(&idTimestamp, offset: 0, length: 4) + + let referencedMessageIndex = MessageIndex(id: MessageId(peerId: PeerId(idPeerId), namespace: idNamespace, id: idId), timestamp: idTimestamp) + messageHistoryTable.updateEmbeddedMedia(referencedMessageIndex, mediaId: id, media: media, operationsByPeerId: &operationsByPeerId) + } + } + } + } + func debugList() -> [DebugMediaEntry] { var entries: [DebugMediaEntry] = [] diff --git a/Postbox/Postbox.swift b/Postbox/Postbox.swift index 2e9b6b1536..90c71c9f73 100644 --- a/Postbox/Postbox.swift +++ b/Postbox/Postbox.swift @@ -97,6 +97,23 @@ public final class Modifier { self.postbox?.updateMessage(index, update: update) } + public func updateMedia(id: MediaId, update: Media?) { + self.postbox?.updateMedia(id, update: update) + } + + public func getMessage(id: MessageId) -> Message? { + if let postbox = self.postbox { + if let entry = postbox.messageHistoryIndexTable.get(id) { + if case let .Message(index) = entry { + if let message = postbox.messageHistoryTable.getMessage(index) { + return postbox.renderIntermediateMessage(message) + } + } + } + } + return nil + } + public func filterStoredMessageIds(messageIds: Set) -> Set { if let postbox = self.postbox { return postbox.filterStoredMessageIds(messageIds) @@ -121,6 +138,7 @@ public final class Postbox { private var currentOperationsByPeerId: [PeerId: [MessageHistoryOperation]] = [:] private var currentUnsentOperations: [IntermediateMessageHistoryUnsentOperation] = [] private var currentUpdatedSynchronizeReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?] = [:] + private var currentUpdatedMedia: [MediaId: Media?] = [:] private var currentRemovedHolesByPeerId: [PeerId: [MessageIndex: HoleFillDirection]] = [:] private var currentFilledHolesByPeerId: [PeerId: [MessageIndex: HoleFillDirection]] = [:] @@ -613,7 +631,7 @@ public final class Postbox { self.chatListTable.replaceHole(index, hole: hole, operations: &chatListOperations) } - self.viewTracker.updateViews(currentOperationsByPeerId: self.currentOperationsByPeerId, peerIdsWithFilledHoles: self.currentFilledHolesByPeerId, removedHolesByPeerId: self.currentRemovedHolesByPeerId, chatListOperations: chatListOperations, currentUpdatedPeers: self.currentUpdatedPeers, unsentMessageOperations: self.currentUnsentOperations, updatedSynchronizePeerReadStateOperations: self.currentUpdatedSynchronizeReadStateOperations) + self.viewTracker.updateViews(currentOperationsByPeerId: self.currentOperationsByPeerId, peerIdsWithFilledHoles: self.currentFilledHolesByPeerId, removedHolesByPeerId: self.currentRemovedHolesByPeerId, chatListOperations: chatListOperations, currentUpdatedPeers: self.currentUpdatedPeers, unsentMessageOperations: self.currentUnsentOperations, updatedSynchronizePeerReadStateOperations: self.currentUpdatedSynchronizeReadStateOperations, updatedMedia: self.currentUpdatedMedia) self.currentOperationsByPeerId.removeAll() self.currentFilledHolesByPeerId.removeAll() @@ -622,6 +640,7 @@ public final class Postbox { self.currentReplaceChatListHoles.removeAll() self.currentUnsentOperations.removeAll() self.currentUpdatedSynchronizeReadStateOperations.removeAll() + self.currentUpdatedMedia.removeAll() for table in self.tables { table.beforeCommit() @@ -658,6 +677,10 @@ public final class Postbox { } } + private func updateMedia(id: MediaId, update: Media?) { + self.messageHistoryTable.updateMedia(id, media: update, operationsByPeerId: &self.currentOperationsByPeerId, updatedMedia: &self.currentUpdatedMedia) + } + private func filterStoredMessageIds(messageIds: Set) -> Set { var filteredIds = Set() @@ -687,45 +710,6 @@ public final class Postbox { return EmptyDisposable } } - - public func preloadedAroundUnreadMessageHistoryViewForPeerId(peerId: PeerId, count: Int, tagMask: MessageTags? = nil) -> Signal<(MessageHistoryView, ViewUpdateType), NoError> { - return self.aroundUnreadMessageHistoryViewForPeerId(peerId, count: count, tagMask: tagMask) |> filter { view, _ in - if let maxReadIndex = view.maxReadIndex { - var targetIndex = 0 - for i in 0 ..< view.entries.count { - if view.entries[i].index >= maxReadIndex { - targetIndex = i - break - } - } - - /*print("entries:") - for i in 0 ..< view.entries.count { - switch view.entries[i] { - case let .MessageEntry(message): - print(" \(i == targetIndex ? "*" : " ")\(message.id.id): \(message.text)") - case let .HoleEntry(hole): - print(" \(i == targetIndex ? "*" : " ")\(hole.min) — \(hole.maxIndex.id.id): hole") - } - }*/ - - let maxIndex = min(view.entries.count, targetIndex + count / 2) - if maxIndex >= targetIndex { - for i in targetIndex ..< maxIndex { - if case .HoleEntry = view.entries[i] { - return false - } - } - } - - return true - } else { - return true - } - } |> take(1) |> mapToSignal { _ in - return self.aroundUnreadMessageHistoryViewForPeerId(peerId, count: count, tagMask: tagMask) - } - } public func aroundUnreadMessageHistoryViewForPeerId(peerId: PeerId, count: Int, tagMask: MessageTags? = nil) -> Signal<(MessageHistoryView, ViewUpdateType), NoError> { return Signal { subscriber in diff --git a/Postbox/ViewTracker.swift b/Postbox/ViewTracker.swift index 4733db9008..25f19275b9 100644 --- a/Postbox/ViewTracker.swift +++ b/Postbox/ViewTracker.swift @@ -138,12 +138,13 @@ final class ViewTracker { } } - func updateViews(currentOperationsByPeerId currentOperationsByPeerId: [PeerId: [MessageHistoryOperation]], peerIdsWithFilledHoles: [PeerId: [MessageIndex: HoleFillDirection]], removedHolesByPeerId: [PeerId: [MessageIndex: HoleFillDirection]], chatListOperations: [ChatListOperation], currentUpdatedPeers: [PeerId: Peer], unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation], updatedSynchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?]) { + func updateViews(currentOperationsByPeerId currentOperationsByPeerId: [PeerId: [MessageHistoryOperation]], peerIdsWithFilledHoles: [PeerId: [MessageIndex: HoleFillDirection]], removedHolesByPeerId: [PeerId: [MessageIndex: HoleFillDirection]], chatListOperations: [ChatListOperation], currentUpdatedPeers: [PeerId: Peer], unsentMessageOperations: [IntermediateMessageHistoryUnsentOperation], updatedSynchronizePeerReadStateOperations: [PeerId: PeerReadStateSynchronizationOperation?], updatedMedia: [MediaId: Media?]) { var updateTrackedHolesPeerIds: [PeerId] = [] for (peerId, bag) in self.messageHistoryViews { var updateHoles = false - if let operations = currentOperationsByPeerId[peerId] { + let operations = currentOperationsByPeerId[peerId] + if operations != nil || !updatedMedia.isEmpty { updateHoles = true for (mutableView, pipe) in bag.copyItems() { let context = MutableMessageHistoryViewReplayContext() @@ -156,7 +157,7 @@ final class ViewTracker { updateType = .Generic } - if mutableView.replay(operations, holeFillDirections: peerIdsWithFilledHoles[peerId] ?? [:], context: context) { + if mutableView.replay(operations ?? [], holeFillDirections: peerIdsWithFilledHoles[peerId] ?? [:], updatedMedia: updatedMedia, context: context) { mutableView.complete(context, fetchEarlier: { index, count in return self.fetchEarlierHistoryEntries(peerId, index, count, mutableView.tagMask) }, fetchLater: { index, count in