mirror of
https://github.com/Swiftgram/Telegram-iOS.git
synced 2025-06-16 05:55:20 +00:00
969 lines
40 KiB
Swift
969 lines
40 KiB
Swift
import Foundation
|
|
import SwiftSignalKit
|
|
import TgVoipWebrtc
|
|
import TelegramCore
|
|
import Network
|
|
import Postbox
|
|
import FFMpegBinding
|
|
import ManagedFile
|
|
|
|
@available(iOS 12.0, macOS 14.0, *)
|
|
public final class WrappedMediaStreamingContext {
|
|
private final class Impl {
|
|
let queue: Queue
|
|
let context: MediaStreamingContext
|
|
|
|
private let broadcastPartsSource = Atomic<BroadcastPartSource?>(value: nil)
|
|
|
|
init(queue: Queue, rejoinNeeded: @escaping () -> Void) {
|
|
self.queue = queue
|
|
|
|
var getBroadcastPartsSource: (() -> BroadcastPartSource?)?
|
|
|
|
self.context = MediaStreamingContext(
|
|
queue: ContextQueueImpl(queue: queue),
|
|
requestCurrentTime: { completion in
|
|
let disposable = MetaDisposable()
|
|
|
|
queue.async {
|
|
if let source = getBroadcastPartsSource?() {
|
|
disposable.set(source.requestTime(completion: completion))
|
|
} else {
|
|
completion(0)
|
|
}
|
|
}
|
|
|
|
return OngoingGroupCallBroadcastPartTaskImpl(disposable: disposable)
|
|
},
|
|
requestAudioBroadcastPart: { timestampMilliseconds, durationMilliseconds, completion in
|
|
let disposable = MetaDisposable()
|
|
|
|
queue.async {
|
|
disposable.set(getBroadcastPartsSource?()?.requestPart(timestampMilliseconds: timestampMilliseconds, durationMilliseconds: durationMilliseconds, subject: .audio, completion: completion, rejoinNeeded: {
|
|
rejoinNeeded()
|
|
}))
|
|
}
|
|
|
|
return OngoingGroupCallBroadcastPartTaskImpl(disposable: disposable)
|
|
},
|
|
requestVideoBroadcastPart: { timestampMilliseconds, durationMilliseconds, channelId, quality, completion in
|
|
let disposable = MetaDisposable()
|
|
|
|
queue.async {
|
|
let mappedQuality: OngoingGroupCallContext.VideoChannel.Quality
|
|
switch quality {
|
|
case .thumbnail:
|
|
mappedQuality = .thumbnail
|
|
case .medium:
|
|
mappedQuality = .medium
|
|
case .full:
|
|
mappedQuality = .full
|
|
@unknown default:
|
|
mappedQuality = .thumbnail
|
|
}
|
|
disposable.set(getBroadcastPartsSource?()?.requestPart(timestampMilliseconds: timestampMilliseconds, durationMilliseconds: durationMilliseconds, subject: .video(channelId: channelId, quality: mappedQuality), completion: completion, rejoinNeeded: {
|
|
rejoinNeeded()
|
|
}))
|
|
}
|
|
|
|
return OngoingGroupCallBroadcastPartTaskImpl(disposable: disposable)
|
|
}
|
|
)
|
|
|
|
let broadcastPartsSource = self.broadcastPartsSource
|
|
getBroadcastPartsSource = {
|
|
return broadcastPartsSource.with { $0 }
|
|
}
|
|
}
|
|
|
|
deinit {
|
|
}
|
|
|
|
func setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData?) {
|
|
if let audioStreamData = audioStreamData {
|
|
let broadcastPartsSource = NetworkBroadcastPartSource(queue: self.queue, engine: audioStreamData.engine, callId: audioStreamData.callId, accessHash: audioStreamData.accessHash, isExternalStream: audioStreamData.isExternalStream)
|
|
let _ = self.broadcastPartsSource.swap(broadcastPartsSource)
|
|
self.context.start()
|
|
}
|
|
}
|
|
|
|
func video() -> Signal<OngoingGroupCallContext.VideoFrameData, NoError> {
|
|
let queue = self.queue
|
|
return Signal { [weak self] subscriber in
|
|
let disposable = MetaDisposable()
|
|
|
|
queue.async {
|
|
guard let strongSelf = self else {
|
|
return
|
|
}
|
|
let innerDisposable = strongSelf.context.addVideoOutput() { videoFrameData in
|
|
subscriber.putNext(OngoingGroupCallContext.VideoFrameData(frameData: videoFrameData))
|
|
}
|
|
disposable.set(ActionDisposable {
|
|
innerDisposable.dispose()
|
|
})
|
|
}
|
|
|
|
return disposable
|
|
}
|
|
}
|
|
}
|
|
|
|
private let queue = Queue()
|
|
private let impl: QueueLocalObject<Impl>
|
|
|
|
public init(rejoinNeeded: @escaping () -> Void) {
|
|
let queue = self.queue
|
|
self.impl = QueueLocalObject(queue: queue, generate: {
|
|
return Impl(queue: queue, rejoinNeeded: rejoinNeeded)
|
|
})
|
|
}
|
|
|
|
public func setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData?) {
|
|
self.impl.with { impl in
|
|
impl.setAudioStreamData(audioStreamData: audioStreamData)
|
|
}
|
|
}
|
|
|
|
public func video() -> Signal<OngoingGroupCallContext.VideoFrameData, NoError> {
|
|
return Signal { subscriber in
|
|
let disposable = MetaDisposable()
|
|
self.impl.with { impl in
|
|
disposable.set(impl.video().start(next: { value in
|
|
subscriber.putNext(value)
|
|
}))
|
|
}
|
|
return disposable
|
|
}
|
|
}
|
|
}
|
|
@available(iOS 12.0, macOS 14.0, *)
|
|
public final class ExternalMediaStreamingContext: SharedHLSServerSource {
|
|
private final class Impl {
|
|
let queue: Queue
|
|
|
|
private var broadcastPartsSource: BroadcastPartSource?
|
|
|
|
private let resetPlaylistDisposable = MetaDisposable()
|
|
private let updatePlaylistDisposable = MetaDisposable()
|
|
|
|
let masterPlaylistData = Promise<String>()
|
|
let playlistData = Promise<String>()
|
|
let mediumPlaylistData = Promise<String>()
|
|
|
|
init(queue: Queue, rejoinNeeded: @escaping () -> Void) {
|
|
self.queue = queue
|
|
}
|
|
|
|
deinit {
|
|
self.updatePlaylistDisposable.dispose()
|
|
}
|
|
|
|
func setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData?) {
|
|
if let audioStreamData {
|
|
let broadcastPartsSource = NetworkBroadcastPartSource(queue: self.queue, engine: audioStreamData.engine, callId: audioStreamData.callId, accessHash: audioStreamData.accessHash, isExternalStream: audioStreamData.isExternalStream)
|
|
self.broadcastPartsSource = broadcastPartsSource
|
|
|
|
self.updatePlaylistDisposable.set(nil)
|
|
|
|
let queue = self.queue
|
|
self.resetPlaylistDisposable.set(broadcastPartsSource.requestTime(completion: { [weak self] timestamp in
|
|
queue.async {
|
|
guard let self else {
|
|
return
|
|
}
|
|
|
|
let segmentDuration: Int64 = 1000
|
|
|
|
var adjustedTimestamp: Int64 = 0
|
|
if timestamp > 0 {
|
|
adjustedTimestamp = timestamp / segmentDuration * segmentDuration - 4 * segmentDuration
|
|
}
|
|
|
|
if adjustedTimestamp > 0 {
|
|
var masterPlaylistData = "#EXTM3U\n" +
|
|
"#EXT-X-VERSION:6\n" +
|
|
"#EXT-X-STREAM-INF:BANDWIDTH=3300000,RESOLUTION=1280x720,CODECS=\"avc1.64001f,mp4a.40.2\"\n" +
|
|
"hls_level_0.m3u8\n"
|
|
|
|
masterPlaylistData += "#EXT-X-STREAM-INF:BANDWIDTH=1000000,RESOLUTION=640x360,CODECS=\"avc1.64001f,mp4a.40.2\"\n" +
|
|
"hls_level_1.m3u8\n"
|
|
|
|
self.masterPlaylistData.set(.single(masterPlaylistData))
|
|
|
|
self.beginUpdatingPlaylist(initialHeadTimestamp: adjustedTimestamp)
|
|
}
|
|
}
|
|
}))
|
|
}
|
|
}
|
|
|
|
private func beginUpdatingPlaylist(initialHeadTimestamp: Int64) {
|
|
let segmentDuration: Int64 = 1000
|
|
|
|
var timestamp = initialHeadTimestamp
|
|
self.updatePlaylist(headTimestamp: timestamp, quality: 0)
|
|
self.updatePlaylist(headTimestamp: timestamp, quality: 1)
|
|
|
|
self.updatePlaylistDisposable.set((
|
|
Signal<Void, NoError>.single(Void())
|
|
|> delay(1.0, queue: self.queue)
|
|
|> restart
|
|
|> deliverOn(self.queue)
|
|
).start(next: { [weak self] _ in
|
|
guard let self else {
|
|
return
|
|
}
|
|
|
|
timestamp += segmentDuration
|
|
self.updatePlaylist(headTimestamp: timestamp, quality: 0)
|
|
self.updatePlaylist(headTimestamp: timestamp, quality: 1)
|
|
}))
|
|
}
|
|
|
|
private func updatePlaylist(headTimestamp: Int64, quality: Int) {
|
|
let segmentDuration: Int64 = 1000
|
|
let headIndex = headTimestamp / segmentDuration
|
|
let minIndex = headIndex - 20
|
|
|
|
var playlistData = "#EXTM3U\n" +
|
|
"#EXT-X-VERSION:6\n" +
|
|
"#EXT-X-TARGETDURATION:1\n" +
|
|
"#EXT-X-MEDIA-SEQUENCE:\(minIndex)\n" +
|
|
"#EXT-X-INDEPENDENT-SEGMENTS\n"
|
|
|
|
for index in minIndex ... headIndex {
|
|
playlistData.append("#EXTINF:1.000000,\n")
|
|
playlistData.append("hls_stream\(quality)_\(index).ts\n")
|
|
}
|
|
|
|
//print("Player: updating playlist \(quality) \(minIndex) ... \(headIndex)")
|
|
|
|
if quality == 0 {
|
|
self.playlistData.set(.single(playlistData))
|
|
} else {
|
|
self.mediumPlaylistData.set(.single(playlistData))
|
|
}
|
|
}
|
|
|
|
func partData(index: Int, quality: Int) -> Signal<Data?, NoError> {
|
|
let segmentDuration: Int64 = 1000
|
|
let timestamp = Int64(index) * segmentDuration
|
|
|
|
print("Player: request part(q: \(quality)) \(index) -> \(timestamp)")
|
|
|
|
guard let broadcastPartsSource = self.broadcastPartsSource else {
|
|
return .single(nil)
|
|
}
|
|
|
|
return Signal { subscriber in
|
|
return broadcastPartsSource.requestPart(
|
|
timestampMilliseconds: timestamp,
|
|
durationMilliseconds: segmentDuration,
|
|
subject: .video(channelId: 1, quality: quality == 0 ? .full : .medium),
|
|
completion: { part in
|
|
var data = part.oggData
|
|
if data.count > 32 {
|
|
data = data.subdata(in: 32 ..< data.count)
|
|
}
|
|
subscriber.putNext(data)
|
|
},
|
|
rejoinNeeded: {
|
|
//TODO
|
|
}
|
|
)
|
|
}
|
|
}
|
|
|
|
func fileData(id: Int64, range: Range<Int>) -> Signal<(TempBoxFile, Range<Int>, Int)?, NoError> {
|
|
return .never()
|
|
}
|
|
}
|
|
|
|
private let queue = Queue()
|
|
let internalId: CallSessionInternalId
|
|
private let impl: QueueLocalObject<Impl>
|
|
private var hlsServerDisposable: Disposable?
|
|
|
|
public var id: String {
|
|
return self.internalId.uuidString
|
|
}
|
|
|
|
public init(id: CallSessionInternalId, rejoinNeeded: @escaping () -> Void) {
|
|
self.internalId = id
|
|
let queue = self.queue
|
|
self.impl = QueueLocalObject(queue: queue, generate: {
|
|
return Impl(queue: queue, rejoinNeeded: rejoinNeeded)
|
|
})
|
|
|
|
self.hlsServerDisposable = SharedHLSServer.shared.registerPlayer(source: self, completion: {})
|
|
}
|
|
|
|
deinit {
|
|
self.hlsServerDisposable?.dispose()
|
|
}
|
|
|
|
public func setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData?) {
|
|
self.impl.with { impl in
|
|
impl.setAudioStreamData(audioStreamData: audioStreamData)
|
|
}
|
|
}
|
|
|
|
public func masterPlaylistData() -> Signal<String, NoError> {
|
|
return self.impl.signalWith { impl, subscriber in
|
|
impl.masterPlaylistData.get().start(next: subscriber.putNext)
|
|
}
|
|
}
|
|
|
|
public func playlistData(quality: Int) -> Signal<String, NoError> {
|
|
return self.impl.signalWith { impl, subscriber in
|
|
if quality == 0 {
|
|
impl.playlistData.get().start(next: subscriber.putNext)
|
|
} else {
|
|
impl.mediumPlaylistData.get().start(next: subscriber.putNext)
|
|
}
|
|
}
|
|
}
|
|
|
|
public func partData(index: Int, quality: Int) -> Signal<Data?, NoError> {
|
|
return self.impl.signalWith { impl, subscriber in
|
|
impl.partData(index: index, quality: quality).start(next: subscriber.putNext)
|
|
}
|
|
}
|
|
|
|
public func fileData(id: Int64, range: Range<Int>) -> Signal<(TempBoxFile, Range<Int>, Int)?, NoError> {
|
|
return self.impl.signalWith { impl, subscriber in
|
|
impl.fileData(id: id, range: range).start(next: subscriber.putNext)
|
|
}
|
|
}
|
|
|
|
public func arbitraryFileData(path: String) -> Signal<(data: Data, contentType: String)?, NoError> {
|
|
return .single(nil)
|
|
}
|
|
}
|
|
|
|
public final class DirectMediaStreamingContext {
|
|
public struct Playlist: Equatable {
|
|
public struct Part: Equatable {
|
|
public let index: Int
|
|
public let timestamp: Double
|
|
public let duration: Double
|
|
|
|
public init(index: Int, timestamp: Double, duration: Double) {
|
|
self.index = index
|
|
self.timestamp = timestamp
|
|
self.duration = duration
|
|
}
|
|
}
|
|
|
|
public var parts: [Part]
|
|
|
|
public init(parts: [Part]) {
|
|
self.parts = parts
|
|
}
|
|
}
|
|
|
|
private final class Impl {
|
|
let queue: Queue
|
|
|
|
private var broadcastPartsSource: BroadcastPartSource?
|
|
|
|
private let resetPlaylistDisposable = MetaDisposable()
|
|
private let updatePlaylistDisposable = MetaDisposable()
|
|
|
|
let playlistData = Promise<Playlist>()
|
|
|
|
init(queue: Queue, rejoinNeeded: @escaping () -> Void) {
|
|
self.queue = queue
|
|
}
|
|
|
|
deinit {
|
|
self.updatePlaylistDisposable.dispose()
|
|
}
|
|
|
|
func setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData?) {
|
|
if let audioStreamData {
|
|
let broadcastPartsSource = NetworkBroadcastPartSource(queue: self.queue, engine: audioStreamData.engine, callId: audioStreamData.callId, accessHash: audioStreamData.accessHash, isExternalStream: audioStreamData.isExternalStream)
|
|
self.broadcastPartsSource = broadcastPartsSource
|
|
|
|
self.updatePlaylistDisposable.set(nil)
|
|
|
|
let queue = self.queue
|
|
self.resetPlaylistDisposable.set(broadcastPartsSource.requestTime(completion: { [weak self] timestamp in
|
|
queue.async {
|
|
guard let self else {
|
|
return
|
|
}
|
|
|
|
let segmentDuration: Int64 = 1000
|
|
|
|
var adjustedTimestamp: Int64 = 0
|
|
if timestamp > 0 {
|
|
adjustedTimestamp = timestamp / segmentDuration * segmentDuration - 4 * segmentDuration
|
|
}
|
|
|
|
if adjustedTimestamp > 0 {
|
|
self.beginUpdatingPlaylist(initialHeadTimestamp: adjustedTimestamp)
|
|
}
|
|
}
|
|
}))
|
|
}
|
|
}
|
|
|
|
private func beginUpdatingPlaylist(initialHeadTimestamp: Int64) {
|
|
let segmentDuration: Int64 = 1000
|
|
|
|
var timestamp = initialHeadTimestamp
|
|
self.updatePlaylist(headTimestamp: timestamp)
|
|
|
|
self.updatePlaylistDisposable.set((
|
|
Signal<Void, NoError>.single(Void())
|
|
|> delay(1.0, queue: self.queue)
|
|
|> restart
|
|
|> deliverOn(self.queue)
|
|
).start(next: { [weak self] _ in
|
|
guard let self else {
|
|
return
|
|
}
|
|
|
|
timestamp += segmentDuration
|
|
self.updatePlaylist(headTimestamp: timestamp)
|
|
}))
|
|
}
|
|
|
|
private func updatePlaylist(headTimestamp: Int64) {
|
|
let segmentDuration: Int64 = 1000
|
|
let headIndex = headTimestamp / segmentDuration
|
|
let minIndex = headIndex - 20
|
|
|
|
var parts: [Playlist.Part] = []
|
|
for index in minIndex ... headIndex {
|
|
parts.append(DirectMediaStreamingContext.Playlist.Part(
|
|
index: Int(index),
|
|
timestamp: Double(index),
|
|
duration: 1.0
|
|
))
|
|
}
|
|
|
|
self.playlistData.set(.single(Playlist(parts: parts)))
|
|
}
|
|
|
|
func partData(index: Int) -> Signal<Data?, NoError> {
|
|
let segmentDuration: Int64 = 1000
|
|
let timestamp = Int64(index) * segmentDuration
|
|
|
|
//print("Player: request part(q: \(quality)) \(index) -> \(timestamp)")
|
|
|
|
guard let broadcastPartsSource = self.broadcastPartsSource else {
|
|
return .single(nil)
|
|
}
|
|
|
|
return Signal { subscriber in
|
|
return broadcastPartsSource.requestPart(
|
|
timestampMilliseconds: timestamp,
|
|
durationMilliseconds: segmentDuration,
|
|
subject: .video(channelId: 1, quality: .full),
|
|
completion: { part in
|
|
var data = part.oggData
|
|
if data.count > 32 {
|
|
data = data.subdata(in: 32 ..< data.count)
|
|
}
|
|
subscriber.putNext(data)
|
|
},
|
|
rejoinNeeded: {
|
|
//TODO
|
|
}
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
private let queue = Queue()
|
|
let internalId: CallSessionInternalId
|
|
private let impl: QueueLocalObject<Impl>
|
|
private var hlsServerDisposable: Disposable?
|
|
|
|
public init(id: CallSessionInternalId, rejoinNeeded: @escaping () -> Void) {
|
|
self.internalId = id
|
|
let queue = self.queue
|
|
self.impl = QueueLocalObject(queue: queue, generate: {
|
|
return Impl(queue: queue, rejoinNeeded: rejoinNeeded)
|
|
})
|
|
}
|
|
|
|
deinit {
|
|
}
|
|
|
|
public func setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData?) {
|
|
self.impl.with { impl in
|
|
impl.setAudioStreamData(audioStreamData: audioStreamData)
|
|
}
|
|
}
|
|
|
|
public func playlistData() -> Signal<Playlist, NoError> {
|
|
return self.impl.signalWith { impl, subscriber in
|
|
impl.playlistData.get().start(next: subscriber.putNext)
|
|
}
|
|
}
|
|
|
|
public func partData(index: Int) -> Signal<Data?, NoError> {
|
|
return self.impl.signalWith { impl, subscriber in
|
|
impl.partData(index: index).start(next: subscriber.putNext)
|
|
}
|
|
}
|
|
}
|
|
|
|
public protocol SharedHLSServerSource: AnyObject {
|
|
var id: String { get }
|
|
|
|
func masterPlaylistData() -> Signal<String, NoError>
|
|
func playlistData(quality: Int) -> Signal<String, NoError>
|
|
func partData(index: Int, quality: Int) -> Signal<Data?, NoError>
|
|
func fileData(id: Int64, range: Range<Int>) -> Signal<(TempBoxFile, Range<Int>, Int)?, NoError>
|
|
func arbitraryFileData(path: String) -> Signal<(data: Data, contentType: String)?, NoError>
|
|
}
|
|
|
|
@available(iOS 12.0, macOS 14.0, *)
|
|
public final class SharedHLSServer {
|
|
public typealias Source = SharedHLSServerSource
|
|
|
|
public static let shared: SharedHLSServer = {
|
|
return SharedHLSServer()
|
|
}()
|
|
|
|
private enum ResponseError {
|
|
case badRequest
|
|
case notFound
|
|
case internalServerError
|
|
|
|
var httpString: String {
|
|
switch self {
|
|
case .badRequest:
|
|
return "400 Bad Request"
|
|
case .notFound:
|
|
return "404 Not Found"
|
|
case .internalServerError:
|
|
return "500 Internal Server Error"
|
|
}
|
|
}
|
|
}
|
|
|
|
private final class SourceReference {
|
|
weak var source: SharedHLSServerSource?
|
|
|
|
init(source: SharedHLSServerSource) {
|
|
self.source = source
|
|
}
|
|
}
|
|
@available(iOS 12.0, macOS 14.0, *)
|
|
private final class Impl {
|
|
private let queue: Queue
|
|
|
|
private let port: NWEndpoint.Port
|
|
private var listener: NWListener?
|
|
|
|
private var sourceReferences = Bag<SourceReference>()
|
|
private var referenceCheckTimer: SwiftSignalKit.Timer?
|
|
private var shutdownTimer: SwiftSignalKit.Timer?
|
|
|
|
init(queue: Queue, port: UInt16) {
|
|
self.queue = queue
|
|
self.port = NWEndpoint.Port(rawValue: port)!
|
|
}
|
|
|
|
deinit {
|
|
self.referenceCheckTimer?.invalidate()
|
|
self.shutdownTimer?.invalidate()
|
|
}
|
|
|
|
private func updateNeedsListener() {
|
|
var isEmpty = true
|
|
for item in self.sourceReferences.copyItems() {
|
|
if let _ = item.source {
|
|
isEmpty = false
|
|
break
|
|
}
|
|
}
|
|
|
|
if isEmpty {
|
|
if self.listener != nil {
|
|
if self.shutdownTimer == nil {
|
|
self.shutdownTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: false, completion: { [weak self] in
|
|
guard let self else {
|
|
return
|
|
}
|
|
self.shutdownTimer = nil
|
|
self.stopListener()
|
|
}, queue: self.queue)
|
|
self.shutdownTimer?.start()
|
|
}
|
|
}
|
|
if let referenceCheckTimer = self.referenceCheckTimer {
|
|
self.referenceCheckTimer = nil
|
|
referenceCheckTimer.invalidate()
|
|
}
|
|
} else {
|
|
if let shutdownTimer = self.shutdownTimer {
|
|
self.shutdownTimer = nil
|
|
shutdownTimer.invalidate()
|
|
}
|
|
if self.listener == nil {
|
|
self.startListener()
|
|
}
|
|
if self.referenceCheckTimer == nil {
|
|
self.referenceCheckTimer = SwiftSignalKit.Timer(timeout: 1.0, repeat: true, completion: { [weak self] in
|
|
guard let self else {
|
|
return
|
|
}
|
|
self.updateNeedsListener()
|
|
}, queue: self.queue)
|
|
self.referenceCheckTimer?.start()
|
|
}
|
|
}
|
|
}
|
|
|
|
private func startListener() {
|
|
let listener: NWListener
|
|
do {
|
|
listener = try NWListener(using: .tcp, on: self.port)
|
|
} catch {
|
|
Logger.shared.log("SharedHLSServer", "Failed to create listener: \(error)")
|
|
return
|
|
}
|
|
self.listener = listener
|
|
|
|
listener.newConnectionHandler = { [weak self] connection in
|
|
guard let self else {
|
|
return
|
|
}
|
|
self.handleConnection(connection: connection)
|
|
}
|
|
|
|
listener.stateUpdateHandler = { [weak self, weak listener] state in
|
|
guard let self, let listener else {
|
|
return
|
|
}
|
|
switch state {
|
|
case .ready:
|
|
Logger.shared.log("SharedHLSServer", "Server is ready on port \(self.port)")
|
|
case let .failed(error):
|
|
Logger.shared.log("SharedHLSServer", "Server failed with error: \(error)")
|
|
listener.cancel()
|
|
|
|
listener.start(queue: self.queue.queue)
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
|
|
listener.start(queue: self.queue.queue)
|
|
}
|
|
|
|
private func stopListener() {
|
|
guard let listener = self.listener else {
|
|
return
|
|
}
|
|
self.listener = nil
|
|
listener.cancel()
|
|
}
|
|
|
|
private func handleConnection(connection: NWConnection) {
|
|
connection.start(queue: self.queue.queue)
|
|
connection.receive(minimumIncompleteLength: 1, maximumLength: 32 * 1024, completion: { [weak self] data, _, isComplete, error in
|
|
guard let self else {
|
|
return
|
|
}
|
|
if let data, !data.isEmpty {
|
|
self.handleRequest(data: data, connection: connection)
|
|
} else if isComplete {
|
|
connection.cancel()
|
|
} else if let error = error {
|
|
Logger.shared.log("SharedHLSServer", "Error on connection: \(error)")
|
|
connection.cancel()
|
|
}
|
|
})
|
|
}
|
|
|
|
private func handleRequest(data: Data, connection: NWConnection) {
|
|
guard let requestString = String(data: data, encoding: .utf8) else {
|
|
connection.cancel()
|
|
return
|
|
}
|
|
|
|
if !requestString.hasPrefix("GET /") {
|
|
self.sendErrorAndClose(connection: connection)
|
|
return
|
|
}
|
|
guard let firstCrLf = requestString.range(of: "\r\n") else {
|
|
self.sendErrorAndClose(connection: connection)
|
|
return
|
|
}
|
|
let firstLine = String(requestString[requestString.index(requestString.startIndex, offsetBy: "GET /".count) ..< firstCrLf.lowerBound])
|
|
if !(firstLine.hasSuffix(" HTTP/1.0") || firstLine.hasSuffix(" HTTP/1.1")) {
|
|
self.sendErrorAndClose(connection: connection)
|
|
return
|
|
}
|
|
|
|
let requestPath = String(firstLine[firstLine.startIndex ..< firstLine.index(firstLine.endIndex, offsetBy: -" HTTP/1.1".count)])
|
|
var requestRange: Range<Int>?
|
|
if let rangeRange = requestString.range(of: "Range: bytes=") {
|
|
if let endRange = requestString.range(of: "\r\n", range: rangeRange.upperBound ..< requestString.endIndex) {
|
|
let rangeString = String(requestString[rangeRange.upperBound ..< endRange.lowerBound])
|
|
if let dashRange = rangeString.range(of: "-") {
|
|
let lowerBoundString = String(rangeString[rangeString.startIndex ..< dashRange.lowerBound])
|
|
let upperBoundString = String(rangeString[dashRange.upperBound ..< rangeString.endIndex])
|
|
|
|
if let lowerBound = Int(lowerBoundString), let upperBound = Int(upperBoundString) {
|
|
requestRange = lowerBound ..< upperBound
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
guard let firstSlash = requestPath.range(of: "/") else {
|
|
self.sendErrorAndClose(connection: connection, error: .notFound)
|
|
return
|
|
}
|
|
let streamId = String(requestPath[requestPath.startIndex ..< firstSlash.lowerBound])
|
|
guard let source = self.sourceReferences.copyItems().first(where: { $0.source?.id == streamId })?.source else {
|
|
self.sendErrorAndClose(connection: connection)
|
|
return
|
|
}
|
|
|
|
let filePath = String(requestPath[firstSlash.upperBound...])
|
|
if filePath == "master.m3u8" {
|
|
let _ = (source.masterPlaylistData()
|
|
|> deliverOn(self.queue)
|
|
|> take(1)).start(next: { [weak self] result in
|
|
guard let self else {
|
|
return
|
|
}
|
|
|
|
self.sendResponseAndClose(connection: connection, data: result.data(using: .utf8)!)
|
|
})
|
|
} else if filePath.hasPrefix("hls_level_") && filePath.hasSuffix(".m3u8") {
|
|
guard let levelIndex = Int(String(filePath[filePath.index(filePath.startIndex, offsetBy: "hls_level_".count) ..< filePath.index(filePath.endIndex, offsetBy: -".m3u8".count)])) else {
|
|
self.sendErrorAndClose(connection: connection)
|
|
return
|
|
}
|
|
|
|
let _ = (source.playlistData(quality: levelIndex)
|
|
|> deliverOn(self.queue)
|
|
|> take(1)).start(next: { [weak self] result in
|
|
guard let self else {
|
|
return
|
|
}
|
|
|
|
self.sendResponseAndClose(connection: connection, data: result.data(using: .utf8)!)
|
|
})
|
|
} else if filePath.hasPrefix("hls_stream") && filePath.hasSuffix(".ts") {
|
|
let fileId = String(filePath[filePath.index(filePath.startIndex, offsetBy: "hls_stream".count) ..< filePath.index(filePath.endIndex, offsetBy: -".ts".count)])
|
|
guard let underscoreRange = fileId.range(of: "_") else {
|
|
self.sendErrorAndClose(connection: connection)
|
|
return
|
|
}
|
|
guard let levelIndex = Int(String(fileId[fileId.startIndex ..< underscoreRange.lowerBound])) else {
|
|
self.sendErrorAndClose(connection: connection)
|
|
return
|
|
}
|
|
guard let partIndex = Int(String(fileId[underscoreRange.upperBound...])) else {
|
|
self.sendErrorAndClose(connection: connection)
|
|
return
|
|
}
|
|
let _ = (source.partData(index: partIndex, quality: levelIndex)
|
|
|> deliverOn(self.queue)
|
|
|> take(1)).start(next: { [weak self] result in
|
|
guard let self else {
|
|
return
|
|
}
|
|
|
|
if let result {
|
|
let sourceTempFile = TempBox.shared.tempFile(fileName: "part.mp4")
|
|
let tempFile = TempBox.shared.tempFile(fileName: "part.ts")
|
|
defer {
|
|
TempBox.shared.dispose(sourceTempFile)
|
|
TempBox.shared.dispose(tempFile)
|
|
}
|
|
|
|
guard let _ = try? result.write(to: URL(fileURLWithPath: sourceTempFile.path)) else {
|
|
self.sendErrorAndClose(connection: connection, error: .internalServerError)
|
|
return
|
|
}
|
|
|
|
let sourcePath = sourceTempFile.path
|
|
FFMpegLiveMuxer.remux(sourcePath, to: tempFile.path, offsetSeconds: Double(partIndex))
|
|
|
|
if let data = try? Data(contentsOf: URL(fileURLWithPath: tempFile.path)) {
|
|
self.sendResponseAndClose(connection: connection, data: data)
|
|
} else {
|
|
self.sendErrorAndClose(connection: connection, error: .internalServerError)
|
|
}
|
|
} else {
|
|
self.sendErrorAndClose(connection: connection, error: .notFound)
|
|
}
|
|
})
|
|
} else if filePath.hasPrefix("partfile") && filePath.hasSuffix(".mp4") {
|
|
let fileId = String(filePath[filePath.index(filePath.startIndex, offsetBy: "partfile".count) ..< filePath.index(filePath.endIndex, offsetBy: -".mp4".count)])
|
|
guard let fileIdValue = Int64(fileId) else {
|
|
self.sendErrorAndClose(connection: connection)
|
|
return
|
|
}
|
|
guard let requestRange else {
|
|
self.sendErrorAndClose(connection: connection)
|
|
return
|
|
}
|
|
let _ = (source.fileData(id: fileIdValue, range: requestRange.lowerBound ..< requestRange.upperBound + 1)
|
|
|> deliverOn(self.queue)
|
|
//|> timeout(5.0, queue: self.queue, alternate: .single(nil))
|
|
|> take(1)).start(next: { [weak self] result in
|
|
guard let self else {
|
|
return
|
|
}
|
|
|
|
if let (tempFile, tempFileRange, totalSize) = result {
|
|
self.sendResponseFileAndClose(connection: connection, file: tempFile, fileRange: tempFileRange, range: requestRange, totalSize: totalSize)
|
|
} else {
|
|
self.sendErrorAndClose(connection: connection, error: .internalServerError)
|
|
}
|
|
})
|
|
} else {
|
|
let _ = (source.arbitraryFileData(path: filePath)
|
|
|> deliverOn(self.queue)
|
|
|> take(1)).start(next: { [weak self] result in
|
|
guard let self else {
|
|
return
|
|
}
|
|
|
|
if let result {
|
|
self.sendResponseAndClose(connection: connection, data: result.data, contentType: result.contentType)
|
|
} else {
|
|
self.sendErrorAndClose(connection: connection, error: .notFound)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
private func sendErrorAndClose(connection: NWConnection, error: ResponseError = .badRequest) {
|
|
let errorResponse = "HTTP/1.1 \(error.httpString)\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n"
|
|
connection.send(content: errorResponse.data(using: .utf8), completion: .contentProcessed { error in
|
|
if let error {
|
|
Logger.shared.log("SharedHLSServer", "Failed to send response: \(error)")
|
|
}
|
|
connection.cancel()
|
|
})
|
|
}
|
|
|
|
private func sendResponseAndClose(connection: NWConnection, data: Data, contentType: String = "application/octet-stream", range: Range<Int>? = nil, totalSize: Int? = nil) {
|
|
var responseHeaders = "HTTP/1.1 200 OK\r\n"
|
|
responseHeaders.append("Content-Length: \(data.count)\r\n")
|
|
if let range, let totalSize {
|
|
responseHeaders.append("Content-Range: bytes \(range.lowerBound)-\(range.upperBound)/\(totalSize)\r\n")
|
|
}
|
|
|
|
responseHeaders.append("Content-Type: \(contentType)\r\n")
|
|
responseHeaders.append("Connection: close\r\n")
|
|
responseHeaders.append("Access-Control-Allow-Origin: *\r\n")
|
|
responseHeaders.append("\r\n")
|
|
var responseData = Data()
|
|
responseData.append(responseHeaders.data(using: .utf8)!)
|
|
responseData.append(data)
|
|
connection.send(content: responseData, completion: .contentProcessed { error in
|
|
if let error {
|
|
Logger.shared.log("SharedHLSServer", "Failed to send response: \(error)")
|
|
}
|
|
connection.cancel()
|
|
})
|
|
}
|
|
|
|
private static func sendRemainingFileRange(queue: Queue, connection: NWConnection, tempFile: TempBoxFile, managedFile: ManagedFile, remainingRange: Range<Int>, fileSize: Int) -> Void {
|
|
let blockSize = 256 * 1024
|
|
|
|
let clippedLowerBound = min(remainingRange.lowerBound, fileSize)
|
|
var clippedUpperBound = min(remainingRange.upperBound, fileSize)
|
|
clippedUpperBound = min(clippedUpperBound, clippedLowerBound + blockSize)
|
|
|
|
if clippedUpperBound == clippedLowerBound {
|
|
TempBox.shared.dispose(tempFile)
|
|
connection.cancel()
|
|
} else {
|
|
let _ = managedFile.seek(position: Int64(clippedLowerBound))
|
|
let data = managedFile.readData(count: Int(clippedUpperBound - clippedLowerBound))
|
|
let nextRange = clippedUpperBound ..< remainingRange.upperBound
|
|
|
|
connection.send(content: data, completion: .contentProcessed { error in
|
|
queue.async {
|
|
if let error {
|
|
Logger.shared.log("SharedHLSServer", "Failed to send response: \(error)")
|
|
connection.cancel()
|
|
TempBox.shared.dispose(tempFile)
|
|
} else {
|
|
sendRemainingFileRange(queue: queue, connection: connection, tempFile: tempFile, managedFile: managedFile, remainingRange: nextRange, fileSize: fileSize)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
private func sendResponseFileAndClose(connection: NWConnection, file: TempBoxFile, fileRange: Range<Int>, range: Range<Int>, totalSize: Int) {
|
|
let queue = self.queue
|
|
|
|
guard let managedFile = ManagedFile(queue: nil, path: file.path, mode: .read), let fileSize = managedFile.getSize() else {
|
|
self.sendErrorAndClose(connection: connection, error: .internalServerError)
|
|
TempBox.shared.dispose(file)
|
|
return
|
|
}
|
|
|
|
var responseHeaders = "HTTP/1.1 200 OK\r\n"
|
|
responseHeaders.append("Content-Length: \(fileRange.upperBound - fileRange.lowerBound)\r\n")
|
|
responseHeaders.append("Content-Range: bytes \(range.lowerBound)-\(range.upperBound)/\(totalSize)\r\n")
|
|
responseHeaders.append("Content-Type: application/octet-stream\r\n")
|
|
responseHeaders.append("Connection: close\r\n")
|
|
responseHeaders.append("Access-Control-Allow-Origin: *\r\n")
|
|
responseHeaders.append("\r\n")
|
|
|
|
connection.send(content: responseHeaders.data(using: .utf8)!, completion: .contentProcessed({ _ in }))
|
|
|
|
Impl.sendRemainingFileRange(queue: queue, connection: connection, tempFile: file, managedFile: managedFile, remainingRange: fileRange, fileSize: Int(fileSize))
|
|
}
|
|
|
|
func registerPlayer(source: SharedHLSServerSource, completion: @escaping () -> Void) -> Disposable {
|
|
let queue = self.queue
|
|
let index = self.sourceReferences.add(SourceReference(source: source))
|
|
self.updateNeedsListener()
|
|
completion()
|
|
|
|
return ActionDisposable { [weak self] in
|
|
queue.async {
|
|
guard let self else {
|
|
return
|
|
}
|
|
self.sourceReferences.remove(index)
|
|
self.updateNeedsListener()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private static let queue = Queue(name: "SharedHLSServer")
|
|
public let port: UInt16 = 8016
|
|
private let impl: QueueLocalObject<Impl>
|
|
|
|
private init() {
|
|
let queue = SharedHLSServer.queue
|
|
let port = self.port
|
|
self.impl = QueueLocalObject(queue: queue, generate: {
|
|
return Impl(queue: queue, port: port)
|
|
})
|
|
}
|
|
|
|
public func registerPlayer(source: SharedHLSServerSource, completion: @escaping () -> Void) -> Disposable {
|
|
let disposable = MetaDisposable()
|
|
|
|
self.impl.with { impl in
|
|
disposable.set(impl.registerPlayer(source: source, completion: completion))
|
|
}
|
|
|
|
return disposable
|
|
}
|
|
}
|