mirror of
https://github.com/Swiftgram/Telegram-iOS.git
synced 2025-06-16 05:55:20 +00:00
238 lines
8.8 KiB
Swift
238 lines
8.8 KiB
Swift
import Foundation
|
|
import Postbox
|
|
import SwiftSignalKit
|
|
|
|
/// Watches the set of message history holes reported by the postbox and starts a network fetch
/// for each of them, keeping at most one pending fetch per (peer, thread, space) location.
///
/// The `entries` subscription, the fetch completion handlers and the cleanup timer are all
/// delivered on `queue`; the remaining methods are presumably expected to be called on that queue
/// as well (`deinit` asserts it).
private final class ManagedMessageHistoryHolesContext {
    /// Identifies the location a hole belongs to. Only one fetch per location is pending at a time.
    private struct LocationKey: Equatable {
        var peerId: PeerId
        var threadId: Int64?
        var space: MessageHistoryHoleOperationSpace
    }
    
    /// A hole whose fetch has been scheduled, together with the disposable controlling that fetch.
    private struct PendingEntry: CustomStringConvertible {
        var id: Int
        var key: LocationKey
        var entry: MessageHistoryHolesViewEntry
        var disposable: MetaDisposable
        
        var description: String {
            return "entry: \(self.entry)"
        }
    }
    
    /// A pending entry that was stashed at `timestamp` instead of being cancelled right away.
    private struct DiscardedEntry {
        var entry: PendingEntry
        var timestamp: Double
    }
    
    /// A hole that finished fetching less than this many seconds ago is not fetched again.
    private static let completedEntryCooldown: Double = 20.0
    /// Discarded entries older than this many seconds are dropped by the cleanup timer.
    private static let discardedEntryLifetime: Double = 0.5
    /// Interval, in seconds, at which the cleanup timer inspects the discarded entries.
    private static let discardedEntriesPollInterval: Double = 0.2
    
    private let queue: Queue
    private let accountPeerId: PeerId
    private let postbox: Postbox
    private let network: Network
    
    private var nextEntryId: Int = 0
    private var pendingEntries: [PendingEntry] = []
    private var discardedEntries: [DiscardedEntry] = []
    
    private var oldEntriesTimer: SwiftSignalKit.Timer?
    
    /// The most recent set of holes passed to `update(entries:)`.
    private var currentEntries: Set<MessageHistoryHolesViewEntry> = Set()
    private var currentEntriesDisposable: Disposable?
    
    /// Completion time (`CFAbsoluteTimeGetCurrent()`) of every hole whose fetch has finished.
    private var completedEntries: [MessageHistoryHolesViewEntry: Double] = [:]
    
    /// Subscribes to `entries` on `queue` and forwards every emitted set to `update(entries:)`.
    init(
        queue: Queue,
        accountPeerId: PeerId,
        postbox: Postbox,
        network: Network,
        entries: Signal<Set<MessageHistoryHolesViewEntry>, NoError>
    ) {
        self.queue = queue
        self.accountPeerId = accountPeerId
        self.postbox = postbox
        self.network = network
        
        self.currentEntriesDisposable = (entries |> deliverOn(self.queue)).start(next: { [weak self] entries in
            guard let self = self else {
                return
            }
            self.update(entries: entries)
        })
    }
    
    deinit {
        assert(self.queue.isCurrent())
        
        // NOTE(review): the disposables of pending fetches are not disposed here and nothing in
        // this file calls `clearDisposables()`; confirm whether in-flight fetches are meant to
        // outlive the context.
        self.oldEntriesTimer?.invalidate()
        self.currentEntriesDisposable?.dispose()
    }
    
    /// Forgets the completion timestamps of all holes belonging to `peerId`, so that the
    /// "recently finished" cooldown no longer prevents them from being fetched again.
    func resetPeer(peerId: PeerId) {
        for entry in Array(self.completedEntries.keys) {
            switch entry.hole {
            case let .peer(peer):
                if peer.peerId == peerId {
                    self.completedEntries.removeValue(forKey: entry)
                }
            }
        }
    }
    
    /// Empties the pending and discarded lists and returns the disposables they held.
    ///
    /// The disposables are returned as-is; disposing them is left to the caller.
    func clearDisposables() -> [Disposable] {
        var disposables = Array(self.pendingEntries.map(\.disposable))
        disposables.append(contentsOf: self.discardedEntries.map(\.entry.disposable))
        self.pendingEntries.removeAll()
        self.discardedEntries.removeAll()
        return disposables
    }
    
    /// Starts the repeating cleanup timer while there are discarded entries and stops it otherwise.
    private func updateNeedsTimer() {
        let needsTimer = !self.discardedEntries.isEmpty
        if needsTimer {
            if self.oldEntriesTimer == nil {
                self.oldEntriesTimer = SwiftSignalKit.Timer(timeout: Self.discardedEntriesPollInterval, repeat: true, completion: { [weak self] in
                    guard let self = self else {
                        return
                    }
                    let disposables = self.discardOldEntries()
                    for disposable in disposables {
                        disposable.dispose()
                    }
                }, queue: self.queue)
                self.oldEntriesTimer?.start()
            }
        } else if let oldEntriesTimer = self.oldEntriesTimer {
            self.oldEntriesTimer = nil
            oldEntriesTimer.invalidate()
        }
    }
    
    /// Removes discarded entries older than `discardedEntryLifetime` and returns their disposables.
    private func discardOldEntries() -> [Disposable] {
        let timestamp = CFAbsoluteTimeGetCurrent()
        
        var result: [Disposable] = []
        // Walk backwards so that removing an element does not shift the indices still to be visited.
        for i in (0 ..< self.discardedEntries.count).reversed() {
            if self.discardedEntries[i].timestamp < timestamp - Self.discardedEntryLifetime {
                result.append(self.discardedEntries[i].entry.disposable)
                Logger.shared.log("ManagedMessageHistoryHoles", "Removing discarded entry \(self.discardedEntries[i].entry)")
                self.discardedEntries.remove(at: i)
            }
        }
        
        return result
    }
    
    /// Reconciles the pending fetches with `entries`, the current set of holes.
    ///
    /// A hole is skipped when it finished fetching within the last `completedEntryCooldown`
    /// seconds or when a fetch for the same location is already pending. Every other hole either
    /// re-adopts a matching discarded entry or gets a new fetch started.
    func update(entries: Set<MessageHistoryHolesViewEntry>) {
        // Remember the latest snapshot. The fetch completion handler below re-runs this method
        // with `self.currentEntries`; previously the property was never assigned, so that re-run
        // always saw an empty set and holes skipped because their location was busy were not
        // started until the next emission of the entries signal.
        self.currentEntries = entries
        
        var added: [PendingEntry] = []
        
        // Stashing of pending entries that are no longer present in `entries` is disabled, so
        // `discardedEntries` is never appended to by the active code.
        /*let timestamp = CFAbsoluteTimeGetCurrent()
        for i in (0 ..< self.pendingEntries.count).reversed() {
            if !entries.contains(self.pendingEntries[i].entry) {
                Logger.shared.log("ManagedMessageHistoryHoles", "Stashing entry \(self.pendingEntries[i])")
                self.discardedEntries.append(DiscardedEntry(entry: self.pendingEntries[i], timestamp: timestamp))
                self.pendingEntries.remove(at: i)
            }
        }*/
        
        let now = CFAbsoluteTimeGetCurrent()
        
        for entry in entries {
            if let previousTimestamp = self.completedEntries[entry], previousTimestamp >= now - Self.completedEntryCooldown {
                Logger.shared.log("ManagedMessageHistoryHoles", "Not adding recently finished entry \(entry)")
                continue
            }
            
            switch entry.hole {
            case let .peer(peerHole):
                let key = LocationKey(peerId: peerHole.peerId, threadId: peerHole.threadId, space: entry.space)
                if !self.pendingEntries.contains(where: { $0.key == key }) {
                    if let discardedIndex = self.discardedEntries.firstIndex(where: { $0.entry.entry == entry }) {
                        // Re-adopt the stashed entry; its fetch is not restarted.
                        let discardedEntry = self.discardedEntries.remove(at: discardedIndex)
                        Logger.shared.log("ManagedMessageHistoryHoles", "Taking discarded entry \(discardedEntry.entry)")
                        self.pendingEntries.append(discardedEntry.entry)
                    } else {
                        let disposable = MetaDisposable()
                        let id = self.nextEntryId
                        self.nextEntryId += 1
                        let pendingEntry = PendingEntry(id: id, key: key, entry: entry, disposable: disposable)
                        self.pendingEntries.append(pendingEntry)
                        Logger.shared.log("ManagedMessageHistoryHoles", "Adding pending entry \(pendingEntry), discarded entries: \(self.discardedEntries.map(\.entry))")
                        added.append(pendingEntry)
                    }
                }
            }
        }
        
        self.updateNeedsTimer()
        
        for pendingEntry in added {
            let id = pendingEntry.id
            let entry = pendingEntry.entry
            switch entry.hole {
            case let .peer(hole):
                pendingEntry.disposable.set((fetchMessageHistoryHole(
                    accountPeerId: self.accountPeerId,
                    source: .network(self.network),
                    postbox: self.postbox,
                    peerInput: .direct(peerId: hole.peerId, threadId: hole.threadId), namespace: hole.namespace, direction: entry.direction, space: entry.space, count: entry.count)
                |> deliverOn(self.queue)).start(completed: { [weak self] in
                    guard let self = self else {
                        return
                    }
                    self.pendingEntries.removeAll(where: { $0.id == id })
                    self.completedEntries[entry] = CFAbsoluteTimeGetCurrent()
                    // The location is free again: re-evaluate the latest snapshot so that other
                    // holes at this location can be scheduled.
                    self.update(entries: self.currentEntries)
                }))
            }
        }
    }
}
|
|
|
|
/// Creates the context that manages message history hole fetching for an account.
///
/// - Returns: A pair of
///   - a closure that calls `resetPeer(peerId:)` on the context for the given peer (it only holds
///     the context weakly and does nothing once the context has been released), and
///   - a disposable that drops this function's strong reference to the context when disposed.
func managedMessageHistoryHoles(accountPeerId: PeerId, network: Network, postbox: Postbox) -> ((PeerId) -> Void, Disposable) {
    let contextQueue = Queue()
    
    // The only strong reference created here; it is cleared by the returned disposable.
    var context: QueueLocalObject<ManagedMessageHistoryHolesContext>? = QueueLocalObject<ManagedMessageHistoryHolesContext>(queue: contextQueue, generate: {
        let holeEntries: Signal<Set<MessageHistoryHolesViewEntry>, NoError> = postbox.messageHistoryHolesView() |> map { view in
            return view.entries
        }
        return ManagedMessageHistoryHolesContext(
            queue: contextQueue,
            accountPeerId: accountPeerId,
            postbox: postbox,
            network: network,
            entries: holeEntries
        )
    })
    
    let resetPeer: (PeerId) -> Void = { [weak context] peerId in
        guard let holder = context else {
            return
        }
        holder.with { impl in
            impl.resetPeer(peerId: peerId)
        }
    }
    
    let disposable = ActionDisposable {
        guard context != nil else {
            return
        }
        context = nil
    }
    
    return (resetPeer, disposable)
}
|