#include "stdafx.h" #include "udp_client_server.h" #include "net_acks.h" #include #include #include #include #include #include "block_chain.h" #include #include "udp_debug.h" #include "udp_socket.h" #include "ib_cs.h" #include #include #include #include namespace NNetliba { // rely on UDP checksum in packets, check crc only for complete packets // UPDATE: looks like UDP checksum is not enough, network errors do happen, we saw 600+ retransmits of a ~1MB data packet const float UDP_TRANSFER_TIMEOUT = 90.0f; const float DEFAULT_MAX_WAIT_TIME = 1; const float UDP_KEEP_PEER_INFO = 600; // траффик может идти, а новых данных для конкретного пакета может не добавляться. // это возможно когда мы прерываем процесс в момент передачи и перезапускаем его на том же порту, // тогда на приемнике повиснет пакет. Этот пакет мы зашибем по этому таймауту const float UDP_MAX_INPUT_DATA_WAIT = UDP_TRANSFER_TIMEOUT * 2; constexpr int UDP_PACKET_SIZE_FULL = 8900; // used for ping to detect jumbo-frame support constexpr int UDP_PACKET_SIZE = 8800; // max data in packet constexpr int UDP_PACKET_SIZE_SMALL = 1350; // 1180 would be better taking into account that 1280 is guaranteed ipv6 minimum MTU constexpr int UDP_PACKET_BUF_SIZE = UDP_PACKET_SIZE + 100; ////////////////////////////////////////////////////////////////////////// struct TUdpCompleteInTransfer { TGUID PacketGuid; }; ////////////////////////////////////////////////////////////////////////// struct TUdpRecvPacket { int DataStart, DataSize; ui32 BlockSum; // Data[] should be last member in struct, this fact is used to create truncated TUdpRecvPacket in CreateNewSmallPacket() char Data[UDP_PACKET_BUF_SIZE]; }; struct TUdpInTransfer { private: TVector Packets; public: sockaddr_in6 ToAddress; int PacketSize, LastPacketSize; bool HasLastPacket; TVector NewPacketsToAck; TCongestionControlPtr Congestion; float TimeSinceLastRecv; int Attempt; TGUID PacketGuid; int Crc32; TIntrusivePtr SharedData; TRequesterPendingDataStats* Stats; TUdpInTransfer() : PacketSize(0) , LastPacketSize(0) , HasLastPacket(false) , TimeSinceLastRecv(0) , Attempt(0) , Crc32(0) , Stats(nullptr) { Zero(ToAddress); } ~TUdpInTransfer() { if (Stats) { Stats->InpCount -= 1; } EraseAllPackets(); } void EraseAllPackets() { for (int i = 0; i < Packets.ysize(); ++i) { ErasePacket(i); } Packets.clear(); HasLastPacket = false; } void AttachStats(TRequesterPendingDataStats* stats) { Stats = stats; Stats->InpCount += 1; Y_ASSERT(Packets.empty()); } void ErasePacket(int id) { TUdpRecvPacket* pkt = Packets[id]; if (pkt) { if (Stats) { Stats->InpDataSize -= PacketSize; } TRopeDataPacket::FreeBuf((char*)pkt); Packets[id] = nullptr; } } void AssignPacket(int id, TUdpRecvPacket* pkt) { ErasePacket(id); if (pkt && Stats) { Stats->InpDataSize += PacketSize; } Packets[id] = pkt; } int GetPacketCount() const { return Packets.ysize(); } void SetPacketCount(int n) { Packets.resize(n, nullptr); } const TUdpRecvPacket* GetPacket(int id) const { return Packets[id]; } TUdpRecvPacket* ExtractPacket(int id) { TUdpRecvPacket* res = Packets[id]; if (res) { if (Stats) { Stats->InpDataSize -= PacketSize; } Packets[id] = nullptr; } return res; } }; struct TUdpOutTransfer { sockaddr_in6 ToAddress; TAutoPtr Data; int PacketCount; int PacketSize, LastPacketSize; TAckTracker AckTracker; int Attempt; TGUID PacketGuid; int Crc32; EPacketPriority PacketPriority; TRequesterPendingDataStats* Stats; TUdpOutTransfer() : PacketCount(0) , PacketSize(0) , LastPacketSize(0) , Attempt(0) , Crc32(0) , PacketPriority(PP_LOW) , 
    struct TUdpInTransfer {
    private:
        TVector<TUdpRecvPacket*> Packets;

    public:
        sockaddr_in6 ToAddress;
        int PacketSize, LastPacketSize;
        bool HasLastPacket;
        TVector<int> NewPacketsToAck;
        TCongestionControlPtr Congestion;
        float TimeSinceLastRecv;
        int Attempt;
        TGUID PacketGuid;
        int Crc32;
        TIntrusivePtr<TSharedMemory> SharedData;
        TRequesterPendingDataStats* Stats;

        TUdpInTransfer()
            : PacketSize(0)
            , LastPacketSize(0)
            , HasLastPacket(false)
            , TimeSinceLastRecv(0)
            , Attempt(0)
            , Crc32(0)
            , Stats(nullptr)
        {
            Zero(ToAddress);
        }
        ~TUdpInTransfer() {
            if (Stats) {
                Stats->InpCount -= 1;
            }
            EraseAllPackets();
        }
        void EraseAllPackets() {
            for (int i = 0; i < Packets.ysize(); ++i) {
                ErasePacket(i);
            }
            Packets.clear();
            HasLastPacket = false;
        }
        void AttachStats(TRequesterPendingDataStats* stats) {
            Stats = stats;
            Stats->InpCount += 1;
            Y_ASSERT(Packets.empty());
        }
        void ErasePacket(int id) {
            TUdpRecvPacket* pkt = Packets[id];
            if (pkt) {
                if (Stats) {
                    Stats->InpDataSize -= PacketSize;
                }
                TRopeDataPacket::FreeBuf((char*)pkt);
                Packets[id] = nullptr;
            }
        }
        void AssignPacket(int id, TUdpRecvPacket* pkt) {
            ErasePacket(id);
            if (pkt && Stats) {
                Stats->InpDataSize += PacketSize;
            }
            Packets[id] = pkt;
        }
        int GetPacketCount() const {
            return Packets.ysize();
        }
        void SetPacketCount(int n) {
            Packets.resize(n, nullptr);
        }
        const TUdpRecvPacket* GetPacket(int id) const {
            return Packets[id];
        }
        TUdpRecvPacket* ExtractPacket(int id) {
            TUdpRecvPacket* res = Packets[id];
            if (res) {
                if (Stats) {
                    Stats->InpDataSize -= PacketSize;
                }
                Packets[id] = nullptr;
            }
            return res;
        }
    };

    struct TUdpOutTransfer {
        sockaddr_in6 ToAddress;
        TAutoPtr<TRopeDataPacket> Data;
        int PacketCount;
        int PacketSize, LastPacketSize;
        TAckTracker AckTracker;
        int Attempt;
        TGUID PacketGuid;
        int Crc32;
        EPacketPriority PacketPriority;
        TRequesterPendingDataStats* Stats;

        TUdpOutTransfer()
            : PacketCount(0)
            , PacketSize(0)
            , LastPacketSize(0)
            , Attempt(0)
            , Crc32(0)
            , PacketPriority(PP_LOW)
            , Stats(nullptr)
        {
            Zero(ToAddress);
        }
        ~TUdpOutTransfer() {
            if (Stats) {
                Stats->OutCount -= 1;
                Stats->OutDataSize -= Data->GetSize();
            }
        }
        void AttachStats(TRequesterPendingDataStats* stats) {
            Stats = stats;
            Stats->OutCount += 1;
            Stats->OutDataSize += Data->GetSize();
        }
    };

    struct TTransferKey {
        TUdpAddress Address;
        int Id;
    };
    inline bool operator==(const TTransferKey& a, const TTransferKey& b) {
        return a.Address == b.Address && a.Id == b.Id;
    }
    struct TTransferKeyHash {
        int operator()(const TTransferKey& k) const {
            return (ui32)k.Address.Interface + (ui32)k.Address.Port * (ui32)389461 + (ui32)k.Id;
        }
    };
    struct TUdpAddressHash {
        int operator()(const TUdpAddress& addr) const {
            return (ui32)addr.Interface + (ui32)addr.Port * (ui32)389461;
        }
    };

    class TUdpHostRevBufAlloc: public TNonCopyable {
        TUdpRecvPacket* RecvPktBuf;

        void AllocNewBuf() {
            RecvPktBuf = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(sizeof(TUdpRecvPacket));
        }

    public:
        TUdpHostRevBufAlloc() {
            AllocNewBuf();
        }
        ~TUdpHostRevBufAlloc() {
            FreeBuf(RecvPktBuf);
        }
        void FreeBuf(TUdpRecvPacket* pkt) {
            TRopeDataPacket::FreeBuf((char*)pkt);
        }
        TUdpRecvPacket* ExtractPacket() {
            TUdpRecvPacket* res = RecvPktBuf;
            AllocNewBuf();
            return res;
        }
        TUdpRecvPacket* CreateNewSmallPacket(int sz) {
            int pktStructSz = sizeof(TUdpRecvPacket) - Y_ARRAY_SIZE(RecvPktBuf->Data) + sz;
            TUdpRecvPacket* pkt = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(pktStructSz);
            return pkt;
        }
        int GetBufSize() const {
            return Y_ARRAY_SIZE(RecvPktBuf->Data);
        }
        char* GetDataPtr() const {
            return RecvPktBuf->Data;
        }
    };

    static TAtomic transferIdCounter = (long)(GetCycleCount() & 0x1fffffff);
    inline int GetTransferId() {
        int res = AtomicAdd(transferIdCounter, 1);
        while (res < 0) {
            // negative transfer ids are treated as errors, so wrap transfer id
            AtomicCas(&transferIdCounter, 0, transferIdCounter);
            res = AtomicAdd(transferIdCounter, 1);
        }
        return res;
    }

    static bool IBDetection = true;
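    // TUdpHost is the single-threaded engine behind IUdpHost: Step() drains the socket
    // (RecvCycle), updates per-peer congestion state, sends acks for inbound transfers
    // and data for outbound ones (SendData); Wait()/CancelWait() implement the blocking
    // part of the event loop. Transfers are keyed by (peer address, transfer id).
    //
    // A minimal sketch of the intended driving loop, assuming the caller builds
    // TUdpAddress and TRopeDataPacket values via the surrounding library
    // (ProcessIncoming/OnSendFinished are hypothetical caller hooks):
    //
    //     TIntrusivePtr<IUdpHost> host = CreateUdpHost(12345);
    //     TAutoPtr<TRopeDataPacket> data = ...; // payload built by the caller
    //     TGUID guid;
    //     host->Send(peerAddr, data, CalcChecksum(data->GetChain()), &guid, PP_NORMAL);
    //     for (;;) {
    //         host->Step();                             // pump send/recv state machines
    //         while (TRequest* req = host->GetRequest())
    //             ProcessIncoming(req);                 // caller owns *req
    //         TSendResult sr;
    //         while (host->GetSendResult(&sr))
    //             OnSendFinished(sr);
    //         host->Wait(0.1f);                         // sleeps unless CancelWait() fires
    //     }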
    class TUdpHost: public IUdpHost {
        struct TPeerLink {
            TIntrusivePtr<TCongestionControl> UdpCongestion;
            TIntrusivePtr<IIBPeer> IBPeer;
            double TimeNoActiveTransfers;

            TPeerLink()
                : TimeNoActiveTransfers(0)
            {
            }
            bool Update(float deltaT, const TUdpAddress& toAddress, float* maxWaitTime) {
                bool updateOk = UdpCongestion->UpdateAlive(toAddress, deltaT, UDP_TRANSFER_TIMEOUT, maxWaitTime);
                return updateOk;
            }
            void StartSleep(const TUdpAddress& toAddress, float* maxWaitTime) {
                //printf("peer_link start sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
                UdpCongestion->UpdateAlive(toAddress, 0, UDP_TRANSFER_TIMEOUT, maxWaitTime);
                UdpCongestion->MarkAlive();
                TimeNoActiveTransfers = 0;
            }
            bool UpdateSleep(float deltaT) {
                TimeNoActiveTransfers += deltaT;
                if (IBPeer.Get()) {
                    //printf("peer_link update sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
                    if (IBPeer->GetState() == IIBPeer::OK) {
                        return true;
                    }
                    //printf("Drop broken IB connection\n");
                    IBPeer = nullptr;
                }
                return (TimeNoActiveTransfers < UDP_KEEP_PEER_INFO);
            }
        };

        TNetSocket s;
        typedef THashMap<TTransferKey, TUdpInTransfer, TTransferKeyHash> TUdpInXferHash;
        typedef THashMap<TTransferKey, TUdpOutTransfer, TTransferKeyHash> TUdpOutXferHash;
        // congestion control per peer
        typedef THashMap<TUdpAddress, TPeerLink, TUdpAddressHash> TPeerLinkHash;
        typedef THashMap<TTransferKey, TUdpCompleteInTransfer, TTransferKeyHash> TUdpCompleteInXferHash;
        typedef THashMap<TUdpAddress, TIntrusivePtr<TPeerQueueStats>, TUdpAddressHash> TQueueStatsHash;

        TUdpInXferHash RecvQueue;
        TUdpCompleteInXferHash RecvCompleted;
        TUdpOutXferHash SendQueue;
        TPeerLinkHash CongestionTrack, CongestionTrackHistory;
        TList<TRequest*> ReceivedList;
        NHPTimer::STime CurrentT;
        TList<TSendResult> SendResults;
        TList<TTransferKey> SendOrderLow, SendOrder, SendOrderHighPrior;
        TAtomic IsWaiting;
        float MaxWaitTime;
        std::atomic<float> MaxWaitTime2;
        float IBIdleTime;
        TVector<TTransferKey> RecvCompletedQueue, KeepCompletedQueue;
        float TimeSinceCompletedQueueClean, TimeSinceCongestionHistoryUpdate;
        TRequesterPendingDataStats PendingDataStats;
        TQueueStatsHash PeerQueueStats;
        TIntrusivePtr<IIBClientServer> IB;
        typedef THashMap<TIBMsgHandle, TTransferKey> TIBtoTransferKeyHash;
        TIBtoTransferKeyHash IBKeyToTransferKey;

        char PktBuf[UDP_PACKET_BUF_SIZE];
        TUdpHostRevBufAlloc RecvBuf;

        TPeerLink& GetPeerLink(const TUdpAddress& ip) {
            TPeerLinkHash::iterator z = CongestionTrack.find(ip);
            if (z == CongestionTrack.end()) {
                z = CongestionTrackHistory.find(ip);
                if (z == CongestionTrackHistory.end()) {
                    TPeerLink& res = CongestionTrack[ip];
                    Y_ASSERT(res.UdpCongestion.Get() == nullptr);
                    res.UdpCongestion = new TCongestionControl;
                    TQueueStatsHash::iterator zq = PeerQueueStats.find(ip);
                    if (zq != PeerQueueStats.end()) {
                        res.UdpCongestion->AttachQueueStats(zq->second);
                    }
                    return res;
                } else {
                    TPeerLink& res = CongestionTrack[z->first];
                    res = z->second;
                    CongestionTrackHistory.erase(z);
                    return res;
                }
            } else {
                Y_ASSERT(CongestionTrackHistory.find(ip) == CongestionTrackHistory.end());
                return z->second;
            }
        }
        void SucceededSend(int id) {
            SendResults.push_back(TSendResult(id, true));
        }
        void FailedSend(int id) {
            SendResults.push_back(TSendResult(id, false));
        }
        void SendData(TList<TTransferKey>* order, float deltaT, bool needCheckAlive);
        void RecvCycle();

    public:
        TUdpHost()
            : CurrentT(0)
            , IsWaiting(0)
            , MaxWaitTime(DEFAULT_MAX_WAIT_TIME)
            , MaxWaitTime2(DEFAULT_MAX_WAIT_TIME)
            , IBIdleTime(0)
            , TimeSinceCompletedQueueClean(0)
            , TimeSinceCongestionHistoryUpdate(0)
        {
        }
        ~TUdpHost() override {
            for (TList<TRequest*>::const_iterator i = ReceivedList.begin(); i != ReceivedList.end(); ++i)
                delete *i;
        }

        bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket);

        TRequest* GetRequest() override {
            if (ReceivedList.empty()) {
                if (IB.Get()) {
                    return IB->GetRequest();
                }
                return nullptr;
            }
            TRequest* res = ReceivedList.front();
            ReceivedList.pop_front();
            return res;
        }

        void AddToSendOrder(const TTransferKey& transferKey, EPacketPriority pp) {
            if (pp == PP_LOW)
                SendOrderLow.push_back(transferKey);
            else if (pp == PP_NORMAL)
                SendOrder.push_back(transferKey);
            else if (pp == PP_HIGH)
                SendOrderHighPrior.push_back(transferKey);
            else
                Y_ASSERT(0);
            CancelWait();
        }

        int Send(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data, int crc32, TGUID* packetGuid, EPacketPriority pp) override {
            if (addr.Port == 0) {
                // shortcut for broken addresses
                if (packetGuid && packetGuid->IsEmpty())
                    CreateGuid(packetGuid);
                int reqId = GetTransferId();
                FailedSend(reqId);
                return reqId;
            }

            TTransferKey transferKey;
            transferKey.Address = addr;
            transferKey.Id = GetTransferId();
            Y_ASSERT(SendQueue.find(transferKey) == SendQueue.end());

            TPeerLink& peerInfo = GetPeerLink(transferKey.Address);

            TUdpOutTransfer& xfer = SendQueue[transferKey];
            GetWinsockAddr(&xfer.ToAddress, transferKey.Address);
            xfer.Crc32 = crc32;
            xfer.PacketPriority = pp;
            if (!packetGuid || packetGuid->IsEmpty()) {
                CreateGuid(&xfer.PacketGuid);
                if (packetGuid)
                    *packetGuid = xfer.PacketGuid;
            } else {
                xfer.PacketGuid = *packetGuid;
            }
            xfer.Data.Reset(data.Release());
            xfer.AttachStats(&PendingDataStats);
            xfer.AckTracker.AttachCongestionControl(peerInfo.UdpCongestion.Get());

            bool isSentOverIB = false;
            // we don't support priorities (=service levels in IB terms) currently,
            // so send only PP_NORMAL traffic over IB
            if (pp == PP_NORMAL && peerInfo.IBPeer.Get() && xfer.Data->GetSharedData() == nullptr) {
                TIBMsgHandle hndl = IB->Send(peerInfo.IBPeer, xfer.Data.Get(), xfer.PacketGuid);
                if (hndl >= 0) {
                    IBKeyToTransferKey[hndl] = transferKey;
                    isSentOverIB = true;
                } else {
                    // so we failed to use IB, ibPeer is either not connected yet or failed
                    if (peerInfo.IBPeer->GetState() == IIBPeer::FAILED) {
                        //printf("Disconnect failed IB peer\n");
                        peerInfo.IBPeer = nullptr;
                    }
                }
            }
            if (!isSentOverIB) {
                AddToSendOrder(transferKey, pp);
            }

            return transferKey.Id;
        }
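        // The IB (InfiniBand) path is opportunistic: Send() tries it only for PP_NORMAL
        // transfers without shared memory attached, and a transfer handed to IB is not
        // added to the UDP send order at all. If IB later reports failure for that
        // handle, GetSendResult() below re-queues the transfer onto the regular UDP path.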
        bool GetSendResult(TSendResult* res) override {
            if (SendResults.empty()) {
                if (IB.Get()) {
                    TIBSendResult sr;
                    if (IB->GetSendResult(&sr)) {
                        TIBtoTransferKeyHash::iterator z = IBKeyToTransferKey.find(sr.Handle);
                        if (z == IBKeyToTransferKey.end()) {
                            Y_ABORT_UNLESS(0, "unknown handle returned from IB");
                        }
                        TTransferKey transferKey = z->second;
                        IBKeyToTransferKey.erase(z);

                        TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
                        if (i == SendQueue.end()) {
                            Y_ABORT_UNLESS(0, "IBKeyToTransferKey refers nonexisting xfer");
                        }
                        if (sr.Success) {
                            TUdpOutTransfer& xfer = i->second;
                            xfer.AckTracker.MarkAlive(); // do we really need this?
                            *res = TSendResult(transferKey.Id, sr.Success);
                            SendQueue.erase(i);
                            return true;
                        } else {
                            //printf("IB send failed, fall back to regular network\n");
                            // Houston, we got a problem
                            // IB failed to send, try to use regular network
                            TUdpOutTransfer& xfer = i->second;
                            AddToSendOrder(transferKey, xfer.PacketPriority);
                        }
                    }
                }
                return false;
            }
            *res = SendResults.front();
            SendResults.pop_front();
            return true;
        }

        void Step() override;
        void IBStep() override;

        void Wait(float seconds) override {
            if (seconds < 1e-3)
                seconds = 0;
            if (seconds > MaxWaitTime)
                seconds = MaxWaitTime;
            if (IBIdleTime < 0.010) {
                seconds = 0;
            }
            if (seconds == 0) {
                ThreadYield();
            } else {
                AtomicAdd(IsWaiting, 1);
                if (seconds > MaxWaitTime2)
                    seconds = MaxWaitTime2;
                MaxWaitTime2 = DEFAULT_MAX_WAIT_TIME;

                if (seconds == 0) {
                    ThreadYield();
                } else {
                    if (IB.Get()) {
                        for (float done = 0; done < seconds;) {
                            float deltaSleep = Min(seconds - done, 0.002f);
                            s.Wait(deltaSleep);
                            NHPTimer::STime tChk;
                            NHPTimer::GetTime(&tChk);
                            if (IB->Step(tChk)) {
                                IBIdleTime = 0;
                                break;
                            }
                            done += deltaSleep;
                        }
                    } else {
                        s.Wait(seconds);
                    }
                }
                AtomicAdd(IsWaiting, -1);
            }
        }

        void CancelWait() override {
            MaxWaitTime2 = 0;
            if (AtomicAdd(IsWaiting, 0) == 1) {
                s.SendSelfFakePacket();
            }
        }

        void GetPendingDataSize(TRequesterPendingDataStats* res) override {
            *res = PendingDataStats;
#ifndef NDEBUG
            TRequesterPendingDataStats chk;
            for (TUdpOutXferHash::const_iterator i = SendQueue.begin(); i != SendQueue.end(); ++i) {
                TRopeDataPacket* pckt = i->second.Data.Get();
                if (pckt) {
                    chk.OutDataSize += pckt->GetSize();
                    ++chk.OutCount;
                }
            }
            for (TUdpInXferHash::const_iterator i = RecvQueue.begin(); i != RecvQueue.end(); ++i) {
                const TUdpInTransfer& tr = i->second;
                for (int p = 0; p < tr.GetPacketCount(); ++p) {
                    if (tr.GetPacket(p)) {
                        chk.InpDataSize += tr.PacketSize;
                    }
                }
                ++chk.InpCount;
            }
            Y_ASSERT(memcmp(&chk, res, sizeof(chk)) == 0);
#endif
        }

        TString GetDebugInfo() override;
        TString GetPeerLinkDebug(const TPeerLinkHash& ch);
        void Kill(const TUdpAddress& addr) override;
        TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override;
    };

    bool TUdpHost::Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
        if (s.IsValid()) {
            Y_ASSERT(0);
            return false;
        }
        s.Open(socket);
        if (!s.IsValid())
            return false;

        if (IBDetection)
            IB = CreateIBClientServer();

        NHPTimer::GetTime(&CurrentT);
        return true;
    }

    static bool HasAllPackets(const TUdpInTransfer& res) {
        if (!res.HasLastPacket)
            return false;
        for (int i = res.GetPacketCount() - 1; i >= 0; --i) {
            if (!res.GetPacket(i))
                return false;
        }
        return true;
    }

    // grouped acks, first int - packet_id, second int - bit mask for 32 packets preceding packet_id
    const int SIZEOF_ACK = 8;
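    // Each ack is an (int packet_id, int bitmask) pair: bit k of the mask is set iff
    // packet packet_id-k-1 has been received, so a single 8-byte entry acknowledges
    // up to 33 packets. WriteAck() walks the sorted NewPacketsToAck list and emits an
    // entry for the last id and whenever ids are more than 30 apart; ReadAcks() on the
    // sending side decodes the same layout, updating RTT only from the final entry.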
    static int WriteAck(TUdpInTransfer* p, int* dst, int maxAcks) {
        int ackCount = 0;
        if (p->NewPacketsToAck.size() > 1)
            Sort(p->NewPacketsToAck.begin(), p->NewPacketsToAck.end());
        int lastAcked = 0;
        for (size_t idx = 0; idx < p->NewPacketsToAck.size(); ++idx) {
            int pkt = p->NewPacketsToAck[idx];
            if (idx == p->NewPacketsToAck.size() - 1 || pkt > lastAcked + 30) {
                *dst++ = pkt;
                int bitMask = 0;
                int backPackets = Min(pkt, 32);
                for (int k = 0; k < backPackets; ++k) {
                    if (p->GetPacket(pkt - k - 1))
                        bitMask |= 1 << k;
                }
                *dst++ = bitMask;
                if (++ackCount >= maxAcks)
                    break;
                lastAcked = pkt;
                //printf("sending ack %d (mask %x)\n", pkt, bitMask);
            }
        }
        p->NewPacketsToAck.clear();
        return ackCount;
    }

    static void AckPacket(TUdpOutTransfer* p, int pkt, float deltaT, bool updateRTT) {
        if (pkt < 0 || pkt >= p->PacketCount) {
            Y_ASSERT(0);
            return;
        }
        p->AckTracker.Ack(pkt, deltaT, updateRTT);
    }

    static void ReadAcks(TUdpOutTransfer* p, const int* acks, int ackCount, float deltaT) {
        for (int i = 0; i < ackCount; ++i) {
            int pkt = *acks++;
            int bitMask = *acks++;
            bool updateRTT = i == ackCount - 1; // update RTT using only last packet in the pack
            AckPacket(p, pkt, deltaT, updateRTT);
            for (int k = 0; k < 32; ++k) {
                if (bitMask & (1 << k))
                    AckPacket(p, pkt - k - 1, deltaT, false);
            }
        }
    }

    using namespace NNetlibaSocket::NNetliba;

    const ui64 KILL_PASSPHRASE1 = 0x98ff9cefb11d9a4cul;
    const ui64 KILL_PASSPHRASE2 = 0xf7754c29e0be95eaul;

    template <class T>
    inline T Read(char** data) {
        T res = ReadUnaligned<T>(*data);
        *data += sizeof(T);
        return res;
    }
    template <class T>
    inline void Write(char** data, T res) {
        WriteUnaligned<T>(*data, res);
        *data += sizeof(T);
    }

    static void RequireResend(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
        char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
        Write(&pktData, transferId);
        Write(&pktData, (char)ACK_RESEND);
        Write(&pktData, attempt);
        s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
    }
    static void RequireResendNoShmem(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
        char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
        Write(&pktData, transferId);
        Write(&pktData, (char)ACK_RESEND_NOSHMEM);
        Write(&pktData, attempt);
        s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
    }
    static void AckComplete(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, const TGUID& packetGuid, int packetId) {
        char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
        Write(&pktData, transferId);
        Write(&pktData, (char)ACK_COMPLETE);
        Write(&pktData, packetGuid);
        Write(&pktData, packetId); // we need packetId to update RTT
        s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
    }

    static void SendPing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
        char pktBuf[UDP_PACKET_SIZE_FULL];
        char* pktData = pktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
        if (NSan::MSanIsOn()) {
            Zero(pktBuf);
        }
        Write(&pktData, (int)0);
        Write(&pktData, (char)PING);
        Write(&pktData, selfNetworkOrderPort);
        s.SendTo(pktBuf, UDP_PACKET_SIZE_FULL, toAddress, FF_DONT_FRAG);
    }

    // not MTU discovery, just figure out IB address of the peer
    static void SendFakePing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
        char buf[100];
        char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
        Write(&pktData, (int)0);
        Write(&pktData, (char)PING);
        Write(&pktData, selfNetworkOrderPort);
        s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
    }
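    // Data packet layout written by SendData() / parsed by RecvCycle()
    // (after the UDP_LOW_LEVEL_HEADER_SIZE prefix):
    //   int   transferId
    //   char  pktType   (DATA / DATA_SMALL / DATA_SHMEM / DATA_SMALL_SHMEM)
    //   int   attempt
    //   int   packetId
    //   [packetId == 0 only] TGUID packetGuid, int crc32,
    //                        [shmem variants only] TGUID shmemId, int shmemSize
    //   payload (PacketSize bytes; the last packet of a transfer carries the remainder)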
    void TUdpHost::SendData(TList<TTransferKey>* order, float deltaT1, bool needCheckAlive) {
        for (TList<TTransferKey>::iterator z = order->begin(); z != order->end();) {
            // pick connection to send
            const TTransferKey& transferKey = *z;
            TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
            if (i == SendQueue.end()) {
                z = order->erase(z);
                continue;
            }
            ++z;

            // perform sending
            int transferId = transferKey.Id;
            TUdpOutTransfer& xfer = i->second;
            if (!xfer.AckTracker.IsInitialized()) {
                TIntrusivePtr<TCongestionControl> congestion = xfer.AckTracker.GetCongestionControl();
                Y_ASSERT(congestion.Get() != nullptr);
                if (!congestion->IsKnownMTU()) {
                    TLameMTUDiscovery* md = congestion->GetMTUDiscovery();
                    if (md->IsTimedOut()) {
                        congestion->SetMTU(UDP_PACKET_SIZE_SMALL);
                    } else {
                        if (md->CanSend()) {
                            SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
                            md->PingSent();
                        }
                        continue;
                    }
                }
                // try to use large mtu, we could have selected small mtu due to connectivity problems
                if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL || IB.Get() != nullptr) {
                    // recheck every ~50mb
                    int chkDenom = (50000000 / xfer.Data->GetSize()) | 1;
                    if ((NetAckRnd() % chkDenom) == 0) {
                        //printf("send rechecking ping\n");
                        if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL) {
                            SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
                        } else {
                            SendFakePing(s, xfer.ToAddress, s.GetNetworkOrderPort());
                        }
                    }
                }
                xfer.PacketSize = congestion->GetMTU();
                xfer.LastPacketSize = xfer.Data->GetSize() % xfer.PacketSize;
                xfer.PacketCount = xfer.Data->GetSize() / xfer.PacketSize + 1;
                xfer.AckTracker.SetPacketCount(xfer.PacketCount);
            }
            xfer.AckTracker.Step(deltaT1);
            MaxWaitTime = Min(MaxWaitTime, xfer.AckTracker.GetTimeToNextPacketTimeout());
            if (needCheckAlive && !xfer.AckTracker.IsAlive()) {
                FailedSend(transferId);
                SendQueue.erase(i);
                continue;
            }
            bool sendBufferOverflow = false;
            while (xfer.AckTracker.CanSend()) {
                NHPTimer::STime tCopy = CurrentT;
                float deltaT2 = (float)NHPTimer::GetTimePassed(&tCopy);
                deltaT2 = ClampVal(deltaT2, 0.0f, UDP_TRANSFER_TIMEOUT / 3);

                int pkt = xfer.AckTracker.GetPacketToSend(deltaT2);
                if (pkt == -1) {
                    break;
                }
                int dataSize = xfer.PacketSize;
                if (pkt == xfer.PacketCount - 1)
                    dataSize = xfer.LastPacketSize;

                char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
                Write(&pktData, transferId);
                char pktType = xfer.PacketSize == UDP_PACKET_SIZE ? DATA : DATA_SMALL;
                TSharedMemory* shm = xfer.Data->GetSharedData();
                if (shm) {
                    if (pktType == DATA)
                        pktType = DATA_SHMEM;
                    else
                        pktType = DATA_SMALL_SHMEM;
                }
                Write(&pktData, pktType);
                Write(&pktData, xfer.Attempt);
                Write(&pktData, pkt);
                if (pkt == 0) {
                    Write(&pktData, xfer.PacketGuid);
                    Write(&pktData, xfer.Crc32);
                    if (shm) {
                        Write(&pktData, shm->GetId());
                        Write(&pktData, shm->GetSize());
                    }
                }
                TBlockChainIterator dataReader(xfer.Data->GetChain());
                dataReader.Seek(pkt * xfer.PacketSize);
                dataReader.Read(pktData, dataSize);
                pktData += dataSize;

                int sendSize = (int)(pktData - PktBuf);
                TNetSocket::ESendError sendErr = s.SendTo(PktBuf, sendSize, xfer.ToAddress, FF_ALLOW_FRAG);
                if (sendErr != TNetSocket::SEND_OK) {
                    if (sendErr == TNetSocket::SEND_NO_ROUTE_TO_HOST) {
                        FailedSend(transferId);
                        SendQueue.erase(i);
                        break;
                    } else {
                        // most probably out of send buffer space (or something terrible has happened)
                        xfer.AckTracker.AddToResend(pkt);
                        sendBufferOverflow = true;
                        MaxWaitTime = 0;
                        //printf("failed send\n");
                        break;
                    }
                }
            }
            if (sendBufferOverflow)
                break;
        }
    }
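    // RecvCycle() drains the socket without blocking. Each datagram starts with a
    // transfer id and a one-byte command: DATA* packets feed inbound transfers,
    // ACK* packets drive the ack tracker of outbound ones, and PING/PONG/KILL are
    // connection-level messages handled in place.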
    void TUdpHost::RecvCycle() {
        for (;;) {
            sockaddr_in6 fromAddress;
            int rv = RecvBuf.GetBufSize();
            bool recvOk = s.RecvFrom(RecvBuf.GetDataPtr(), &rv, &fromAddress);
            if (!recvOk)
                break;

            NHPTimer::STime tCopy = CurrentT;
            float deltaT = (float)NHPTimer::GetTimePassed(&tCopy);
            deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);

            //int fromIP = fromAddress.sin_addr.s_addr;

            TTransferKey k;
            char* pktData = RecvBuf.GetDataPtr() + UDP_LOW_LEVEL_HEADER_SIZE;
            GetUdpAddress(&k.Address, fromAddress);
            k.Id = Read<int>(&pktData);
            int transferId = k.Id;
            int cmd = Read<char>(&pktData);
            Y_ASSERT(cmd == (int)*(RecvBuf.GetDataPtr() + CMD_POS));
            switch (cmd) {
                case DATA:
                case DATA_SMALL:
                case DATA_SHMEM:
                case DATA_SMALL_SHMEM: {
                    int attempt = Read<int>(&pktData);
                    int packetId = Read<int>(&pktData);
                    //printf("data packet %d (trans ID = %d)\n", packetId, transferId);

                    TUdpCompleteInXferHash::iterator itCompl = RecvCompleted.find(k);
                    if (itCompl != RecvCompleted.end()) {
                        Y_ASSERT(RecvQueue.find(k) == RecvQueue.end());
                        const TUdpCompleteInTransfer& complete = itCompl->second;
                        bool sendAckComplete = true;
                        if (packetId == 0) {
                            // check packet GUID
                            char* tmpPktData = pktData;
                            TGUID packetGuid;
                            packetGuid = Read<TGUID>(&tmpPktData);
                            if (packetGuid != complete.PacketGuid) {
                                // we are receiving new data with the same transferId;
                                // in this case we have to flush all the information about
                                // the previous transfer and start over
                                //printf("same transferId for a different packet\n");
                                RecvCompleted.erase(itCompl);
                                sendAckComplete = false;
                            }
                        }
                        if (sendAckComplete) {
                            AckComplete(s, fromAddress, transferId, complete.PacketGuid, packetId);
                            break;
                        }
                    }
                    TUdpInXferHash::iterator rq = RecvQueue.find(k);
                    if (rq == RecvQueue.end()) {
                        //printf("new input transfer\n");
                        TUdpInTransfer& res = RecvQueue[k];
                        res.ToAddress = fromAddress;
                        res.Attempt = attempt;
                        res.Congestion = GetPeerLink(k.Address).UdpCongestion.Get();
                        res.PacketSize = 0;
                        res.HasLastPacket = false;
                        res.AttachStats(&PendingDataStats);
                        rq = RecvQueue.find(k);
                        Y_ASSERT(rq != RecvQueue.end());
                    }
                    TUdpInTransfer& res = rq->second;
                    res.Congestion->MarkAlive();
                    res.TimeSinceLastRecv = 0;
                    if (packetId == 0) {
                        TGUID packetGuid;
                        packetGuid = Read<TGUID>(&pktData);
                        int crc32 = Read<int>(&pktData);
                        res.Crc32 = crc32;
                        res.PacketGuid = packetGuid;
                        if (cmd == DATA_SHMEM || cmd == DATA_SMALL_SHMEM) {
                            // link to attached shared memory
                            TGUID shmemId = Read<TGUID>(&pktData);
                            int shmemSize = Read<int>(&pktData);
                            if (res.SharedData.Get() == nullptr) {
                                res.SharedData = new TSharedMemory;
                                if (!res.SharedData->Open(shmemId, shmemSize)) {
                                    res.SharedData = nullptr;
                                    RequireResendNoShmem(s, res.ToAddress, transferId, res.Attempt);
                                    break;
                                }
                            }
                        }
                    }
                    if (attempt != res.Attempt) {
                        RequireResend(s, res.ToAddress, transferId, res.Attempt);
                        break;
                    } else {
                        if (res.PacketSize == 0) {
                            res.PacketSize = (cmd == DATA || cmd == DATA_SHMEM ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL);
                        } else {
                            // check that all data is of same size
                            Y_ASSERT(cmd == DATA || cmd == DATA_SMALL);
                            Y_ASSERT(res.PacketSize == (cmd == DATA ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL));
                        }
                        int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
                        Y_ASSERT(dataSize <= res.PacketSize);
                        if (dataSize > res.PacketSize)
                            break; // mem overrun protection
                        if (packetId >= res.GetPacketCount())
                            res.SetPacketCount(packetId + 1);
                        {
                            TUdpRecvPacket* pkt = nullptr;
                            if (res.PacketSize == UDP_PACKET_SIZE_SMALL) {
                                // save memory by using a smaller buffer at the cost of an additional memcpy
                                pkt = RecvBuf.CreateNewSmallPacket(dataSize);
                                memcpy(pkt->Data, pktData, dataSize);
                                pkt->DataStart = 0;
                                pkt->DataSize = dataSize;
                            } else {
                                int dataStart = (int)(pktData - RecvBuf.GetDataPtr()); // data offset in the packet
                                pkt = RecvBuf.ExtractPacket();
                                pkt->DataStart = dataStart;
                                pkt->DataSize = dataSize;
                            }
                            // calc packet sum, will be used to calc whole message crc
                            pkt->BlockSum = TIncrementalChecksumCalcer::CalcBlockSum(pkt->Data + pkt->DataStart, pkt->DataSize);
                            res.AssignPacket(packetId, pkt);
                        }
                        if (dataSize != res.PacketSize) {
                            res.LastPacketSize = dataSize;
                            res.HasLastPacket = true;
                        }

                        if (HasAllPackets(res)) {
                            //printf("received\n");
                            TRequest* out = new TRequest;
                            out->Address = k.Address;
                            out->Guid = res.PacketGuid;
                            TIncrementalChecksumCalcer incCS;
                            int packetCount = res.GetPacketCount();
                            out->Data.Reset(new TRopeDataPacket);
                            for (int i = 0; i < packetCount; ++i) {
                                TUdpRecvPacket* pkt = res.ExtractPacket(i);
                                Y_ASSERT(pkt->DataSize == ((i == packetCount - 1) ? res.LastPacketSize : res.PacketSize));
                                out->Data->AddBlock((char*)pkt, pkt->Data + pkt->DataStart, pkt->DataSize);
                                incCS.AddBlockSum(pkt->BlockSum, pkt->DataSize);
                            }
                            out->Data->AttachSharedData(res.SharedData);
                            res.EraseAllPackets();
                            int crc32 = incCS.CalcChecksum(); // CalcChecksum(out->Data->GetChain());
#ifdef SIMULATE_NETWORK_FAILURES
                            bool crcOk = crc32 == res.Crc32 ? (RandomNumber<ui32>() % 10) != 0 : false;
#else
                            bool crcOk = crc32 == res.Crc32;
#endif
                            if (crcOk) {
                                ReceivedList.push_back(out);
                                Y_ASSERT(RecvCompleted.find(k) == RecvCompleted.end());
                                TUdpCompleteInTransfer& complete = RecvCompleted[k];
                                RecvCompletedQueue.push_back(k);
                                complete.PacketGuid = res.PacketGuid;
                                AckComplete(s, res.ToAddress, transferId, complete.PacketGuid, packetId);
                                RecvQueue.erase(rq);
                            } else {
                                //printf("crc failed, require resend\n");
                                delete out;
                                ++res.Attempt;
                                res.NewPacketsToAck.clear();
                                RequireResend(s, res.ToAddress, transferId, res.Attempt);
                            }
                        } else {
                            res.NewPacketsToAck.push_back(packetId);
                        }
                    }
                } break;
                case ACK: {
                    TUdpOutXferHash::iterator i = SendQueue.find(k);
                    if (i == SendQueue.end())
                        break;
                    TUdpOutTransfer& xfer = i->second;
                    if (!xfer.AckTracker.IsInitialized())
                        break;
                    xfer.AckTracker.MarkAlive();
                    int attempt = Read<int>(&pktData);
                    Y_ASSERT(attempt <= xfer.Attempt);
                    if (attempt != xfer.Attempt)
                        break;
                    ReadAcks(&xfer, (int*)pktData, (int)(RecvBuf.GetDataPtr() + rv - pktData) / SIZEOF_ACK, deltaT);
                    break;
                }
                case ACK_COMPLETE: {
                    TUdpOutXferHash::iterator i = SendQueue.find(k);
                    if (i == SendQueue.end())
                        break;
                    TUdpOutTransfer& xfer = i->second;
                    xfer.AckTracker.MarkAlive();
                    TGUID packetGuid;
                    packetGuid = Read<TGUID>(&pktData);
                    int packetId = Read<int>(&pktData);
                    if (packetGuid == xfer.PacketGuid) {
                        xfer.AckTracker.Ack(packetId, deltaT, true); // update RTT
                        // acking all packets is required, otherwise they will be treated
                        // as lost (see the TAckTracker destructor)
                        xfer.AckTracker.AckAll();
                        SucceededSend(transferId);
                        SendQueue.erase(i);
                    } else {
                        // peer asserts that it has received this packet, but packetGuid is
                        // wrong; try to resend everything
                        // ++xfer.Attempt; // should not do this, only the sender may modify the
                        // attempt number, otherwise a cycle is possible with out-of-order packets
                        xfer.AckTracker.Resend();
                    }
                    break;
                }
                case ACK_RESEND: {
                    TUdpOutXferHash::iterator i = SendQueue.find(k);
                    if (i == SendQueue.end())
                        break;
                    TUdpOutTransfer& xfer = i->second;
                    xfer.AckTracker.MarkAlive();
                    int attempt = Read<int>(&pktData);
                    if (xfer.Attempt != attempt) {
                        // reset current transfer & initialize new one
                        xfer.Attempt = attempt;
                        xfer.AckTracker.Resend();
                    }
                    break;
                }
                case ACK_RESEND_NOSHMEM: {
                    // abort execution here:
                    // failed to open shmem on recv side, need to transmit data without using shmem
                    Y_ABORT_UNLESS(0, "not implemented yet");
                    break;
                }
                case PING: {
                    sockaddr_in6 trueFromAddress = fromAddress;
                    int port = Read<int>(&pktData);
                    Y_ASSERT(trueFromAddress.sin6_family == AF_INET6);
                    trueFromAddress.sin6_port = port;
                    // can not set MTU for fromAddress here since asymmetrical mtu is possible
                    char* pktData2 = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
                    Write(&pktData2, (int)0);
                    Write(&pktData2, (char)PONG);
                    if (IB.Get()) {
                        const TIBConnectInfo& ibConnectInfo = IB->GetConnectInfo();
                        Write(&pktData2, ibConnectInfo);
                        Write(&pktData2, trueFromAddress);
                    }
                    s.SendTo(PktBuf, pktData2 - PktBuf, trueFromAddress, FF_ALLOW_FRAG);
                    break;
                }
                case PONG: {
                    TPeerLink& peerInfo = GetPeerLink(k.Address);
                    peerInfo.UdpCongestion->SetMTU(UDP_PACKET_SIZE);
                    int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
                    if (dataSize == sizeof(TIBConnectInfo) + sizeof(sockaddr_in6)) {
                        if (IB.Get() != nullptr && peerInfo.IBPeer.Get() == nullptr) {
                            TIBConnectInfo info = Read<TIBConnectInfo>(&pktData);
                            sockaddr_in6 myAddress = Read<sockaddr_in6>(&pktData);
                            TUdpAddress myUdpAddress;
                            GetUdpAddress(&myUdpAddress, myAddress);
                            peerInfo.IBPeer = IB->ConnectPeer(info, k.Address, myUdpAddress);
                        }
                    }
                    break;
                }
                case KILL: {
                    ui64 p1 = Read<ui64>(&pktData);
                    ui64 p2 = Read<ui64>(&pktData);
                    int restSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
                    if (restSize == 0 && p1 == KILL_PASSPHRASE1 && p2 == KILL_PASSPHRASE2) {
                        abort();
                    }
                    break;
                }
                default:
                    Y_ASSERT(0);
                    break;
            }
        }
    }

    void TUdpHost::IBStep() {
        if (IB.Get()) {
            NHPTimer::STime tChk = CurrentT;
            float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
            if (IB->Step(tChk)) {
                IBIdleTime = -chkDeltaT;
            }
        }
    }

    void TUdpHost::Step() {
        if (IB.Get()) {
            NHPTimer::STime tChk = CurrentT;
            float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
            if (IB->Step(tChk)) {
                IBIdleTime = -chkDeltaT;
            }
            if (chkDeltaT < 0.0005) {
                return;
            }
        }

        if (UseTOSforAcks) {
            s.SetTOS(0x20);
        } else {
            s.SetTOS(0);
        }

        RecvCycle();

        float deltaT = (float)NHPTimer::GetTimePassed(&CurrentT);
        deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);

        MaxWaitTime = DEFAULT_MAX_WAIT_TIME;
        IBIdleTime += deltaT;

        bool needCheckAlive = false;

        // update alive ports
        const float INACTIVE_CONGESTION_UPDATE_INTERVAL = 1;
        TimeSinceCongestionHistoryUpdate += deltaT;
        if (TimeSinceCongestionHistoryUpdate > INACTIVE_CONGESTION_UPDATE_INTERVAL) {
            for (TPeerLinkHash::iterator i = CongestionTrackHistory.begin(); i != CongestionTrackHistory.end();) {
                TPeerLink& pl = i->second;
                if (!pl.UpdateSleep(TimeSinceCongestionHistoryUpdate)) {
                    TPeerLinkHash::iterator k = i++;
                    CongestionTrackHistory.erase(k);
                    needCheckAlive = true;
                } else {
                    ++i;
                }
            }
            TimeSinceCongestionHistoryUpdate = 0;
        }
        for (TPeerLinkHash::iterator i = CongestionTrack.begin(); i != CongestionTrack.end();) {
            const TUdpAddress& addr = i->first;
            TPeerLink& pl = i->second;
            if (pl.UdpCongestion->GetTransferCount() == 0) {
                pl.StartSleep(addr, &MaxWaitTime);
                CongestionTrackHistory[i->first] = i->second;
                TPeerLinkHash::iterator k = i++;
                CongestionTrack.erase(k);
            } else if (!pl.Update(deltaT, addr, &MaxWaitTime)) {
                TPeerLinkHash::iterator k = i++;
                CongestionTrack.erase(k);
                needCheckAlive = true;
            } else {
                ++i;
            }
        }

        // send acks on received data
        for (TUdpInXferHash::iterator i = RecvQueue.begin(); i != RecvQueue.end();) {
            const TTransferKey& transKey = i->first;
            int transferId = transKey.Id;
            TUdpInTransfer& xfer = i->second;
            xfer.TimeSinceLastRecv += deltaT;
            if (xfer.TimeSinceLastRecv > UDP_MAX_INPUT_DATA_WAIT || (needCheckAlive && !xfer.Congestion->IsAlive())) {
                TUdpInXferHash::iterator k = i++;
                RecvQueue.erase(k);
                continue;
            }
            Y_ASSERT(RecvCompleted.find(i->first) == RecvCompleted.end()); // state "Complete & incomplete" is incorrect
            if (!xfer.NewPacketsToAck.empty()) {
                char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
                Write(&pktData, transferId);
                Write(&pktData, (char)ACK);
                Write(&pktData, xfer.Attempt);
                int acks = WriteAck(&xfer, (int*)pktData, (int)(xfer.PacketSize - (pktData - PktBuf)) / SIZEOF_ACK);
                pktData += acks * SIZEOF_ACK;
                s.SendTo(PktBuf, (int)(pktData - PktBuf), xfer.ToAddress, FF_ALLOW_FRAG);
            }
            ++i;
        }

        if (UseTOSforAcks) {
            s.SetTOS(0x60);
        }

        // send data for outbound connections
        SendData(&SendOrderHighPrior, deltaT, needCheckAlive);
        SendData(&SendOrder, deltaT, needCheckAlive);
        SendData(&SendOrderLow, deltaT, needCheckAlive);

        // roll send order to avoid exotic problems with lots of peers and high traffic
        SendOrderHighPrior.splice(SendOrderHighPrior.end(), SendOrderHighPrior, SendOrderHighPrior.begin());
        //SendOrder.splice(SendOrder.end(), SendOrder, SendOrder.begin()); // sending data in order has lower delay and shorter queue
        // clean completed queue
        TimeSinceCompletedQueueClean += deltaT;
        if (TimeSinceCompletedQueueClean > UDP_TRANSFER_TIMEOUT * 1.5) {
            for (size_t i = 0; i < KeepCompletedQueue.size(); ++i) {
                TUdpCompleteInXferHash::iterator k = RecvCompleted.find(KeepCompletedQueue[i]);
                if (k != RecvCompleted.end())
                    RecvCompleted.erase(k);
            }
            KeepCompletedQueue.clear();
            KeepCompletedQueue.swap(RecvCompletedQueue);
            TimeSinceCompletedQueueClean = 0;
        }
    }

    TString TUdpHost::GetPeerLinkDebug(const TPeerLinkHash& ch) {
        TString res;
        char buf[1000];
        for (const auto& i : ch) {
            const TUdpAddress& ip = i.first;
            const TCongestionControl& cc = *i.second.UdpCongestion;
            IIBPeer* ibPeer = i.second.IBPeer.Get();
            snprintf(buf, sizeof(buf), "%s\tIB: %d, RTT: %g Timeout: %g Window: %g MaxWin: %g FailRate: %g TimeSinceLastRecv: %g Transfers: %d MTU: %d\n",
                     GetAddressAsString(ip).c_str(),
                     ibPeer ? ibPeer->GetState() : -1,
                     cc.GetRTT() * 1000, cc.GetTimeout() * 1000,
                     cc.GetWindow(), cc.GetMaxWindow(), cc.GetFailRate(),
                     cc.GetTimeSinceLastRecv() * 1000,
                     cc.GetTransferCount(), cc.GetMTU());
            res += buf;
        }
        return res;
    }

    TString TUdpHost::GetDebugInfo() {
        TString res;
        char buf[1000];
        snprintf(buf, sizeof(buf), "Receiving %d msgs, sending %d high prior, %d regular msgs, %d low prior msgs\n",
                 RecvQueue.ysize(), (int)SendOrderHighPrior.size(), (int)SendOrder.size(), (int)SendOrderLow.size());
        res += buf;

        TRequesterPendingDataStats pds;
        GetPendingDataSize(&pds);
        snprintf(buf, sizeof(buf), "Pending data size: %" PRIu64 "\n", pds.InpDataSize + pds.OutDataSize);
        res += buf;
        snprintf(buf, sizeof(buf), "  in packets: %d, size %" PRIu64 "\n", pds.InpCount, pds.InpDataSize);
        res += buf;
        snprintf(buf, sizeof(buf), "  out packets: %d, size %" PRIu64 "\n", pds.OutCount, pds.OutDataSize);
        res += buf;

        res += "\nCongestion info:\n";
        res += GetPeerLinkDebug(CongestionTrack);
        res += "\nCongestion info history:\n";
        res += GetPeerLinkDebug(CongestionTrackHistory);

        return res;
    }

    static void SendKill(const TNetSocket& s, const sockaddr_in6& toAddress) {
        char buf[100];
        char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
        Write(&pktData, (int)0);
        Write(&pktData, (char)KILL);
        Write(&pktData, KILL_PASSPHRASE1);
        Write(&pktData, KILL_PASSPHRASE2);
        s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
    }

    void TUdpHost::Kill(const TUdpAddress& addr) {
        sockaddr_in6 target;
        GetWinsockAddr(&target, addr);
        SendKill(s, target);
    }

    TIntrusivePtr<IPeerQueueStats> TUdpHost::GetQueueStats(const TUdpAddress& addr) {
        TQueueStatsHash::iterator zq = PeerQueueStats.find(addr);
        if (zq != PeerQueueStats.end()) {
            return zq->second.Get();
        }
        TPeerQueueStats* res = new TPeerQueueStats;
        PeerQueueStats[addr] = res;
        // attach to existing congestion tracker
        TPeerLinkHash::iterator z;
        z = CongestionTrack.find(addr);
        if (z != CongestionTrack.end()) {
            z->second.UdpCongestion->AttachQueueStats(res);
        }
        z = CongestionTrackHistory.find(addr);
        if (z != CongestionTrackHistory.end()) {
            z->second.UdpCongestion->AttachQueueStats(res);
        }
        return res;
    }

    //////////////////////////////////////////////////////////////////////////
    TIntrusivePtr<IUdpHost> CreateUdpHost(int port) {
        TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateBestRecvSocket();
        socket->Open(port);
        if (!socket->IsValid())
            return nullptr;
        return CreateUdpHost(socket);
    }

    TIntrusivePtr<IUdpHost> CreateUdpHost(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
        if (!InitLocalIPList()) {
            Y_ASSERT(0 && "Can not determine self IP address");
            return nullptr;
        }
        TIntrusivePtr<TUdpHost> res = new TUdpHost;
        if (!res->Start(socket))
            return nullptr;
        return res.Get();
    }

    void SetUdpMaxBandwidthPerIP(float f) {
        f = Max(0.0f, f);
        TCongestionControl::MaxPacketRate = f / UDP_PACKET_SIZE;
    }

    void SetUdpSlowStart(bool enable) {
        TCongestionControl::StartWindowSize = enable ? 0.5f : 3;
    }

    void DisableIBDetection() {
        IBDetection = false;
    }
}