This commit is contained in:
Ali 2023-03-24 21:43:27 +04:00
parent 444a35c0d6
commit 9b7562c7bc
3 changed files with 305 additions and 19 deletions

View File

@ -933,22 +933,58 @@ public final class OngoingCallContext {
}
}
let context = OngoingCallThreadLocalContextWebrtc(version: version, queue: OngoingCallThreadLocalContextQueueImpl(queue: queue), proxy: voipProxyServer, networkType: ongoingNetworkTypeForTypeWebrtc(initialNetworkType), dataSaving: ongoingDataSavingForTypeWebrtc(dataSaving), derivedState: Data(), key: key, isOutgoing: isOutgoing, connections: filteredConnections, maxLayer: maxLayer, allowP2P: allowP2P, allowTCP: enableTCP, enableStunMarking: enableStunMarking, logPath: logPath, statsLogPath: tempStatsLogPath, sendSignalingData: { [weak callSessionManager] data in
queue.async {
guard let strongSelf = self else {
return
}
if let signalingConnectionManager = strongSelf.signalingConnectionManager {
signalingConnectionManager.with { impl in
impl.send(payloadData: data)
}
}
if let callSessionManager = callSessionManager {
callSessionManager.sendSignalingData(internalId: internalId, data: data)
var directConnection: OngoingCallDirectConnection?
#if DEBUG
if #available(iOS 12.0, *) {
for connection in filteredConnections {
if connection.username == "reflector" && connection.reflectorId == 1 && !connection.hasTcp && connection.hasTurn {
directConnection = CallDirectConnectionImpl(host: connection.ip, port: Int(connection.port), peerTag: dataWithHexString(connection.password))
break
}
}
}, videoCapturer: video?.impl, preferredVideoCodec: preferredVideoCodec, audioInputDeviceId: "", audioDevice: audioDevice?.impl)
}
#else
directConnection = nil
#endif
let context = OngoingCallThreadLocalContextWebrtc(
version: version,
queue: OngoingCallThreadLocalContextQueueImpl(queue: queue),
proxy: voipProxyServer,
networkType: ongoingNetworkTypeForTypeWebrtc(initialNetworkType),
dataSaving: ongoingDataSavingForTypeWebrtc(dataSaving),
derivedState: Data(),
key: key,
isOutgoing: isOutgoing,
connections: filteredConnections,
maxLayer: maxLayer,
allowP2P: allowP2P,
allowTCP: enableTCP,
enableStunMarking: enableStunMarking,
logPath: logPath,
statsLogPath: tempStatsLogPath,
sendSignalingData: { [weak callSessionManager] data in
queue.async {
guard let strongSelf = self else {
return
}
if let signalingConnectionManager = strongSelf.signalingConnectionManager {
signalingConnectionManager.with { impl in
impl.send(payloadData: data)
}
}
if let callSessionManager = callSessionManager {
callSessionManager.sendSignalingData(internalId: internalId, data: data)
}
}
},
videoCapturer: video?.impl,
preferredVideoCodec: preferredVideoCodec,
audioInputDeviceId: "",
audioDevice: audioDevice?.impl,
directConnection: directConnection
)
strongSelf.contextRef = Unmanaged.passRetained(OngoingCallThreadLocalContextHolder(context))
context.stateChanged = { [weak callSessionManager] state, videoState, remoteVideoState, remoteAudioState, remoteBatteryLevel, _ in
@ -1287,12 +1323,204 @@ public final class OngoingCallContext {
}
}
private protocol CallSignalingConnection {
private protocol CallSignalingConnection: AnyObject {
func start()
func stop()
func send(payloadData: Data)
}
@available(iOS 13.0, *)
private class CustomWrapperProtocol: NWProtocolFramerImplementation {
static var label: String = "CustomWrapperProtocol"
static let definition = NWProtocolFramer.Definition(implementation: CustomWrapperProtocol.self)
required init(framer: NWProtocolFramer.Instance) {
}
func start(framer: NWProtocolFramer.Instance) -> NWProtocolFramer.StartResult {
return .ready
}
func handleInput(framer: NWProtocolFramer.Instance) -> Int {
preconditionFailure()
}
func handleOutput(framer: NWProtocolFramer.Instance, message: NWProtocolFramer.Message, messageLength: Int, isComplete: Bool) {
preconditionFailure()
}
func wakeup(framer: NWProtocolFramer.Instance) {
}
func stop(framer: NWProtocolFramer.Instance) -> Bool {
return true
}
func cleanup(framer: NWProtocolFramer.Instance) {
}
}
@available(iOS 12.0, *)
private final class CallDirectConnectionImpl: NSObject, OngoingCallDirectConnection {
private final class Impl {
private let queue: Queue
private let peerTag: Data
private var connection: NWConnection?
var incomingDataHandler: ((Data) -> Void)?
init(queue: Queue, host: String, port: Int, peerTag: Data) {
self.queue = queue
var peerTag = peerTag
peerTag.withUnsafeMutableBytes { buffer in
let bytes = buffer.baseAddress!.assumingMemoryBound(to: UInt8.self)
for i in (buffer.count - 4) ..< buffer.count {
bytes.advanced(by: i).pointee = 1
}
}
self.peerTag = peerTag
if let port = NWEndpoint.Port(rawValue: UInt16(clamping: port)) {
self.connection = NWConnection(host: NWEndpoint.Host(host), port: port, using: .udp)
}
self.connection?.stateUpdateHandler = { newState in
switch newState {
case .ready:
print("CallDirectConnection: State: Ready")
case .setup:
print("CallDirectConnection: State: Setup")
case .cancelled:
print("CallDirectConnection: State: Cancelled")
case .preparing:
print("CallDirectConnection: State: Preparing")
case let .waiting(error):
print("CallDirectConnection: State: Waiting (\(error))")
case let .failed(error):
print("CallDirectConnection: State: Error (\(error))")
@unknown default:
print("CallDirectConnection: State: Unknown")
}
}
self.connection?.start(queue: self.queue.queue)
self.receive()
}
deinit {
}
private func receive() {
let queue = self.queue
self.connection?.receiveMessage(completion: { [weak self] data, _, _, error in
assert(queue.isCurrent())
guard let self else {
return
}
if let data {
if data.count >= 16 {
var unwrappedData = Data(count: data.count - 16)
unwrappedData.withUnsafeMutableBytes { destBuffer -> Void in
data.withUnsafeBytes { sourceBuffer -> Void in
sourceBuffer.copyBytes(to: destBuffer, from: 16 ..< sourceBuffer.count)
}
}
self.incomingDataHandler?(unwrappedData)
} else {
print("Invalid data size")
}
}
if error == nil {
self.receive()
}
})
}
func send(data: Data) {
var wrappedData = Data()
wrappedData.append(self.peerTag)
wrappedData.append(data)
self.connection?.send(content: wrappedData, completion: .contentProcessed({ error in
if let error {
print("Send error: \(error)")
}
}))
}
}
private static let sharedQueue = Queue(name: "CallDirectConnectionImpl")
private let queue: Queue
private let impl: QueueLocalObject<Impl>
private let incomingDataHandlers = Atomic<Bag<(Data) -> Void>>(value: Bag())
init(host: String, port: Int, peerTag: Data) {
let queue = CallDirectConnectionImpl.sharedQueue
self.queue = queue
self.impl = QueueLocalObject(queue: queue, generate: {
return Impl(queue: queue, host: host, port: port, peerTag: peerTag)
})
let incomingDataHandlers = self.incomingDataHandlers
self.impl.with { [weak incomingDataHandlers] impl in
impl.incomingDataHandler = { data in
guard let incomingDataHandlers else {
return
}
for f in incomingDataHandlers.with({ return $0.copyItems() }) {
f(data)
}
}
}
}
func add(onIncomingPacket addOnIncomingPacket: @escaping (Data) -> Void) -> Data {
var token = self.incomingDataHandlers.with { bag -> Int32 in
return Int32(bag.add(addOnIncomingPacket))
}
return withUnsafeBytes(of: &token, { buffer -> Data in
let bytes = buffer.baseAddress!.assumingMemoryBound(to: UInt8.self)
return Data(bytes: bytes, count: 4)
})
}
func remove(onIncomingPacket token: Data) {
if token.count != 4 {
return
}
var tokenValue: Int32 = 0
withUnsafeMutableBytes(of: &tokenValue, { tokenBuffer in
let tokenBytes = tokenBuffer.baseAddress!.assumingMemoryBound(to: UInt8.self)
token.withUnsafeBytes { sourceBuffer in
let sourceBytes = sourceBuffer.baseAddress!.assumingMemoryBound(to: UInt8.self)
memcpy(tokenBytes, sourceBytes, 4)
}
})
self.incomingDataHandlers.with { bag in
bag.remove(Int(tokenValue))
}
}
func sendPacket(_ packet: Data) {
self.impl.with { impl in
impl.send(data: packet)
}
}
}
@available(iOS 12.0, *)
private final class CallSignalingConnectionImpl: CallSignalingConnection {
private let queue: Queue
@ -1316,7 +1544,18 @@ private final class CallSignalingConnectionImpl: CallSignalingConnection {
self.peerTag = peerTag
self.dataReceived = dataReceived
self.isClosed = isClosed
#if DEBUG
if #available(iOS 15.0, *) {
let parameters = NWParameters.quic(alpn: ["tgcalls"])
parameters.defaultProtocolStack.internetProtocol = NWProtocolFramer.Options(definition: CustomWrapperProtocol.definition)
self.connection = NWConnection(host: self.host, port: self.port, using: parameters)
} else {
preconditionFailure()
}
#else
self.connection = NWConnection(host: self.host, port: self.port, using: .tcp)
#endif
self.connection.stateUpdateHandler = { [weak self] state in
queue.async {

View File

@ -215,6 +215,14 @@ typedef NS_ENUM(int32_t, OngoingCallDataSavingWebrtc) {
@end
@protocol OngoingCallDirectConnection <NSObject>
- (NSData * _Nonnull)addOnIncomingPacket:(void (^_Nonnull)(NSData * _Nonnull))addOnIncomingPacket;
- (void)removeOnIncomingPacket:(NSData * _Nonnull)token;
- (void)sendPacket:(NSData * _Nonnull)packet;
@end
@interface OngoingCallThreadLocalContextWebrtc : NSObject
+ (void)logMessage:(NSString * _Nonnull)string;
@ -245,7 +253,8 @@ typedef NS_ENUM(int32_t, OngoingCallDataSavingWebrtc) {
sendSignalingData:(void (^ _Nonnull)(NSData * _Nonnull))sendSignalingData videoCapturer:(OngoingCallThreadLocalContextVideoCapturer * _Nullable)videoCapturer
preferredVideoCodec:(NSString * _Nullable)preferredVideoCodec
audioInputDeviceId:(NSString * _Nonnull)audioInputDeviceId
audioDevice:(SharedCallAudioDevice * _Nullable)audioDevice;
audioDevice:(SharedCallAudioDevice * _Nullable)audioDevice
directConnection:(id<OngoingCallDirectConnection> _Nullable)directConnection;
- (void)setManualAudioSessionIsActive:(bool)isAudioSessionActive;

View File

@ -134,7 +134,7 @@ private:
self = [super init];
if (self != nil) {
_audioDeviceModule.reset(new tgcalls::ThreadLocalObject<tgcalls::SharedAudioDeviceModule>(tgcalls::StaticThreads::getThreads()->getWorkerThread(), [disableRecording]() mutable {
return (tgcalls::SharedAudioDeviceModule *)(new SharedAudioDeviceModuleImpl(disableRecording));
return std::static_pointer_cast<tgcalls::SharedAudioDeviceModule>(std::make_shared<SharedAudioDeviceModuleImpl>(disableRecording));
}));
}
return self;
@ -535,6 +535,37 @@ private:
void (^_frameReceived)(webrtc::VideoFrame const &);
};
class DirectConnectionChannelImpl : public tgcalls::DirectConnectionChannel {
public:
DirectConnectionChannelImpl(id<OngoingCallDirectConnection> _Nonnull impl) {
_impl = impl;
}
virtual ~DirectConnectionChannelImpl() {
}
virtual std::vector<uint8_t> addOnIncomingPacket(std::function<void(std::shared_ptr<std::vector<uint8_t>>)> &&handler) override {
__block auto localHandler = std::move(handler);
NSData *token = [_impl addOnIncomingPacket:^(NSData * _Nonnull data) {
std::shared_ptr<std::vector<uint8_t>> mappedData = std::make_shared<std::vector<uint8_t>>((uint8_t const *)data.bytes, (uint8_t const *)data.bytes + data.length);
localHandler(mappedData);
}];
return std::vector<uint8_t>((uint8_t * const)token.bytes, (uint8_t * const)token.bytes + token.length);
}
virtual void removeOnIncomingPacket(std::vector<uint8_t> &token) override {
[_impl removeOnIncomingPacket:[[NSData alloc] initWithBytes:token.data() length:token.size()]];
}
virtual void sendPacket(std::unique_ptr<std::vector<uint8_t>> &&packet) override {
[_impl sendPacket:[[NSData alloc] initWithBytes:packet->data() length:packet->size()]];
}
private:
id<OngoingCallDirectConnection> _impl;
};
}
@interface GroupCallVideoSink : NSObject {
@ -1024,7 +1055,8 @@ static void (*InternalVoipLoggingFunction)(NSString *) = NULL;
sendSignalingData:(void (^ _Nonnull)(NSData * _Nonnull))sendSignalingData videoCapturer:(OngoingCallThreadLocalContextVideoCapturer * _Nullable)videoCapturer
preferredVideoCodec:(NSString * _Nullable)preferredVideoCodec
audioInputDeviceId:(NSString * _Nonnull)audioInputDeviceId
audioDevice:(SharedCallAudioDevice * _Nullable)audioDevice {
audioDevice:(SharedCallAudioDevice * _Nullable)audioDevice
directConnection:(id<OngoingCallDirectConnection> _Nullable)directConnection {
self = [super init];
if (self != nil) {
_version = version;
@ -1149,6 +1181,11 @@ static void (*InternalVoipLoggingFunction)(NSString *) = NULL;
audioDeviceModule = [_audioDevice getAudioDeviceModule];
}
std::shared_ptr<tgcalls::DirectConnectionChannel> directConnectionChannel;
if (directConnection) {
directConnectionChannel = std::static_pointer_cast<tgcalls::DirectConnectionChannel>(std::make_shared<DirectConnectionChannelImpl>(directConnection));
}
__weak OngoingCallThreadLocalContextWebrtc *weakSelf = self;
_tgVoip = tgcalls::Meta::Create([version UTF8String], (tgcalls::Descriptor){
.version = [version UTF8String],
@ -1288,7 +1325,8 @@ static void (*InternalVoipLoggingFunction)(NSString *) = NULL;
}];
return resultModule;
}
}
},
.directConnectionChannel = directConnectionChannel
});
_state = OngoingCallStateInitializing;
_signalBars = 4;