From 0a305bc84826760dd46c8dc34163ce934b66a4d2 Mon Sep 17 00:00:00 2001 From: Ali <> Date: Fri, 7 Apr 2023 18:33:30 +0400 Subject: [PATCH] Add network stats --- .../PublicHeaders/MtProtoKit/MTContext.h | 2 +- .../MtProtoKit/MTMessageService.h | 2 +- .../PublicHeaders/MtProtoKit/MTRequest.h | 12 +- .../MtProtoKit/MTRequestContext.h | 1 + .../PublicHeaders/MtProtoKit/MTTransport.h | 4 +- .../MtProtoKit/Sources/GCDAsyncSocket.h | 2 +- .../MtProtoKit/Sources/GCDAsyncSocket.m | 5 +- .../Sources/MTBackupAddressSignals.m | 2 +- .../Sources/MTBindKeyMessageService.m | 2 +- .../Sources/MTDatacenterAuthMessageService.m | 2 +- .../Sources/MTDatacenterTransferAuthAction.m | 4 +- .../MTDiscoverDatacenterAddressAction.m | 2 +- submodules/MtProtoKit/Sources/MTProto.m | 10 +- submodules/MtProtoKit/Sources/MTRequest.m | 14 ++ .../Sources/MTRequestMessageService.m | 20 ++- .../Sources/MTResendMessageService.m | 2 +- .../MtProtoKit/Sources/MTTcpConnection.h | 2 +- .../MtProtoKit/Sources/MTTcpConnection.m | 27 ++-- .../MtProtoKit/Sources/MTTcpTransport.m | 6 +- .../Sources/MTTimeSyncMessageService.m | 2 +- submodules/MtProtoKit/Sources/MTTransport.m | 6 +- .../Sources/Account/Account.swift | 4 + .../Sources/Network/Download.swift | 21 +-- .../Sources/Network/MultipartFetch.swift | 108 +++++++++++----- .../Network/MultiplexedRequestManager.swift | 27 ++-- ...tworkFrameworkTcpConnectionInterface.swift | 3 +- .../Sources/Network/NetworkStatsContext.swift | 122 ++++++++++++++++++ .../TelegramCore/Sources/State/Fetch.swift | 2 +- .../State/FetchSecretFileResource.swift | 2 +- .../Resources/TelegramEngineResources.swift | 2 +- 30 files changed, 323 insertions(+), 97 deletions(-) create mode 100644 submodules/TelegramCore/Sources/Network/NetworkStatsContext.swift diff --git a/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTContext.h b/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTContext.h index 03c3202c86..fa26508ecb 100644 --- a/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTContext.h +++ b/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTContext.h @@ -21,7 +21,7 @@ @protocol MTTcpConnectionInterfaceDelegate - (void)connectionInterfaceDidReadPartialDataOfLength:(NSUInteger)partialLength tag:(long)tag; -- (void)connectionInterfaceDidReadData:(NSData * _Nonnull)rawData withTag:(long)tag; +- (void)connectionInterfaceDidReadData:(NSData * _Nonnull)rawData withTag:(long)tag networkType:(int32_t)networkType; - (void)connectionInterfaceDidConnect; - (void)connectionInterfaceDidDisconnectWithError:(NSError * _Nullable)error; diff --git a/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTMessageService.h b/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTMessageService.h index 7f3d013a62..1dd9fdd4f8 100644 --- a/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTMessageService.h +++ b/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTMessageService.h @@ -21,7 +21,7 @@ - (MTMessageTransaction *)mtProtoMessageTransaction:(MTProto *)mtProto authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector sessionInfo:(MTSessionInfo *)sessionInfo scheme:(MTTransportScheme *)scheme; - (void)mtProtoDidChangeSession:(MTProto *)mtProto; - (void)mtProtoServerDidChangeSession:(MTProto *)mtProto firstValidMessageId:(int64_t)firstValidMessageId otherValidMessageIds:(NSArray *)otherValidMessageIds; -- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector; +- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType; - (void)mtProto:(MTProto *)mtProto receivedQuickAck:(int32_t)quickAckId; - (void)mtProto:(MTProto *)mtProto transactionsMayHaveFailed:(NSArray *)transactionIds; - (void)mtProtoAllTransactionsMayHaveFailed:(MTProto *)mtProto; diff --git a/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTRequest.h b/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTRequest.h index 5f5d3d1248..28dba10b2d 100644 --- a/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTRequest.h +++ b/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTRequest.h @@ -6,6 +6,16 @@ @class MTRequestErrorContext; @class MTRpcError; +@interface MTRequestResponseInfo : NSObject + +@property (nonatomic, readonly) int32_t networkType; +@property (nonatomic, readonly) double timestamp; +@property (nonatomic, readonly) double duration; + +- (instancetype)initWithNetworkType:(int32_t)networkType timestamp:(double)timestamp duration:(double)duration; + +@end + @interface MTRequest : NSObject @property (nonatomic, strong, readonly) id internalId; @@ -24,7 +34,7 @@ @property (nonatomic) bool passthroughPasswordEntryError; @property (nonatomic) bool needsTimeoutTimer; -@property (nonatomic, copy) void (^completed)(id result, NSTimeInterval completionTimestamp, MTRpcError *error); +@property (nonatomic, copy) void (^completed)(id result, MTRequestResponseInfo *info, MTRpcError *error); @property (nonatomic, copy) void (^progressUpdated)(float progress, NSUInteger packetLength); @property (nonatomic, copy) void (^acknowledgementReceived)(); diff --git a/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTRequestContext.h b/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTRequestContext.h index 46b17abcee..9c63163811 100644 --- a/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTRequestContext.h +++ b/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTRequestContext.h @@ -12,6 +12,7 @@ @property (nonatomic) bool delivered; @property (nonatomic) int64_t responseMessageId; @property (nonatomic) bool willInitializeApi; +@property (nonatomic) double sentTimestamp; - (instancetype)initWithMessageId:(int64_t)messageId messageSeqNo:(int32_t)messageSeqNo transactionId:(id)transactionId quickAckId:(int32_t)quickAckId; diff --git a/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTTransport.h b/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTTransport.h index 156fe128c3..9237e0d24d 100644 --- a/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTTransport.h +++ b/submodules/MtProtoKit/PublicHeaders/MtProtoKit/MTTransport.h @@ -26,7 +26,7 @@ - (void)transportConnectionProblemsStatusChanged:(MTTransport * _Nonnull)transport scheme:(MTTransportScheme * _Nonnull)scheme hasConnectionProblems:(bool)hasConnectionProblems isProbablyHttp:(bool)isProbablyHttp; - (void)transportReadyForTransaction:(MTTransport * _Nonnull)transport scheme:(MTTransportScheme * _Nonnull)scheme transportSpecificTransaction:(MTMessageTransaction * _Nonnull)transportSpecificTransaction forceConfirmations:(bool)forceConfirmations transactionReady:(void (^ _Nonnull)(NSArray * _Nonnull))transactionReady; -- (void)transportHasIncomingData:(MTTransport * _Nonnull)transport scheme:(MTTransportScheme * _Nonnull)scheme data:(NSData * _Nonnull)data transactionId:(id _Nonnull)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^ _Nonnull)(id _Nonnull transactionId, bool success))decodeResult; +- (void)transportHasIncomingData:(MTTransport * _Nonnull)transport scheme:(MTTransportScheme * _Nonnull)scheme networkType:(int32_t)networkType data:(NSData * _Nonnull)data transactionId:(id _Nonnull)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^ _Nonnull)(id _Nonnull transactionId, bool success))decodeResult; - (void)transportTransactionsMayHaveFailed:(MTTransport * _Nonnull)transport transactionIds:(NSArray * _Nonnull)transactionIds; - (void)transportReceivedQuickAck:(MTTransport * _Nonnull)transport quickAckId:(int32_t)quickAckId; - (void)transportDecodeProgressToken:(MTTransport * _Nonnull)transport scheme:(MTTransportScheme * _Nonnull)scheme data:(NSData * _Nonnull)data token:(int64_t)token completion:(void (^ _Nonnull)(int64_t token, id _Nonnull progressToken))completion; @@ -57,7 +57,7 @@ - (void)stop; - (void)updateConnectionState; - (void)setDelegateNeedsTransaction; -- (void)_processIncomingData:(NSData * _Nonnull)data scheme:(MTTransportScheme * _Nonnull)scheme transactionId:(id _Nonnull)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^ _Nonnull)(id _Nonnull transactionId, bool success))decodeResult; +- (void)_processIncomingData:(NSData * _Nonnull)data scheme:(MTTransportScheme * _Nonnull)scheme networkType:(int32_t)networkType transactionId:(id _Nonnull)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^ _Nonnull)(id _Nonnull transactionId, bool success))decodeResult; - (void)_networkAvailabilityChanged:(bool)networkAvailable; - (void)activeTransactionIds:(void (^ _Nonnull)(NSArray * _Nonnull activeTransactionId))completion; diff --git a/submodules/MtProtoKit/Sources/GCDAsyncSocket.h b/submodules/MtProtoKit/Sources/GCDAsyncSocket.h index 1f7f6e6378..c329cd4b7d 100644 --- a/submodules/MtProtoKit/Sources/GCDAsyncSocket.h +++ b/submodules/MtProtoKit/Sources/GCDAsyncSocket.h @@ -975,7 +975,7 @@ typedef enum GCDAsyncSocketError GCDAsyncSocketError; * Called when a socket has completed reading the requested data into memory. * Not called if there is an error. **/ -- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag; +- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag networkType:(int32_t)networkType; /** * Called when a socket has read in data, but has not yet completed the read. diff --git a/submodules/MtProtoKit/Sources/GCDAsyncSocket.m b/submodules/MtProtoKit/Sources/GCDAsyncSocket.m index 470658d86e..fd54f234a0 100755 --- a/submodules/MtProtoKit/Sources/GCDAsyncSocket.m +++ b/submodules/MtProtoKit/Sources/GCDAsyncSocket.m @@ -5119,14 +5119,15 @@ enum GCDAsyncSocketConfig result = [NSData dataWithBytesNoCopy:buffer length:currentRead->bytesDone freeWhenDone:NO]; } - if (delegateQueue && [delegate respondsToSelector:@selector(socket:didReadData:withTag:)]) + if (delegateQueue && [delegate respondsToSelector:@selector(socket:didReadData:withTag:networkType:)]) { __strong id theDelegate = delegate; GCDAsyncReadPacket *theRead = currentRead; // Ensure currentRead retained since result may not own buffer + int32_t networkType = _interface == MTNetworkUsageManagerInterfaceOther ? 0 : 1; dispatch_async(delegateQueue, ^{ @autoreleasepool { - [theDelegate socket:self didReadData:result withTag:theRead->tag]; + [theDelegate socket:self didReadData:result withTag:theRead->tag networkType:networkType]; }}); } diff --git a/submodules/MtProtoKit/Sources/MTBackupAddressSignals.m b/submodules/MtProtoKit/Sources/MTBackupAddressSignals.m index be7b1f7ab2..5b79068a55 100644 --- a/submodules/MtProtoKit/Sources/MTBackupAddressSignals.m +++ b/submodules/MtProtoKit/Sources/MTBackupAddressSignals.m @@ -313,7 +313,7 @@ static NSString *makeRandomPadding() { __weak MTContext *weakCurrentContext = currentContext; return [[MTSignal alloc] initWithGenerator:^id(MTSubscriber *subscriber) { - [request setCompleted:^(MTDatacenterAddressListData *result, __unused NSTimeInterval completionTimestamp, id error) + [request setCompleted:^(MTDatacenterAddressListData *result, __unused MTRequestResponseInfo *info, id error) { if (error == nil) { __strong MTContext *strongCurrentContext = weakCurrentContext; diff --git a/submodules/MtProtoKit/Sources/MTBindKeyMessageService.m b/submodules/MtProtoKit/Sources/MTBindKeyMessageService.m index 417818f43c..7b339d497d 100644 --- a/submodules/MtProtoKit/Sources/MTBindKeyMessageService.m +++ b/submodules/MtProtoKit/Sources/MTBindKeyMessageService.m @@ -136,7 +136,7 @@ } } -- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector { +- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType { if ([message.body isKindOfClass:[MTRpcResultMessage class]]) { MTRpcResultMessage *rpcResultMessage = message.body; if (rpcResultMessage.requestMessageId == _currentMessageId) { diff --git a/submodules/MtProtoKit/Sources/MTDatacenterAuthMessageService.m b/submodules/MtProtoKit/Sources/MTDatacenterAuthMessageService.m index 06c4f4264f..882d6c6118 100644 --- a/submodules/MtProtoKit/Sources/MTDatacenterAuthMessageService.m +++ b/submodules/MtProtoKit/Sources/MTDatacenterAuthMessageService.m @@ -410,7 +410,7 @@ static NSData *encryptRSAModernPadding(id encryptionProvider } } -- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector +- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType { if (_stage == MTDatacenterAuthStagePQ && [message.body isKindOfClass:[MTResPqMessage class]]) { diff --git a/submodules/MtProtoKit/Sources/MTDatacenterTransferAuthAction.m b/submodules/MtProtoKit/Sources/MTDatacenterTransferAuthAction.m index 54e2de78ad..0e38f9c20c 100644 --- a/submodules/MtProtoKit/Sources/MTDatacenterTransferAuthAction.m +++ b/submodules/MtProtoKit/Sources/MTDatacenterTransferAuthAction.m @@ -101,7 +101,7 @@ [request setPayload:exportAuthRequestData metadata:@"exportAuthorization" shortMetadata:@"exportAuthorization" responseParser:responseParser]; __weak MTDatacenterTransferAuthAction *weakSelf = self; - [request setCompleted:^(MTExportedAuthorizationData *result, __unused NSTimeInterval timestamp, id error) + [request setCompleted:^(MTExportedAuthorizationData *result, __unused MTRequestResponseInfo *info, id error) { __strong MTDatacenterTransferAuthAction *strongSelf = weakSelf; if (strongSelf == nil) @@ -147,7 +147,7 @@ id authToken = _authToken; __weak MTDatacenterTransferAuthAction *weakSelf = self; - [request setCompleted:^(__unused id result, __unused NSTimeInterval timestamp, id error) + [request setCompleted:^(__unused id result, __unused MTRequestResponseInfo *info, id error) { __strong MTDatacenterTransferAuthAction *strongSelf = weakSelf; if (strongSelf == nil) diff --git a/submodules/MtProtoKit/Sources/MTDiscoverDatacenterAddressAction.m b/submodules/MtProtoKit/Sources/MTDiscoverDatacenterAddressAction.m index 8cd1ed0a6f..2cac360ee8 100644 --- a/submodules/MtProtoKit/Sources/MTDiscoverDatacenterAddressAction.m +++ b/submodules/MtProtoKit/Sources/MTDiscoverDatacenterAddressAction.m @@ -106,7 +106,7 @@ [request setPayload:getConfigData metadata:@"getConfig" shortMetadata:@"getConfig" responseParser:responseParser]; __weak MTDiscoverDatacenterAddressAction *weakSelf = self; - [request setCompleted:^(MTDatacenterAddressListData *result, __unused NSTimeInterval completionTimestamp, id error) + [request setCompleted:^(MTDatacenterAddressListData *result, __unused MTRequestResponseInfo *info, id error) { __strong MTDiscoverDatacenterAddressAction *strongSelf = weakSelf; if (strongSelf != nil) { diff --git a/submodules/MtProtoKit/Sources/MTProto.m b/submodules/MtProtoKit/Sources/MTProto.m index 4b02c06b50..7e71be1b65 100644 --- a/submodules/MtProtoKit/Sources/MTProto.m +++ b/submodules/MtProtoKit/Sources/MTProto.m @@ -1966,7 +1966,7 @@ static NSString *dumpHexString(NSData *data, int maxLength) { return hexString; } -- (void)transportHasIncomingData:(MTTransport *)transport scheme:(MTTransportScheme *)scheme data:(NSData *)data transactionId:(id)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^)(id transactionId, bool success))decodeResult +- (void)transportHasIncomingData:(MTTransport *)transport scheme:(MTTransportScheme *)scheme networkType:(int32_t)networkType data:(NSData *)data transactionId:(id)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^)(id transactionId, bool success))decodeResult { /*__block bool simulateError = false; static dispatch_once_t onceToken; @@ -2097,7 +2097,7 @@ static NSString *dumpHexString(NSData *data, int maxLength) { for (MTIncomingMessage *incomingMessage in parsedMessages) { - [self _processIncomingMessage:incomingMessage totalSize:(int)data.length withTransactionId:transactionId address:scheme.address authInfoSelector:authInfoSelector]; + [self _processIncomingMessage:incomingMessage totalSize:(int)data.length withTransactionId:transactionId address:scheme.address authInfoSelector:authInfoSelector networkType:networkType]; } if (requestTransactionAfterProcessing) @@ -2422,7 +2422,7 @@ static bool isDataEqualToDataConstTime(NSData *data1, NSData *data2) { return messages; } -- (void)_processIncomingMessage:(MTIncomingMessage *)incomingMessage totalSize:(int)totalSize withTransactionId:(id)transactionId address:(MTDatacenterAddress *)address authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector +- (void)_processIncomingMessage:(MTIncomingMessage *)incomingMessage totalSize:(int)totalSize withTransactionId:(id)transactionId address:(MTDatacenterAddress *)address authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType { if ([_sessionInfo messageProcessed:incomingMessage.messageId]) { @@ -2605,8 +2605,8 @@ static bool isDataEqualToDataConstTime(NSData *data1, NSData *data2) { { id messageService = _messageServices[(NSUInteger)i]; - if ([messageService respondsToSelector:@selector(mtProto:receivedMessage:authInfoSelector:)]) - [messageService mtProto:self receivedMessage:incomingMessage authInfoSelector:authInfoSelector]; + if ([messageService respondsToSelector:@selector(mtProto:receivedMessage:authInfoSelector:networkType:)]) + [messageService mtProto:self receivedMessage:incomingMessage authInfoSelector:authInfoSelector networkType:networkType]; } if (_timeFixContext != nil && [incomingMessage.body isKindOfClass:[MTPongMessage class]] && ((MTPongMessage *)incomingMessage.body).messageId == _timeFixContext.messageId) diff --git a/submodules/MtProtoKit/Sources/MTRequest.m b/submodules/MtProtoKit/Sources/MTRequest.m index e348537a8c..b154934fd3 100644 --- a/submodules/MtProtoKit/Sources/MTRequest.m +++ b/submodules/MtProtoKit/Sources/MTRequest.m @@ -5,6 +5,20 @@ #import #import +@implementation MTRequestResponseInfo + +- (instancetype)initWithNetworkType:(int32_t)networkType timestamp:(double)timestamp duration:(double)duration { + self = [super init]; + if (self != nil) { + _networkType = networkType; + _timestamp = timestamp; + _duration = duration; + } + return self; +} + +@end + @interface MTRequestInternalId : NSObject { NSUInteger _value; diff --git a/submodules/MtProtoKit/Sources/MTRequestMessageService.m b/submodules/MtProtoKit/Sources/MTRequestMessageService.m index 9b89b6a545..fed278d302 100644 --- a/submodules/MtProtoKit/Sources/MTRequestMessageService.m +++ b/submodules/MtProtoKit/Sources/MTRequestMessageService.m @@ -369,7 +369,7 @@ MTRequestNoopParser responseParser = [[_context serialization] requestNoop:&noopData]; [request setPayload:noopData metadata:@"noop" shortMetadata:@"noop" responseParser:responseParser]; - [request setCompleted:^(__unused id result, __unused NSTimeInterval timestamp, __unused id error) { + [request setCompleted:^(__unused id result, __unused MTRequestResponseInfo *info, __unused id error) { }]; [self addRequest:request]; @@ -614,6 +614,7 @@ } MTRequestContext *requestContext = [[MTRequestContext alloc] initWithMessageId:preparedMessage.messageId messageSeqNo:preparedMessage.seqNo transactionId:nil quickAckId:0]; + requestContext.sentTimestamp = CFAbsoluteTimeGetCurrent(); requestContext.willInitializeApi = requestsWillInitializeApi; requestContext.waitingForMessageId = true; request.requestContext = requestContext; @@ -646,6 +647,7 @@ continue; } MTRequestContext *requestContext = [[MTRequestContext alloc] initWithMessageId:preparedMessage.messageId messageSeqNo:preparedMessage.seqNo transactionId:messageInternalIdToTransactionId[messageInternalId] quickAckId:(int32_t)[messageInternalIdToQuickAckId[messageInternalId] intValue]]; + requestContext.sentTimestamp = CFAbsoluteTimeGetCurrent(); requestContext.willInitializeApi = requestsWillInitializeApi; request.requestContext = requestContext; } @@ -667,7 +669,7 @@ return nil; } -- (void)mtProto:(MTProto *)__unused mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector +- (void)mtProto:(MTProto *)__unused mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType { if ([message.body isKindOfClass:[MTRpcResultMessage class]]) { @@ -871,6 +873,8 @@ } } + double sentTimestamp = request.requestContext.sentTimestamp; + request.requestContext = nil; if (restartRequest) @@ -879,11 +883,17 @@ } else { - void (^completed)(id result, NSTimeInterval completionTimestamp, id error) = [request.completed copy]; + void (^completed)(id result, MTRequestResponseInfo *info, id error) = [request.completed copy]; [_requests removeObjectAtIndex:(NSUInteger)index]; - if (completed) - completed(rpcResult, message.timestamp, rpcError); + if (completed) { + double duration = 0.0; + if (sentTimestamp != 0.0) { + duration = CFAbsoluteTimeGetCurrent() - sentTimestamp; + } + MTRequestResponseInfo *info = [[MTRequestResponseInfo alloc] initWithNetworkType:networkType timestamp:message.timestamp duration:duration]; + completed(rpcResult, info, rpcError); + } } break; diff --git a/submodules/MtProtoKit/Sources/MTResendMessageService.m b/submodules/MtProtoKit/Sources/MTResendMessageService.m index 468e30eec6..c72d1f7f81 100644 --- a/submodules/MtProtoKit/Sources/MTResendMessageService.m +++ b/submodules/MtProtoKit/Sources/MTResendMessageService.m @@ -121,7 +121,7 @@ } } -- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector +- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType { if (message.messageId == _messageId) { diff --git a/submodules/MtProtoKit/Sources/MTTcpConnection.h b/submodules/MtProtoKit/Sources/MTTcpConnection.h index 872eea7981..b186b83745 100644 --- a/submodules/MtProtoKit/Sources/MTTcpConnection.h +++ b/submodules/MtProtoKit/Sources/MTTcpConnection.h @@ -13,7 +13,7 @@ - (void)tcpConnectionOpened:(MTTcpConnection *)connection; - (void)tcpConnectionClosed:(MTTcpConnection *)connection error:(bool)error; -- (void)tcpConnectionReceivedData:(MTTcpConnection *)connection data:(NSData *)data; +- (void)tcpConnectionReceivedData:(MTTcpConnection *)connection networkType:(int32_t)networkType data:(NSData *)data; - (void)tcpConnectionReceivedQuickAck:(MTTcpConnection *)connection quickAck:(int32_t)quickAck; - (void)tcpConnectionDecodePacketProgressToken:(MTTcpConnection *)connection data:(NSData *)data token:(int64_t)token completion:(void (^)(int64_t token, id packetProgressToken))completion; - (void)tcpConnectionProgressUpdated:(MTTcpConnection *)connection packetProgressToken:(id)packetProgressToken packetLength:(NSUInteger)packetLength progress:(float)progress; diff --git a/submodules/MtProtoKit/Sources/MTTcpConnection.m b/submodules/MtProtoKit/Sources/MTTcpConnection.m index 77819c28ec..dd2755e68d 100644 --- a/submodules/MtProtoKit/Sources/MTTcpConnection.m +++ b/submodules/MtProtoKit/Sources/MTTcpConnection.m @@ -664,10 +664,10 @@ struct ctr_state { } } -- (void)socket:(GCDAsyncSocket *)socket didReadData:(NSData *)rawData withTag:(long)tag { +- (void)socket:(GCDAsyncSocket *)socket didReadData:(NSData *)rawData withTag:(long)tag networkType:(int32_t)networkType { id delegate = _delegate; if (delegate) { - [delegate connectionInterfaceDidReadData:rawData withTag:tag]; + [delegate connectionInterfaceDidReadData:rawData withTag:tag networkType:networkType]; } } @@ -717,6 +717,7 @@ struct ctr_state { id _socket; bool _closed; + int32_t _lastNetworkType; bool _useIntermediateFormat; @@ -1459,7 +1460,7 @@ struct ctr_state { [_socket readDataToLength:4 withTimeout:-1 tag:MTTcpSocksRequest]; } -- (void)connectionInterfaceDidReadData:(NSData *)rawData withTag:(long)tag +- (void)connectionInterfaceDidReadData:(NSData *)rawData withTag:(long)tag networkType:(int32_t)networkType { if (_closed) return; @@ -1754,12 +1755,12 @@ struct ctr_state { [_socket readDataToLength:(int)nextLength withTimeout:-1 tag:MTTcpSocksReceiveComplexPacketPart]; return; } else if (tag == MTTcpSocksReceiveComplexPacketPart) { - [self addReadData:rawData]; + [self addReadData:rawData networkType:networkType]; [_socket readDataToLength:5 withTimeout:-1 tag:MTTcpSocksReceiveComplexLength]; return; } else { - [self addReadData:rawData]; + [self addReadData:rawData networkType:networkType]; } } @@ -1775,15 +1776,15 @@ struct ctr_state { [_receivedDataBuffer replaceBytesInRange:NSMakeRange(0, _pendingReceiveData.length) withBytes:nil length:0]; int tag = _pendingReceiveData.tag; _pendingReceiveData = nil; - [self processReceivedData:rawData tag:tag]; + [self processReceivedData:rawData tag:tag networkType:_lastNetworkType]; } } -- (void)addReadData:(NSData *)data { +- (void)addReadData:(NSData *)data networkType:(int32_t)networkType { if (_pendingReceiveData != nil && _pendingReceiveData.length == data.length) { int tag = _pendingReceiveData.tag; _pendingReceiveData = nil; - [self processReceivedData:data tag:tag]; + [self processReceivedData:data tag:tag networkType:networkType]; } else { [_receivedDataBuffer appendData:data]; if (_pendingReceiveData != nil) { @@ -1792,13 +1793,15 @@ struct ctr_state { [_receivedDataBuffer replaceBytesInRange:NSMakeRange(0, _pendingReceiveData.length) withBytes:nil length:0]; int tag = _pendingReceiveData.tag; _pendingReceiveData = nil; - [self processReceivedData:rawData tag:tag]; + [self processReceivedData:rawData tag:tag networkType:networkType]; } } } } -- (void)processReceivedData:(NSData *)rawData tag:(int)tag { +- (void)processReceivedData:(NSData *)rawData tag:(int)tag networkType:(int32_t)networkType { + _lastNetworkType = networkType; + NSMutableData *decryptedData = [[NSMutableData alloc] initWithLength:rawData.length]; [_incomingAesCtr encryptIn:rawData.bytes out:decryptedData.mutableBytes len:rawData.length]; @@ -1968,8 +1971,8 @@ struct ctr_state { if (_connectionReceivedData) _connectionReceivedData(packetData); id delegate = _delegate; - if ([delegate respondsToSelector:@selector(tcpConnectionReceivedData:data:)]) - [delegate tcpConnectionReceivedData:self data:packetData]; + if ([delegate respondsToSelector:@selector(tcpConnectionReceivedData:networkType:data:)]) + [delegate tcpConnectionReceivedData:self networkType:networkType data:packetData]; } if (_useIntermediateFormat) { diff --git a/submodules/MtProtoKit/Sources/MTTcpTransport.m b/submodules/MtProtoKit/Sources/MTTcpTransport.m index 5dc5154d52..7c5b0fab46 100644 --- a/submodules/MtProtoKit/Sources/MTTcpTransport.m +++ b/submodules/MtProtoKit/Sources/MTTcpTransport.m @@ -452,7 +452,7 @@ static const NSTimeInterval MTTcpTransportSleepWatchdogTimeout = 60.0; }]; } -- (void)tcpConnectionReceivedData:(MTTcpConnection *)connection data:(NSData *)data +- (void)tcpConnectionReceivedData:(MTTcpConnection *)connection networkType:(int32_t)networkType data:(NSData *)data { MTTcpTransportContext *transportContext = _transportContext; [[MTTcpTransport tcpTransportQueue] dispatchOnQueue:^ @@ -464,7 +464,7 @@ static const NSTimeInterval MTTcpTransportSleepWatchdogTimeout = 60.0; [self startActualizationPingResendTimer]; __weak MTTcpTransport *weakSelf = self; - [self _processIncomingData:data scheme:connection.scheme transactionId:connection.internalId requestTransactionAfterProcessing:false decodeResult:^(id transactionId, bool success) + [self _processIncomingData:data scheme:connection.scheme networkType:networkType transactionId:connection.internalId requestTransactionAfterProcessing:false decodeResult:^(id transactionId, bool success) { if (success) { @@ -760,7 +760,7 @@ static const NSTimeInterval MTTcpTransportSleepWatchdogTimeout = 60.0; }]; } -- (void)mtProto:(MTProto *)__unused mtProto receivedMessage:(MTIncomingMessage *)incomingMessage authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector +- (void)mtProto:(MTProto *)__unused mtProto receivedMessage:(MTIncomingMessage *)incomingMessage authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType { if ([incomingMessage.body isKindOfClass:[MTPongMessage class]]) { diff --git a/submodules/MtProtoKit/Sources/MTTimeSyncMessageService.m b/submodules/MtProtoKit/Sources/MTTimeSyncMessageService.m index 30f1fe2f15..deaa634fd2 100644 --- a/submodules/MtProtoKit/Sources/MTTimeSyncMessageService.m +++ b/submodules/MtProtoKit/Sources/MTTimeSyncMessageService.m @@ -127,7 +127,7 @@ } } -- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector +- (void)mtProto:(MTProto *)mtProto receivedMessage:(MTIncomingMessage *)message authInfoSelector:(MTDatacenterAuthInfoSelector)authInfoSelector networkType:(int32_t)networkType { if ([message.body isKindOfClass:[MTFutureSaltsMessage class]] && ((MTFutureSaltsMessage *)message.body).requestMessageId == _currentMessageId) { diff --git a/submodules/MtProtoKit/Sources/MTTransport.m b/submodules/MtProtoKit/Sources/MTTransport.m index ab2511226f..6d23ff4897 100644 --- a/submodules/MtProtoKit/Sources/MTTransport.m +++ b/submodules/MtProtoKit/Sources/MTTransport.m @@ -58,12 +58,12 @@ { } -- (void)_processIncomingData:(NSData *)data scheme:(MTTransportScheme *)scheme transactionId:(id)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^)(id transactionId, bool success))decodeResult +- (void)_processIncomingData:(NSData *)data scheme:(MTTransportScheme *)scheme networkType:(int32_t)networkType transactionId:(id)transactionId requestTransactionAfterProcessing:(bool)requestTransactionAfterProcessing decodeResult:(void (^)(id transactionId, bool success))decodeResult { id delegate = _delegate; - if ([delegate respondsToSelector:@selector(transportHasIncomingData:scheme:data:transactionId:requestTransactionAfterProcessing:decodeResult:)]) + if ([delegate respondsToSelector:@selector(transportHasIncomingData:scheme:networkType:data:transactionId:requestTransactionAfterProcessing:decodeResult:)]) { - [delegate transportHasIncomingData:self scheme:scheme data:data transactionId:transactionId requestTransactionAfterProcessing:requestTransactionAfterProcessing decodeResult:decodeResult]; + [delegate transportHasIncomingData:self scheme:scheme networkType:networkType data:data transactionId:transactionId requestTransactionAfterProcessing:requestTransactionAfterProcessing decodeResult:decodeResult]; } } diff --git a/submodules/TelegramCore/Sources/Account/Account.swift b/submodules/TelegramCore/Sources/Account/Account.swift index 1b48a4be4c..2362832a49 100644 --- a/submodules/TelegramCore/Sources/Account/Account.swift +++ b/submodules/TelegramCore/Sources/Account/Account.swift @@ -970,6 +970,8 @@ public class Account { private var lastSmallLogPostTimestamp: Double? private let smallLogPostDisposable = MetaDisposable() + let networkStatsContext: NetworkStatsContext + public init(accountManager: AccountManager, id: AccountRecordId, basePath: String, testingEnvironment: Bool, postbox: Postbox, network: Network, networkArguments: NetworkInitializationArguments, peerId: PeerId, auxiliaryMethods: AccountAuxiliaryMethods, supplementary: Bool) { self.accountManager = accountManager self.id = id @@ -983,6 +985,8 @@ public class Account { self.auxiliaryMethods = auxiliaryMethods self.supplementary = supplementary + self.networkStatsContext = NetworkStatsContext(postbox: postbox) + self.peerInputActivityManager = PeerInputActivityManager() self.callSessionManager = CallSessionManager(postbox: postbox, network: network, maxLayer: networkArguments.voipMaxLayer, versions: networkArguments.voipVersions, addUpdates: { [weak self] updates in self?.stateManager?.addUpdates(updates) diff --git a/submodules/TelegramCore/Sources/Network/Download.swift b/submodules/TelegramCore/Sources/Network/Download.swift index 477be727de..ff00e74251 100644 --- a/submodules/TelegramCore/Sources/Network/Download.swift +++ b/submodules/TelegramCore/Sources/Network/Download.swift @@ -362,16 +362,16 @@ class Download: NSObject, MTRequestMessageServiceDelegate { return true } - request.completed = { (boxedResponse, timestamp, error) -> () in + request.completed = { (boxedResponse, info, error) -> () in if let error = error { - subscriber.putError((error, timestamp)) + subscriber.putError((error, info?.timestamp ?? 0.0)) } else { if let result = (boxedResponse as! BoxedMessage).body as? T { - subscriber.putNext((result, timestamp)) + subscriber.putNext((result, info?.timestamp ?? 0.0)) subscriber.putCompletion() } else { - subscriber.putError((MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"), timestamp)) + subscriber.putError((MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"), info?.timestamp ?? 0.0)) } } } @@ -386,7 +386,7 @@ class Download: NSObject, MTRequestMessageServiceDelegate { } } - func rawRequest(_ data: (FunctionDescription, Buffer, (Buffer) -> Any?), automaticFloodWait: Bool = true, failOnServerErrors: Bool = false, logPrefix: String = "") -> Signal<(Any, Double), (MTRpcError, Double)> { + func rawRequest(_ data: (FunctionDescription, Buffer, (Buffer) -> Any?), automaticFloodWait: Bool = true, failOnServerErrors: Bool = false, logPrefix: String = "") -> Signal<(Any, NetworkResponseInfo), (MTRpcError, Double)> { let requestService = self.requestService return Signal { subscriber in let request = MTRequest() @@ -414,11 +414,16 @@ class Download: NSObject, MTRequestMessageServiceDelegate { return true } - request.completed = { (boxedResponse, timestamp, error) -> () in + request.completed = { (boxedResponse, info, error) -> () in if let error = error { - subscriber.putError((error, timestamp)) + subscriber.putError((error, info?.timestamp ?? 0)) } else { - subscriber.putNext(((boxedResponse as! BoxedMessage).body, timestamp)) + let mappedInfo = NetworkResponseInfo( + timestamp: info?.timestamp ?? 0.0, + networkType: info?.networkType == 0 ? .wifi : .cellular, + networkDuration: info?.duration ?? 0.0 + ) + subscriber.putNext(((boxedResponse as! BoxedMessage).body, mappedInfo)) subscriber.putCompletion() } } diff --git a/submodules/TelegramCore/Sources/Network/MultipartFetch.swift b/submodules/TelegramCore/Sources/Network/MultipartFetch.swift index 568eb5fac4..3bd4c3dbe0 100644 --- a/submodules/TelegramCore/Sources/Network/MultipartFetch.swift +++ b/submodules/TelegramCore/Sources/Network/MultipartFetch.swift @@ -86,14 +86,17 @@ private struct DownloadWrapper { let network: Network let useMainConnection: Bool - func request(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: MediaResourceFetchTag?, continueInBackground: Bool) -> Signal { + func request(_ data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: MediaResourceFetchTag?, continueInBackground: Bool) -> Signal<(T, NetworkResponseInfo), MTRpcError> { let target: MultiplexedRequestTarget if self.isCdn { target = .cdn(Int(self.datacenterId)) } else { target = .main(Int(self.datacenterId)) } - return network.multiplexedRequestManager.request(to: target, consumerId: self.consumerId, data: data, tag: tag, continueInBackground: continueInBackground) + return network.multiplexedRequestManager.requestWithAdditionalInfo(to: target, consumerId: self.consumerId, data: data, tag: tag, continueInBackground: continueInBackground) + |> mapError { error, _ -> MTRpcError in + return error + } } } @@ -172,7 +175,7 @@ private final class MultipartCdnHashSource { self.clusterContexts[offset] = clusterContext disposable.set((self.masterDownload.request(Api.functions.upload.getCdnFileHashes(fileToken: Buffer(data: self.fileToken), offset: offset), tag: nil, continueInBackground: self.continueInBackground) - |> map { partHashes -> [Int64: Data] in + |> map { partHashes, _ -> [Int64: Data] in var parsedPartHashes: [Int64: Data] = [:] for part in partHashes { switch part { @@ -288,9 +291,20 @@ private final class MultipartCdnHashSource { private enum MultipartFetchSource { case none case master(location: MultipartFetchMasterLocation, download: DownloadWrapper) - case cdn(masterDatacenterId: Int32, fileToken: Data, key: Data, iv: Data, download: DownloadWrapper, masterDownload: DownloadWrapper, hashSource: MultipartCdnHashSource) + case cdn(masterDatacenterId: Int32, cdnDatacenterId: Int32, fileToken: Data, key: Data, iv: Data, download: DownloadWrapper, masterDownload: DownloadWrapper, hashSource: MultipartCdnHashSource) - func request(offset: Int64, limit: Int64, tag: MediaResourceFetchTag?, resource: TelegramMediaResource, resourceReference: FetchResourceReference, fileReference: Data?, continueInBackground: Bool) -> Signal { + var effectiveDatacenterId: Int32 { + switch self { + case .none: + return 0 + case let .master(location, _): + return location.datacenterId + case let .cdn(_, cdnDatacenterId, _, _, _, _, _, _): + return cdnDatacenterId + } + } + + func request(offset: Int64, limit: Int64, tag: MediaResourceFetchTag?, resource: TelegramMediaResource, resourceReference: FetchResourceReference, fileReference: Data?, continueInBackground: Bool) -> Signal<(Data, NetworkResponseInfo), MultipartFetchDownloadError> { var resourceReferenceValue: MediaResourceReference? switch resourceReference { case .forceRevalidate: @@ -323,14 +337,14 @@ private enum MultipartFetchSource { } return .generic } - |> mapToSignal { result -> Signal in + |> mapToSignal { result, info -> Signal<(Data, NetworkResponseInfo), MultipartFetchDownloadError> in switch result { case let .file(_, _, bytes): var resultData = bytes.makeData() if resultData.count > Int(limit) { resultData.count = Int(limit) } - return .single(resultData) + return .single((resultData, info)) case let .fileCdnRedirect(dcId, fileToken, encryptionKey, encryptionIv, partHashes): var parsedPartHashes: [Int64: Data] = [:] for part in partHashes { @@ -350,18 +364,18 @@ private enum MultipartFetchSource { |> mapError { error -> MultipartFetchDownloadError in return .fatal } - |> mapToSignal { result -> Signal in + |> mapToSignal { result, info -> Signal<(Data, NetworkResponseInfo), MultipartFetchDownloadError> in switch result { case let .webFile(_, _, _, _, bytes): var resultData = bytes.makeData() if resultData.count > Int(limit) { resultData.count = Int(limit) } - return .single(resultData) + return .single((resultData, info)) } } } - case let .cdn(masterDatacenterId, fileToken, key, iv, download, _, hashSource): + case let .cdn(masterDatacenterId, _, fileToken, key, iv, download, _, hashSource): var updatedLength = roundUp(Int64(limit), to: 4096) while updatedLength % 4096 != 0 || 1048576 % updatedLength != 0 { updatedLength += 1 @@ -371,13 +385,13 @@ private enum MultipartFetchSource { |> mapError { _ -> MultipartFetchDownloadError in return .generic } - |> mapToSignal { result -> Signal in + |> mapToSignal { result, info -> Signal<(Data, NetworkResponseInfo), MultipartFetchDownloadError> in switch result { case let .cdnFileReuploadNeeded(token): return .fail(.reuploadToCdn(masterDatacenterId: masterDatacenterId, token: token.makeData())) case let .cdnFile(bytes): if bytes.size == 0 { - return .single(bytes.makeData()) + return .single((bytes.makeData(), info)) } else { var partIv = iv let partIvCount = partIv.count @@ -386,13 +400,14 @@ private enum MultipartFetchSource { var ivOffset: Int32 = Int32(clamping: (offset / 16)).bigEndian memcpy(bytes.advanced(by: partIvCount - 4), &ivOffset, 4) } - return .single(MTAesCtrDecrypt(bytes.makeData(), key, partIv)!) + return .single((MTAesCtrDecrypt(bytes.makeData(), key, partIv)!, info)) } } } return combineLatest(part, hashSource.get(offset: offset, limit: limit)) - |> mapToSignal { partData, hashData -> Signal in + |> mapToSignal { partDataAndInfo, hashData -> Signal<(Data, NetworkResponseInfo), MultipartFetchDownloadError> in + let (partData, info) = partDataAndInfo var localOffset: Int64 = 0 while localOffset < partData.count { let dataToHash = partData.subdata(in: Int(localOffset) ..< min(partData.count, Int(localOffset + Int64(dataHashLength)))) @@ -407,7 +422,7 @@ private enum MultipartFetchSource { localOffset += Int64(dataHashLength) } - return .single(partData) + return .single((partData, info)) } } } @@ -425,6 +440,21 @@ private final class MultipartFetchManager { var byteCount: Int } + private final class FetchingPart { + let size: Int64 + let disposable: Disposable + let startTime: Double + + init( + size: Int64, + disposable: Disposable + ) { + self.size = size + self.disposable = disposable + self.startTime = CFAbsoluteTimeGetCurrent() + } + } + let parallelParts: Int let defaultPartSize: Int64 var partAlignment: Int64 = 4 * 1024 @@ -445,6 +475,7 @@ private final class MultipartFetchManager { let postbox: Postbox let network: Network + let networkStatsContext: NetworkStatsContext? let revalidationContext: MediaReferenceRevalidationContext? let continueInBackground: Bool let partReady: (Int64, Data) -> Void @@ -454,7 +485,7 @@ private final class MultipartFetchManager { private let useMainConnection: Bool private var source: MultipartFetchSource - var fetchingParts: [Int64: (Int64, Disposable)] = [:] + private var fetchingParts: [Int64: FetchingPart] = [:] var nextFetchingPartId = 0 var fetchedParts: [Int64: (Int64, Data)] = [:] var cachedPartHashes: [Int64: Data] = [:] @@ -474,7 +505,7 @@ private final class MultipartFetchManager { private var fetchSpeedRecords: [FetchSpeedRecord] = [] private var totalFetchedByteCount: Int = 0 - init(resource: TelegramMediaResource, parameters: MediaResourceFetchParameters?, size: Int64?, intervals: Signal<[(Range, MediaBoxFetchPriority)], NoError>, encryptionKey: SecretFileEncryptionKey?, decryptedSize: Int64?, location: MultipartFetchMasterLocation, postbox: Postbox, network: Network, revalidationContext: MediaReferenceRevalidationContext?, partReady: @escaping (Int64, Data) -> Void, reportCompleteSize: @escaping (Int64) -> Void, finishWithError: @escaping (MediaResourceDataFetchError) -> Void, useMainConnection: Bool) { + init(resource: TelegramMediaResource, parameters: MediaResourceFetchParameters?, size: Int64?, intervals: Signal<[(Range, MediaBoxFetchPriority)], NoError>, encryptionKey: SecretFileEncryptionKey?, decryptedSize: Int64?, location: MultipartFetchMasterLocation, postbox: Postbox, network: Network, networkStatsContext: NetworkStatsContext?, revalidationContext: MediaReferenceRevalidationContext?, partReady: @escaping (Int64, Data) -> Void, reportCompleteSize: @escaping (Int64) -> Void, finishWithError: @escaping (MediaResourceDataFetchError) -> Void, useMainConnection: Bool) { self.resource = resource self.parameters = parameters self.consumerId = Int64.random(in: Int64.min ... Int64.max) @@ -526,6 +557,7 @@ private final class MultipartFetchManager { self.state = MultipartDownloadState(encryptionKey: encryptionKey, decryptedSize: decryptedSize) self.postbox = postbox self.network = network + self.networkStatsContext = networkStatsContext self.revalidationContext = revalidationContext self.source = .master(location: location, download: DownloadWrapper(consumerId: self.consumerId, datacenterId: location.datacenterId, isCdn: false, network: network, useMainConnection: self.useMainConnection)) self.partReady = partReady @@ -613,8 +645,8 @@ private final class MultipartFetchManager { func cancel() { self.queue.async { self.source = .none - for (_, (_, disposable)) in self.fetchingParts { - disposable.dispose() + for (_, fetchingPart) in self.fetchingParts { + fetchingPart.disposable.dispose() } self.reuploadToCdnDisposable.dispose() self.revalidateMediaReferenceDisposable.dispose() @@ -680,8 +712,8 @@ private final class MultipartFetchManager { } } - for (offset, (size, _)) in self.fetchingParts { - removeFromFetchIntervals.formUnion(RangeSet(offset ..< (offset + size))) + for (offset, fetchingPart) in self.fetchingParts { + removeFromFetchIntervals.formUnion(RangeSet(offset ..< (offset + fetchingPart.size))) } if let completeSize = self.completeSize { @@ -758,16 +790,27 @@ private final class MultipartFetchManager { insertIndex += 1 } + let partSize: Int32 = Int32(downloadRange.upperBound - downloadRange.lowerBound) let part = self.source.request(offset: downloadRange.lowerBound, limit: downloadRange.upperBound - downloadRange.lowerBound, tag: self.parameters?.tag, resource: self.resource, resourceReference: self.resourceReference, fileReference: self.fileReference, continueInBackground: self.continueInBackground) - //|> delay(5.0, queue: self.queue) |> deliverOn(self.queue) let partDisposable = MetaDisposable() - self.fetchingParts[downloadRange.lowerBound] = (Int64(downloadRange.count), partDisposable) - - partDisposable.set(part.start(next: { [weak self] data in + self.fetchingParts[downloadRange.lowerBound] = FetchingPart(size: Int64(downloadRange.count), disposable: partDisposable) + let partStartTimestamp = CFAbsoluteTimeGetCurrent() + let effectiveDatacenterId = self.source.effectiveDatacenterId + partDisposable.set(part.start(next: { [weak self] data, info in guard let strongSelf = self else { return } + + strongSelf.networkStatsContext?.add(downloadEvents: [ + NetworkStatsContext.DownloadEvent( + networkType: info.networkType, + datacenterId: effectiveDatacenterId, + size: Double(partSize), + networkDuration: info.networkDuration, + issueDuration: CFAbsoluteTimeGetCurrent() - partStartTimestamp + ) + ]) if data.count < downloadRange.count { strongSelf.completeSize = downloadRange.lowerBound + Int64(data.count) } @@ -788,7 +831,7 @@ private final class MultipartFetchManager { if !strongSelf.revalidatingMediaReference && !strongSelf.revalidatedMediaReference { strongSelf.revalidatingMediaReference = true for (_, part) in strongSelf.fetchingParts { - part.1.dispose() + part.disposable.dispose() } strongSelf.fetchingParts.removeAll() @@ -819,7 +862,7 @@ private final class MultipartFetchManager { switch strongSelf.source { case let .master(location, download): strongSelf.partAlignment = dataHashLength - strongSelf.source = .cdn(masterDatacenterId: location.datacenterId, fileToken: token, key: key, iv: iv, download: DownloadWrapper(consumerId: strongSelf.consumerId, datacenterId: id, isCdn: true, network: strongSelf.network, useMainConnection: strongSelf.useMainConnection), masterDownload: download, hashSource: MultipartCdnHashSource(queue: strongSelf.queue, fileToken: token, hashes: partHashes, masterDownload: download, continueInBackground: strongSelf.continueInBackground)) + strongSelf.source = .cdn(masterDatacenterId: location.datacenterId, cdnDatacenterId: id, fileToken: token, key: key, iv: iv, download: DownloadWrapper(consumerId: strongSelf.consumerId, datacenterId: id, isCdn: true, network: strongSelf.network, useMainConnection: strongSelf.useMainConnection), masterDownload: download, hashSource: MultipartCdnHashSource(queue: strongSelf.queue, fileToken: token, hashes: partHashes, masterDownload: download, continueInBackground: strongSelf.continueInBackground)) strongSelf.checkState() case .cdn, .none: break @@ -828,10 +871,13 @@ private final class MultipartFetchManager { switch strongSelf.source { case .master, .none: break - case let .cdn(_, fileToken, _, _, _, masterDownload, _): + case let .cdn(_, _, fileToken, _, _, _, masterDownload, _): if !strongSelf.reuploadingToCdn { strongSelf.reuploadingToCdn = true let reupload: Signal<[Api.FileHash], NoError> = masterDownload.request(Api.functions.upload.reuploadCdnFile(fileToken: Buffer(data: fileToken), requestToken: Buffer(data: token)), tag: nil, continueInBackground: strongSelf.continueInBackground) + |> map { result, _ -> [Api.FileHash] in + return result + } |> `catch` { _ -> Signal<[Api.FileHash], NoError> in return .single([]) } @@ -856,6 +902,7 @@ public func standaloneMultipartFetch(postbox: Postbox, network: Network, resourc postbox: postbox, network: network, mediaReferenceRevalidationContext: nil, + networkStatsContext: nil, resource: resource, datacenterId: datacenterId, size: size, @@ -877,6 +924,7 @@ private func multipartFetchV1( postbox: Postbox, network: Network, mediaReferenceRevalidationContext: MediaReferenceRevalidationContext?, + networkStatsContext: NetworkStatsContext?, resource: TelegramMediaResource, datacenterId: Int, size: Int64?, @@ -944,7 +992,7 @@ private func multipartFetchV1( subscriber.putNext(.reset) } - let manager = MultipartFetchManager(resource: resource, parameters: parameters, size: size, intervals: intervals, encryptionKey: encryptionKey, decryptedSize: decryptedSize, location: location, postbox: postbox, network: network, revalidationContext: mediaReferenceRevalidationContext, partReady: { dataOffset, data in + let manager = MultipartFetchManager(resource: resource, parameters: parameters, size: size, intervals: intervals, encryptionKey: encryptionKey, decryptedSize: decryptedSize, location: location, postbox: postbox, network: network, networkStatsContext: networkStatsContext, revalidationContext: mediaReferenceRevalidationContext, partReady: { dataOffset, data in subscriber.putNext(.dataPart(resourceOffset: dataOffset, data: data, range: 0 ..< Int64(data.count), complete: false)) }, reportCompleteSize: { size in subscriber.putNext(.resourceSizeUpdated(size)) @@ -968,6 +1016,7 @@ func multipartFetch( postbox: Postbox, network: Network, mediaReferenceRevalidationContext: MediaReferenceRevalidationContext?, + networkStatsContext: NetworkStatsContext?, resource: TelegramMediaResource, datacenterId: Int, size: Int64?, @@ -998,6 +1047,7 @@ func multipartFetch( postbox: postbox, network: network, mediaReferenceRevalidationContext: mediaReferenceRevalidationContext, + networkStatsContext: networkStatsContext, resource: resource, datacenterId: datacenterId, size: size, diff --git a/submodules/TelegramCore/Sources/Network/MultiplexedRequestManager.swift b/submodules/TelegramCore/Sources/Network/MultiplexedRequestManager.swift index 16c1c22e64..73b7d0c2c8 100644 --- a/submodules/TelegramCore/Sources/Network/MultiplexedRequestManager.swift +++ b/submodules/TelegramCore/Sources/Network/MultiplexedRequestManager.swift @@ -33,10 +33,10 @@ private final class RequestData { let continueInBackground: Bool let automaticFloodWait: Bool let deserializeResponse: (Buffer) -> Any? - let completed: (Any, Double) -> Void + let completed: (Any, NetworkResponseInfo) -> Void let error: (MTRpcError, Double) -> Void - init(id: Int32, consumerId: Int64, target: MultiplexedRequestTarget, functionDescription: FunctionDescription, payload: Buffer, tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool, deserializeResponse: @escaping (Buffer) -> Any?, completed: @escaping (Any, Double) -> Void, error: @escaping (MTRpcError, Double) -> Void) { + init(id: Int32, consumerId: Int64, target: MultiplexedRequestTarget, functionDescription: FunctionDescription, payload: Buffer, tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool, deserializeResponse: @escaping (Buffer) -> Any?, completed: @escaping (Any, NetworkResponseInfo) -> Void, error: @escaping (MTRpcError, Double) -> Void) { self.id = id self.consumerId = consumerId self.target = target @@ -80,6 +80,11 @@ private struct MultiplexedRequestTargetTimerKey: Equatable, Hashable { private typealias SignalKitTimer = SwiftSignalKit.Timer +struct NetworkResponseInfo { + var timestamp: Double + var networkType: NetworkStatsContext.NetworkType + var networkDuration: Double +} private final class MultiplexedRequestManagerContext { private let queue: Queue @@ -109,15 +114,15 @@ private final class MultiplexedRequestManagerContext { } } - func request(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, (Buffer) -> Any?), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool, completed: @escaping (Any, Double) -> Void, error: @escaping (MTRpcError, Double) -> Void) -> Disposable { + func request(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, (Buffer) -> Any?), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool, completed: @escaping (Any, NetworkResponseInfo) -> Void, error: @escaping (MTRpcError, Double) -> Void) -> Disposable { let targetKey = MultiplexedRequestTargetKey(target: target, continueInBackground: continueInBackground) let requestId = self.nextId self.nextId += 1 self.queuedRequests.append(RequestData(id: requestId, consumerId: consumerId, target: target, functionDescription: data.0, payload: data.1, tag: tag, continueInBackground: continueInBackground, automaticFloodWait: automaticFloodWait, deserializeResponse: { buffer in return data.2(buffer) - }, completed: { result, timestamp in - completed(result, timestamp) + }, completed: { result, info in + completed(result, info) }, error: { e, timestamp in error(e, timestamp) })) @@ -189,7 +194,7 @@ private final class MultiplexedRequestManagerContext { let requestId = request.id selectedContext.requests.append(ExecutingRequestData(requestId: requestId, disposable: disposable)) let queue = self.queue - disposable.set(selectedContext.worker.rawRequest((request.functionDescription, request.payload, request.deserializeResponse), automaticFloodWait: request.automaticFloodWait).start(next: { [weak self, weak selectedContext] result, timestamp in + disposable.set(selectedContext.worker.rawRequest((request.functionDescription, request.payload, request.deserializeResponse), automaticFloodWait: request.automaticFloodWait).start(next: { [weak self, weak selectedContext] result, info in queue.async { guard let strongSelf = self else { return @@ -202,7 +207,7 @@ private final class MultiplexedRequestManagerContext { } } } - request.completed(result, timestamp) + request.completed(result, info) strongSelf.updateState() } }, error: { [weak self, weak selectedContext] error, timestamp in @@ -299,18 +304,18 @@ final class MultiplexedRequestManager { } } - func requestWithAdditionalInfo(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool = true) -> Signal<(T, Double), (MTRpcError, Double)> { + func requestWithAdditionalInfo(to target: MultiplexedRequestTarget, consumerId: Int64, data: (FunctionDescription, Buffer, DeserializeFunctionResponse), tag: MediaResourceFetchTag?, continueInBackground: Bool, automaticFloodWait: Bool = true) -> Signal<(T, NetworkResponseInfo), (MTRpcError, Double)> { return Signal { subscriber in let disposable = MetaDisposable() self.context.with { context in disposable.set(context.request(to: target, consumerId: consumerId, data: (data.0, data.1, { buffer in return data.2.parse(buffer) - }), tag: tag, continueInBackground: continueInBackground, automaticFloodWait: automaticFloodWait, completed: { result, timestamp in + }), tag: tag, continueInBackground: continueInBackground, automaticFloodWait: automaticFloodWait, completed: { result, info in if let result = result as? T { - subscriber.putNext((result, timestamp)) + subscriber.putNext((result, info)) subscriber.putCompletion() } else { - subscriber.putError((MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"), timestamp)) + subscriber.putError((MTRpcError(errorCode: 500, errorDescription: "TL_VERIFICATION_ERROR"), info.timestamp)) } }, error: { error, timestamp in subscriber.putError((error, timestamp)) diff --git a/submodules/TelegramCore/Sources/Network/NetworkFrameworkTcpConnectionInterface.swift b/submodules/TelegramCore/Sources/Network/NetworkFrameworkTcpConnectionInterface.swift index 7cd4f87958..dd1def5974 100644 --- a/submodules/TelegramCore/Sources/Network/NetworkFrameworkTcpConnectionInterface.swift +++ b/submodules/TelegramCore/Sources/Network/NetworkFrameworkTcpConnectionInterface.swift @@ -222,9 +222,10 @@ final class NetworkFrameworkTcpConnectionInterface: NSObject, MTTcpConnectionInt self.currentReadRequest = nil weak var delegate = self.delegate + let currentInterfaceIsWifi = self.currentInterfaceIsWifi self.delegateQueue.async { if let delegate = delegate { - delegate.connectionInterfaceDidRead(currentReadRequest.data, withTag: currentReadRequest.request.tag) + delegate.connectionInterfaceDidRead(currentReadRequest.data, withTag: currentReadRequest.request.tag, networkType: currentInterfaceIsWifi ? 0 : 1) } } diff --git a/submodules/TelegramCore/Sources/Network/NetworkStatsContext.swift b/submodules/TelegramCore/Sources/Network/NetworkStatsContext.swift new file mode 100644 index 0000000000..21b656fed5 --- /dev/null +++ b/submodules/TelegramCore/Sources/Network/NetworkStatsContext.swift @@ -0,0 +1,122 @@ +import Foundation +import SwiftSignalKit +import Postbox + +final class NetworkStatsContext { + enum NetworkType: Int32 { + case wifi = 0 + case cellular = 1 + } + + struct DownloadEvent { + let networkType: NetworkType + let datacenterId: Int32 + let size: Double + let networkDuration: Double + let issueDuration: Double + + init( + networkType: NetworkType, + datacenterId: Int32, + size: Double, + networkDuration: Double, + issueDuration: Double + ) { + self.networkType = networkType + self.datacenterId = datacenterId + self.size = size + self.networkDuration = networkDuration + self.issueDuration = issueDuration + } + } + + private struct TargetKey: Hashable { + let networkType: NetworkType + let datacenterId: Int32 + + init(networkType: NetworkType, datacenterId: Int32) { + self.networkType = networkType + self.datacenterId = datacenterId + } + } + + private final class AverageStats { + var networkBps: Double = 0.0 + var issueDuration: Double = 0.0 + var networkDelay: Double = 0.0 + var count: Int = 0 + var size: Int64 = 0 + } + + private final class Impl { + let queue: Queue + let postbox: Postbox + + var averageTargetStats: [TargetKey: AverageStats] = [:] + + init(queue: Queue, postbox: Postbox) { + self.queue = queue + self.postbox = postbox + } + + func add(downloadEvents: [DownloadEvent]) { + for event in downloadEvents { + if event.networkDuration == 0.0 { + continue + } + let targetKey = TargetKey(networkType: event.networkType, datacenterId: event.datacenterId) + let averageStats: AverageStats + if let current = self.averageTargetStats[targetKey] { + averageStats = current + } else { + averageStats = AverageStats() + self.averageTargetStats[targetKey] = averageStats + } + averageStats.count += 1 + averageStats.issueDuration += event.issueDuration + averageStats.networkDelay += event.issueDuration - event.networkDuration + averageStats.networkBps += event.size / event.networkDuration + averageStats.size += Int64(event.size) + } + + self.maybeFlushStats() + } + + private func maybeFlushStats() { + var removeKeys: [TargetKey] = [] + for (targetKey, averageStats) in self.averageTargetStats { + if averageStats.count >= 1000 || averageStats.size >= 4 * 1024 * 1024 { + addAppLogEvent(postbox: self.postbox, type: "download", data: .dictionary([ + "n": .number(Double(targetKey.networkType.rawValue)), + "d": .number(Double(targetKey.datacenterId)), + "b": .number(averageStats.networkBps / Double(averageStats.count)), + "nd": .number(averageStats.networkDelay / Double(averageStats.count)) + ])) + removeKeys.append(targetKey) + } + } + for key in removeKeys { + self.averageTargetStats.removeValue(forKey: key) + } + } + } + + private static let sharedQueue = Queue(name: "NetworkStatsContext") + + private let queue: Queue + private let impl: QueueLocalObject + + init(postbox: Postbox) { + let queue = NetworkStatsContext.sharedQueue + self.queue = queue + self.impl = QueueLocalObject(queue: queue, generate: { + return Impl(queue: queue, postbox: postbox) + }) + } + + func add(downloadEvents: [DownloadEvent]) { + self.impl.with { impl in + impl.add(downloadEvents: downloadEvents) + } + } +} diff --git a/submodules/TelegramCore/Sources/State/Fetch.swift b/submodules/TelegramCore/Sources/State/Fetch.swift index 5e65c43197..831354282a 100644 --- a/submodules/TelegramCore/Sources/State/Fetch.swift +++ b/submodules/TelegramCore/Sources/State/Fetch.swift @@ -23,7 +23,7 @@ private final class MediaResourceDataCopyFile : MediaResourceDataFetchCopyLocalI } public func fetchCloudMediaLocation(account: Account, resource: TelegramMediaResource, datacenterId: Int, size: Int64?, intervals: Signal<[(Range, MediaBoxFetchPriority)], NoError>, parameters: MediaResourceFetchParameters?) -> Signal { - return multipartFetch(postbox: account.postbox, network: account.network, mediaReferenceRevalidationContext: account.mediaReferenceRevalidationContext, resource: resource, datacenterId: datacenterId, size: size, intervals: intervals, parameters: parameters) + return multipartFetch(postbox: account.postbox, network: account.network, mediaReferenceRevalidationContext: account.mediaReferenceRevalidationContext, networkStatsContext: account.networkStatsContext, resource: resource, datacenterId: datacenterId, size: size, intervals: intervals, parameters: parameters) } private func fetchLocalFileResource(path: String, move: Bool) -> Signal { diff --git a/submodules/TelegramCore/Sources/State/FetchSecretFileResource.swift b/submodules/TelegramCore/Sources/State/FetchSecretFileResource.swift index c352f45184..13491cbbf3 100644 --- a/submodules/TelegramCore/Sources/State/FetchSecretFileResource.swift +++ b/submodules/TelegramCore/Sources/State/FetchSecretFileResource.swift @@ -5,5 +5,5 @@ import MtProtoKit func fetchSecretFileResource(account: Account, resource: SecretFileMediaResource, intervals: Signal<[(Range, MediaBoxFetchPriority)], NoError>, parameters: MediaResourceFetchParameters?) -> Signal { - return multipartFetch(postbox: account.postbox, network: account.network, mediaReferenceRevalidationContext: account.mediaReferenceRevalidationContext, resource: resource, datacenterId: resource.datacenterId, size: resource.size, intervals: intervals, parameters: parameters, encryptionKey: resource.key, decryptedSize: resource.decryptedSize) + return multipartFetch(postbox: account.postbox, network: account.network, mediaReferenceRevalidationContext: account.mediaReferenceRevalidationContext, networkStatsContext: account.networkStatsContext, resource: resource, datacenterId: resource.datacenterId, size: resource.size, intervals: intervals, parameters: parameters, encryptionKey: resource.key, decryptedSize: resource.decryptedSize) } diff --git a/submodules/TelegramCore/Sources/TelegramEngine/Resources/TelegramEngineResources.swift b/submodules/TelegramCore/Sources/TelegramEngine/Resources/TelegramEngineResources.swift index e71bff8e58..7d286c3289 100644 --- a/submodules/TelegramCore/Sources/TelegramEngine/Resources/TelegramEngineResources.swift +++ b/submodules/TelegramCore/Sources/TelegramEngine/Resources/TelegramEngineResources.swift @@ -339,7 +339,7 @@ public extension TelegramEngine { |> mapToSignal { datacenterId -> Signal in let resource = AlbumCoverResource(datacenterId: Int(datacenterId), file: file, title: title, performer: performer, isThumbnail: isThumbnail) - return multipartFetch(postbox: self.account.postbox, network: self.account.network, mediaReferenceRevalidationContext: self.account.mediaReferenceRevalidationContext, resource: resource, datacenterId: Int(datacenterId), size: nil, intervals: .single([(0 ..< Int64.max, .default)]), parameters: MediaResourceFetchParameters( + return multipartFetch(postbox: self.account.postbox, network: self.account.network, mediaReferenceRevalidationContext: self.account.mediaReferenceRevalidationContext, networkStatsContext: self.account.networkStatsContext, resource: resource, datacenterId: Int(datacenterId), size: nil, intervals: .single([(0 ..< Int64.max, .default)]), parameters: MediaResourceFetchParameters( tag: nil, info: TelegramCloudMediaResourceFetchInfo( reference: MediaResourceReference.standalone(resource: resource),