mirror of
https://github.com/Swiftgram/Telegram-iOS.git
synced 2025-08-01 16:06:59 +00:00
Live stream updates
This commit is contained in:
commit
648c9807bf
@ -10,4 +10,5 @@
|
||||
#import <FFMpegBinding/FFMpegPacket.h>
|
||||
#import <FFMpegBinding/FFMpegAVCodec.h>
|
||||
#import <FFMpegBinding/FFMpegRemuxer.h>
|
||||
#import <FFMpegBinding/FFMpegLiveMuxer.h>
|
||||
#import <FFMpegBinding/FrameConverter.h>
|
||||
|
@ -0,0 +1,11 @@
|
||||
#import <Foundation/Foundation.h>
|
||||
|
||||
NS_ASSUME_NONNULL_BEGIN
|
||||
|
||||
@interface FFMpegLiveMuxer : NSObject
|
||||
|
||||
+ (bool)remux:(NSString * _Nonnull)path to:(NSString * _Nonnull)outPath offsetSeconds:(double)offsetSeconds;
|
||||
|
||||
@end
|
||||
|
||||
NS_ASSUME_NONNULL_END
|
334
submodules/FFMpegBinding/Sources/FFMpegLiveMuxer.m
Normal file
334
submodules/FFMpegBinding/Sources/FFMpegLiveMuxer.m
Normal file
@ -0,0 +1,334 @@
|
||||
#import <FFMpegBinding/FFMpegLiveMuxer.h>
|
||||
#import <FFMpegBinding/FFMpegAVIOContext.h>
|
||||
|
||||
#include "libavutil/timestamp.h"
|
||||
#include "libavformat/avformat.h"
|
||||
#include "libavcodec/avcodec.h"
|
||||
#include "libswresample/swresample.h"
|
||||
|
||||
#define MOV_TIMESCALE 1000
|
||||
|
||||
@implementation FFMpegLiveMuxer
|
||||
|
||||
+ (bool)remux:(NSString * _Nonnull)path to:(NSString * _Nonnull)outPath offsetSeconds:(double)offsetSeconds {
|
||||
AVFormatContext *input_format_context = NULL, *output_format_context = NULL;
|
||||
AVPacket packet;
|
||||
const char *in_filename, *out_filename;
|
||||
int ret, i;
|
||||
int stream_index = 0;
|
||||
int *streams_list = NULL;
|
||||
int number_of_streams = 0;
|
||||
|
||||
struct SwrContext *swr_ctx = NULL;
|
||||
|
||||
in_filename = [path UTF8String];
|
||||
out_filename = [outPath UTF8String];
|
||||
|
||||
if ((ret = avformat_open_input(&input_format_context, in_filename, av_find_input_format("mp4"), NULL)) < 0) {
|
||||
fprintf(stderr, "Could not open input file '%s'\n", in_filename);
|
||||
goto end;
|
||||
}
|
||||
if ((ret = avformat_find_stream_info(input_format_context, NULL)) < 0) {
|
||||
fprintf(stderr, "Failed to retrieve input stream information\n");
|
||||
goto end;
|
||||
}
|
||||
|
||||
avformat_alloc_output_context2(&output_format_context, NULL, "mpegts", out_filename);
|
||||
|
||||
if (!output_format_context) {
|
||||
fprintf(stderr, "Could not create output context\n");
|
||||
ret = AVERROR_UNKNOWN;
|
||||
goto end;
|
||||
}
|
||||
|
||||
const AVCodec *aac_codec = avcodec_find_encoder(AV_CODEC_ID_AAC);
|
||||
if (!aac_codec) {
|
||||
fprintf(stderr, "Could not find AAC encoder\n");
|
||||
ret = AVERROR_UNKNOWN;
|
||||
goto end;
|
||||
}
|
||||
|
||||
AVCodecContext *aac_codec_context = avcodec_alloc_context3(aac_codec);
|
||||
if (!aac_codec_context) {
|
||||
fprintf(stderr, "Could not allocate AAC codec context\n");
|
||||
ret = AVERROR_UNKNOWN;
|
||||
goto end;
|
||||
}
|
||||
|
||||
const AVCodec *opus_decoder = avcodec_find_decoder(AV_CODEC_ID_OPUS);
|
||||
if (!opus_decoder) {
|
||||
fprintf(stderr, "Could not find Opus decoder\n");
|
||||
ret = AVERROR_UNKNOWN;
|
||||
goto end;
|
||||
}
|
||||
|
||||
AVCodecContext *opus_decoder_context = avcodec_alloc_context3(opus_decoder);
|
||||
if (!opus_decoder_context) {
|
||||
fprintf(stderr, "Could not allocate Opus decoder context\n");
|
||||
ret = AVERROR_UNKNOWN;
|
||||
goto end;
|
||||
}
|
||||
|
||||
number_of_streams = input_format_context->nb_streams;
|
||||
streams_list = av_malloc_array(number_of_streams, sizeof(*streams_list));
|
||||
|
||||
if (!streams_list) {
|
||||
ret = AVERROR(ENOMEM);
|
||||
goto end;
|
||||
}
|
||||
|
||||
for (i = 0; i < input_format_context->nb_streams; i++) {
|
||||
AVStream *out_stream;
|
||||
AVStream *in_stream = input_format_context->streams[i];
|
||||
AVCodecParameters *in_codecpar = in_stream->codecpar;
|
||||
|
||||
if (in_codecpar->codec_type != AVMEDIA_TYPE_AUDIO && in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO) {
|
||||
streams_list[i] = -1;
|
||||
continue;
|
||||
}
|
||||
|
||||
streams_list[i] = stream_index++;
|
||||
|
||||
if (in_codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
|
||||
out_stream = avformat_new_stream(output_format_context, NULL);
|
||||
if (!out_stream) {
|
||||
fprintf(stderr, "Failed allocating output stream\n");
|
||||
ret = AVERROR_UNKNOWN;
|
||||
goto end;
|
||||
}
|
||||
ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Failed to copy codec parameters\n");
|
||||
goto end;
|
||||
}
|
||||
out_stream->time_base = in_stream->time_base;
|
||||
out_stream->duration = in_stream->duration;
|
||||
} else if (in_codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
|
||||
out_stream = avformat_new_stream(output_format_context, aac_codec);
|
||||
if (!out_stream) {
|
||||
fprintf(stderr, "Failed allocating output stream\n");
|
||||
ret = AVERROR_UNKNOWN;
|
||||
goto end;
|
||||
}
|
||||
|
||||
// Set the codec parameters for the AAC encoder
|
||||
aac_codec_context->sample_rate = in_codecpar->sample_rate;
|
||||
aac_codec_context->channel_layout = in_codecpar->channel_layout ? in_codecpar->channel_layout : AV_CH_LAYOUT_STEREO;
|
||||
aac_codec_context->channels = av_get_channel_layout_nb_channels(aac_codec_context->channel_layout);
|
||||
aac_codec_context->sample_fmt = aac_codec->sample_fmts ? aac_codec->sample_fmts[0] : AV_SAMPLE_FMT_FLTP; // Use the first supported sample format
|
||||
aac_codec_context->bit_rate = 128000; // Set a default bitrate, you can adjust this as needed
|
||||
//aac_codec_context->time_base = (AVRational){1, 90000};
|
||||
|
||||
ret = avcodec_open2(aac_codec_context, aac_codec, NULL);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Could not open AAC encoder\n");
|
||||
goto end;
|
||||
}
|
||||
|
||||
ret = avcodec_parameters_from_context(out_stream->codecpar, aac_codec_context);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Failed initializing audio output stream\n");
|
||||
goto end;
|
||||
}
|
||||
|
||||
out_stream->time_base = (AVRational){1, 90000};
|
||||
out_stream->duration = av_rescale_q(in_stream->duration, in_stream->time_base, out_stream->time_base);
|
||||
|
||||
// Set up the Opus decoder context
|
||||
ret = avcodec_parameters_to_context(opus_decoder_context, in_codecpar);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Could not copy codec parameters to decoder context\n");
|
||||
goto end;
|
||||
}
|
||||
if (opus_decoder_context->channel_layout == 0) {
|
||||
opus_decoder_context->channel_layout = av_get_default_channel_layout(opus_decoder_context->channels);
|
||||
}
|
||||
ret = avcodec_open2(opus_decoder_context, opus_decoder, NULL);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Could not open Opus decoder\n");
|
||||
goto end;
|
||||
}
|
||||
|
||||
// Reset the channel layout if it was unset before opening the codec
|
||||
if (opus_decoder_context->channel_layout == 0) {
|
||||
opus_decoder_context->channel_layout = av_get_default_channel_layout(opus_decoder_context->channels);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set up the resampling context
|
||||
swr_ctx = swr_alloc_set_opts(NULL,
|
||||
aac_codec_context->channel_layout, aac_codec_context->sample_fmt, aac_codec_context->sample_rate,
|
||||
opus_decoder_context->channel_layout, opus_decoder_context->sample_fmt, opus_decoder_context->sample_rate,
|
||||
0, NULL);
|
||||
if (!swr_ctx) {
|
||||
fprintf(stderr, "Could not allocate resampler context\n");
|
||||
ret = AVERROR(ENOMEM);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if ((ret = swr_init(swr_ctx)) < 0) {
|
||||
fprintf(stderr, "Failed to initialize the resampling context\n");
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (!(output_format_context->oformat->flags & AVFMT_NOFILE)) {
|
||||
ret = avio_open(&output_format_context->pb, out_filename, AVIO_FLAG_WRITE);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Could not open output file '%s'\n", out_filename);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
AVDictionary* opts = NULL;
|
||||
ret = avformat_write_header(output_format_context, &opts);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Error occurred when opening output file\n");
|
||||
goto end;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
AVStream *in_stream, *out_stream;
|
||||
ret = av_read_frame(input_format_context, &packet);
|
||||
if (ret < 0)
|
||||
break;
|
||||
|
||||
in_stream = input_format_context->streams[packet.stream_index];
|
||||
if (packet.stream_index >= number_of_streams || streams_list[packet.stream_index] < 0) {
|
||||
av_packet_unref(&packet);
|
||||
continue;
|
||||
}
|
||||
|
||||
packet.stream_index = streams_list[packet.stream_index];
|
||||
out_stream = output_format_context->streams[packet.stream_index];
|
||||
|
||||
if (in_stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
|
||||
ret = avcodec_send_packet(opus_decoder_context, &packet);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Error sending packet to decoder\n");
|
||||
av_packet_unref(&packet);
|
||||
continue;
|
||||
}
|
||||
|
||||
AVFrame *frame = av_frame_alloc();
|
||||
ret = avcodec_receive_frame(opus_decoder_context, frame);
|
||||
if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) {
|
||||
fprintf(stderr, "Error receiving frame from decoder\n");
|
||||
av_frame_free(&frame);
|
||||
av_packet_unref(&packet);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (ret >= 0) {
|
||||
frame->pts = frame->best_effort_timestamp;
|
||||
|
||||
AVFrame *resampled_frame = av_frame_alloc();
|
||||
resampled_frame->channel_layout = aac_codec_context->channel_layout;
|
||||
resampled_frame->sample_rate = aac_codec_context->sample_rate;
|
||||
resampled_frame->format = aac_codec_context->sample_fmt;
|
||||
resampled_frame->nb_samples = aac_codec_context->frame_size;
|
||||
|
||||
if ((ret = av_frame_get_buffer(resampled_frame, 0)) < 0) {
|
||||
fprintf(stderr, "Could not allocate resampled frame buffer\n");
|
||||
av_frame_free(&resampled_frame);
|
||||
av_frame_free(&frame);
|
||||
av_packet_unref(&packet);
|
||||
continue;
|
||||
}
|
||||
|
||||
memset(resampled_frame->data[0], 0, resampled_frame->nb_samples * 2 * 2);
|
||||
//arc4random_buf(resampled_frame->data[0], resampled_frame->nb_samples * 2 * 2);
|
||||
//memset(frame->data[0], 0, frame->nb_samples * 2 * 2);
|
||||
|
||||
if ((ret = swr_convert(swr_ctx,
|
||||
resampled_frame->data, resampled_frame->nb_samples,
|
||||
(const uint8_t **)frame->data, frame->nb_samples)) < 0) {
|
||||
fprintf(stderr, "Error while converting\n");
|
||||
av_frame_free(&resampled_frame);
|
||||
av_frame_free(&frame);
|
||||
av_packet_unref(&packet);
|
||||
continue;
|
||||
}
|
||||
|
||||
resampled_frame->pts = av_rescale_q(frame->pts, opus_decoder_context->time_base, aac_codec_context->time_base);
|
||||
|
||||
ret = avcodec_send_frame(aac_codec_context, resampled_frame);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Error sending frame to encoder\n");
|
||||
av_frame_free(&resampled_frame);
|
||||
av_frame_free(&frame);
|
||||
av_packet_unref(&packet);
|
||||
continue;
|
||||
}
|
||||
|
||||
AVPacket out_packet;
|
||||
av_init_packet(&out_packet);
|
||||
out_packet.data = NULL;
|
||||
out_packet.size = 0;
|
||||
|
||||
ret = avcodec_receive_packet(aac_codec_context, &out_packet);
|
||||
if (ret >= 0) {
|
||||
out_packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX);
|
||||
out_packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX);
|
||||
out_packet.pts += (int64_t)(offsetSeconds * out_stream->time_base.den);
|
||||
out_packet.dts += (int64_t)(offsetSeconds * out_stream->time_base.den);
|
||||
out_packet.duration = av_rescale_q(out_packet.duration, aac_codec_context->time_base, out_stream->time_base);
|
||||
out_packet.stream_index = packet.stream_index;
|
||||
|
||||
ret = av_interleaved_write_frame(output_format_context, &out_packet);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Error muxing packet\n");
|
||||
av_packet_unref(&out_packet);
|
||||
av_frame_free(&resampled_frame);
|
||||
av_frame_free(&frame);
|
||||
av_packet_unref(&packet);
|
||||
break;
|
||||
}
|
||||
av_packet_unref(&out_packet);
|
||||
}
|
||||
av_frame_free(&resampled_frame);
|
||||
av_frame_free(&frame);
|
||||
}
|
||||
} else {
|
||||
packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX);
|
||||
packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX);
|
||||
packet.pts += (int64_t)(offsetSeconds * out_stream->time_base.den);
|
||||
packet.dts += (int64_t)(offsetSeconds * out_stream->time_base.den);
|
||||
packet.duration = av_rescale_q(packet.duration, in_stream->time_base, out_stream->time_base);
|
||||
packet.pos = -1;
|
||||
|
||||
ret = av_interleaved_write_frame(output_format_context, &packet);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "Error muxing packet\n");
|
||||
av_packet_unref(&packet);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
av_packet_unref(&packet);
|
||||
}
|
||||
|
||||
av_write_trailer(output_format_context);
|
||||
|
||||
end:
|
||||
avformat_close_input(&input_format_context);
|
||||
if (output_format_context && !(output_format_context->oformat->flags & AVFMT_NOFILE)) {
|
||||
avio_closep(&output_format_context->pb);
|
||||
}
|
||||
avformat_free_context(output_format_context);
|
||||
avcodec_free_context(&aac_codec_context);
|
||||
avcodec_free_context(&opus_decoder_context);
|
||||
av_freep(&streams_list);
|
||||
if (swr_ctx) {
|
||||
swr_free(&swr_ctx);
|
||||
}
|
||||
if (ret < 0 && ret != AVERROR_EOF) {
|
||||
fprintf(stderr, "Error occurred: %s\n", av_err2str(ret));
|
||||
return false;
|
||||
}
|
||||
|
||||
printf("Remuxed video into %s\n", outPath.UTF8String);
|
||||
return true;
|
||||
}
|
||||
|
||||
@end
|
@ -6,11 +6,11 @@ import AVKit
|
||||
import MultilineTextComponent
|
||||
import Display
|
||||
import ShimmerEffect
|
||||
|
||||
import TelegramCore
|
||||
import SwiftSignalKit
|
||||
import AvatarNode
|
||||
import Postbox
|
||||
import TelegramVoip
|
||||
|
||||
final class MediaStreamVideoComponent: Component {
|
||||
let call: PresentationGroupCallImpl
|
||||
@ -157,6 +157,8 @@ final class MediaStreamVideoComponent: Component {
|
||||
private var lastPresentation: UIView?
|
||||
private var pipTrackDisplayLink: CADisplayLink?
|
||||
|
||||
private var livePlayerView: ProxyVideoView?
|
||||
|
||||
override init(frame: CGRect) {
|
||||
self.blurTintView = UIView()
|
||||
self.blurTintView.backgroundColor = UIColor(white: 0.0, alpha: 0.55)
|
||||
@ -211,7 +213,7 @@ final class MediaStreamVideoComponent: Component {
|
||||
let needsFadeInAnimation = hadVideo
|
||||
|
||||
if loadingBlurView.superview == nil {
|
||||
addSubview(loadingBlurView)
|
||||
//addSubview(loadingBlurView)
|
||||
if needsFadeInAnimation {
|
||||
let anim = CABasicAnimation(keyPath: "opacity")
|
||||
anim.duration = 0.5
|
||||
@ -542,6 +544,21 @@ final class MediaStreamVideoComponent: Component {
|
||||
videoFrameUpdateTransition.setFrame(layer: self.videoBlurGradientMask, frame: videoBlurView.bounds)
|
||||
videoFrameUpdateTransition.setFrame(layer: self.videoBlurSolidMask, frame: self.videoBlurGradientMask.bounds)
|
||||
}
|
||||
|
||||
if self.livePlayerView == nil {
|
||||
let livePlayerView = ProxyVideoView(context: component.call.accountContext, call: component.call)
|
||||
self.livePlayerView = livePlayerView
|
||||
livePlayerView.layer.masksToBounds = true
|
||||
self.addSubview(livePlayerView)
|
||||
livePlayerView.frame = newVideoFrame
|
||||
livePlayerView.layer.cornerRadius = videoCornerRadius
|
||||
livePlayerView.update(size: newVideoFrame.size)
|
||||
}
|
||||
if let livePlayerView = self.livePlayerView {
|
||||
videoFrameUpdateTransition.setFrame(view: livePlayerView, frame: newVideoFrame, completion: nil)
|
||||
videoFrameUpdateTransition.setCornerRadius(layer: livePlayerView.layer, cornerRadius: videoCornerRadius)
|
||||
livePlayerView.update(size: newVideoFrame.size)
|
||||
}
|
||||
} else {
|
||||
videoSize = CGSize(width: 16 / 9 * 100.0, height: 100.0).aspectFitted(.init(width: availableSize.width - videoInset * 2, height: availableSize.height))
|
||||
}
|
||||
@ -601,7 +618,7 @@ final class MediaStreamVideoComponent: Component {
|
||||
}
|
||||
}
|
||||
|
||||
if self.noSignalTimeout {
|
||||
if self.noSignalTimeout, !"".isEmpty {
|
||||
var noSignalTransition = transition
|
||||
let noSignalView: ComponentHostView<Empty>
|
||||
if let current = self.noSignalView {
|
||||
@ -769,3 +786,178 @@ private final class CustomIntensityVisualEffectView: UIVisualEffectView {
|
||||
animator.stopAnimation(true)
|
||||
}
|
||||
}
|
||||
|
||||
private final class ProxyVideoView: UIView {
|
||||
private let call: PresentationGroupCallImpl
|
||||
private let id: Int64
|
||||
private let player: AVPlayer
|
||||
private let playerItem: AVPlayerItem
|
||||
private let playerLayer: AVPlayerLayer
|
||||
|
||||
private var contextDisposable: Disposable?
|
||||
|
||||
private var failureObserverId: AnyObject?
|
||||
private var errorObserverId: AnyObject?
|
||||
|
||||
private var server: AnyObject?
|
||||
|
||||
init(context: AccountContext, call: PresentationGroupCallImpl) {
|
||||
self.call = call
|
||||
|
||||
self.id = Int64.random(in: Int64.min ... Int64.max)
|
||||
|
||||
/*if #available(iOS 13.0, *) {
|
||||
do {
|
||||
let server = try HTTPServer(port: NWEndpoint.Port(integerLiteral: 8012), tcpOptions: nil, queue: .main, handler: { request, response in
|
||||
if request.url == "/master.m3u8" {
|
||||
let _ = (call.externalMediaStream.get()
|
||||
|> take(1)
|
||||
|> mapToSignal { externalMediaStream in
|
||||
return externalMediaStream.masterPlaylistData()
|
||||
}
|
||||
|> take(1)
|
||||
|> deliverOnMainQueue).start(next: { masterPlaylistData in
|
||||
response.send(masterPlaylistData.data(using: .utf8)!)
|
||||
})
|
||||
} else if request.url == "/hls_level_0.m3u8" {
|
||||
let _ = (call.externalMediaStream.get()
|
||||
|> take(1)
|
||||
|> mapToSignal { externalMediaStream in
|
||||
return externalMediaStream.playlistData(quality: 0)
|
||||
}
|
||||
|> take(1)
|
||||
|> deliverOnMainQueue).start(next: { playlistData in
|
||||
response.send(playlistData.data(using: .utf8)!)
|
||||
})
|
||||
} else if request.url == "/hls_level_1.m3u8" {
|
||||
let _ = (call.externalMediaStream.get()
|
||||
|> take(1)
|
||||
|> mapToSignal { externalMediaStream in
|
||||
return externalMediaStream.playlistData(quality: 1)
|
||||
}
|
||||
|> take(1)
|
||||
|> deliverOnMainQueue).start(next: { playlistData in
|
||||
response.send(playlistData.data(using: .utf8)!)
|
||||
})
|
||||
} else if request.url.hasPrefix("/hls_stream0_") && request.url.hasSuffix(".ts") {
|
||||
if let partIndex = Int(request.url[request.url.index(request.url.startIndex, offsetBy: "/hls_stream0_".count)..<request.url.index(request.url.endIndex, offsetBy: -".ts".count)]) {
|
||||
let _ = (call.externalMediaStream.get()
|
||||
|> take(1)
|
||||
|> mapToSignal { externalMediaStream in
|
||||
return externalMediaStream.partData(index: partIndex, quality: 0)
|
||||
}
|
||||
|> take(1)
|
||||
|> deliverOnMainQueue).start(next: { partData in
|
||||
guard let partData else {
|
||||
return
|
||||
}
|
||||
|
||||
let sourceTempFile = TempBox.shared.tempFile(fileName: "part.mp4")
|
||||
let tempFile = TempBox.shared.tempFile(fileName: "part.ts")
|
||||
defer {
|
||||
TempBox.shared.dispose(sourceTempFile)
|
||||
TempBox.shared.dispose(tempFile)
|
||||
}
|
||||
|
||||
let _ = try? partData.write(to: URL(fileURLWithPath: sourceTempFile.path))
|
||||
|
||||
let sourcePath = sourceTempFile.path
|
||||
FFMpegLiveMuxer.remux(sourcePath, to: tempFile.path, offsetSeconds: Double(partIndex))
|
||||
if let data = try? Data(contentsOf: URL(fileURLWithPath: tempFile.path)) {
|
||||
response.send(data)
|
||||
} else {
|
||||
let _ = try? response.send("Error")
|
||||
}
|
||||
})
|
||||
} else {
|
||||
try response.send("Error")
|
||||
}
|
||||
} else if request.url.hasPrefix("/hls_stream1_") && request.url.hasSuffix(".ts") {
|
||||
if let partIndex = Int(request.url[request.url.index(request.url.startIndex, offsetBy: "/hls_stream1_".count)..<request.url.index(request.url.endIndex, offsetBy: -".ts".count)]) {
|
||||
let _ = (call.externalMediaStream.get()
|
||||
|> take(1)
|
||||
|> mapToSignal { externalMediaStream in
|
||||
return externalMediaStream.partData(index: partIndex, quality: 1)
|
||||
}
|
||||
|> take(1)
|
||||
|> deliverOnMainQueue).start(next: { partData in
|
||||
guard let partData else {
|
||||
return
|
||||
}
|
||||
|
||||
let sourceTempFile = TempBox.shared.tempFile(fileName: "part.mp4")
|
||||
let tempFile = TempBox.shared.tempFile(fileName: "part.ts")
|
||||
defer {
|
||||
TempBox.shared.dispose(sourceTempFile)
|
||||
TempBox.shared.dispose(tempFile)
|
||||
}
|
||||
|
||||
let _ = try? partData.write(to: URL(fileURLWithPath: sourceTempFile.path))
|
||||
|
||||
let sourcePath = sourceTempFile.path
|
||||
FFMpegLiveMuxer.remux(sourcePath, to: tempFile.path, offsetSeconds: Double(partIndex))
|
||||
if let data = try? Data(contentsOf: URL(fileURLWithPath: tempFile.path)) {
|
||||
response.send(data)
|
||||
} else {
|
||||
let _ = try? response.send("Error")
|
||||
}
|
||||
})
|
||||
} else {
|
||||
try response.send("Error")
|
||||
}
|
||||
} else {
|
||||
try response.send("Error")
|
||||
}
|
||||
})
|
||||
self.server = server
|
||||
server.resume()
|
||||
} catch let e {
|
||||
print("HTTPServer start error: \(e)")
|
||||
}
|
||||
}*/
|
||||
|
||||
let assetUrl = "http://127.0.0.1:\(SharedHLSServer.shared.port)/\(call.internalId)/master.m3u8"
|
||||
Logger.shared.log("MediaStreamVideoComponent", "Initializing HLS asset at \(assetUrl)")
|
||||
#if DEBUG
|
||||
print("Initializing HLS asset at \(assetUrl)")
|
||||
#endif
|
||||
let asset = AVURLAsset(url: URL(string: assetUrl)!, options: [:])
|
||||
self.playerItem = AVPlayerItem(asset: asset)
|
||||
self.player = AVPlayer(playerItem: self.playerItem)
|
||||
self.playerLayer = AVPlayerLayer(player: self.player)
|
||||
|
||||
super.init(frame: CGRect())
|
||||
|
||||
self.failureObserverId = NotificationCenter.default.addObserver(forName: AVPlayerItem.failedToPlayToEndTimeNotification, object: playerItem, queue: .main, using: { notification in
|
||||
print("Player Error: \(notification.description)")
|
||||
})
|
||||
self.errorObserverId = NotificationCenter.default.addObserver(forName: AVPlayerItem.newErrorLogEntryNotification, object: playerItem, queue: .main, using: { notification in
|
||||
print("Player Error: \(notification.description)")
|
||||
})
|
||||
|
||||
self.layer.addSublayer(self.playerLayer)
|
||||
|
||||
//self.contextDisposable = ResourceAdaptor.shared.addContext(id: self.id, context: context, fileReference: fileReference)
|
||||
|
||||
self.player.play()
|
||||
}
|
||||
|
||||
required init?(coder: NSCoder) {
|
||||
fatalError("init(coder:) has not been implemented")
|
||||
}
|
||||
|
||||
deinit {
|
||||
self.contextDisposable?.dispose()
|
||||
if let failureObserverId = self.failureObserverId {
|
||||
NotificationCenter.default.removeObserver(failureObserverId)
|
||||
}
|
||||
if let errorObserverId = self.errorObserverId {
|
||||
NotificationCenter.default.removeObserver(errorObserverId)
|
||||
}
|
||||
}
|
||||
|
||||
func update(size: CGSize) {
|
||||
self.playerLayer.frame = CGRect(origin: CGPoint(), size: size)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -276,6 +276,7 @@ private extension PresentationGroupCallState {
|
||||
private enum CurrentImpl {
|
||||
case call(OngoingGroupCallContext)
|
||||
case mediaStream(WrappedMediaStreamingContext)
|
||||
case externalMediaStream(ExternalMediaStreamingContext)
|
||||
}
|
||||
|
||||
private extension CurrentImpl {
|
||||
@ -283,7 +284,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
return callContext.joinPayload
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
let ssrcId = UInt32.random(in: 0 ..< UInt32(Int32.max - 1))
|
||||
let dict: [String: Any] = [
|
||||
"fingerprints": [] as [Any],
|
||||
@ -303,7 +304,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
return callContext.networkState
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
return .single(OngoingGroupCallContext.NetworkState(isConnected: true, isTransitioningFromBroadcastToRtc: false))
|
||||
}
|
||||
}
|
||||
@ -312,7 +313,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
return callContext.audioLevels
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
return .single([])
|
||||
}
|
||||
}
|
||||
@ -321,7 +322,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
return callContext.isMuted
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
return .single(true)
|
||||
}
|
||||
}
|
||||
@ -330,7 +331,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
return callContext.isNoiseSuppressionEnabled
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
return .single(false)
|
||||
}
|
||||
}
|
||||
@ -339,7 +340,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.stop()
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -348,7 +349,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.setIsMuted(isMuted)
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -357,7 +358,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.setIsNoiseSuppressionEnabled(isNoiseSuppressionEnabled)
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -366,7 +367,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.requestVideo(capturer)
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -375,7 +376,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.disableVideo()
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -384,7 +385,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.setVolume(ssrc: ssrc, volume: volume)
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -393,7 +394,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.setRequestedVideoChannels(channels)
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -402,17 +403,19 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.makeIncomingVideoView(endpointId: endpointId, requestClone: requestClone, completion: completion)
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func video(endpointId: String) -> Signal<OngoingGroupCallContext.VideoFrameData, NoError> {
|
||||
func video(endpointId: String) -> Signal<OngoingGroupCallContext.VideoFrameData, NoError>? {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
return callContext.video(endpointId: endpointId)
|
||||
case let .mediaStream(mediaStreamContext):
|
||||
return mediaStreamContext.video()
|
||||
case .externalMediaStream:
|
||||
return .never()
|
||||
}
|
||||
}
|
||||
|
||||
@ -420,7 +423,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.addExternalAudioData(data: data)
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -429,7 +432,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.getStats(completion: completion)
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -438,7 +441,7 @@ private extension CurrentImpl {
|
||||
switch self {
|
||||
case let .call(callContext):
|
||||
callContext.setTone(tone: tone)
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -647,6 +650,8 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
|
||||
private var genericCallContext: CurrentImpl?
|
||||
private var currentConnectionMode: OngoingGroupCallContext.ConnectionMode = .none
|
||||
private var didInitializeConnectionMode: Bool = false
|
||||
|
||||
let externalMediaStream = Promise<ExternalMediaStreamingContext>()
|
||||
|
||||
private var screencastCallContext: OngoingGroupCallContext?
|
||||
private var screencastBufferServerContext: IpcGroupCallBufferAppContext?
|
||||
@ -1638,7 +1643,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
|
||||
genericCallContext = current
|
||||
} else {
|
||||
if self.isStream, self.accountContext.sharedContext.immediateExperimentalUISettings.liveStreamV2 {
|
||||
genericCallContext = .mediaStream(WrappedMediaStreamingContext(rejoinNeeded: { [weak self] in
|
||||
let externalMediaStream = ExternalMediaStreamingContext(id: self.internalId, rejoinNeeded: { [weak self] in
|
||||
Queue.mainQueue().async {
|
||||
guard let strongSelf = self else {
|
||||
return
|
||||
@ -1650,7 +1655,9 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
|
||||
strongSelf.requestCall(movingFromBroadcastToRtc: false)
|
||||
}
|
||||
}
|
||||
}))
|
||||
})
|
||||
genericCallContext = .externalMediaStream(externalMediaStream)
|
||||
self.externalMediaStream.set(.single(externalMediaStream))
|
||||
} else {
|
||||
var outgoingAudioBitrateKbit: Int32?
|
||||
let appConfiguration = self.accountContext.currentAppConfiguration.with({ $0 })
|
||||
@ -1797,6 +1804,14 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
|
||||
strongSelf.currentConnectionMode = .broadcast
|
||||
mediaStreamContext.setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData(engine: strongSelf.accountContext.engine, callId: callInfo.id, accessHash: callInfo.accessHash, isExternalStream: callInfo.isStream))
|
||||
}
|
||||
case let .externalMediaStream(externalMediaStream):
|
||||
switch joinCallResult.connectionMode {
|
||||
case .rtc:
|
||||
strongSelf.currentConnectionMode = .rtc
|
||||
case .broadcast:
|
||||
strongSelf.currentConnectionMode = .broadcast
|
||||
externalMediaStream.setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData(engine: strongSelf.accountContext.engine, callId: callInfo.id, accessHash: callInfo.accessHash, isExternalStream: callInfo.isStream))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -3199,7 +3214,7 @@ public final class PresentationGroupCallImpl: PresentationGroupCall {
|
||||
switch genericCallContext {
|
||||
case let .call(callContext):
|
||||
callContext.setConnectionMode(.none, keepBroadcastConnectedIfWasEnabled: movingFromBroadcastToRtc, isUnifiedBroadcast: false)
|
||||
case .mediaStream:
|
||||
case .mediaStream, .externalMediaStream:
|
||||
assertionFailure()
|
||||
break
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ swift_library(
|
||||
"//submodules/TelegramUIPreferences:TelegramUIPreferences",
|
||||
"//submodules/TgVoip:TgVoip",
|
||||
"//submodules/TgVoipWebrtc:TgVoipWebrtc",
|
||||
"//submodules/FFMpegBinding",
|
||||
],
|
||||
visibility = [
|
||||
"//visibility:public",
|
||||
|
@ -2,6 +2,9 @@ import Foundation
|
||||
import SwiftSignalKit
|
||||
import TgVoipWebrtc
|
||||
import TelegramCore
|
||||
import Network
|
||||
import Postbox
|
||||
import FFMpegBinding
|
||||
|
||||
public final class WrappedMediaStreamingContext {
|
||||
private final class Impl {
|
||||
@ -132,3 +135,458 @@ public final class WrappedMediaStreamingContext {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public final class ExternalMediaStreamingContext {
|
||||
private final class Impl {
|
||||
let queue: Queue
|
||||
|
||||
private var broadcastPartsSource: BroadcastPartSource?
|
||||
|
||||
private let resetPlaylistDisposable = MetaDisposable()
|
||||
private let updatePlaylistDisposable = MetaDisposable()
|
||||
|
||||
let masterPlaylistData = Promise<String>()
|
||||
let playlistData = Promise<String>()
|
||||
let mediumPlaylistData = Promise<String>()
|
||||
|
||||
init(queue: Queue, rejoinNeeded: @escaping () -> Void) {
|
||||
self.queue = queue
|
||||
}
|
||||
|
||||
deinit {
|
||||
self.updatePlaylistDisposable.dispose()
|
||||
}
|
||||
|
||||
func setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData?) {
|
||||
if let audioStreamData {
|
||||
let broadcastPartsSource = NetworkBroadcastPartSource(queue: self.queue, engine: audioStreamData.engine, callId: audioStreamData.callId, accessHash: audioStreamData.accessHash, isExternalStream: audioStreamData.isExternalStream)
|
||||
self.broadcastPartsSource = broadcastPartsSource
|
||||
|
||||
self.updatePlaylistDisposable.set(nil)
|
||||
|
||||
let queue = self.queue
|
||||
self.resetPlaylistDisposable.set(broadcastPartsSource.requestTime(completion: { [weak self] timestamp in
|
||||
queue.async {
|
||||
guard let self else {
|
||||
return
|
||||
}
|
||||
|
||||
let segmentDuration: Int64 = 1000
|
||||
|
||||
var adjustedTimestamp: Int64 = 0
|
||||
if timestamp > 0 {
|
||||
adjustedTimestamp = timestamp / segmentDuration * segmentDuration - 4 * segmentDuration
|
||||
}
|
||||
|
||||
if adjustedTimestamp > 0 {
|
||||
var masterPlaylistData = "#EXTM3U\n" +
|
||||
"#EXT-X-VERSION:6\n" +
|
||||
"#EXT-X-STREAM-INF:BANDWIDTH=3300000,RESOLUTION=1280x720,CODECS=\"avc1.64001f,mp4a.40.2\"\n" +
|
||||
"hls_level_0.m3u8\n"
|
||||
|
||||
masterPlaylistData += "#EXT-X-STREAM-INF:BANDWIDTH=1000000,RESOLUTION=640x360,CODECS=\"avc1.64001f,mp4a.40.2\"\n" +
|
||||
"hls_level_1.m3u8\n"
|
||||
|
||||
self.masterPlaylistData.set(.single(masterPlaylistData))
|
||||
|
||||
self.beginUpdatingPlaylist(initialHeadTimestamp: adjustedTimestamp)
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
private func beginUpdatingPlaylist(initialHeadTimestamp: Int64) {
|
||||
let segmentDuration: Int64 = 1000
|
||||
|
||||
var timestamp = initialHeadTimestamp
|
||||
self.updatePlaylist(headTimestamp: timestamp, quality: 0)
|
||||
self.updatePlaylist(headTimestamp: timestamp, quality: 1)
|
||||
|
||||
self.updatePlaylistDisposable.set((
|
||||
Signal<Void, NoError>.single(Void())
|
||||
|> delay(1.0, queue: self.queue)
|
||||
|> restart
|
||||
|> deliverOn(self.queue)
|
||||
).start(next: { [weak self] _ in
|
||||
guard let self else {
|
||||
return
|
||||
}
|
||||
|
||||
timestamp += segmentDuration
|
||||
self.updatePlaylist(headTimestamp: timestamp, quality: 0)
|
||||
self.updatePlaylist(headTimestamp: timestamp, quality: 1)
|
||||
}))
|
||||
}
|
||||
|
||||
private func updatePlaylist(headTimestamp: Int64, quality: Int) {
|
||||
let segmentDuration: Int64 = 1000
|
||||
let headIndex = headTimestamp / segmentDuration
|
||||
let minIndex = headIndex - 20
|
||||
|
||||
var playlistData = "#EXTM3U\n" +
|
||||
"#EXT-X-VERSION:6\n" +
|
||||
"#EXT-X-TARGETDURATION:1\n" +
|
||||
"#EXT-X-MEDIA-SEQUENCE:\(minIndex)\n" +
|
||||
"#EXT-X-INDEPENDENT-SEGMENTS\n"
|
||||
|
||||
for index in minIndex ... headIndex {
|
||||
playlistData.append("#EXTINF:1.000000,\n")
|
||||
playlistData.append("hls_stream\(quality)_\(index).ts\n")
|
||||
}
|
||||
|
||||
print("Player: updating playlist \(quality) \(minIndex) ... \(headIndex)")
|
||||
|
||||
if quality == 0 {
|
||||
self.playlistData.set(.single(playlistData))
|
||||
} else {
|
||||
self.mediumPlaylistData.set(.single(playlistData))
|
||||
}
|
||||
}
|
||||
|
||||
func partData(index: Int, quality: Int) -> Signal<Data?, NoError> {
|
||||
let segmentDuration: Int64 = 1000
|
||||
let timestamp = Int64(index) * segmentDuration
|
||||
|
||||
print("Player: request part(q: \(quality)) \(index) -> \(timestamp)")
|
||||
|
||||
guard let broadcastPartsSource = self.broadcastPartsSource else {
|
||||
return .single(nil)
|
||||
}
|
||||
|
||||
return Signal { subscriber in
|
||||
return broadcastPartsSource.requestPart(
|
||||
timestampMilliseconds: timestamp,
|
||||
durationMilliseconds: segmentDuration,
|
||||
subject: .video(channelId: 1, quality: quality == 0 ? .full : .medium),
|
||||
completion: { part in
|
||||
var data = part.oggData
|
||||
if data.count > 32 {
|
||||
data = data.subdata(in: 32 ..< data.count)
|
||||
}
|
||||
subscriber.putNext(data)
|
||||
},
|
||||
rejoinNeeded: {
|
||||
//TODO
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private let queue = Queue()
|
||||
let id: CallSessionInternalId
|
||||
private let impl: QueueLocalObject<Impl>
|
||||
private var hlsServerDisposable: Disposable?
|
||||
|
||||
public init(id: CallSessionInternalId, rejoinNeeded: @escaping () -> Void) {
|
||||
self.id = id
|
||||
let queue = self.queue
|
||||
self.impl = QueueLocalObject(queue: queue, generate: {
|
||||
return Impl(queue: queue, rejoinNeeded: rejoinNeeded)
|
||||
})
|
||||
|
||||
self.hlsServerDisposable = SharedHLSServer.shared.registerPlayer(streamingContext: self)
|
||||
}
|
||||
|
||||
deinit {
|
||||
self.hlsServerDisposable?.dispose()
|
||||
}
|
||||
|
||||
public func setAudioStreamData(audioStreamData: OngoingGroupCallContext.AudioStreamData?) {
|
||||
self.impl.with { impl in
|
||||
impl.setAudioStreamData(audioStreamData: audioStreamData)
|
||||
}
|
||||
}
|
||||
|
||||
public func masterPlaylistData() -> Signal<String, NoError> {
|
||||
return self.impl.signalWith { impl, subscriber in
|
||||
impl.masterPlaylistData.get().start(next: subscriber.putNext)
|
||||
}
|
||||
}
|
||||
|
||||
public func playlistData(quality: Int) -> Signal<String, NoError> {
|
||||
return self.impl.signalWith { impl, subscriber in
|
||||
if quality == 0 {
|
||||
impl.playlistData.get().start(next: subscriber.putNext)
|
||||
} else {
|
||||
impl.mediumPlaylistData.get().start(next: subscriber.putNext)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public func partData(index: Int, quality: Int) -> Signal<Data?, NoError> {
|
||||
return self.impl.signalWith { impl, subscriber in
|
||||
impl.partData(index: index, quality: quality).start(next: subscriber.putNext)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public final class SharedHLSServer {
|
||||
public static let shared: SharedHLSServer = {
|
||||
return SharedHLSServer()
|
||||
}()
|
||||
|
||||
private enum ResponseError {
|
||||
case badRequest
|
||||
case notFound
|
||||
case internalServerError
|
||||
|
||||
var httpString: String {
|
||||
switch self {
|
||||
case .badRequest:
|
||||
return "400 Bad Request"
|
||||
case .notFound:
|
||||
return "404 Not Found"
|
||||
case .internalServerError:
|
||||
return "500 Internal Server Error"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class ContextReference {
|
||||
weak var streamingContext: ExternalMediaStreamingContext?
|
||||
|
||||
init(streamingContext: ExternalMediaStreamingContext) {
|
||||
self.streamingContext = streamingContext
|
||||
}
|
||||
}
|
||||
|
||||
private final class Impl {
|
||||
private let queue: Queue
|
||||
|
||||
private let port: NWEndpoint.Port
|
||||
private var listener: NWListener?
|
||||
|
||||
private var contextReferences = Bag<ContextReference>()
|
||||
|
||||
init(queue: Queue, port: UInt16) {
|
||||
self.queue = queue
|
||||
self.port = NWEndpoint.Port(rawValue: port)!
|
||||
self.start()
|
||||
}
|
||||
|
||||
func start() {
|
||||
let listener: NWListener
|
||||
do {
|
||||
listener = try NWListener(using: .tcp, on: self.port)
|
||||
} catch {
|
||||
Logger.shared.log("SharedHLSServer", "Failed to create listener: \(error)")
|
||||
return
|
||||
}
|
||||
self.listener = listener
|
||||
|
||||
listener.newConnectionHandler = { [weak self] connection in
|
||||
guard let self else {
|
||||
return
|
||||
}
|
||||
self.handleConnection(connection: connection)
|
||||
}
|
||||
|
||||
listener.stateUpdateHandler = { [weak self] state in
|
||||
guard let self else {
|
||||
return
|
||||
}
|
||||
switch state {
|
||||
case .ready:
|
||||
Logger.shared.log("SharedHLSServer", "Server is ready on port \(self.port)")
|
||||
case let .failed(error):
|
||||
Logger.shared.log("SharedHLSServer", "Server failed with error: \(error)")
|
||||
self.listener?.cancel()
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
listener.start(queue: self.queue.queue)
|
||||
}
|
||||
|
||||
private func handleConnection(connection: NWConnection) {
|
||||
connection.start(queue: self.queue.queue)
|
||||
connection.receive(minimumIncompleteLength: 1, maximumLength: 1024, completion: { [weak self] data, _, isComplete, error in
|
||||
guard let self else {
|
||||
return
|
||||
}
|
||||
if let data, !data.isEmpty {
|
||||
self.handleRequest(data: data, connection: connection)
|
||||
} else if isComplete {
|
||||
connection.cancel()
|
||||
} else if let error = error {
|
||||
Logger.shared.log("SharedHLSServer", "Error on connection: \(error)")
|
||||
connection.cancel()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private func handleRequest(data: Data, connection: NWConnection) {
|
||||
guard let requestString = String(data: data, encoding: .utf8) else {
|
||||
connection.cancel()
|
||||
return
|
||||
}
|
||||
|
||||
if !requestString.hasPrefix("GET /") {
|
||||
self.sendErrorAndClose(connection: connection)
|
||||
return
|
||||
}
|
||||
guard let firstCrLf = requestString.range(of: "\r\n") else {
|
||||
self.sendErrorAndClose(connection: connection)
|
||||
return
|
||||
}
|
||||
let firstLine = String(requestString[requestString.index(requestString.startIndex, offsetBy: "GET /".count) ..< firstCrLf.lowerBound])
|
||||
if !(firstLine.hasSuffix(" HTTP/1.0") || firstLine.hasSuffix(" HTTP/1.1")) {
|
||||
self.sendErrorAndClose(connection: connection)
|
||||
return
|
||||
}
|
||||
|
||||
let requestPath = String(firstLine[firstLine.startIndex ..< firstLine.index(firstLine.endIndex, offsetBy: -" HTTP/1.1".count)])
|
||||
|
||||
guard let firstSlash = requestPath.range(of: "/") else {
|
||||
self.sendErrorAndClose(connection: connection, error: .notFound)
|
||||
return
|
||||
}
|
||||
guard let streamId = UUID(uuidString: String(requestPath[requestPath.startIndex ..< firstSlash.lowerBound])) else {
|
||||
self.sendErrorAndClose(connection: connection)
|
||||
return
|
||||
}
|
||||
guard let streamingContext = self.contextReferences.copyItems().first(where: { $0.streamingContext?.id == streamId })?.streamingContext else {
|
||||
self.sendErrorAndClose(connection: connection)
|
||||
return
|
||||
}
|
||||
|
||||
let filePath = String(requestPath[firstSlash.upperBound...])
|
||||
if filePath == "master.m3u8" {
|
||||
let _ = (streamingContext.masterPlaylistData()
|
||||
|> deliverOn(self.queue)
|
||||
|> take(1)).start(next: { [weak self] result in
|
||||
guard let self else {
|
||||
return
|
||||
}
|
||||
|
||||
self.sendResponseAndClose(connection: connection, data: result.data(using: .utf8)!)
|
||||
})
|
||||
} else if filePath.hasPrefix("hls_level_") && filePath.hasSuffix(".m3u8") {
|
||||
guard let levelIndex = Int(String(filePath[filePath.index(filePath.startIndex, offsetBy: "hls_level_".count) ..< filePath.index(filePath.endIndex, offsetBy: -".m3u8".count)])) else {
|
||||
self.sendErrorAndClose(connection: connection)
|
||||
return
|
||||
}
|
||||
|
||||
let _ = (streamingContext.playlistData(quality: levelIndex)
|
||||
|> deliverOn(self.queue)
|
||||
|> take(1)).start(next: { [weak self] result in
|
||||
guard let self else {
|
||||
return
|
||||
}
|
||||
|
||||
self.sendResponseAndClose(connection: connection, data: result.data(using: .utf8)!)
|
||||
})
|
||||
} else if filePath.hasPrefix("hls_stream") && filePath.hasSuffix(".ts") {
|
||||
let fileId = String(filePath[filePath.index(filePath.startIndex, offsetBy: "hls_stream".count) ..< filePath.index(filePath.endIndex, offsetBy: -".ts".count)])
|
||||
guard let underscoreRange = fileId.range(of: "_") else {
|
||||
self.sendErrorAndClose(connection: connection)
|
||||
return
|
||||
}
|
||||
guard let levelIndex = Int(String(fileId[fileId.startIndex ..< underscoreRange.lowerBound])) else {
|
||||
self.sendErrorAndClose(connection: connection)
|
||||
return
|
||||
}
|
||||
guard let partIndex = Int(String(fileId[underscoreRange.upperBound...])) else {
|
||||
self.sendErrorAndClose(connection: connection)
|
||||
return
|
||||
}
|
||||
let _ = (streamingContext.partData(index: partIndex, quality: levelIndex)
|
||||
|> deliverOn(self.queue)
|
||||
|> take(1)).start(next: { [weak self] result in
|
||||
guard let self else {
|
||||
return
|
||||
}
|
||||
|
||||
if let result {
|
||||
let sourceTempFile = TempBox.shared.tempFile(fileName: "part.mp4")
|
||||
let tempFile = TempBox.shared.tempFile(fileName: "part.ts")
|
||||
defer {
|
||||
TempBox.shared.dispose(sourceTempFile)
|
||||
TempBox.shared.dispose(tempFile)
|
||||
}
|
||||
|
||||
guard let _ = try? result.write(to: URL(fileURLWithPath: sourceTempFile.path)) else {
|
||||
self.sendErrorAndClose(connection: connection, error: .internalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
let sourcePath = sourceTempFile.path
|
||||
FFMpegLiveMuxer.remux(sourcePath, to: tempFile.path, offsetSeconds: Double(partIndex))
|
||||
|
||||
if let data = try? Data(contentsOf: URL(fileURLWithPath: tempFile.path)) {
|
||||
self.sendResponseAndClose(connection: connection, data: data)
|
||||
} else {
|
||||
self.sendErrorAndClose(connection: connection, error: .internalServerError)
|
||||
}
|
||||
} else {
|
||||
self.sendErrorAndClose(connection: connection, error: .notFound)
|
||||
}
|
||||
})
|
||||
} else {
|
||||
self.sendErrorAndClose(connection: connection, error: .notFound)
|
||||
}
|
||||
}
|
||||
|
||||
private func sendErrorAndClose(connection: NWConnection, error: ResponseError = .badRequest) {
|
||||
let errorResponse = "HTTP/1.1 \(error.httpString)\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n"
|
||||
connection.send(content: errorResponse.data(using: .utf8), completion: .contentProcessed { error in
|
||||
if let error {
|
||||
Logger.shared.log("SharedHLSServer", "Failed to send response: \(error)")
|
||||
}
|
||||
connection.cancel()
|
||||
})
|
||||
}
|
||||
|
||||
private func sendResponseAndClose(connection: NWConnection, data: Data) {
|
||||
let responseHeaders = "HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nConnection: close\r\n\r\n"
|
||||
var responseData = Data()
|
||||
responseData.append(responseHeaders.data(using: .utf8)!)
|
||||
responseData.append(data)
|
||||
connection.send(content: responseData, completion: .contentProcessed { error in
|
||||
if let error {
|
||||
Logger.shared.log("SharedHLSServer", "Failed to send response: \(error)")
|
||||
}
|
||||
connection.cancel()
|
||||
})
|
||||
}
|
||||
|
||||
func registerPlayer(streamingContext: ExternalMediaStreamingContext) -> Disposable {
|
||||
let queue = self.queue
|
||||
let index = self.contextReferences.add(ContextReference(streamingContext: streamingContext))
|
||||
|
||||
return ActionDisposable { [weak self] in
|
||||
queue.async {
|
||||
guard let self else {
|
||||
return
|
||||
}
|
||||
self.contextReferences.remove(index)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static let queue = Queue(name: "SharedHLSServer")
|
||||
public let port: UInt16 = 8016
|
||||
private let impl: QueueLocalObject<Impl>
|
||||
|
||||
private init() {
|
||||
let queue = SharedHLSServer.queue
|
||||
let port = self.port
|
||||
self.impl = QueueLocalObject(queue: queue, generate: {
|
||||
return Impl(queue: queue, port: port)
|
||||
})
|
||||
}
|
||||
|
||||
fileprivate func registerPlayer(streamingContext: ExternalMediaStreamingContext) -> Disposable {
|
||||
let disposable = MetaDisposable()
|
||||
|
||||
self.impl.with { impl in
|
||||
disposable.set(impl.registerPlayer(streamingContext: streamingContext))
|
||||
}
|
||||
|
||||
return disposable
|
||||
}
|
||||
}
|
||||
|
@ -47,13 +47,13 @@ CONFIGURE_FLAGS="--enable-cross-compile --disable-programs \
|
||||
--enable-libopus \
|
||||
--enable-libvpx \
|
||||
--enable-audiotoolbox \
|
||||
--enable-bsf=aac_adtstoasc,vp9_superframe \
|
||||
--enable-bsf=aac_adtstoasc,vp9_superframe,h264_mp4toannexb \
|
||||
--enable-decoder=h264,libvpx_vp9,hevc,libopus,mp3,aac,flac,alac_at,pcm_s16le,pcm_s24le,gsm_ms_at \
|
||||
--enable-encoder=libvpx_vp9 \
|
||||
--enable-demuxer=aac,mov,m4v,mp3,ogg,libopus,flac,wav,aiff,matroska \
|
||||
--enable-encoder=libvpx_vp9,aac_at \
|
||||
--enable-demuxer=aac,mov,m4v,mp3,ogg,libopus,flac,wav,aiff,matroska,mpegts \
|
||||
--enable-parser=aac,h264,mp3,libopus \
|
||||
--enable-protocol=file \
|
||||
--enable-muxer=mp4,matroska \
|
||||
--enable-muxer=mp4,matroska,mpegts \
|
||||
"
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user