udp_client_server.cpp 54 KB


  1. #include "stdafx.h"
  2. #include "udp_client_server.h"
  3. #include "net_acks.h"
  4. #include <util/generic/guid.h>
  5. #include <util/system/hp_timer.h>
  6. #include <util/datetime/cputimer.h>
  7. #include <util/system/yield.h>
  8. #include <util/system/unaligned_mem.h>
  9. #include "block_chain.h"
  10. #include <util/system/shmat.h>
  11. #include "udp_debug.h"
  12. #include "udp_socket.h"
  13. #include "ib_cs.h"
  14. #include <library/cpp/netliba/socket/socket.h>
  15. #include <util/random/random.h>
  16. #include <util/system/sanitizers.h>
  17. #include <atomic>
  18. namespace NNetliba {
  // CRC policy: per-datagram integrity was originally left to the UDP checksum and
  // the whole-message CRC was verified only for complete messages.
  // UPDATE: the UDP checksum alone proved insufficient - real network errors were
  // observed (600+ retransmits of a ~1MB data packet).

  // Time limits. They are compared against accumulated deltaT values and the
  // `seconds` argument of Wait(), so presumably they are in seconds.
  constexpr float UDP_TRANSFER_TIMEOUT = 90.0f;
  constexpr float DEFAULT_MAX_WAIT_TIME = 1.0f;
  constexpr float UDP_KEEP_PEER_INFO = 600.0f;
  // Traffic may keep flowing while no new data arrives for one particular packet.
  // This happens when a process is killed mid-transfer and restarted on the same
  // port: the receiver is left with a dangling, never-completed packet, which is
  // discarded once this timeout expires.
  constexpr float UDP_MAX_INPUT_DATA_WAIT = UDP_TRANSFER_TIMEOUT * 2;

  // Datagram sizes in bytes.
  constexpr int UDP_PACKET_SIZE_FULL = 8900;   // used for ping to detect jumbo-frame support
  constexpr int UDP_PACKET_SIZE = 8800;        // max data in packet
  constexpr int UDP_PACKET_SIZE_SMALL = 1350;  // 1180 would be better taking into account that 1280 is guaranteed ipv6 minimum MTU
  constexpr int UDP_PACKET_BUF_SIZE = UDP_PACKET_SIZE + 100;
  32. //////////////////////////////////////////////////////////////////////////
  33. struct TUdpCompleteInTransfer {
  34. TGUID PacketGuid;
  35. };
  36. //////////////////////////////////////////////////////////////////////////
  37. struct TUdpRecvPacket {
  38. int DataStart, DataSize;
  39. ui32 BlockSum;
  40. // Data[] should be last member in struct, this fact is used to create truncated TUdpRecvPacket in CreateNewSmallPacket()
  41. char Data[UDP_PACKET_BUF_SIZE];
  42. };
  43. struct TUdpInTransfer {
  44. private:
  45. TVector<TUdpRecvPacket*> Packets;
  46. public:
  47. sockaddr_in6 ToAddress;
  48. int PacketSize, LastPacketSize;
  49. bool HasLastPacket;
  50. TVector<int> NewPacketsToAck;
  51. TCongestionControlPtr Congestion;
  52. float TimeSinceLastRecv;
  53. int Attempt;
  54. TGUID PacketGuid;
  55. int Crc32;
  56. TIntrusivePtr<TSharedMemory> SharedData;
  57. TRequesterPendingDataStats* Stats;
  58. TUdpInTransfer()
  59. : PacketSize(0)
  60. , LastPacketSize(0)
  61. , HasLastPacket(false)
  62. , TimeSinceLastRecv(0)
  63. , Attempt(0)
  64. , Crc32(0)
  65. , Stats(nullptr)
  66. {
  67. Zero(ToAddress);
  68. }
  69. ~TUdpInTransfer() {
  70. if (Stats) {
  71. Stats->InpCount -= 1;
  72. }
  73. EraseAllPackets();
  74. }
  75. void EraseAllPackets() {
  76. for (int i = 0; i < Packets.ysize(); ++i) {
  77. ErasePacket(i);
  78. }
  79. Packets.clear();
  80. HasLastPacket = false;
  81. }
  82. void AttachStats(TRequesterPendingDataStats* stats) {
  83. Stats = stats;
  84. Stats->InpCount += 1;
  85. Y_ASSERT(Packets.empty());
  86. }
  87. void ErasePacket(int id) {
  88. TUdpRecvPacket* pkt = Packets[id];
  89. if (pkt) {
  90. if (Stats) {
  91. Stats->InpDataSize -= PacketSize;
  92. }
  93. TRopeDataPacket::FreeBuf((char*)pkt);
  94. Packets[id] = nullptr;
  95. }
  96. }
  97. void AssignPacket(int id, TUdpRecvPacket* pkt) {
  98. ErasePacket(id);
  99. if (pkt && Stats) {
  100. Stats->InpDataSize += PacketSize;
  101. }
  102. Packets[id] = pkt;
  103. }
  104. int GetPacketCount() const {
  105. return Packets.ysize();
  106. }
  107. void SetPacketCount(int n) {
  108. Packets.resize(n, nullptr);
  109. }
  110. const TUdpRecvPacket* GetPacket(int id) const {
  111. return Packets[id];
  112. }
  113. TUdpRecvPacket* ExtractPacket(int id) {
  114. TUdpRecvPacket* res = Packets[id];
  115. if (res) {
  116. if (Stats) {
  117. Stats->InpDataSize -= PacketSize;
  118. }
  119. Packets[id] = nullptr;
  120. }
  121. return res;
  122. }
  123. };
  124. struct TUdpOutTransfer {
  125. sockaddr_in6 ToAddress;
  126. TAutoPtr<TRopeDataPacket> Data;
  127. int PacketCount;
  128. int PacketSize, LastPacketSize;
  129. TAckTracker AckTracker;
  130. int Attempt;
  131. TGUID PacketGuid;
  132. int Crc32;
  133. EPacketPriority PacketPriority;
  134. TRequesterPendingDataStats* Stats;
  135. TUdpOutTransfer()
  136. : PacketCount(0)
  137. , PacketSize(0)
  138. , LastPacketSize(0)
  139. , Attempt(0)
  140. , Crc32(0)
  141. , PacketPriority(PP_LOW)
  142. , Stats(nullptr)
  143. {
  144. Zero(ToAddress);
  145. }
  146. ~TUdpOutTransfer() {
  147. if (Stats) {
  148. Stats->OutCount -= 1;
  149. Stats->OutDataSize -= Data->GetSize();
  150. }
  151. }
  152. void AttachStats(TRequesterPendingDataStats* stats) {
  153. Stats = stats;
  154. Stats->OutCount += 1;
  155. Stats->OutDataSize += Data->GetSize();
  156. }
  157. };
  158. struct TTransferKey {
  159. TUdpAddress Address;
  160. int Id;
  161. };
  162. inline bool operator==(const TTransferKey& a, const TTransferKey& b) {
  163. return a.Address == b.Address && a.Id == b.Id;
  164. }
  165. struct TTransferKeyHash {
  166. int operator()(const TTransferKey& k) const {
  167. return (ui32)k.Address.Interface + (ui32)k.Address.Port * (ui32)389461 + (ui32)k.Id;
  168. }
  169. };
  170. struct TUdpAddressHash {
  171. int operator()(const TUdpAddress& addr) const {
  172. return (ui32)addr.Interface + (ui32)addr.Port * (ui32)389461;
  173. }
  174. };
  175. class TUdpHostRevBufAlloc: public TNonCopyable {
  176. TUdpRecvPacket* RecvPktBuf;
  177. void AllocNewBuf() {
  178. RecvPktBuf = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(sizeof(TUdpRecvPacket));
  179. }
  180. public:
  181. TUdpHostRevBufAlloc() {
  182. AllocNewBuf();
  183. }
  184. ~TUdpHostRevBufAlloc() {
  185. FreeBuf(RecvPktBuf);
  186. }
  187. void FreeBuf(TUdpRecvPacket* pkt) {
  188. TRopeDataPacket::FreeBuf((char*)pkt);
  189. }
  190. TUdpRecvPacket* ExtractPacket() {
  191. TUdpRecvPacket* res = RecvPktBuf;
  192. AllocNewBuf();
  193. return res;
  194. }
  195. TUdpRecvPacket* CreateNewSmallPacket(int sz) {
  196. int pktStructSz = sizeof(TUdpRecvPacket) - Y_ARRAY_SIZE(RecvPktBuf->Data) + sz;
  197. TUdpRecvPacket* pkt = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(pktStructSz);
  198. return pkt;
  199. }
  200. int GetBufSize() const {
  201. return Y_ARRAY_SIZE(RecvPktBuf->Data);
  202. }
  203. char* GetDataPtr() const {
  204. return RecvPktBuf->Data;
  205. }
  206. };
  207. static TAtomic transferIdCounter = (long)(GetCycleCount() & 0x1fffffff);
  208. inline int GetTransferId() {
  209. int res = AtomicAdd(transferIdCounter, 1);
  210. while (res < 0) {
  211. // negative transfer ids are treated as errors, so wrap transfer id
  212. AtomicCas(&transferIdCounter, 0, transferIdCounter);
  213. res = AtomicAdd(transferIdCounter, 1);
  214. }
  215. return res;
  216. }
  217. static bool IBDetection = true;
  // IUdpHost implementation over a single UDP socket `s`.
  // Outgoing messages are kept in SendQueue (one TUdpOutTransfer each) and pushed out
  // by SendData(), which is given one per-priority key list per call (presumably
  // from Step()). Incoming datagrams are reassembled in RecvQueue by RecvCycle().
  // Congestion state is kept per peer address in CongestionTrack. When an IB
  // client/server was created in Start(), PP_NORMAL messages without shared memory
  // are offered to it first.
  218. class TUdpHost: public IUdpHost {
  // Per-peer link state: UDP congestion control plus an optional IB peer connection.
  219. struct TPeerLink {
  220. TIntrusivePtr<TCongestionControl> UdpCongestion;
  221. TIntrusivePtr<IIBPeer> IBPeer;
  222. double TimeNoActiveTransfers;
  223. TPeerLink()
  224. : TimeNoActiveTransfers(0)
  225. {
  226. }
  // Advances the congestion controller's liveness tracking by deltaT (using
  // UDP_TRANSFER_TIMEOUT) and returns what UpdateAlive() reports.
  227. bool Update(float deltaT, const TUdpAddress& toAddress, float* maxWaitTime) {
  228. bool updateOk = UdpCongestion->UpdateAlive(toAddress, deltaT, UDP_TRANSFER_TIMEOUT, maxWaitTime);
  229. return updateOk;
  230. }
  // Enters the idle state: refreshes liveness with zero elapsed time, marks the
  // peer alive and restarts the idle timer.
  231. void StartSleep(const TUdpAddress& toAddress, float* maxWaitTime) {
  232. //printf("peer_link start sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
  233. UdpCongestion->UpdateAlive(toAddress, 0, UDP_TRANSFER_TIMEOUT, maxWaitTime);
  234. UdpCongestion->MarkAlive();
  235. TimeNoActiveTransfers = 0;
  236. }
  // Accumulates idle time. A link whose IB peer is in state OK is always kept.
  // An IB peer in any other state is dropped. Otherwise the link is kept
  // (returns true) only while idle for less than UDP_KEEP_PEER_INFO.
  237. bool UpdateSleep(float deltaT) {
  238. TimeNoActiveTransfers += deltaT;
  239. if (IBPeer.Get()) {
  240. //printf("peer_link update sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
  241. if (IBPeer->GetState() == IIBPeer::OK) {
  242. return true;
  243. }
  244. //printf("Drop broken IB connection\n");
  245. IBPeer = nullptr;
  246. }
  247. return (TimeNoActiveTransfers < UDP_KEEP_PEER_INFO);
  248. }
  249. };
  // UDP socket, opened in Start().
  250. TNetSocket s;
  251. typedef THashMap<TTransferKey, TUdpInTransfer, TTransferKeyHash> TUdpInXferHash;
  252. typedef THashMap<TTransferKey, TUdpOutTransfer, TTransferKeyHash> TUdpOutXferHash;
  253. // congestion control per peer
  254. typedef THashMap<TUdpAddress, TPeerLink, TUdpAddressHash> TPeerLinkHash;
  255. typedef THashMap<TTransferKey, TUdpCompleteInTransfer, TTransferKeyHash> TUdpCompleteInXferHash;
  256. typedef THashMap<TUdpAddress, TIntrusivePtr<TPeerQueueStats>, TUdpAddressHash> TQueueStatsHash;
  // Incoming transfers still being reassembled.
  257. TUdpInXferHash RecvQueue;
  // Transfers already delivered; RecvCycle() answers further data packets for them
  // with AckComplete() instead of reassembling again.
  258. TUdpCompleteInXferHash RecvCompleted;
  // Outgoing transfers keyed by (peer address, transfer id).
  259. TUdpOutXferHash SendQueue;
  // Active per-peer links. CongestionTrackHistory holds links that GetPeerLink() moves back when needed.
  260. TPeerLinkHash CongestionTrack, CongestionTrackHistory;
  // Complete requests waiting for GetRequest(); still-queued ones are deleted in ~TUdpHost().
  261. TList<TRequest*> ReceivedList;
  // Reference timestamp (set in Start(); presumably refreshed by Step()) that the
  // send/receive paths measure elapsed time against.
  262. NHPTimer::STime CurrentT;
  // UDP-path send results returned by GetSendResult().
  263. TList<TSendResult> SendResults;
  // Send queues for PP_LOW / PP_NORMAL / PP_HIGH. They may contain keys of finished
  // transfers; SendData() drops those.
  264. TList<TTransferKey> SendOrderLow, SendOrder, SendOrderHighPrior;
  // Number of threads currently in the sleeping branch of Wait().
  265. TAtomic IsWaiting;
  // Upper bound for Wait(); SendData() lowers it to the next packet timeout (0 on send buffer overflow).
  266. float MaxWaitTime;
  // Upper bound for the next Wait(). CancelWait() sets it to 0; Wait() resets it to DEFAULT_MAX_WAIT_TIME.
  267. std::atomic<float> MaxWaitTime2;
  // Reset to 0 when IB reports activity during Wait(); while below 10 ms Wait() only yields.
  268. float IBIdleTime;
  // NOTE(review): bookkeeping for aging out RecvCompleted / CongestionTrackHistory;
  // the code that consumes these lies outside this view.
  269. TVector<TTransferKey> RecvCompletedQueue, KeepCompletedQueue;
  270. float TimeSinceCompletedQueueClean, TimeSinceCongestionHistoryUpdate;
  // Aggregate pending in/out counters, maintained by the transfers' AttachStats() and destructors.
  271. TRequesterPendingDataStats PendingDataStats;
  // Per-peer queue stats, attached to newly created congestion controllers in GetPeerLink().
  272. TQueueStatsHash PeerQueueStats;
  // Optional IB client/server, created in Start() when IBDetection is set.
  273. TIntrusivePtr<IIBClientServer> IB;
  274. typedef THashMap<TIBMsgHandle, TTransferKey> TIBtoTransferKeyHash;
  // Maps handles of in-flight IB sends back to their SendQueue keys.
  275. TIBtoTransferKeyHash IBKeyToTransferKey;
  // Scratch buffer in which SendData() builds outgoing data datagrams.
  276. char PktBuf[UDP_PACKET_BUF_SIZE];
  // Receive buffer allocator used by RecvCycle().
  277. TUdpHostRevBufAlloc RecvBuf;
  // Returns the active link for ip. A link found in CongestionTrackHistory is moved
  // back into CongestionTrack. An unknown peer gets a new TCongestionControl, with
  // its PeerQueueStats attached if any were registered.
  278. TPeerLink& GetPeerLink(const TUdpAddress& ip) {
  279. TPeerLinkHash::iterator z = CongestionTrack.find(ip);
  280. if (z == CongestionTrack.end()) {
  281. z = CongestionTrackHistory.find(ip);
  282. if (z == CongestionTrackHistory.end()) {
  283. TPeerLink& res = CongestionTrack[ip];
  284. Y_ASSERT(res.UdpCongestion.Get() == nullptr);
  285. res.UdpCongestion = new TCongestionControl;
  286. TQueueStatsHash::iterator zq = PeerQueueStats.find(ip);
  287. if (zq != PeerQueueStats.end()) {
  288. res.UdpCongestion->AttachQueueStats(zq->second);
  289. }
  290. return res;
  291. } else {
  292. TPeerLink& res = CongestionTrack[z->first];
  293. res = z->second;
  294. CongestionTrackHistory.erase(z);
  295. return res;
  296. }
  297. } else {
  298. Y_ASSERT(CongestionTrackHistory.find(ip) == CongestionTrackHistory.end());
  299. return z->second;
  300. }
  301. }
  // Record a UDP-path send outcome for GetSendResult().
  302. void SucceededSend(int id) {
  303. SendResults.push_back(TSendResult(id, true));
  304. }
  305. void FailedSend(int id) {
  306. SendResults.push_back(TSendResult(id, false));
  307. }
  308. void SendData(TList<TTransferKey>* order, float deltaT, bool needCheckAlive);
  309. void RecvCycle();
  310. public:
  311. TUdpHost()
  312. : CurrentT(0)
  313. , IsWaiting(0)
  314. , MaxWaitTime(DEFAULT_MAX_WAIT_TIME)
  315. , MaxWaitTime2(DEFAULT_MAX_WAIT_TIME)
  316. , IBIdleTime(0)
  317. , TimeSinceCompletedQueueClean(0)
  318. , TimeSinceCongestionHistoryUpdate(0)
  319. {
  320. }
  // Deletes requests that were received but never fetched through GetRequest().
  321. ~TUdpHost() override {
  322. for (TList<TRequest*>::const_iterator i = ReceivedList.begin(); i != ReceivedList.end(); ++i)
  323. delete *i;
  324. }
  325. bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket);
  // Returns the next received request (the caller takes it over). Falls back to
  // IB when nothing was received over UDP. Returns nullptr when nothing is available.
  326. TRequest* GetRequest() override {
  327. if (ReceivedList.empty()) {
  328. if (IB.Get()) {
  329. return IB->GetRequest();
  330. }
  331. return nullptr;
  332. }
  333. TRequest* res = ReceivedList.front();
  334. ReceivedList.pop_front();
  335. return res;
  336. }
  // Appends transferKey to the send queue for its priority and cancels any pending Wait().
  337. void AddToSendOrder(const TTransferKey& transferKey, EPacketPriority pp) {
  338. if (pp == PP_LOW)
  339. SendOrderLow.push_back(transferKey);
  340. else if (pp == PP_NORMAL)
  341. SendOrder.push_back(transferKey);
  342. else if (pp == PP_HIGH)
  343. SendOrderHighPrior.push_back(transferKey);
  344. else
  345. Y_ASSERT(0);
  346. CancelWait();
  347. }
  // Queues data for addr and returns the transfer id later reported by GetSendResult().
  // - A zero port fails immediately.
  // - If packetGuid points to an empty GUID, a new one is generated and written
  //   back; a non-empty GUID is reused.
  // - PP_NORMAL data without shared memory is first offered to the peer's IB
  //   connection. Otherwise, or if IB refuses, it goes to the UDP send queue.
  348. int Send(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data, int crc32, TGUID* packetGuid, EPacketPriority pp) override {
  349. if (addr.Port == 0) {
  350. // shortcut for broken addresses
  351. if (packetGuid && packetGuid->IsEmpty())
  352. CreateGuid(packetGuid);
  353. int reqId = GetTransferId();
  354. FailedSend(reqId);
  355. return reqId;
  356. }
  357. TTransferKey transferKey;
  358. transferKey.Address = addr;
  359. transferKey.Id = GetTransferId();
  360. Y_ASSERT(SendQueue.find(transferKey) == SendQueue.end());
  361. TPeerLink& peerInfo = GetPeerLink(transferKey.Address);
  362. TUdpOutTransfer& xfer = SendQueue[transferKey];
  363. GetWinsockAddr(&xfer.ToAddress, transferKey.Address);
  364. xfer.Crc32 = crc32;
  365. xfer.PacketPriority = pp;
  366. if (!packetGuid || packetGuid->IsEmpty()) {
  367. CreateGuid(&xfer.PacketGuid);
  368. if (packetGuid)
  369. *packetGuid = xfer.PacketGuid;
  370. } else {
  371. xfer.PacketGuid = *packetGuid;
  372. }
  373. xfer.Data.Reset(data.Release());
  374. xfer.AttachStats(&PendingDataStats);
  375. xfer.AckTracker.AttachCongestionControl(peerInfo.UdpCongestion.Get());
  376. bool isSentOverIB = false;
  377. // we don't support priorities (=service levels in IB terms) currently
  378. // so send only PP_NORMAL traffic over IB
  379. if (pp == PP_NORMAL && peerInfo.IBPeer.Get() && xfer.Data->GetSharedData() == nullptr) {
  380. TIBMsgHandle hndl = IB->Send(peerInfo.IBPeer, xfer.Data.Get(), xfer.PacketGuid);
  381. if (hndl >= 0) {
  382. IBKeyToTransferKey[hndl] = transferKey;
  383. isSentOverIB = true;
  384. } else {
  385. // so we failed to use IB, ibPeer is either not connected yet or failed
  386. if (peerInfo.IBPeer->GetState() == IIBPeer::FAILED) {
  387. //printf("Disconnect failed IB peer\n");
  388. peerInfo.IBPeer = nullptr;
  389. }
  390. }
  391. }
  392. if (!isSentOverIB) {
  393. AddToSendOrder(transferKey, pp);
  394. }
  395. return transferKey.Id;
  396. }
  // Pops one send result into *res.
  // - Queued UDP results come first.
  // - Otherwise one IB completion is polled. A successful IB send finishes its transfer.
  // - A failed IB send is re-queued for the UDP path and nothing is reported (returns false).
  397. bool GetSendResult(TSendResult* res) override {
  398. if (SendResults.empty()) {
  399. if (IB.Get()) {
  400. TIBSendResult sr;
  401. if (IB->GetSendResult(&sr)) {
  402. TIBtoTransferKeyHash::iterator z = IBKeyToTransferKey.find(sr.Handle);
  403. if (z == IBKeyToTransferKey.end()) {
  404. Y_ABORT_UNLESS(0, "unknown handle returned from IB");
  405. }
  406. TTransferKey transferKey = z->second;
  407. IBKeyToTransferKey.erase(z);
  408. TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
  409. if (i == SendQueue.end()) {
  410. Y_ABORT_UNLESS(0, "IBKeyToTransferKey refers nonexisting xfer");
  411. }
  412. if (sr.Success) {
  413. TUdpOutTransfer& xfer = i->second;
  414. xfer.AckTracker.MarkAlive(); // do we really need this?
  415. *res = TSendResult(transferKey.Id, sr.Success);
  416. SendQueue.erase(i);
  417. return true;
  418. } else {
  419. //printf("IB send failed, fall back to regular network\n");
  420. // Houston, we got a problem
  421. // IB failed to send, try to use regular network
  422. TUdpOutTransfer& xfer = i->second;
  423. AddToSendOrder(transferKey, xfer.PacketPriority);
  424. }
  425. }
  426. }
  427. return false;
  428. }
  429. *res = SendResults.front();
  430. SendResults.pop_front();
  431. return true;
  432. }
  433. void Step() override;
  434. void IBStep() override;
  // Sleeps for up to `seconds`.
  // - Values below 1 ms become 0; the wait is clamped to MaxWaitTime and then to MaxWaitTime2.
  // - Only yields when the resulting wait is 0 or IBIdleTime is below 10 ms.
  // - With IB present, sleeps in 2 ms slices and stops early once IB->Step() reports activity.
  // IsWaiting is raised before MaxWaitTime2 is read, so a concurrent CancelWait()
  // either zeroes the bound in time or sees IsWaiting and wakes the socket wait.
  435. void Wait(float seconds) override {
  436. if (seconds < 1e-3)
  437. seconds = 0;
  438. if (seconds > MaxWaitTime)
  439. seconds = MaxWaitTime;
  440. if (IBIdleTime < 0.010) {
  441. seconds = 0;
  442. }
  443. if (seconds == 0) {
  444. ThreadYield();
  445. } else {
  446. AtomicAdd(IsWaiting, 1);
  447. if (seconds > MaxWaitTime2)
  448. seconds = MaxWaitTime2;
  449. MaxWaitTime2 = DEFAULT_MAX_WAIT_TIME;
  450. if (seconds == 0) {
  451. ThreadYield();
  452. } else {
  453. if (IB.Get()) {
  454. for (float done = 0; done < seconds;) {
  455. float deltaSleep = Min(seconds - done, 0.002f);
  456. s.Wait(deltaSleep);
  457. NHPTimer::STime tChk;
  458. NHPTimer::GetTime(&tChk);
  459. if (IB->Step(tChk)) {
  460. IBIdleTime = 0;
  461. break;
  462. }
  463. done += deltaSleep;
  464. }
  465. } else {
  466. s.Wait(seconds);
  467. }
  468. }
  469. AtomicAdd(IsWaiting, -1);
  470. }
  471. }
  // Shortens the next/ongoing Wait(): zeroes MaxWaitTime2 and, when IsWaiting == 1,
  // sends a fake packet to our own socket so a blocked s.Wait() returns.
  472. void CancelWait() override {
  473. MaxWaitTime2 = 0;
  474. if (AtomicAdd(IsWaiting, 0) == 1) {
  475. s.SendSelfFakePacket();
  476. }
  477. }
  // Copies the aggregate pending send/receive statistics. Debug builds recompute
  // them from SendQueue/RecvQueue and assert both agree.
  478. void GetPendingDataSize(TRequesterPendingDataStats* res) override {
  479. *res = PendingDataStats;
  480. #ifndef NDEBUG
  481. TRequesterPendingDataStats chk;
  482. for (TUdpOutXferHash::const_iterator i = SendQueue.begin(); i != SendQueue.end(); ++i) {
  483. TRopeDataPacket* pckt = i->second.Data.Get();
  484. if (pckt) {
  485. chk.OutDataSize += pckt->GetSize();
  486. ++chk.OutCount;
  487. }
  488. }
  489. for (TUdpInXferHash::const_iterator i = RecvQueue.begin(); i != RecvQueue.end(); ++i) {
  490. const TUdpInTransfer& tr = i->second;
  491. for (int p = 0; p < tr.GetPacketCount(); ++p) {
  492. if (tr.GetPacket(p)) {
  493. chk.InpDataSize += tr.PacketSize;
  494. }
  495. }
  496. ++chk.InpCount;
  497. }
  498. Y_ASSERT(memcmp(&chk, res, sizeof(chk)) == 0);
  499. #endif
  500. }
  501. TString GetDebugInfo() override;
  502. TString GetPeerLinkDebug(const TPeerLinkHash& ch);
  503. void Kill(const TUdpAddress& addr) override;
  504. TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override;
  505. };
  506. bool TUdpHost::Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
  507. if (s.IsValid()) {
  508. Y_ASSERT(0);
  509. return false;
  510. }
  511. s.Open(socket);
  512. if (!s.IsValid())
  513. return false;
  514. if (IBDetection)
  515. IB = CreateIBClientServer();
  516. NHPTimer::GetTime(&CurrentT);
  517. return true;
  518. }
  519. static bool HasAllPackets(const TUdpInTransfer& res) {
  520. if (!res.HasLastPacket)
  521. return false;
  522. for (int i = res.GetPacketCount() - 1; i >= 0; --i) {
  523. if (!res.GetPacket(i))
  524. return false;
  525. }
  526. return true;
  527. }
  528. // grouped acks, first int - packet_id, second int - bit mask for 32 packets preceding packet_id
  529. const int SIZEOF_ACK = 8;
  530. static int WriteAck(TUdpInTransfer* p, int* dst, int maxAcks) {
  531. int ackCount = 0;
  532. if (p->NewPacketsToAck.size() > 1)
  533. Sort(p->NewPacketsToAck.begin(), p->NewPacketsToAck.end());
  534. int lastAcked = 0;
  535. for (size_t idx = 0; idx < p->NewPacketsToAck.size(); ++idx) {
  536. int pkt = p->NewPacketsToAck[idx];
  537. if (idx == p->NewPacketsToAck.size() - 1 || pkt > lastAcked + 30) {
  538. *dst++ = pkt;
  539. int bitMask = 0;
  540. int backPackets = Min(pkt, 32);
  541. for (int k = 0; k < backPackets; ++k) {
  542. if (p->GetPacket(pkt - k - 1))
  543. bitMask |= 1 << k;
  544. }
  545. *dst++ = bitMask;
  546. if (++ackCount >= maxAcks)
  547. break;
  548. lastAcked = pkt;
  549. //printf("sending ack %d (mask %x)\n", pkt, bitMask);
  550. }
  551. }
  552. p->NewPacketsToAck.clear();
  553. return ackCount;
  554. }
  555. static void AckPacket(TUdpOutTransfer* p, int pkt, float deltaT, bool updateRTT) {
  556. if (pkt < 0 || pkt >= p->PacketCount) {
  557. Y_ASSERT(0);
  558. return;
  559. }
  560. p->AckTracker.Ack(pkt, deltaT, updateRTT);
  561. }
  562. static void ReadAcks(TUdpOutTransfer* p, const int* acks, int ackCount, float deltaT) {
  563. for (int i = 0; i < ackCount; ++i) {
  564. int pkt = *acks++;
  565. int bitMask = *acks++;
  566. bool updateRTT = i == ackCount - 1; // update RTT using only last packet in the pack
  567. AckPacket(p, pkt, deltaT, updateRTT);
  568. for (int k = 0; k < 32; ++k) {
  569. if (bitMask & (1 << k))
  570. AckPacket(p, pkt - k - 1, deltaT, false);
  571. }
  572. }
  573. }
  574. using namespace NNetlibaSocket::NNetliba;
  575. const ui64 KILL_PASSPHRASE1 = 0x98ff9cefb11d9a4cul;
  576. const ui64 KILL_PASSPHRASE2 = 0xf7754c29e0be95eaul;
  577. template <class T>
  578. inline T Read(char** data) {
  579. T res = ReadUnaligned<T>(*data);
  580. *data += sizeof(T);
  581. return res;
  582. }
  583. template <class T>
  584. inline void Write(char** data, T res) {
  585. WriteUnaligned<T>(*data, res);
  586. *data += sizeof(T);
  587. }
  588. static void RequireResend(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
  589. char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
  590. Write(&pktData, transferId);
  591. Write(&pktData, (char)ACK_RESEND);
  592. Write(&pktData, attempt);
  593. s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
  594. }
  595. static void RequireResendNoShmem(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
  596. char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
  597. Write(&pktData, transferId);
  598. Write(&pktData, (char)ACK_RESEND_NOSHMEM);
  599. Write(&pktData, attempt);
  600. s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
  601. }
  602. static void AckComplete(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, const TGUID& packetGuid, int packetId) {
  603. char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
  604. Write(&pktData, transferId);
  605. Write(&pktData, (char)ACK_COMPLETE);
  606. Write(&pktData, packetGuid);
  607. Write(&pktData, packetId); // we need packetId to update RTT
  608. s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
  609. }
  610. static void SendPing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
  611. char pktBuf[UDP_PACKET_SIZE_FULL];
  612. char* pktData = pktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
  613. if (NSan::MSanIsOn()) {
  614. Zero(pktBuf);
  615. }
  616. Write(&pktData, (int)0);
  617. Write(&pktData, (char)PING);
  618. Write(&pktData, selfNetworkOrderPort);
  619. s.SendTo(pktBuf, UDP_PACKET_SIZE_FULL, toAddress, FF_DONT_FRAG);
  620. }
  621. // not MTU discovery, just figure out IB address of the peer
  622. static void SendFakePing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
  623. char buf[100];
  624. char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
  625. Write(&pktData, (int)0);
  626. Write(&pktData, (char)PING);
  627. Write(&pktData, selfNetworkOrderPort);
  628. s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
  629. }
  630. void TUdpHost::SendData(TList<TTransferKey>* order, float deltaT1, bool needCheckAlive) {
  631. for (TList<TTransferKey>::iterator z = order->begin(); z != order->end();) {
  632. // pick connection to send
  633. const TTransferKey& transferKey = *z;
  634. TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
  635. if (i == SendQueue.end()) {
  636. z = order->erase(z);
  637. continue;
  638. }
  639. ++z;
  640. // perform sending
  641. int transferId = transferKey.Id;
  642. TUdpOutTransfer& xfer = i->second;
  643. if (!xfer.AckTracker.IsInitialized()) {
  644. TIntrusivePtr<TCongestionControl> congestion = xfer.AckTracker.GetCongestionControl();
  645. Y_ASSERT(congestion.Get() != nullptr);
  646. if (!congestion->IsKnownMTU()) {
  647. TLameMTUDiscovery* md = congestion->GetMTUDiscovery();
  648. if (md->IsTimedOut()) {
  649. congestion->SetMTU(UDP_PACKET_SIZE_SMALL);
  650. } else {
  651. if (md->CanSend()) {
  652. SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
  653. md->PingSent();
  654. }
  655. continue;
  656. }
  657. }
  658. // try to use large mtu, we could have selected small mtu due to connectivity problems
  659. if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL || IB.Get() != nullptr) {
  660. // recheck every ~50mb
  661. int chkDenom = (50000000 / xfer.Data->GetSize()) | 1;
  662. if ((NetAckRnd() % chkDenom) == 0) {
  663. //printf("send rechecking ping\n");
  664. if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL) {
  665. SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
  666. } else {
  667. SendFakePing(s, xfer.ToAddress, s.GetNetworkOrderPort());
  668. }
  669. }
  670. }
  671. xfer.PacketSize = congestion->GetMTU();
  672. xfer.LastPacketSize = xfer.Data->GetSize() % xfer.PacketSize;
  673. xfer.PacketCount = xfer.Data->GetSize() / xfer.PacketSize + 1;
  674. xfer.AckTracker.SetPacketCount(xfer.PacketCount);
  675. }
  676. xfer.AckTracker.Step(deltaT1);
  677. MaxWaitTime = Min(MaxWaitTime, xfer.AckTracker.GetTimeToNextPacketTimeout());
  678. if (needCheckAlive && !xfer.AckTracker.IsAlive()) {
  679. FailedSend(transferId);
  680. SendQueue.erase(i);
  681. continue;
  682. }
  683. bool sendBufferOverflow = false;
  684. while (xfer.AckTracker.CanSend()) {
  685. NHPTimer::STime tCopy = CurrentT;
  686. float deltaT2 = (float)NHPTimer::GetTimePassed(&tCopy);
  687. deltaT2 = ClampVal(deltaT2, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
  688. int pkt = xfer.AckTracker.GetPacketToSend(deltaT2);
  689. if (pkt == -1) {
  690. break;
  691. }
  692. int dataSize = xfer.PacketSize;
  693. if (pkt == xfer.PacketCount - 1)
  694. dataSize = xfer.LastPacketSize;
  695. char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
  696. Write(&pktData, transferId);
  697. char pktType = xfer.PacketSize == UDP_PACKET_SIZE ? DATA : DATA_SMALL;
  698. TSharedMemory* shm = xfer.Data->GetSharedData();
  699. if (shm) {
  700. if (pktType == DATA)
  701. pktType = DATA_SHMEM;
  702. else
  703. pktType = DATA_SMALL_SHMEM;
  704. }
  705. Write(&pktData, pktType);
  706. Write(&pktData, xfer.Attempt);
  707. Write(&pktData, pkt);
  708. if (pkt == 0) {
  709. Write(&pktData, xfer.PacketGuid);
  710. Write(&pktData, xfer.Crc32);
  711. if (shm) {
  712. Write(&pktData, shm->GetId());
  713. Write(&pktData, shm->GetSize());
  714. }
  715. }
  716. TBlockChainIterator dataReader(xfer.Data->GetChain());
  717. dataReader.Seek(pkt * xfer.PacketSize);
  718. dataReader.Read(pktData, dataSize);
  719. pktData += dataSize;
  720. int sendSize = (int)(pktData - PktBuf);
  721. TNetSocket::ESendError sendErr = s.SendTo(PktBuf, sendSize, xfer.ToAddress, FF_ALLOW_FRAG);
  722. if (sendErr != TNetSocket::SEND_OK) {
  723. if (sendErr == TNetSocket::SEND_NO_ROUTE_TO_HOST) {
  724. FailedSend(transferId);
  725. SendQueue.erase(i);
  726. break;
  727. } else {
  728. // most probably out of send buffer space (or something terrible has happened)
  729. xfer.AckTracker.AddToResend(pkt);
  730. sendBufferOverflow = true;
  731. MaxWaitTime = 0;
  732. //printf("failed send\n");
  733. break;
  734. }
  735. }
  736. }
  737. if (sendBufferOverflow)
  738. break;
  739. }
  740. }
  741. void TUdpHost::RecvCycle() {
  742. for (;;) {
  743. sockaddr_in6 fromAddress;
  744. int rv = RecvBuf.GetBufSize();
  745. bool recvOk = s.RecvFrom(RecvBuf.GetDataPtr(), &rv, &fromAddress);
  746. if (!recvOk)
  747. break;
  748. NHPTimer::STime tCopy = CurrentT;
  749. float deltaT = (float)NHPTimer::GetTimePassed(&tCopy);
  750. deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
  751. //int fromIP = fromAddress.sin_addr.s_addr;
  752. TTransferKey k;
  753. char* pktData = RecvBuf.GetDataPtr() + UDP_LOW_LEVEL_HEADER_SIZE;
  754. GetUdpAddress(&k.Address, fromAddress);
  755. k.Id = Read<int>(&pktData);
  756. int transferId = k.Id;
  757. int cmd = Read<char>(&pktData);
  758. Y_ASSERT(cmd == (int)*(RecvBuf.GetDataPtr() + CMD_POS));
  759. switch (cmd) {
  760. case DATA:
  761. case DATA_SMALL:
  762. case DATA_SHMEM:
  763. case DATA_SMALL_SHMEM: {
  764. int attempt = Read<int>(&pktData);
  765. int packetId = Read<int>(&pktData);
  766. //printf("data packet %d (trans ID = %d)\n", packetId, transferId);
  767. TUdpCompleteInXferHash::iterator itCompl = RecvCompleted.find(k);
  768. if (itCompl != RecvCompleted.end()) {
  769. Y_ASSERT(RecvQueue.find(k) == RecvQueue.end());
  770. const TUdpCompleteInTransfer& complete = itCompl->second;
  771. bool sendAckComplete = true;
  772. if (packetId == 0) {
  773. // check packet GUID
  774. char* tmpPktData = pktData;
  775. TGUID packetGuid;
  776. packetGuid = Read<TGUID>(&tmpPktData);
  777. if (packetGuid != complete.PacketGuid) {
  778. // we are receiving new data with the same transferId
  779. // in this case we have to flush all the information about previous transfer
  780. // and start over
  781. //printf("same transferId for a different packet\n");
  782. RecvCompleted.erase(itCompl);
  783. sendAckComplete = false;
  784. }
  785. }
  786. if (sendAckComplete) {
  787. AckComplete(s, fromAddress, transferId, complete.PacketGuid, packetId);
  788. break;
  789. }
  790. }
  791. TUdpInXferHash::iterator rq = RecvQueue.find(k);
  792. if (rq == RecvQueue.end()) {
  793. //printf("new input transfer\n");
  794. TUdpInTransfer& res = RecvQueue[k];
  795. res.ToAddress = fromAddress;
  796. res.Attempt = attempt;
  797. res.Congestion = GetPeerLink(k.Address).UdpCongestion.Get();
  798. res.PacketSize = 0;
  799. res.HasLastPacket = false;
  800. res.AttachStats(&PendingDataStats);
  801. rq = RecvQueue.find(k);
  802. Y_ASSERT(rq != RecvQueue.end());
  803. }
  804. TUdpInTransfer& res = rq->second;
  805. res.Congestion->MarkAlive();
  806. res.TimeSinceLastRecv = 0;
  807. if (packetId == 0) {
  808. TGUID packetGuid;
  809. packetGuid = Read<TGUID>(&pktData);
  810. int crc32 = Read<int>(&pktData);
  811. res.Crc32 = crc32;
  812. res.PacketGuid = packetGuid;
  813. if (cmd == DATA_SHMEM || cmd == DATA_SMALL_SHMEM) {
  814. // link to attached shared memory
  815. TGUID shmemId = Read<TGUID>(&pktData);
  816. int shmemSize = Read<int>(&pktData);
  817. if (res.SharedData.Get() == nullptr) {
  818. res.SharedData = new TSharedMemory;
  819. if (!res.SharedData->Open(shmemId, shmemSize)) {
  820. res.SharedData = nullptr;
  821. RequireResendNoShmem(s, res.ToAddress, transferId, res.Attempt);
  822. break;
  823. }
  824. }
  825. }
  826. }
  827. if (attempt != res.Attempt) {
  828. RequireResend(s, res.ToAddress, transferId, res.Attempt);
  829. break;
  830. } else {
  831. if (res.PacketSize == 0) {
  832. res.PacketSize = (cmd == DATA || cmd == DATA_SHMEM ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL);
  833. } else {
  834. // check that all data is of same size
  835. Y_ASSERT(cmd == DATA || cmd == DATA_SMALL);
  836. Y_ASSERT(res.PacketSize == (cmd == DATA ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL));
  837. }
  838. int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
  839. Y_ASSERT(dataSize <= res.PacketSize);
  840. if (dataSize > res.PacketSize)
  841. break; // mem overrun protection
  842. if (packetId >= res.GetPacketCount())
  843. res.SetPacketCount(packetId + 1);
  844. {
  845. TUdpRecvPacket* pkt = nullptr;
  846. if (res.PacketSize == UDP_PACKET_SIZE_SMALL) {
  847. // save memory by using smaller buffer at the cost of additional memcpy
  848. pkt = RecvBuf.CreateNewSmallPacket(dataSize);
  849. memcpy(pkt->Data, pktData, dataSize);
  850. pkt->DataStart = 0;
  851. pkt->DataSize = dataSize;
  852. } else {
  853. int dataStart = (int)(pktData - RecvBuf.GetDataPtr()); // data offset in the packet
  854. pkt = RecvBuf.ExtractPacket();
  855. pkt->DataStart = dataStart;
  856. pkt->DataSize = dataSize;
  857. }
  858. // calc packet sum, will be used to calc whole message crc
  859. pkt->BlockSum = TIncrementalChecksumCalcer::CalcBlockSum(pkt->Data + pkt->DataStart, pkt->DataSize);
  860. res.AssignPacket(packetId, pkt);
  861. }
  862. if (dataSize != res.PacketSize) {
  863. res.LastPacketSize = dataSize;
  864. res.HasLastPacket = true;
  865. }
  866. if (HasAllPackets(res)) {
  867. //printf("received\n");
  868. TRequest* out = new TRequest;
  869. out->Address = k.Address;
  870. out->Guid = res.PacketGuid;
  871. TIncrementalChecksumCalcer incCS;
  872. int packetCount = res.GetPacketCount();
  873. out->Data.Reset(new TRopeDataPacket);
  874. for (int i = 0; i < packetCount; ++i) {
  875. TUdpRecvPacket* pkt = res.ExtractPacket(i);
  876. Y_ASSERT(pkt->DataSize == ((i == packetCount - 1) ? res.LastPacketSize : res.PacketSize));
  877. out->Data->AddBlock((char*)pkt, pkt->Data + pkt->DataStart, pkt->DataSize);
  878. incCS.AddBlockSum(pkt->BlockSum, pkt->DataSize);
  879. }
  880. out->Data->AttachSharedData(res.SharedData);
  881. res.EraseAllPackets();
  882. int crc32 = incCS.CalcChecksum(); // CalcChecksum(out->Data->GetChain());
  883. #ifdef SIMULATE_NETWORK_FAILURES
  884. bool crcOk = crc32 == res.Crc32 ? (RandomNumber<size_t>() % 10) != 0 : false;
  885. #else
  886. bool crcOk = crc32 == res.Crc32;
  887. #endif
  888. if (crcOk) {
  889. ReceivedList.push_back(out);
  890. Y_ASSERT(RecvCompleted.find(k) == RecvCompleted.end());
  891. TUdpCompleteInTransfer& complete = RecvCompleted[k];
  892. RecvCompletedQueue.push_back(k);
  893. complete.PacketGuid = res.PacketGuid;
  894. AckComplete(s, res.ToAddress, transferId, complete.PacketGuid, packetId);
  895. RecvQueue.erase(rq);
  896. } else {
  897. //printf("crc failed, require resend\n");
  898. delete out;
  899. ++res.Attempt;
  900. res.NewPacketsToAck.clear();
  901. RequireResend(s, res.ToAddress, transferId, res.Attempt);
  902. }
  903. } else {
  904. res.NewPacketsToAck.push_back(packetId);
  905. }
  906. }
  907. } break;
  908. case ACK: {
  909. TUdpOutXferHash::iterator i = SendQueue.find(k);
  910. if (i == SendQueue.end())
  911. break;
  912. TUdpOutTransfer& xfer = i->second;
  913. if (!xfer.AckTracker.IsInitialized())
  914. break;
  915. xfer.AckTracker.MarkAlive();
  916. int attempt = Read<int>(&pktData);
  917. Y_ASSERT(attempt <= xfer.Attempt);
  918. if (attempt != xfer.Attempt)
  919. break;
  920. ReadAcks(&xfer, (int*)pktData, (int)(RecvBuf.GetDataPtr() + rv - pktData) / SIZEOF_ACK, deltaT);
  921. break;
  922. }
  923. case ACK_COMPLETE: {
  924. TUdpOutXferHash::iterator i = SendQueue.find(k);
  925. if (i == SendQueue.end())
  926. break;
  927. TUdpOutTransfer& xfer = i->second;
  928. xfer.AckTracker.MarkAlive();
  929. TGUID packetGuid;
  930. packetGuid = Read<TGUID>(&pktData);
  931. int packetId = Read<int>(&pktData);
  932. if (packetGuid == xfer.PacketGuid) {
  933. xfer.AckTracker.Ack(packetId, deltaT, true); // update RTT
  934. xfer.AckTracker.AckAll(); // acking packets is required, otherwise they will be treated as lost (look AckTracker destructor)
  935. SucceededSend(transferId);
  936. SendQueue.erase(i);
  937. } else {
  938. // peer asserts that he has received this packet but packetGuid is wrong
  939. // try to resend everything
  940. // ++xfer.Attempt; // should not do this, only sender can modify attempt number, otherwise cycle is possible with out of order packets
  941. xfer.AckTracker.Resend();
  942. }
  943. break;
  944. } break;
  945. case ACK_RESEND: {
  946. TUdpOutXferHash::iterator i = SendQueue.find(k);
  947. if (i == SendQueue.end())
  948. break;
  949. TUdpOutTransfer& xfer = i->second;
  950. xfer.AckTracker.MarkAlive();
  951. int attempt = Read<int>(&pktData);
  952. if (xfer.Attempt != attempt) {
  953. // reset current tranfser & initialize new one
  954. xfer.Attempt = attempt;
  955. xfer.AckTracker.Resend();
  956. }
  957. break;
  958. }
  959. case ACK_RESEND_NOSHMEM: {
  960. // abort execution here
  961. // failed to open shmem on recv side, need to transmit data without using shmem
  962. Y_ABORT_UNLESS(0, "not implemented yet");
  963. break;
  964. }
  965. case PING: {
  966. sockaddr_in6 trueFromAddress = fromAddress;
  967. int port = Read<int>(&pktData);
  968. Y_ASSERT(trueFromAddress.sin6_family == AF_INET6);
  969. trueFromAddress.sin6_port = port;
  970. // can not set MTU for fromAddress here since asymmetrical mtu is possible
  971. char* pktData2 = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
  972. Write(&pktData2, (int)0);
  973. Write(&pktData2, (char)PONG);
  974. if (IB.Get()) {
  975. const TIBConnectInfo& ibConnectInfo = IB->GetConnectInfo();
  976. Write(&pktData2, ibConnectInfo);
  977. Write(&pktData2, trueFromAddress);
  978. }
  979. s.SendTo(PktBuf, pktData2 - PktBuf, trueFromAddress, FF_ALLOW_FRAG);
  980. break;
  981. }
  982. case PONG: {
  983. TPeerLink& peerInfo = GetPeerLink(k.Address);
  984. peerInfo.UdpCongestion->SetMTU(UDP_PACKET_SIZE);
  985. int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
  986. if (dataSize == sizeof(TIBConnectInfo) + sizeof(sockaddr_in6)) {
  987. if (IB.Get() != nullptr && peerInfo.IBPeer.Get() == nullptr) {
  988. TIBConnectInfo info = Read<TIBConnectInfo>(&pktData);
  989. sockaddr_in6 myAddress = Read<sockaddr_in6>(&pktData);
  990. TUdpAddress myUdpAddress;
  991. GetUdpAddress(&myUdpAddress, myAddress);
  992. peerInfo.IBPeer = IB->ConnectPeer(info, k.Address, myUdpAddress);
  993. }
  994. }
  995. break;
  996. }
  997. case KILL: {
  998. ui64 p1 = Read<ui64>(&pktData);
  999. ui64 p2 = Read<ui64>(&pktData);
  1000. int restSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
  1001. if (restSize == 0 && p1 == KILL_PASSPHRASE1 && p2 == KILL_PASSPHRASE2) {
  1002. abort();
  1003. }
  1004. break;
  1005. }
  1006. default:
  1007. Y_ASSERT(0);
  1008. break;
  1009. }
  1010. }
  1011. }
  1012. void TUdpHost::IBStep() {
  1013. if (IB.Get()) {
  1014. NHPTimer::STime tChk = CurrentT;
  1015. float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
  1016. if (IB->Step(tChk)) {
  1017. IBIdleTime = -chkDeltaT;
  1018. }
  1019. }
  1020. }
  1021. void TUdpHost::Step() {
  1022. if (IB.Get()) {
  1023. NHPTimer::STime tChk = CurrentT;
  1024. float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
  1025. if (IB->Step(tChk)) {
  1026. IBIdleTime = -chkDeltaT;
  1027. }
  1028. if (chkDeltaT < 0.0005) {
  1029. return;
  1030. }
  1031. }
  1032. if (UseTOSforAcks) {
  1033. s.SetTOS(0x20);
  1034. } else {
  1035. s.SetTOS(0);
  1036. }
  1037. RecvCycle();
  1038. float deltaT = (float)NHPTimer::GetTimePassed(&CurrentT);
  1039. deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
  1040. MaxWaitTime = DEFAULT_MAX_WAIT_TIME;
  1041. IBIdleTime += deltaT;
  1042. bool needCheckAlive = false;
  1043. // update alive ports
  1044. const float INACTIVE_CONGESTION_UPDATE_INTERVAL = 1;
  1045. TimeSinceCongestionHistoryUpdate += deltaT;
  1046. if (TimeSinceCongestionHistoryUpdate > INACTIVE_CONGESTION_UPDATE_INTERVAL) {
  1047. for (TPeerLinkHash::iterator i = CongestionTrackHistory.begin(); i != CongestionTrackHistory.end();) {
  1048. TPeerLink& pl = i->second;
  1049. if (!pl.UpdateSleep(TimeSinceCongestionHistoryUpdate)) {
  1050. TPeerLinkHash::iterator k = i++;
  1051. CongestionTrackHistory.erase(k);
  1052. needCheckAlive = true;
  1053. } else {
  1054. ++i;
  1055. }
  1056. }
  1057. TimeSinceCongestionHistoryUpdate = 0;
  1058. }
  1059. for (TPeerLinkHash::iterator i = CongestionTrack.begin(); i != CongestionTrack.end();) {
  1060. const TUdpAddress& addr = i->first;
  1061. TPeerLink& pl = i->second;
  1062. if (pl.UdpCongestion->GetTransferCount() == 0) {
  1063. pl.StartSleep(addr, &MaxWaitTime);
  1064. CongestionTrackHistory[i->first] = i->second;
  1065. TPeerLinkHash::iterator k = i++;
  1066. CongestionTrack.erase(k);
  1067. } else if (!pl.Update(deltaT, addr, &MaxWaitTime)) {
  1068. TPeerLinkHash::iterator k = i++;
  1069. CongestionTrack.erase(k);
  1070. needCheckAlive = true;
  1071. } else {
  1072. ++i;
  1073. }
  1074. }
  1075. // send acks on received data
  1076. for (TUdpInXferHash::iterator i = RecvQueue.begin(); i != RecvQueue.end();) {
  1077. const TTransferKey& transKey = i->first;
  1078. int transferId = transKey.Id;
  1079. TUdpInTransfer& xfer = i->second;
  1080. xfer.TimeSinceLastRecv += deltaT;
  1081. if (xfer.TimeSinceLastRecv > UDP_MAX_INPUT_DATA_WAIT || (needCheckAlive && !xfer.Congestion->IsAlive())) {
  1082. TUdpInXferHash::iterator k = i++;
  1083. RecvQueue.erase(k);
  1084. continue;
  1085. }
  1086. Y_ASSERT(RecvCompleted.find(i->first) == RecvCompleted.end()); // state "Complete & incomplete" is incorrect
  1087. if (!xfer.NewPacketsToAck.empty()) {
  1088. char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
  1089. Write(&pktData, transferId);
  1090. Write(&pktData, (char)ACK);
  1091. Write(&pktData, xfer.Attempt);
  1092. int acks = WriteAck(&xfer, (int*)pktData, (int)(xfer.PacketSize - (pktData - PktBuf)) / SIZEOF_ACK);
  1093. pktData += acks * SIZEOF_ACK;
  1094. s.SendTo(PktBuf, (int)(pktData - PktBuf), xfer.ToAddress, FF_ALLOW_FRAG);
  1095. }
  1096. ++i;
  1097. }
  1098. if (UseTOSforAcks) {
  1099. s.SetTOS(0x60);
  1100. }
  1101. // send data for outbound connections
  1102. SendData(&SendOrderHighPrior, deltaT, needCheckAlive);
  1103. SendData(&SendOrder, deltaT, needCheckAlive);
  1104. SendData(&SendOrderLow, deltaT, needCheckAlive);
  1105. // roll send order to avoid exotic problems with lots of peers and high traffic
  1106. SendOrderHighPrior.splice(SendOrderHighPrior.end(), SendOrderHighPrior, SendOrderHighPrior.begin());
  1107. //SendOrder.splice(SendOrder.end(), SendOrder, SendOrder.begin()); // sending data in order has lower delay and shorter queue
  1108. // clean completed queue
  1109. TimeSinceCompletedQueueClean += deltaT;
  1110. if (TimeSinceCompletedQueueClean > UDP_TRANSFER_TIMEOUT * 1.5) {
  1111. for (size_t i = 0; i < KeepCompletedQueue.size(); ++i) {
  1112. TUdpCompleteInXferHash::iterator k = RecvCompleted.find(KeepCompletedQueue[i]);
  1113. if (k != RecvCompleted.end())
  1114. RecvCompleted.erase(k);
  1115. }
  1116. KeepCompletedQueue.clear();
  1117. KeepCompletedQueue.swap(RecvCompletedQueue);
  1118. TimeSinceCompletedQueueClean = 0;
  1119. }
  1120. }
  1121. TString TUdpHost::GetPeerLinkDebug(const TPeerLinkHash& ch) {
  1122. TString res;
  1123. char buf[1000];
  1124. for (const auto& i : ch) {
  1125. const TUdpAddress& ip = i.first;
  1126. const TCongestionControl& cc = *i.second.UdpCongestion;
  1127. IIBPeer* ibPeer = i.second.IBPeer.Get();
  1128. snprintf(buf, sizeof(buf), "%s\tIB: %d, RTT: %g Timeout: %g Window: %g MaxWin: %g FailRate: %g TimeSinceLastRecv: %g Transfers: %d MTU: %d\n",
  1129. GetAddressAsString(ip).c_str(),
  1130. ibPeer ? ibPeer->GetState() : -1,
  1131. cc.GetRTT() * 1000, cc.GetTimeout() * 1000, cc.GetWindow(), cc.GetMaxWindow(), cc.GetFailRate(),
  1132. cc.GetTimeSinceLastRecv() * 1000, cc.GetTransferCount(), cc.GetMTU());
  1133. res += buf;
  1134. }
  1135. return res;
  1136. }
  1137. TString TUdpHost::GetDebugInfo() {
  1138. TString res;
  1139. char buf[1000];
  1140. snprintf(buf, sizeof(buf), "Receiving %d msgs, sending %d high prior, %d regular msgs, %d low prior msgs\n",
  1141. RecvQueue.ysize(), (int)SendOrderHighPrior.size(), (int)SendOrder.size(), (int)SendOrderLow.size());
  1142. res += buf;
  1143. TRequesterPendingDataStats pds;
  1144. GetPendingDataSize(&pds);
  1145. snprintf(buf, sizeof(buf), "Pending data size: %" PRIu64 "\n", pds.InpDataSize + pds.OutDataSize);
  1146. res += buf;
  1147. snprintf(buf, sizeof(buf), " in packets: %d, size %" PRIu64 "\n", pds.InpCount, pds.InpDataSize);
  1148. res += buf;
  1149. snprintf(buf, sizeof(buf), " out packets: %d, size %" PRIu64 "\n", pds.OutCount, pds.OutDataSize);
  1150. res += buf;
  1151. res += "\nCongestion info:\n";
  1152. res += GetPeerLinkDebug(CongestionTrack);
  1153. res += "\nCongestion info history:\n";
  1154. res += GetPeerLinkDebug(CongestionTrackHistory);
  1155. return res;
  1156. }
  1157. static void SendKill(const TNetSocket& s, const sockaddr_in6& toAddress) {
  1158. char buf[100];
  1159. char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
  1160. Write(&pktData, (int)0);
  1161. Write(&pktData, (char)KILL);
  1162. Write(&pktData, KILL_PASSPHRASE1);
  1163. Write(&pktData, KILL_PASSPHRASE2);
  1164. s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
  1165. }
  1166. void TUdpHost::Kill(const TUdpAddress& addr) {
  1167. sockaddr_in6 target;
  1168. GetWinsockAddr(&target, addr);
  1169. SendKill(s, target);
  1170. }
  1171. TIntrusivePtr<IPeerQueueStats> TUdpHost::GetQueueStats(const TUdpAddress& addr) {
  1172. TQueueStatsHash::iterator zq = PeerQueueStats.find(addr);
  1173. if (zq != PeerQueueStats.end()) {
  1174. return zq->second.Get();
  1175. }
  1176. TPeerQueueStats* res = new TPeerQueueStats;
  1177. PeerQueueStats[addr] = res;
  1178. // attach to existing congestion tracker
  1179. TPeerLinkHash::iterator z;
  1180. z = CongestionTrack.find(addr);
  1181. if (z != CongestionTrack.end()) {
  1182. z->second.UdpCongestion->AttachQueueStats(res);
  1183. }
  1184. z = CongestionTrackHistory.find(addr);
  1185. if (z != CongestionTrackHistory.end()) {
  1186. z->second.UdpCongestion->AttachQueueStats(res);
  1187. }
  1188. return res;
  1189. }
  1190. //////////////////////////////////////////////////////////////////////////
  1191. TIntrusivePtr<IUdpHost> CreateUdpHost(int port) {
  1192. TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateBestRecvSocket();
  1193. socket->Open(port);
  1194. if (!socket->IsValid())
  1195. return nullptr;
  1196. return CreateUdpHost(socket);
  1197. }
  1198. TIntrusivePtr<IUdpHost> CreateUdpHost(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
  1199. if (!InitLocalIPList()) {
  1200. Y_ASSERT(0 && "Can not determine self IP address");
  1201. return nullptr;
  1202. }
  1203. TIntrusivePtr<TUdpHost> res = new TUdpHost;
  1204. if (!res->Start(socket))
  1205. return nullptr;
  1206. return res.Get();
  1207. }
  1208. void SetUdpMaxBandwidthPerIP(float f) {
  1209. f = Max(0.0f, f);
  1210. TCongestionControl::MaxPacketRate = f / UDP_PACKET_SIZE;
  1211. }
  1212. void SetUdpSlowStart(bool enable) {
  1213. TCongestionControl::StartWindowSize = enable ? 0.5f : 3;
  1214. }
  1215. void DisableIBDetection() {
  1216. IBDetection = false;
  1217. }
  1218. }