1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321 |
- #include "stdafx.h"
- #include "udp_client_server.h"
- #include "net_acks.h"
- #include <util/generic/guid.h>
- #include <util/system/hp_timer.h>
- #include <util/datetime/cputimer.h>
- #include <util/system/yield.h>
- #include <util/system/unaligned_mem.h>
- #include "block_chain.h"
- #include <util/system/shmat.h>
- #include "udp_debug.h"
- #include "udp_socket.h"
- #include "ib_cs.h"
- #include <library/cpp/netliba/socket/socket.h>
- #include <util/random/random.h>
- #include <util/system/sanitizers.h>
- #include <atomic>
- namespace NNetliba {
- // rely on UDP checksum in packets, check crc only for complete packets
- // UPDATE: looks like UDP checksum is not enough, network errors do happen, we saw 600+ retransmits of a ~1MB data packet
- const float UDP_TRANSFER_TIMEOUT = 90.0f;
- const float DEFAULT_MAX_WAIT_TIME = 1;
- const float UDP_KEEP_PEER_INFO = 600;
- // траффик может идти, а новых данных для конкретного пакета может не добавляться.
- // это возможно когда мы прерываем процесс в момент передачи и перезапускаем его на том же порту,
- // тогда на приемнике повиснет пакет. Этот пакет мы зашибем по этому таймауту
- const float UDP_MAX_INPUT_DATA_WAIT = UDP_TRANSFER_TIMEOUT * 2;
- enum {
- UDP_PACKET_SIZE_FULL = 8900, // used for ping to detect jumbo-frame support
- UDP_PACKET_SIZE = 8800, // max data in packet
- UDP_PACKET_SIZE_SMALL = 1350, // 1180 would be better taking into account that 1280 is guaranteed ipv6 minimum MTU
- UDP_PACKET_BUF_SIZE = UDP_PACKET_SIZE + 100,
- };
- //////////////////////////////////////////////////////////////////////////
- struct TUdpCompleteInTransfer {
- TGUID PacketGuid;
- };
- //////////////////////////////////////////////////////////////////////////
- struct TUdpRecvPacket {
- int DataStart, DataSize;
- ui32 BlockSum;
- // Data[] should be last member in struct, this fact is used to create truncated TUdpRecvPacket in CreateNewSmallPacket()
- char Data[UDP_PACKET_BUF_SIZE];
- };
- struct TUdpInTransfer {
- private:
- TVector<TUdpRecvPacket*> Packets;
- public:
- sockaddr_in6 ToAddress;
- int PacketSize, LastPacketSize;
- bool HasLastPacket;
- TVector<int> NewPacketsToAck;
- TCongestionControlPtr Congestion;
- float TimeSinceLastRecv;
- int Attempt;
- TGUID PacketGuid;
- int Crc32;
- TIntrusivePtr<TSharedMemory> SharedData;
- TRequesterPendingDataStats* Stats;
- TUdpInTransfer()
- : PacketSize(0)
- , LastPacketSize(0)
- , HasLastPacket(false)
- , TimeSinceLastRecv(0)
- , Attempt(0)
- , Crc32(0)
- , Stats(nullptr)
- {
- Zero(ToAddress);
- }
- ~TUdpInTransfer() {
- if (Stats) {
- Stats->InpCount -= 1;
- }
- EraseAllPackets();
- }
- void EraseAllPackets() {
- for (int i = 0; i < Packets.ysize(); ++i) {
- ErasePacket(i);
- }
- Packets.clear();
- HasLastPacket = false;
- }
- void AttachStats(TRequesterPendingDataStats* stats) {
- Stats = stats;
- Stats->InpCount += 1;
- Y_ASSERT(Packets.empty());
- }
- void ErasePacket(int id) {
- TUdpRecvPacket* pkt = Packets[id];
- if (pkt) {
- if (Stats) {
- Stats->InpDataSize -= PacketSize;
- }
- TRopeDataPacket::FreeBuf((char*)pkt);
- Packets[id] = nullptr;
- }
- }
- void AssignPacket(int id, TUdpRecvPacket* pkt) {
- ErasePacket(id);
- if (pkt && Stats) {
- Stats->InpDataSize += PacketSize;
- }
- Packets[id] = pkt;
- }
- int GetPacketCount() const {
- return Packets.ysize();
- }
- void SetPacketCount(int n) {
- Packets.resize(n, nullptr);
- }
- const TUdpRecvPacket* GetPacket(int id) const {
- return Packets[id];
- }
- TUdpRecvPacket* ExtractPacket(int id) {
- TUdpRecvPacket* res = Packets[id];
- if (res) {
- if (Stats) {
- Stats->InpDataSize -= PacketSize;
- }
- Packets[id] = nullptr;
- }
- return res;
- }
- };
- struct TUdpOutTransfer {
- sockaddr_in6 ToAddress;
- TAutoPtr<TRopeDataPacket> Data;
- int PacketCount;
- int PacketSize, LastPacketSize;
- TAckTracker AckTracker;
- int Attempt;
- TGUID PacketGuid;
- int Crc32;
- EPacketPriority PacketPriority;
- TRequesterPendingDataStats* Stats;
- TUdpOutTransfer()
- : PacketCount(0)
- , PacketSize(0)
- , LastPacketSize(0)
- , Attempt(0)
- , Crc32(0)
- , PacketPriority(PP_LOW)
- , Stats(nullptr)
- {
- Zero(ToAddress);
- }
- ~TUdpOutTransfer() {
- if (Stats) {
- Stats->OutCount -= 1;
- Stats->OutDataSize -= Data->GetSize();
- }
- }
- void AttachStats(TRequesterPendingDataStats* stats) {
- Stats = stats;
- Stats->OutCount += 1;
- Stats->OutDataSize += Data->GetSize();
- }
- };
- struct TTransferKey {
- TUdpAddress Address;
- int Id;
- };
- inline bool operator==(const TTransferKey& a, const TTransferKey& b) {
- return a.Address == b.Address && a.Id == b.Id;
- }
- struct TTransferKeyHash {
- int operator()(const TTransferKey& k) const {
- return (ui32)k.Address.Interface + (ui32)k.Address.Port * (ui32)389461 + (ui32)k.Id;
- }
- };
- struct TUdpAddressHash {
- int operator()(const TUdpAddress& addr) const {
- return (ui32)addr.Interface + (ui32)addr.Port * (ui32)389461;
- }
- };
- class TUdpHostRevBufAlloc: public TNonCopyable {
- TUdpRecvPacket* RecvPktBuf;
- void AllocNewBuf() {
- RecvPktBuf = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(sizeof(TUdpRecvPacket));
- }
- public:
- TUdpHostRevBufAlloc() {
- AllocNewBuf();
- }
- ~TUdpHostRevBufAlloc() {
- FreeBuf(RecvPktBuf);
- }
- void FreeBuf(TUdpRecvPacket* pkt) {
- TRopeDataPacket::FreeBuf((char*)pkt);
- }
- TUdpRecvPacket* ExtractPacket() {
- TUdpRecvPacket* res = RecvPktBuf;
- AllocNewBuf();
- return res;
- }
- TUdpRecvPacket* CreateNewSmallPacket(int sz) {
- int pktStructSz = sizeof(TUdpRecvPacket) - Y_ARRAY_SIZE(RecvPktBuf->Data) + sz;
- TUdpRecvPacket* pkt = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(pktStructSz);
- return pkt;
- }
- int GetBufSize() const {
- return Y_ARRAY_SIZE(RecvPktBuf->Data);
- }
- char* GetDataPtr() const {
- return RecvPktBuf->Data;
- }
- };
- static TAtomic transferIdCounter = (long)(GetCycleCount() & 0x1fffffff);
- inline int GetTransferId() {
- int res = AtomicAdd(transferIdCounter, 1);
- while (res < 0) {
- // negative transfer ids are treated as errors, so wrap transfer id
- AtomicCas(&transferIdCounter, 0, transferIdCounter);
- res = AtomicAdd(transferIdCounter, 1);
- }
- return res;
- }
- static bool IBDetection = true;
- class TUdpHost: public IUdpHost {
- struct TPeerLink {
- TIntrusivePtr<TCongestionControl> UdpCongestion;
- TIntrusivePtr<IIBPeer> IBPeer;
- double TimeNoActiveTransfers;
- TPeerLink()
- : TimeNoActiveTransfers(0)
- {
- }
- bool Update(float deltaT, const TUdpAddress& toAddress, float* maxWaitTime) {
- bool updateOk = UdpCongestion->UpdateAlive(toAddress, deltaT, UDP_TRANSFER_TIMEOUT, maxWaitTime);
- return updateOk;
- }
- void StartSleep(const TUdpAddress& toAddress, float* maxWaitTime) {
- //printf("peer_link start sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
- UdpCongestion->UpdateAlive(toAddress, 0, UDP_TRANSFER_TIMEOUT, maxWaitTime);
- UdpCongestion->MarkAlive();
- TimeNoActiveTransfers = 0;
- }
- bool UpdateSleep(float deltaT) {
- TimeNoActiveTransfers += deltaT;
- if (IBPeer.Get()) {
- //printf("peer_link update sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
- if (IBPeer->GetState() == IIBPeer::OK) {
- return true;
- }
- //printf("Drop broken IB connection\n");
- IBPeer = nullptr;
- }
- return (TimeNoActiveTransfers < UDP_KEEP_PEER_INFO);
- }
- };
- TNetSocket s;
- typedef THashMap<TTransferKey, TUdpInTransfer, TTransferKeyHash> TUdpInXferHash;
- typedef THashMap<TTransferKey, TUdpOutTransfer, TTransferKeyHash> TUdpOutXferHash;
- // congestion control per peer
- typedef THashMap<TUdpAddress, TPeerLink, TUdpAddressHash> TPeerLinkHash;
- typedef THashMap<TTransferKey, TUdpCompleteInTransfer, TTransferKeyHash> TUdpCompleteInXferHash;
- typedef THashMap<TUdpAddress, TIntrusivePtr<TPeerQueueStats>, TUdpAddressHash> TQueueStatsHash;
- TUdpInXferHash RecvQueue;
- TUdpCompleteInXferHash RecvCompleted;
- TUdpOutXferHash SendQueue;
- TPeerLinkHash CongestionTrack, CongestionTrackHistory;
- TList<TRequest*> ReceivedList;
- NHPTimer::STime CurrentT;
- TList<TSendResult> SendResults;
- TList<TTransferKey> SendOrderLow, SendOrder, SendOrderHighPrior;
- TAtomic IsWaiting;
- float MaxWaitTime;
- std::atomic<float> MaxWaitTime2;
- float IBIdleTime;
- TVector<TTransferKey> RecvCompletedQueue, KeepCompletedQueue;
- float TimeSinceCompletedQueueClean, TimeSinceCongestionHistoryUpdate;
- TRequesterPendingDataStats PendingDataStats;
- TQueueStatsHash PeerQueueStats;
- TIntrusivePtr<IIBClientServer> IB;
- typedef THashMap<TIBMsgHandle, TTransferKey> TIBtoTransferKeyHash;
- TIBtoTransferKeyHash IBKeyToTransferKey;
- char PktBuf[UDP_PACKET_BUF_SIZE];
- TUdpHostRevBufAlloc RecvBuf;
- TPeerLink& GetPeerLink(const TUdpAddress& ip) {
- TPeerLinkHash::iterator z = CongestionTrack.find(ip);
- if (z == CongestionTrack.end()) {
- z = CongestionTrackHistory.find(ip);
- if (z == CongestionTrackHistory.end()) {
- TPeerLink& res = CongestionTrack[ip];
- Y_ASSERT(res.UdpCongestion.Get() == nullptr);
- res.UdpCongestion = new TCongestionControl;
- TQueueStatsHash::iterator zq = PeerQueueStats.find(ip);
- if (zq != PeerQueueStats.end()) {
- res.UdpCongestion->AttachQueueStats(zq->second);
- }
- return res;
- } else {
- TPeerLink& res = CongestionTrack[z->first];
- res = z->second;
- CongestionTrackHistory.erase(z);
- return res;
- }
- } else {
- Y_ASSERT(CongestionTrackHistory.find(ip) == CongestionTrackHistory.end());
- return z->second;
- }
- }
- void SucceededSend(int id) {
- SendResults.push_back(TSendResult(id, true));
- }
- void FailedSend(int id) {
- SendResults.push_back(TSendResult(id, false));
- }
- void SendData(TList<TTransferKey>* order, float deltaT, bool needCheckAlive);
- void RecvCycle();
- public:
- TUdpHost()
- : CurrentT(0)
- , IsWaiting(0)
- , MaxWaitTime(DEFAULT_MAX_WAIT_TIME)
- , MaxWaitTime2(DEFAULT_MAX_WAIT_TIME)
- , IBIdleTime(0)
- , TimeSinceCompletedQueueClean(0)
- , TimeSinceCongestionHistoryUpdate(0)
- {
- }
- ~TUdpHost() override {
- for (TList<TRequest*>::const_iterator i = ReceivedList.begin(); i != ReceivedList.end(); ++i)
- delete *i;
- }
- bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket);
- TRequest* GetRequest() override {
- if (ReceivedList.empty()) {
- if (IB.Get()) {
- return IB->GetRequest();
- }
- return nullptr;
- }
- TRequest* res = ReceivedList.front();
- ReceivedList.pop_front();
- return res;
- }
- void AddToSendOrder(const TTransferKey& transferKey, EPacketPriority pp) {
- if (pp == PP_LOW)
- SendOrderLow.push_back(transferKey);
- else if (pp == PP_NORMAL)
- SendOrder.push_back(transferKey);
- else if (pp == PP_HIGH)
- SendOrderHighPrior.push_back(transferKey);
- else
- Y_ASSERT(0);
- CancelWait();
- }
- int Send(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data, int crc32, TGUID* packetGuid, EPacketPriority pp) override {
- if (addr.Port == 0) {
- // shortcut for broken addresses
- if (packetGuid && packetGuid->IsEmpty())
- CreateGuid(packetGuid);
- int reqId = GetTransferId();
- FailedSend(reqId);
- return reqId;
- }
- TTransferKey transferKey;
- transferKey.Address = addr;
- transferKey.Id = GetTransferId();
- Y_ASSERT(SendQueue.find(transferKey) == SendQueue.end());
- TPeerLink& peerInfo = GetPeerLink(transferKey.Address);
- TUdpOutTransfer& xfer = SendQueue[transferKey];
- GetWinsockAddr(&xfer.ToAddress, transferKey.Address);
- xfer.Crc32 = crc32;
- xfer.PacketPriority = pp;
- if (!packetGuid || packetGuid->IsEmpty()) {
- CreateGuid(&xfer.PacketGuid);
- if (packetGuid)
- *packetGuid = xfer.PacketGuid;
- } else {
- xfer.PacketGuid = *packetGuid;
- }
- xfer.Data.Reset(data.Release());
- xfer.AttachStats(&PendingDataStats);
- xfer.AckTracker.AttachCongestionControl(peerInfo.UdpCongestion.Get());
- bool isSentOverIB = false;
- // we don't support priorities (=service levels in IB terms) currently
- // so send only PP_NORMAL traffic over IB
- if (pp == PP_NORMAL && peerInfo.IBPeer.Get() && xfer.Data->GetSharedData() == nullptr) {
- TIBMsgHandle hndl = IB->Send(peerInfo.IBPeer, xfer.Data.Get(), xfer.PacketGuid);
- if (hndl >= 0) {
- IBKeyToTransferKey[hndl] = transferKey;
- isSentOverIB = true;
- } else {
- // so we failed to use IB, ibPeer is either not connected yet or failed
- if (peerInfo.IBPeer->GetState() == IIBPeer::FAILED) {
- //printf("Disconnect failed IB peer\n");
- peerInfo.IBPeer = nullptr;
- }
- }
- }
- if (!isSentOverIB) {
- AddToSendOrder(transferKey, pp);
- }
- return transferKey.Id;
- }
- bool GetSendResult(TSendResult* res) override {
- if (SendResults.empty()) {
- if (IB.Get()) {
- TIBSendResult sr;
- if (IB->GetSendResult(&sr)) {
- TIBtoTransferKeyHash::iterator z = IBKeyToTransferKey.find(sr.Handle);
- if (z == IBKeyToTransferKey.end()) {
- Y_ABORT_UNLESS(0, "unknown handle returned from IB");
- }
- TTransferKey transferKey = z->second;
- IBKeyToTransferKey.erase(z);
- TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
- if (i == SendQueue.end()) {
- Y_ABORT_UNLESS(0, "IBKeyToTransferKey refers nonexisting xfer");
- }
- if (sr.Success) {
- TUdpOutTransfer& xfer = i->second;
- xfer.AckTracker.MarkAlive(); // do we really need this?
- *res = TSendResult(transferKey.Id, sr.Success);
- SendQueue.erase(i);
- return true;
- } else {
- //printf("IB send failed, fall back to regular network\n");
- // Houston, we got a problem
- // IB failed to send, try to use regular network
- TUdpOutTransfer& xfer = i->second;
- AddToSendOrder(transferKey, xfer.PacketPriority);
- }
- }
- }
- return false;
- }
- *res = SendResults.front();
- SendResults.pop_front();
- return true;
- }
- void Step() override;
- void IBStep() override;
- void Wait(float seconds) override {
- if (seconds < 1e-3)
- seconds = 0;
- if (seconds > MaxWaitTime)
- seconds = MaxWaitTime;
- if (IBIdleTime < 0.010) {
- seconds = 0;
- }
- if (seconds == 0) {
- ThreadYield();
- } else {
- AtomicAdd(IsWaiting, 1);
- if (seconds > MaxWaitTime2)
- seconds = MaxWaitTime2;
- MaxWaitTime2 = DEFAULT_MAX_WAIT_TIME;
- if (seconds == 0) {
- ThreadYield();
- } else {
- if (IB.Get()) {
- for (float done = 0; done < seconds;) {
- float deltaSleep = Min(seconds - done, 0.002f);
- s.Wait(deltaSleep);
- NHPTimer::STime tChk;
- NHPTimer::GetTime(&tChk);
- if (IB->Step(tChk)) {
- IBIdleTime = 0;
- break;
- }
- done += deltaSleep;
- }
- } else {
- s.Wait(seconds);
- }
- }
- AtomicAdd(IsWaiting, -1);
- }
- }
- void CancelWait() override {
- MaxWaitTime2 = 0;
- if (AtomicAdd(IsWaiting, 0) == 1) {
- s.SendSelfFakePacket();
- }
- }
- void GetPendingDataSize(TRequesterPendingDataStats* res) override {
- *res = PendingDataStats;
- #ifndef NDEBUG
- TRequesterPendingDataStats chk;
- for (TUdpOutXferHash::const_iterator i = SendQueue.begin(); i != SendQueue.end(); ++i) {
- TRopeDataPacket* pckt = i->second.Data.Get();
- if (pckt) {
- chk.OutDataSize += pckt->GetSize();
- ++chk.OutCount;
- }
- }
- for (TUdpInXferHash::const_iterator i = RecvQueue.begin(); i != RecvQueue.end(); ++i) {
- const TUdpInTransfer& tr = i->second;
- for (int p = 0; p < tr.GetPacketCount(); ++p) {
- if (tr.GetPacket(p)) {
- chk.InpDataSize += tr.PacketSize;
- }
- }
- ++chk.InpCount;
- }
- Y_ASSERT(memcmp(&chk, res, sizeof(chk)) == 0);
- #endif
- }
- TString GetDebugInfo() override;
- TString GetPeerLinkDebug(const TPeerLinkHash& ch);
- void Kill(const TUdpAddress& addr) override;
- TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override;
- };
- bool TUdpHost::Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
- if (s.IsValid()) {
- Y_ASSERT(0);
- return false;
- }
- s.Open(socket);
- if (!s.IsValid())
- return false;
- if (IBDetection)
- IB = CreateIBClientServer();
- NHPTimer::GetTime(&CurrentT);
- return true;
- }
- static bool HasAllPackets(const TUdpInTransfer& res) {
- if (!res.HasLastPacket)
- return false;
- for (int i = res.GetPacketCount() - 1; i >= 0; --i) {
- if (!res.GetPacket(i))
- return false;
- }
- return true;
- }
- // grouped acks, first int - packet_id, second int - bit mask for 32 packets preceding packet_id
- const int SIZEOF_ACK = 8;
- static int WriteAck(TUdpInTransfer* p, int* dst, int maxAcks) {
- int ackCount = 0;
- if (p->NewPacketsToAck.size() > 1)
- Sort(p->NewPacketsToAck.begin(), p->NewPacketsToAck.end());
- int lastAcked = 0;
- for (size_t idx = 0; idx < p->NewPacketsToAck.size(); ++idx) {
- int pkt = p->NewPacketsToAck[idx];
- if (idx == p->NewPacketsToAck.size() - 1 || pkt > lastAcked + 30) {
- *dst++ = pkt;
- int bitMask = 0;
- int backPackets = Min(pkt, 32);
- for (int k = 0; k < backPackets; ++k) {
- if (p->GetPacket(pkt - k - 1))
- bitMask |= 1 << k;
- }
- *dst++ = bitMask;
- if (++ackCount >= maxAcks)
- break;
- lastAcked = pkt;
- //printf("sending ack %d (mask %x)\n", pkt, bitMask);
- }
- }
- p->NewPacketsToAck.clear();
- return ackCount;
- }
- static void AckPacket(TUdpOutTransfer* p, int pkt, float deltaT, bool updateRTT) {
- if (pkt < 0 || pkt >= p->PacketCount) {
- Y_ASSERT(0);
- return;
- }
- p->AckTracker.Ack(pkt, deltaT, updateRTT);
- }
- static void ReadAcks(TUdpOutTransfer* p, const int* acks, int ackCount, float deltaT) {
- for (int i = 0; i < ackCount; ++i) {
- int pkt = *acks++;
- int bitMask = *acks++;
- bool updateRTT = i == ackCount - 1; // update RTT using only last packet in the pack
- AckPacket(p, pkt, deltaT, updateRTT);
- for (int k = 0; k < 32; ++k) {
- if (bitMask & (1 << k))
- AckPacket(p, pkt - k - 1, deltaT, false);
- }
- }
- }
- using namespace NNetlibaSocket::NNetliba;
- const ui64 KILL_PASSPHRASE1 = 0x98ff9cefb11d9a4cul;
- const ui64 KILL_PASSPHRASE2 = 0xf7754c29e0be95eaul;
- template <class T>
- inline T Read(char** data) {
- T res = ReadUnaligned<T>(*data);
- *data += sizeof(T);
- return res;
- }
- template <class T>
- inline void Write(char** data, T res) {
- WriteUnaligned<T>(*data, res);
- *data += sizeof(T);
- }
- static void RequireResend(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
- char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
- Write(&pktData, transferId);
- Write(&pktData, (char)ACK_RESEND);
- Write(&pktData, attempt);
- s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
- }
- static void RequireResendNoShmem(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
- char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
- Write(&pktData, transferId);
- Write(&pktData, (char)ACK_RESEND_NOSHMEM);
- Write(&pktData, attempt);
- s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
- }
- static void AckComplete(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, const TGUID& packetGuid, int packetId) {
- char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
- Write(&pktData, transferId);
- Write(&pktData, (char)ACK_COMPLETE);
- Write(&pktData, packetGuid);
- Write(&pktData, packetId); // we need packetId to update RTT
- s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
- }
- static void SendPing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
- char pktBuf[UDP_PACKET_SIZE_FULL];
- char* pktData = pktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
- if (NSan::MSanIsOn()) {
- Zero(pktBuf);
- }
- Write(&pktData, (int)0);
- Write(&pktData, (char)PING);
- Write(&pktData, selfNetworkOrderPort);
- s.SendTo(pktBuf, UDP_PACKET_SIZE_FULL, toAddress, FF_DONT_FRAG);
- }
- // not MTU discovery, just figure out IB address of the peer
- static void SendFakePing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
- char buf[100];
- char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
- Write(&pktData, (int)0);
- Write(&pktData, (char)PING);
- Write(&pktData, selfNetworkOrderPort);
- s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
- }
- void TUdpHost::SendData(TList<TTransferKey>* order, float deltaT1, bool needCheckAlive) {
- for (TList<TTransferKey>::iterator z = order->begin(); z != order->end();) {
- // pick connection to send
- const TTransferKey& transferKey = *z;
- TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
- if (i == SendQueue.end()) {
- z = order->erase(z);
- continue;
- }
- ++z;
- // perform sending
- int transferId = transferKey.Id;
- TUdpOutTransfer& xfer = i->second;
- if (!xfer.AckTracker.IsInitialized()) {
- TIntrusivePtr<TCongestionControl> congestion = xfer.AckTracker.GetCongestionControl();
- Y_ASSERT(congestion.Get() != nullptr);
- if (!congestion->IsKnownMTU()) {
- TLameMTUDiscovery* md = congestion->GetMTUDiscovery();
- if (md->IsTimedOut()) {
- congestion->SetMTU(UDP_PACKET_SIZE_SMALL);
- } else {
- if (md->CanSend()) {
- SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
- md->PingSent();
- }
- continue;
- }
- }
- // try to use large mtu, we could have selected small mtu due to connectivity problems
- if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL || IB.Get() != nullptr) {
- // recheck every ~50mb
- int chkDenom = (50000000 / xfer.Data->GetSize()) | 1;
- if ((NetAckRnd() % chkDenom) == 0) {
- //printf("send rechecking ping\n");
- if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL) {
- SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
- } else {
- SendFakePing(s, xfer.ToAddress, s.GetNetworkOrderPort());
- }
- }
- }
- xfer.PacketSize = congestion->GetMTU();
- xfer.LastPacketSize = xfer.Data->GetSize() % xfer.PacketSize;
- xfer.PacketCount = xfer.Data->GetSize() / xfer.PacketSize + 1;
- xfer.AckTracker.SetPacketCount(xfer.PacketCount);
- }
- xfer.AckTracker.Step(deltaT1);
- MaxWaitTime = Min(MaxWaitTime, xfer.AckTracker.GetTimeToNextPacketTimeout());
- if (needCheckAlive && !xfer.AckTracker.IsAlive()) {
- FailedSend(transferId);
- SendQueue.erase(i);
- continue;
- }
- bool sendBufferOverflow = false;
- while (xfer.AckTracker.CanSend()) {
- NHPTimer::STime tCopy = CurrentT;
- float deltaT2 = (float)NHPTimer::GetTimePassed(&tCopy);
- deltaT2 = ClampVal(deltaT2, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
- int pkt = xfer.AckTracker.GetPacketToSend(deltaT2);
- if (pkt == -1) {
- break;
- }
- int dataSize = xfer.PacketSize;
- if (pkt == xfer.PacketCount - 1)
- dataSize = xfer.LastPacketSize;
- char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
- Write(&pktData, transferId);
- char pktType = xfer.PacketSize == UDP_PACKET_SIZE ? DATA : DATA_SMALL;
- TSharedMemory* shm = xfer.Data->GetSharedData();
- if (shm) {
- if (pktType == DATA)
- pktType = DATA_SHMEM;
- else
- pktType = DATA_SMALL_SHMEM;
- }
- Write(&pktData, pktType);
- Write(&pktData, xfer.Attempt);
- Write(&pktData, pkt);
- if (pkt == 0) {
- Write(&pktData, xfer.PacketGuid);
- Write(&pktData, xfer.Crc32);
- if (shm) {
- Write(&pktData, shm->GetId());
- Write(&pktData, shm->GetSize());
- }
- }
- TBlockChainIterator dataReader(xfer.Data->GetChain());
- dataReader.Seek(pkt * xfer.PacketSize);
- dataReader.Read(pktData, dataSize);
- pktData += dataSize;
- int sendSize = (int)(pktData - PktBuf);
- TNetSocket::ESendError sendErr = s.SendTo(PktBuf, sendSize, xfer.ToAddress, FF_ALLOW_FRAG);
- if (sendErr != TNetSocket::SEND_OK) {
- if (sendErr == TNetSocket::SEND_NO_ROUTE_TO_HOST) {
- FailedSend(transferId);
- SendQueue.erase(i);
- break;
- } else {
- // most probably out of send buffer space (or something terrible has happened)
- xfer.AckTracker.AddToResend(pkt);
- sendBufferOverflow = true;
- MaxWaitTime = 0;
- //printf("failed send\n");
- break;
- }
- }
- }
- if (sendBufferOverflow)
- break;
- }
- }
- void TUdpHost::RecvCycle() {
- for (;;) {
- sockaddr_in6 fromAddress;
- int rv = RecvBuf.GetBufSize();
- bool recvOk = s.RecvFrom(RecvBuf.GetDataPtr(), &rv, &fromAddress);
- if (!recvOk)
- break;
- NHPTimer::STime tCopy = CurrentT;
- float deltaT = (float)NHPTimer::GetTimePassed(&tCopy);
- deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
- //int fromIP = fromAddress.sin_addr.s_addr;
- TTransferKey k;
- char* pktData = RecvBuf.GetDataPtr() + UDP_LOW_LEVEL_HEADER_SIZE;
- GetUdpAddress(&k.Address, fromAddress);
- k.Id = Read<int>(&pktData);
- int transferId = k.Id;
- int cmd = Read<char>(&pktData);
- Y_ASSERT(cmd == (int)*(RecvBuf.GetDataPtr() + CMD_POS));
- switch (cmd) {
- case DATA:
- case DATA_SMALL:
- case DATA_SHMEM:
- case DATA_SMALL_SHMEM: {
- int attempt = Read<int>(&pktData);
- int packetId = Read<int>(&pktData);
- //printf("data packet %d (trans ID = %d)\n", packetId, transferId);
- TUdpCompleteInXferHash::iterator itCompl = RecvCompleted.find(k);
- if (itCompl != RecvCompleted.end()) {
- Y_ASSERT(RecvQueue.find(k) == RecvQueue.end());
- const TUdpCompleteInTransfer& complete = itCompl->second;
- bool sendAckComplete = true;
- if (packetId == 0) {
- // check packet GUID
- char* tmpPktData = pktData;
- TGUID packetGuid;
- packetGuid = Read<TGUID>(&tmpPktData);
- if (packetGuid != complete.PacketGuid) {
- // we are receiving new data with the same transferId
- // in this case we have to flush all the information about previous transfer
- // and start over
- //printf("same transferId for a different packet\n");
- RecvCompleted.erase(itCompl);
- sendAckComplete = false;
- }
- }
- if (sendAckComplete) {
- AckComplete(s, fromAddress, transferId, complete.PacketGuid, packetId);
- break;
- }
- }
- TUdpInXferHash::iterator rq = RecvQueue.find(k);
- if (rq == RecvQueue.end()) {
- //printf("new input transfer\n");
- TUdpInTransfer& res = RecvQueue[k];
- res.ToAddress = fromAddress;
- res.Attempt = attempt;
- res.Congestion = GetPeerLink(k.Address).UdpCongestion.Get();
- res.PacketSize = 0;
- res.HasLastPacket = false;
- res.AttachStats(&PendingDataStats);
- rq = RecvQueue.find(k);
- Y_ASSERT(rq != RecvQueue.end());
- }
- TUdpInTransfer& res = rq->second;
- res.Congestion->MarkAlive();
- res.TimeSinceLastRecv = 0;
- if (packetId == 0) {
- TGUID packetGuid;
- packetGuid = Read<TGUID>(&pktData);
- int crc32 = Read<int>(&pktData);
- res.Crc32 = crc32;
- res.PacketGuid = packetGuid;
- if (cmd == DATA_SHMEM || cmd == DATA_SMALL_SHMEM) {
- // link to attached shared memory
- TGUID shmemId = Read<TGUID>(&pktData);
- int shmemSize = Read<int>(&pktData);
- if (res.SharedData.Get() == nullptr) {
- res.SharedData = new TSharedMemory;
- if (!res.SharedData->Open(shmemId, shmemSize)) {
- res.SharedData = nullptr;
- RequireResendNoShmem(s, res.ToAddress, transferId, res.Attempt);
- break;
- }
- }
- }
- }
- if (attempt != res.Attempt) {
- RequireResend(s, res.ToAddress, transferId, res.Attempt);
- break;
- } else {
- if (res.PacketSize == 0) {
- res.PacketSize = (cmd == DATA || cmd == DATA_SHMEM ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL);
- } else {
- // check that all data is of same size
- Y_ASSERT(cmd == DATA || cmd == DATA_SMALL);
- Y_ASSERT(res.PacketSize == (cmd == DATA ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL));
- }
- int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
- Y_ASSERT(dataSize <= res.PacketSize);
- if (dataSize > res.PacketSize)
- break; // mem overrun protection
- if (packetId >= res.GetPacketCount())
- res.SetPacketCount(packetId + 1);
- {
- TUdpRecvPacket* pkt = nullptr;
- if (res.PacketSize == UDP_PACKET_SIZE_SMALL) {
- // save memory by using smaller buffer at the cost of additional memcpy
- pkt = RecvBuf.CreateNewSmallPacket(dataSize);
- memcpy(pkt->Data, pktData, dataSize);
- pkt->DataStart = 0;
- pkt->DataSize = dataSize;
- } else {
- int dataStart = (int)(pktData - RecvBuf.GetDataPtr()); // data offset in the packet
- pkt = RecvBuf.ExtractPacket();
- pkt->DataStart = dataStart;
- pkt->DataSize = dataSize;
- }
- // calc packet sum, will be used to calc whole message crc
- pkt->BlockSum = TIncrementalChecksumCalcer::CalcBlockSum(pkt->Data + pkt->DataStart, pkt->DataSize);
- res.AssignPacket(packetId, pkt);
- }
- if (dataSize != res.PacketSize) {
- res.LastPacketSize = dataSize;
- res.HasLastPacket = true;
- }
- if (HasAllPackets(res)) {
- //printf("received\n");
- TRequest* out = new TRequest;
- out->Address = k.Address;
- out->Guid = res.PacketGuid;
- TIncrementalChecksumCalcer incCS;
- int packetCount = res.GetPacketCount();
- out->Data.Reset(new TRopeDataPacket);
- for (int i = 0; i < packetCount; ++i) {
- TUdpRecvPacket* pkt = res.ExtractPacket(i);
- Y_ASSERT(pkt->DataSize == ((i == packetCount - 1) ? res.LastPacketSize : res.PacketSize));
- out->Data->AddBlock((char*)pkt, pkt->Data + pkt->DataStart, pkt->DataSize);
- incCS.AddBlockSum(pkt->BlockSum, pkt->DataSize);
- }
- out->Data->AttachSharedData(res.SharedData);
- res.EraseAllPackets();
- int crc32 = incCS.CalcChecksum(); // CalcChecksum(out->Data->GetChain());
- #ifdef SIMULATE_NETWORK_FAILURES
- bool crcOk = crc32 == res.Crc32 ? (RandomNumber<size_t>() % 10) != 0 : false;
- #else
- bool crcOk = crc32 == res.Crc32;
- #endif
- if (crcOk) {
- ReceivedList.push_back(out);
- Y_ASSERT(RecvCompleted.find(k) == RecvCompleted.end());
- TUdpCompleteInTransfer& complete = RecvCompleted[k];
- RecvCompletedQueue.push_back(k);
- complete.PacketGuid = res.PacketGuid;
- AckComplete(s, res.ToAddress, transferId, complete.PacketGuid, packetId);
- RecvQueue.erase(rq);
- } else {
- //printf("crc failed, require resend\n");
- delete out;
- ++res.Attempt;
- res.NewPacketsToAck.clear();
- RequireResend(s, res.ToAddress, transferId, res.Attempt);
- }
- } else {
- res.NewPacketsToAck.push_back(packetId);
- }
- }
- } break;
- case ACK: {
- TUdpOutXferHash::iterator i = SendQueue.find(k);
- if (i == SendQueue.end())
- break;
- TUdpOutTransfer& xfer = i->second;
- if (!xfer.AckTracker.IsInitialized())
- break;
- xfer.AckTracker.MarkAlive();
- int attempt = Read<int>(&pktData);
- Y_ASSERT(attempt <= xfer.Attempt);
- if (attempt != xfer.Attempt)
- break;
- ReadAcks(&xfer, (int*)pktData, (int)(RecvBuf.GetDataPtr() + rv - pktData) / SIZEOF_ACK, deltaT);
- break;
- }
- case ACK_COMPLETE: {
- TUdpOutXferHash::iterator i = SendQueue.find(k);
- if (i == SendQueue.end())
- break;
- TUdpOutTransfer& xfer = i->second;
- xfer.AckTracker.MarkAlive();
- TGUID packetGuid;
- packetGuid = Read<TGUID>(&pktData);
- int packetId = Read<int>(&pktData);
- if (packetGuid == xfer.PacketGuid) {
- xfer.AckTracker.Ack(packetId, deltaT, true); // update RTT
- xfer.AckTracker.AckAll(); // acking packets is required, otherwise they will be treated as lost (look AckTracker destructor)
- SucceededSend(transferId);
- SendQueue.erase(i);
- } else {
- // peer asserts that he has received this packet but packetGuid is wrong
- // try to resend everything
- // ++xfer.Attempt; // should not do this, only sender can modify attempt number, otherwise cycle is possible with out of order packets
- xfer.AckTracker.Resend();
- }
- break;
- } break;
- case ACK_RESEND: {
- TUdpOutXferHash::iterator i = SendQueue.find(k);
- if (i == SendQueue.end())
- break;
- TUdpOutTransfer& xfer = i->second;
- xfer.AckTracker.MarkAlive();
- int attempt = Read<int>(&pktData);
- if (xfer.Attempt != attempt) {
- // reset current tranfser & initialize new one
- xfer.Attempt = attempt;
- xfer.AckTracker.Resend();
- }
- break;
- }
- case ACK_RESEND_NOSHMEM: {
- // abort execution here
- // failed to open shmem on recv side, need to transmit data without using shmem
- Y_ABORT_UNLESS(0, "not implemented yet");
- break;
- }
- case PING: {
- sockaddr_in6 trueFromAddress = fromAddress;
- int port = Read<int>(&pktData);
- Y_ASSERT(trueFromAddress.sin6_family == AF_INET6);
- trueFromAddress.sin6_port = port;
- // can not set MTU for fromAddress here since asymmetrical mtu is possible
- char* pktData2 = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
- Write(&pktData2, (int)0);
- Write(&pktData2, (char)PONG);
- if (IB.Get()) {
- const TIBConnectInfo& ibConnectInfo = IB->GetConnectInfo();
- Write(&pktData2, ibConnectInfo);
- Write(&pktData2, trueFromAddress);
- }
- s.SendTo(PktBuf, pktData2 - PktBuf, trueFromAddress, FF_ALLOW_FRAG);
- break;
- }
- case PONG: {
- TPeerLink& peerInfo = GetPeerLink(k.Address);
- peerInfo.UdpCongestion->SetMTU(UDP_PACKET_SIZE);
- int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
- if (dataSize == sizeof(TIBConnectInfo) + sizeof(sockaddr_in6)) {
- if (IB.Get() != nullptr && peerInfo.IBPeer.Get() == nullptr) {
- TIBConnectInfo info = Read<TIBConnectInfo>(&pktData);
- sockaddr_in6 myAddress = Read<sockaddr_in6>(&pktData);
- TUdpAddress myUdpAddress;
- GetUdpAddress(&myUdpAddress, myAddress);
- peerInfo.IBPeer = IB->ConnectPeer(info, k.Address, myUdpAddress);
- }
- }
- break;
- }
- case KILL: {
- ui64 p1 = Read<ui64>(&pktData);
- ui64 p2 = Read<ui64>(&pktData);
- int restSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
- if (restSize == 0 && p1 == KILL_PASSPHRASE1 && p2 == KILL_PASSPHRASE2) {
- abort();
- }
- break;
- }
- default:
- Y_ASSERT(0);
- break;
- }
- }
- }
- void TUdpHost::IBStep() {
- if (IB.Get()) {
- NHPTimer::STime tChk = CurrentT;
- float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
- if (IB->Step(tChk)) {
- IBIdleTime = -chkDeltaT;
- }
- }
- }
- void TUdpHost::Step() {
- if (IB.Get()) {
- NHPTimer::STime tChk = CurrentT;
- float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
- if (IB->Step(tChk)) {
- IBIdleTime = -chkDeltaT;
- }
- if (chkDeltaT < 0.0005) {
- return;
- }
- }
- if (UseTOSforAcks) {
- s.SetTOS(0x20);
- } else {
- s.SetTOS(0);
- }
- RecvCycle();
- float deltaT = (float)NHPTimer::GetTimePassed(&CurrentT);
- deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
- MaxWaitTime = DEFAULT_MAX_WAIT_TIME;
- IBIdleTime += deltaT;
- bool needCheckAlive = false;
- // update alive ports
- const float INACTIVE_CONGESTION_UPDATE_INTERVAL = 1;
- TimeSinceCongestionHistoryUpdate += deltaT;
- if (TimeSinceCongestionHistoryUpdate > INACTIVE_CONGESTION_UPDATE_INTERVAL) {
- for (TPeerLinkHash::iterator i = CongestionTrackHistory.begin(); i != CongestionTrackHistory.end();) {
- TPeerLink& pl = i->second;
- if (!pl.UpdateSleep(TimeSinceCongestionHistoryUpdate)) {
- TPeerLinkHash::iterator k = i++;
- CongestionTrackHistory.erase(k);
- needCheckAlive = true;
- } else {
- ++i;
- }
- }
- TimeSinceCongestionHistoryUpdate = 0;
- }
- for (TPeerLinkHash::iterator i = CongestionTrack.begin(); i != CongestionTrack.end();) {
- const TUdpAddress& addr = i->first;
- TPeerLink& pl = i->second;
- if (pl.UdpCongestion->GetTransferCount() == 0) {
- pl.StartSleep(addr, &MaxWaitTime);
- CongestionTrackHistory[i->first] = i->second;
- TPeerLinkHash::iterator k = i++;
- CongestionTrack.erase(k);
- } else if (!pl.Update(deltaT, addr, &MaxWaitTime)) {
- TPeerLinkHash::iterator k = i++;
- CongestionTrack.erase(k);
- needCheckAlive = true;
- } else {
- ++i;
- }
- }
- // send acks on received data
- for (TUdpInXferHash::iterator i = RecvQueue.begin(); i != RecvQueue.end();) {
- const TTransferKey& transKey = i->first;
- int transferId = transKey.Id;
- TUdpInTransfer& xfer = i->second;
- xfer.TimeSinceLastRecv += deltaT;
- if (xfer.TimeSinceLastRecv > UDP_MAX_INPUT_DATA_WAIT || (needCheckAlive && !xfer.Congestion->IsAlive())) {
- TUdpInXferHash::iterator k = i++;
- RecvQueue.erase(k);
- continue;
- }
- Y_ASSERT(RecvCompleted.find(i->first) == RecvCompleted.end()); // state "Complete & incomplete" is incorrect
- if (!xfer.NewPacketsToAck.empty()) {
- char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
- Write(&pktData, transferId);
- Write(&pktData, (char)ACK);
- Write(&pktData, xfer.Attempt);
- int acks = WriteAck(&xfer, (int*)pktData, (int)(xfer.PacketSize - (pktData - PktBuf)) / SIZEOF_ACK);
- pktData += acks * SIZEOF_ACK;
- s.SendTo(PktBuf, (int)(pktData - PktBuf), xfer.ToAddress, FF_ALLOW_FRAG);
- }
- ++i;
- }
- if (UseTOSforAcks) {
- s.SetTOS(0x60);
- }
- // send data for outbound connections
- SendData(&SendOrderHighPrior, deltaT, needCheckAlive);
- SendData(&SendOrder, deltaT, needCheckAlive);
- SendData(&SendOrderLow, deltaT, needCheckAlive);
- // roll send order to avoid exotic problems with lots of peers and high traffic
- SendOrderHighPrior.splice(SendOrderHighPrior.end(), SendOrderHighPrior, SendOrderHighPrior.begin());
- //SendOrder.splice(SendOrder.end(), SendOrder, SendOrder.begin()); // sending data in order has lower delay and shorter queue
- // clean completed queue
- TimeSinceCompletedQueueClean += deltaT;
- if (TimeSinceCompletedQueueClean > UDP_TRANSFER_TIMEOUT * 1.5) {
- for (size_t i = 0; i < KeepCompletedQueue.size(); ++i) {
- TUdpCompleteInXferHash::iterator k = RecvCompleted.find(KeepCompletedQueue[i]);
- if (k != RecvCompleted.end())
- RecvCompleted.erase(k);
- }
- KeepCompletedQueue.clear();
- KeepCompletedQueue.swap(RecvCompletedQueue);
- TimeSinceCompletedQueueClean = 0;
- }
- }
- TString TUdpHost::GetPeerLinkDebug(const TPeerLinkHash& ch) {
- TString res;
- char buf[1000];
- for (const auto& i : ch) {
- const TUdpAddress& ip = i.first;
- const TCongestionControl& cc = *i.second.UdpCongestion;
- IIBPeer* ibPeer = i.second.IBPeer.Get();
- snprintf(buf, sizeof(buf), "%s\tIB: %d, RTT: %g Timeout: %g Window: %g MaxWin: %g FailRate: %g TimeSinceLastRecv: %g Transfers: %d MTU: %d\n",
- GetAddressAsString(ip).c_str(),
- ibPeer ? ibPeer->GetState() : -1,
- cc.GetRTT() * 1000, cc.GetTimeout() * 1000, cc.GetWindow(), cc.GetMaxWindow(), cc.GetFailRate(),
- cc.GetTimeSinceLastRecv() * 1000, cc.GetTransferCount(), cc.GetMTU());
- res += buf;
- }
- return res;
- }
- TString TUdpHost::GetDebugInfo() {
- TString res;
- char buf[1000];
- snprintf(buf, sizeof(buf), "Receiving %d msgs, sending %d high prior, %d regular msgs, %d low prior msgs\n",
- RecvQueue.ysize(), (int)SendOrderHighPrior.size(), (int)SendOrder.size(), (int)SendOrderLow.size());
- res += buf;
- TRequesterPendingDataStats pds;
- GetPendingDataSize(&pds);
- snprintf(buf, sizeof(buf), "Pending data size: %" PRIu64 "\n", pds.InpDataSize + pds.OutDataSize);
- res += buf;
- snprintf(buf, sizeof(buf), " in packets: %d, size %" PRIu64 "\n", pds.InpCount, pds.InpDataSize);
- res += buf;
- snprintf(buf, sizeof(buf), " out packets: %d, size %" PRIu64 "\n", pds.OutCount, pds.OutDataSize);
- res += buf;
- res += "\nCongestion info:\n";
- res += GetPeerLinkDebug(CongestionTrack);
- res += "\nCongestion info history:\n";
- res += GetPeerLinkDebug(CongestionTrackHistory);
- return res;
- }
- static void SendKill(const TNetSocket& s, const sockaddr_in6& toAddress) {
- char buf[100];
- char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
- Write(&pktData, (int)0);
- Write(&pktData, (char)KILL);
- Write(&pktData, KILL_PASSPHRASE1);
- Write(&pktData, KILL_PASSPHRASE2);
- s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
- }
- void TUdpHost::Kill(const TUdpAddress& addr) {
- sockaddr_in6 target;
- GetWinsockAddr(&target, addr);
- SendKill(s, target);
- }
- TIntrusivePtr<IPeerQueueStats> TUdpHost::GetQueueStats(const TUdpAddress& addr) {
- TQueueStatsHash::iterator zq = PeerQueueStats.find(addr);
- if (zq != PeerQueueStats.end()) {
- return zq->second.Get();
- }
- TPeerQueueStats* res = new TPeerQueueStats;
- PeerQueueStats[addr] = res;
- // attach to existing congestion tracker
- TPeerLinkHash::iterator z;
- z = CongestionTrack.find(addr);
- if (z != CongestionTrack.end()) {
- z->second.UdpCongestion->AttachQueueStats(res);
- }
- z = CongestionTrackHistory.find(addr);
- if (z != CongestionTrackHistory.end()) {
- z->second.UdpCongestion->AttachQueueStats(res);
- }
- return res;
- }
- //////////////////////////////////////////////////////////////////////////
- TIntrusivePtr<IUdpHost> CreateUdpHost(int port) {
- TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateBestRecvSocket();
- socket->Open(port);
- if (!socket->IsValid())
- return nullptr;
- return CreateUdpHost(socket);
- }
- TIntrusivePtr<IUdpHost> CreateUdpHost(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
- if (!InitLocalIPList()) {
- Y_ASSERT(0 && "Can not determine self IP address");
- return nullptr;
- }
- TIntrusivePtr<TUdpHost> res = new TUdpHost;
- if (!res->Start(socket))
- return nullptr;
- return res.Get();
- }
- void SetUdpMaxBandwidthPerIP(float f) {
- f = Max(0.0f, f);
- TCongestionControl::MaxPacketRate = f / UDP_PACKET_SIZE;
- }
- void SetUdpSlowStart(bool enable) {
- TCongestionControl::StartWindowSize = enable ? 0.5f : 3;
- }
- void DisableIBDetection() {
- IBDetection = false;
- }
- }
|