Update reactions

This commit is contained in:
Ali
2022-01-28 01:59:28 +04:00
parent 8edb5f967c
commit 7cd4c4e489
9 changed files with 268 additions and 44 deletions

View File

@@ -15,6 +15,10 @@ public enum PostboxUpdateMessage {
case skip
}
public protocol StoreOrUpdateMessageAction: AnyObject {
func addOrUpdate(messages: [StoreMessage], transaction: Transaction)
}
public final class Transaction {
private let queue: Queue
private weak var postbox: PostboxImpl?
@@ -507,7 +511,7 @@ public final class Transaction {
public func updateMessage(_ id: MessageId, update: (Message) -> PostboxUpdateMessage) {
assert(!self.disposed)
self.postbox?.updateMessage(id, update: update)
self.postbox?.updateMessage(transaction: self, id: id, update: update)
}
public func offsetPendingMessagesTimestamps(lowerBound: MessageId, excludeIds: Set<MessageId>, timestamp: Int32) {
@@ -1452,6 +1456,7 @@ final class PostboxImpl {
let peerRatingTable: RatingTable<PeerId>
var installedMessageActionsByPeerId: [PeerId: Bag<([StoreMessage], Transaction) -> Void>] = [:]
var installedStoreOrUpdateMessageActionsByPeerId: [PeerId: Bag<StoreOrUpdateMessageAction>] = [:]
init(queue: Queue, basePath: String, seedConfiguration: SeedConfiguration, valueBox: SqliteValueBox, timestampForAbsoluteTimeBasedOperations: Int32, isTemporary: Bool, tempDir: TempBoxDirectory?, useCaches: Bool) {
assert(queue.isCurrent())
@@ -1750,6 +1755,12 @@ final class PostboxImpl {
f(peerMessages, transaction)
}
}
if let bag = self.installedStoreOrUpdateMessageActionsByPeerId[peerId] {
for f in bag.copyItems() {
f.addOrUpdate(messages: peerMessages, transaction: transaction)
}
}
}
return addResult
@@ -2255,11 +2266,17 @@ final class PostboxImpl {
self.peerRatingTable.replace(items: peerIds)
}
fileprivate func updateMessage(_ id: MessageId, update: (Message) -> PostboxUpdateMessage) {
fileprivate func updateMessage(transaction: Transaction, id: MessageId, update: (Message) -> PostboxUpdateMessage) {
if let index = self.messageHistoryIndexTable.getIndex(id), let intermediateMessage = self.messageHistoryTable.getMessage(index) {
let message = self.renderIntermediateMessage(intermediateMessage)
if case let .update(updatedMessage) = update(message) {
self.messageHistoryTable.updateMessage(id, message: updatedMessage, operationsByPeerId: &self.currentOperationsByPeerId, updatedMedia: &self.currentUpdatedMedia, unsentMessageOperations: &self.currentUnsentOperations, updatedPeerReadStateOperations: &self.currentUpdatedSynchronizeReadStateOperations, globalTagsOperations: &self.currentGlobalTagsOperations, pendingActionsOperations: &self.currentPendingMessageActionsOperations, updatedMessageActionsSummaries: &self.currentUpdatedMessageActionsSummaries, updatedMessageTagSummaries: &self.currentUpdatedMessageTagSummaries, invalidateMessageTagSummaries: &self.currentInvalidateMessageTagSummaries, localTagsOperations: &self.currentLocalTagsOperations, timestampBasedMessageAttributesOperations: &self.currentTimestampBasedMessageAttributesOperations)
if let bag = self.installedStoreOrUpdateMessageActionsByPeerId[id.peerId] {
for f in bag.copyItems() {
f.addOrUpdate(messages: [updatedMessage], transaction: transaction)
}
}
}
}
}
@@ -3397,6 +3414,24 @@ final class PostboxImpl {
return disposable
}
public func installStoreOrUpdateMessageAction(peerId: PeerId, action: StoreOrUpdateMessageAction) -> Disposable {
let disposable = MetaDisposable()
self.queue.async {
if self.installedStoreOrUpdateMessageActionsByPeerId[peerId] == nil {
self.installedStoreOrUpdateMessageActionsByPeerId[peerId] = Bag()
}
let index = self.installedStoreOrUpdateMessageActionsByPeerId[peerId]!.add(action)
disposable.set(ActionDisposable {
self.queue.async {
if let bag = self.installedStoreOrUpdateMessageActionsByPeerId[peerId] {
bag.remove(index)
}
}
})
}
return disposable
}
fileprivate func scanMessages(peerId: PeerId, namespace: MessageId.Namespace, tag: MessageTags, _ f: (Message) -> Bool) {
var index = MessageIndex.lowerBound(peerId: peerId, namespace: namespace)
while true {
@@ -4139,6 +4174,16 @@ public class Postbox {
return disposable
}
public func installStoreOrUpdateMessageAction(peerId: PeerId, action: StoreOrUpdateMessageAction) -> Disposable {
let disposable = MetaDisposable()
self.impl.with { impl in
disposable.set(impl.installStoreOrUpdateMessageAction(peerId: peerId, action: action))
}
return disposable
}
public func isMasterClient() -> Signal<Bool, NoError> {
return Signal { subscriber in