diff --git a/submodules/TelegramCallsUI/Sources/PresentationGroupCall.swift b/submodules/TelegramCallsUI/Sources/PresentationGroupCall.swift index b417c23c5b..cc04aaffe3 100644 --- a/submodules/TelegramCallsUI/Sources/PresentationGroupCall.swift +++ b/submodules/TelegramCallsUI/Sources/PresentationGroupCall.swift @@ -672,6 +672,38 @@ private final class PendingConferenceInvitationContext { } } +private final class ConferenceCallE2EContextStateImpl: ConferenceCallE2EContextState { + private let call: TdCall + + init(call: TdCall) { + self.call = call + } + + func getEmojiState() -> Data? { + return self.call.emojiState() + } + + func applyBlock(block: Data) { + self.call.applyBlock(block) + } + + func applyBroadcastBlock(block: Data) { + self.call.applyBroadcastBlock(block) + } + + func takeOutgoingBroadcastBlocks() -> [Data] { + return self.call.takeOutgoingBroadcastBlocks() + } + + func encrypt(message: Data) -> Data? { + return self.call.encrypt(message) + } + + func decrypt(message: Data) -> Data? { + return self.call.decrypt(message) + } +} + public final class PresentationGroupCallImpl: PresentationGroupCall { private enum InternalState { case requesting @@ -820,20 +852,6 @@ public final class PresentationGroupCallImpl: PresentationGroupCall { private let keyPair: TelegramKeyPair? - private final class E2ECallState { - var call: TdCall? - var pendingIncomingBroadcastBlocks: [Data] = [] - } - private let e2eCall = Atomic(value: E2ECallState()) - - private var e2ePoll0Offset: Int? - private var e2ePoll0Timer: Foundation.Timer? - private var e2ePoll0Disposable: Disposable? - - private var e2ePoll1Offset: Int? - private var e2ePoll1Timer: Foundation.Timer? - private var e2ePoll1Disposable: Disposable? - private var temporaryJoinTimestamp: Int32 private var temporaryActivityTimestamp: Double? private var temporaryActivityRank: Int? @@ -900,9 +918,8 @@ public final class PresentationGroupCallImpl: PresentationGroupCall { } private let isNoiseSuppressionEnabledDisposable = MetaDisposable() - private let e2eEncryptionKeyHashValue = ValuePromise(nil) public var e2eEncryptionKeyHash: Signal { - return self.e2eEncryptionKeyHashValue.get() + return self.e2eContext?.e2eEncryptionKeyHash ?? .single(nil) } private var isVideoMuted: Bool = false @@ -1110,6 +1127,8 @@ public final class PresentationGroupCallImpl: PresentationGroupCall { public var pendingDisconnedUpgradedConferenceCall: PresentationCallImpl? private var pendingDisconnedUpgradedConferenceCallTimer: Foundation.Timer? private var conferenceInvitationContexts: [PeerId: PendingConferenceInvitationContext] = [:] + + private let e2eContext: ConferenceCallE2EContext? init( accountContext: AccountContext, @@ -1157,6 +1176,27 @@ public final class PresentationGroupCallImpl: PresentationGroupCall { self.conferenceSourceId = conferenceSourceId self.isConference = isConference self.keyPair = keyPair + + if let keyPair, let initialCall { + self.e2eContext = ConferenceCallE2EContext( + engine: accountContext.engine, + callId: initialCall.description.id, + accessHash: initialCall.description.accessHash, + reference: initialCall.reference, + keyPair: keyPair, + initializeState: { keyPair, block in + guard let keyPair = TdKeyPair(keyId: keyPair.id, publicKey: keyPair.publicKey.data) else { + return nil + } + guard let call = TdCall.make(with: keyPair, latestBlock: block) else { + return nil + } + return ConferenceCallE2EContextStateImpl(call: call) + } + ) + } else { + self.e2eContext = nil + } var sharedAudioContext = sharedAudioContext if sharedAudioContext == nil { @@ -1321,39 +1361,8 @@ public final class PresentationGroupCallImpl: PresentationGroupCall { self.markAsCanBeRemoved() } case let .conferenceChainBlocks(subChainId, blocks, nextOffset): - if let _ = self.keyPair { - var processBlock = true - let updateBaseOffset = nextOffset - blocks.count - if subChainId == 0 { - if let e2ePoll0Offset = self.e2ePoll0Offset { - if e2ePoll0Offset == updateBaseOffset { - self.e2ePoll0Offset = nextOffset - } else if e2ePoll0Offset < updateBaseOffset { - self.e2ePoll(subChainId: subChainId) - } else { - processBlock = false - } - } else { - processBlock = false - } - } else if subChainId == 1 { - if let e2ePoll1Offset = self.e2ePoll1Offset { - if e2ePoll1Offset == updateBaseOffset { - self.e2ePoll1Offset = nextOffset - } else if e2ePoll1Offset < updateBaseOffset { - self.e2ePoll(subChainId: subChainId) - } else { - processBlock = false - } - } else { - processBlock = false - } - } else { - processBlock = false - } - if processBlock { - self.addE2EBlocks(blocks: blocks, subChainId: subChainId) - } + if let e2eContext = self.e2eContext { + e2eContext.addChainBlocksUpdate(subChainId: subChainId, blocks: blocks, nextOffset: nextOffset) } } } @@ -1511,10 +1520,6 @@ public final class PresentationGroupCallImpl: PresentationGroupCall { self.peerUpdatesSubscription?.dispose() self.screencastStateDisposable?.dispose() self.pendingDisconnedUpgradedConferenceCallTimer?.invalidate() - self.e2ePoll0Timer?.invalidate() - self.e2ePoll0Disposable?.dispose() - self.e2ePoll1Timer?.invalidate() - self.e2ePoll1Disposable?.dispose() } private func switchToTemporaryParticipantsContext(sourceContext: GroupCallParticipantsContext?, oldMyPeerId: PeerId) { @@ -2021,24 +2026,28 @@ public final class PresentationGroupCallImpl: PresentationGroupCall { audioIsActiveByDefault = false } - var encryptionContext: OngoingGroupCallEncryptionContext? - if self.isConference { - class OngoingGroupCallEncryptionContextImpl: OngoingGroupCallEncryptionContext { - private let e2eCall: Atomic - - init(e2eCall: Atomic) { - self.e2eCall = e2eCall - } - - func encrypt(message: Data) -> Data? { - return self.e2eCall.with({ $0.call?.encrypt(message) }) - } - - func decrypt(message: Data) -> Data? { - return self.e2eCall.with({ $0.call?.decrypt(message) }) - } + class OngoingGroupCallEncryptionContextImpl: OngoingGroupCallEncryptionContext { + private let e2eCall: Atomic + + init(e2eCall: Atomic) { + self.e2eCall = e2eCall } - encryptionContext = OngoingGroupCallEncryptionContextImpl(e2eCall: self.e2eCall) + + func encrypt(message: Data) -> Data? { + return self.e2eCall.with({ $0.state?.encrypt(message: message) }) + } + + func decrypt(message: Data) -> Data? { + return self.e2eCall.with({ $0.state?.decrypt(message: message) }) + } + } + + var encryptionContext: OngoingGroupCallEncryptionContext? + if let e2eContext = self.e2eContext { + encryptionContext = OngoingGroupCallEncryptionContextImpl(e2eCall: e2eContext.state) + } else if self.isConference { + // Prevent non-encrypted conference calls + encryptionContext = OngoingGroupCallEncryptionContextImpl(e2eCall: Atomic(value: ConferenceCallE2EContext.ContextStateHolder())) } genericCallContext = .call(OngoingGroupCallContext(audioSessionActive: contextAudioSessionActive, video: self.videoCapturer, requestMediaChannelDescriptions: { [weak self] ssrcs, completion in @@ -2284,8 +2293,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall { self.updateSessionState(internalState: .established(info: joinCallResult.callInfo, connectionMode: joinCallResult.connectionMode, clientParams: clientParams, localSsrc: ssrc, initialState: joinCallResult.state), audioSessionControl: self.audioSessionControl) - self.e2ePoll(subChainId: 0) - self.e2ePoll(subChainId: 1) + self.e2eContext?.begin() }, error: { [weak self] error in guard let self else { return @@ -2898,117 +2906,6 @@ public final class PresentationGroupCallImpl: PresentationGroupCall { } } - private func addE2EBlocks(blocks: [Data], subChainId: Int) { - guard let initialCall = self.initialCall, let keyPair = self.keyPair else { - return - } - let (outBlocks, outEmoji) = self.e2eCall.with({ callState -> ([Data], Data) in - if let call = callState.call { - for block in blocks { - if subChainId == 0 { - call.applyBlock(block) - } else if subChainId == 1 { - call.applyBroadcastBlock(block) - } - } - return (call.takeOutgoingBroadcastBlocks(), call.emojiState()) - } else { - if subChainId == 0 { - guard let block = blocks.last else { - return ([], Data()) - } - guard let keyPair = TdKeyPair(keyId: keyPair.id, publicKey: keyPair.publicKey.data) else { - return ([], Data()) - } - guard let call = TdCall.make(with: keyPair, latestBlock: block) else { - return ([], Data()) - } - callState.call = call - for block in callState.pendingIncomingBroadcastBlocks { - call.applyBroadcastBlock(block) - } - callState.pendingIncomingBroadcastBlocks.removeAll() - return (call.takeOutgoingBroadcastBlocks(), call.emojiState()) - } else if subChainId == 1 { - callState.pendingIncomingBroadcastBlocks.append(contentsOf: blocks) - return ([], Data()) - } else { - return ([], Data()) - } - } - }) - self.e2eEncryptionKeyHashValue.set(outEmoji.isEmpty ? nil : outEmoji) - - //TODO:release queue - for outBlock in outBlocks { - let _ = self.accountContext.engine.calls.sendConferenceCallBroadcast(callId: initialCall.description.id, accessHash: initialCall.description.accessHash, block: outBlock).startStandalone() - } - } - - private func e2ePoll(subChainId: Int) { - guard let initialCall = self.initialCall else { - return - } - - let offset: Int? - if subChainId == 0 { - offset = self.e2ePoll0Offset - self.e2ePoll0Disposable?.dispose() - } else if subChainId == 1 { - offset = self.e2ePoll1Offset - self.e2ePoll1Disposable?.dispose() - } else { - return - } - - let disposable = (self.accountContext.engine.calls.pollConferenceCallBlockchain(reference: initialCall.reference, subChainId: subChainId, offset: offset ?? 0, limit: 10) - |> deliverOnMainQueue).startStrict(next: { [weak self] result in - guard let self else { - return - } - - var delayPoll = true - if let result { - if subChainId == 0 { - if self.e2ePoll0Offset != result.nextOffset { - self.e2ePoll0Offset = result.nextOffset - delayPoll = false - } - } else if subChainId == 1 { - if self.e2ePoll1Offset != result.nextOffset { - self.e2ePoll1Offset = result.nextOffset - delayPoll = false - } - } - self.addE2EBlocks(blocks: result.blocks, subChainId: subChainId) - } - - if subChainId == 0 { - self.e2ePoll0Timer?.invalidate() - self.e2ePoll0Timer = Foundation.Timer.scheduledTimer(withTimeInterval: delayPoll ? 1.0 : 0.0, repeats: false, block: { [weak self] _ in - guard let self else { - return - } - self.e2ePoll(subChainId: 0) - }) - } else if subChainId == 1 { - self.e2ePoll1Timer?.invalidate() - self.e2ePoll1Timer = Foundation.Timer.scheduledTimer(withTimeInterval: delayPoll ? 1.0 : 0.0, repeats: false, block: { [weak self] _ in - guard let self else { - return - } - self.e2ePoll(subChainId: 1) - }) - } - }) - - if subChainId == 0 { - self.e2ePoll0Disposable = disposable - } else if subChainId == 1 { - self.e2ePoll1Disposable = disposable - } - } - private func activateIncomingAudioIfNeeded() { if let genericCallContext = self.genericCallContext, case let .call(groupCall) = genericCallContext { groupCall.activateIncomingAudio() diff --git a/submodules/TelegramCore/Sources/State/ConferenceCallE2EContext.swift b/submodules/TelegramCore/Sources/State/ConferenceCallE2EContext.swift new file mode 100644 index 0000000000..b513fb0dfc --- /dev/null +++ b/submodules/TelegramCore/Sources/State/ConferenceCallE2EContext.swift @@ -0,0 +1,249 @@ +import Foundation +import SwiftSignalKit + +public protocol ConferenceCallE2EContextState: AnyObject { + func getEmojiState() -> Data? + + func applyBlock(block: Data) + func applyBroadcastBlock(block: Data) + + func takeOutgoingBroadcastBlocks() -> [Data] + + func encrypt(message: Data) -> Data? + func decrypt(message: Data) -> Data? +} + +public final class ConferenceCallE2EContext { + public final class ContextStateHolder { + public var state: ConferenceCallE2EContextState? + public var pendingIncomingBroadcastBlocks: [Data] = [] + + public init() { + } + } + + private final class Impl { + private let queue: Queue + + private let engine: TelegramEngine + private let callId: Int64 + private let accessHash: Int64 + private let reference: InternalGroupCallReference + private let state: Atomic + private let initializeState: (TelegramKeyPair, Data) -> ConferenceCallE2EContextState? + private let keyPair: TelegramKeyPair + + let e2eEncryptionKeyHashValue = ValuePromise(nil) + + private var e2ePoll0Offset: Int? + private var e2ePoll0Timer: Foundation.Timer? + private var e2ePoll0Disposable: Disposable? + + private var e2ePoll1Offset: Int? + private var e2ePoll1Timer: Foundation.Timer? + private var e2ePoll1Disposable: Disposable? + + init(queue: Queue, engine: TelegramEngine, callId: Int64, accessHash: Int64, reference: InternalGroupCallReference, state: Atomic, initializeState: @escaping (TelegramKeyPair, Data) -> ConferenceCallE2EContextState?, keyPair: TelegramKeyPair) { + precondition(queue.isCurrent()) + precondition(Queue.mainQueue().isCurrent()) + + self.queue = queue + self.engine = engine + self.callId = callId + self.accessHash = accessHash + self.reference = reference + self.state = state + self.initializeState = initializeState + self.keyPair = keyPair + } + + deinit { + self.e2ePoll0Timer?.invalidate() + self.e2ePoll0Disposable?.dispose() + self.e2ePoll1Timer?.invalidate() + self.e2ePoll1Disposable?.dispose() + } + + func begin() { + self.e2ePoll(subChainId: 0) + self.e2ePoll(subChainId: 1) + } + + func addChainBlocksUpdate(subChainId: Int, blocks: [Data], nextOffset: Int) { + var processBlock = true + let updateBaseOffset = nextOffset - blocks.count + if subChainId == 0 { + if let e2ePoll0Offset = self.e2ePoll0Offset { + if e2ePoll0Offset == updateBaseOffset { + self.e2ePoll0Offset = nextOffset + } else if e2ePoll0Offset < updateBaseOffset { + self.e2ePoll(subChainId: subChainId) + } else { + processBlock = false + } + } else { + processBlock = false + } + } else if subChainId == 1 { + if let e2ePoll1Offset = self.e2ePoll1Offset { + if e2ePoll1Offset == updateBaseOffset { + self.e2ePoll1Offset = nextOffset + } else if e2ePoll1Offset < updateBaseOffset { + self.e2ePoll(subChainId: subChainId) + } else { + processBlock = false + } + } else { + processBlock = false + } + } else { + processBlock = false + } + if processBlock { + self.addE2EBlocks(blocks: blocks, subChainId: subChainId) + } + } + + private func addE2EBlocks(blocks: [Data], subChainId: Int) { + let keyPair = self.keyPair + let initializeState = self.initializeState + let (outBlocks, outEmoji) = self.state.with({ callState -> ([Data], Data) in + if let state = callState.state { + for block in blocks { + if subChainId == 0 { + state.applyBlock(block: block) + } else if subChainId == 1 { + state.applyBroadcastBlock(block: block) + } + } + return (state.takeOutgoingBroadcastBlocks(), state.getEmojiState() ?? Data()) + } else { + if subChainId == 0 { + guard let block = blocks.last else { + return ([], Data()) + } + guard let state = initializeState(keyPair, block) else { + return ([], Data()) + } + callState.state = state + for block in callState.pendingIncomingBroadcastBlocks { + state.applyBroadcastBlock(block: block) + } + callState.pendingIncomingBroadcastBlocks.removeAll() + return (state.takeOutgoingBroadcastBlocks(), state.getEmojiState() ?? Data()) + } else if subChainId == 1 { + callState.pendingIncomingBroadcastBlocks.append(contentsOf: blocks) + return ([], Data()) + } else { + return ([], Data()) + } + } + }) + self.e2eEncryptionKeyHashValue.set(outEmoji.isEmpty ? nil : outEmoji) + + for outBlock in outBlocks { + //TODO:release queue + let _ = self.engine.calls.sendConferenceCallBroadcast(callId: self.callId, accessHash: self.accessHash, block: outBlock).startStandalone() + } + } + + private func e2ePoll(subChainId: Int) { + let offset: Int? + if subChainId == 0 { + offset = self.e2ePoll0Offset + self.e2ePoll0Disposable?.dispose() + } else if subChainId == 1 { + offset = self.e2ePoll1Offset + self.e2ePoll1Disposable?.dispose() + } else { + return + } + + let disposable = (self.engine.calls.pollConferenceCallBlockchain(reference: self.reference, subChainId: subChainId, offset: offset ?? 0, limit: 10) + |> deliverOnMainQueue).startStrict(next: { [weak self] result in + guard let self else { + return + } + + var delayPoll = true + if let result { + if subChainId == 0 { + if self.e2ePoll0Offset != result.nextOffset { + self.e2ePoll0Offset = result.nextOffset + delayPoll = false + } + } else if subChainId == 1 { + if self.e2ePoll1Offset != result.nextOffset { + self.e2ePoll1Offset = result.nextOffset + delayPoll = false + } + } + self.addE2EBlocks(blocks: result.blocks, subChainId: subChainId) + } + + if subChainId == 0 { + self.e2ePoll0Timer?.invalidate() + self.e2ePoll0Timer = Foundation.Timer.scheduledTimer(withTimeInterval: delayPoll ? 1.0 : 0.0, repeats: false, block: { [weak self] _ in + guard let self else { + return + } + self.e2ePoll(subChainId: 0) + }) + } else if subChainId == 1 { + self.e2ePoll1Timer?.invalidate() + self.e2ePoll1Timer = Foundation.Timer.scheduledTimer(withTimeInterval: delayPoll ? 1.0 : 0.0, repeats: false, block: { [weak self] _ in + guard let self else { + return + } + self.e2ePoll(subChainId: 1) + }) + } + }) + + if subChainId == 0 { + self.e2ePoll0Disposable = disposable + } else if subChainId == 1 { + self.e2ePoll1Disposable = disposable + } + } + + func synchronizeRemovedParticipants() { + + } + } + + public let state: Atomic = Atomic(value: ContextStateHolder()) + private let impl: QueueLocalObject + + public var e2eEncryptionKeyHash: Signal { + return self.impl.signalWith { impl, subscriber in + return impl.e2eEncryptionKeyHashValue.get().start(next: subscriber.putNext) + } + } + + public init(engine: TelegramEngine, callId: Int64, accessHash: Int64, reference: InternalGroupCallReference, keyPair: TelegramKeyPair, initializeState: @escaping (TelegramKeyPair, Data) -> ConferenceCallE2EContextState?) { + let queue = Queue.mainQueue() + let state = self.state + self.impl = QueueLocalObject(queue: queue, generate: { + return Impl(queue: queue, engine: engine, callId: callId, accessHash: accessHash, reference: reference, state: state, initializeState: initializeState, keyPair: keyPair) + }) + } + + public func begin() { + self.impl.with { impl in + impl.begin() + } + } + + public func addChainBlocksUpdate(subChainId: Int, blocks: [Data], nextOffset: Int) { + self.impl.with { impl in + impl.addChainBlocksUpdate(subChainId: subChainId, blocks: blocks, nextOffset: nextOffset) + } + } + + public func synchronizeRemovedParticipants() { + self.impl.with { impl in + impl.synchronizeRemovedParticipants() + } + } +}