Conference updates

This commit is contained in:
Isaac 2025-03-30 22:40:22 +04:00
parent 7c73b91129
commit 2c32a62f6a
7 changed files with 169 additions and 43 deletions

View File

@ -683,6 +683,10 @@ private final class ConferenceCallE2EContextStateImpl: ConferenceCallE2EContextS
return self.call.emojiState() return self.call.emojiState()
} }
func getParticipantIds() -> [Int64] {
return self.call.participantIds().compactMap { $0.int64Value }
}
func applyBlock(block: Data) { func applyBlock(block: Data) {
self.call.applyBlock(block) self.call.applyBlock(block)
} }
@ -691,6 +695,10 @@ private final class ConferenceCallE2EContextStateImpl: ConferenceCallE2EContextS
self.call.applyBroadcastBlock(block) self.call.applyBroadcastBlock(block)
} }
func generateRemoveParticipantsBlock(participantIds: [Int64]) -> Data? {
return self.call.generateRemoveParticipantsBlock(participantIds.map { $0 as NSNumber })
}
func takeOutgoingBroadcastBlocks() -> [Data] { func takeOutgoingBroadcastBlocks() -> [Data] {
return self.call.takeOutgoingBroadcastBlocks() return self.call.takeOutgoingBroadcastBlocks()
} }
@ -1336,6 +1344,8 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
if case let .established(_, _, _, ssrc, _) = self.internalState, ssrc == participantUpdate.ssrc { if case let .established(_, _, _, ssrc, _) = self.internalState, ssrc == participantUpdate.ssrc {
self.markAsCanBeRemoved() self.markAsCanBeRemoved()
} }
} else {
self.e2eContext?.synchronizeRemovedParticipants()
} }
} else if participantUpdate.peerId == self.joinAsPeerId { } else if participantUpdate.peerId == self.joinAsPeerId {
if case let .established(_, connectionMode, _, ssrc, _) = self.internalState { if case let .established(_, connectionMode, _, ssrc, _) = self.internalState {

View File

@ -356,7 +356,7 @@ extension VideoChatScreenComponent.View {
}))) })))
} }
if callState.isVideoEnabled && (callState.muteState?.canUnmute ?? true) { if case let .group(groupCall) = currentCall, !groupCall.isConference, callState.isVideoEnabled && (callState.muteState?.canUnmute ?? true) {
if currentCall.hasScreencast { if currentCall.hasScreencast {
items.append(.action(ContextMenuActionItem(text: environment.strings.VoiceChat_StopScreenSharing, icon: { theme in items.append(.action(ContextMenuActionItem(text: environment.strings.VoiceChat_StopScreenSharing, icon: { theme in
return generateTintedImage(image: UIImage(bundleImageName: "Call/Context Menu/ShareScreen"), color: theme.actionSheet.primaryTextColor) return generateTintedImage(image: UIImage(bundleImageName: "Call/Context Menu/ShareScreen"), color: theme.actionSheet.primaryTextColor)

View File

@ -3,10 +3,13 @@ import SwiftSignalKit
public protocol ConferenceCallE2EContextState: AnyObject { public protocol ConferenceCallE2EContextState: AnyObject {
func getEmojiState() -> Data? func getEmojiState() -> Data?
func getParticipantIds() -> [Int64]
func applyBlock(block: Data) func applyBlock(block: Data)
func applyBroadcastBlock(block: Data) func applyBroadcastBlock(block: Data)
func generateRemoveParticipantsBlock(participantIds: [Int64]) -> Data?
func takeOutgoingBroadcastBlocks() -> [Data] func takeOutgoingBroadcastBlocks() -> [Data]
func encrypt(message: Data) -> Data? func encrypt(message: Data) -> Data?
@ -43,6 +46,12 @@ public final class ConferenceCallE2EContext {
private var e2ePoll1Timer: Foundation.Timer? private var e2ePoll1Timer: Foundation.Timer?
private var e2ePoll1Disposable: Disposable? private var e2ePoll1Disposable: Disposable?
private var isSynchronizingRemovedParticipants: Bool = false
private var scheduledSynchronizeRemovedParticipants: Bool = false
private var scheduledSynchronizeRemovedParticipantsAfterPoll: Bool = false
private var synchronizeRemovedParticipantsDisposable: Disposable?
private var synchronizeRemovedParticipantsTimer: Foundation.Timer?
init(queue: Queue, engine: TelegramEngine, callId: Int64, accessHash: Int64, reference: InternalGroupCallReference, state: Atomic<ContextStateHolder>, initializeState: @escaping (TelegramKeyPair, Data) -> ConferenceCallE2EContextState?, keyPair: TelegramKeyPair) { init(queue: Queue, engine: TelegramEngine, callId: Int64, accessHash: Int64, reference: InternalGroupCallReference, state: Atomic<ContextStateHolder>, initializeState: @escaping (TelegramKeyPair, Data) -> ConferenceCallE2EContextState?, keyPair: TelegramKeyPair) {
precondition(queue.isCurrent()) precondition(queue.isCurrent())
precondition(Queue.mainQueue().isCurrent()) precondition(Queue.mainQueue().isCurrent())
@ -62,9 +71,19 @@ public final class ConferenceCallE2EContext {
self.e2ePoll0Disposable?.dispose() self.e2ePoll0Disposable?.dispose()
self.e2ePoll1Timer?.invalidate() self.e2ePoll1Timer?.invalidate()
self.e2ePoll1Disposable?.dispose() self.e2ePoll1Disposable?.dispose()
self.synchronizeRemovedParticipantsDisposable?.dispose()
self.synchronizeRemovedParticipantsTimer?.invalidate()
} }
func begin() { func begin() {
self.scheduledSynchronizeRemovedParticipantsAfterPoll = true
self.synchronizeRemovedParticipantsTimer = Foundation.Timer.scheduledTimer(withTimeInterval: 10.0, repeats: true, block: { [weak self] _ in
guard let self else {
return
}
self.synchronizeRemovedParticipants()
})
self.e2ePoll(subChainId: 0) self.e2ePoll(subChainId: 0)
self.e2ePoll(subChainId: 1) self.e2ePoll(subChainId: 1)
} }
@ -189,6 +208,11 @@ public final class ConferenceCallE2EContext {
} }
self.e2ePoll(subChainId: 0) self.e2ePoll(subChainId: 0)
}) })
if self.scheduledSynchronizeRemovedParticipantsAfterPoll {
self.scheduledSynchronizeRemovedParticipantsAfterPoll = false
self.synchronizeRemovedParticipants()
}
} else if subChainId == 1 { } else if subChainId == 1 {
self.e2ePoll1Timer?.invalidate() self.e2ePoll1Timer?.invalidate()
self.e2ePoll1Timer = Foundation.Timer.scheduledTimer(withTimeInterval: delayPoll ? 1.0 : 0.0, repeats: false, block: { [weak self] _ in self.e2ePoll1Timer = Foundation.Timer.scheduledTimer(withTimeInterval: delayPoll ? 1.0 : 0.0, repeats: false, block: { [weak self] _ in
@ -208,7 +232,83 @@ public final class ConferenceCallE2EContext {
} }
func synchronizeRemovedParticipants() { func synchronizeRemovedParticipants() {
if self.isSynchronizingRemovedParticipants {
self.scheduledSynchronizeRemovedParticipants = true
return
}
self.isSynchronizingRemovedParticipants = true
let engine = self.engine
let state = self.state
let callId = self.callId
let accessHash = self.accessHash
self.synchronizeRemovedParticipantsDisposable?.dispose()
self.synchronizeRemovedParticipantsDisposable = (_internal_getGroupCallParticipants(
account: self.engine.account,
reference: self.reference,
offset: "",
ssrcs: [],
limit: 100,
sortAscending: true
)
|> map(Optional.init)
|> `catch` { _ -> Signal<GroupCallParticipantsContext.State?, NoError> in
return .single(nil)
}
|> mapToSignal { result -> Signal<Bool, NoError> in
guard let result else {
return .single(false)
}
let blockchainPeerIds = state.with { state -> [Int64] in
guard let state = state.state else {
return []
}
return state.getParticipantIds()
}
// Peer ids that are in the blockchain but not in the server list
let removedPeerIds = blockchainPeerIds.filter { blockchainPeerId in
return !result.participants.contains(where: { $0.peer.id.id._internalGetInt64Value() == blockchainPeerId })
}
if removedPeerIds.isEmpty {
return .single(false)
}
guard let removeBlock = state.with({ state -> Data? in
guard let state = state.state else {
return nil
}
return state.generateRemoveParticipantsBlock(participantIds: removedPeerIds)
}) else {
return .single(false)
}
return engine.calls.removeGroupCallBlockchainParticipants(callId: callId, accessHash: accessHash, participantIds: removedPeerIds, block: removeBlock)
|> map { result -> Bool in
switch result {
case .success:
return true
case .pollBlocksAndRetry:
return false
}
}
}
|> deliverOn(self.queue)).startStrict(next: { [weak self] shouldRetry in
guard let self else {
return
}
self.isSynchronizingRemovedParticipants = false
if self.scheduledSynchronizeRemovedParticipants {
self.scheduledSynchronizeRemovedParticipants = false
self.synchronizeRemovedParticipants()
} else if shouldRetry && !self.scheduledSynchronizeRemovedParticipantsAfterPoll {
self.scheduledSynchronizeRemovedParticipantsAfterPoll = true
self.e2ePoll(subChainId: 0)
}
})
} }
} }

View File

@ -854,54 +854,32 @@ func _internal_inviteConferenceCallParticipant(account: Account, callId: Int64,
} }
} }
public enum RemoveGroupCallBlockchainParticipantError { public enum RemoveGroupCallBlockchainParticipantsResult {
case generic case success
case pollBlocksAndRetry case pollBlocksAndRetry
} }
func _internal_removeGroupCallBlockchainParticipants(account: Account, callId: Int64, accessHash: Int64, block: @escaping ([EnginePeer.Id]) -> Data?) -> Signal<Never, RemoveGroupCallBlockchainParticipantError> { func _internal_removeGroupCallBlockchainParticipants(account: Account, callId: Int64, accessHash: Int64, participantIds: [Int64], block: Data) -> Signal<RemoveGroupCallBlockchainParticipantsResult, NoError> {
/*let blockSignal = _internal_getGroupCallParticipants(account: account, callId: callId, accessHash: accessHash, offset: "", ssrcs: [], limit: 1000, sortAscending: nil) return account.postbox.transaction { transaction -> [Api.InputPeer] in
|> mapError { _ -> RemoveGroupCallBlockchainParticipantError in return participantIds.map { participantId in
return .generic let participantPeerId = PeerId(namespace: Namespaces.Peer.CloudUser, id: PeerId.Id._internalFromInt64Value(participantId))
} if let peer = transaction.getPeer(participantPeerId).flatMap(apiInputPeer) {
|> map { result -> Data? in return peer
return block(result.participants.map(\.peer.id))
}
let signal: Signal<Never, RemoveGroupCallBlockchainParticipantError> = blockSignal
|> mapToSignal { block -> Signal<Never, RemoveGroupCallBlockchainParticipantError> in
guard let block else {
return .complete()
}
return
}
account.postbox.transaction { transaction -> Api.InputPeer? in
return transaction.getPeer(participantId).flatMap(apiInputPeer)
}
|> castError(RemoveGroupCallBlockchainParticipantError.self)
|> mapToSignal { inputPeer -> Signal<Never, RemoveGroupCallBlockchainParticipantError> in
guard let inputPeer else {
return .fail(.generic)
}
return account.network.request(Api.functions.phone.deleteConferenceCallParticipant(call: .inputGroupCall(id: callId, accessHash: accessHash), peer: inputPeer, block: Buffer(data: block)))
|> mapError { error -> RemoveGroupCallBlockchainParticipantError in
if error.errorDescription.hasPrefix("CONF_WRITE_CHAIN_INVALID") {
return .pollBlocksAndRetry
} else { } else {
return .generic return .inputPeerUser(userId: participantId, accessHash: 0)
} }
} }
|> mapToSignal { result -> Signal<Never, RemoveGroupCallBlockchainParticipantError> in }
account.stateManager.addUpdates(result) |> mapToSignal { inputUsers -> Signal<RemoveGroupCallBlockchainParticipantsResult, NoError> in
return account.network.request(Api.functions.phone.deleteConferenceCallParticipants(call: .inputGroupCall(id: callId, accessHash: accessHash), ids: inputUsers, block: Buffer(data: block)))
return .complete() |> map { updates -> RemoveGroupCallBlockchainParticipantsResult in
account.stateManager.addUpdates(updates)
return .success
}
|> `catch` { _ -> Signal<RemoveGroupCallBlockchainParticipantsResult, NoError> in
return .single(.pollBlocksAndRetry)
} }
} }
return signal*/
//TODO:release
return .complete()
} }
public struct JoinGroupCallAsScreencastResult { public struct JoinGroupCallAsScreencastResult {

View File

@ -109,8 +109,8 @@ public extension TelegramEngine {
return _internal_inviteConferenceCallParticipant(account: self.account, callId: callId, accessHash: accessHash, peerId: peerId) return _internal_inviteConferenceCallParticipant(account: self.account, callId: callId, accessHash: accessHash, peerId: peerId)
} }
public func removeGroupCallBlockchainParticipant(callId: Int64, accessHash: Int64, block: @escaping ([EnginePeer.Id]) -> Data?) -> Signal<Never, RemoveGroupCallBlockchainParticipantError> { public func removeGroupCallBlockchainParticipants(callId: Int64, accessHash: Int64, participantIds: [Int64], block: Data) -> Signal<RemoveGroupCallBlockchainParticipantsResult, NoError> {
return _internal_removeGroupCallBlockchainParticipants(account: self.account, callId: callId, accessHash: accessHash, block: block) return _internal_removeGroupCallBlockchainParticipants(account: self.account, callId: callId, accessHash: accessHash, participantIds: participantIds, block: block)
} }
public func clearCachedGroupCallDisplayAsAvailablePeers(peerId: PeerId) -> Signal<Never, NoError> { public func clearCachedGroupCallDisplayAsAvailablePeers(peerId: PeerId) -> Signal<Never, NoError> {

View File

@ -35,10 +35,13 @@ NS_ASSUME_NONNULL_BEGIN
- (NSArray<NSData *> *)takeOutgoingBroadcastBlocks; - (NSArray<NSData *> *)takeOutgoingBroadcastBlocks;
- (NSData *)emojiState; - (NSData *)emojiState;
- (NSArray<NSNumber *> *)participantIds;
- (void)applyBlock:(NSData *)block; - (void)applyBlock:(NSData *)block;
- (void)applyBroadcastBlock:(NSData *)block; - (void)applyBroadcastBlock:(NSData *)block;
- (nullable NSData *)generateRemoveParticipantsBlock:(NSArray<NSNumber *> *)participantIds;
- (nullable NSData *)encrypt:(NSData *)message; - (nullable NSData *)encrypt:(NSData *)message;
- (nullable NSData *)decrypt:(NSData *)message; - (nullable NSData *)decrypt:(NSData *)message;

View File

@ -159,6 +159,19 @@ static NSString *hexStringFromData(NSData *data) {
return outEmojiHash; return outEmojiHash;
} }
- (NSArray<NSNumber *> *)participantIds {
auto result = tde2e_api::call_get_state(_callId);
if (!result.is_ok()) {
return @[];
}
auto state = result.value();
NSMutableArray<NSNumber *> *participantIds = [[NSMutableArray alloc] init];
for (const auto &it : state.participants) {
[participantIds addObject:[NSNumber numberWithLongLong:it.user_id]];
}
return participantIds;
}
- (void)applyBlock:(NSData *)block { - (void)applyBlock:(NSData *)block {
std::string mappedBlock((uint8_t *)block.bytes, ((uint8_t *)block.bytes) + block.length); std::string mappedBlock((uint8_t *)block.bytes, ((uint8_t *)block.bytes) + block.length);
@ -195,6 +208,28 @@ static NSString *hexStringFromData(NSData *data) {
} }
} }
- (nullable NSData *)generateRemoveParticipantsBlock:(NSArray<NSNumber *> *)participantIds {
auto stateResult = tde2e_api::call_get_state(_callId);
if (!stateResult.is_ok()) {
return nil;
}
auto state = stateResult.value();
for (NSNumber *participantId in participantIds) {
auto it = std::find_if(state.participants.begin(), state.participants.end(), [participantId](const tde2e_api::CallParticipant &participant) {
return participant.user_id == [participantId longLongValue];
});
if (it != state.participants.end()) {
state.participants.erase(it);
}
}
auto result = tde2e_api::call_create_change_state_block(_callId, state);
if (!result.is_ok()) {
return nil;
}
return [[NSData alloc] initWithBytes:result.value().data() length:result.value().size()];
}
- (nullable NSData *)encrypt:(NSData *)message { - (nullable NSData *)encrypt:(NSData *)message {
std::string mappedMessage((uint8_t *)message.bytes, ((uint8_t *)message.bytes) + message.length); std::string mappedMessage((uint8_t *)message.bytes, ((uint8_t *)message.bytes) + message.length);
auto result = tde2e_api::call_encrypt(_callId, mappedMessage); auto result = tde2e_api::call_encrypt(_callId, mappedMessage);