Synchronization improvements

This commit is contained in:
Ali 2023-02-10 13:58:55 +04:00
parent 0755bab06c
commit 00e72e99a8
5 changed files with 78 additions and 85 deletions

View File

@ -898,7 +898,10 @@ public class Account {
public private(set) var stateManager: AccountStateManager! public private(set) var stateManager: AccountStateManager!
private(set) var contactSyncManager: ContactSyncManager! private(set) var contactSyncManager: ContactSyncManager!
public private(set) var callSessionManager: CallSessionManager! public private(set) var callSessionManager: CallSessionManager!
public private(set) var viewTracker: AccountViewTracker! public private(set) var viewTracker: AccountViewTracker!
private var resetPeerHoleManagement: ((PeerId) -> Void)?
public private(set) var pendingMessageManager: PendingMessageManager! public private(set) var pendingMessageManager: PendingMessageManager!
public private(set) var pendingUpdateMessageManager: PendingUpdateMessageManager! public private(set) var pendingUpdateMessageManager: PendingUpdateMessageManager!
private(set) var messageMediaPreuploadManager: MessageMediaPreuploadManager! private(set) var messageMediaPreuploadManager: MessageMediaPreuploadManager!
@ -911,6 +914,7 @@ public class Account {
fileprivate let managedStickerPacksDisposable = MetaDisposable() fileprivate let managedStickerPacksDisposable = MetaDisposable()
private let becomeMasterDisposable = MetaDisposable() private let becomeMasterDisposable = MetaDisposable()
private let managedServiceViewsDisposable = MetaDisposable() private let managedServiceViewsDisposable = MetaDisposable()
private let managedServiceViewsActionDisposable = MetaDisposable()
private let managedOperationsDisposable = DisposableSet() private let managedOperationsDisposable = DisposableSet()
private var storageSettingsDisposable: Disposable? private var storageSettingsDisposable: Disposable?
private var automaticCacheEvictionContext: AutomaticCacheEvictionContext? private var automaticCacheEvictionContext: AutomaticCacheEvictionContext?
@ -982,6 +986,9 @@ public class Account {
}, shouldKeepOnlinePresence: self.shouldKeepOnlinePresence.get(), peerInputActivityManager: self.peerInputActivityManager, auxiliaryMethods: auxiliaryMethods) }, shouldKeepOnlinePresence: self.shouldKeepOnlinePresence.get(), peerInputActivityManager: self.peerInputActivityManager, auxiliaryMethods: auxiliaryMethods)
self.viewTracker = AccountViewTracker(account: self) self.viewTracker = AccountViewTracker(account: self)
self.viewTracker.resetPeerHoleManagement = { [weak self] peerId in
self?.resetPeerHoleManagement?(peerId)
}
self.taskManager = AccountTaskManager( self.taskManager = AccountTaskManager(
stateManager: self.stateManager, stateManager: self.stateManager,
@ -1085,18 +1092,25 @@ public class Account {
self.network.shouldExplicitelyKeepWorkerConnections.set(self.shouldExplicitelyKeepWorkerConnections.get()) self.network.shouldExplicitelyKeepWorkerConnections.set(self.shouldExplicitelyKeepWorkerConnections.get())
self.network.shouldKeepBackgroundDownloadConnections.set(self.shouldKeepBackgroundDownloadConnections.get()) self.network.shouldKeepBackgroundDownloadConnections.set(self.shouldKeepBackgroundDownloadConnections.get())
let serviceTasksMaster = shouldBeMaster self.managedServiceViewsDisposable.set(shouldBeMaster.start(next: { [weak self] value in
|> deliverOn(self.serviceQueue) guard let strongSelf = self else {
|> mapToSignal { [weak self] value -> Signal<Void, NoError> in return
if let strongSelf = self, value { }
if value {
Logger.shared.log("Account", "Became master") Logger.shared.log("Account", "Became master")
return managedServiceViews(accountPeerId: peerId, network: strongSelf.network, postbox: strongSelf.postbox, stateManager: strongSelf.stateManager, pendingMessageManager: strongSelf.pendingMessageManager) let data = managedServiceViews(accountPeerId: peerId, network: network, postbox: postbox, stateManager: strongSelf.stateManager, pendingMessageManager: strongSelf.pendingMessageManager)
let resetPeerHoles = data.resetPeerHoles
strongSelf.resetPeerHoleManagement = { peerId in
resetPeerHoles(peerId)
}
strongSelf.managedServiceViewsActionDisposable.set(data.disposable)
} else { } else {
Logger.shared.log("Account", "Resigned master") Logger.shared.log("Account", "Resigned master")
return .never() strongSelf.managedServiceViewsActionDisposable.set(nil)
} }
} }))
self.managedServiceViewsDisposable.set(serviceTasksMaster.start())
let pendingMessageManager = self.pendingMessageManager let pendingMessageManager = self.pendingMessageManager
Logger.shared.log("Account", "Begin watching unsent message ids") Logger.shared.log("Account", "Begin watching unsent message ids")
@ -1198,6 +1212,7 @@ public class Account {
self.managedContactsDisposable.dispose() self.managedContactsDisposable.dispose()
self.managedStickerPacksDisposable.dispose() self.managedStickerPacksDisposable.dispose()
self.managedServiceViewsDisposable.dispose() self.managedServiceViewsDisposable.dispose()
self.managedServiceViewsActionDisposable.dispose()
self.managedOperationsDisposable.dispose() self.managedOperationsDisposable.dispose()
self.storageSettingsDisposable?.dispose() self.storageSettingsDisposable?.dispose()
self.smallLogPostDisposable.dispose() self.smallLogPostDisposable.dispose()

View File

@ -333,6 +333,8 @@ public final class AccountViewTracker {
public let chatListPreloadItems = Promise<Set<ChatHistoryPreloadItem>>([]) public let chatListPreloadItems = Promise<Set<ChatHistoryPreloadItem>>([])
var resetPeerHoleManagement: ((PeerId) -> Void)?
init(account: Account) { init(account: Account) {
self.account = account self.account = account
@ -1615,9 +1617,11 @@ public final class AccountViewTracker {
addHole = true addHole = true
pollingCompleted = .single(true) pollingCompleted = .single(true)
} }
let resetPeerHoleManagement = self.resetPeerHoleManagement
let isAutomaticallyTracked = self.account!.postbox.transaction { transaction -> Bool in let isAutomaticallyTracked = self.account!.postbox.transaction { transaction -> Bool in
if transaction.getPeerChatListIndex(peerId) == nil { if transaction.getPeerChatListIndex(peerId) == nil {
if addHole { if addHole {
resetPeerHoleManagement?(peerId)
transaction.addHole(peerId: peerId, threadId: nil, namespace: Namespaces.Message.Cloud, space: .everywhere, range: 1 ... (Int32.max - 1)) transaction.addHole(peerId: peerId, threadId: nil, namespace: Namespaces.Message.Cloud, space: .everywhere, range: 1 ... (Int32.max - 1))
} }
return false return false

View File

@ -86,6 +86,17 @@ private final class ManagedMessageHistoryHolesContext {
self.currentEntriesDisposable?.dispose() self.currentEntriesDisposable?.dispose()
} }
func resetPeer(peerId: PeerId) {
for entry in Array(self.completedEntries.keys) {
switch entry.hole {
case let .peer(peer):
if peer.peerId == peerId {
self.completedEntries.removeValue(forKey: entry)
}
}
}
}
func clearDisposables() -> [Disposable] { func clearDisposables() -> [Disposable] {
var disposables = Array(self.pendingEntries.map(\.disposable)) var disposables = Array(self.pendingEntries.map(\.disposable))
disposables.append(contentsOf: self.discardedEntries.map(\.entry.disposable)) disposables.append(contentsOf: self.discardedEntries.map(\.entry.disposable))
@ -196,69 +207,28 @@ private final class ManagedMessageHistoryHolesContext {
} }
} }
func managedMessageHistoryHoles(accountPeerId: PeerId, network: Network, postbox: Postbox) -> Signal<Void, NoError> { func managedMessageHistoryHoles(accountPeerId: PeerId, network: Network, postbox: Postbox) -> ((PeerId) -> Void, Disposable) {
let sharedQueue = Queue() let sharedQueue = Queue()
return Signal { _ in var context: QueueLocalObject<ManagedMessageHistoryHolesContext>? = QueueLocalObject<ManagedMessageHistoryHolesContext>(queue: sharedQueue, generate: {
var context: QueueLocalObject<ManagedMessageHistoryHolesContext>? = QueueLocalObject<ManagedMessageHistoryHolesContext>(queue: sharedQueue, generate: { return ManagedMessageHistoryHolesContext(
return ManagedMessageHistoryHolesContext( queue: sharedQueue,
queue: sharedQueue, accountPeerId: accountPeerId,
accountPeerId: accountPeerId, postbox: postbox,
postbox: postbox, network: network,
network: network, entries: postbox.messageHistoryHolesView() |> map { view in
entries: postbox.messageHistoryHolesView() |> map { view in return view.entries
return view.entries
}
)
})
/*var performWorkImpl: ((@escaping (ManagedMessageHistoryHolesState) -> Void) -> Void)?
let state = Atomic(value: ManagedMessageHistoryHolesState(performWork: { f in
performWorkImpl?(f)
}))
performWorkImpl = { [weak state] f in
state?.with { state in
f(state)
} }
)
})
return ({ [weak context] peerId in
context?.with { context in
context.resetPeer(peerId: peerId)
} }
}, ActionDisposable {
let disposable = (postbox.messageHistoryHolesView() if context != nil {
|> deliverOn(sharedQueue)).start(next: { view in context = nil
let (removed, added, _) = state.with { state in
return state.update(entries: view.entries)
}
for disposable in removed {
disposable.dispose()
}
for (entry, disposable) in added {
switch entry.hole {
case let .peer(hole):
disposable.set((fetchMessageHistoryHole(accountPeerId: accountPeerId, source: .network(network), postbox: postbox, peerInput: .direct(peerId: hole.peerId, threadId: hole.threadId), namespace: hole.namespace, direction: entry.direction, space: entry.space, count: entry.count)
|> afterDisposed {
sharedQueue.async {
state.with { state in
let _ = state
//state.removeCompletedEntry(entry: entry)
}
}
}).start())
}
}
})*/
return ActionDisposable {
if context != nil {
context = nil
}
/*disposable.dispose()
for disposable in state.with({ state -> [Disposable] in
state.clearDisposables()
}) {
disposable.dispose()
}*/
} }
} })
|> runOn(sharedQueue)
} }

View File

@ -2,13 +2,14 @@ import Foundation
import Postbox import Postbox
import SwiftSignalKit import SwiftSignalKit
func managedServiceViews(accountPeerId: PeerId, network: Network, postbox: Postbox, stateManager: AccountStateManager, pendingMessageManager: PendingMessageManager) -> Signal<Void, NoError> { func managedServiceViews(accountPeerId: PeerId, network: Network, postbox: Postbox, stateManager: AccountStateManager, pendingMessageManager: PendingMessageManager) -> (resetPeerHoles: (PeerId) -> Void, disposable: Disposable) {
return Signal { _ in let disposable = DisposableSet()
let disposable = DisposableSet()
disposable.add(managedMessageHistoryHoles(accountPeerId: accountPeerId, network: network, postbox: postbox).start()) let managedHoles = managedMessageHistoryHoles(accountPeerId: accountPeerId, network: network, postbox: postbox)
disposable.add(managedChatListHoles(network: network, postbox: postbox, accountPeerId: accountPeerId).start())
disposable.add(managedForumTopicListHoles(network: network, postbox: postbox, accountPeerId: accountPeerId).start()) disposable.add(managedHoles.1)
disposable.add(managedChatListHoles(network: network, postbox: postbox, accountPeerId: accountPeerId).start())
return disposable disposable.add(managedForumTopicListHoles(network: network, postbox: postbox, accountPeerId: accountPeerId).start())
}
return (managedHoles.0, disposable)
} }

View File

@ -15,9 +15,6 @@ func _internal_resetAccountState(postbox: Postbox, network: Network, accountPeer
return .never() return .never()
} }
return withResolvedAssociatedMessages(postbox: postbox, source: .network(network), peers: Dictionary(fetchedChats.peers.map({ ($0.id, $0) }), uniquingKeysWith: { lhs, _ in lhs }), storeMessages: fetchedChats.storeMessages, { transaction, additionalPeers, additionalMessages -> Void in return withResolvedAssociatedMessages(postbox: postbox, source: .network(network), peers: Dictionary(fetchedChats.peers.map({ ($0.id, $0) }), uniquingKeysWith: { lhs, _ in lhs }), storeMessages: fetchedChats.storeMessages, { transaction, additionalPeers, additionalMessages -> Void in
transaction.removeAllChatListEntries(groupId: .root, exceptPeerNamespace: Namespaces.Peer.SecretChat)
transaction.removeAllChatListEntries(groupId: .group(1), exceptPeerNamespace: Namespaces.Peer.SecretChat)
for peerId in transaction.chatListGetAllPeerIds() { for peerId in transaction.chatListGetAllPeerIds() {
if peerId.namespace != Namespaces.Peer.SecretChat { if peerId.namespace != Namespaces.Peer.SecretChat {
transaction.updatePeerChatListInclusion(peerId, inclusion: .notIncluded) transaction.updatePeerChatListInclusion(peerId, inclusion: .notIncluded)
@ -27,15 +24,21 @@ func _internal_resetAccountState(postbox: Postbox, network: Network, accountPeer
transaction.addHole(peerId: peerId, threadId: nil, namespace: Namespaces.Message.Cloud, space: .everywhere, range: 1 ... (Int32.max - 1)) transaction.addHole(peerId: peerId, threadId: nil, namespace: Namespaces.Message.Cloud, space: .everywhere, range: 1 ... (Int32.max - 1))
} }
if peerId.namespace == Namespaces.Peer.CloudChannel, let channel = transaction.getPeer(peerId) as? TelegramChannel, channel.flags.contains(.isForum) { if peerId.namespace == Namespaces.Peer.CloudChannel {
transaction.setPeerPinnedThreads(peerId: peerId, threadIds: []) if let channel = transaction.getPeer(peerId) as? TelegramChannel, channel.flags.contains(.isForum) {
for threadId in transaction.setMessageHistoryThreads(peerId: peerId) { transaction.setPeerPinnedThreads(peerId: peerId, threadIds: [])
transaction.setMessageHistoryThreadInfo(peerId: peerId, threadId: threadId, info: nil) for threadId in transaction.setMessageHistoryThreads(peerId: peerId) {
transaction.addHole(peerId: peerId, threadId: threadId, namespace: Namespaces.Message.Cloud, space: .everywhere, range: 1 ... (Int32.max - 1)) transaction.setMessageHistoryThreadInfo(peerId: peerId, threadId: threadId, info: nil)
transaction.addHole(peerId: peerId, threadId: threadId, namespace: Namespaces.Message.Cloud, space: .everywhere, range: 1 ... (Int32.max - 1))
}
} }
transaction.updatePeerCachedData(peerIds: Set([peerId]), update: { _, _ in nil })
} }
} }
transaction.removeAllChatListEntries(groupId: .root, exceptPeerNamespace: Namespaces.Peer.SecretChat)
transaction.removeAllChatListEntries(groupId: .group(1), exceptPeerNamespace: Namespaces.Peer.SecretChat)
updatePeers(transaction: transaction, peers: fetchedChats.peers + additionalPeers, update: { _, updated -> Peer in updatePeers(transaction: transaction, peers: fetchedChats.peers + additionalPeers, update: { _, updated -> Peer in
return updated return updated
}) })