Group call experiment

This commit is contained in:
Ali 2022-02-18 20:13:43 +04:00
parent 738dbd2b7c
commit fabc273e16
8 changed files with 218 additions and 30 deletions

View File

@ -137,6 +137,7 @@ fileprivate let parsers: [Int32 : (BufferReader) -> Any?] = {
dict[-1278304028] = { return Api.storage.FileType.parse_fileMp4($0) }
dict[276907596] = { return Api.storage.FileType.parse_fileWebp($0) }
dict[1338747336] = { return Api.messages.ArchivedStickers.parse_archivedStickers($0) }
dict[-2132064081] = { return Api.GroupCallStreamChannel.parse_groupCallStreamChannel($0) }
dict[406307684] = { return Api.InputEncryptedFile.parse_inputEncryptedFileEmpty($0) }
dict[1690108678] = { return Api.InputEncryptedFile.parse_inputEncryptedFileUploaded($0) }
dict[1511503333] = { return Api.InputEncryptedFile.parse_inputEncryptedFile($0) }
@ -146,6 +147,7 @@ fileprivate let parsers: [Int32 : (BufferReader) -> Any?] = {
dict[-341428482] = { return Api.GroupCallParticipant.parse_groupCallParticipant($0) }
dict[1443858741] = { return Api.messages.SentEncryptedMessage.parse_sentEncryptedMessage($0) }
dict[-1802240206] = { return Api.messages.SentEncryptedMessage.parse_sentEncryptedFile($0) }
dict[-790330702] = { return Api.phone.GroupCallStreamChannels.parse_groupCallStreamChannels($0) }
dict[289586518] = { return Api.SavedContact.parse_savedPhoneContact($0) }
dict[1571494644] = { return Api.ExportedMessageLink.parse_exportedMessageLink($0) }
dict[872119224] = { return Api.auth.Authorization.parse_authorization($0) }
@ -1098,6 +1100,8 @@ public struct Api {
_1.serialize(buffer, boxed)
case let _1 as Api.messages.ArchivedStickers:
_1.serialize(buffer, boxed)
case let _1 as Api.GroupCallStreamChannel:
_1.serialize(buffer, boxed)
case let _1 as Api.InputEncryptedFile:
_1.serialize(buffer, boxed)
case let _1 as Api.account.Takeout:
@ -1108,6 +1112,8 @@ public struct Api {
_1.serialize(buffer, boxed)
case let _1 as Api.messages.SentEncryptedMessage:
_1.serialize(buffer, boxed)
case let _1 as Api.phone.GroupCallStreamChannels:
_1.serialize(buffer, boxed)
case let _1 as Api.SavedContact:
_1.serialize(buffer, boxed)
case let _1 as Api.ExportedMessageLink:

View File

@ -3568,6 +3568,48 @@ public extension Api {
}
}
}
public enum GroupCallStreamChannel: TypeConstructorDescription {
case groupCallStreamChannel(channel: Int32, scale: Int32, lastTimestampMs: Int64)
public func serialize(_ buffer: Buffer, _ boxed: Swift.Bool) {
switch self {
case .groupCallStreamChannel(let channel, let scale, let lastTimestampMs):
if boxed {
buffer.appendInt32(-2132064081)
}
serializeInt32(channel, buffer: buffer, boxed: false)
serializeInt32(scale, buffer: buffer, boxed: false)
serializeInt64(lastTimestampMs, buffer: buffer, boxed: false)
break
}
}
public func descriptionFields() -> (String, [(String, Any)]) {
switch self {
case .groupCallStreamChannel(let channel, let scale, let lastTimestampMs):
return ("groupCallStreamChannel", [("channel", channel), ("scale", scale), ("lastTimestampMs", lastTimestampMs)])
}
}
public static func parse_groupCallStreamChannel(_ reader: BufferReader) -> GroupCallStreamChannel? {
var _1: Int32?
_1 = reader.readInt32()
var _2: Int32?
_2 = reader.readInt32()
var _3: Int64?
_3 = reader.readInt64()
let _c1 = _1 != nil
let _c2 = _2 != nil
let _c3 = _3 != nil
if _c1 && _c2 && _c3 {
return Api.GroupCallStreamChannel.groupCallStreamChannel(channel: _1!, scale: _2!, lastTimestampMs: _3!)
}
else {
return nil
}
}
}
public enum InputEncryptedFile: TypeConstructorDescription {
case inputEncryptedFileEmpty

View File

@ -1722,6 +1722,46 @@ public struct photos {
}
public extension Api {
public struct phone {
public enum GroupCallStreamChannels: TypeConstructorDescription {
case groupCallStreamChannels(channels: [Api.GroupCallStreamChannel])
public func serialize(_ buffer: Buffer, _ boxed: Swift.Bool) {
switch self {
case .groupCallStreamChannels(let channels):
if boxed {
buffer.appendInt32(-790330702)
}
buffer.appendInt32(481674261)
buffer.appendInt32(Int32(channels.count))
for item in channels {
item.serialize(buffer, true)
}
break
}
}
public func descriptionFields() -> (String, [(String, Any)]) {
switch self {
case .groupCallStreamChannels(let channels):
return ("groupCallStreamChannels", [("channels", channels)])
}
}
public static func parse_groupCallStreamChannels(_ reader: BufferReader) -> GroupCallStreamChannels? {
var _1: [Api.GroupCallStreamChannel]?
if let _ = reader.readInt32() {
_1 = Api.parseVector(reader, elementSignature: 0, elementType: Api.GroupCallStreamChannel.self)
}
let _c1 = _1 != nil
if _c1 {
return Api.phone.GroupCallStreamChannels.groupCallStreamChannels(channels: _1!)
}
else {
return nil
}
}
}
public enum JoinAsPeers: TypeConstructorDescription {
case joinAsPeers(peers: [Api.Peer], chats: [Api.Chat], users: [Api.User])
@ -4657,6 +4697,22 @@ public extension Api {
return result
})
}
public static func searchSentMedia(q: String, filter: Api.MessagesFilter, limit: Int32) -> (FunctionDescription, Buffer, DeserializeFunctionResponse<Api.messages.Messages>) {
let buffer = Buffer()
buffer.appendInt32(276705696)
serializeString(q, buffer: buffer, boxed: false)
filter.serialize(buffer, true)
serializeInt32(limit, buffer: buffer, boxed: false)
return (FunctionDescription(name: "messages.searchSentMedia", parameters: [("q", q), ("filter", filter), ("limit", limit)]), buffer, DeserializeFunctionResponse { (buffer: Buffer) -> Api.messages.Messages? in
let reader = BufferReader(buffer)
var result: Api.messages.Messages?
if let signature = reader.readInt32() {
result = Api.parse(reader, signature: signature) as? Api.messages.Messages
}
return result
})
}
}
public struct channels {
public static func readHistory(channel: Api.InputChannel, maxId: Int32) -> (FunctionDescription, Buffer, DeserializeFunctionResponse<Api.Bool>) {
@ -8603,6 +8659,20 @@ public extension Api {
return result
})
}
public static func getGroupCallStreamChannels(call: Api.InputGroupCall) -> (FunctionDescription, Buffer, DeserializeFunctionResponse<Api.phone.GroupCallStreamChannels>) {
let buffer = Buffer()
buffer.appendInt32(447879488)
call.serialize(buffer, true)
return (FunctionDescription(name: "phone.getGroupCallStreamChannels", parameters: [("call", call)]), buffer, DeserializeFunctionResponse { (buffer: Buffer) -> Api.phone.GroupCallStreamChannels? in
let reader = BufferReader(buffer)
var result: Api.phone.GroupCallStreamChannels?
if let signature = reader.readInt32() {
result = Api.parse(reader, signature: signature) as? Api.phone.GroupCallStreamChannels
}
return result
})
}
}
}
}

View File

@ -1410,7 +1410,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
disposable.set(strongSelf.requestMediaChannelDescriptions(ssrcs: ssrcs, completion: completion))
}
return disposable
}, audioStreamData: OngoingGroupCallContext.AudioStreamData(engine: self.accountContext.engine, callId: callInfo.id, accessHash: callInfo.accessHash), rejoinNeeded: { [weak self] in
}, rejoinNeeded: { [weak self] in
Queue.mainQueue().async {
guard let strongSelf = self else {
return
@ -1513,8 +1513,9 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
strongSelf.currentConnectionMode = .rtc
strongSelf.genericCallContext?.setConnectionMode(.rtc, keepBroadcastConnectedIfWasEnabled: false)
strongSelf.genericCallContext?.setJoinResponse(payload: clientParams)
case .broadcast:
case let .broadcast(isExternalStream):
strongSelf.currentConnectionMode = .broadcast
strongSelf.genericCallContext?.setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData(engine: strongSelf.accountContext.engine, callId: callInfo.id, accessHash: callInfo.accessHash, isExternalStream: isExternalStream))
strongSelf.genericCallContext?.setConnectionMode(.broadcast, keepBroadcastConnectedIfWasEnabled: false)
}
@ -2399,7 +2400,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
activeSpeakers: Set()
)))
self.startDisposable.set((self.accountContext.engine.calls.createGroupCall(peerId: self.peerId, title: nil, scheduleDate: timestamp)
self.startDisposable.set((self.accountContext.engine.calls.createGroupCall(peerId: self.peerId, title: nil, scheduleDate: timestamp, isExternalStream: false)
|> deliverOnMainQueue).start(next: { [weak self] callInfo in
guard let strongSelf = self else {
return
@ -2668,7 +2669,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
self.hasScreencast = true
let screencastCallContext = OngoingGroupCallContext(video: self.screencastCapturer, requestMediaChannelDescriptions: { _, _ in EmptyDisposable }, audioStreamData: nil, rejoinNeeded: { }, outgoingAudioBitrateKbit: nil, videoContentType: .screencast, enableNoiseSuppression: false, preferX264: false)
let screencastCallContext = OngoingGroupCallContext(video: self.screencastCapturer, requestMediaChannelDescriptions: { _, _ in EmptyDisposable }, rejoinNeeded: { }, outgoingAudioBitrateKbit: nil, videoContentType: .screencast, enableNoiseSuppression: false, preferX264: false)
self.screencastCallContext = screencastCallContext
self.screencastJoinDisposable.set((screencastCallContext.joinPayload

View File

@ -210,7 +210,7 @@ public class BoxedMessage: NSObject {
public class Serialization: NSObject, MTSerialization {
public func currentLayer() -> UInt {
return 138
return 139
}
public func parseMessage(_ data: Data!) -> Any! {

View File

@ -143,7 +143,7 @@ public enum CreateGroupCallError {
case scheduledTooLate
}
func _internal_createGroupCall(account: Account, peerId: PeerId, title: String?, scheduleDate: Int32?) -> Signal<GroupCallInfo, CreateGroupCallError> {
func _internal_createGroupCall(account: Account, peerId: PeerId, title: String?, scheduleDate: Int32?, isExternalStream: Bool) -> Signal<GroupCallInfo, CreateGroupCallError> {
return account.postbox.transaction { transaction -> Api.InputPeer? in
let callPeer = transaction.getPeer(peerId).flatMap(apiInputPeer)
return callPeer
@ -160,6 +160,9 @@ func _internal_createGroupCall(account: Account, peerId: PeerId, title: String?,
if let _ = scheduleDate {
flags |= (1 << 1)
}
if isExternalStream {
flags |= (1 << 2)
}
return account.network.request(Api.functions.phone.createGroupCall(flags: flags, peer: inputPeer, randomId: Int32.random(in: Int32.min ... Int32.max), title: title, scheduleDate: scheduleDate))
|> mapError { error -> CreateGroupCallError in
if error.errorDescription == "ANONYMOUS_CALLS_DISABLED" {
@ -433,7 +436,7 @@ public enum JoinGroupCallError {
public struct JoinGroupCallResult {
public enum ConnectionMode {
case rtc
case broadcast
case broadcast(isExternalStream: Bool)
}
public var callInfo: GroupCallInfo
@ -589,12 +592,16 @@ func _internal_joinGroupCall(account: Account, peerId: PeerId, joinAs: PeerId?,
let connectionMode: JoinGroupCallResult.ConnectionMode
if let clientParamsData = parsedClientParams.data(using: .utf8), let dict = (try? JSONSerialization.jsonObject(with: clientParamsData, options: [])) as? [String: Any] {
if let stream = dict["stream"] as? Bool, stream {
connectionMode = .broadcast
var isExternalStream = false
if let rtmp = dict["rtmp"] as? Bool, rtmp {
isExternalStream = true
}
connectionMode = .broadcast(isExternalStream: isExternalStream)
} else {
connectionMode = .rtc
}
} else {
connectionMode = .broadcast
connectionMode = .broadcast(isExternalStream: false)
}
return account.postbox.transaction { transaction -> JoinGroupCallResult in

View File

@ -1,5 +1,17 @@
import SwiftSignalKit
import Postbox
import TelegramApi
import MtProtoKit
public struct EngineCallStreamState {
public struct Channel {
public var id: Int32
public var scale: Int32
public var latestTimestamp: Int64
}
public var channels: [Channel]
}
public extension TelegramEngine {
final class Calls {
@ -21,8 +33,8 @@ public extension TelegramEngine {
return _internal_getCurrentGroupCall(account: self.account, callId: callId, accessHash: accessHash, peerId: peerId)
}
public func createGroupCall(peerId: PeerId, title: String?, scheduleDate: Int32?) -> Signal<GroupCallInfo, CreateGroupCallError> {
return _internal_createGroupCall(account: self.account, peerId: peerId, title: title, scheduleDate: scheduleDate)
public func createGroupCall(peerId: PeerId, title: String?, scheduleDate: Int32?, isExternalStream: Bool) -> Signal<GroupCallInfo, CreateGroupCallError> {
return _internal_createGroupCall(account: self.account, peerId: peerId, title: title, scheduleDate: scheduleDate, isExternalStream: isExternalStream)
}
public func startScheduledGroupCall(peerId: PeerId, callId: Int64, accessHash: Int64) -> Signal<GroupCallInfo, StartScheduledGroupCallError> {
@ -119,5 +131,28 @@ public extension TelegramEngine {
}
|> take(1)
}
public func requestStreamState(callId: Int64, accessHash: Int64) -> Signal<EngineCallStreamState?, NoError> {
return self.account.network.request(Api.functions.phone.getGroupCallStreamChannels(call: .inputGroupCall(id: callId, accessHash: accessHash)))
|> mapToSignal { result -> Signal<EngineCallStreamState?, MTRpcError> in
switch result {
case let .groupCallStreamChannels(channels):
let state = EngineCallStreamState(channels: channels.map { channel -> EngineCallStreamState.Channel in
switch channel {
case let .groupCallStreamChannel(channel, scale, lastTimestampMs):
return EngineCallStreamState.Channel(id: channel, scale: scale, latestTimestamp: lastTimestampMs)
}
})
/*if state.channels.isEmpty {
return .fail(MTRpcError(errorCode: 500, errorDescription: "Generated")) |> delay(10.0, queue: .mainQueue())
}*/
return .single(state)
}
}
//|> restartIfError
|> `catch` { _ -> Signal<EngineCallStreamState?, NoError> in
return .single(nil)
}
}
}
}

View File

@ -42,20 +42,30 @@ private final class NetworkBroadcastPartSource: BroadcastPartSource {
private let engine: TelegramEngine
private let callId: Int64
private let accessHash: Int64
private let isExternalStream: Bool
private var dataSource: AudioBroadcastDataSource?
init(queue: Queue, engine: TelegramEngine, callId: Int64, accessHash: Int64) {
init(queue: Queue, engine: TelegramEngine, callId: Int64, accessHash: Int64, isExternalStream: Bool) {
self.queue = queue
self.engine = engine
self.callId = callId
self.accessHash = accessHash
self.isExternalStream = isExternalStream
}
func requestTime(completion: @escaping (Int64) -> Void) -> Disposable {
return engine.calls.serverTime().start(next: { result in
if self.isExternalStream {
return self.engine.calls.requestStreamState(callId: self.callId, accessHash: self.accessHash).start(next: { result in
if let channel = result?.channels.first {
completion(channel.latestTimestamp)
}
})
} else {
return self.engine.calls.serverTime().start(next: { result in
completion(result)
})
}
}
func requestPart(timestampMilliseconds: Int64, durationMilliseconds: Int64, subject: BroadcastPartSubject, completion: @escaping (OngoingGroupCallBroadcastPart) -> Void, rejoinNeeded: @escaping () -> Void) -> Disposable {
let timestampIdMilliseconds: Int64
@ -146,11 +156,13 @@ public final class OngoingGroupCallContext {
public var engine: TelegramEngine
public var callId: Int64
public var accessHash: Int64
public var isExternalStream: Bool
public init(engine: TelegramEngine, callId: Int64, accessHash: Int64) {
public init(engine: TelegramEngine, callId: Int64, accessHash: Int64, isExternalStream: Bool) {
self.engine = engine
self.callId = callId
self.accessHash = accessHash
self.isExternalStream = isExternalStream
}
}
@ -361,21 +373,14 @@ public final class OngoingGroupCallContext {
private var currentRequestedVideoChannels: [VideoChannel] = []
private var broadcastPartsSource: BroadcastPartSource?
private let broadcastPartsSource = Atomic<BroadcastPartSource?>(value: nil)
init(queue: Queue, inputDeviceId: String, outputDeviceId: String, video: OngoingCallVideoCapturer?, requestMediaChannelDescriptions: @escaping (Set<UInt32>, @escaping ([MediaChannelDescription]) -> Void) -> Disposable, audioStreamData: AudioStreamData?, rejoinNeeded: @escaping () -> Void, outgoingAudioBitrateKbit: Int32?, videoContentType: VideoContentType, enableNoiseSuppression: Bool, preferX264: Bool) {
init(queue: Queue, inputDeviceId: String, outputDeviceId: String, video: OngoingCallVideoCapturer?, requestMediaChannelDescriptions: @escaping (Set<UInt32>, @escaping ([MediaChannelDescription]) -> Void) -> Disposable, rejoinNeeded: @escaping () -> Void, outgoingAudioBitrateKbit: Int32?, videoContentType: VideoContentType, enableNoiseSuppression: Bool, preferX264: Bool) {
self.queue = queue
var networkStateUpdatedImpl: ((GroupCallNetworkState) -> Void)?
var audioLevelsUpdatedImpl: (([NSNumber]) -> Void)?
if let audioStreamData = audioStreamData {
let broadcastPartsSource = NetworkBroadcastPartSource(queue: queue, engine: audioStreamData.engine, callId: audioStreamData.callId, accessHash: audioStreamData.accessHash)
self.broadcastPartsSource = broadcastPartsSource
}
let broadcastPartsSource = self.broadcastPartsSource
let _videoContentType: OngoingGroupCallVideoContentType
switch videoContentType {
case .generic:
@ -386,6 +391,8 @@ public final class OngoingGroupCallContext {
_videoContentType = .none
}
var getBroadcastPartsSource: (() -> BroadcastPartSource?)?
self.context = GroupCallThreadLocalContext(
queue: ContextQueueImpl(queue: queue),
networkStateUpdated: { state in
@ -433,7 +440,7 @@ public final class OngoingGroupCallContext {
let disposable = MetaDisposable()
queue.async {
disposable.set(broadcastPartsSource?.requestTime(completion: completion))
disposable.set(getBroadcastPartsSource?()?.requestTime(completion: completion))
}
return OngoingGroupCallBroadcastPartTaskImpl(disposable: disposable)
@ -442,7 +449,7 @@ public final class OngoingGroupCallContext {
let disposable = MetaDisposable()
queue.async {
disposable.set(broadcastPartsSource?.requestPart(timestampMilliseconds: timestampMilliseconds, durationMilliseconds: durationMilliseconds, subject: .audio, completion: completion, rejoinNeeded: {
disposable.set(getBroadcastPartsSource?()?.requestPart(timestampMilliseconds: timestampMilliseconds, durationMilliseconds: durationMilliseconds, subject: .audio, completion: completion, rejoinNeeded: {
rejoinNeeded()
}))
}
@ -464,7 +471,7 @@ public final class OngoingGroupCallContext {
@unknown default:
mappedQuality = .thumbnail
}
disposable.set(broadcastPartsSource?.requestPart(timestampMilliseconds: timestampMilliseconds, durationMilliseconds: durationMilliseconds, subject: .video(channelId: channelId, quality: mappedQuality), completion: completion, rejoinNeeded: {
disposable.set(getBroadcastPartsSource?()?.requestPart(timestampMilliseconds: timestampMilliseconds, durationMilliseconds: durationMilliseconds, subject: .video(channelId: channelId, quality: mappedQuality), completion: completion, rejoinNeeded: {
rejoinNeeded()
}))
}
@ -479,6 +486,11 @@ public final class OngoingGroupCallContext {
let queue = self.queue
let broadcastPartsSource = self.broadcastPartsSource
getBroadcastPartsSource = {
return broadcastPartsSource.with { $0 }
}
networkStateUpdatedImpl = { [weak self] state in
queue.async {
guard let strongSelf = self else {
@ -525,6 +537,13 @@ public final class OngoingGroupCallContext {
self.context.setJoinResponsePayload(payload)
}
func setAudioStreamData(audioStreamData: AudioStreamData?) {
if let audioStreamData = audioStreamData {
let broadcastPartsSource = NetworkBroadcastPartSource(queue: self.queue, engine: audioStreamData.engine, callId: audioStreamData.callId, accessHash: audioStreamData.accessHash, isExternalStream: audioStreamData.isExternalStream)
let _ = self.broadcastPartsSource.swap(broadcastPartsSource)
}
}
func addSsrcs(ssrcs: [UInt32]) {
}
@ -874,10 +893,10 @@ public final class OngoingGroupCallContext {
}
}
public init(inputDeviceId: String = "", outputDeviceId: String = "", video: OngoingCallVideoCapturer?, requestMediaChannelDescriptions: @escaping (Set<UInt32>, @escaping ([MediaChannelDescription]) -> Void) -> Disposable, audioStreamData: AudioStreamData?, rejoinNeeded: @escaping () -> Void, outgoingAudioBitrateKbit: Int32?, videoContentType: VideoContentType, enableNoiseSuppression: Bool, preferX264: Bool) {
public init(inputDeviceId: String = "", outputDeviceId: String = "", video: OngoingCallVideoCapturer?, requestMediaChannelDescriptions: @escaping (Set<UInt32>, @escaping ([MediaChannelDescription]) -> Void) -> Disposable, rejoinNeeded: @escaping () -> Void, outgoingAudioBitrateKbit: Int32?, videoContentType: VideoContentType, enableNoiseSuppression: Bool, preferX264: Bool) {
let queue = self.queue
self.impl = QueueLocalObject(queue: queue, generate: {
return Impl(queue: queue, inputDeviceId: inputDeviceId, outputDeviceId: outputDeviceId, video: video, requestMediaChannelDescriptions: requestMediaChannelDescriptions, audioStreamData: audioStreamData, rejoinNeeded: rejoinNeeded, outgoingAudioBitrateKbit: outgoingAudioBitrateKbit, videoContentType: videoContentType, enableNoiseSuppression: enableNoiseSuppression, preferX264: preferX264)
return Impl(queue: queue, inputDeviceId: inputDeviceId, outputDeviceId: outputDeviceId, video: video, requestMediaChannelDescriptions: requestMediaChannelDescriptions, rejoinNeeded: rejoinNeeded, outgoingAudioBitrateKbit: outgoingAudioBitrateKbit, videoContentType: videoContentType, enableNoiseSuppression: enableNoiseSuppression, preferX264: preferX264)
})
}
@ -916,17 +935,25 @@ public final class OngoingGroupCallContext {
impl.switchAudioInput(deviceId)
}
}
public func switchAudioOutput(_ deviceId: String) {
self.impl.with { impl in
impl.switchAudioOutput(deviceId)
}
}
public func setJoinResponse(payload: String) {
self.impl.with { impl in
impl.setJoinResponse(payload: payload)
}
}
public func setAudioStreamData(audioStreamData: AudioStreamData?) {
self.impl.with { impl in
impl.setAudioStreamData(audioStreamData: audioStreamData)
}
}
public func addSsrcs(ssrcs: [UInt32]) {
self.impl.with { impl in
impl.addSsrcs(ssrcs: ssrcs)