Use ssrc to request speaking peers

This commit is contained in:
Ali 2021-01-28 23:46:17 +05:00
parent 92c06c45b2
commit c6270f78f1
2 changed files with 49 additions and 45 deletions

View File

@ -57,7 +57,7 @@ public final class AccountGroupCallContextImpl: AccountGroupCallContext {
groupCall: nil groupCall: nil
))) )))
self.disposable = (getGroupCallParticipants(account: account, callId: call.id, accessHash: call.accessHash, offset: "", peerIds: [], limit: 100) self.disposable = (getGroupCallParticipants(account: account, callId: call.id, accessHash: call.accessHash, offset: "", ssrcs: [], limit: 100)
|> map(Optional.init) |> map(Optional.init)
|> `catch` { _ -> Signal<GroupCallParticipantsContext.State?, NoError> in |> `catch` { _ -> Signal<GroupCallParticipantsContext.State?, NoError> in
return .single(nil) return .single(nil)
@ -228,13 +228,14 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
private let silentTimeout: Int32 = 2 private let silentTimeout: Int32 = 2
struct Participant { struct Participant {
let ssrc: UInt32
let timestamp: Int32 let timestamp: Int32
let level: Float let level: Float
} }
private var participants: [PeerId: Participant] = [:] private var participants: [PeerId: Participant] = [:]
private let speakingParticipantsPromise = ValuePromise<Set<PeerId>>() private let speakingParticipantsPromise = ValuePromise<[PeerId: UInt32]>()
private var speakingParticipants = Set<PeerId>() { private var speakingParticipants = [PeerId: UInt32]() {
didSet { didSet {
self.speakingParticipantsPromise.set(self.speakingParticipants) self.speakingParticipantsPromise.set(self.speakingParticipants)
} }
@ -245,17 +246,17 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
init() { init() {
} }
func update(levels: [(PeerId, Float, Bool)]) { func update(levels: [(PeerId, UInt32, Float, Bool)]) {
let timestamp = Int32(CFAbsoluteTimeGetCurrent()) let timestamp = Int32(CFAbsoluteTimeGetCurrent())
let currentParticipants: [PeerId: Participant] = self.participants let currentParticipants: [PeerId: Participant] = self.participants
var validSpeakers: [PeerId: Participant] = [:] var validSpeakers: [PeerId: Participant] = [:]
var silentParticipants = Set<PeerId>() var silentParticipants = Set<PeerId>()
var speakingParticipants = Set<PeerId>() var speakingParticipants = [PeerId: UInt32]()
for (peerId, level, hasVoice) in levels { for (peerId, ssrc, level, hasVoice) in levels {
if level > speakingLevelThreshold && hasVoice { if level > speakingLevelThreshold && hasVoice {
validSpeakers[peerId] = Participant(timestamp: timestamp, level: level) validSpeakers[peerId] = Participant(ssrc: ssrc, timestamp: timestamp, level: level)
speakingParticipants.insert(peerId) speakingParticipants[peerId] = ssrc
} else { } else {
silentParticipants.insert(peerId) silentParticipants.insert(peerId)
} }
@ -268,17 +269,17 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
if silentParticipants.contains(peerId) { if silentParticipants.contains(peerId) {
if delta < silentTimeout { if delta < silentTimeout {
validSpeakers[peerId] = participant validSpeakers[peerId] = participant
speakingParticipants.insert(peerId) speakingParticipants[peerId] = participant.ssrc
} }
} else if delta < cutoffTimeout { } else if delta < cutoffTimeout {
validSpeakers[peerId] = participant validSpeakers[peerId] = participant
speakingParticipants.insert(peerId) speakingParticipants[peerId] = participant.ssrc
} }
} }
} }
var audioLevels: [(PeerId, Float, Bool)] = [] var audioLevels: [(PeerId, Float, Bool)] = []
for (peerId, level, hasVoice) in levels { for (peerId, _, level, hasVoice) in levels {
if level > 0.001 { if level > 0.001 {
audioLevels.append((peerId, level, hasVoice)) audioLevels.append((peerId, level, hasVoice))
} }
@ -289,8 +290,8 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
self.audioLevelsPromise.set(.single(audioLevels)) self.audioLevelsPromise.set(.single(audioLevels))
} }
func get() -> Signal<Set<PeerId>, NoError> { func get() -> Signal<[PeerId: UInt32], NoError> {
return self.speakingParticipantsPromise.get() |> distinctUntilChanged return self.speakingParticipantsPromise.get()
} }
func getAudioLevels() -> Signal<[(PeerId, Float, Bool)], NoError> { func getAudioLevels() -> Signal<[(PeerId, Float, Bool)], NoError> {
@ -887,16 +888,19 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
guard let strongSelf = self else { guard let strongSelf = self else {
return return
} }
var result: [(PeerId, Float, Bool)] = [] var result: [(PeerId, UInt32, Float, Bool)] = []
var myLevel: Float = 0.0 var myLevel: Float = 0.0
var myLevelHasVoice: Bool = false var myLevelHasVoice: Bool = false
for (ssrcKey, level, hasVoice) in levels { for (ssrcKey, level, hasVoice) in levels {
var peerId: PeerId? var peerId: PeerId?
let ssrcValue: UInt32
switch ssrcKey { switch ssrcKey {
case .local: case .local:
peerId = strongSelf.accountContext.account.peerId peerId = strongSelf.accountContext.account.peerId
ssrcValue = 0
case let .source(ssrc): case let .source(ssrc):
peerId = strongSelf.ssrcMapping[ssrc] peerId = strongSelf.ssrcMapping[ssrc]
ssrcValue = ssrc
} }
if let peerId = peerId { if let peerId = peerId {
if case .local = ssrcKey { if case .local = ssrcKey {
@ -905,7 +909,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
myLevelHasVoice = hasVoice myLevelHasVoice = hasVoice
} }
} }
result.append((peerId, level, hasVoice)) result.append((peerId, ssrcValue, level, hasVoice))
} }
} }
@ -985,9 +989,9 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
var topParticipants: [GroupCallParticipantsContext.Participant] = [] var topParticipants: [GroupCallParticipantsContext.Participant] = []
var reportSpeakingParticipants: [PeerId] = [] var reportSpeakingParticipants: [PeerId: UInt32] = [:]
let timestamp = CACurrentMediaTime() let timestamp = CACurrentMediaTime()
for peerId in speakingParticipants { for (peerId, ssrc) in speakingParticipants {
let shouldReport: Bool let shouldReport: Bool
if let previousTimestamp = strongSelf.speakingParticipantsReportTimestamp[peerId] { if let previousTimestamp = strongSelf.speakingParticipantsReportTimestamp[peerId] {
shouldReport = previousTimestamp + 1.0 < timestamp shouldReport = previousTimestamp + 1.0 < timestamp
@ -996,7 +1000,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
} }
if shouldReport { if shouldReport {
strongSelf.speakingParticipantsReportTimestamp[peerId] = timestamp strongSelf.speakingParticipantsReportTimestamp[peerId] = timestamp
reportSpeakingParticipants.append(peerId) reportSpeakingParticipants[peerId] = ssrc
} }
} }
@ -1008,7 +1012,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
var members = PresentationGroupCallMembers( var members = PresentationGroupCallMembers(
participants: [], participants: [],
speakingParticipants: speakingParticipants, speakingParticipants: Set(speakingParticipants.keys),
totalCount: 0, totalCount: 0,
loadMoreToken: nil loadMoreToken: nil
) )

View File

@ -187,8 +187,8 @@ public enum GetGroupCallParticipantsError {
case generic case generic
} }
public func getGroupCallParticipants(account: Account, callId: Int64, accessHash: Int64, offset: String, peerIds: [PeerId], limit: Int32) -> Signal<GroupCallParticipantsContext.State, GetGroupCallParticipantsError> { public func getGroupCallParticipants(account: Account, callId: Int64, accessHash: Int64, offset: String, ssrcs: [UInt32], limit: Int32) -> Signal<GroupCallParticipantsContext.State, GetGroupCallParticipantsError> {
return account.network.request(Api.functions.phone.getGroupParticipants(call: .inputGroupCall(id: callId, accessHash: accessHash), ids: peerIds.map { $0.id }, sources: [], offset: offset, limit: limit)) return account.network.request(Api.functions.phone.getGroupParticipants(call: .inputGroupCall(id: callId, accessHash: accessHash), ids: [], sources: ssrcs.map { Int32(bitPattern: $0) }, offset: offset, limit: limit))
|> mapError { _ -> GetGroupCallParticipantsError in |> mapError { _ -> GetGroupCallParticipantsError in
return .generic return .generic
} }
@ -368,7 +368,7 @@ public func joinGroupCall(account: Account, peerId: PeerId, callId: Int64, acces
|> mapError { _ -> JoinGroupCallError in |> mapError { _ -> JoinGroupCallError in
return .generic return .generic
}, },
getGroupCallParticipants(account: account, callId: callId, accessHash: accessHash, offset: "", peerIds: [], limit: 100) getGroupCallParticipants(account: account, callId: callId, accessHash: accessHash, offset: "", ssrcs: [], limit: 100)
|> mapError { _ -> JoinGroupCallError in |> mapError { _ -> JoinGroupCallError in
return .generic return .generic
}, },
@ -772,7 +772,7 @@ public final class GroupCallParticipantsContext {
private var isLoadingMore: Bool = false private var isLoadingMore: Bool = false
private var shouldResetStateFromServer: Bool = false private var shouldResetStateFromServer: Bool = false
private var missingPeerIds = Set<PeerId>() private var missingSsrcs = Set<UInt32>()
private let updateDefaultMuteDisposable = MetaDisposable() private let updateDefaultMuteDisposable = MetaDisposable()
@ -811,8 +811,6 @@ public final class GroupCallParticipantsContext {
}) })
strongSelf.activeSpeakersValue = peerIds strongSelf.activeSpeakersValue = peerIds
strongSelf.ensureHaveParticipants(peerIds: peerIds)
if !strongSelf.hasReceivedSpeakingParticipantsReport { if !strongSelf.hasReceivedSpeakingParticipantsReport {
var updatedParticipants = strongSelf.stateValue.state.participants var updatedParticipants = strongSelf.stateValue.state.participants
var indexMap: [PeerId: Int] = [:] var indexMap: [PeerId: Int] = [:]
@ -890,7 +888,7 @@ public final class GroupCallParticipantsContext {
} }
} }
public func reportSpeakingParticipants(ids: [PeerId]) { public func reportSpeakingParticipants(ids: [PeerId: UInt32]) {
if !ids.isEmpty { if !ids.isEmpty {
self.hasReceivedSpeakingParticipantsReport = true self.hasReceivedSpeakingParticipantsReport = true
} }
@ -906,7 +904,7 @@ public final class GroupCallParticipantsContext {
let timestamp = CFAbsoluteTimeGetCurrent() + NSTimeIntervalSince1970 let timestamp = CFAbsoluteTimeGetCurrent() + NSTimeIntervalSince1970
for activityPeerId in ids { for (activityPeerId, _) in ids {
if let index = indexMap[activityPeerId] { if let index = indexMap[activityPeerId] {
var updateTimestamp = false var updateTimestamp = false
if let activityTimestamp = updatedParticipants[index].activityTimestamp { if let activityTimestamp = updatedParticipants[index].activityTimestamp {
@ -947,30 +945,32 @@ public final class GroupCallParticipantsContext {
overlayState: strongSelf.stateValue.overlayState overlayState: strongSelf.stateValue.overlayState
) )
} }
self.ensureHaveParticipants(ssrcs: Set(ids.map { $0.1 }))
} }
private func ensureHaveParticipants(peerIds: Set<PeerId>) { private func ensureHaveParticipants(ssrcs: Set<UInt32>) {
var missingPeerIds = Set<PeerId>() var missingSsrcs = Set<UInt32>()
var existingParticipantIds = Set<PeerId>() var existingSsrcs = Set<UInt32>()
for participant in self.stateValue.state.participants { for participant in self.stateValue.state.participants {
existingParticipantIds.insert(participant.peer.id) existingSsrcs.insert(participant.ssrc)
} }
for peerId in peerIds { for ssrc in ssrcs {
if !existingParticipantIds.contains(peerId) { if !existingSsrcs.contains(ssrc) {
missingPeerIds.insert(peerId) missingSsrcs.insert(ssrc)
} }
} }
if !missingPeerIds.isEmpty { if !missingSsrcs.isEmpty {
self.missingPeerIds.formUnion(missingPeerIds) self.missingSsrcs.formUnion(missingSsrcs)
self.loadMissingPeers() self.loadMissingSsrcs()
} }
} }
private func loadMissingPeers() { private func loadMissingSsrcs() {
if self.missingPeerIds.isEmpty { if self.missingSsrcs.isEmpty {
return return
} }
if self.isLoadingMore { if self.isLoadingMore {
@ -978,16 +978,16 @@ public final class GroupCallParticipantsContext {
} }
self.isLoadingMore = true self.isLoadingMore = true
let peerIds = self.missingPeerIds let ssrcs = self.missingSsrcs
self.disposable.set((getGroupCallParticipants(account: self.account, callId: self.id, accessHash: self.accessHash, offset: "", peerIds: Array(peerIds), limit: 100) self.disposable.set((getGroupCallParticipants(account: self.account, callId: self.id, accessHash: self.accessHash, offset: "", ssrcs: Array(ssrcs), limit: 100)
|> deliverOnMainQueue).start(next: { [weak self] state in |> deliverOnMainQueue).start(next: { [weak self] state in
guard let strongSelf = self else { guard let strongSelf = self else {
return return
} }
strongSelf.isLoadingMore = false strongSelf.isLoadingMore = false
strongSelf.missingPeerIds.subtract(peerIds) strongSelf.missingSsrcs.subtract(ssrcs)
var updatedState = strongSelf.stateValue.state var updatedState = strongSelf.stateValue.state
@ -1013,7 +1013,7 @@ public final class GroupCallParticipantsContext {
if strongSelf.shouldResetStateFromServer { if strongSelf.shouldResetStateFromServer {
strongSelf.resetStateFromServer() strongSelf.resetStateFromServer()
} else { } else {
strongSelf.loadMissingPeers() strongSelf.loadMissingSsrcs()
} }
})) }))
} }
@ -1165,7 +1165,7 @@ public final class GroupCallParticipantsContext {
self.updateQueue.removeAll() self.updateQueue.removeAll()
self.disposable.set((getGroupCallParticipants(account: self.account, callId: self.id, accessHash: self.accessHash, offset: "", peerIds: [], limit: 100) self.disposable.set((getGroupCallParticipants(account: self.account, callId: self.id, accessHash: self.accessHash, offset: "", ssrcs: [], limit: 100)
|> deliverOnMainQueue).start(next: { [weak self] state in |> deliverOnMainQueue).start(next: { [weak self] state in
guard let strongSelf = self else { guard let strongSelf = self else {
return return
@ -1285,7 +1285,7 @@ public final class GroupCallParticipantsContext {
} }
self.isLoadingMore = true self.isLoadingMore = true
self.disposable.set((getGroupCallParticipants(account: self.account, callId: self.id, accessHash: self.accessHash, offset: token, peerIds: [], limit: 100) self.disposable.set((getGroupCallParticipants(account: self.account, callId: self.id, accessHash: self.accessHash, offset: token, ssrcs: [], limit: 100)
|> deliverOnMainQueue).start(next: { [weak self] state in |> deliverOnMainQueue).start(next: { [weak self] state in
guard let strongSelf = self else { guard let strongSelf = self else {
return return