Refactoring

This commit is contained in:
Ali
2021-07-07 01:59:12 +04:00
parent 9973b37e97
commit 9134bae908
123 changed files with 509 additions and 1477 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,228 @@
import Foundation
import Postbox
import SwiftSignalKit
import TelegramApi
import MtProtoKit
import SyncCore
public func updateMessageReactionsInteractively(postbox: Postbox, messageId: MessageId, reaction: String?) -> Signal<Never, NoError> {
return postbox.transaction { transaction -> Void in
transaction.setPendingMessageAction(type: .updateReaction, id: messageId, action: UpdateMessageReactionsAction())
transaction.updateMessage(messageId, update: { currentMessage in
var storeForwardInfo: StoreMessageForwardInfo?
if let forwardInfo = currentMessage.forwardInfo {
storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
}
var attributes = currentMessage.attributes
loop: for j in 0 ..< attributes.count {
if let _ = attributes[j] as? PendingReactionsMessageAttribute {
attributes.remove(at: j)
break loop
}
}
attributes.append(PendingReactionsMessageAttribute(value: reaction))
return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media))
})
}
|> ignoreValues
}
private enum RequestUpdateMessageReactionError {
case generic
}
private func requestUpdateMessageReaction(postbox: Postbox, network: Network, stateManager: AccountStateManager, messageId: MessageId) -> Signal<Never, RequestUpdateMessageReactionError> {
return .complete()
/*return postbox.transaction { transaction -> (Peer, String?)? in
guard let peer = transaction.getPeer(messageId.peerId) else {
return nil
}
guard let message = transaction.getMessage(messageId) else {
return nil
}
var value: String?
for attribute in message.attributes {
if let attribute = attribute as? PendingReactionsMessageAttribute {
value = attribute.value
break
}
}
return (peer, value)
}
|> castError(RequestUpdateMessageReactionError.self)
|> mapToSignal { peerAndValue in
guard let (peer, value) = peerAndValue else {
return .fail(.generic)
}
guard let inputPeer = apiInputPeer(peer) else {
return .fail(.generic)
}
if messageId.namespace != Namespaces.Message.Cloud {
return .fail(.generic)
}
return network.request(Api.functions.messages.sendReaction(flags: value == nil ? 0 : 1, peer: inputPeer, msgId: messageId.id, reaction: value))
|> mapError { _ -> RequestUpdateMessageReactionError in
return .generic
}
|> mapToSignal { result -> Signal<Never, RequestUpdateMessageReactionError> in
return postbox.transaction { transaction -> Void in
transaction.setPendingMessageAction(type: .updateReaction, id: messageId, action: UpdateMessageReactionsAction())
transaction.updateMessage(messageId, update: { currentMessage in
var storeForwardInfo: StoreMessageForwardInfo?
if let forwardInfo = currentMessage.forwardInfo {
storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
}
let reactions = mergedMessageReactions(attributes: currentMessage.attributes)
var attributes = currentMessage.attributes
for j in (0 ..< attributes.count).reversed() {
if attributes[j] is PendingReactionsMessageAttribute || attributes[j] is ReactionsMessageAttribute {
attributes.remove(at: j)
}
}
if let reactions = reactions {
attributes.append(reactions)
}
return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media))
})
stateManager.addUpdates(result)
}
|> castError(RequestUpdateMessageReactionError.self)
|> ignoreValues
}
}*/
}
private final class ManagedApplyPendingMessageReactionsActionsHelper {
var operationDisposables: [MessageId: Disposable] = [:]
func update(entries: [PendingMessageActionsEntry]) -> (disposeOperations: [Disposable], beginOperations: [(PendingMessageActionsEntry, MetaDisposable)]) {
var disposeOperations: [Disposable] = []
var beginOperations: [(PendingMessageActionsEntry, MetaDisposable)] = []
var hasRunningOperationForPeerId = Set<PeerId>()
var validIds = Set<MessageId>()
for entry in entries {
if !hasRunningOperationForPeerId.contains(entry.id.peerId) {
hasRunningOperationForPeerId.insert(entry.id.peerId)
validIds.insert(entry.id)
if self.operationDisposables[entry.id] == nil {
let disposable = MetaDisposable()
beginOperations.append((entry, disposable))
self.operationDisposables[entry.id] = disposable
}
}
}
var removeMergedIds: [MessageId] = []
for (id, disposable) in self.operationDisposables {
if !validIds.contains(id) {
removeMergedIds.append(id)
disposeOperations.append(disposable)
}
}
for id in removeMergedIds {
self.operationDisposables.removeValue(forKey: id)
}
return (disposeOperations, beginOperations)
}
func reset() -> [Disposable] {
let disposables = Array(self.operationDisposables.values)
self.operationDisposables.removeAll()
return disposables
}
}
private func withTakenAction(postbox: Postbox, type: PendingMessageActionType, id: MessageId, _ f: @escaping (Transaction, PendingMessageActionsEntry?) -> Signal<Never, NoError>) -> Signal<Never, NoError> {
return postbox.transaction { transaction -> Signal<Never, NoError> in
var result: PendingMessageActionsEntry?
if let action = transaction.getPendingMessageAction(type: type, id: id) as? UpdateMessageReactionsAction {
result = PendingMessageActionsEntry(id: id, action: action)
}
return f(transaction, result)
}
|> switchToLatest
}
func managedApplyPendingMessageReactionsActions(postbox: Postbox, network: Network, stateManager: AccountStateManager) -> Signal<Void, NoError> {
return Signal { _ in
let helper = Atomic<ManagedApplyPendingMessageReactionsActionsHelper>(value: ManagedApplyPendingMessageReactionsActionsHelper())
let actionsKey = PostboxViewKey.pendingMessageActions(type: .updateReaction)
let disposable = postbox.combinedView(keys: [actionsKey]).start(next: { view in
var entries: [PendingMessageActionsEntry] = []
if let v = view.views[actionsKey] as? PendingMessageActionsView {
entries = v.entries
}
let (disposeOperations, beginOperations) = helper.with { helper -> (disposeOperations: [Disposable], beginOperations: [(PendingMessageActionsEntry, MetaDisposable)]) in
return helper.update(entries: entries)
}
for disposable in disposeOperations {
disposable.dispose()
}
for (entry, disposable) in beginOperations {
let signal = withTakenAction(postbox: postbox, type: .updateReaction, id: entry.id, { transaction, entry -> Signal<Never, NoError> in
if let entry = entry {
if let _ = entry.action as? UpdateMessageReactionsAction {
return synchronizeMessageReactions(transaction: transaction, postbox: postbox, network: network, stateManager: stateManager, id: entry.id)
} else {
assertionFailure()
}
}
return .complete()
})
|> then(
postbox.transaction { transaction -> Void in
transaction.setPendingMessageAction(type: .updateReaction, id: entry.id, action: nil)
}
|> ignoreValues
)
disposable.set(signal.start())
}
})
return ActionDisposable {
let disposables = helper.with { helper -> [Disposable] in
return helper.reset()
}
for disposable in disposables {
disposable.dispose()
}
disposable.dispose()
}
}
}
private func synchronizeMessageReactions(transaction: Transaction, postbox: Postbox, network: Network, stateManager: AccountStateManager, id: MessageId) -> Signal<Never, NoError> {
return requestUpdateMessageReaction(postbox: postbox, network: network, stateManager: stateManager, messageId: id)
|> `catch` { _ -> Signal<Never, NoError> in
return postbox.transaction { transaction -> Void in
transaction.setPendingMessageAction(type: .updateReaction, id: id, action: nil)
transaction.updateMessage(id, update: { currentMessage in
var storeForwardInfo: StoreMessageForwardInfo?
if let forwardInfo = currentMessage.forwardInfo {
storeForwardInfo = StoreMessageForwardInfo(authorId: forwardInfo.author?.id, sourceId: forwardInfo.source?.id, sourceMessageId: forwardInfo.sourceMessageId, date: forwardInfo.date, authorSignature: forwardInfo.authorSignature, psaType: forwardInfo.psaType, flags: forwardInfo.flags)
}
var attributes = currentMessage.attributes
loop: for j in 0 ..< attributes.count {
if let _ = attributes[j] as? PendingReactionsMessageAttribute {
attributes.remove(at: j)
break loop
}
}
return .update(StoreMessage(id: currentMessage.id, globallyUniqueId: currentMessage.globallyUniqueId, groupingKey: currentMessage.groupingKey, threadId: currentMessage.threadId, timestamp: currentMessage.timestamp, flags: StoreMessageFlags(currentMessage.flags), tags: currentMessage.tags, globalTags: currentMessage.globalTags, localTags: currentMessage.localTags, forwardInfo: storeForwardInfo, authorId: currentMessage.author?.id, text: currentMessage.text, attributes: attributes, media: currentMessage.media))
})
}
|> ignoreValues
}
}

View File

@@ -0,0 +1,70 @@
import Foundation
import TelegramApi
public enum PeerInputActivity: Comparable {
case typingText
case uploadingFile(progress: Int32)
case recordingVoice
case uploadingPhoto(progress: Int32)
case uploadingVideo(progress: Int32)
case playingGame
case recordingInstantVideo
case uploadingInstantVideo(progress: Int32)
case speakingInGroupCall(timestamp: Int32)
public var key: Int32 {
switch self {
case .typingText:
return 0
case .speakingInGroupCall:
return 1
case .uploadingFile:
return 2
case .recordingVoice:
return 3
case .uploadingPhoto:
return 4
case .uploadingVideo:
return 5
case .recordingInstantVideo:
return 6
case .uploadingInstantVideo:
return 7
case .playingGame:
return 8
}
}
public static func <(lhs: PeerInputActivity, rhs: PeerInputActivity) -> Bool {
return lhs.key < rhs.key
}
}
extension PeerInputActivity {
init?(apiType: Api.SendMessageAction, timestamp: Int32) {
switch apiType {
case .sendMessageCancelAction, .sendMessageChooseContactAction, .sendMessageGeoLocationAction, .sendMessageRecordVideoAction:
return nil
case .sendMessageGamePlayAction:
self = .playingGame
case .sendMessageRecordAudioAction, .sendMessageUploadAudioAction:
self = .recordingVoice
case .sendMessageTypingAction:
self = .typingText
case let .sendMessageUploadDocumentAction(progress):
self = .uploadingFile(progress: progress)
case let .sendMessageUploadPhotoAction(progress):
self = .uploadingPhoto(progress: progress)
case let .sendMessageUploadVideoAction(progress):
self = .uploadingVideo(progress: progress)
case .sendMessageRecordRoundAction:
self = .recordingInstantVideo
case let .sendMessageUploadRoundAction(progress):
self = .uploadingInstantVideo(progress: progress)
case .speakingInGroupCallAction:
self = .speakingInGroupCall(timestamp: timestamp)
case let .sendMessageHistoryImportAction:
return nil
}
}
}

View File

@@ -0,0 +1,416 @@
import Foundation
import Postbox
import SwiftSignalKit
private typealias SignalKitTimer = SwiftSignalKit.Timer
private struct ActivityRecord {
let peerId: PeerId
let activity: PeerInputActivity
let id: Int32
let timer: SignalKitTimer
let episodeId: Int32?
let timestamp: Double
let updateId: Int32
}
private final class PeerInputActivityContext {
private let queue: Queue
private let notifyEmpty: () -> Void
private let notifyUpdated: () -> Void
private var nextId: Int32 = 0
private var activities: [ActivityRecord] = []
private let subscribers = Bag<([(PeerId, PeerInputActivityRecord)]) -> Void>()
private var scheduledUpdateSubscribers = false
init(queue: Queue, notifyEmpty: @escaping () -> Void, notifyUpdated: @escaping () -> Void) {
self.queue = queue
self.notifyEmpty = notifyEmpty
self.notifyUpdated = notifyUpdated
}
func addActivity(peerId: PeerId, activity: PeerInputActivity, timeout: Double, episodeId: Int32?, nextUpdateId: inout Int32) {
assert(self.queue.isCurrent())
let timestamp = CFAbsoluteTimeGetCurrent()
var updated = false
var found = false
for i in 0 ..< self.activities.count {
let record = self.activities[i]
if record.peerId == peerId && record.activity.key == activity.key && record.episodeId == episodeId {
found = true
record.timer.invalidate()
var updateId = record.updateId
var recordTimestamp = record.timestamp
if record.activity != activity || record.timestamp + 1.0 < timestamp {
updated = true
updateId = nextUpdateId
recordTimestamp = timestamp
nextUpdateId += 1
}
let currentId = record.id
let timer = SignalKitTimer(timeout: timeout, repeat: false, completion: { [weak self] in
if let strongSelf = self {
for currentActivity in strongSelf.activities {
if currentActivity.id == currentId {
strongSelf.removeActivity(peerId: currentActivity.peerId, activity: currentActivity.activity, episodeId: currentActivity.episodeId)
}
}
}
}, queue: self.queue)
self.activities[i] = ActivityRecord(peerId: peerId, activity: activity, id: currentId, timer: timer, episodeId: episodeId, timestamp: recordTimestamp, updateId: updateId)
timer.start()
break
}
}
if !found {
updated = true
let activityId = self.nextId
self.nextId += 1
let timer = SignalKitTimer(timeout: timeout, repeat: false, completion: { [weak self] in
if let strongSelf = self {
for currentActivity in strongSelf.activities {
if currentActivity.id == activityId {
strongSelf.removeActivity(peerId: currentActivity.peerId, activity: currentActivity.activity, episodeId: currentActivity.episodeId)
}
}
}
}, queue: self.queue)
let updateId = nextUpdateId
nextUpdateId += 1
self.activities.append(ActivityRecord(peerId: peerId, activity: activity, id: activityId, timer: timer, episodeId: episodeId, timestamp: timestamp, updateId: updateId))
timer.start()
}
if updated {
self.scheduleUpdateSubscribers()
}
}
func removeActivity(peerId: PeerId, activity: PeerInputActivity, episodeId: Int32?) {
assert(self.queue.isCurrent())
for i in 0 ..< self.activities.count {
let record = self.activities[i]
if record.peerId == peerId && record.activity.key == activity.key && record.episodeId == episodeId {
self.activities.remove(at: i)
record.timer.invalidate()
self.scheduleUpdateSubscribers()
break
}
}
}
func removeAllActivities(peerId: PeerId) {
assert(self.queue.isCurrent())
var updated = false
for i in (0 ..< self.activities.count).reversed() {
let record = self.activities[i]
if record.peerId == peerId {
record.timer.invalidate()
self.activities.remove(at: i)
updated = true
}
}
if updated {
self.scheduleUpdateSubscribers()
}
}
func scheduleUpdateSubscribers() {
if !self.scheduledUpdateSubscribers {
self.scheduledUpdateSubscribers = true
self.queue.async { [weak self] in
self?.updateSubscribers()
}
}
}
func isEmpty() -> Bool {
return self.activities.isEmpty && self.subscribers.isEmpty
}
func topActivities() -> [(PeerId, PeerInputActivityRecord)] {
var peerIds = Set<PeerId>()
var result: [(PeerId, PeerInputActivityRecord)] = []
for record in self.activities {
if !peerIds.contains(record.peerId) {
peerIds.insert(record.peerId)
result.append((record.peerId, PeerInputActivityRecord(activity: record.activity, updateId: record.updateId)))
if result.count == 10 {
break
}
}
}
return result
}
func updateSubscribers() {
self.scheduledUpdateSubscribers = false
if self.isEmpty() {
self.notifyEmpty()
} else {
let topActivities = self.topActivities()
for subscriber in self.subscribers.copyItems() {
subscriber(topActivities)
}
self.notifyUpdated()
}
}
func addSubscriber(_ subscriber: @escaping ([(PeerId, PeerInputActivityRecord)]) -> Void) -> Int {
return self.subscribers.add(subscriber)
}
func removeSubscriber(_ index: Int) {
self.subscribers.remove(index)
}
}
private final class PeerGlobalInputActivityContext {
private let subscribers = Bag<([PeerActivitySpace: [(PeerId, PeerInputActivityRecord)]]) -> Void>()
func addSubscriber(_ subscriber: @escaping ([PeerActivitySpace: [(PeerId, PeerInputActivityRecord)]]) -> Void) -> Int {
return self.subscribers.add(subscriber)
}
func removeSubscriber(_ index: Int) {
self.subscribers.remove(index)
}
var isEmpty: Bool {
return self.subscribers.isEmpty
}
func notify(_ activities: [PeerActivitySpace: [(PeerId, PeerInputActivityRecord)]]) {
for subscriber in self.subscribers.copyItems() {
subscriber(activities)
}
}
}
final class PeerInputActivityManager {
private let queue = Queue()
private var nextEpisodeId: Int32 = 0
private var nextUpdateId: Int32 = 0
private var contexts: [PeerActivitySpace: PeerInputActivityContext] = [:]
private var globalContext: PeerGlobalInputActivityContext?
func activities(peerId: PeerActivitySpace) -> Signal<[(PeerId, PeerInputActivityRecord)], NoError> {
let queue = self.queue
return Signal { [weak self] subscriber in
let disposable = MetaDisposable()
queue.async {
if let strongSelf = self {
let context: PeerInputActivityContext
if let currentContext = strongSelf.contexts[peerId] {
context = currentContext
} else {
context = PeerInputActivityContext(queue: queue, notifyEmpty: {
if let strongSelf = self {
strongSelf.contexts.removeValue(forKey: peerId)
if let globalContext = strongSelf.globalContext {
let activities = strongSelf.collectActivities()
globalContext.notify(activities)
}
}
}, notifyUpdated: {
if let strongSelf = self, let globalContext = strongSelf.globalContext {
let activities = strongSelf.collectActivities()
globalContext.notify(activities)
}
})
strongSelf.contexts[peerId] = context
}
let index = context.addSubscriber({ next in
subscriber.putNext(next)
})
subscriber.putNext(context.topActivities())
disposable.set(ActionDisposable {
queue.async {
if let strongSelf = self {
if let currentContext = strongSelf.contexts[peerId] {
currentContext.removeSubscriber(index)
if currentContext.isEmpty() {
strongSelf.contexts.removeValue(forKey: peerId)
}
}
}
}
})
}
}
return disposable
}
}
private func collectActivities() -> [PeerActivitySpace: [(PeerId, PeerInputActivityRecord)]] {
assert(self.queue.isCurrent())
var dict: [PeerActivitySpace: [(PeerId, PeerInputActivityRecord)]] = [:]
for (chatPeerId, context) in self.contexts {
dict[chatPeerId] = context.topActivities()
}
return dict
}
func allActivities() -> Signal<[PeerActivitySpace: [(PeerId, PeerInputActivityRecord)]], NoError> {
let queue = self.queue
return Signal { [weak self] subscriber in
let disposable = MetaDisposable()
queue.async {
if let strongSelf = self {
let context: PeerGlobalInputActivityContext
if let current = strongSelf.globalContext {
context = current
} else {
context = PeerGlobalInputActivityContext()
strongSelf.globalContext = context
}
let index = context.addSubscriber({ next in
subscriber.putNext(next)
})
subscriber.putNext(strongSelf.collectActivities())
disposable.set(ActionDisposable {
queue.async {
if let strongSelf = self {
if let currentContext = strongSelf.globalContext {
currentContext.removeSubscriber(index)
if currentContext.isEmpty {
strongSelf.globalContext = nil
}
}
}
}
})
}
}
return disposable
}
}
func addActivity(chatPeerId: PeerActivitySpace, peerId: PeerId, activity: PeerInputActivity, episodeId: Int32? = nil) {
self.queue.async {
let context: PeerInputActivityContext
if let currentContext = self.contexts[chatPeerId] {
context = currentContext
} else {
context = PeerInputActivityContext(queue: self.queue, notifyEmpty: { [weak self] in
if let strongSelf = self {
strongSelf.contexts.removeValue(forKey: chatPeerId)
if let globalContext = strongSelf.globalContext {
let activities = strongSelf.collectActivities()
globalContext.notify(activities)
}
}
}, notifyUpdated: { [weak self] in
if let strongSelf = self, let globalContext = strongSelf.globalContext {
let activities = strongSelf.collectActivities()
globalContext.notify(activities)
}
})
self.contexts[chatPeerId] = context
}
let timeout: Double
switch activity {
case .speakingInGroupCall:
timeout = 3.0
default:
timeout = 8.0
}
context.addActivity(peerId: peerId, activity: activity, timeout: timeout, episodeId: episodeId, nextUpdateId: &self.nextUpdateId)
if let globalContext = self.globalContext {
let activities = self.collectActivities()
globalContext.notify(activities)
}
}
}
func removeActivity(chatPeerId: PeerActivitySpace, peerId: PeerId, activity: PeerInputActivity, episodeId: Int32? = nil) {
self.queue.async {
if let context = self.contexts[chatPeerId] {
context.removeActivity(peerId: peerId, activity: activity, episodeId: episodeId)
if let globalContext = self.globalContext {
let activities = self.collectActivities()
globalContext.notify(activities)
}
}
}
}
func removeAllActivities(chatPeerId: PeerActivitySpace, peerId: PeerId) {
self.queue.async {
if let currentContext = self.contexts[chatPeerId] {
currentContext.removeAllActivities(peerId: peerId)
if let globalContext = self.globalContext {
let activities = self.collectActivities()
globalContext.notify(activities)
}
}
}
}
func transaction(_ f: @escaping (PeerInputActivityManager) -> Void) {
self.queue.async {
f(self)
}
}
func acquireActivity(chatPeerId: PeerActivitySpace, peerId: PeerId, activity: PeerInputActivity) -> Disposable {
let disposable = MetaDisposable()
let queue = self.queue
queue.async {
let episodeId = self.nextEpisodeId
self.nextEpisodeId += 1
let update: () -> Void = { [weak self] in
self?.addActivity(chatPeerId: chatPeerId, peerId: peerId, activity: activity, episodeId: episodeId)
}
let timeout: Double
switch activity {
case .speakingInGroupCall:
timeout = 2.0
default:
timeout = 5.0
}
let timer = SignalKitTimer(timeout: timeout, repeat: true, completion: {
update()
}, queue: queue)
timer.start()
update()
disposable.set(ActionDisposable { [weak self] in
queue.async {
timer.invalidate()
guard let strongSelf = self else {
return
}
strongSelf.removeActivity(chatPeerId: chatPeerId, peerId: peerId, activity: activity, episodeId: episodeId)
}
})
}
return disposable
}
}