diff --git a/submodules/TelegramVoip/Sources/OngoingCallContext.swift b/submodules/TelegramVoip/Sources/OngoingCallContext.swift index ddd3f5e954..92a1e664d1 100644 --- a/submodules/TelegramVoip/Sources/OngoingCallContext.swift +++ b/submodules/TelegramVoip/Sources/OngoingCallContext.swift @@ -933,22 +933,58 @@ public final class OngoingCallContext { } } - let context = OngoingCallThreadLocalContextWebrtc(version: version, queue: OngoingCallThreadLocalContextQueueImpl(queue: queue), proxy: voipProxyServer, networkType: ongoingNetworkTypeForTypeWebrtc(initialNetworkType), dataSaving: ongoingDataSavingForTypeWebrtc(dataSaving), derivedState: Data(), key: key, isOutgoing: isOutgoing, connections: filteredConnections, maxLayer: maxLayer, allowP2P: allowP2P, allowTCP: enableTCP, enableStunMarking: enableStunMarking, logPath: logPath, statsLogPath: tempStatsLogPath, sendSignalingData: { [weak callSessionManager] data in - queue.async { - guard let strongSelf = self else { - return - } - if let signalingConnectionManager = strongSelf.signalingConnectionManager { - signalingConnectionManager.with { impl in - impl.send(payloadData: data) - } - } - - if let callSessionManager = callSessionManager { - callSessionManager.sendSignalingData(internalId: internalId, data: data) + var directConnection: OngoingCallDirectConnection? + #if DEBUG + if #available(iOS 12.0, *) { + for connection in filteredConnections { + if connection.username == "reflector" && connection.reflectorId == 1 && !connection.hasTcp && connection.hasTurn { + directConnection = CallDirectConnectionImpl(host: connection.ip, port: Int(connection.port), peerTag: dataWithHexString(connection.password)) + break } } - }, videoCapturer: video?.impl, preferredVideoCodec: preferredVideoCodec, audioInputDeviceId: "", audioDevice: audioDevice?.impl) + } + #else + directConnection = nil + #endif + + let context = OngoingCallThreadLocalContextWebrtc( + version: version, + queue: OngoingCallThreadLocalContextQueueImpl(queue: queue), + proxy: voipProxyServer, + networkType: ongoingNetworkTypeForTypeWebrtc(initialNetworkType), + dataSaving: ongoingDataSavingForTypeWebrtc(dataSaving), + derivedState: Data(), + key: key, + isOutgoing: isOutgoing, + connections: filteredConnections, + maxLayer: maxLayer, + allowP2P: allowP2P, + allowTCP: enableTCP, + enableStunMarking: enableStunMarking, + logPath: logPath, + statsLogPath: tempStatsLogPath, + sendSignalingData: { [weak callSessionManager] data in + queue.async { + guard let strongSelf = self else { + return + } + if let signalingConnectionManager = strongSelf.signalingConnectionManager { + signalingConnectionManager.with { impl in + impl.send(payloadData: data) + } + } + + if let callSessionManager = callSessionManager { + callSessionManager.sendSignalingData(internalId: internalId, data: data) + } + } + }, + videoCapturer: video?.impl, + preferredVideoCodec: preferredVideoCodec, + audioInputDeviceId: "", + audioDevice: audioDevice?.impl, + directConnection: directConnection + ) strongSelf.contextRef = Unmanaged.passRetained(OngoingCallThreadLocalContextHolder(context)) context.stateChanged = { [weak callSessionManager] state, videoState, remoteVideoState, remoteAudioState, remoteBatteryLevel, _ in @@ -1287,12 +1323,204 @@ public final class OngoingCallContext { } } -private protocol CallSignalingConnection { +private protocol CallSignalingConnection: AnyObject { func start() func stop() func send(payloadData: Data) } +@available(iOS 13.0, *) +private class CustomWrapperProtocol: NWProtocolFramerImplementation { + static var label: String = "CustomWrapperProtocol" + + static let definition = NWProtocolFramer.Definition(implementation: CustomWrapperProtocol.self) + + required init(framer: NWProtocolFramer.Instance) { + + } + + func start(framer: NWProtocolFramer.Instance) -> NWProtocolFramer.StartResult { + return .ready + } + + func handleInput(framer: NWProtocolFramer.Instance) -> Int { + preconditionFailure() + } + + func handleOutput(framer: NWProtocolFramer.Instance, message: NWProtocolFramer.Message, messageLength: Int, isComplete: Bool) { + preconditionFailure() + } + + func wakeup(framer: NWProtocolFramer.Instance) { + } + + func stop(framer: NWProtocolFramer.Instance) -> Bool { + return true + } + + func cleanup(framer: NWProtocolFramer.Instance) { + } +} + +@available(iOS 12.0, *) +private final class CallDirectConnectionImpl: NSObject, OngoingCallDirectConnection { + private final class Impl { + private let queue: Queue + private let peerTag: Data + + private var connection: NWConnection? + + var incomingDataHandler: ((Data) -> Void)? + + init(queue: Queue, host: String, port: Int, peerTag: Data) { + self.queue = queue + + var peerTag = peerTag + peerTag.withUnsafeMutableBytes { buffer in + let bytes = buffer.baseAddress!.assumingMemoryBound(to: UInt8.self) + for i in (buffer.count - 4) ..< buffer.count { + bytes.advanced(by: i).pointee = 1 + } + } + self.peerTag = peerTag + + if let port = NWEndpoint.Port(rawValue: UInt16(clamping: port)) { + self.connection = NWConnection(host: NWEndpoint.Host(host), port: port, using: .udp) + } + + self.connection?.stateUpdateHandler = { newState in + switch newState { + case .ready: + print("CallDirectConnection: State: Ready") + case .setup: + print("CallDirectConnection: State: Setup") + case .cancelled: + print("CallDirectConnection: State: Cancelled") + case .preparing: + print("CallDirectConnection: State: Preparing") + case let .waiting(error): + print("CallDirectConnection: State: Waiting (\(error))") + case let .failed(error): + print("CallDirectConnection: State: Error (\(error))") + @unknown default: + print("CallDirectConnection: State: Unknown") + } + } + + self.connection?.start(queue: self.queue.queue) + self.receive() + } + + deinit { + + } + + private func receive() { + let queue = self.queue + self.connection?.receiveMessage(completion: { [weak self] data, _, _, error in + assert(queue.isCurrent()) + + guard let self else { + return + } + + if let data { + if data.count >= 16 { + var unwrappedData = Data(count: data.count - 16) + unwrappedData.withUnsafeMutableBytes { destBuffer -> Void in + data.withUnsafeBytes { sourceBuffer -> Void in + sourceBuffer.copyBytes(to: destBuffer, from: 16 ..< sourceBuffer.count) + } + } + + self.incomingDataHandler?(unwrappedData) + } else { + print("Invalid data size") + } + } + if error == nil { + self.receive() + } + }) + } + + func send(data: Data) { + var wrappedData = Data() + wrappedData.append(self.peerTag) + wrappedData.append(data) + + self.connection?.send(content: wrappedData, completion: .contentProcessed({ error in + if let error { + print("Send error: \(error)") + } + })) + } + } + + private static let sharedQueue = Queue(name: "CallDirectConnectionImpl") + + private let queue: Queue + private let impl: QueueLocalObject + + private let incomingDataHandlers = Atomic Void>>(value: Bag()) + + init(host: String, port: Int, peerTag: Data) { + let queue = CallDirectConnectionImpl.sharedQueue + self.queue = queue + self.impl = QueueLocalObject(queue: queue, generate: { + return Impl(queue: queue, host: host, port: port, peerTag: peerTag) + }) + + let incomingDataHandlers = self.incomingDataHandlers + self.impl.with { [weak incomingDataHandlers] impl in + impl.incomingDataHandler = { data in + guard let incomingDataHandlers else { + return + } + for f in incomingDataHandlers.with({ return $0.copyItems() }) { + f(data) + } + } + } + } + + func add(onIncomingPacket addOnIncomingPacket: @escaping (Data) -> Void) -> Data { + var token = self.incomingDataHandlers.with { bag -> Int32 in + return Int32(bag.add(addOnIncomingPacket)) + } + return withUnsafeBytes(of: &token, { buffer -> Data in + let bytes = buffer.baseAddress!.assumingMemoryBound(to: UInt8.self) + return Data(bytes: bytes, count: 4) + }) + } + + func remove(onIncomingPacket token: Data) { + if token.count != 4 { + return + } + + var tokenValue: Int32 = 0 + withUnsafeMutableBytes(of: &tokenValue, { tokenBuffer in + let tokenBytes = tokenBuffer.baseAddress!.assumingMemoryBound(to: UInt8.self) + + token.withUnsafeBytes { sourceBuffer in + let sourceBytes = sourceBuffer.baseAddress!.assumingMemoryBound(to: UInt8.self) + memcpy(tokenBytes, sourceBytes, 4) + } + }) + + self.incomingDataHandlers.with { bag in + bag.remove(Int(tokenValue)) + } + } + + func sendPacket(_ packet: Data) { + self.impl.with { impl in + impl.send(data: packet) + } + } +} + @available(iOS 12.0, *) private final class CallSignalingConnectionImpl: CallSignalingConnection { private let queue: Queue @@ -1316,7 +1544,18 @@ private final class CallSignalingConnectionImpl: CallSignalingConnection { self.peerTag = peerTag self.dataReceived = dataReceived self.isClosed = isClosed + + #if DEBUG + if #available(iOS 15.0, *) { + let parameters = NWParameters.quic(alpn: ["tgcalls"]) + parameters.defaultProtocolStack.internetProtocol = NWProtocolFramer.Options(definition: CustomWrapperProtocol.definition) + self.connection = NWConnection(host: self.host, port: self.port, using: parameters) + } else { + preconditionFailure() + } + #else self.connection = NWConnection(host: self.host, port: self.port, using: .tcp) + #endif self.connection.stateUpdateHandler = { [weak self] state in queue.async { diff --git a/submodules/TgVoipWebrtc/PublicHeaders/TgVoipWebrtc/OngoingCallThreadLocalContext.h b/submodules/TgVoipWebrtc/PublicHeaders/TgVoipWebrtc/OngoingCallThreadLocalContext.h index 734f600161..daf0875498 100644 --- a/submodules/TgVoipWebrtc/PublicHeaders/TgVoipWebrtc/OngoingCallThreadLocalContext.h +++ b/submodules/TgVoipWebrtc/PublicHeaders/TgVoipWebrtc/OngoingCallThreadLocalContext.h @@ -215,6 +215,14 @@ typedef NS_ENUM(int32_t, OngoingCallDataSavingWebrtc) { @end +@protocol OngoingCallDirectConnection + +- (NSData * _Nonnull)addOnIncomingPacket:(void (^_Nonnull)(NSData * _Nonnull))addOnIncomingPacket; +- (void)removeOnIncomingPacket:(NSData * _Nonnull)token; +- (void)sendPacket:(NSData * _Nonnull)packet; + +@end + @interface OngoingCallThreadLocalContextWebrtc : NSObject + (void)logMessage:(NSString * _Nonnull)string; @@ -245,7 +253,8 @@ typedef NS_ENUM(int32_t, OngoingCallDataSavingWebrtc) { sendSignalingData:(void (^ _Nonnull)(NSData * _Nonnull))sendSignalingData videoCapturer:(OngoingCallThreadLocalContextVideoCapturer * _Nullable)videoCapturer preferredVideoCodec:(NSString * _Nullable)preferredVideoCodec audioInputDeviceId:(NSString * _Nonnull)audioInputDeviceId - audioDevice:(SharedCallAudioDevice * _Nullable)audioDevice; + audioDevice:(SharedCallAudioDevice * _Nullable)audioDevice + directConnection:(id _Nullable)directConnection; - (void)setManualAudioSessionIsActive:(bool)isAudioSessionActive; diff --git a/submodules/TgVoipWebrtc/Sources/OngoingCallThreadLocalContext.mm b/submodules/TgVoipWebrtc/Sources/OngoingCallThreadLocalContext.mm index 907e01cd87..110536a4ed 100644 --- a/submodules/TgVoipWebrtc/Sources/OngoingCallThreadLocalContext.mm +++ b/submodules/TgVoipWebrtc/Sources/OngoingCallThreadLocalContext.mm @@ -134,7 +134,7 @@ private: self = [super init]; if (self != nil) { _audioDeviceModule.reset(new tgcalls::ThreadLocalObject(tgcalls::StaticThreads::getThreads()->getWorkerThread(), [disableRecording]() mutable { - return (tgcalls::SharedAudioDeviceModule *)(new SharedAudioDeviceModuleImpl(disableRecording)); + return std::static_pointer_cast(std::make_shared(disableRecording)); })); } return self; @@ -535,6 +535,37 @@ private: void (^_frameReceived)(webrtc::VideoFrame const &); }; +class DirectConnectionChannelImpl : public tgcalls::DirectConnectionChannel { +public: + DirectConnectionChannelImpl(id _Nonnull impl) { + _impl = impl; + } + + virtual ~DirectConnectionChannelImpl() { + } + + virtual std::vector addOnIncomingPacket(std::function>)> &&handler) override { + __block auto localHandler = std::move(handler); + + NSData *token = [_impl addOnIncomingPacket:^(NSData * _Nonnull data) { + std::shared_ptr> mappedData = std::make_shared>((uint8_t const *)data.bytes, (uint8_t const *)data.bytes + data.length); + localHandler(mappedData); + }]; + return std::vector((uint8_t * const)token.bytes, (uint8_t * const)token.bytes + token.length); + } + + virtual void removeOnIncomingPacket(std::vector &token) override { + [_impl removeOnIncomingPacket:[[NSData alloc] initWithBytes:token.data() length:token.size()]]; + } + + virtual void sendPacket(std::unique_ptr> &&packet) override { + [_impl sendPacket:[[NSData alloc] initWithBytes:packet->data() length:packet->size()]]; + } + +private: + id _impl; +}; + } @interface GroupCallVideoSink : NSObject { @@ -1024,7 +1055,8 @@ static void (*InternalVoipLoggingFunction)(NSString *) = NULL; sendSignalingData:(void (^ _Nonnull)(NSData * _Nonnull))sendSignalingData videoCapturer:(OngoingCallThreadLocalContextVideoCapturer * _Nullable)videoCapturer preferredVideoCodec:(NSString * _Nullable)preferredVideoCodec audioInputDeviceId:(NSString * _Nonnull)audioInputDeviceId - audioDevice:(SharedCallAudioDevice * _Nullable)audioDevice { + audioDevice:(SharedCallAudioDevice * _Nullable)audioDevice + directConnection:(id _Nullable)directConnection { self = [super init]; if (self != nil) { _version = version; @@ -1149,6 +1181,11 @@ static void (*InternalVoipLoggingFunction)(NSString *) = NULL; audioDeviceModule = [_audioDevice getAudioDeviceModule]; } + std::shared_ptr directConnectionChannel; + if (directConnection) { + directConnectionChannel = std::static_pointer_cast(std::make_shared(directConnection)); + } + __weak OngoingCallThreadLocalContextWebrtc *weakSelf = self; _tgVoip = tgcalls::Meta::Create([version UTF8String], (tgcalls::Descriptor){ .version = [version UTF8String], @@ -1288,7 +1325,8 @@ static void (*InternalVoipLoggingFunction)(NSString *) = NULL; }]; return resultModule; } - } + }, + .directConnectionChannel = directConnectionChannel }); _state = OngoingCallStateInitializing; _signalBars = 4;