diff --git a/submodules/TelegramCore/Sources/CallSessionManager.swift b/submodules/TelegramCore/Sources/CallSessionManager.swift index 7ac37066ab..afd27413f2 100644 --- a/submodules/TelegramCore/Sources/CallSessionManager.swift +++ b/submodules/TelegramCore/Sources/CallSessionManager.swift @@ -246,7 +246,7 @@ private final class CallSessionContext { var isVideoPossible: Bool var state: CallSessionInternalState let subscribers = Bag<(CallSession) -> Void>() - let signalingSubscribers = Bag<(Data) -> Void>() + var signalingReceiver: (([Data]) -> Void)? let signalingDisposables = DisposableSet() @@ -254,7 +254,7 @@ private final class CallSessionContext { var isEmpty: Bool { if case .terminated = self.state { - return self.subscribers.isEmpty && self.signalingSubscribers.isEmpty + return self.subscribers.isEmpty } else { return false } @@ -303,6 +303,8 @@ private final class CallSessionManagerContext { private let ringingSubscribers = Bag<([CallSessionRingingState]) -> Void>() private var contexts: [CallSessionInternalId: CallSessionContext] = [:] private var contextIdByStableId: [CallSessionStableId: CallSessionInternalId] = [:] + + private var enqueuedSignalingData: [Int64: [Data]] = [:] private let disposables = DisposableSet() @@ -395,29 +397,31 @@ private final class CallSessionManagerContext { } } - func callSignalingData(internalId: CallSessionInternalId) -> Signal { + func beginReceivingCallSignalingData(internalId: CallSessionInternalId, _ receiver: @escaping ([Data]) -> Void) -> Disposable { let queue = self.queue - return Signal { [weak self] subscriber in - let disposable = MetaDisposable() - queue.async { - if let strongSelf = self, let context = strongSelf.contexts[internalId] { - let index = context.signalingSubscribers.add { next in - subscriber.putNext(next) + + let disposable = MetaDisposable() + queue.async { [weak self] in + if let strongSelf = self, let context = strongSelf.contexts[internalId] { + context.signalingReceiver = receiver + + for (listStableId, listInternalId) in strongSelf.contextIdByStableId { + if listInternalId == internalId { + strongSelf.deliverCallSignalingData(id: listStableId) + break } - disposable.set(ActionDisposable { - queue.async { - if let strongSelf = self, let context = strongSelf.contexts[internalId] { - context.signalingSubscribers.remove(index) - if context.isEmpty { - strongSelf.contexts.removeValue(forKey: internalId) - } - } - } - }) } + + disposable.set(ActionDisposable { + queue.async { + if let strongSelf = self, let context = strongSelf.contexts[internalId] { + context.signalingReceiver = nil + } + } + }) } - return disposable } + return disposable } private func ringingStatesValue() -> [CallSessionRingingState] { @@ -475,6 +479,7 @@ private final class CallSessionManagerContext { })) self.contextIdByStableId[stableId] = internalId self.contextUpdated(internalId: internalId) + self.deliverCallSignalingData(id: stableId) self.ringingStatesUpdated() return internalId } else { @@ -596,7 +601,7 @@ private final class CallSessionManagerContext { func accept(internalId: CallSessionInternalId) { if let context = self.contexts[internalId] { switch context.state { - case let .ringing(id, accessHash, gAHash, b, remoteVersions): + case let .ringing(id, accessHash, gAHash, b, _): let acceptVersions = self.versions.map({ $0.version }) context.state = .accepting(id: id, accessHash: accessHash, gAHash: gAHash, b: b, disposable: (acceptCallSession(postbox: self.postbox, network: self.network, stableId: id, accessHash: accessHash, b: b, maxLayer: self.maxLayer, versions: acceptVersions) |> deliverOn(self.queue)).start(next: { [weak self] result in if let strongSelf = self, let context = strongSelf.contexts[internalId] { @@ -652,7 +657,7 @@ private final class CallSessionManagerContext { switch call { case .phoneCallEmpty: break - case let .phoneCallAccepted(flags, id, _, _, _, _, gB, remoteProtocol): + case let .phoneCallAccepted(_, id, _, _, _, _, gB, remoteProtocol): let remoteVersions: [String] switch remoteProtocol { case let .phoneCallProtocol(_, _, _, versions): @@ -854,11 +859,22 @@ private final class CallSessionManagerContext { } func addCallSignalingData(id: Int64, data: Data) { + if self.enqueuedSignalingData[id] == nil { + self.enqueuedSignalingData[id] = [] + } + self.enqueuedSignalingData[id]?.append(data) + + self.deliverCallSignalingData(id: id) + } + + private func deliverCallSignalingData(id: Int64) { guard let internalId = self.contextIdByStableId[id], let context = self.contexts[internalId] else { return } - for f in context.signalingSubscribers.copyItems() { - f(data) + if let signalingReceiver = context.signalingReceiver { + if let data = self.enqueuedSignalingData.removeValue(forKey: id) { + signalingReceiver(data) + } } } @@ -902,6 +918,7 @@ private final class CallSessionManagerContext { context.state = .requested(id: id, accessHash: accessHash, a: a, gA: gA, config: config, remoteConfirmationTimestamp: remoteConfirmationTimestamp) strongSelf.contextIdByStableId[id] = internalId strongSelf.contextUpdated(internalId: internalId) + strongSelf.deliverCallSignalingData(id: id) case let .failed(error): context.state = .terminated(id: nil, accessHash: nil, reason: .error(error), reportRating: false, sendDebugLogs: false) strongSelf.contextUpdated(internalId: internalId) @@ -1044,16 +1061,14 @@ public final class CallSessionManager { } } - public func callSignalingData(internalId: CallSessionInternalId) -> Signal { - return Signal { [weak self] subscriber in - let disposable = MetaDisposable() - self?.withContext { context in - disposable.set(context.callSignalingData(internalId: internalId).start(next: { next in - subscriber.putNext(next) - })) - } - return disposable + public func beginReceivingCallSignalingData(internalId: CallSessionInternalId, _ receiver: @escaping ([Data]) -> Void) -> Disposable { + let disposable = MetaDisposable() + + self.withContext { context in + disposable.set(context.beginReceivingCallSignalingData(internalId: internalId, receiver)) } + + return disposable } } diff --git a/submodules/TelegramVoip/Sources/OngoingCallContext.swift b/submodules/TelegramVoip/Sources/OngoingCallContext.swift index 332f932042..98e71dc325 100644 --- a/submodules/TelegramVoip/Sources/OngoingCallContext.swift +++ b/submodules/TelegramVoip/Sources/OngoingCallContext.swift @@ -729,19 +729,20 @@ public final class OngoingCallContext { } }) } + + strongSelf.signalingDataDisposable = callSessionManager.beginReceivingCallSignalingData(internalId: internalId, { [weak self] dataList in + queue.async { + self?.withContext { context in + if let context = context as? OngoingCallThreadLocalContextWebrtc { + for data in dataList { + context.addSignaling(data) + } + } + } + } + }) } })) - - self.signalingDataDisposable = (callSessionManager.callSignalingData(internalId: internalId)).start(next: { [weak self] data in - print("data received") - queue.async { - self?.withContext { context in - if let context = context as? OngoingCallThreadLocalContextWebrtc { - context.addSignaling(data) - } - } - } - }) } deinit { diff --git a/submodules/TgVoipWebrtc/tgcalls b/submodules/TgVoipWebrtc/tgcalls index 9083f718ce..fec19df1b5 160000 --- a/submodules/TgVoipWebrtc/tgcalls +++ b/submodules/TgVoipWebrtc/tgcalls @@ -1 +1 @@ -Subproject commit 9083f718ce79005c8d645b61038e7826b17bdca1 +Subproject commit fec19df1b5342aba8d83d01b05fccce294d940f4