mirror of
https://github.com/Swiftgram/Telegram-iOS.git
synced 2025-06-16 05:55:20 +00:00
Add network stats
This commit is contained in:
parent
6dc31ae148
commit
0a305bc848
@ -21,7 +21,7 @@
|
||||
@protocol MTTcpConnectionInterfaceDelegate <NSObject>
|
||||
|
||||
- (void)connectionInterfaceDidReadPartialDataOfLength:(NSUInteger)partialLength tag:(long)tag;
|
||||
- (void)connectionInterfaceDidReadData:(NSData * _Nonnull)rawData withTag:(long)tag;
|
||||
- (void)connectionInterfaceDidReadData:(NSData * _Nonnull)rawData withTag:(long)tag networkType:(int32_t)networkType;
|
||||
- (void)connectionInterfaceDidConnect;
|
||||
- (void)connectionInterfaceDidDisconnectWithError:(NSError * _Nullable)error;
|
||||
|
||||
|
@ -21,7 +21,7 @@
|
||||
- (MTMessageTransaction *)mtProtoMessageTransaction:(MTProto *)mtProto authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector sessionInfo:(MTSessionInfo *)sessionInfo scheme:(MTTransportScheme *)scheme;
|
||||
- (void)mtProtoDidChangeSession:(MTProto *)mtProto;
|
||||
- (void)mtProtoServerDidChangeSession:(MTProto *)mtProto firstValidMessageId:(int64_t)firstValidMessageId otherValidMessageIds:(NSArray *)otherValidMessageIds;
|
||||
- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector;
|
||||
- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType;
|
||||
- (void)mtProto:(MTProto *)mtProto receivedQuickAck:(int32_t)quickAckId;
|
||||
- (void)mtProto:(MTProto *)mtProto transactionsMayHaveFailed:(NSArray *)transactionIds;
|
||||
- (void)mtProtoAllTransactionsMayHaveFailed:(MTProto *)mtProto;
|
||||
|
@ -6,6 +6,16 @@
|
||||
@class MTRequestErrorContext;
|
||||
@class MTRpcError;
|
||||
|
||||
@interface MTRequestResponseInfo : NSObject
|
||||
|
||||
@property (nonatomic, readonly) int32_t networkType;
|
||||
@property (nonatomic, readonly) double timestamp;
|
||||
@property (nonatomic, readonly) double duration;
|
||||
|
||||
- (instancetype)initWithNetworkType:(int32_t)networkType timestamp:(double)timestamp duration:(double)duration;
|
||||
|
||||
@end
|
||||
|
||||
@interface MTRequest : NSObject
|
||||
|
||||
@property (nonatomic, strong, readonly) id internalId;
|
||||
@ -24,7 +34,7 @@
|
||||
@property (nonatomic) bool passthroughPasswordEntryError;
|
||||
@property (nonatomic) bool needsTimeoutTimer;
|
||||
|
||||
@property (nonatomic, copy) void (^completed)(id result, NSTimeInterval completionTimestamp, MTRpcError *error);
|
||||
@property (nonatomic, copy) void (^completed)(id result, MTRequestResponseInfo *info, MTRpcError *error);
|
||||
@property (nonatomic, copy) void (^progressUpdated)(float progress, NSUInteger packetLength);
|
||||
@property (nonatomic, copy) void (^acknowledgementReceived)();
|
||||
|
||||
|
@ -12,6 +12,7 @@
|
||||
@property (nonatomic) bool delivered;
|
||||
@property (nonatomic) int64_t responseMessageId;
|
||||
@property (nonatomic) bool willInitializeApi;
|
||||
@property (nonatomic) double sentTimestamp;
|
||||
|
||||
- (instancetype)initWithMessageId:(int64_t)messageId messageSeqNo:(int32_t)messageSeqNo transactionId:(id)transactionId quickAckId:(int32_t)quickAckId;
|
||||
|
||||
|
@ -26,7 +26,7 @@
|
||||
- (void)transportConnectionProblemsStatusChanged:(MTTransport * _Nonnull)transport scheme:(MTTransportScheme * _Nonnull)scheme hasConnectionProblems:(bool)hasConnectionProblems isProbablyHttp:(bool)isProbablyHttp;
|
||||
|
||||
- (void)transportReadyForTransaction:(MTTransport * _Nonnull)transport scheme:(MTTransportScheme * _Nonnull)scheme transportSpecificTransaction:(MTMessageTransaction * _Nonnull)transportSpecificTransaction forceConfirmations:(bool)forceConfirmations transactionReady:(void (^ _Nonnull)(NSArray * _Nonnull))transactionReady;
|
||||
- (void)transportHasIncomingData:(MTTransport * _Nonnull)transport scheme:(MTTransportScheme * _Nonnull)scheme data:(NSData * _Nonnull)data transactionId:(id _Nonnull)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^ _Nonnull)(id _Nonnull transactionId, bool success))decodeResult;
|
||||
- (void)transportHasIncomingData:(MTTransport * _Nonnull)transport scheme:(MTTransportScheme * _Nonnull)scheme networkType:(int32_t)networkType data:(NSData * _Nonnull)data transactionId:(id _Nonnull)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^ _Nonnull)(id _Nonnull transactionId, bool success))decodeResult;
|
||||
- (void)transportTransactionsMayHaveFailed:(MTTransport * _Nonnull)transport transactionIds:(NSArray * _Nonnull)transactionIds;
|
||||
- (void)transportReceivedQuickAck:(MTTransport * _Nonnull)transport quickAckId:(int32_t)quickAckId;
|
||||
- (void)transportDecodeProgressToken:(MTTransport * _Nonnull)transport scheme:(MTTransportScheme * _Nonnull)scheme data:(NSData * _Nonnull)data token:(int64_t)token completion:(void (^ _Nonnull)(int64_t token, id _Nonnull progressToken))completion;
|
||||
@ -57,7 +57,7 @@
|
||||
- (void)stop;
|
||||
- (void)updateConnectionState;
|
||||
- (void)setDelegateNeedsTransaction;
|
||||
- (void)_processIncomingData:(NSData * _Nonnull)data scheme:(MTTransportScheme * _Nonnull)scheme transactionId:(id _Nonnull)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^ _Nonnull)(id _Nonnull transactionId, bool success))decodeResult;
|
||||
- (void)_processIncomingData:(NSData * _Nonnull)data scheme:(MTTransportScheme * _Nonnull)scheme networkType:(int32_t)networkType transactionId:(id _Nonnull)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^ _Nonnull)(id _Nonnull transactionId, bool success))decodeResult;
|
||||
- (void)_networkAvailabilityChanged:(bool)networkAvailable;
|
||||
|
||||
- (void)activeTransactionIds:(void (^ _Nonnull)(NSArray * _Nonnull activeTransactionId))completion;
|
||||
|
@ -975,7 +975,7 @@ typedef enum GCDAsyncSocketError GCDAsyncSocketError;
|
||||
* Called when a socket has completed reading the requested data into memory.
|
||||
* Not called if there is an error.
|
||||
**/
|
||||
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag;
|
||||
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag networkType:(int32_t)networkType;
|
||||
|
||||
/**
|
||||
* Called when a socket has read in data, but has not yet completed the read.
|
||||
|
@ -5119,14 +5119,15 @@ enum GCDAsyncSocketConfig
|
||||
result = [NSData dataWithBytesNoCopy:buffer length:currentRead->bytesDone freeWhenDone:NO];
|
||||
}
|
||||
|
||||
if (delegateQueue && [delegate respondsToSelector:@selector(socket:didReadData:withTag:)])
|
||||
if (delegateQueue && [delegate respondsToSelector:@selector(socket:didReadData:withTag:networkType:)])
|
||||
{
|
||||
__strong id theDelegate = delegate;
|
||||
GCDAsyncReadPacket *theRead = currentRead; // Ensure currentRead retained since result may not own buffer
|
||||
int32_t networkType = _interface == MTNetworkUsageManagerInterfaceOther ? 0 : 1;
|
||||
|
||||
dispatch_async(delegateQueue, ^{ @autoreleasepool {
|
||||
|
||||
[theDelegate socket:self didReadData:result withTag:theRead->tag];
|
||||
[theDelegate socket:self didReadData:result withTag:theRead->tag networkType:networkType];
|
||||
}});
|
||||
}
|
||||
|
||||
|
@ -313,7 +313,7 @@ static NSString *makeRandomPadding() {
|
||||
|
||||
__weak MTContext *weakCurrentContext = currentContext;
|
||||
return [[MTSignal alloc] initWithGenerator:^id<MTDisposable>(MTSubscriber *subscriber) {
|
||||
[request setCompleted:^(MTDatacenterAddressListData *result, __unused NSTimeInterval completionTimestamp, id error)
|
||||
[request setCompleted:^(MTDatacenterAddressListData *result, __unused MTRequestResponseInfo *info, id error)
|
||||
{
|
||||
if (error == nil) {
|
||||
__strong MTContext *strongCurrentContext = weakCurrentContext;
|
||||
|
@ -136,7 +136,7 @@
|
||||
}
|
||||
}
|
||||
|
||||
- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector {
|
||||
- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType {
|
||||
if ([message.body isKindOfClass:[MTRpcResultMessage class]]) {
|
||||
MTRpcResultMessage *rpcResultMessage = message.body;
|
||||
if (rpcResultMessage.requestMessageId == _currentMessageId) {
|
||||
|
@ -410,7 +410,7 @@ static NSData *encryptRSAModernPadding(id<EncryptionProvider> encryptionProvider
|
||||
}
|
||||
}
|
||||
|
||||
- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector
|
||||
- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType
|
||||
{
|
||||
if (_stage == MTDatacenterAuthStagePQ && [message.body isKindOfClass:[MTResPqMessage class]])
|
||||
{
|
||||
|
@ -101,7 +101,7 @@
|
||||
[request setPayload:exportAuthRequestData metadata:@"exportAuthorization" shortMetadata:@"exportAuthorization" responseParser:responseParser];
|
||||
|
||||
__weak MTDatacenterTransferAuthAction *weakSelf = self;
|
||||
[request setCompleted:^(MTExportedAuthorizationData *result, __unused NSTimeInterval timestamp, id error)
|
||||
[request setCompleted:^(MTExportedAuthorizationData *result, __unused MTRequestResponseInfo *info, id error)
|
||||
{
|
||||
__strong MTDatacenterTransferAuthAction *strongSelf = weakSelf;
|
||||
if (strongSelf == nil)
|
||||
@ -147,7 +147,7 @@
|
||||
id authToken = _authToken;
|
||||
|
||||
__weak MTDatacenterTransferAuthAction *weakSelf = self;
|
||||
[request setCompleted:^(__unused id result, __unused NSTimeInterval timestamp, id error)
|
||||
[request setCompleted:^(__unused id result, __unused MTRequestResponseInfo *info, id error)
|
||||
{
|
||||
__strong MTDatacenterTransferAuthAction *strongSelf = weakSelf;
|
||||
if (strongSelf == nil)
|
||||
|
@ -106,7 +106,7 @@
|
||||
[request setPayload:getConfigData metadata:@"getConfig" shortMetadata:@"getConfig" responseParser:responseParser];
|
||||
|
||||
__weak MTDiscoverDatacenterAddressAction *weakSelf = self;
|
||||
[request setCompleted:^(MTDatacenterAddressListData *result, __unused NSTimeInterval completionTimestamp, id error)
|
||||
[request setCompleted:^(MTDatacenterAddressListData *result, __unused MTRequestResponseInfo *info, id error)
|
||||
{
|
||||
__strong MTDiscoverDatacenterAddressAction *strongSelf = weakSelf;
|
||||
if (strongSelf != nil) {
|
||||
|
@ -1966,7 +1966,7 @@ static NSString *dumpHexString(NSData *data, int maxLength) {
|
||||
return hexString;
|
||||
}
|
||||
|
||||
- (void)transportHasIncomingData:(MTTransport *)transport scheme:(MTTransportScheme *)scheme data:(NSData *)data transactionId:(id)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^)(id transactionId, bool success))decodeResult
|
||||
- (void)transportHasIncomingData:(MTTransport *)transport scheme:(MTTransportScheme *)scheme networkType:(int32_t)networkType data:(NSData *)data transactionId:(id)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^)(id transactionId, bool success))decodeResult
|
||||
{
|
||||
/*__block bool simulateError = false;
|
||||
static dispatch_once_t onceToken;
|
||||
@ -2097,7 +2097,7 @@ static NSString *dumpHexString(NSData *data, int maxLength) {
|
||||
|
||||
for (MTIncomingMessage *incomingMessage in parsedMessages)
|
||||
{
|
||||
[self _processIncomingMessage:incomingMessage totalSize:(int)data.length withTransactionId:transactionId address:scheme.address authInfoSelector:authInfoSelector];
|
||||
[self _processIncomingMessage:incomingMessage totalSize:(int)data.length withTransactionId:transactionId address:scheme.address authInfoSelector:authInfoSelector networkType:networkType];
|
||||
}
|
||||
|
||||
if (requestTransactionAfterProcessing)
|
||||
@ -2422,7 +2422,7 @@ static bool isDataEqualToDataConstTime(NSData *data1, NSData *data2) {
|
||||
return messages;
|
||||
}
|
||||
|
||||
- (void)_processIncomingMessage:(MTIncomingMessage *)incomingMessage totalSize:(int)totalSize withTransactionId:(id)transactionId address:(MTDatacenterAddress *)address authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector
|
||||
- (void)_processIncomingMessage:(MTIncomingMessage *)incomingMessage totalSize:(int)totalSize withTransactionId:(id)transactionId address:(MTDatacenterAddress *)address authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType
|
||||
{
|
||||
if ([_sessionInfo messageProcessed:incomingMessage.messageId])
|
||||
{
|
||||
@ -2605,8 +2605,8 @@ static bool isDataEqualToDataConstTime(NSData *data1, NSData *data2) {
|
||||
{
|
||||
id<MTMessageService> messageService = _messageServices[(NSUInteger)i];
|
||||
|
||||
if ([messageService respondsToSelector:@selector(mtProto:receivedMessage:authInfoSelector:)])
|
||||
[messageService mtProto:self receivedMessage:incomingMessage authInfoSelector:authInfoSelector];
|
||||
if ([messageService respondsToSelector:@selector(mtProto:receivedMessage:authInfoSelector:networkType:)])
|
||||
[messageService mtProto:self receivedMessage:incomingMessage authInfoSelector:authInfoSelector networkType:networkType];
|
||||
}
|
||||
|
||||
if (_timeFixContext != nil && [incomingMessage.body isKindOfClass:[MTPongMessage class]] && ((MTPongMessage *)incomingMessage.body).messageId == _timeFixContext.messageId)
|
||||
|
@ -5,6 +5,20 @@
|
||||
#import <os/lock.h>
|
||||
#import <libkern/OSAtomic.h>
|
||||
|
||||
@implementation MTRequestResponseInfo
|
||||
|
||||
- (instancetype)initWithNetworkType:(int32_t)networkType timestamp:(double)timestamp duration:(double)duration {
|
||||
self = [super init];
|
||||
if (self != nil) {
|
||||
_networkType = networkType;
|
||||
_timestamp = timestamp;
|
||||
_duration = duration;
|
||||
}
|
||||
return self;
|
||||
}
|
||||
|
||||
@end
|
||||
|
||||
@interface MTRequestInternalId : NSObject <NSCopying>
|
||||
{
|
||||
NSUInteger _value;
|
||||
|
@ -369,7 +369,7 @@
|
||||
MTRequestNoopParser responseParser = [[_context serialization] requestNoop:&noopData];
|
||||
[request setPayload:noopData metadata:@"noop" shortMetadata:@"noop" responseParser:responseParser];
|
||||
|
||||
[request setCompleted:^(__unused id result, __unused NSTimeInterval timestamp, __unused id error) {
|
||||
[request setCompleted:^(__unused id result, __unused MTRequestResponseInfo *info, __unused id error) {
|
||||
}];
|
||||
|
||||
[self addRequest:request];
|
||||
@ -614,6 +614,7 @@
|
||||
}
|
||||
|
||||
MTRequestContext *requestContext = [[MTRequestContext alloc] initWithMessageId:preparedMessage.messageId messageSeqNo:preparedMessage.seqNo transactionId:nil quickAckId:0];
|
||||
requestContext.sentTimestamp = CFAbsoluteTimeGetCurrent();
|
||||
requestContext.willInitializeApi = requestsWillInitializeApi;
|
||||
requestContext.waitingForMessageId = true;
|
||||
request.requestContext = requestContext;
|
||||
@ -646,6 +647,7 @@
|
||||
continue;
|
||||
}
|
||||
MTRequestContext *requestContext = [[MTRequestContext alloc] initWithMessageId:preparedMessage.messageId messageSeqNo:preparedMessage.seqNo transactionId:messageInternalIdToTransactionId[messageInternalId] quickAckId:(int32_t)[messageInternalIdToQuickAckId[messageInternalId] intValue]];
|
||||
requestContext.sentTimestamp = CFAbsoluteTimeGetCurrent();
|
||||
requestContext.willInitializeApi = requestsWillInitializeApi;
|
||||
request.requestContext = requestContext;
|
||||
}
|
||||
@ -667,7 +669,7 @@
|
||||
return nil;
|
||||
}
|
||||
|
||||
- (void)mtProto:(MTProto *)__unused mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector
|
||||
- (void)mtProto:(MTProto *)__unused mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType
|
||||
{
|
||||
if ([message.body isKindOfClass:[MTRpcResultMessage class]])
|
||||
{
|
||||
@ -871,6 +873,8 @@
|
||||
}
|
||||
}
|
||||
|
||||
double sentTimestamp = request.requestContext.sentTimestamp;
|
||||
|
||||
request.requestContext = nil;
|
||||
|
||||
if (restartRequest)
|
||||
@ -879,11 +883,17 @@
|
||||
}
|
||||
else
|
||||
{
|
||||
void (^completed)(id result, NSTimeInterval completionTimestamp, id error) = [request.completed copy];
|
||||
void (^completed)(id result, MTRequestResponseInfo *info, id error) = [request.completed copy];
|
||||
[_requests removeObjectAtIndex:(NSUInteger)index];
|
||||
|
||||
if (completed)
|
||||
completed(rpcResult, message.timestamp, rpcError);
|
||||
if (completed) {
|
||||
double duration = 0.0;
|
||||
if (sentTimestamp != 0.0) {
|
||||
duration = CFAbsoluteTimeGetCurrent() - sentTimestamp;
|
||||
}
|
||||
MTRequestResponseInfo *info = [[MTRequestResponseInfo alloc] initWithNetworkType:networkType timestamp:message.timestamp duration:duration];
|
||||
completed(rpcResult, info, rpcError);
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -121,7 +121,7 @@
|
||||
}
|
||||
}
|
||||
|
||||
- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector
|
||||
- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType
|
||||
{
|
||||
if (message.messageId == _messageId)
|
||||
{
|
||||
|
@ -13,7 +13,7 @@
|
||||
|
||||
- (void)tcpConnectionOpened:(MTTcpConnection *)connection;
|
||||
- (void)tcpConnectionClosed:(MTTcpConnection *)connection error:(bool)error;
|
||||
- (void)tcpConnectionReceivedData:(MTTcpConnection *)connection data:(NSData *)data;
|
||||
- (void)tcpConnectionReceivedData:(MTTcpConnection *)connection networkType:(int32_t)networkType data:(NSData *)data;
|
||||
- (void)tcpConnectionReceivedQuickAck:(MTTcpConnection *)connection quickAck:(int32_t)quickAck;
|
||||
- (void)tcpConnectionDecodePacketProgressToken:(MTTcpConnection *)connection data:(NSData *)data token:(int64_t)token completion:(void (^)(int64_t token, id packetProgressToken))completion;
|
||||
- (void)tcpConnectionProgressUpdated:(MTTcpConnection *)connection packetProgressToken:(id)packetProgressToken packetLength:(NSUInteger)packetLength progress:(float)progress;
|
||||
|
@ -664,10 +664,10 @@ struct ctr_state {
|
||||
}
|
||||
}
|
||||
|
||||
- (void)socket:(GCDAsyncSocket *)socket didReadData:(NSData *)rawData withTag:(long)tag {
|
||||
- (void)socket:(GCDAsyncSocket *)socket didReadData:(NSData *)rawData withTag:(long)tag networkType:(int32_t)networkType {
|
||||
id<MTTcpConnectionInterfaceDelegate> delegate = _delegate;
|
||||
if (delegate) {
|
||||
[delegate connectionInterfaceDidReadData:rawData withTag:tag];
|
||||
[delegate connectionInterfaceDidReadData:rawData withTag:tag networkType:networkType];
|
||||
}
|
||||
}
|
||||
|
||||
@ -717,6 +717,7 @@ struct ctr_state {
|
||||
|
||||
id<MTTcpConnectionInterface> _socket;
|
||||
bool _closed;
|
||||
int32_t _lastNetworkType;
|
||||
|
||||
bool _useIntermediateFormat;
|
||||
|
||||
@ -1459,7 +1460,7 @@ struct ctr_state {
|
||||
[_socket readDataToLength:4 withTimeout:-1 tag:MTTcpSocksRequest];
|
||||
}
|
||||
|
||||
- (void)connectionInterfaceDidReadData:(NSData *)rawData withTag:(long)tag
|
||||
- (void)connectionInterfaceDidReadData:(NSData *)rawData withTag:(long)tag networkType:(int32_t)networkType
|
||||
{
|
||||
if (_closed)
|
||||
return;
|
||||
@ -1754,12 +1755,12 @@ struct ctr_state {
|
||||
[_socket readDataToLength:(int)nextLength withTimeout:-1 tag:MTTcpSocksReceiveComplexPacketPart];
|
||||
return;
|
||||
} else if (tag == MTTcpSocksReceiveComplexPacketPart) {
|
||||
[self addReadData:rawData];
|
||||
[self addReadData:rawData networkType:networkType];
|
||||
|
||||
[_socket readDataToLength:5 withTimeout:-1 tag:MTTcpSocksReceiveComplexLength];
|
||||
return;
|
||||
} else {
|
||||
[self addReadData:rawData];
|
||||
[self addReadData:rawData networkType:networkType];
|
||||
}
|
||||
}
|
||||
|
||||
@ -1775,15 +1776,15 @@ struct ctr_state {
|
||||
[_receivedDataBuffer replaceBytesInRange:NSMakeRange(0, _pendingReceiveData.length) withBytes:nil length:0];
|
||||
int tag = _pendingReceiveData.tag;
|
||||
_pendingReceiveData = nil;
|
||||
[self processReceivedData:rawData tag:tag];
|
||||
[self processReceivedData:rawData tag:tag networkType:_lastNetworkType];
|
||||
}
|
||||
}
|
||||
|
||||
- (void)addReadData:(NSData *)data {
|
||||
- (void)addReadData:(NSData *)data networkType:(int32_t)networkType {
|
||||
if (_pendingReceiveData != nil && _pendingReceiveData.length == data.length) {
|
||||
int tag = _pendingReceiveData.tag;
|
||||
_pendingReceiveData = nil;
|
||||
[self processReceivedData:data tag:tag];
|
||||
[self processReceivedData:data tag:tag networkType:networkType];
|
||||
} else {
|
||||
[_receivedDataBuffer appendData:data];
|
||||
if (_pendingReceiveData != nil) {
|
||||
@ -1792,13 +1793,15 @@ struct ctr_state {
|
||||
[_receivedDataBuffer replaceBytesInRange:NSMakeRange(0, _pendingReceiveData.length) withBytes:nil length:0];
|
||||
int tag = _pendingReceiveData.tag;
|
||||
_pendingReceiveData = nil;
|
||||
[self processReceivedData:rawData tag:tag];
|
||||
[self processReceivedData:rawData tag:tag networkType:networkType];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- (void)processReceivedData:(NSData *)rawData tag:(int)tag {
|
||||
- (void)processReceivedData:(NSData *)rawData tag:(int)tag networkType:(int32_t)networkType {
|
||||
_lastNetworkType = networkType;
|
||||
|
||||
NSMutableData *decryptedData = [[NSMutableData alloc] initWithLength:rawData.length];
|
||||
[_incomingAesCtr encryptIn:rawData.bytes out:decryptedData.mutableBytes len:rawData.length];
|
||||
|
||||
@ -1968,8 +1971,8 @@ struct ctr_state {
|
||||
if (_connectionReceivedData)
|
||||
_connectionReceivedData(packetData);
|
||||
id<MTTcpConnectionDelegate> delegate = _delegate;
|
||||
if ([delegate respondsToSelector:@selector(tcpConnectionReceivedData:data:)])
|
||||
[delegate tcpConnectionReceivedData:self data:packetData];
|
||||
if ([delegate respondsToSelector:@selector(tcpConnectionReceivedData:networkType:data:)])
|
||||
[delegate tcpConnectionReceivedData:self networkType:networkType data:packetData];
|
||||
}
|
||||
|
||||
if (_useIntermediateFormat) {
|
||||
|
@ -452,7 +452,7 @@ static const NSTimeInterval MTTcpTransportSleepWatchdogTimeout = 60.0;
|
||||
}];
|
||||
}
|
||||
|
||||
- (void)tcpConnectionReceivedData:(MTTcpConnection *)connection data:(NSData *)data
|
||||
- (void)tcpConnectionReceivedData:(MTTcpConnection *)connection networkType:(int32_t)networkType data:(NSData *)data
|
||||
{
|
||||
MTTcpTransportContext *transportContext = _transportContext;
|
||||
[[MTTcpTransport tcpTransportQueue] dispatchOnQueue:^
|
||||
@ -464,7 +464,7 @@ static const NSTimeInterval MTTcpTransportSleepWatchdogTimeout = 60.0;
|
||||
[self startActualizationPingResendTimer];
|
||||
|
||||
__weak MTTcpTransport *weakSelf = self;
|
||||
[self _processIncomingData:data scheme:connection.scheme transactionId:connection.internalId requestTransactionAfterProcessing:false decodeResult:^(id transactionId, bool success)
|
||||
[self _processIncomingData:data scheme:connection.scheme networkType:networkType transactionId:connection.internalId requestTransactionAfterProcessing:false decodeResult:^(id transactionId, bool success)
|
||||
{
|
||||
if (success)
|
||||
{
|
||||
@ -760,7 +760,7 @@ static const NSTimeInterval MTTcpTransportSleepWatchdogTimeout = 60.0;
|
||||
}];
|
||||
}
|
||||
|
||||
- (void)mtProto:(MTProto *)__unused mtProto receivedMessage:(MTIncomingMessage *)incomingMessage authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector
|
||||
- (void)mtProto:(MTProto *)__unused mtProto receivedMessage:(MTIncomingMessage *)incomingMessage authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType
|
||||
{
|
||||
if ([incomingMessage.body isKindOfClass:[MTPongMessage class]])
|
||||
{
|
||||
|
@ -127,7 +127,7 @@
|
||||
}
|
||||
}
|
||||
|
||||
- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector
|
||||
- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType
|
||||
{
|
||||
if ([message.body isKindOfClass:[MTFutureSaltsMessage class]] && ((MTFutureSaltsMessage *)message.body).requestMessageId == _currentMessageId)
|
||||
{
|
||||
|
@ -58,12 +58,12 @@
|
||||
{
|
||||
}
|
||||
|
||||
- (void)_processIncomingData:(NSData *)data scheme:(MTTransportScheme *)scheme transactionId:(id)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^)(id transactionId, bool success))decodeResult
|
||||
- (void)_processIncomingData:(NSData *)data scheme:(MTTransportScheme *)scheme networkType:(int32_t)networkType transactionId:(id)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^)(id transactionId, bool success))decodeResult
|
||||
{
|
||||
id<MTTransportDelegate> delegate = _delegate;
|
||||
if ([delegate respondsToSelector:@selector(transportHasIncomingData:scheme:data:transactionId:requestTransactionAfterProcessing:decodeResult:)])
|
||||
if ([delegate respondsToSelector:@selector(transportHasIncomingData:scheme:networkType:data:transactionId:requestTransactionAfterProcessing:decodeResult:)])
|
||||
{
|
||||
[delegate transportHasIncomingData:self scheme:scheme data:data transactionId:transactionId requestTransactionAfterProcessing:requestTransactionAfterProcessing decodeResult:decodeResult];
|
||||
[delegate transportHasIncomingData:self scheme:scheme networkType:networkType data:data transactionId:transactionId requestTransactionAfterProcessing:requestTransactionAfterProcessing decodeResult:decodeResult];
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -970,6 +970,8 @@ public class Account {
|
||||
private var lastSmallLogPostTimestamp: Double?
|
||||
private let smallLogPostDisposable = MetaDisposable()
|
||||
|
||||
let networkStatsContext: NetworkStatsContext
|
||||
|
||||
public init(accountManager: AccountManager<TelegramAccountManagerTypes>, id: AccountRecordId, basePath: String, testingEnvironment: Bool, postbox: Postbox, network: Network, networkArguments: NetworkInitializationArguments, peerId: PeerId, auxiliaryMethods: AccountAuxiliaryMethods, supplementary: Bool) {
|
||||
self.accountManager = accountManager
|
||||
self.id = id
|
||||
@ -983,6 +985,8 @@ public class Account {
|
||||
self.auxiliaryMethods = auxiliaryMethods
|
||||
self.supplementary = supplementary
|
||||
|
||||
self.networkStatsContext = NetworkStatsContext(postbox: postbox)
|
||||
|
||||
self.peerInputActivityManager = PeerInputActivityManager()
|
||||
self.callSessionManager = CallSessionManager(postbox: postbox, network: network, maxLayer: networkArguments.voipMaxLayer, versions: networkArguments.voipVersions, addUpdates: { [weak self] updates in
|
||||
self?.stateManager?.addUpdates(updates)
|
||||
|
@ -362,16 +362,16 @@ class Download: NSObject, MTRequestMessageServiceDelegate {
|
||||
return true
|
||||
}
|
||||
|
||||
request.completed = { (boxedResponse, timestamp, error) -> () in
|
||||
request.completed = { (boxedResponse, info, error) -> () in
|
||||
if let error = error {
|
||||
subscriber.putError((error, timestamp))
|
||||
subscriber.putError((error, info?.timestamp ?? 0.0))
|
||||
} else {
|
||||
if let result = (boxedResponse as! BoxedMessage).body as? T {
|
||||
subscriber.putNext((result, timestamp))
|
||||
subscriber.putNext((result, info?.timestamp ?? 0.0))
|
||||
subscriber.putCompletion()
|
||||
}
|
||||
else {
|
||||
subscriber.putError((MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"), timestamp))
|
||||
subscriber.putError((MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"), info?.timestamp ?? 0.0))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -386,7 +386,7 @@ class Download: NSObject, MTRequestMessageServiceDelegate {
|
||||
}
|
||||
}
|
||||
|
||||
func rawRequest(_ data: (FunctionDescription, Buffer, (Buffer) -> Any?), automaticFloodWait: Bool = true, failOnServerErrors: Bool = false, logPrefix: String = "") -> Signal<(Any, Double), (MTRpcError, Double)> {
|
||||
func rawRequest(_ data: (FunctionDescription, Buffer, (Buffer) -> Any?), automaticFloodWait: Bool = true, failOnServerErrors: Bool = false, logPrefix: String = "") -> Signal<(Any, NetworkResponseInfo), (MTRpcError, Double)> {
|
||||
let requestService = self.requestService
|
||||
return Signal { subscriber in
|
||||
let request = MTRequest()
|
||||
@ -414,11 +414,16 @@ class Download: NSObject, MTRequestMessageServiceDelegate {
|
||||
return true
|
||||
}
|
||||
|
||||
request.completed = { (boxedResponse, timestamp, error) -> () in
|
||||
request.completed = { (boxedResponse, info, error) -> () in
|
||||
if let error = error {
|
||||
subscriber.putError((error, timestamp))
|
||||
subscriber.putError((error, info?.timestamp ?? 0))
|
||||
} else {
|
||||
subscriber.putNext(((boxedResponse as! BoxedMessage).body, timestamp))
|
||||
let mappedInfo = NetworkResponseInfo(
|
||||
timestamp: info?.timestamp ?? 0.0,
|
||||
networkType: info?.networkType == 0 ? .wifi : .cellular,
|
||||
networkDuration: info?.duration ?? 0.0
|
||||
)
|
||||
subscriber.putNext(((boxedResponse as! BoxedMessage).body, mappedInfo))
|
||||
subscriber.putCompletion()
|
||||
}
|
||||
}
|
||||
|
@ -86,14 +86,17 @@ private struct DownloadWrapper {
|
||||
let network: Network
|
||||
let useMainConnection: Bool
|
||||
|
||||
func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: MediaResourceFetchTag?, continueInBackground: Bool) -> Signal<T, MTRpcError> {
|
||||
func request<T>(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: MediaResourceFetchTag?, continueInBackground: Bool) -> Signal<(T, NetworkResponseInfo), MTRpcError> {
|
||||
let target: MultiplexedRequestTarget
|
||||
if self.isCdn {
|
||||
target = .cdn(Int(self.datacenterId))
|
||||
} else {
|
||||
target = .main(Int(self.datacenterId))
|
||||
}
|
||||
return network.multiplexedRequestManager.request(to: target, consumerId: self.consumerId, data: data, tag: tag, continueInBackground: continueInBackground)
|
||||
return network.multiplexedRequestManager.requestWithAdditionalInfo(to: target, consumerId: self.consumerId, data: data, tag: tag, continueInBackground: continueInBackground)
|
||||
|> mapError { error, _ -> MTRpcError in
|
||||
return error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -172,7 +175,7 @@ private final class MultipartCdnHashSource {
|
||||
self.clusterContexts[offset] = clusterContext
|
||||
|
||||
disposable.set((self.masterDownload.request(Api.functions.upload.getCdnFileHashes(fileToken: Buffer(data: self.fileToken), offset: offset), tag: nil, continueInBackground: self.continueInBackground)
|
||||
|> map { partHashes -> [Int64: Data] in
|
||||
|> map { partHashes, _ -> [Int64: Data] in
|
||||
var parsedPartHashes: [Int64: Data] = [:]
|
||||
for part in partHashes {
|
||||
switch part {
|
||||
@ -288,9 +291,20 @@ private final class MultipartCdnHashSource {
|
||||
private enum MultipartFetchSource {
|
||||
case none
|
||||
case master(location: MultipartFetchMasterLocation, download: DownloadWrapper)
|
||||
case cdn(masterDatacenterId: Int32, fileToken: Data, key: Data, iv: Data, download: DownloadWrapper, masterDownload: DownloadWrapper, hashSource: MultipartCdnHashSource)
|
||||
case cdn(masterDatacenterId: Int32, cdnDatacenterId: Int32, fileToken: Data, key: Data, iv: Data, download: DownloadWrapper, masterDownload: DownloadWrapper, hashSource: MultipartCdnHashSource)
|
||||
|
||||
func request(offset: Int64, limit: Int64, tag: MediaResourceFetchTag?, resource: TelegramMediaResource, resourceReference: FetchResourceReference, fileReference: Data?, continueInBackground: Bool) -> Signal<Data, MultipartFetchDownloadError> {
|
||||
var effectiveDatacenterId: Int32 {
|
||||
switch self {
|
||||
case .none:
|
||||
return 0
|
||||
case let .master(location, _):
|
||||
return location.datacenterId
|
||||
case let .cdn(_, cdnDatacenterId, _, _, _, _, _, _):
|
||||
return cdnDatacenterId
|
||||
}
|
||||
}
|
||||
|
||||
func request(offset: Int64, limit: Int64, tag: MediaResourceFetchTag?, resource: TelegramMediaResource, resourceReference: FetchResourceReference, fileReference: Data?, continueInBackground: Bool) -> Signal<(Data, NetworkResponseInfo), MultipartFetchDownloadError> {
|
||||
var resourceReferenceValue: MediaResourceReference?
|
||||
switch resourceReference {
|
||||
case .forceRevalidate:
|
||||
@ -323,14 +337,14 @@ private enum MultipartFetchSource {
|
||||
}
|
||||
return .generic
|
||||
}
|
||||
|> mapToSignal { result -> Signal<Data, MultipartFetchDownloadError> in
|
||||
|> mapToSignal { result, info -> Signal<(Data, NetworkResponseInfo), MultipartFetchDownloadError> in
|
||||
switch result {
|
||||
case let .file(_, _, bytes):
|
||||
var resultData = bytes.makeData()
|
||||
if resultData.count > Int(limit) {
|
||||
resultData.count = Int(limit)
|
||||
}
|
||||
return .single(resultData)
|
||||
return .single((resultData, info))
|
||||
case let .fileCdnRedirect(dcId, fileToken, encryptionKey, encryptionIv, partHashes):
|
||||
var parsedPartHashes: [Int64: Data] = [:]
|
||||
for part in partHashes {
|
||||
@ -350,18 +364,18 @@ private enum MultipartFetchSource {
|
||||
|> mapError { error -> MultipartFetchDownloadError in
|
||||
return .fatal
|
||||
}
|
||||
|> mapToSignal { result -> Signal<Data, MultipartFetchDownloadError> in
|
||||
|> mapToSignal { result, info -> Signal<(Data, NetworkResponseInfo), MultipartFetchDownloadError> in
|
||||
switch result {
|
||||
case let .webFile(_, _, _, _, bytes):
|
||||
var resultData = bytes.makeData()
|
||||
if resultData.count > Int(limit) {
|
||||
resultData.count = Int(limit)
|
||||
}
|
||||
return .single(resultData)
|
||||
return .single((resultData, info))
|
||||
}
|
||||
}
|
||||
}
|
||||
case let .cdn(masterDatacenterId, fileToken, key, iv, download, _, hashSource):
|
||||
case let .cdn(masterDatacenterId, _, fileToken, key, iv, download, _, hashSource):
|
||||
var updatedLength = roundUp(Int64(limit), to: 4096)
|
||||
while updatedLength % 4096 != 0 || 1048576 % updatedLength != 0 {
|
||||
updatedLength += 1
|
||||
@ -371,13 +385,13 @@ private enum MultipartFetchSource {
|
||||
|> mapError { _ -> MultipartFetchDownloadError in
|
||||
return .generic
|
||||
}
|
||||
|> mapToSignal { result -> Signal<Data, MultipartFetchDownloadError> in
|
||||
|> mapToSignal { result, info -> Signal<(Data, NetworkResponseInfo), MultipartFetchDownloadError> in
|
||||
switch result {
|
||||
case let .cdnFileReuploadNeeded(token):
|
||||
return .fail(.reuploadToCdn(masterDatacenterId: masterDatacenterId, token: token.makeData()))
|
||||
case let .cdnFile(bytes):
|
||||
if bytes.size == 0 {
|
||||
return .single(bytes.makeData())
|
||||
return .single((bytes.makeData(), info))
|
||||
} else {
|
||||
var partIv = iv
|
||||
let partIvCount = partIv.count
|
||||
@ -386,13 +400,14 @@ private enum MultipartFetchSource {
|
||||
var ivOffset: Int32 = Int32(clamping: (offset / 16)).bigEndian
|
||||
memcpy(bytes.advanced(by: partIvCount - 4), &ivOffset, 4)
|
||||
}
|
||||
return .single(MTAesCtrDecrypt(bytes.makeData(), key, partIv)!)
|
||||
return .single((MTAesCtrDecrypt(bytes.makeData(), key, partIv)!, info))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return combineLatest(part, hashSource.get(offset: offset, limit: limit))
|
||||
|> mapToSignal { partData, hashData -> Signal<Data, MultipartFetchDownloadError> in
|
||||
|> mapToSignal { partDataAndInfo, hashData -> Signal<(Data, NetworkResponseInfo), MultipartFetchDownloadError> in
|
||||
let (partData, info) = partDataAndInfo
|
||||
var localOffset: Int64 = 0
|
||||
while localOffset < partData.count {
|
||||
let dataToHash = partData.subdata(in: Int(localOffset) ..< min(partData.count, Int(localOffset + Int64(dataHashLength))))
|
||||
@ -407,7 +422,7 @@ private enum MultipartFetchSource {
|
||||
|
||||
localOffset += Int64(dataHashLength)
|
||||
}
|
||||
return .single(partData)
|
||||
return .single((partData, info))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -425,6 +440,21 @@ private final class MultipartFetchManager {
|
||||
var byteCount: Int
|
||||
}
|
||||
|
||||
private final class FetchingPart {
|
||||
let size: Int64
|
||||
let disposable: Disposable
|
||||
let startTime: Double
|
||||
|
||||
init(
|
||||
size: Int64,
|
||||
disposable: Disposable
|
||||
) {
|
||||
self.size = size
|
||||
self.disposable = disposable
|
||||
self.startTime = CFAbsoluteTimeGetCurrent()
|
||||
}
|
||||
}
|
||||
|
||||
let parallelParts: Int
|
||||
let defaultPartSize: Int64
|
||||
var partAlignment: Int64 = 4 * 1024
|
||||
@ -445,6 +475,7 @@ private final class MultipartFetchManager {
|
||||
|
||||
let postbox: Postbox
|
||||
let network: Network
|
||||
let networkStatsContext: NetworkStatsContext?
|
||||
let revalidationContext: MediaReferenceRevalidationContext?
|
||||
let continueInBackground: Bool
|
||||
let partReady: (Int64, Data) -> Void
|
||||
@ -454,7 +485,7 @@ private final class MultipartFetchManager {
|
||||
private let useMainConnection: Bool
|
||||
private var source: MultipartFetchSource
|
||||
|
||||
var fetchingParts: [Int64: (Int64, Disposable)] = [:]
|
||||
private var fetchingParts: [Int64: FetchingPart] = [:]
|
||||
var nextFetchingPartId = 0
|
||||
var fetchedParts: [Int64: (Int64, Data)] = [:]
|
||||
var cachedPartHashes: [Int64: Data] = [:]
|
||||
@ -474,7 +505,7 @@ private final class MultipartFetchManager {
|
||||
private var fetchSpeedRecords: [FetchSpeedRecord] = []
|
||||
private var totalFetchedByteCount: Int = 0
|
||||
|
||||
init(resource: TelegramMediaResource, parameters: MediaResourceFetchParameters?, size: Int64?, intervals: Signal<[(Range<Int64>, MediaBoxFetchPriority)], NoError>, encryptionKey: SecretFileEncryptionKey?, decryptedSize: Int64?, location: MultipartFetchMasterLocation, postbox: Postbox, network: Network, revalidationContext: MediaReferenceRevalidationContext?, partReady: @escaping (Int64, Data) -> Void, reportCompleteSize: @escaping (Int64) -> Void, finishWithError: @escaping (MediaResourceDataFetchError) -> Void, useMainConnection: Bool) {
|
||||
init(resource: TelegramMediaResource, parameters: MediaResourceFetchParameters?, size: Int64?, intervals: Signal<[(Range<Int64>, MediaBoxFetchPriority)], NoError>, encryptionKey: SecretFileEncryptionKey?, decryptedSize: Int64?, location: MultipartFetchMasterLocation, postbox: Postbox, network: Network, networkStatsContext: NetworkStatsContext?, revalidationContext: MediaReferenceRevalidationContext?, partReady: @escaping (Int64, Data) -> Void, reportCompleteSize: @escaping (Int64) -> Void, finishWithError: @escaping (MediaResourceDataFetchError) -> Void, useMainConnection: Bool) {
|
||||
self.resource = resource
|
||||
self.parameters = parameters
|
||||
self.consumerId = Int64.random(in: Int64.min ... Int64.max)
|
||||
@ -526,6 +557,7 @@ private final class MultipartFetchManager {
|
||||
self.state = MultipartDownloadState(encryptionKey: encryptionKey, decryptedSize: decryptedSize)
|
||||
self.postbox = postbox
|
||||
self.network = network
|
||||
self.networkStatsContext = networkStatsContext
|
||||
self.revalidationContext = revalidationContext
|
||||
self.source = .master(location: location, download: DownloadWrapper(consumerId: self.consumerId, datacenterId: location.datacenterId, isCdn: false, network: network, useMainConnection: self.useMainConnection))
|
||||
self.partReady = partReady
|
||||
@ -613,8 +645,8 @@ private final class MultipartFetchManager {
|
||||
func cancel() {
|
||||
self.queue.async {
|
||||
self.source = .none
|
||||
for (_, (_, disposable)) in self.fetchingParts {
|
||||
disposable.dispose()
|
||||
for (_, fetchingPart) in self.fetchingParts {
|
||||
fetchingPart.disposable.dispose()
|
||||
}
|
||||
self.reuploadToCdnDisposable.dispose()
|
||||
self.revalidateMediaReferenceDisposable.dispose()
|
||||
@ -680,8 +712,8 @@ private final class MultipartFetchManager {
|
||||
}
|
||||
}
|
||||
|
||||
for (offset, (size, _)) in self.fetchingParts {
|
||||
removeFromFetchIntervals.formUnion(RangeSet<Int64>(offset ..< (offset + size)))
|
||||
for (offset, fetchingPart) in self.fetchingParts {
|
||||
removeFromFetchIntervals.formUnion(RangeSet<Int64>(offset ..< (offset + fetchingPart.size)))
|
||||
}
|
||||
|
||||
if let completeSize = self.completeSize {
|
||||
@ -758,16 +790,27 @@ private final class MultipartFetchManager {
|
||||
insertIndex += 1
|
||||
}
|
||||
|
||||
let partSize: Int32 = Int32(downloadRange.upperBound - downloadRange.lowerBound)
|
||||
let part = self.source.request(offset: downloadRange.lowerBound, limit: downloadRange.upperBound - downloadRange.lowerBound, tag: self.parameters?.tag, resource: self.resource, resourceReference: self.resourceReference, fileReference: self.fileReference, continueInBackground: self.continueInBackground)
|
||||
//|> delay(5.0, queue: self.queue)
|
||||
|> deliverOn(self.queue)
|
||||
let partDisposable = MetaDisposable()
|
||||
self.fetchingParts[downloadRange.lowerBound] = (Int64(downloadRange.count), partDisposable)
|
||||
|
||||
partDisposable.set(part.start(next: { [weak self] data in
|
||||
self.fetchingParts[downloadRange.lowerBound] = FetchingPart(size: Int64(downloadRange.count), disposable: partDisposable)
|
||||
let partStartTimestamp = CFAbsoluteTimeGetCurrent()
|
||||
let effectiveDatacenterId = self.source.effectiveDatacenterId
|
||||
partDisposable.set(part.start(next: { [weak self] data, info in
|
||||
guard let strongSelf = self else {
|
||||
return
|
||||
}
|
||||
|
||||
strongSelf.networkStatsContext?.add(downloadEvents: [
|
||||
NetworkStatsContext.DownloadEvent(
|
||||
networkType: info.networkType,
|
||||
datacenterId: effectiveDatacenterId,
|
||||
size: Double(partSize),
|
||||
networkDuration: info.networkDuration,
|
||||
issueDuration: CFAbsoluteTimeGetCurrent() - partStartTimestamp
|
||||
)
|
||||
])
|
||||
if data.count < downloadRange.count {
|
||||
strongSelf.completeSize = downloadRange.lowerBound + Int64(data.count)
|
||||
}
|
||||
@ -788,7 +831,7 @@ private final class MultipartFetchManager {
|
||||
if !strongSelf.revalidatingMediaReference && !strongSelf.revalidatedMediaReference {
|
||||
strongSelf.revalidatingMediaReference = true
|
||||
for (_, part) in strongSelf.fetchingParts {
|
||||
part.1.dispose()
|
||||
part.disposable.dispose()
|
||||
}
|
||||
strongSelf.fetchingParts.removeAll()
|
||||
|
||||
@ -819,7 +862,7 @@ private final class MultipartFetchManager {
|
||||
switch strongSelf.source {
|
||||
case let .master(location, download):
|
||||
strongSelf.partAlignment = dataHashLength
|
||||
strongSelf.source = .cdn(masterDatacenterId: location.datacenterId, fileToken: token, key: key, iv: iv, download: DownloadWrapper(consumerId: strongSelf.consumerId, datacenterId: id, isCdn: true, network: strongSelf.network, useMainConnection: strongSelf.useMainConnection), masterDownload: download, hashSource: MultipartCdnHashSource(queue: strongSelf.queue, fileToken: token, hashes: partHashes, masterDownload: download, continueInBackground: strongSelf.continueInBackground))
|
||||
strongSelf.source = .cdn(masterDatacenterId: location.datacenterId, cdnDatacenterId: id, fileToken: token, key: key, iv: iv, download: DownloadWrapper(consumerId: strongSelf.consumerId, datacenterId: id, isCdn: true, network: strongSelf.network, useMainConnection: strongSelf.useMainConnection), masterDownload: download, hashSource: MultipartCdnHashSource(queue: strongSelf.queue, fileToken: token, hashes: partHashes, masterDownload: download, continueInBackground: strongSelf.continueInBackground))
|
||||
strongSelf.checkState()
|
||||
case .cdn, .none:
|
||||
break
|
||||
@ -828,10 +871,13 @@ private final class MultipartFetchManager {
|
||||
switch strongSelf.source {
|
||||
case .master, .none:
|
||||
break
|
||||
case let .cdn(_, fileToken, _, _, _, masterDownload, _):
|
||||
case let .cdn(_, _, fileToken, _, _, _, masterDownload, _):
|
||||
if !strongSelf.reuploadingToCdn {
|
||||
strongSelf.reuploadingToCdn = true
|
||||
let reupload: Signal<[Api.FileHash], NoError> = masterDownload.request(Api.functions.upload.reuploadCdnFile(fileToken: Buffer(data: fileToken), requestToken: Buffer(data: token)), tag: nil, continueInBackground: strongSelf.continueInBackground)
|
||||
|> map { result, _ -> [Api.FileHash] in
|
||||
return result
|
||||
}
|
||||
|> `catch` { _ -> Signal<[Api.FileHash], NoError> in
|
||||
return .single([])
|
||||
}
|
||||
@ -856,6 +902,7 @@ public func standaloneMultipartFetch(postbox: Postbox, network: Network, resourc
|
||||
postbox: postbox,
|
||||
network: network,
|
||||
mediaReferenceRevalidationContext: nil,
|
||||
networkStatsContext: nil,
|
||||
resource: resource,
|
||||
datacenterId: datacenterId,
|
||||
size: size,
|
||||
@ -877,6 +924,7 @@ private func multipartFetchV1(
|
||||
postbox: Postbox,
|
||||
network: Network,
|
||||
mediaReferenceRevalidationContext: MediaReferenceRevalidationContext?,
|
||||
networkStatsContext: NetworkStatsContext?,
|
||||
resource: TelegramMediaResource,
|
||||
datacenterId: Int,
|
||||
size: Int64?,
|
||||
@ -944,7 +992,7 @@ private func multipartFetchV1(
|
||||
subscriber.putNext(.reset)
|
||||
}
|
||||
|
||||
let manager = MultipartFetchManager(resource: resource, parameters: parameters, size: size, intervals: intervals, encryptionKey: encryptionKey, decryptedSize: decryptedSize, location: location, postbox: postbox, network: network, revalidationContext: mediaReferenceRevalidationContext, partReady: { dataOffset, data in
|
||||
let manager = MultipartFetchManager(resource: resource, parameters: parameters, size: size, intervals: intervals, encryptionKey: encryptionKey, decryptedSize: decryptedSize, location: location, postbox: postbox, network: network, networkStatsContext: networkStatsContext, revalidationContext: mediaReferenceRevalidationContext, partReady: { dataOffset, data in
|
||||
subscriber.putNext(.dataPart(resourceOffset: dataOffset, data: data, range: 0 ..< Int64(data.count), complete: false))
|
||||
}, reportCompleteSize: { size in
|
||||
subscriber.putNext(.resourceSizeUpdated(size))
|
||||
@ -968,6 +1016,7 @@ func multipartFetch(
|
||||
postbox: Postbox,
|
||||
network: Network,
|
||||
mediaReferenceRevalidationContext: MediaReferenceRevalidationContext?,
|
||||
networkStatsContext: NetworkStatsContext?,
|
||||
resource: TelegramMediaResource,
|
||||
datacenterId: Int,
|
||||
size: Int64?,
|
||||
@ -998,6 +1047,7 @@ func multipartFetch(
|
||||
postbox: postbox,
|
||||
network: network,
|
||||
mediaReferenceRevalidationContext: mediaReferenceRevalidationContext,
|
||||
networkStatsContext: networkStatsContext,
|
||||
resource: resource,
|
||||
datacenterId: datacenterId,
|
||||
size: size,
|
||||
|
@ -33,10 +33,10 @@ private final class RequestData {
|
||||
let continueInBackground: Bool
|
||||
let automaticFloodWait: Bool
|
||||
let deserializeResponse: (Buffer) -> Any?
|
||||
let completed: (Any, Double) -> Void
|
||||
let completed: (Any, NetworkResponseInfo) -> Void
|
||||
let error: (MTRpcError, Double) -> Void
|
||||
|
||||
init(id: Int32, consumerId: Int64, target: MultiplexedRequestTarget, functionDescription: FunctionDescription, payload: Buffer, tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool, deserializeResponse: @escaping (Buffer) -> Any?, completed: @escaping (Any, Double) -> Void, error: @escaping (MTRpcError, Double) -> Void) {
|
||||
init(id: Int32, consumerId: Int64, target: MultiplexedRequestTarget, functionDescription: FunctionDescription, payload: Buffer, tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool, deserializeResponse: @escaping (Buffer) -> Any?, completed: @escaping (Any, NetworkResponseInfo) -> Void, error: @escaping (MTRpcError, Double) -> Void) {
|
||||
self.id = id
|
||||
self.consumerId = consumerId
|
||||
self.target = target
|
||||
@ -80,6 +80,11 @@ private struct MultiplexedRequestTargetTimerKey: Equatable, Hashable {
|
||||
|
||||
private typealias SignalKitTimer = SwiftSignalKit.Timer
|
||||
|
||||
struct NetworkResponseInfo {
|
||||
var timestamp: Double
|
||||
var networkType: NetworkStatsContext.NetworkType
|
||||
var networkDuration: Double
|
||||
}
|
||||
|
||||
private final class MultiplexedRequestManagerContext {
|
||||
private let queue: Queue
|
||||
@ -109,15 +114,15 @@ private final class MultiplexedRequestManagerContext {
|
||||
}
|
||||
}
|
||||
|
||||
func request(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, (Buffer) -> Any?), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool, completed: @escaping (Any, Double) -> Void, error: @escaping (MTRpcError, Double) -> Void) -> Disposable {
|
||||
func request(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, (Buffer) -> Any?), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool, completed: @escaping (Any, NetworkResponseInfo) -> Void, error: @escaping (MTRpcError, Double) -> Void) -> Disposable {
|
||||
let targetKey = MultiplexedRequestTargetKey(target: target, continueInBackground: continueInBackground)
|
||||
|
||||
let requestId = self.nextId
|
||||
self.nextId += 1
|
||||
self.queuedRequests.append(RequestData(id: requestId, consumerId: consumerId, target: target, functionDescription: data.0, payload: data.1, tag: tag, continueInBackground: continueInBackground, automaticFloodWait: automaticFloodWait, deserializeResponse: { buffer in
|
||||
return data.2(buffer)
|
||||
}, completed: { result, timestamp in
|
||||
completed(result, timestamp)
|
||||
}, completed: { result, info in
|
||||
completed(result, info)
|
||||
}, error: { e, timestamp in
|
||||
error(e, timestamp)
|
||||
}))
|
||||
@ -189,7 +194,7 @@ private final class MultiplexedRequestManagerContext {
|
||||
let requestId = request.id
|
||||
selectedContext.requests.append(ExecutingRequestData(requestId: requestId, disposable: disposable))
|
||||
let queue = self.queue
|
||||
disposable.set(selectedContext.worker.rawRequest((request.functionDescription, request.payload, request.deserializeResponse), automaticFloodWait: request.automaticFloodWait).start(next: { [weak self, weak selectedContext] result, timestamp in
|
||||
disposable.set(selectedContext.worker.rawRequest((request.functionDescription, request.payload, request.deserializeResponse), automaticFloodWait: request.automaticFloodWait).start(next: { [weak self, weak selectedContext] result, info in
|
||||
queue.async {
|
||||
guard let strongSelf = self else {
|
||||
return
|
||||
@ -202,7 +207,7 @@ private final class MultiplexedRequestManagerContext {
|
||||
}
|
||||
}
|
||||
}
|
||||
request.completed(result, timestamp)
|
||||
request.completed(result, info)
|
||||
strongSelf.updateState()
|
||||
}
|
||||
}, error: { [weak self, weak selectedContext] error, timestamp in
|
||||
@ -299,18 +304,18 @@ final class MultiplexedRequestManager {
|
||||
}
|
||||
}
|
||||
|
||||
func requestWithAdditionalInfo<T>(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool = true) -> Signal<(T, Double), (MTRpcError, Double)> {
|
||||
func requestWithAdditionalInfo<T>(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, DeserializeFunctionResponse<T>), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool = true) -> Signal<(T, NetworkResponseInfo), (MTRpcError, Double)> {
|
||||
return Signal { subscriber in
|
||||
let disposable = MetaDisposable()
|
||||
self.context.with { context in
|
||||
disposable.set(context.request(to: target, consumerId: consumerId, data: (data.0, data.1, { buffer in
|
||||
return data.2.parse(buffer)
|
||||
}), tag: tag, continueInBackground: continueInBackground, automaticFloodWait: automaticFloodWait, completed: { result, timestamp in
|
||||
}), tag: tag, continueInBackground: continueInBackground, automaticFloodWait: automaticFloodWait, completed: { result, info in
|
||||
if let result = result as? T {
|
||||
subscriber.putNext((result, timestamp))
|
||||
subscriber.putNext((result, info))
|
||||
subscriber.putCompletion()
|
||||
} else {
|
||||
subscriber.putError((MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"), timestamp))
|
||||
subscriber.putError((MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"), info.timestamp))
|
||||
}
|
||||
}, error: { error, timestamp in
|
||||
subscriber.putError((error, timestamp))
|
||||
|
@ -222,9 +222,10 @@ final class NetworkFrameworkTcpConnectionInterface: NSObject, MTTcpConnectionInt
|
||||
self.currentReadRequest = nil
|
||||
|
||||
weak var delegate = self.delegate
|
||||
let currentInterfaceIsWifi = self.currentInterfaceIsWifi
|
||||
self.delegateQueue.async {
|
||||
if let delegate = delegate {
|
||||
delegate.connectionInterfaceDidRead(currentReadRequest.data, withTag: currentReadRequest.request.tag)
|
||||
delegate.connectionInterfaceDidRead(currentReadRequest.data, withTag: currentReadRequest.request.tag, networkType: currentInterfaceIsWifi ? 0 : 1)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,122 @@
|
||||
import Foundation
|
||||
import SwiftSignalKit
|
||||
import Postbox
|
||||
|
||||
final class NetworkStatsContext {
|
||||
enum NetworkType: Int32 {
|
||||
case wifi = 0
|
||||
case cellular = 1
|
||||
}
|
||||
|
||||
struct DownloadEvent {
|
||||
let networkType: NetworkType
|
||||
let datacenterId: Int32
|
||||
let size: Double
|
||||
let networkDuration: Double
|
||||
let issueDuration: Double
|
||||
|
||||
init(
|
||||
networkType: NetworkType,
|
||||
datacenterId: Int32,
|
||||
size: Double,
|
||||
networkDuration: Double,
|
||||
issueDuration: Double
|
||||
) {
|
||||
self.networkType = networkType
|
||||
self.datacenterId = datacenterId
|
||||
self.size = size
|
||||
self.networkDuration = networkDuration
|
||||
self.issueDuration = issueDuration
|
||||
}
|
||||
}
|
||||
|
||||
private struct TargetKey: Hashable {
|
||||
let networkType: NetworkType
|
||||
let datacenterId: Int32
|
||||
|
||||
init(networkType: NetworkType, datacenterId: Int32) {
|
||||
self.networkType = networkType
|
||||
self.datacenterId = datacenterId
|
||||
}
|
||||
}
|
||||
|
||||
private final class AverageStats {
|
||||
var networkBps: Double = 0.0
|
||||
var issueDuration: Double = 0.0
|
||||
var networkDelay: Double = 0.0
|
||||
var count: Int = 0
|
||||
var size: Int64 = 0
|
||||
}
|
||||
|
||||
private final class Impl {
|
||||
let queue: Queue
|
||||
let postbox: Postbox
|
||||
|
||||
var averageTargetStats: [TargetKey: AverageStats] = [:]
|
||||
|
||||
init(queue: Queue, postbox: Postbox) {
|
||||
self.queue = queue
|
||||
self.postbox = postbox
|
||||
}
|
||||
|
||||
func add(downloadEvents: [DownloadEvent]) {
|
||||
for event in downloadEvents {
|
||||
if event.networkDuration == 0.0 {
|
||||
continue
|
||||
}
|
||||
let targetKey = TargetKey(networkType: event.networkType, datacenterId: event.datacenterId)
|
||||
let averageStats: AverageStats
|
||||
if let current = self.averageTargetStats[targetKey] {
|
||||
averageStats = current
|
||||
} else {
|
||||
averageStats = AverageStats()
|
||||
self.averageTargetStats[targetKey] = averageStats
|
||||
}
|
||||
averageStats.count += 1
|
||||
averageStats.issueDuration += event.issueDuration
|
||||
averageStats.networkDelay += event.issueDuration - event.networkDuration
|
||||
averageStats.networkBps += event.size / event.networkDuration
|
||||
averageStats.size += Int64(event.size)
|
||||
}
|
||||
|
||||
self.maybeFlushStats()
|
||||
}
|
||||
|
||||
private func maybeFlushStats() {
|
||||
var removeKeys: [TargetKey] = []
|
||||
for (targetKey, averageStats) in self.averageTargetStats {
|
||||
if averageStats.count >= 1000 || averageStats.size >= 4 * 1024 * 1024 {
|
||||
addAppLogEvent(postbox: self.postbox, type: "download", data: .dictionary([
|
||||
"n": .number(Double(targetKey.networkType.rawValue)),
|
||||
"d": .number(Double(targetKey.datacenterId)),
|
||||
"b": .number(averageStats.networkBps / Double(averageStats.count)),
|
||||
"nd": .number(averageStats.networkDelay / Double(averageStats.count))
|
||||
]))
|
||||
removeKeys.append(targetKey)
|
||||
}
|
||||
}
|
||||
for key in removeKeys {
|
||||
self.averageTargetStats.removeValue(forKey: key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static let sharedQueue = Queue(name: "NetworkStatsContext")
|
||||
|
||||
private let queue: Queue
|
||||
private let impl: QueueLocalObject<Impl>
|
||||
|
||||
init(postbox: Postbox) {
|
||||
let queue = NetworkStatsContext.sharedQueue
|
||||
self.queue = queue
|
||||
self.impl = QueueLocalObject(queue: queue, generate: {
|
||||
return Impl(queue: queue, postbox: postbox)
|
||||
})
|
||||
}
|
||||
|
||||
func add(downloadEvents: [DownloadEvent]) {
|
||||
self.impl.with { impl in
|
||||
impl.add(downloadEvents: downloadEvents)
|
||||
}
|
||||
}
|
||||
}
|
@ -23,7 +23,7 @@ private final class MediaResourceDataCopyFile : MediaResourceDataFetchCopyLocalI
|
||||
}
|
||||
|
||||
public func fetchCloudMediaLocation(account: Account, resource: TelegramMediaResource, datacenterId: Int, size: Int64?, intervals: Signal<[(Range<Int64>, MediaBoxFetchPriority)], NoError>, parameters: MediaResourceFetchParameters?) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError> {
|
||||
return multipartFetch(postbox: account.postbox, network: account.network, mediaReferenceRevalidationContext: account.mediaReferenceRevalidationContext, resource: resource, datacenterId: datacenterId, size: size, intervals: intervals, parameters: parameters)
|
||||
return multipartFetch(postbox: account.postbox, network: account.network, mediaReferenceRevalidationContext: account.mediaReferenceRevalidationContext, networkStatsContext: account.networkStatsContext, resource: resource, datacenterId: datacenterId, size: size, intervals: intervals, parameters: parameters)
|
||||
}
|
||||
|
||||
private func fetchLocalFileResource(path: String, move: Bool) -> Signal<MediaResourceDataFetchResult, NoError> {
|
||||
|
@ -5,5 +5,5 @@ import MtProtoKit
|
||||
|
||||
|
||||
func fetchSecretFileResource(account: Account, resource: SecretFileMediaResource, intervals: Signal<[(Range<Int64>, MediaBoxFetchPriority)], NoError>, parameters: MediaResourceFetchParameters?) -> Signal<MediaResourceDataFetchResult, MediaResourceDataFetchError> {
|
||||
return multipartFetch(postbox: account.postbox, network: account.network, mediaReferenceRevalidationContext: account.mediaReferenceRevalidationContext, resource: resource, datacenterId: resource.datacenterId, size: resource.size, intervals: intervals, parameters: parameters, encryptionKey: resource.key, decryptedSize: resource.decryptedSize)
|
||||
return multipartFetch(postbox: account.postbox, network: account.network, mediaReferenceRevalidationContext: account.mediaReferenceRevalidationContext, networkStatsContext: account.networkStatsContext, resource: resource, datacenterId: resource.datacenterId, size: resource.size, intervals: intervals, parameters: parameters, encryptionKey: resource.key, decryptedSize: resource.decryptedSize)
|
||||
}
|
||||
|
@ -339,7 +339,7 @@ public extension TelegramEngine {
|
||||
|> mapToSignal { datacenterId -> Signal<EngineMediaResource.Fetch.Result, EngineMediaResource.Fetch.Error> in
|
||||
let resource = AlbumCoverResource(datacenterId: Int(datacenterId), file: file, title: title, performer: performer, isThumbnail: isThumbnail)
|
||||
|
||||
return multipartFetch(postbox: self.account.postbox, network: self.account.network, mediaReferenceRevalidationContext: self.account.mediaReferenceRevalidationContext, resource: resource, datacenterId: Int(datacenterId), size: nil, intervals: .single([(0 ..< Int64.max, .default)]), parameters: MediaResourceFetchParameters(
|
||||
return multipartFetch(postbox: self.account.postbox, network: self.account.network, mediaReferenceRevalidationContext: self.account.mediaReferenceRevalidationContext, networkStatsContext: self.account.networkStatsContext, resource: resource, datacenterId: Int(datacenterId), size: nil, intervals: .single([(0 ..< Int64.max, .default)]), parameters: MediaResourceFetchParameters(
|
||||
tag: nil,
|
||||
info: TelegramCloudMediaResourceFetchInfo(
|
||||
reference: MediaResourceReference.standalone(resource: resource),
|
||||
|
Loading…
x
Reference in New Issue
Block a user