// Swiftgram/submodules/TelegramVoip/Sources/IpcGroupCallContext.swift
// 2025-01-14 11:52:31 +08:00
//
// 1114 lines
// 37 KiB
// Swift

import Foundation
import SwiftSignalKit
import CoreMedia
import ImageIO
/// Codable pair of a numeric `id` and a `string` value.
/// NOTE(review): not referenced anywhere else in the visible part of this file — confirm whether it is still needed.
private struct JoinPayload: Codable {
var id: UInt32
var string: String
}
/// Codable pair of a numeric `id` and a `string` value.
/// NOTE(review): not referenced anywhere else in the visible part of this file — confirm whether it is still needed.
private struct JoinResponsePayload: Codable {
var id: UInt32
var string: String
}
/// JSON record stored in `cutoffPayload.json`.
/// In this file it is written by the app contexts' `stopScreencast()` and read by the
/// broadcast contexts' `updateScreencastCutoff()`.
private struct CutoffPayload: Codable {
// Identifier of the app context that wrote the record.
var id: UInt32
// Unix time in whole seconds at which the record was written.
var timestamp: Int32
}
// NOTE(review): `checkInterval` is not referenced in the visible part of this file; presumably a duration in seconds — confirm.
private let checkInterval: Double = 0.2
// Maximum age of a keepalive timestamp before the peer is treated as inactive.
// It is subtracted from a Unix timestamp expressed in whole seconds, so the unit is seconds.
private let keepaliveTimeout: Double = 2.0
/// Returns the location of `currentPayloadDescription.json` inside `basePath`.
private func payloadDescriptionPath(basePath: String) -> String {
    return "\(basePath)/currentPayloadDescription.json"
}
/// Returns the location of `joinPayload.json` inside `basePath`.
private func joinPayloadPath(basePath: String) -> String {
    return "\(basePath)/joinPayload.json"
}
/// Returns the location of `joinResponsePayload.json` inside `basePath`.
private func joinResponsePayloadPath(basePath: String) -> String {
    return "\(basePath)/joinResponsePayload.json"
}
/// Returns the location of `keepaliveInfo.json` inside `basePath`.
private func keepaliveInfoPath(basePath: String) -> String {
    return "\(basePath)/keepaliveInfo.json"
}
/// Returns the location of `cutoffPayload.json` inside `basePath`.
private func cutoffPayloadPath(basePath: String) -> String {
    return "\(basePath)/cutoffPayload.json"
}
/// Returns `basePath` with `/0` appended; callers in this file use the result
/// both directly and as a prefix for per-id data and audio paths.
private func broadcastAppSocketPath(basePath: String) -> String {
    return "\(basePath)/0"
}
/// Reads length-prefixed messages from a file descriptor using a dispatch read source.
///
/// The event handler consumes a 4-byte `Int32` length (read directly into the
/// variable's in-memory representation) followed by that many payload bytes.
/// Every completed payload is passed to `didRead`.
private final class FdReadConnection {
    /// A payload buffer that is filled incrementally across `read` calls.
    private final class PendingData {
        var data: Data
        var offset: Int = 0

        init(count: Int) {
            self.data = Data(bytesNoCopy: malloc(count)!, count: count, deallocator: .free)
        }
    }

    /// Upper bound on the number of bytes requested from a single `read` call.
    private static let maxReadChunkSize = 8192

    private let queue: Queue
    let fd: Int32
    private let didRead: ((Data) -> Void)?
    private let channel: DispatchSourceRead

    private var currentData: PendingData?

    /// - Parameters:
    ///   - queue: Queue the read source delivers events on; must be the current queue.
    ///   - fd: Descriptor to read from. It is not closed by this class.
    ///   - didRead: Invoked with each fully received payload.
    init(queue: Queue, fd: Int32, didRead: ((Data) -> Void)?) {
        assert(queue.isCurrent())

        self.queue = queue
        self.fd = fd
        self.didRead = didRead

        self.channel = DispatchSource.makeReadSource(fileDescriptor: fd, queue: queue.queue)
        self.channel.setEventHandler(handler: { [weak self] in
            guard let strongSelf = self else {
                return
            }

            // Drain the descriptor until a read makes no progress.
            while true {
                if let pending = strongSelf.currentData {
                    let offset = pending.offset
                    let remaining = pending.data.count - offset
                    let bytesRead = pending.data.withUnsafeMutableBytes { bytes -> Int in
                        return Darwin.read(fd, bytes.baseAddress!.advanced(by: offset), min(FdReadConnection.maxReadChunkSize, remaining))
                    }
                    if bytesRead <= 0 {
                        break
                    }
                    pending.offset += bytesRead
                    if pending.offset == pending.data.count {
                        strongSelf.currentData = nil
                        strongSelf.didRead?(pending.data)
                    }
                } else {
                    var length: Int32 = 0
                    let bytesRead = Darwin.read(fd, &length, 4)
                    // A result of 0 means end-of-file and a negative result means an
                    // error (including "would block"). Both must leave the loop:
                    // treating 0 as success would keep re-reading EOF forever.
                    if bytesRead <= 0 {
                        break
                    }
                    assert(bytesRead == 4)
                    if length > 0 {
                        assert(length > 0 && length <= 30 * 1024 * 1024)
                        strongSelf.currentData = PendingData(count: Int(length))
                    }
                }
            }
        })
        self.channel.resume()
    }

    deinit {
        assert(self.queue.isCurrent())

        self.channel.cancel()
    }
}
/// Writes length-prefixed messages to a file descriptor using a dispatch write source.
///
/// Each message is emitted as a 4-byte `Int32` byte count followed by the payload,
/// written in chunks of at most `bufferSize` bytes. The write source is suspended
/// whenever there is nothing left to send or a `write` call makes no progress;
/// `addData(data:)` resumes it.
private final class FdWriteConnection {
/// A message being written, together with how much of it has already gone out.
private final class PendingData {
let data: Data
// Whether the 4-byte length prefix has been written.
var didWriteHeader: Bool = false
// Number of payload bytes already written.
var offset: Int = 0
init(data: Data) {
self.data = data
}
}
private let queue: Queue
let fd: Int32
private let channel: DispatchSourceWrite
// Tracks whether `channel` is currently resumed so suspend/resume calls stay balanced.
private var isResumed = false
// Maximum number of payload bytes passed to a single `write` call.
private let bufferSize: Int
// NOTE(review): allocated in init and freed in deinit, but never read or written in the visible code.
private let buffer: UnsafeMutableRawPointer
// Message currently being written, if any.
private var currentData: PendingData?
// Messages waiting behind `currentData`, in arrival order.
private var nextDataList: [Data] = []
/// - Parameters:
///   - queue: Queue the write source delivers events on; must be the current queue.
///   - fd: Descriptor to write to. It is not closed by this class.
init(queue: Queue, fd: Int32) {
assert(queue.isCurrent())
self.queue = queue
self.fd = fd
self.bufferSize = 8192
self.buffer = malloc(self.bufferSize)
self.channel = DispatchSource.makeWriteSource(fileDescriptor: fd, queue: queue.queue)
self.channel.setEventHandler(handler: { [weak self] in
guard let strongSelf = self else {
return
}
// Keep writing until a write makes no progress or everything queued has been sent.
while true {
if let currentData = strongSelf.currentData {
if !currentData.didWriteHeader {
// Length prefix first.
var length: Int32 = Int32(currentData.data.count)
let writtenBytes = Darwin.write(fd, &length, 4)
if writtenBytes > 0 {
assert(writtenBytes == 4)
currentData.didWriteHeader = true
} else {
// No progress: suspend until `addData` resumes the source.
strongSelf.channel.suspend()
strongSelf.isResumed = false
break
}
} else {
let offset = currentData.offset
let count = currentData.data.count - offset
let writtenBytes = currentData.data.withUnsafeBytes { bytes -> Int in
return Darwin.write(fd, bytes.baseAddress!.advanced(by: offset), min(count, strongSelf.bufferSize))
}
if writtenBytes > 0 {
currentData.offset += writtenBytes
if currentData.offset == currentData.data.count {
// Message complete: move on to the next queued one, or go idle.
strongSelf.currentData = nil
if !strongSelf.nextDataList.isEmpty {
let nextData = strongSelf.nextDataList.removeFirst()
strongSelf.currentData = PendingData(data: nextData)
} else {
strongSelf.channel.suspend()
strongSelf.isResumed = false
break
}
}
} else {
// No progress on the payload: suspend until `addData` resumes the source.
strongSelf.channel.suspend()
strongSelf.isResumed = false
break
}
}
} else {
// Nothing to send.
strongSelf.channel.suspend()
strongSelf.isResumed = false
break
}
}
})
}
deinit {
assert(self.queue.isCurrent())
// The source is resumed before cancelling when it is currently suspended.
// NOTE(review): presumably because a suspended dispatch source must not be released — confirm against the Dispatch documentation.
if !self.isResumed {
self.channel.resume()
}
self.channel.cancel()
free(self.buffer)
}
/// Queues `data` for writing and resumes the write source if it is suspended.
///
/// When a message is already in flight, `data` is appended to the backlog only
/// while the backlog holds less than 1 MiB; otherwise it is dropped.
func addData(data: Data) {
if self.currentData == nil {
self.currentData = PendingData(data: data)
} else {
var totalBytes = 0
for data in self.nextDataList {
totalBytes += data.count
}
if totalBytes < 1 * 1024 * 1024 {
self.nextDataList.append(data)
}
}
if !self.isResumed {
self.isResumed = true
self.channel.resume()
}
}
}
/// Creates a FIFO at a given path and attaches an `FdReadConnection` to it.
private final class NamedPipeReaderImpl {
    private let queue: Queue
    private var connection: FdReadConnection?

    /// - Parameters:
    ///   - queue: Queue handed to the underlying `FdReadConnection`.
    ///   - path: Filesystem path of the FIFO; anything already at this path is unlinked first.
    ///   - didRead: Receives every payload delivered by the connection.
    init(queue: Queue, path: String, didRead: @escaping (Data) -> Void) {
        self.queue = queue

        // Replace whatever currently exists at `path` with a new FIFO.
        unlink(path)
        mkfifo(path, 0o666)

        let descriptor = open(path, O_RDONLY | O_NONBLOCK, S_IRUSR | S_IWUSR)
        guard descriptor != -1 else {
            // Opening failed: the reader stays without a connection.
            return
        }
        self.connection = FdReadConnection(queue: queue, fd: descriptor, didRead: didRead)
    }
}
/// Owns a dedicated `Queue` and a `NamedPipeReaderImpl` wrapped in a `QueueLocalObject` bound to it.
private final class NamedPipeReader {
    private let queue = Queue()
    let impl: QueueLocalObject<NamedPipeReaderImpl>

    /// - Parameters:
    ///   - path: Filesystem path of the FIFO to read from.
    ///   - didRead: Receives every payload read from the FIFO.
    init(path: String, didRead: @escaping (Data) -> Void) {
        let readerQueue = self.queue
        let makeImpl: () -> NamedPipeReaderImpl = {
            return NamedPipeReaderImpl(queue: readerQueue, path: path, didRead: didRead)
        }
        self.impl = QueueLocalObject(queue: readerQueue, generate: makeImpl)
    }
}
/// Opens an existing path for non-blocking writing and forwards data to an `FdWriteConnection`.
private final class NamedPipeWriterImpl {
    private let queue: Queue
    private var connection: FdWriteConnection?

    /// - Parameters:
    ///   - queue: Queue handed to the underlying `FdWriteConnection`.
    ///   - path: Filesystem path to open for writing.
    init(queue: Queue, path: String) {
        self.queue = queue

        let descriptor = open(path, O_WRONLY | O_NONBLOCK, S_IRUSR | S_IWUSR)
        guard descriptor != -1 else {
            // Opening failed: the writer stays without a connection.
            return
        }
        self.connection = FdWriteConnection(queue: queue, fd: descriptor)
    }

    /// Hands `data` to the connection; does nothing when the path could not be opened.
    func addData(data: Data) {
        self.connection?.addData(data: data)
    }
}
/// Owns a dedicated `Queue` and a `NamedPipeWriterImpl` wrapped in a `QueueLocalObject` bound to it.
private final class NamedPipeWriter {
    private let queue = Queue()
    private let impl: QueueLocalObject<NamedPipeWriterImpl>

    /// - Parameter path: Filesystem path the writer implementation opens.
    init(path: String) {
        let writerQueue = self.queue
        let makeImpl: () -> NamedPipeWriterImpl = {
            return NamedPipeWriterImpl(queue: writerQueue, path: path)
        }
        self.impl = QueueLocalObject(queue: writerQueue, generate: makeImpl)
    }

    /// Forwards `data` to the implementation through `QueueLocalObject.with`.
    func addData(data: Data) {
        self.impl.with { writer in
            writer.addData(data: data)
        }
    }
}
/// A file opened read/write and mapped into memory with `MAP_SHARED`.
///
/// NOTE(review): the results of `stat`, `ftruncate` and `mmap` are not checked, and
/// `write`/`read` do not validate the range against `size` — callers must stay in bounds.
private final class MappedFile {
    let path: String
    private var handle: Int32
    private var currentSize: Int
    private(set) var memory: UnsafeMutableRawPointer

    /// Maps `length` bytes of `handle` from offset 0 as a shared read/write mapping.
    private static func mapShared(handle: Int32, length: Int) -> UnsafeMutableRawPointer {
        return mmap(nil, length, PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0)
    }

    /// Opens `path` and maps its current contents.
    ///
    /// - Parameters:
    ///   - path: File to open.
    ///   - createIfNotExists: Adds `O_CREAT` to the open flags when `true`.
    /// - Returns: `nil` when the file cannot be opened.
    init?(path: String, createIfNotExists: Bool) {
        self.path = path

        let baseFlags: Int32 = O_RDWR | O_APPEND
        let openFlags: Int32 = createIfNotExists ? (baseFlags | O_CREAT) : baseFlags
        let descriptor = open(path, openFlags, S_IRUSR | S_IWUSR)
        self.handle = descriptor
        guard descriptor >= 0 else {
            return nil
        }

        var fileInfo = stat()
        stat(path, &fileInfo)
        let initialSize = Int(fileInfo.st_size)
        self.currentSize = initialSize
        self.memory = MappedFile.mapShared(handle: descriptor, length: initialSize)
    }

    deinit {
        munmap(self.memory, self.currentSize)
        close(self.handle)
    }

    /// Current mapping length in bytes. Assigning a different value unmaps the
    /// file, truncates/extends it to the new length and maps it again.
    var size: Int {
        get {
            return self.currentSize
        }
        set(newSize) {
            guard newSize != self.currentSize else {
                return
            }
            munmap(self.memory, self.currentSize)
            ftruncate(self.handle, off_t(newSize))
            self.currentSize = newSize
            self.memory = MappedFile.mapShared(handle: self.handle, length: newSize)
        }
    }

    /// Requests an asynchronous flush of the mapping (`MS_ASYNC`).
    func synchronize() {
        msync(self.memory, self.currentSize, MS_ASYNC)
    }

    /// Copies `range.count` bytes from `data` into the mapping starting at `range.lowerBound`.
    func write(at range: Range<Int>, from data: UnsafeRawPointer) {
        let destination = self.memory.advanced(by: range.lowerBound)
        memcpy(destination, data, range.count)
    }

    /// Copies `range.count` bytes from the mapping starting at `range.lowerBound` into `data`.
    func read(at range: Range<Int>, to data: UnsafeMutableRawPointer) {
        let source = self.memory.advanced(by: range.lowerBound)
        memcpy(data, source, range.count)
    }

    /// Fills the whole mapping with zero bytes.
    func clear() {
        memset(self.memory, 0, self.currentSize)
    }
}
/// App-side endpoint of the file-based screencast IPC rooted at `basePath`.
///
/// Based on the code below, an instance:
/// - writes a `PayloadDescription` (its random `id` plus the current time) to
///   `currentPayloadDescription.json` once per second;
/// - polls `keepaliveInfo.json` once per second and publishes the outcome on `isActive`;
/// - polls a memory-mapped file 30 times per second and emits decoded frames on `frames`;
/// - forwards messages read from a named pipe on `audioData`.
public final class IpcGroupCallBufferAppContext {
    /// Record the broadcast side writes to `keepaliveInfo.json`.
    struct KeepaliveInfo: Codable {
        var id: UInt32
        var timestamp: Int32
    }

    /// Record this context writes to `currentPayloadDescription.json`.
    struct PayloadDescription: Codable {
        var id: UInt32
        var timestamp: Int32
    }

    private let basePath: String
    private var audioServer: NamedPipeReader?

    private let id: UInt32

    private let isActivePromise = ValuePromise<Bool>(false, ignoreRepeated: true)
    /// `true` while a keepalive record carrying this context's id is fresh enough.
    public var isActive: Signal<Bool, NoError> {
        return self.isActivePromise.get()
    }
    private var isActiveCheckTimer: SwiftSignalKit.Timer?

    private let framesPipe = ValuePipe<(CVPixelBuffer, CGImagePropertyOrientation)>()
    /// Frames decoded from the mapped file, paired with their orientation.
    public var frames: Signal<(CVPixelBuffer, CGImagePropertyOrientation), NoError> {
        return self.framesPipe.signal()
    }

    private let audioDataPipe = ValuePipe<Data>()
    /// Messages read from the audio named pipe.
    public var audioData: Signal<Data, NoError> {
        return self.audioDataPipe.signal()
    }

    private var framePollTimer: SwiftSignalKit.Timer?
    private var mappedFile: MappedFile?

    private var callActiveInfoTimer: SwiftSignalKit.Timer?

    /// - Parameter basePath: Directory used for all IPC files; created if missing.
    public init(basePath: String) {
        self.basePath = basePath
        let _ = try? FileManager.default.createDirectory(atPath: basePath, withIntermediateDirectories: true, attributes: nil)

        self.id = UInt32.random(in: 0 ..< UInt32.max)

        let dataPath = broadcastAppSocketPath(basePath: basePath) + "-data-\(self.id)"
        let audioDataPath = broadcastAppSocketPath(basePath: basePath) + "-audio-\(self.id)"

        if let mappedFile = MappedFile(path: dataPath, createIfNotExists: true) {
            self.mappedFile = mappedFile
            if mappedFile.size < 10 * 1024 * 1024 {
                mappedFile.size = 10 * 1024 * 1024
            }
        }

        let audioDataPipe = self.audioDataPipe
        self.audioServer = NamedPipeReader(path: audioDataPath, didRead: { data in
            audioDataPipe.putNext(data)
        })

        let framePollTimer = SwiftSignalKit.Timer(timeout: 1.0 / 30.0, repeat: true, completion: { [weak self] in
            guard let strongSelf = self, let mappedFile = strongSelf.mappedFile else {
                return
            }

            // Layout of the mapped file: 4 bytes of orientation, then the serialized pixel buffer.
            var orientationValue: Int32 = 0
            mappedFile.read(at: 0 ..< 4, to: &orientationValue)
            let orientation = CGImagePropertyOrientation(rawValue: UInt32(bitPattern: orientationValue)) ?? .up

            let data = Data(bytesNoCopy: mappedFile.memory.advanced(by: 4), count: mappedFile.size - 4, deallocator: .none)
            if let frame = deserializePixelBuffer(data: data) {
                strongSelf.framesPipe.putNext((frame, orientation))
            }
        }, queue: .mainQueue())
        self.framePollTimer = framePollTimer
        framePollTimer.start()

        self.updateCallIsActive()

        let callActiveInfoTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
            self?.updateCallIsActive()
        }, queue: .mainQueue())
        self.callActiveInfoTimer = callActiveInfoTimer
        callActiveInfoTimer.start()

        let isActiveCheckTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
            self?.updateKeepaliveInfo()
        }, queue: .mainQueue())
        self.isActiveCheckTimer = isActiveCheckTimer
        isActiveCheckTimer.start()
    }

    deinit {
        self.framePollTimer?.invalidate()
        self.callActiveInfoTimer?.invalidate()
        self.isActiveCheckTimer?.invalidate()
        if let mappedFile = self.mappedFile {
            self.mappedFile = nil
            let _ = try? FileManager.default.removeItem(atPath: mappedFile.path)
        }
    }

    /// Writes a fresh `PayloadDescription` for this context. Failures are ignored;
    /// the next timer tick writes again.
    private func updateCallIsActive() {
        let timestamp = Int32(Date().timeIntervalSince1970)
        let payloadDescription = IpcGroupCallBufferAppContext.PayloadDescription(
            id: self.id,
            timestamp: timestamp
        )
        guard let payloadDescriptionData = try? JSONEncoder().encode(payloadDescription) else {
            return
        }
        let _ = try? payloadDescriptionData.write(to: URL(fileURLWithPath: payloadDescriptionPath(basePath: self.basePath)), options: .atomic)
    }

    /// Re-evaluates `isActive` from `keepaliveInfo.json`.
    ///
    /// The broadcast contexts in this file delete the keepalive file when they are
    /// deallocated, so a missing or undecodable file is reported as inactive
    /// instead of leaving the previous value in place.
    private func updateKeepaliveInfo() {
        let filePath = keepaliveInfoPath(basePath: self.basePath)
        guard let keepaliveInfoData = try? Data(contentsOf: URL(fileURLWithPath: filePath)),
              let keepaliveInfo = try? JSONDecoder().decode(IpcGroupCallBufferAppContext.KeepaliveInfo.self, from: keepaliveInfoData) else {
            self.isActivePromise.set(false)
            return
        }
        // A record written for a different app context does not count.
        if keepaliveInfo.id != self.id {
            self.isActivePromise.set(false)
            return
        }
        let timestamp = Int32(Date().timeIntervalSince1970)
        if keepaliveInfo.timestamp < timestamp - Int32(keepaliveTimeout) {
            self.isActivePromise.set(false)
            return
        }

        self.isActivePromise.set(true)
    }

    /// Writes a `CutoffPayload` carrying this context's id and the current time to
    /// `cutoffPayload.json`. Failures are ignored.
    public func stopScreencast() {
        let timestamp = Int32(Date().timeIntervalSince1970)
        let cutoffPayload = CutoffPayload(
            id: self.id,
            timestamp: timestamp
        )
        guard let cutoffPayloadData = try? JSONEncoder().encode(cutoffPayload) else {
            return
        }
        let _ = try? cutoffPayloadData.write(to: URL(fileURLWithPath: cutoffPayloadPath(basePath: self.basePath)), options: .atomic)
    }
}
/// Broadcast-side endpoint of the file-based screencast IPC rooted at `basePath`.
///
/// Based on the code below, an instance polls `currentPayloadDescription.json` and
/// `cutoffPayload.json` once per second. On the first valid payload description it
/// attaches to the data file and audio pipe named after the description's id and
/// starts writing `keepaliveInfo.json` once per second.
public final class IpcGroupCallBufferBroadcastContext {
    public enum Status {
        public enum FinishReason {
            case screencastEnded
            case callEnded
            case error
        }

        case active
        case finished(FinishReason)
    }

    /// Number of bytes at the start of the mapped file reserved for the orientation value.
    private static let frameHeaderSize = 4

    private let basePath: String
    private let client: NamedPipeWriter
    private var timer: SwiftSignalKit.Timer?

    private let statusPromise = Promise<Status>()
    /// Current state as derived from the polled files.
    public var status: Signal<Status, NoError> {
        return self.statusPromise.get()
    }

    private var mappedFile: MappedFile?
    private var currentId: UInt32?

    private var audioClient: NamedPipeWriter?

    private var callActiveInfoTimer: SwiftSignalKit.Timer?
    private var keepaliveInfoTimer: SwiftSignalKit.Timer?
    private var screencastCutoffTimer: SwiftSignalKit.Timer?

    /// - Parameter basePath: Directory used for all IPC files; created if missing.
    public init(basePath: String) {
        self.basePath = basePath
        let _ = try? FileManager.default.createDirectory(atPath: basePath, withIntermediateDirectories: true, attributes: nil)

        self.client = NamedPipeWriter(path: broadcastAppSocketPath(basePath: basePath))

        let callActiveInfoTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
            self?.updateCallIsActive()
        }, queue: .mainQueue())
        self.callActiveInfoTimer = callActiveInfoTimer
        callActiveInfoTimer.start()

        let screencastCutoffTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
            self?.updateScreencastCutoff()
        }, queue: .mainQueue())
        self.screencastCutoffTimer = screencastCutoffTimer
        screencastCutoffTimer.start()
    }

    deinit {
        self.endActiveIndication()

        self.callActiveInfoTimer?.invalidate()
        self.keepaliveInfoTimer?.invalidate()
        self.screencastCutoffTimer?.invalidate()
    }

    /// Reports `.finished(.screencastEnded)` when a cutoff record for the current id
    /// was written less than 10 seconds ago.
    private func updateScreencastCutoff() {
        let filePath = cutoffPayloadPath(basePath: self.basePath)
        guard let cutoffPayloadData = try? Data(contentsOf: URL(fileURLWithPath: filePath)) else {
            return
        }

        guard let cutoffPayload = try? JSONDecoder().decode(CutoffPayload.self, from: cutoffPayloadData) else {
            return
        }

        let timestamp = Int32(Date().timeIntervalSince1970)
        if let currentId = self.currentId, currentId == cutoffPayload.id && cutoffPayload.timestamp > timestamp - 10 {
            self.statusPromise.set(.single(.finished(.screencastEnded)))
            return
        }
    }

    /// Reads the payload description and updates `status`; on the first valid
    /// description, attaches to the per-id data file and audio pipe.
    private func updateCallIsActive() {
        let filePath = payloadDescriptionPath(basePath: self.basePath)
        guard let payloadDescriptionData = try? Data(contentsOf: URL(fileURLWithPath: filePath)) else {
            self.statusPromise.set(.single(.finished(.error)))
            return
        }

        guard let payloadDescription = try? JSONDecoder().decode(IpcGroupCallBufferAppContext.PayloadDescription.self, from: payloadDescriptionData) else {
            self.statusPromise.set(.single(.finished(.error)))
            return
        }
        let timestamp = Int32(Date().timeIntervalSince1970)
        // A description older than 4 seconds means the app side stopped refreshing it.
        if payloadDescription.timestamp < timestamp - 4 {
            self.statusPromise.set(.single(.finished(.callEnded)))
            return
        }

        if let currentId = self.currentId {
            if currentId != payloadDescription.id {
                self.statusPromise.set(.single(.finished(.callEnded)))
            }
        } else {
            self.currentId = payloadDescription.id

            let dataPath = broadcastAppSocketPath(basePath: self.basePath) + "-data-\(payloadDescription.id)"
            let audioDataPath = broadcastAppSocketPath(basePath: self.basePath) + "-audio-\(payloadDescription.id)"

            if let mappedFile = MappedFile(path: dataPath, createIfNotExists: false) {
                self.mappedFile = mappedFile
                if mappedFile.size < 10 * 1024 * 1024 {
                    mappedFile.size = 10 * 1024 * 1024
                }
            }

            self.audioClient = NamedPipeWriter(path: audioDataPath)

            self.writeKeepaliveInfo()

            let keepaliveInfoTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
                self?.writeKeepaliveInfo()
            }, queue: .mainQueue())
            self.keepaliveInfoTimer = keepaliveInfoTimer
            keepaliveInfoTimer.start()

            self.statusPromise.set(.single(.active))
        }
    }

    /// Copies `orientation` and `data` into the mapped file.
    ///
    /// The frame is dropped when no file is mapped or when the 4-byte orientation
    /// header plus `data` would not fit into the mapping.
    public func setCurrentFrame(data: Data, orientation: CGImagePropertyOrientation) {
        guard let mappedFile = self.mappedFile else {
            return
        }
        let headerSize = IpcGroupCallBufferBroadcastContext.frameHeaderSize
        // The payload starts after the header, so the header must be part of the bounds check.
        guard data.count <= mappedFile.size - headerSize else {
            return
        }
        data.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> Void in
            var orientationValue = Int32(bitPattern: orientation.rawValue)
            memmove(mappedFile.memory, &orientationValue, headerSize)
            // `baseAddress` is nil for an empty buffer; there is nothing to copy then.
            if let source = bytes.baseAddress {
                memcpy(mappedFile.memory.advanced(by: headerSize), source, data.count)
            }
        }
    }

    /// Queues `data` on the audio pipe, if one has been attached.
    public func writeAudioData(data: Data) {
        self.audioClient?.addData(data: data)
    }

    /// Writes a keepalive record for the current id. This is best effort: when no
    /// id is known yet or the write fails, nothing happens and the next timer tick
    /// tries again.
    private func writeKeepaliveInfo() {
        guard let currentId = self.currentId else {
            return
        }
        let keepaliveInfo = IpcGroupCallBufferAppContext.KeepaliveInfo(
            id: currentId,
            timestamp: Int32(Date().timeIntervalSince1970)
        )
        guard let keepaliveInfoData = try? JSONEncoder().encode(keepaliveInfo) else {
            return
        }
        let _ = try? keepaliveInfoData.write(to: URL(fileURLWithPath: keepaliveInfoPath(basePath: self.basePath)), options: .atomic)
    }

    /// Removes the keepalive file.
    private func endActiveIndication() {
        let _ = try? FileManager.default.removeItem(atPath: keepaliveInfoPath(basePath: self.basePath))
    }
}
/// Serializes a bi-planar 4:2:0 pixel buffer into a single `Data` value.
///
/// Output layout: five 4-byte fields (pixel format, width, height, Y stride,
/// UV stride) followed by the Y plane (`yStride * height` bytes) and the UV plane
/// (`uvStride * (height / 2)` bytes).
///
/// - Parameter buffer: Buffer to serialize.
/// - Returns: The serialized bytes, or `nil` when the pixel format is not one of the
///   two supported bi-planar formats, the buffer cannot be locked, or a plane has
///   no base address.
public func serializePixelBuffer(buffer: CVPixelBuffer) -> Data? {
    let pixelFormat = CVPixelBufferGetPixelFormatType(buffer)
    let isSupportedFormat = pixelFormat == kCVPixelFormatType_420YpCbCr8BiPlanarFullRange
        || pixelFormat == kCVPixelFormatType_420YpCbCr8BiPlanarVideoRange
    guard isSupportedFormat else {
        return nil
    }

    guard CVPixelBufferLockBaseAddress(buffer, .readOnly) == kCVReturnSuccess else {
        return nil
    }
    defer {
        CVPixelBufferUnlockBaseAddress(buffer, .readOnly)
    }

    let width = CVPixelBufferGetWidth(buffer)
    let height = CVPixelBufferGetHeight(buffer)

    guard let yPlane = CVPixelBufferGetBaseAddressOfPlane(buffer, 0) else {
        return nil
    }
    guard let uvPlane = CVPixelBufferGetBaseAddressOfPlane(buffer, 1) else {
        return nil
    }

    let yStride = CVPixelBufferGetBytesPerRowOfPlane(buffer, 0)
    let uvStride = CVPixelBufferGetBytesPerRowOfPlane(buffer, 1)
    let yPlaneSize = yStride * height
    let uvPlaneSize = uvStride * (height / 2)

    // Header fields in output order; each occupies 4 bytes.
    let header: [UInt32] = [
        pixelFormat,
        UInt32(bitPattern: Int32(width)),
        UInt32(bitPattern: Int32(height)),
        UInt32(bitPattern: Int32(yStride)),
        UInt32(bitPattern: Int32(uvStride))
    ]
    let headerSize = header.count * MemoryLayout<UInt32>.size
    let totalSize = headerSize + yPlaneSize + uvPlaneSize

    let output = malloc(totalSize)!
    memcpy(output, header, headerSize)
    memcpy(output.advanced(by: headerSize), yPlane, yPlaneSize)
    memcpy(output.advanced(by: headerSize + yPlaneSize), uvPlane, uvPlaneSize)

    // Ownership of `output` passes to the returned Data, which frees it.
    return Data(bytesNoCopy: output, count: totalSize, deallocator: .free)
}
/// Rebuilds a bi-planar 4:2:0 pixel buffer from bytes laid out as five 4-byte
/// header fields (pixel format, width, height, Y stride, UV stride) followed by
/// the Y plane and the UV plane.
///
/// The input is validated before anything is copied: the header must be complete,
/// dimensions must lie in `0...8192`, strides must be non-negative, and the plane
/// sizes implied by the header must fit into `data`. Plane sizes are computed in
/// `Int` with overflow detection so malformed headers cannot trap.
///
/// - Parameter data: Serialized buffer; trailing bytes beyond the planes are ignored.
/// - Returns: A newly created pixel buffer, or `nil` when the input is malformed,
///   uses an unsupported pixel format, or the created buffer's strides differ from
///   the serialized ones.
public func deserializePixelBuffer(data: Data) -> CVPixelBuffer? {
    let headerSize: Int = 4 + 4 + 4 + 4 + 4
    let count = data.count
    // All five header fields are read below, so the full header must be present.
    if count < headerSize {
        return nil
    }

    return data.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> CVPixelBuffer? in
        guard let dataBytes = bytes.baseAddress else {
            return nil
        }

        var pixelFormat: UInt32 = 0
        memcpy(&pixelFormat, dataBytes.advanced(by: 0), 4)

        switch pixelFormat {
        case kCVPixelFormatType_420YpCbCr8BiPlanarFullRange, kCVPixelFormatType_420YpCbCr8BiPlanarVideoRange:
            break
        default:
            return nil
        }

        var width: Int32 = 0
        memcpy(&width, dataBytes.advanced(by: 4), 4)
        var height: Int32 = 0
        memcpy(&height, dataBytes.advanced(by: 4 + 4), 4)
        var yStride: Int32 = 0
        memcpy(&yStride, dataBytes.advanced(by: 4 + 4 + 4), 4)
        var uvStride: Int32 = 0
        memcpy(&uvStride, dataBytes.advanced(by: 4 + 4 + 4 + 4), 4)

        if width < 0 || width > 8192 {
            return nil
        }
        if height < 0 || height > 8192 {
            return nil
        }
        if yStride < 0 || uvStride < 0 {
            return nil
        }

        // Same plane-size formulas as `serializePixelBuffer`, widened to Int.
        let (yPlaneSize, yOverflow) = Int(yStride).multipliedReportingOverflow(by: Int(height))
        let (uvPlaneSize, uvOverflow) = Int(uvStride).multipliedReportingOverflow(by: Int(height) / 2)
        if yOverflow || uvOverflow {
            return nil
        }
        // Compare by subtraction so the bounds check itself cannot overflow.
        let availableForPlanes = count - headerSize
        if yPlaneSize > availableForPlanes || uvPlaneSize > availableForPlanes - yPlaneSize {
            return nil
        }

        var createdBuffer: CVPixelBuffer? = nil
        CVPixelBufferCreate(nil, Int(width), Int(height), pixelFormat, nil, &createdBuffer)
        guard let buffer = createdBuffer else {
            return nil
        }

        let status = CVPixelBufferLockBaseAddress(buffer, [])
        if status != kCVReturnSuccess {
            return nil
        }
        defer {
            CVPixelBufferUnlockBaseAddress(buffer, [])
        }

        guard let destYPlane = CVPixelBufferGetBaseAddressOfPlane(buffer, 0) else {
            return nil
        }
        // The planes are copied wholesale, so source and destination strides must match.
        let destYStride = CVPixelBufferGetBytesPerRowOfPlane(buffer, 0)
        if destYStride != Int(yStride) {
            return nil
        }

        guard let destUvPlane = CVPixelBufferGetBaseAddressOfPlane(buffer, 1) else {
            return nil
        }
        let destUvStride = CVPixelBufferGetBytesPerRowOfPlane(buffer, 1)
        if destUvStride != Int(uvStride) {
            return nil
        }

        memcpy(destYPlane, dataBytes.advanced(by: headerSize), yPlaneSize)
        memcpy(destUvPlane, dataBytes.advanced(by: headerSize + yPlaneSize), uvPlaneSize)

        return buffer
    }
}
/// App-side endpoint of the "embedded" screencast IPC rooted at `basePath`.
///
/// Based on the code below, an instance writes a `PayloadDescription` (id,
/// timestamp, active request id, join response) to `currentPayloadDescription.json`
/// once per second and whenever `activeRequestId` or `joinResponse` change. It polls
/// `keepaliveInfo.json` once per second to drive `isActive` and `joinPayload`.
public final class IpcGroupCallEmbeddedAppContext {
    /// Join payload carried inside the keepalive record.
    public struct JoinPayload: Codable, Equatable {
        public var id: UInt32
        public var data: String
        public var ssrc: UInt32

        public init(id: UInt32, data: String, ssrc: UInt32) {
            self.id = id
            self.data = data
            self.ssrc = ssrc
        }
    }

    /// Join response carried inside the payload description.
    public struct JoinResponse: Codable, Equatable {
        public var data: String

        public init(data: String) {
            self.data = data
        }
    }

    /// Record the broadcast side writes to `keepaliveInfo.json`.
    struct KeepaliveInfo: Codable {
        var id: UInt32
        var timestamp: Int32
        var joinPayload: JoinPayload?

        init(id: UInt32, timestamp: Int32, joinPayload: JoinPayload?) {
            self.id = id
            self.timestamp = timestamp
            self.joinPayload = joinPayload
        }
    }

    /// Record this context writes to `currentPayloadDescription.json`.
    struct PayloadDescription: Codable {
        var id: UInt32
        var timestamp: Int32
        var activeRequestId: UInt32?
        var joinResponse: JoinResponse?

        init(id: UInt32, timestamp: Int32, activeRequestId: UInt32?, joinResponse: JoinResponse?) {
            self.id = id
            self.timestamp = timestamp
            self.activeRequestId = activeRequestId
            self.joinResponse = joinResponse
        }
    }

    private let basePath: String

    private let id: UInt32

    private let isActivePromise = ValuePromise<Bool>(false, ignoreRepeated: true)
    /// `true` while a keepalive record carrying this context's id is fresh enough.
    public var isActive: Signal<Bool, NoError> {
        return self.isActivePromise.get()
    }
    private var isActiveCheckTimer: SwiftSignalKit.Timer?

    private var joinPayloadValue: JoinPayload? {
        didSet {
            // Only non-nil payloads that differ from the previous value are published.
            if let joinPayload = self.joinPayloadValue, joinPayload != oldValue {
                self.joinPayloadPromise.set(.single(joinPayload))
            }
        }
    }
    private let joinPayloadPromise = Promise<JoinPayload>()
    /// Join payloads received through the keepalive record.
    public var joinPayload: Signal<JoinPayload, NoError> {
        return self.joinPayloadPromise.get()
    }

    private var nextActiveRequestId: UInt32 = 0
    private var activeRequestId: UInt32? {
        didSet {
            if self.activeRequestId != oldValue {
                self.updateCallIsActive()
            }
        }
    }

    /// Join response published in the payload description; changes are written immediately.
    public var joinResponse: JoinResponse? {
        didSet {
            if self.joinResponse != oldValue {
                self.updateCallIsActive()
            }
        }
    }

    private var callActiveInfoTimer: SwiftSignalKit.Timer?

    /// - Parameter basePath: Directory used for all IPC files; created if missing.
    public init(basePath: String) {
        self.basePath = basePath
        let _ = try? FileManager.default.createDirectory(atPath: basePath, withIntermediateDirectories: true, attributes: nil)

        self.id = UInt32.random(in: 0 ..< UInt32.max)

        self.updateCallIsActive()

        let callActiveInfoTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
            self?.updateCallIsActive()
        }, queue: .mainQueue())
        self.callActiveInfoTimer = callActiveInfoTimer
        callActiveInfoTimer.start()

        let isActiveCheckTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
            self?.updateKeepaliveInfo()
        }, queue: .mainQueue())
        self.isActiveCheckTimer = isActiveCheckTimer
        isActiveCheckTimer.start()
    }

    deinit {
        self.callActiveInfoTimer?.invalidate()
        self.isActiveCheckTimer?.invalidate()
    }

    /// Writes a fresh `PayloadDescription` for this context. Failures are ignored;
    /// the next timer tick writes again.
    private func updateCallIsActive() {
        let timestamp = Int32(Date().timeIntervalSince1970)
        let payloadDescription = IpcGroupCallEmbeddedAppContext.PayloadDescription(
            id: self.id,
            timestamp: timestamp,
            activeRequestId: self.activeRequestId,
            joinResponse: self.joinResponse
        )
        guard let payloadDescriptionData = try? JSONEncoder().encode(payloadDescription) else {
            return
        }
        let _ = try? payloadDescriptionData.write(to: URL(fileURLWithPath: payloadDescriptionPath(basePath: self.basePath)), options: .atomic)
    }

    /// Re-evaluates `isActive` (and the join payload) from `keepaliveInfo.json`.
    ///
    /// The broadcast contexts in this file delete the keepalive file when they are
    /// deallocated, so a missing or undecodable file is reported as inactive
    /// instead of leaving the previous value in place.
    private func updateKeepaliveInfo() {
        let filePath = keepaliveInfoPath(basePath: self.basePath)
        guard let keepaliveInfoData = try? Data(contentsOf: URL(fileURLWithPath: filePath)),
              let keepaliveInfo = try? JSONDecoder().decode(KeepaliveInfo.self, from: keepaliveInfoData) else {
            self.isActivePromise.set(false)
            return
        }
        // A record written for a different app context does not count.
        if keepaliveInfo.id != self.id {
            self.isActivePromise.set(false)
            return
        }
        let timestamp = Int32(Date().timeIntervalSince1970)
        if keepaliveInfo.timestamp < timestamp - Int32(keepaliveTimeout) {
            self.isActivePromise.set(false)
            return
        }

        self.isActivePromise.set(true)

        self.joinPayloadValue = keepaliveInfo.joinPayload
    }

    /// Allocates a new request id and marks it active.
    ///
    /// - Returns: The new id, or `nil` when a request is already active.
    public func startScreencast() -> UInt32? {
        if self.activeRequestId == nil {
            let id = self.nextActiveRequestId
            self.nextActiveRequestId += 1
            self.activeRequestId = id
            return id
        } else {
            return nil
        }
    }

    /// Clears the active request and writes a `CutoffPayload` carrying this
    /// context's id and the current time to `cutoffPayload.json`. Write failures are ignored.
    public func stopScreencast() {
        self.activeRequestId = nil

        let timestamp = Int32(Date().timeIntervalSince1970)
        let cutoffPayload = CutoffPayload(
            id: self.id,
            timestamp: timestamp
        )
        guard let cutoffPayloadData = try? JSONEncoder().encode(cutoffPayload) else {
            return
        }
        let _ = try? cutoffPayloadData.write(to: URL(fileURLWithPath: cutoffPayloadPath(basePath: self.basePath)), options: .atomic)
    }
}
/// Broadcast-side endpoint of the "embedded" screencast IPC rooted at `basePath`.
///
/// Based on the code below, an instance polls `currentPayloadDescription.json` and
/// `cutoffPayload.json` once per second, publishes the result on `status`, and
/// after the first valid payload description writes `keepaliveInfo.json`
/// (including the current `joinPayload`) once per second.
public final class IpcGroupCallEmbeddedBroadcastContext {
    public enum Status {
        public enum FinishReason {
            case screencastEnded
            case callEnded
            case error
        }

        case active(id: UInt32?, joinResponse: IpcGroupCallEmbeddedAppContext.JoinResponse?)
        case finished(FinishReason)
    }

    private let basePath: String
    private var timer: SwiftSignalKit.Timer?

    private let statusPromise = Promise<Status>()
    /// Current state as derived from the polled files.
    public var status: Signal<Status, NoError> {
        return self.statusPromise.get()
    }

    private var currentId: UInt32?

    private var callActiveInfoTimer: SwiftSignalKit.Timer?
    private var keepaliveInfoTimer: SwiftSignalKit.Timer?
    private var screencastCutoffTimer: SwiftSignalKit.Timer?

    /// Join payload included in the keepalive record. A change is written out
    /// immediately when the app context's id is already known; otherwise it is
    /// picked up by the first keepalive written once the id is known.
    public var joinPayload: IpcGroupCallEmbeddedAppContext.JoinPayload? {
        didSet {
            if self.joinPayload != oldValue {
                self.writeKeepaliveInfo()
            }
        }
    }

    /// - Parameter basePath: Directory used for all IPC files; created if missing.
    public init(basePath: String) {
        self.basePath = basePath
        let _ = try? FileManager.default.createDirectory(atPath: basePath, withIntermediateDirectories: true, attributes: nil)

        let callActiveInfoTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
            self?.updateCallIsActive()
        }, queue: .mainQueue())
        self.callActiveInfoTimer = callActiveInfoTimer
        callActiveInfoTimer.start()

        let screencastCutoffTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
            self?.updateScreencastCutoff()
        }, queue: .mainQueue())
        self.screencastCutoffTimer = screencastCutoffTimer
        screencastCutoffTimer.start()
    }

    deinit {
        self.endActiveIndication()

        self.callActiveInfoTimer?.invalidate()
        self.keepaliveInfoTimer?.invalidate()
        self.screencastCutoffTimer?.invalidate()
    }

    /// Reports `.finished(.screencastEnded)` when a cutoff record for the current id
    /// was written less than 10 seconds ago.
    private func updateScreencastCutoff() {
        let filePath = cutoffPayloadPath(basePath: self.basePath)
        guard let cutoffPayloadData = try? Data(contentsOf: URL(fileURLWithPath: filePath)) else {
            return
        }

        guard let cutoffPayload = try? JSONDecoder().decode(CutoffPayload.self, from: cutoffPayloadData) else {
            return
        }

        let timestamp = Int32(Date().timeIntervalSince1970)
        if let currentId = self.currentId, currentId == cutoffPayload.id && cutoffPayload.timestamp > timestamp - 10 {
            self.statusPromise.set(.single(.finished(.screencastEnded)))
            return
        }
    }

    /// Reads the payload description and updates `status`; the first valid
    /// description fixes `currentId` and starts the keepalive timer.
    private func updateCallIsActive() {
        let filePath = payloadDescriptionPath(basePath: self.basePath)
        guard let payloadDescriptionData = try? Data(contentsOf: URL(fileURLWithPath: filePath)) else {
            self.statusPromise.set(.single(.finished(.error)))
            return
        }

        guard let payloadDescription = try? JSONDecoder().decode(IpcGroupCallEmbeddedAppContext.PayloadDescription.self, from: payloadDescriptionData) else {
            self.statusPromise.set(.single(.finished(.error)))
            return
        }
        let timestamp = Int32(Date().timeIntervalSince1970)
        // A description older than 4 seconds means the app side stopped refreshing it.
        if payloadDescription.timestamp < timestamp - 4 {
            self.statusPromise.set(.single(.finished(.callEnded)))
            return
        }

        if let currentId = self.currentId {
            if currentId != payloadDescription.id {
                self.statusPromise.set(.single(.finished(.callEnded)))
            } else {
                self.statusPromise.set(.single(.active(id: payloadDescription.activeRequestId, joinResponse: payloadDescription.joinResponse)))
            }
        } else {
            self.currentId = payloadDescription.id

            self.writeKeepaliveInfo()

            let keepaliveInfoTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
                self?.writeKeepaliveInfo()
            }, queue: .mainQueue())
            self.keepaliveInfoTimer = keepaliveInfoTimer
            keepaliveInfoTimer.start()

            self.statusPromise.set(.single(.active(id: payloadDescription.activeRequestId, joinResponse: payloadDescription.joinResponse)))
        }
    }

    /// Writes a keepalive record for the current id together with `joinPayload`.
    ///
    /// This is best effort. `joinPayload` can be assigned before any payload
    /// description has been read, in which case no id is known yet and nothing is
    /// written. A failed encode or write is likewise skipped; the keepalive timer
    /// writes again on its next tick.
    private func writeKeepaliveInfo() {
        guard let currentId = self.currentId else {
            return
        }
        let keepaliveInfo = IpcGroupCallEmbeddedAppContext.KeepaliveInfo(
            id: currentId,
            timestamp: Int32(Date().timeIntervalSince1970),
            joinPayload: self.joinPayload
        )
        guard let keepaliveInfoData = try? JSONEncoder().encode(keepaliveInfo) else {
            return
        }
        let _ = try? keepaliveInfoData.write(to: URL(fileURLWithPath: keepaliveInfoPath(basePath: self.basePath)), options: .atomic)
    }

    /// Removes the keepalive file.
    private func endActiveIndication() {
        let _ = try? FileManager.default.removeItem(atPath: keepaliveInfoPath(basePath: self.basePath))
    }
}