From db0fbce155023ed269c8083b6a85fa5c79fd7cfe Mon Sep 17 00:00:00 2001
From: Ali <>
Date: Tue, 29 Jun 2021 19:43:43 +0400
Subject: [PATCH] Implement screen capture audio sharing
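
Forward app audio (RPSampleBufferType.audioApp) from the broadcast upload
extension into the ongoing group call:

- BroadcastUploadSampleHandler converts each audio sample buffer to
  48 kHz mono signed 16-bit PCM with AudioToolbox (CustomAudioConverter)
  and writes the chunks to a per-session named pipe ("-audio-<id>" suffix)
  via IpcGroupCallBufferBroadcastContext.writeAudioData.
- IpcGroupCallBufferAppContext reads that pipe in the app and exposes it
  as an audioData signal; PresentationGroupCallImpl feeds it into tgcalls
  through the new OngoingGroupCallContext.addExternalAudioData, backed by
  addExternalAudioSamples in the updated tgcalls submodule.
- FdWriteConnection now queues pending writes (up to 1 MB) instead of
  replacing them, so audio chunks are not dropped.
- Voice/video call audio sessions add .mixWithOthers, and the group call
  instance is created with disableOutgoingAudioProcessing enabled.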
---
 .../BroadcastUploadExtension.swift            | 235 ++++++++++++------
 .../Sources/ManagedAudioSession.swift         |   2 +
 .../Sources/PresentationGroupCall.swift       |   9 +
 .../Sources/GroupCallContext.swift            |  10 +
 .../Sources/IpcGroupCallContext.swift         |  52 ++--
 .../OngoingCallThreadLocalContext.h           |   2 +
 .../Sources/OngoingCallThreadLocalContext.mm  |  12 +-
 submodules/TgVoipWebrtc/tgcalls               |   2 +-
 8 files changed, 225 insertions(+), 99 deletions(-)

diff --git a/Telegram/BroadcastUpload/BroadcastUploadExtension.swift b/Telegram/BroadcastUpload/BroadcastUploadExtension.swift
index 465763a5ba..ce1b589fc6 100644
--- a/Telegram/BroadcastUpload/BroadcastUploadExtension.swift
+++ b/Telegram/BroadcastUpload/BroadcastUploadExtension.swift
@@ -4,6 +4,7 @@ import CoreVideo
 import TelegramVoip
 import SwiftSignalKit
 import BuildConfig
+import AudioToolbox
 
 private func rootPathForBasePath(_ appGroupPath: String) -> String {
     return appGroupPath + "/telegram-data"
@@ -11,22 +12,11 @@ private func rootPathForBasePath(_ appGroupPath: String) -> String {
 
 @available(iOS 10.0, *)
 @objc(BroadcastUploadSampleHandler) class BroadcastUploadSampleHandler: RPBroadcastSampleHandler {
-    /*private var ipcContext: IpcGroupCallBroadcastContext?
-    private var callContext: OngoingGroupCallContext?
-    private var videoCapturer: OngoingCallVideoCapturer?
-    private var requestDisposable: Disposable?
-    private var joinPayloadDisposable: Disposable?
-    private var joinResponsePayloadDisposable: Disposable?*/
-    
     private var screencastBufferClientContext: IpcGroupCallBufferBroadcastContext?
     private var statusDisposable: Disposable?
+    private var audioConverter: CustomAudioConverter?
     
     deinit {
-        /*self.requestDisposable?.dispose()
-        self.joinPayloadDisposable?.dispose()
-        self.joinResponsePayloadDisposable?.dispose()
-        self.callContext?.stop()*/
-        
         self.statusDisposable?.dispose()
     }
 
@@ -39,11 +29,6 @@ private func rootPathForBasePath(_ appGroupPath: String) -> String {
             NSLocalizedDescriptionKey: "Finished"
         ])
         finishBroadcastWithError(error)
-        
-        /*self.callContext?.stop()
-        self.callContext = nil
-        
-        self.ipcContext = nil*/
     }
 
     private func finishWithNoBroadcast() {
@@ -87,57 +72,8 @@ private func rootPathForBasePath(_ appGroupPath: String) -> String {
                 strongSelf.finishWithNoBroadcast()
             }
         })
-        
-        /*let ipcContext = IpcGroupCallBroadcastContext(basePath: rootPath + "/broadcast-coordination")
-        self.ipcContext = ipcContext
-        
-        self.requestDisposable = (ipcContext.request
-        |> timeout(3.0, queue: .mainQueue(), alternate: .single(.failed))
-        |> take(1)
-        |> deliverOnMainQueue).start(next: { [weak self] request in
-            guard let strongSelf = self else {
-                return
-            }
-            switch request {
-            case .request:
-                strongSelf.beginWithRequest()
-            case .failed:
-                strongSelf.finishWithGenericError()
-            }
-        })*/
     }
     
-    /*private func beginWithRequest() {
-        let videoCapturer = OngoingCallVideoCapturer(isCustom: true)
-        self.videoCapturer = videoCapturer
-        
-        let callContext = OngoingGroupCallContext(video: videoCapturer, requestMediaChannelDescriptions: { _, _ in return EmptyDisposable }, audioStreamData: nil, rejoinNeeded: {
-        }, outgoingAudioBitrateKbit: nil, videoContentType: .screencast, enableNoiseSuppression: false)
-        self.callContext = callContext
-        
-        self.joinPayloadDisposable = (callContext.joinPayload
-        |> take(1)
-        |> deliverOnMainQueue).start(next: { [weak self] joinPayload in
-            guard let strongSelf = self, let ipcContext = strongSelf.ipcContext else {
-                return
-            }
-            ipcContext.setJoinPayload(joinPayload.0)
-            
-            strongSelf.joinResponsePayloadDisposable = (ipcContext.joinResponsePayload
-            |> take(1)
-            |> deliverOnMainQueue).start(next: { joinResponsePayload in
-                guard let strongSelf = self, let callContext = strongSelf.callContext, let ipcContext = strongSelf.ipcContext else {
-                    return
-                }
-                
-                callContext.setConnectionMode(.rtc, keepBroadcastConnectedIfWasEnabled: false)
-                callContext.setJoinResponse(payload: joinResponsePayload)
-                
-                ipcContext.beginActiveIndication()
-            })
-        })
-    }*/
-    
     override public func broadcastPaused() {
     }
 
@@ -152,7 +88,7 @@ private func rootPathForBasePath(_ appGroupPath: String) -> String {
         case RPSampleBufferType.video:
             processVideoSampleBuffer(sampleBuffer: sampleBuffer)
         case RPSampleBufferType.audioApp:
-            break
+            processAudioSampleBuffer(sampleBuffer: sampleBuffer)
         case RPSampleBufferType.audioMic:
             break
         @unknown default:
@@ -173,25 +109,166 @@ private func rootPathForBasePath(_ appGroupPath: String) -> String {
         if let data = serializePixelBuffer(buffer: pixelBuffer) {
             self.screencastBufferClientContext?.setCurrentFrame(data: data, orientation: orientation)
         }
+    }
 
-        //self.videoCapturer?.injectSampleBuffer(sampleBuffer)
-        /*if CMSampleBufferGetNumSamples(sampleBuffer) != 1 {
+    private func processAudioSampleBuffer(sampleBuffer: CMSampleBuffer) {
+        guard let formatDescription = CMSampleBufferGetFormatDescription(sampleBuffer) else {
             return
         }
-        if !CMSampleBufferIsValid(sampleBuffer) {
+        guard let asbd = CMAudioFormatDescriptionGetStreamBasicDescription(formatDescription) else {
             return
         }
-        if !CMSampleBufferDataIsReady(sampleBuffer) {
+        /*guard let blockBuffer = CMSampleBufferGetDataBuffer(sampleBuffer) else {
             return
+        }*/
+        
+        let format = CustomAudioConverter.Format(
+            numChannels: Int(asbd.pointee.mChannelsPerFrame),
+            sampleRate: Int(asbd.pointee.mSampleRate)
+        )
+        if self.audioConverter?.format != format {
+            self.audioConverter = CustomAudioConverter(asbd: asbd)
         }
-        guard let pixelBuffer = CMSampleBufferGetImageBuffer(sampleBuffer) else {
-            return
+        if let audioConverter = self.audioConverter {
+            if let data = audioConverter.convert(sampleBuffer: sampleBuffer) {
+                self.screencastBufferClientContext?.writeAudioData(data: data)
+            }
         }
-        
-        let pixelFormat = CVPixelBufferGetPixelFormatType(pixelBuffer)
-        
-        CVPixelBufferLockBaseAddress(pixelBuffer, .readOnly)
-        
-        CVPixelBufferUnlockBaseAddress(pixelBuffer, .readOnly)*/
     }
 }
+
+private final class CustomAudioConverter {
+    struct Format: Equatable {
+        let numChannels: Int
+        let sampleRate: Int
+    }
+    
+    let format: Format
+    
+    var currentInputDescription: UnsafePointer<AudioStreamBasicDescription>?
+    var currentBuffer: AudioBuffer?
+    var currentBufferOffset: UInt32 = 0
+    
+    init(asbd: UnsafePointer<AudioStreamBasicDescription>) {
+        self.format = Format(
+            numChannels: Int(asbd.pointee.mChannelsPerFrame),
+            sampleRate: Int(asbd.pointee.mSampleRate)
+        )
+    }
+    
+    func convert(sampleBuffer: CMSampleBuffer) -> Data? {
+        guard let formatDescription = CMSampleBufferGetFormatDescription(sampleBuffer) else {
+            return nil
+        }
+        guard let asbd = CMAudioFormatDescriptionGetStreamBasicDescription(formatDescription) else {
+            return nil
+        }
+        
+        var bufferList = AudioBufferList()
+        var blockBuffer: CMBlockBuffer? = nil
+        CMSampleBufferGetAudioBufferListWithRetainedBlockBuffer(
+            sampleBuffer,
+            bufferListSizeNeededOut: nil,
+            bufferListOut: &bufferList,
+            bufferListSize: MemoryLayout<AudioBufferList>.size,
+            blockBufferAllocator: nil,
+            blockBufferMemoryAllocator: nil,
+            flags: kCMSampleBufferFlag_AudioBufferList_Assure16ByteAlignment,
+            blockBufferOut: &blockBuffer
+        )
+        let size = bufferList.mBuffers.mDataByteSize
+        guard size != 0, let mData = bufferList.mBuffers.mData else {
+            return nil
+        }
+        
+        var outputDescription = AudioStreamBasicDescription(
+            mSampleRate: 48000.0,
+            mFormatID: kAudioFormatLinearPCM,
+            mFormatFlags: kAudioFormatFlagIsSignedInteger | kAudioFormatFlagsNativeEndian | kAudioFormatFlagIsPacked,
+            mBytesPerPacket: 2,
+            mFramesPerPacket: 1,
+            mBytesPerFrame: 2,
+            mChannelsPerFrame: 1,
+            mBitsPerChannel: 16,
+            mReserved: 0
+        )
+        var maybeAudioConverter: AudioConverterRef?
+        let _ = AudioConverterNew(asbd, &outputDescription, &maybeAudioConverter)
+        guard let audioConverter = maybeAudioConverter else {
+            return nil
+        }
+        
+        self.currentBuffer = AudioBuffer(
+            mNumberChannels: asbd.pointee.mChannelsPerFrame,
+            mDataByteSize: UInt32(size),
+            mData: mData
+        )
+        self.currentBufferOffset = 0
+        self.currentInputDescription = asbd
+        
+        var numPackets: UInt32?
+        let outputSize = 32768 * 2
+        var outputBuffer = Data(count: outputSize)
+        outputBuffer.withUnsafeMutableBytes { (outputBytes: UnsafeMutableRawBufferPointer) -> Void in
+            var outputBufferList = AudioBufferList()
+            outputBufferList.mNumberBuffers = 1
+            outputBufferList.mBuffers.mNumberChannels = outputDescription.mChannelsPerFrame
+            outputBufferList.mBuffers.mDataByteSize = UInt32(outputSize)
+            outputBufferList.mBuffers.mData = outputBytes.baseAddress!
+            
+            var outputDataPacketSize = UInt32(outputSize) / outputDescription.mBytesPerPacket
+            
+            let result = AudioConverterFillComplexBuffer(
+                audioConverter,
+                converterComplexInputDataProc,
+                Unmanaged.passUnretained(self).toOpaque(),
+                &outputDataPacketSize,
+                &outputBufferList,
+                nil
+            )
+            if result == noErr {
+                numPackets = outputDataPacketSize
+            }
+        }
+        
+        AudioConverterDispose(audioConverter)
+        
+        if let numPackets = numPackets {
+            outputBuffer.count = Int(numPackets * outputDescription.mBytesPerPacket)
+            return outputBuffer
+        } else {
+            return nil
+        }
+    }
+}
+
+private func converterComplexInputDataProc(inAudioConverter: AudioConverterRef, ioNumberDataPackets: UnsafeMutablePointer<UInt32>, ioData: UnsafeMutablePointer<AudioBufferList>, ioDataPacketDescription: UnsafeMutablePointer<UnsafeMutablePointer<AudioStreamPacketDescription>?>?, inUserData: UnsafeMutableRawPointer?) -> Int32 {
+    guard let inUserData = inUserData else {
+        ioNumberDataPackets.pointee = 0
+        return 0
+    }
+    let instance = Unmanaged<CustomAudioConverter>.fromOpaque(inUserData).takeUnretainedValue()
+    guard let currentBuffer = instance.currentBuffer else {
+        ioNumberDataPackets.pointee = 0
+        return 0
+    }
+    guard let currentInputDescription = instance.currentInputDescription else {
+        ioNumberDataPackets.pointee = 0
+        return 0
+    }
+    
+    let numPacketsInBuffer = currentBuffer.mDataByteSize / currentInputDescription.pointee.mBytesPerPacket
+    let numPacketsAvailable = numPacketsInBuffer - instance.currentBufferOffset / currentInputDescription.pointee.mBytesPerPacket
+    
+    let numPacketsToRead = min(ioNumberDataPackets.pointee, numPacketsAvailable)
+    ioNumberDataPackets.pointee = numPacketsToRead
+    
+    ioData.pointee.mNumberBuffers = 1
+    ioData.pointee.mBuffers.mData = currentBuffer.mData?.advanced(by: Int(instance.currentBufferOffset))
+    ioData.pointee.mBuffers.mDataByteSize = currentBuffer.mDataByteSize - instance.currentBufferOffset
+    ioData.pointee.mBuffers.mNumberChannels = currentBuffer.mNumberChannels
+    
+    instance.currentBufferOffset += numPacketsToRead * currentInputDescription.pointee.mBytesPerPacket
+    
+    return 0
+}
diff --git a/submodules/TelegramAudio/Sources/ManagedAudioSession.swift b/submodules/TelegramAudio/Sources/ManagedAudioSession.swift
index 16dadef996..ffe4017e73 100644
--- a/submodules/TelegramAudio/Sources/ManagedAudioSession.swift
+++ b/submodules/TelegramAudio/Sources/ManagedAudioSession.swift
@@ -734,8 +734,10 @@ public final class ManagedAudioSession {
                 switch type {
                 case .voiceCall:
                     mode = .voiceChat
+                    options.insert(.mixWithOthers)
                 case .videoCall:
                     mode = .videoChat
+                    options.insert(.mixWithOthers)
                 default:
                     mode = .default
                 }
diff --git a/submodules/TelegramCallsUI/Sources/PresentationGroupCall.swift b/submodules/TelegramCallsUI/Sources/PresentationGroupCall.swift
index fb4696beed..e632f43a01 100644
--- a/submodules/TelegramCallsUI/Sources/PresentationGroupCall.swift
+++ b/submodules/TelegramCallsUI/Sources/PresentationGroupCall.swift
@@ -613,6 +613,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
     }
     
     private var screencastFramesDisposable: Disposable?
+    private var screencastAudioDataDisposable: Disposable?
     private var screencastStateDisposable: Disposable?
     
     init(
@@ -881,6 +882,13 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
             }
             screencastCapturer.injectPixelBuffer(screencastFrame.0, rotation: screencastFrame.1)
         })
+        self.screencastAudioDataDisposable = (screencastBufferServerContext.audioData
+        |> deliverOnMainQueue).start(next: { [weak self] data in
+            guard let strongSelf = self else {
+                return
+            }
+            strongSelf.genericCallContext?.addExternalAudioData(data: data)
+        })
         self.screencastStateDisposable = (screencastBufferServerContext.isActive
         |> distinctUntilChanged
         |> deliverOnMainQueue).start(next: { [weak self] isActive in
@@ -941,6 +949,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
         self.peerUpdatesSubscription?.dispose()
         
         self.screencastFramesDisposable?.dispose()
+        self.screencastAudioDataDisposable?.dispose()
         self.screencastStateDisposable?.dispose()
     }
diff --git a/submodules/TelegramVoip/Sources/GroupCallContext.swift b/submodules/TelegramVoip/Sources/GroupCallContext.swift
index 98f829d5e3..3ef9ad3ccd 100644
--- a/submodules/TelegramVoip/Sources/GroupCallContext.swift
+++ b/submodules/TelegramVoip/Sources/GroupCallContext.swift
@@ -645,6 +645,10 @@ public final class OngoingGroupCallContext {
                 }
             })
         }
+        
+        func addExternalAudioData(data: Data) {
+            self.context.addExternalAudioData(data)
+        }
     }
     
     private let queue = Queue()
@@ -804,4 +808,10 @@ public final class OngoingGroupCallContext {
             impl.makeIncomingVideoView(endpointId: endpointId, requestClone: requestClone, completion: completion)
         }
     }
+    
+    public func addExternalAudioData(data: Data) {
+        self.impl.with { impl in
+            impl.addExternalAudioData(data: data)
+        }
+    }
 }
diff --git a/submodules/TelegramVoip/Sources/IpcGroupCallContext.swift b/submodules/TelegramVoip/Sources/IpcGroupCallContext.swift
index af9f2ce13c..0f6b34294b 100644
--- a/submodules/TelegramVoip/Sources/IpcGroupCallContext.swift
+++ b/submodules/TelegramVoip/Sources/IpcGroupCallContext.swift
@@ -396,7 +396,7 @@ private final class FdWriteConnection {
     private let buffer: UnsafeMutableRawPointer
     
     private var currentData: PendingData?
-    private var nextData: Data?
+    private var nextDataList: [Data] = []
     
     init(queue: Queue, fd: Int32) {
         assert(queue.isCurrent())
@@ -436,8 +436,8 @@ private final class FdWriteConnection {
                     if currentData.offset == currentData.data.count {
                         strongSelf.currentData = nil
-                        if let nextData = strongSelf.nextData {
-                            strongSelf.nextData = nil
+                        if !strongSelf.nextDataList.isEmpty {
+                            let nextData = strongSelf.nextDataList.removeFirst()
                             strongSelf.currentData = PendingData(data: nextData)
                         } else {
                             strongSelf.channel.suspend()
@@ -471,11 +471,17 @@ private final class FdWriteConnection {
         free(self.buffer)
     }
     
-    func replaceData(data: Data) {
+    func addData(data: Data) {
         if self.currentData == nil {
             self.currentData = PendingData(data: data)
         } else {
-            self.nextData = data
+            var totalBytes = 0
+            for data in self.nextDataList {
+                totalBytes += data.count
+            }
+            if totalBytes < 1 * 1024 * 1024 {
+                self.nextDataList.append(data)
+            }
         }
         
         if !self.isResumed {
@@ -528,11 +534,11 @@ private final class NamedPipeWriterImpl {
         }
     }
     
-    func replaceData(data: Data) {
+    func addData(data: Data) {
         guard let connection = self.connection else {
             return
         }
-        connection.replaceData(data: data)
+        connection.addData(data: data)
     }
 }
@@ -547,9 +553,9 @@ private final class NamedPipeWriter {
         })
     }
     
-    func replaceData(data: Data) {
+    func addData(data: Data) {
        self.impl.with { impl in
-            impl.replaceData(data: data)
+            impl.addData(data: data)
        }
     }
 }
@@ -617,7 +623,7 @@ private final class MappedFile {
 
 public final class IpcGroupCallBufferAppContext {
     private let basePath: String
-    private let server: NamedPipeReader
+    private var audioServer: NamedPipeReader?
     
     private let id: UInt32
 
@@ -632,6 +638,11 @@ public final class IpcGroupCallBufferAppContext {
         return self.framesPipe.signal()
     }
     
+    private let audioDataPipe = ValuePipe<Data>()
+    public var audioData: Signal<Data, NoError> {
+        return self.audioDataPipe.signal()
+    }
+    
     private var framePollTimer: SwiftSignalKit.Timer?
     private var mappedFile: MappedFile?
 
@@ -643,12 +654,8 @@ public final class IpcGroupCallBufferAppContext {
         
         self.id = UInt32.random(in: 0 ..< UInt32.max)
         
-        let framesPipe = self.framesPipe
-        self.server = NamedPipeReader(path: broadcastAppSocketPath(basePath: basePath), didRead: { data in
-            //framesPipe.putNext(data)
-        })
-        
         let dataPath = broadcastAppSocketPath(basePath: basePath) + "-data-\(self.id)"
+        let audioDataPath = broadcastAppSocketPath(basePath: basePath) + "-audio-\(self.id)"
 
         if let mappedFile = MappedFile(path: dataPath, createIfNotExists: true) {
             self.mappedFile = mappedFile
@@ -657,6 +664,11 @@ public final class IpcGroupCallBufferAppContext {
             }
         }
         
+        let audioDataPipe = self.audioDataPipe
+        self.audioServer = NamedPipeReader(path: audioDataPath, didRead: { data in
+            audioDataPipe.putNext(data)
+        })
+        
         let framePollTimer = SwiftSignalKit.Timer(timeout: 1.0 / 30.0, repeat: true, completion: { [weak self] in
             guard let strongSelf = self, let mappedFile = strongSelf.mappedFile else {
                 return
@@ -750,6 +762,7 @@ public final class IpcGroupCallBufferBroadcastContext {
     
     private var mappedFile: MappedFile?
     private var currentId: UInt32?
+    private var audioClient: NamedPipeWriter?
     
     private var callActiveInfoTimer: SwiftSignalKit.Timer?
@@ -800,6 +813,7 @@ public final class IpcGroupCallBufferBroadcastContext {
         self.currentId = payloadDescription.id
         
         let dataPath = broadcastAppSocketPath(basePath: basePath) + "-data-\(payloadDescription.id)"
+        let audioDataPath = broadcastAppSocketPath(basePath: basePath) + "-audio-\(payloadDescription.id)"
 
         if let mappedFile = MappedFile(path: dataPath, createIfNotExists: false) {
             self.mappedFile = mappedFile
@@ -808,6 +822,8 @@ public final class IpcGroupCallBufferBroadcastContext {
             }
         }
         
+        self.audioClient = NamedPipeWriter(path: audioDataPath)
+        
         self.writeKeepaliveInfo()
         
         let keepaliveInfoTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
@@ -819,8 +835,6 @@ public final class IpcGroupCallBufferBroadcastContext {
     }
     
     public func setCurrentFrame(data: Data, orientation: CGImagePropertyOrientation) {
-        //let _ = try? data.write(to: URL(fileURLWithPath: dataPath), options: [])
-        
         if let mappedFile = self.mappedFile, mappedFile.size >= data.count {
             let _ = data.withUnsafeBytes { bytes in
                 var orientationValue = Int32(bitPattern: orientation.rawValue)
@@ -828,8 +842,10 @@ public final class IpcGroupCallBufferBroadcastContext {
                 memcpy(mappedFile.memory.advanced(by: 4), bytes.baseAddress!, data.count)
             }
         }
+    }
 
-        //self.client.replaceData(data: data)
+    public func writeAudioData(data: Data) {
+        self.audioClient?.addData(data: data)
     }
     
     private func writeKeepaliveInfo() {
diff --git a/submodules/TgVoipWebrtc/PublicHeaders/TgVoipWebrtc/OngoingCallThreadLocalContext.h b/submodules/TgVoipWebrtc/PublicHeaders/TgVoipWebrtc/OngoingCallThreadLocalContext.h
index 13dece37ac..5358bf382e 100644
--- a/submodules/TgVoipWebrtc/PublicHeaders/TgVoipWebrtc/OngoingCallThreadLocalContext.h
+++ b/submodules/TgVoipWebrtc/PublicHeaders/TgVoipWebrtc/OngoingCallThreadLocalContext.h
@@ -290,6 +290,8 @@ typedef NS_ENUM(int32_t, OngoingGroupCallRequestedVideoQuality) {
 - (void)switchAudioInput:(NSString * _Nonnull)deviceId;
 - (void)makeIncomingVideoViewWithEndpointId:(NSString * _Nonnull)endpointId requestClone:(bool)requestClone completion:(void (^_Nonnull)(UIView * _Nullable, UIView * _Nullable))completion;
 
+- (void)addExternalAudioData:(NSData * _Nonnull)data;
+
 @end
 
 #endif
diff --git a/submodules/TgVoipWebrtc/Sources/OngoingCallThreadLocalContext.mm b/submodules/TgVoipWebrtc/Sources/OngoingCallThreadLocalContext.mm
index 87bf10adfb..cee381ef1e 100644
--- a/submodules/TgVoipWebrtc/Sources/OngoingCallThreadLocalContext.mm
+++ b/submodules/TgVoipWebrtc/Sources/OngoingCallThreadLocalContext.mm
@@ -1121,9 +1121,9 @@ private:
     }
     
     std::vector<tgcalls::VideoCodecName> videoCodecPreferences;
-    //videoCodecPreferences.push_back(tgcalls::VideoCodecName::H264);
     
     int minOutgoingVideoBitrateKbit = 500;
+    bool disableOutgoingAudioProcessing = true;
     
     tgcalls::GroupConfig config;
     config.need_log = false;
@@ -1195,6 +1195,7 @@ private:
             return std::make_shared<BroadcastPartTaskImpl>(task);
         },
         .outgoingAudioBitrateKbit = outgoingAudioBitrateKbit,
+        .disableOutgoingAudioProcessing = disableOutgoingAudioProcessing,
         .videoContentType = _videoContentType,
         .videoCodecPreferences = videoCodecPreferences,
         .initialEnableNoiseSuppression = enableNoiseSuppression,
@@ -1476,6 +1477,15 @@ private:
     }
 }
 
+- (void)addExternalAudioData:(NSData * _Nonnull)data {
+    if (_instance) {
+        std::vector<uint8_t> samples;
+        samples.resize(data.length);
+        [data getBytes:samples.data() length:data.length];
+        _instance->addExternalAudioSamples(std::move(samples));
+    }
+}
+
 @end
 
 @implementation OngoingGroupCallMediaChannelDescription
diff --git a/submodules/TgVoipWebrtc/tgcalls b/submodules/TgVoipWebrtc/tgcalls
index 11a8896882..bd2f789b28 160000
--- a/submodules/TgVoipWebrtc/tgcalls
+++ b/submodules/TgVoipWebrtc/tgcalls
@@ -1 +1 @@
-Subproject commit 11a889688203f9ec2c95ce4c735e908f3a0833e5
+Subproject commit bd2f789b2888854b0a1ec6f064aabfc1072fbb6c
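-- 
Note for reviewers, not part of the commit: the end-to-end flow is
broadcast extension -> named pipe -> app -> tgcalls. Below is a minimal
sketch of the app-side wiring, mirroring what PresentationGroupCallImpl
does in this patch; `basePath` and `callContext: OngoingGroupCallContext`
are assumed to already exist, and the IpcGroupCallBufferAppContext
initializer is assumed to take just the base path:

    // Reads the "-audio-<id>" pipe written by the extension and feeds
    // each chunk (48 kHz mono signed 16-bit PCM, as produced by
    // CustomAudioConverter) into the ongoing call.
    let screencastContext = IpcGroupCallBufferAppContext(basePath: basePath)
    let audioDisposable = (screencastContext.audioData
    |> deliverOnMainQueue).start(next: { data in
        callContext.addExternalAudioData(data: data)
    })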