#include "stdafx.h" #include "net_acks.h" #include #include namespace NNetliba { const float RTT_AVERAGE_OVER = 15; float TCongestionControl::StartWindowSize = 3; float TCongestionControl::MaxPacketRate = 0; // unlimited bool UseTOSforAcks = false; //true;// void EnableUseTOSforAcks(bool enable) { UseTOSforAcks = enable; } float CONG_CTRL_CHANNEL_INFLATE = 1; void SetCongCtrlChannelInflate(float inflate) { CONG_CTRL_CHANNEL_INFLATE = inflate; } ////////////////////////////////////////////////////////////////////////// TPingTracker::TPingTracker() : AvrgRTT(CONG_CTRL_INITIAL_RTT) , AvrgRTT2(CONG_CTRL_INITIAL_RTT * CONG_CTRL_INITIAL_RTT) , RTTCount(0) { } void TPingTracker::RegisterRTT(float rtt) { Y_ASSERT(rtt > 0); float keep = RTTCount / (RTTCount + 1); AvrgRTT *= keep; AvrgRTT += (1 - keep) * rtt; AvrgRTT2 *= keep; AvrgRTT2 += (1 - keep) * Sqr(rtt); RTTCount = Min(RTTCount + 1, RTT_AVERAGE_OVER); //static int n; //if ((++n % 1024) == 0) // printf("Average RTT = %g (sko = %g)\n", GetRTT() * 1000, GetRTTSKO() * 1000); } void TPingTracker::IncreaseRTT() { const float F_RTT_DECAY_RATE = 1.1f; AvrgRTT *= F_RTT_DECAY_RATE; AvrgRTT2 *= Sqr(F_RTT_DECAY_RATE); } ////////////////////////////////////////////////////////////////////////// void TAckTracker::Resend() { CurrentPacket = 0; for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i) Congestion->Failure(); // not actually correct but simplifies logic a lot PacketsInFly.clear(); DroppedPackets.clear(); ResendQueue.clear(); for (size_t i = 0; i < AckReceived.size(); ++i) AckReceived[i] = false; } int TAckTracker::SelectPacket() { if (!ResendQueue.empty()) { int res = ResendQueue.back(); ResendQueue.pop_back(); //printf("resending packet %d\n", res); return res; } if (CurrentPacket == PacketCount) { return -1; } return CurrentPacket++; } TAckTracker::~TAckTracker() { for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i) Congestion->Failure(); // object will be incorrect state after this (failed packets are not added to resend queue), but who cares } int TAckTracker::GetPacketToSend(float deltaT) { int res = SelectPacket(); if (res == -1) { // needed to count time even if we don't have anything to send Congestion->HasTriedToSend(); return res; } Congestion->LaunchPacket(); PacketsInFly[res] = -deltaT; // deltaT is time since last Step(), so for the timing to be correct we should subtract it return res; } // called on SendTo() failure void TAckTracker::AddToResend(int pkt) { //printf("AddToResend(%d)\n", pkt); TPacketHash::iterator i = PacketsInFly.find(pkt); if (i != PacketsInFly.end()) { PacketsInFly.erase(i); Congestion->FailureOnSend(); ResendQueue.push_back(pkt); } else Y_ASSERT(0); } void TAckTracker::Ack(int pkt, float deltaT, bool updateRTT) { Y_ASSERT(pkt >= 0 && pkt < PacketCount); if (AckReceived[pkt]) return; AckReceived[pkt] = true; //printf("Ack received for %d\n", pkt); TPacketHash::iterator i = PacketsInFly.find(pkt); if (i == PacketsInFly.end()) { for (size_t k = 0; k < ResendQueue.size(); ++k) { if (ResendQueue[k] == pkt) { ResendQueue[k] = ResendQueue.back(); ResendQueue.pop_back(); break; } } TPacketHash::iterator z = DroppedPackets.find(pkt); if (z != DroppedPackets.end()) { // late packet arrived if (updateRTT) { float ping = z->second + deltaT; Congestion->RegisterRTT(ping); } DroppedPackets.erase(z); } else { // Y_ASSERT(0); // ack on nonsent packet, possible in resend scenario } return; } if (updateRTT) { float ping = i->second + deltaT; //printf("Register RTT %g\n", ping * 1000); Congestion->RegisterRTT(ping); } PacketsInFly.erase(i); Congestion->Success(); } void TAckTracker::AckAll() { for (TPacketHash::const_iterator i = PacketsInFly.begin(); i != PacketsInFly.end(); ++i) { int pkt = i->first; AckReceived[pkt] = true; Congestion->Success(); } PacketsInFly.clear(); } void TAckTracker::Step(float deltaT) { float timeoutVal = Congestion->GetTimeout(); //static int n; //if ((++n % 1024) == 0) // printf("timeout = %g, window = %g, fail_rate %g, pkt_rate = %g\n", timeoutVal * 1000, Congestion->GetWindow(), Congestion->GetFailRate(), (1 - Congestion->GetFailRate()) * Congestion->GetWindow() / Congestion->GetRTT()); TimeToNextPacketTimeout = 1000; // для окон меньше единицы мы кидаем рандом один раз за RTT на то, можно ли пускать пакет // поэтому можно ждать максимум RTT, после этого надо кинуть новый random if (Congestion->GetWindow() < 1) TimeToNextPacketTimeout = Congestion->GetRTT(); for (auto& droppedPacket : DroppedPackets) { float& t = droppedPacket.second; t += deltaT; } for (TPacketHash::iterator i = PacketsInFly.begin(); i != PacketsInFly.end();) { float& t = i->second; t += deltaT; if (t > timeoutVal) { //printf("packet %d timed out (timeout = %g)\n", i->first, timeoutVal); ResendQueue.push_back(i->first); DroppedPackets[i->first] = i->second; TPacketHash::iterator k = i++; PacketsInFly.erase(k); Congestion->Failure(); } else { TimeToNextPacketTimeout = Min(TimeToNextPacketTimeout, timeoutVal - t); ++i; } } } static std::atomic netAckRndVal = (ui32)GetCycleCount(); ui32 NetAckRnd() { const auto nextNetAckRndVal = static_cast(((ui64)netAckRndVal.load(std::memory_order_acquire) * 279470273) % 4294967291); netAckRndVal.store(nextNetAckRndVal, std::memory_order_release); return nextNetAckRndVal; } }