Attempt to fix call signaling data delivery

This commit is contained in:
Ali
2021-04-20 03:22:31 +04:00
parent de6320960a
commit f8bdc11893
3 changed files with 61 additions and 45 deletions

View File

@@ -246,7 +246,7 @@ private final class CallSessionContext {
var isVideoPossible: Bool
var state: CallSessionInternalState
let subscribers = Bag<(CallSession) -> Void>()
let signalingSubscribers = Bag<(Data) -> Void>()
var signalingReceiver: (([Data]) -> Void)?
let signalingDisposables = DisposableSet()
@@ -254,7 +254,7 @@ private final class CallSessionContext {
var isEmpty: Bool {
if case .terminated = self.state {
return self.subscribers.isEmpty && self.signalingSubscribers.isEmpty
return self.subscribers.isEmpty
} else {
return false
}
@@ -303,6 +303,8 @@ private final class CallSessionManagerContext {
private let ringingSubscribers = Bag<([CallSessionRingingState]) -> Void>()
private var contexts: [CallSessionInternalId: CallSessionContext] = [:]
private var contextIdByStableId: [CallSessionStableId: CallSessionInternalId] = [:]
private var enqueuedSignalingData: [Int64: [Data]] = [:]
private let disposables = DisposableSet()
@@ -395,29 +397,31 @@ private final class CallSessionManagerContext {
}
}
func callSignalingData(internalId: CallSessionInternalId) -> Signal<Data, NoError> {
func beginReceivingCallSignalingData(internalId: CallSessionInternalId, _ receiver: @escaping ([Data]) -> Void) -> Disposable {
let queue = self.queue
return Signal { [weak self] subscriber in
let disposable = MetaDisposable()
queue.async {
if let strongSelf = self, let context = strongSelf.contexts[internalId] {
let index = context.signalingSubscribers.add { next in
subscriber.putNext(next)
let disposable = MetaDisposable()
queue.async { [weak self] in
if let strongSelf = self, let context = strongSelf.contexts[internalId] {
context.signalingReceiver = receiver
for (listStableId, listInternalId) in strongSelf.contextIdByStableId {
if listInternalId == internalId {
strongSelf.deliverCallSignalingData(id: listStableId)
break
}
disposable.set(ActionDisposable {
queue.async {
if let strongSelf = self, let context = strongSelf.contexts[internalId] {
context.signalingSubscribers.remove(index)
if context.isEmpty {
strongSelf.contexts.removeValue(forKey: internalId)
}
}
}
})
}
disposable.set(ActionDisposable {
queue.async {
if let strongSelf = self, let context = strongSelf.contexts[internalId] {
context.signalingReceiver = nil
}
}
})
}
return disposable
}
return disposable
}
private func ringingStatesValue() -> [CallSessionRingingState] {
@@ -475,6 +479,7 @@ private final class CallSessionManagerContext {
}))
self.contextIdByStableId[stableId] = internalId
self.contextUpdated(internalId: internalId)
self.deliverCallSignalingData(id: stableId)
self.ringingStatesUpdated()
return internalId
} else {
@@ -596,7 +601,7 @@ private final class CallSessionManagerContext {
func accept(internalId: CallSessionInternalId) {
if let context = self.contexts[internalId] {
switch context.state {
case let .ringing(id, accessHash, gAHash, b, remoteVersions):
case let .ringing(id, accessHash, gAHash, b, _):
let acceptVersions = self.versions.map({ $0.version })
context.state = .accepting(id: id, accessHash: accessHash, gAHash: gAHash, b: b, disposable: (acceptCallSession(postbox: self.postbox, network: self.network, stableId: id, accessHash: accessHash, b: b, maxLayer: self.maxLayer, versions: acceptVersions) |> deliverOn(self.queue)).start(next: { [weak self] result in
if let strongSelf = self, let context = strongSelf.contexts[internalId] {
@@ -652,7 +657,7 @@ private final class CallSessionManagerContext {
switch call {
case .phoneCallEmpty:
break
case let .phoneCallAccepted(flags, id, _, _, _, _, gB, remoteProtocol):
case let .phoneCallAccepted(_, id, _, _, _, _, gB, remoteProtocol):
let remoteVersions: [String]
switch remoteProtocol {
case let .phoneCallProtocol(_, _, _, versions):
@@ -854,11 +859,22 @@ private final class CallSessionManagerContext {
}
func addCallSignalingData(id: Int64, data: Data) {
if self.enqueuedSignalingData[id] == nil {
self.enqueuedSignalingData[id] = []
}
self.enqueuedSignalingData[id]?.append(data)
self.deliverCallSignalingData(id: id)
}
private func deliverCallSignalingData(id: Int64) {
guard let internalId = self.contextIdByStableId[id], let context = self.contexts[internalId] else {
return
}
for f in context.signalingSubscribers.copyItems() {
f(data)
if let signalingReceiver = context.signalingReceiver {
if let data = self.enqueuedSignalingData.removeValue(forKey: id) {
signalingReceiver(data)
}
}
}
@@ -902,6 +918,7 @@ private final class CallSessionManagerContext {
context.state = .requested(id: id, accessHash: accessHash, a: a, gA: gA, config: config, remoteConfirmationTimestamp: remoteConfirmationTimestamp)
strongSelf.contextIdByStableId[id] = internalId
strongSelf.contextUpdated(internalId: internalId)
strongSelf.deliverCallSignalingData(id: id)
case let .failed(error):
context.state = .terminated(id: nil, accessHash: nil, reason: .error(error), reportRating: false, sendDebugLogs: false)
strongSelf.contextUpdated(internalId: internalId)
@@ -1044,16 +1061,14 @@ public final class CallSessionManager {
}
}
public func callSignalingData(internalId: CallSessionInternalId) -> Signal<Data, NoError> {
return Signal { [weak self] subscriber in
let disposable = MetaDisposable()
self?.withContext { context in
disposable.set(context.callSignalingData(internalId: internalId).start(next: { next in
subscriber.putNext(next)
}))
}
return disposable
public func beginReceivingCallSignalingData(internalId: CallSessionInternalId, _ receiver: @escaping ([Data]) -> Void) -> Disposable {
let disposable = MetaDisposable()
self.withContext { context in
disposable.set(context.beginReceivingCallSignalingData(internalId: internalId, receiver))
}
return disposable
}
}

View File

@@ -729,19 +729,20 @@ public final class OngoingCallContext {
}
})
}
strongSelf.signalingDataDisposable = callSessionManager.beginReceivingCallSignalingData(internalId: internalId, { [weak self] dataList in
queue.async {
self?.withContext { context in
if let context = context as? OngoingCallThreadLocalContextWebrtc {
for data in dataList {
context.addSignaling(data)
}
}
}
}
})
}
}))
self.signalingDataDisposable = (callSessionManager.callSignalingData(internalId: internalId)).start(next: { [weak self] data in
print("data received")
queue.async {
self?.withContext { context in
if let context = context as? OngoingCallThreadLocalContextWebrtc {
context.addSignaling(data)
}
}
}
})
}
deinit {