123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- #include "stdafx.h"
- #include "net_acks.h"
- #include <util/datetime/cputimer.h>
- #include <atomic>
- namespace NNetliba {
- const float RTT_AVERAGE_OVER = 15;
- float TCongestionControl::StartWindowSize = 3;
- float TCongestionControl::MaxPacketRate = 0; // unlimited
- bool UseTOSforAcks = false; //true;//
- void EnableUseTOSforAcks(bool enable) {
- UseTOSforAcks = enable;
- }
- float CONG_CTRL_CHANNEL_INFLATE = 1;
- void SetCongCtrlChannelInflate(float inflate) {
- CONG_CTRL_CHANNEL_INFLATE = inflate;
- }
- //////////////////////////////////////////////////////////////////////////
- TPingTracker::TPingTracker()
- : AvrgRTT(CONG_CTRL_INITIAL_RTT)
- , AvrgRTT2(CONG_CTRL_INITIAL_RTT * CONG_CTRL_INITIAL_RTT)
- , RTTCount(0)
- {
- }
- void TPingTracker::RegisterRTT(float rtt) {
- Y_ASSERT(rtt > 0);
- float keep = RTTCount / (RTTCount + 1);
- AvrgRTT *= keep;
- AvrgRTT += (1 - keep) * rtt;
- AvrgRTT2 *= keep;
- AvrgRTT2 += (1 - keep) * Sqr(rtt);
- RTTCount = Min(RTTCount + 1, RTT_AVERAGE_OVER);
- //static int n;
- //if ((++n % 1024) == 0)
- // printf("Average RTT = %g (sko = %g)\n", GetRTT() * 1000, GetRTTSKO() * 1000);
- }
- void TPingTracker::IncreaseRTT() {
- const float F_RTT_DECAY_RATE = 1.1f;
- AvrgRTT *= F_RTT_DECAY_RATE;
- AvrgRTT2 *= Sqr(F_RTT_DECAY_RATE);
- }
- //////////////////////////////////////////////////////////////////////////
- void TAckTracker::Resend() {
- CurrentPacket = 0;
- for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i)
- Congestion->Failure(); // not actually correct but simplifies logic a lot
- PacketsInFly.clear();
- DroppedPackets.clear();
- ResendQueue.clear();
- for (size_t i = 0; i < AckReceived.size(); ++i)
- AckReceived[i] = false;
- }
- int TAckTracker::SelectPacket() {
- if (!ResendQueue.empty()) {
- int res = ResendQueue.back();
- ResendQueue.pop_back();
- //printf("resending packet %d\n", res);
- return res;
- }
- if (CurrentPacket == PacketCount) {
- return -1;
- }
- return CurrentPacket++;
- }
- TAckTracker::~TAckTracker() {
- for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i)
- Congestion->Failure();
- // object will be incorrect state after this (failed packets are not added to resend queue), but who cares
- }
- int TAckTracker::GetPacketToSend(float deltaT) {
- int res = SelectPacket();
- if (res == -1) {
- // needed to count time even if we don't have anything to send
- Congestion->HasTriedToSend();
- return res;
- }
- Congestion->LaunchPacket();
- PacketsInFly[res] = -deltaT; // deltaT is time since last Step(), so for the timing to be correct we should subtract it
- return res;
- }
- // called on SendTo() failure
- void TAckTracker::AddToResend(int pkt) {
- //printf("AddToResend(%d)\n", pkt);
- TPacketHash::iterator i = PacketsInFly.find(pkt);
- if (i != PacketsInFly.end()) {
- PacketsInFly.erase(i);
- Congestion->FailureOnSend();
- ResendQueue.push_back(pkt);
- } else
- Y_ASSERT(0);
- }
- void TAckTracker::Ack(int pkt, float deltaT, bool updateRTT) {
- Y_ASSERT(pkt >= 0 && pkt < PacketCount);
- if (AckReceived[pkt])
- return;
- AckReceived[pkt] = true;
- //printf("Ack received for %d\n", pkt);
- TPacketHash::iterator i = PacketsInFly.find(pkt);
- if (i == PacketsInFly.end()) {
- for (size_t k = 0; k < ResendQueue.size(); ++k) {
- if (ResendQueue[k] == pkt) {
- ResendQueue[k] = ResendQueue.back();
- ResendQueue.pop_back();
- break;
- }
- }
- TPacketHash::iterator z = DroppedPackets.find(pkt);
- if (z != DroppedPackets.end()) {
- // late packet arrived
- if (updateRTT) {
- float ping = z->second + deltaT;
- Congestion->RegisterRTT(ping);
- }
- DroppedPackets.erase(z);
- } else {
- // Y_ASSERT(0); // ack on nonsent packet, possible in resend scenario
- }
- return;
- }
- if (updateRTT) {
- float ping = i->second + deltaT;
- //printf("Register RTT %g\n", ping * 1000);
- Congestion->RegisterRTT(ping);
- }
- PacketsInFly.erase(i);
- Congestion->Success();
- }
- void TAckTracker::AckAll() {
- for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i) {
- int pkt = i->first;
- AckReceived[pkt] = true;
- Congestion->Success();
- }
- PacketsInFly.clear();
- }
- void TAckTracker::Step(float deltaT) {
- float timeoutVal = Congestion->GetTimeout();
- //static int n;
- //if ((++n % 1024) == 0)
- // printf("timeout = %g, window = %g, fail_rate %g, pkt_rate = %g\n", timeoutVal * 1000, Congestion->GetWindow(), Congestion->GetFailRate(), (1 - Congestion->GetFailRate()) * Congestion->GetWindow() / Congestion->GetRTT());
- TimeToNextPacketTimeout = 1000;
- // для окон меньше единицы мы кидаем рандом один раз за RTT на то, можно ли пускать пакет
- // поэтому можно ждать максимум RTT, после этого надо кинуть новый random
- if (Congestion->GetWindow() < 1)
- TimeToNextPacketTimeout = Congestion->GetRTT();
- for (auto& droppedPacket : DroppedPackets) {
- float& t = droppedPacket.second;
- t += deltaT;
- }
- for (TPacketHash::iterator i = PacketsInFly.begin(); i != PacketsInFly.end();) {
- float& t = i->second;
- t += deltaT;
- if (t > timeoutVal) {
- //printf("packet %d timed out (timeout = %g)\n", i->first, timeoutVal);
- ResendQueue.push_back(i->first);
- DroppedPackets[i->first] = i->second;
- TPacketHash::iterator k = i++;
- PacketsInFly.erase(k);
- Congestion->Failure();
- } else {
- TimeToNextPacketTimeout = Min(TimeToNextPacketTimeout, timeoutVal - t);
- ++i;
- }
- }
- }
- static std::atomic<ui32> netAckRndVal = (ui32)GetCycleCount();
- ui32 NetAckRnd() {
- const auto nextNetAckRndVal = static_cast<ui32>(((ui64)netAckRndVal.load(std::memory_order_acquire) * 279470273) % 4294967291);
- netAckRndVal.store(nextNetAckRndVal, std::memory_order_release);
- return nextNetAckRndVal;
- }
- }
|