diff --git a/Buffers.h b/Buffers.h index 94fe512e7c..23f4afab73 100644 --- a/Buffers.h +++ b/Buffers.h @@ -50,6 +50,7 @@ namespace tgvoip{ public: BufferOutputStream(size_t size); BufferOutputStream(unsigned char* buffer, size_t size); + BufferOutputStream(const BufferOutputStream& other)=delete; ~BufferOutputStream(); void WriteByte(unsigned char byte); void WriteInt64(int64_t i); @@ -61,10 +62,23 @@ namespace tgvoip{ size_t GetLength(); void Reset(); void Rewind(size_t numBytes); + + BufferOutputStream& operator=(BufferOutputStream&& other){ + if(this!=&other){ + if(!bufferProvided && buffer) + free(buffer); + buffer=other.buffer; + offset=other.offset; + size=other.size; + bufferProvided=other.bufferProvided; + other.buffer=NULL; + } + return *this; + } private: void ExpandBufferIfNeeded(size_t need); - unsigned char* buffer; + unsigned char* buffer=NULL; size_t size; size_t offset; bool bufferProvided; @@ -114,6 +128,7 @@ namespace tgvoip{ ~Buffer(){ if(data) free(data); + data=NULL; }; Buffer& operator=(Buffer&& other){ if(this!=&other){ diff --git a/VoIPController.cpp b/VoIPController.cpp index fbb191556d..4032411dc2 100644 --- a/VoIPController.cpp +++ b/VoIPController.cpp @@ -25,6 +25,7 @@ #include #include #include +#include @@ -197,6 +198,7 @@ VoIPController::VoIPController() : activeNetItfName(""), shittyInternetMode=false; didAddIPv6Relays=false; didSendIPv6Endpoint=false; + unsentStreamPackets.store(0); sendThread=NULL; recvThread=NULL; @@ -404,8 +406,8 @@ void VoIPController::AudioInputCallback(unsigned char* data, size_t length, unsi void VoIPController::HandleAudioInput(unsigned char *data, size_t len, unsigned char* secondaryData, size_t secondaryLen){ if(stopping) return; - if(waitingForAcks || dontSendPackets>0){ - LOGV("waiting for RLC, dropping outgoing audio packet"); + if(waitingForAcks || dontSendPackets>0 || (unsigned int)unsentStreamPackets>=2){ + LOGV("waiting for queue, dropping outgoing audio packet"); return; } //LOGV("Audio packet size %u", (unsigned int)len); @@ -421,6 +423,7 @@ void VoIPController::HandleAudioInput(unsigned char *data, size_t len, unsigned pkt.WriteInt32(audioTimestampOut); pkt.WriteBytes(data, len); + unsentStreamPackets++; PendingOutgoingPacket p{ /*.seq=*/GenerateOutSeq(), /*.type=*/PKT_STREAM_DATA, @@ -895,6 +898,9 @@ void VoIPController::RunSendThread(void* arg){ BufferOutputStream p(buf, sizeof(buf)); WritePacketHeader(pkt.seq, &p, pkt.type, (uint32_t)pkt.len); p.WriteBytes(pkt.data); + if(pkt.type==PKT_STREAM_DATA){ + unsentStreamPackets--; + } SendPacket(p.GetBuffer(), p.GetLength(), endpoint, pkt); } //}else{ @@ -1098,6 +1104,10 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, shared_ptr((int64_t)(FOURCC('L','A','N','4')) << 32, peerPort, v4addr, v6addr, Endpoint::TYPE_UDP_P2P_LAN, peerTag)); } }else if(type==EXTRA_TYPE_NETWORK_CHANGED){ - if(currentEndpoint->type!=Endpoint::TYPE_UDP_RELAY && currentEndpoint->type!=Endpoint::TYPE_TCP_RELAY){ + LOGI("Peer network changed"); + if(currentEndpoint->type!=Endpoint::TYPE_UDP_RELAY && currentEndpoint->type!=Endpoint::TYPE_TCP_RELAY) currentEndpoint=preferredRelay; - if(allowP2p) - SendPublicEndpointsRequest(); - //if(peerVersion>=2){ - uint32_t flags=(uint32_t) in.ReadInt32(); - dataSavingRequestedByPeer=(flags & INIT_FLAG_DATA_SAVING_ENABLED)==INIT_FLAG_DATA_SAVING_ENABLED; - UpdateDataSavingState(); - UpdateAudioBitrateLimit(); - //} - } + if(allowP2p) + SendPublicEndpointsRequest(); + uint32_t flags=(uint32_t) in.ReadInt32(); + dataSavingRequestedByPeer=(flags & INIT_FLAG_DATA_SAVING_ENABLED)==INIT_FLAG_DATA_SAVING_ENABLED; + UpdateDataSavingState(); + UpdateAudioBitrateLimit(); + ResetEndpointPingStats(); }else if(type==EXTRA_TYPE_GROUP_CALL_KEY){ if(!didReceiveGroupCallKey && !didSendGroupCallKey){ unsigned char groupKey[256]; @@ -1929,6 +1939,7 @@ void VoIPController::SetNetworkType(int type){ AddIPv6Relays(); ResetUdpAvailability(); + ResetEndpointPingStats(); } LOGI("set network type: %d, active interface %s", type, activeNetItfName.c_str()); } @@ -2214,6 +2225,7 @@ void VoIPController::KDF2(unsigned char* msgKey, size_t x, unsigned char *aesKey string VoIPController::GetDebugString(){ string r="Remote endpoints: \n"; char buffer[2048]; + MutexGuard m(endpointsMutex); for(shared_ptr& endpoint:endpoints){ const char* type; switch(endpoint->type){ @@ -2233,7 +2245,7 @@ string VoIPController::GetDebugString(){ type="UNKNOWN"; break; } - snprintf(buffer, sizeof(buffer), "%s:%u %dms %d 0x%lx [%s%s]\n", endpoint->address.IsEmpty() ? ("["+endpoint->v6address.ToString()+"]").c_str() : endpoint->address.ToString().c_str(), endpoint->port, (int)(endpoint->averageRTT*1000), endpoint->udpPongCount, (uint64_t)endpoint->id, type, currentEndpoint==endpoint ? ", IN_USE" : ""); + snprintf(buffer, sizeof(buffer), "%s:%u %dms %d 0x%" PRIx64 " [%s%s]\n", endpoint->address.IsEmpty() ? ("["+endpoint->v6address.ToString()+"]").c_str() : endpoint->address.ToString().c_str(), endpoint->port, (int)(endpoint->averageRTT*1000), endpoint->udpPongCount, (uint64_t)endpoint->id, type, currentEndpoint==endpoint ? ", IN_USE" : ""); r+=buffer; } if(shittyInternetMode){ @@ -2254,6 +2266,7 @@ string VoIPController::GetDebugString(){ "Last recvd seq: %u\n" "Send/recv losses: %u/%u (%d%%)\n" "Audio bitrate: %d kbit\n" + "Outgoing queue: %u\n" // "Packet grouping: %d\n" "Frame size out/in: %d/%d\n" "Bytes sent/recvd: %llu/%llu", @@ -2266,6 +2279,7 @@ string VoIPController::GetDebugString(){ lastSentSeq, lastRemoteAckSeq, lastRemoteSeq, conctl->GetSendLossCount(), recvLossCount, encoder ? encoder->GetPacketLoss() : 0, encoder ? (encoder->GetBitrate()/1000) : 0, + static_cast(unsentStreamPackets), // audioPacketGrouping, outgoingStreams[0]->frameDuration, incomingStreams.size()>0 ? incomingStreams[0]->frameDuration : 0, (long long unsigned int)(stats.bytesSentMobile+stats.bytesSentWifi), @@ -2751,6 +2765,14 @@ void VoIPController::ResetUdpAvailability(){ udpPingTimeoutID=messageThread.Post(std::bind(&VoIPController::SendUdpPings, this), 0.0, 0.5); } +void VoIPController::ResetEndpointPingStats(){ + MutexGuard m(endpointsMutex); + for(shared_ptr& e:endpoints){ + e->averageRTT=0.0; + e->rtts.Reset(); + } +} + #pragma mark - Timer methods void VoIPController::SendUdpPings(){ @@ -2815,6 +2837,8 @@ void VoIPController::SendRelayPings(){ if((state==STATE_ESTABLISHED || state==STATE_RECONNECTING) && endpoints.size()>1){ shared_ptr minPingRelay=preferredRelay; double minPing=preferredRelay->averageRTT*(preferredRelay->type==Endpoint::TYPE_TCP_RELAY ? 2 : 1); + if(minPing==0.0) // force the switch to an available relay, if any + minPing=DBL_MAX; for(shared_ptr& endpoint:endpoints){ if(endpoint->type==Endpoint::TYPE_TCP_RELAY && !useTCP) continue; diff --git a/VoIPController.h b/VoIPController.h index 031c30d956..954424b1f8 100644 --- a/VoIPController.h +++ b/VoIPController.h @@ -20,6 +20,7 @@ #include #include #include +#include #include "audio/AudioInput.h" #include "BlockingQueue.h" #include "audio/AudioOutput.h" @@ -34,7 +35,7 @@ #include "PacketReassembler.h" #include "MessageThread.h" -#define LIBTGVOIP_VERSION "2.2.2" +#define LIBTGVOIP_VERSION "2.2.3" #ifdef _WIN32 #undef GetCurrentTime @@ -412,6 +413,7 @@ namespace tgvoip{ virtual void SendExtra(Buffer& data, unsigned char type); void SendStreamFlags(Stream& stream); void InitializeTimers(); + void ResetEndpointPingStats(); private: struct Stream{ @@ -592,6 +594,7 @@ namespace tgvoip{ MessageThread messageThread; bool wasEstablished=false; bool receivedFirstStreamPacket=false; + std::atomic unsentStreamPackets; uint32_t initTimeoutID=MessageThread::INVALID_ID; uint32_t noStreamsNopID=MessageThread::INVALID_ID;