Conference updates

This commit is contained in:
Isaac
2025-04-04 16:51:46 +04:00
parent e8b7f53c84
commit 0a499f4e2b
10 changed files with 340 additions and 92 deletions

View File

@@ -52,6 +52,8 @@ public final class ConferenceCallE2EContext {
private var scheduledSynchronizeRemovedParticipantsAfterPoll: Bool = false
private var synchronizeRemovedParticipantsDisposable: Disposable?
private var synchronizeRemovedParticipantsTimer: Foundation.Timer?
private var pendingKickPeers: [EnginePeer.Id] = []
init(queue: Queue, engine: TelegramEngine, callId: Int64, accessHash: Int64, userId: Int64, reference: InternalGroupCallReference, state: Atomic<ContextStateHolder>, initializeState: @escaping (TelegramKeyPair, Int64, Data) -> ConferenceCallE2EContextState?, keyPair: TelegramKeyPair) {
precondition(queue.isCurrent())
@@ -91,37 +93,53 @@ public final class ConferenceCallE2EContext {
}
func addChainBlocksUpdate(subChainId: Int, blocks: [Data], nextOffset: Int) {
var processBlock = true
let updateBaseOffset = nextOffset - blocks.count
if subChainId == 0 {
if let e2ePoll0Offset = self.e2ePoll0Offset {
if e2ePoll0Offset == updateBaseOffset {
self.e2ePoll0Offset = nextOffset
} else if e2ePoll0Offset < updateBaseOffset {
self.e2ePoll(subChainId: subChainId)
} else {
processBlock = false
var blocksToProcess: [Data] = []
var shouldPoll = false
for i in 0 ..< blocks.count {
let blockOffset = updateBaseOffset + i
if subChainId == 0 {
if var e2ePoll0Offset = self.e2ePoll0Offset {
if blockOffset == e2ePoll0Offset {
e2ePoll0Offset += 1
self.e2ePoll0Offset = e2ePoll0Offset
blocksToProcess.append(blocks[i])
} else if blockOffset > e2ePoll0Offset {
shouldPoll = true
}
}
} else {
processBlock = false
}
} else if subChainId == 1 {
if let e2ePoll1Offset = self.e2ePoll1Offset {
if e2ePoll1Offset == updateBaseOffset {
self.e2ePoll1Offset = nextOffset
} else if e2ePoll1Offset < updateBaseOffset {
self.e2ePoll(subChainId: subChainId)
} else {
processBlock = false
} else if subChainId == 1 {
if var e2ePoll1Offset = self.e2ePoll1Offset {
if blockOffset == e2ePoll1Offset {
e2ePoll1Offset += 1
self.e2ePoll1Offset = e2ePoll1Offset
blocksToProcess.append(blocks[i])
} else if blockOffset > e2ePoll1Offset {
shouldPoll = true
}
}
} else {
processBlock = false
}
} else {
processBlock = false
}
if processBlock {
self.addE2EBlocks(blocks: blocks, subChainId: subChainId)
if !blocksToProcess.isEmpty {
if subChainId == 0 {
if self.e2ePoll0Disposable != nil {
self.e2ePoll0Disposable?.dispose()
self.e2ePoll0Disposable = nil
shouldPoll = true
}
} else if subChainId == 1 {
if self.e2ePoll1Disposable != nil {
self.e2ePoll1Disposable?.dispose()
self.e2ePoll1Disposable = nil
shouldPoll = true
}
}
self.addE2EBlocks(blocks: blocksToProcess, subChainId: subChainId)
}
if shouldPoll {
self.e2ePoll(subChainId: subChainId)
}
}
@@ -186,6 +204,14 @@ public final class ConferenceCallE2EContext {
guard let self else {
return
}
if subChainId == 0 {
self.e2ePoll0Disposable?.dispose()
self.e2ePoll0Disposable = nil
} else if subChainId == 1 {
self.e2ePoll1Disposable?.dispose()
self.e2ePoll1Disposable = nil
}
var delayPoll = true
if let result {
@@ -247,71 +273,139 @@ public final class ConferenceCallE2EContext {
let callId = self.callId
let accessHash = self.accessHash
self.synchronizeRemovedParticipantsDisposable?.dispose()
self.synchronizeRemovedParticipantsDisposable = (_internal_getGroupCallParticipants(
account: self.engine.account,
reference: self.reference,
offset: "",
ssrcs: [],
limit: 100,
sortAscending: true
)
|> map(Optional.init)
|> `catch` { _ -> Signal<GroupCallParticipantsContext.State?, NoError> in
return .single(nil)
}
|> mapToSignal { result -> Signal<Bool, NoError> in
guard let result else {
return .single(false)
}
let blockchainPeerIds = state.with { state -> [Int64] in
guard let state = state.state else {
return []
}
return state.getParticipantIds()
}
// Peer ids that are in the blockchain but not in the server list
let removedPeerIds = blockchainPeerIds.filter { blockchainPeerId in
return !result.participants.contains(where: { $0.peer.id.id._internalGetInt64Value() == blockchainPeerId })
}
if !self.pendingKickPeers.isEmpty {
let pendingKickPeers = self.pendingKickPeers
self.pendingKickPeers.removeAll()
if removedPeerIds.isEmpty {
return .single(false)
}
guard let removeBlock = state.with({ state -> Data? in
self.synchronizeRemovedParticipantsDisposable?.dispose()
let removeBlock = state.with({ state -> Data? in
guard let state = state.state else {
return nil
}
return state.generateRemoveParticipantsBlock(participantIds: removedPeerIds)
}) else {
return .single(false)
}
return engine.calls.removeGroupCallBlockchainParticipants(callId: callId, accessHash: accessHash, mode: .cleanup, participantIds: removedPeerIds, block: removeBlock)
|> map { result -> Bool in
switch result {
case .success:
return true
case .pollBlocksAndRetry:
return false
let currentIds = state.getParticipantIds()
let remainingIds = pendingKickPeers.filter({ currentIds.contains($0.id._internalGetInt64Value()) })
if remainingIds.isEmpty {
return nil
}
return state.generateRemoveParticipantsBlock(participantIds: remainingIds.map { $0.id._internalGetInt64Value() })
})
if let removeBlock {
self.synchronizeRemovedParticipantsDisposable = (engine.calls.removeGroupCallBlockchainParticipants(callId: callId, accessHash: accessHash, mode: .kick, participantIds: pendingKickPeers.map { $0.id._internalGetInt64Value() }, block: removeBlock)
|> map { result -> Bool in
switch result {
case .success:
return true
case .pollBlocksAndRetry:
return false
}
}
|> deliverOnMainQueue).startStrict(next: { [weak self] shouldRetry in
guard let self else {
return
}
if shouldRetry {
for id in pendingKickPeers {
if !self.pendingKickPeers.contains(id) {
self.pendingKickPeers.append(id)
}
}
}
self.isSynchronizingRemovedParticipants = false
if self.scheduledSynchronizeRemovedParticipants {
self.scheduledSynchronizeRemovedParticipants = false
self.synchronizeRemovedParticipants()
} else if shouldRetry && !self.scheduledSynchronizeRemovedParticipantsAfterPoll {
self.scheduledSynchronizeRemovedParticipantsAfterPoll = true
self.e2ePoll(subChainId: 0)
}
})
} else {
self.isSynchronizingRemovedParticipants = false
if self.scheduledSynchronizeRemovedParticipants {
self.scheduledSynchronizeRemovedParticipants = false
self.synchronizeRemovedParticipants()
}
}
} else {
self.synchronizeRemovedParticipantsDisposable?.dispose()
self.synchronizeRemovedParticipantsDisposable = (_internal_getGroupCallParticipants(
account: self.engine.account,
reference: self.reference,
offset: "",
ssrcs: [],
limit: 100,
sortAscending: true
)
|> map(Optional.init)
|> `catch` { _ -> Signal<GroupCallParticipantsContext.State?, NoError> in
return .single(nil)
}
|> mapToSignal { result -> Signal<Bool, NoError> in
guard let result else {
return .single(false)
}
let blockchainPeerIds = state.with { state -> [Int64] in
guard let state = state.state else {
return []
}
return state.getParticipantIds()
}
// Peer ids that are in the blockchain but not in the server list
let removedPeerIds = blockchainPeerIds.filter { blockchainPeerId in
return !result.participants.contains(where: { $0.peer.id.id._internalGetInt64Value() == blockchainPeerId })
}
if removedPeerIds.isEmpty {
return .single(false)
}
guard let removeBlock = state.with({ state -> Data? in
guard let state = state.state else {
return nil
}
return state.generateRemoveParticipantsBlock(participantIds: removedPeerIds)
}) else {
return .single(false)
}
return engine.calls.removeGroupCallBlockchainParticipants(callId: callId, accessHash: accessHash, mode: .cleanup, participantIds: removedPeerIds, block: removeBlock)
|> map { result -> Bool in
switch result {
case .success:
return true
case .pollBlocksAndRetry:
return false
}
}
}
|> deliverOn(self.queue)).startStrict(next: { [weak self] shouldRetry in
guard let self else {
return
}
self.isSynchronizingRemovedParticipants = false
if self.scheduledSynchronizeRemovedParticipants {
self.scheduledSynchronizeRemovedParticipants = false
self.synchronizeRemovedParticipants()
} else if shouldRetry && !self.scheduledSynchronizeRemovedParticipantsAfterPoll {
self.scheduledSynchronizeRemovedParticipantsAfterPoll = true
self.e2ePoll(subChainId: 0)
}
})
}
}
func kickPeer(id: EnginePeer.Id) {
//TODO:release
if !self.pendingKickPeers.contains(id) {
self.pendingKickPeers.append(id)
self.synchronizeRemovedParticipants()
}
|> deliverOn(self.queue)).startStrict(next: { [weak self] shouldRetry in
guard let self else {
return
}
self.isSynchronizingRemovedParticipants = false
if self.scheduledSynchronizeRemovedParticipants {
self.scheduledSynchronizeRemovedParticipants = false
self.synchronizeRemovedParticipants()
} else if shouldRetry && !self.scheduledSynchronizeRemovedParticipantsAfterPoll {
self.scheduledSynchronizeRemovedParticipantsAfterPoll = true
self.e2ePoll(subChainId: 0)
}
})
}
}
@@ -349,4 +443,10 @@ public final class ConferenceCallE2EContext {
impl.synchronizeRemovedParticipants()
}
}
public func kickPeer(id: EnginePeer.Id) {
self.impl.with { impl in
impl.kickPeer(id: id)
}
}
}