udp_client_server.cpp 54 KB


  1. #include "stdafx.h"
  2. #include "udp_client_server.h"
  3. #include "net_acks.h"
  4. #include <util/generic/guid.h>
  5. #include <util/system/hp_timer.h>
  6. #include <util/datetime/cputimer.h>
  7. #include <util/system/yield.h>
  8. #include <util/system/unaligned_mem.h>
  9. #include "block_chain.h"
  10. #include <util/system/shmat.h>
  11. #include "udp_debug.h"
  12. #include "udp_socket.h"
  13. #include "ib_cs.h"
  14. #include <library/cpp/netliba/socket/socket.h>
  15. #include <util/random/random.h>
  16. #include <util/system/sanitizers.h>
  17. #include <atomic>
  18. namespace NNetliba {
  // Packets rely on the UDP checksum; the CRC is verified only once a transfer is complete.
  // UPDATE: the UDP checksum turned out not to be sufficient -- network errors do happen,
  // we have seen 600+ retransmits of a ~1MB data packet.
  constexpr float UDP_TRANSFER_TIMEOUT = 90.0f;
  constexpr float DEFAULT_MAX_WAIT_TIME = 1;
  constexpr float UDP_KEEP_PEER_INFO = 600;
  // Traffic from a peer may keep flowing while no new data arrives for one particular packet.
  // This happens when a process is killed in the middle of a transfer and restarted on the same
  // port: the receiver is left with a packet that will never complete. Such a packet is dropped
  // after this timeout.
  constexpr float UDP_MAX_INPUT_DATA_WAIT = 2 * UDP_TRANSFER_TIMEOUT;
  enum {
      // size of the ping datagram; used to detect jumbo-frame support
      UDP_PACKET_SIZE_FULL = 8900,
      // max data in a packet (large MTU)
      UDP_PACKET_SIZE = 8800,
      // max data in a packet (small MTU); 1180 would be better taking into account that
      // 1280 is the guaranteed IPv6 minimum MTU
      UDP_PACKET_SIZE_SMALL = 1350,
      // packet buffer size: max payload plus 100 bytes of slack (presumably for protocol headers)
      UDP_PACKET_BUF_SIZE = UDP_PACKET_SIZE + 100,
  };
  34. //////////////////////////////////////////////////////////////////////////
  35. struct TUdpCompleteInTransfer {
  36. TGUID PacketGuid;
  37. };
  38. //////////////////////////////////////////////////////////////////////////
  39. struct TUdpRecvPacket {
  40. int DataStart, DataSize;
  41. ui32 BlockSum;
  42. // Data[] should be last member in struct, this fact is used to create truncated TUdpRecvPacket in CreateNewSmallPacket()
  43. char Data[UDP_PACKET_BUF_SIZE];
  44. };
  45. struct TUdpInTransfer {
  46. private:
  47. TVector<TUdpRecvPacket*> Packets;
  48. public:
  49. sockaddr_in6 ToAddress;
  50. int PacketSize, LastPacketSize;
  51. bool HasLastPacket;
  52. TVector<int> NewPacketsToAck;
  53. TCongestionControlPtr Congestion;
  54. float TimeSinceLastRecv;
  55. int Attempt;
  56. TGUID PacketGuid;
  57. int Crc32;
  58. TIntrusivePtr<TSharedMemory> SharedData;
  59. TRequesterPendingDataStats* Stats;
  60. TUdpInTransfer()
  61. : PacketSize(0)
  62. , LastPacketSize(0)
  63. , HasLastPacket(false)
  64. , TimeSinceLastRecv(0)
  65. , Attempt(0)
  66. , Crc32(0)
  67. , Stats(nullptr)
  68. {
  69. Zero(ToAddress);
  70. }
  71. ~TUdpInTransfer() {
  72. if (Stats) {
  73. Stats->InpCount -= 1;
  74. }
  75. EraseAllPackets();
  76. }
  77. void EraseAllPackets() {
  78. for (int i = 0; i < Packets.ysize(); ++i) {
  79. ErasePacket(i);
  80. }
  81. Packets.clear();
  82. HasLastPacket = false;
  83. }
  84. void AttachStats(TRequesterPendingDataStats* stats) {
  85. Stats = stats;
  86. Stats->InpCount += 1;
  87. Y_ASSERT(Packets.empty());
  88. }
  89. void ErasePacket(int id) {
  90. TUdpRecvPacket* pkt = Packets[id];
  91. if (pkt) {
  92. if (Stats) {
  93. Stats->InpDataSize -= PacketSize;
  94. }
  95. TRopeDataPacket::FreeBuf((char*)pkt);
  96. Packets[id] = nullptr;
  97. }
  98. }
  99. void AssignPacket(int id, TUdpRecvPacket* pkt) {
  100. ErasePacket(id);
  101. if (pkt && Stats) {
  102. Stats->InpDataSize += PacketSize;
  103. }
  104. Packets[id] = pkt;
  105. }
  106. int GetPacketCount() const {
  107. return Packets.ysize();
  108. }
  109. void SetPacketCount(int n) {
  110. Packets.resize(n, nullptr);
  111. }
  112. const TUdpRecvPacket* GetPacket(int id) const {
  113. return Packets[id];
  114. }
  115. TUdpRecvPacket* ExtractPacket(int id) {
  116. TUdpRecvPacket* res = Packets[id];
  117. if (res) {
  118. if (Stats) {
  119. Stats->InpDataSize -= PacketSize;
  120. }
  121. Packets[id] = nullptr;
  122. }
  123. return res;
  124. }
  125. };
  126. struct TUdpOutTransfer {
  127. sockaddr_in6 ToAddress;
  128. TAutoPtr<TRopeDataPacket> Data;
  129. int PacketCount;
  130. int PacketSize, LastPacketSize;
  131. TAckTracker AckTracker;
  132. int Attempt;
  133. TGUID PacketGuid;
  134. int Crc32;
  135. EPacketPriority PacketPriority;
  136. TRequesterPendingDataStats* Stats;
  137. TUdpOutTransfer()
  138. : PacketCount(0)
  139. , PacketSize(0)
  140. , LastPacketSize(0)
  141. , Attempt(0)
  142. , Crc32(0)
  143. , PacketPriority(PP_LOW)
  144. , Stats(nullptr)
  145. {
  146. Zero(ToAddress);
  147. }
  148. ~TUdpOutTransfer() {
  149. if (Stats) {
  150. Stats->OutCount -= 1;
  151. Stats->OutDataSize -= Data->GetSize();
  152. }
  153. }
  154. void AttachStats(TRequesterPendingDataStats* stats) {
  155. Stats = stats;
  156. Stats->OutCount += 1;
  157. Stats->OutDataSize += Data->GetSize();
  158. }
  159. };
  160. struct TTransferKey {
  161. TUdpAddress Address;
  162. int Id;
  163. };
  164. inline bool operator==(const TTransferKey& a, const TTransferKey& b) {
  165. return a.Address == b.Address && a.Id == b.Id;
  166. }
  167. struct TTransferKeyHash {
  168. int operator()(const TTransferKey& k) const {
  169. return (ui32)k.Address.Interface + (ui32)k.Address.Port * (ui32)389461 + (ui32)k.Id;
  170. }
  171. };
  172. struct TUdpAddressHash {
  173. int operator()(const TUdpAddress& addr) const {
  174. return (ui32)addr.Interface + (ui32)addr.Port * (ui32)389461;
  175. }
  176. };
  177. class TUdpHostRevBufAlloc: public TNonCopyable {
  178. TUdpRecvPacket* RecvPktBuf;
  179. void AllocNewBuf() {
  180. RecvPktBuf = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(sizeof(TUdpRecvPacket));
  181. }
  182. public:
  183. TUdpHostRevBufAlloc() {
  184. AllocNewBuf();
  185. }
  186. ~TUdpHostRevBufAlloc() {
  187. FreeBuf(RecvPktBuf);
  188. }
  189. void FreeBuf(TUdpRecvPacket* pkt) {
  190. TRopeDataPacket::FreeBuf((char*)pkt);
  191. }
  192. TUdpRecvPacket* ExtractPacket() {
  193. TUdpRecvPacket* res = RecvPktBuf;
  194. AllocNewBuf();
  195. return res;
  196. }
  197. TUdpRecvPacket* CreateNewSmallPacket(int sz) {
  198. int pktStructSz = sizeof(TUdpRecvPacket) - Y_ARRAY_SIZE(RecvPktBuf->Data) + sz;
  199. TUdpRecvPacket* pkt = (TUdpRecvPacket*)TRopeDataPacket::AllocBuf(pktStructSz);
  200. return pkt;
  201. }
  202. int GetBufSize() const {
  203. return Y_ARRAY_SIZE(RecvPktBuf->Data);
  204. }
  205. char* GetDataPtr() const {
  206. return RecvPktBuf->Data;
  207. }
  208. };
  209. static TAtomic transferIdCounter = (long)(GetCycleCount() & 0x1fffffff);
  210. inline int GetTransferId() {
  211. int res = AtomicAdd(transferIdCounter, 1);
  212. while (res < 0) {
  213. // negative transfer ids are treated as errors, so wrap transfer id
  214. AtomicCas(&transferIdCounter, 0, transferIdCounter);
  215. res = AtomicAdd(transferIdCounter, 1);
  216. }
  217. return res;
  218. }
  // When true, TUdpHost::Start() also creates an IB (InfiniBand) client/server.
  static bool IBDetection{true};
  220. class TUdpHost: public IUdpHost {
  221. struct TPeerLink {
  222. TIntrusivePtr<TCongestionControl> UdpCongestion;
  223. TIntrusivePtr<IIBPeer> IBPeer;
  224. double TimeNoActiveTransfers;
  225. TPeerLink()
  226. : TimeNoActiveTransfers(0)
  227. {
  228. }
  229. bool Update(float deltaT, const TUdpAddress& toAddress, float* maxWaitTime) {
  230. bool updateOk = UdpCongestion->UpdateAlive(toAddress, deltaT, UDP_TRANSFER_TIMEOUT, maxWaitTime);
  231. return updateOk;
  232. }
  233. void StartSleep(const TUdpAddress& toAddress, float* maxWaitTime) {
  234. //printf("peer_link start sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
  235. UdpCongestion->UpdateAlive(toAddress, 0, UDP_TRANSFER_TIMEOUT, maxWaitTime);
  236. UdpCongestion->MarkAlive();
  237. TimeNoActiveTransfers = 0;
  238. }
  239. bool UpdateSleep(float deltaT) {
  240. TimeNoActiveTransfers += deltaT;
  241. if (IBPeer.Get()) {
  242. //printf("peer_link update sleep, IBPeer = %p, refs = %d\n", IBPeer.Get(), (int)IBPeer.RefCount());
  243. if (IBPeer->GetState() == IIBPeer::OK) {
  244. return true;
  245. }
  246. //printf("Drop broken IB connection\n");
  247. IBPeer = nullptr;
  248. }
  249. return (TimeNoActiveTransfers < UDP_KEEP_PEER_INFO);
  250. }
  251. };
  252. TNetSocket s;
  253. typedef THashMap<TTransferKey, TUdpInTransfer, TTransferKeyHash> TUdpInXferHash;
  254. typedef THashMap<TTransferKey, TUdpOutTransfer, TTransferKeyHash> TUdpOutXferHash;
  255. // congestion control per peer
  256. typedef THashMap<TUdpAddress, TPeerLink, TUdpAddressHash> TPeerLinkHash;
  257. typedef THashMap<TTransferKey, TUdpCompleteInTransfer, TTransferKeyHash> TUdpCompleteInXferHash;
  258. typedef THashMap<TUdpAddress, TIntrusivePtr<TPeerQueueStats>, TUdpAddressHash> TQueueStatsHash;
  259. TUdpInXferHash RecvQueue;
  260. TUdpCompleteInXferHash RecvCompleted;
  261. TUdpOutXferHash SendQueue;
  262. TPeerLinkHash CongestionTrack, CongestionTrackHistory;
  263. TList<TRequest*> ReceivedList;
  264. NHPTimer::STime CurrentT;
  265. TList<TSendResult> SendResults;
  266. TList<TTransferKey> SendOrderLow, SendOrder, SendOrderHighPrior;
  267. TAtomic IsWaiting;
  268. float MaxWaitTime;
  269. std::atomic<float> MaxWaitTime2;
  270. float IBIdleTime;
  271. TVector<TTransferKey> RecvCompletedQueue, KeepCompletedQueue;
  272. float TimeSinceCompletedQueueClean, TimeSinceCongestionHistoryUpdate;
  273. TRequesterPendingDataStats PendingDataStats;
  274. TQueueStatsHash PeerQueueStats;
  275. TIntrusivePtr<IIBClientServer> IB;
  276. typedef THashMap<TIBMsgHandle, TTransferKey> TIBtoTransferKeyHash;
  277. TIBtoTransferKeyHash IBKeyToTransferKey;
  278. char PktBuf[UDP_PACKET_BUF_SIZE];
  279. TUdpHostRevBufAlloc RecvBuf;
  280. TPeerLink& GetPeerLink(const TUdpAddress& ip) {
  281. TPeerLinkHash::iterator z = CongestionTrack.find(ip);
  282. if (z == CongestionTrack.end()) {
  283. z = CongestionTrackHistory.find(ip);
  284. if (z == CongestionTrackHistory.end()) {
  285. TPeerLink& res = CongestionTrack[ip];
  286. Y_ASSERT(res.UdpCongestion.Get() == nullptr);
  287. res.UdpCongestion = new TCongestionControl;
  288. TQueueStatsHash::iterator zq = PeerQueueStats.find(ip);
  289. if (zq != PeerQueueStats.end()) {
  290. res.UdpCongestion->AttachQueueStats(zq->second);
  291. }
  292. return res;
  293. } else {
  294. TPeerLink& res = CongestionTrack[z->first];
  295. res = z->second;
  296. CongestionTrackHistory.erase(z);
  297. return res;
  298. }
  299. } else {
  300. Y_ASSERT(CongestionTrackHistory.find(ip) == CongestionTrackHistory.end());
  301. return z->second;
  302. }
  303. }
  304. void SucceededSend(int id) {
  305. SendResults.push_back(TSendResult(id, true));
  306. }
  307. void FailedSend(int id) {
  308. SendResults.push_back(TSendResult(id, false));
  309. }
  310. void SendData(TList<TTransferKey>* order, float deltaT, bool needCheckAlive);
  311. void RecvCycle();
  312. public:
  313. TUdpHost()
  314. : CurrentT(0)
  315. , IsWaiting(0)
  316. , MaxWaitTime(DEFAULT_MAX_WAIT_TIME)
  317. , MaxWaitTime2(DEFAULT_MAX_WAIT_TIME)
  318. , IBIdleTime(0)
  319. , TimeSinceCompletedQueueClean(0)
  320. , TimeSinceCongestionHistoryUpdate(0)
  321. {
  322. }
  323. ~TUdpHost() override {
  324. for (TList<TRequest*>::const_iterator i = ReceivedList.begin(); i != ReceivedList.end(); ++i)
  325. delete *i;
  326. }
  327. bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket);
  328. TRequest* GetRequest() override {
  329. if (ReceivedList.empty()) {
  330. if (IB.Get()) {
  331. return IB->GetRequest();
  332. }
  333. return nullptr;
  334. }
  335. TRequest* res = ReceivedList.front();
  336. ReceivedList.pop_front();
  337. return res;
  338. }
  339. void AddToSendOrder(const TTransferKey& transferKey, EPacketPriority pp) {
  340. if (pp == PP_LOW)
  341. SendOrderLow.push_back(transferKey);
  342. else if (pp == PP_NORMAL)
  343. SendOrder.push_back(transferKey);
  344. else if (pp == PP_HIGH)
  345. SendOrderHighPrior.push_back(transferKey);
  346. else
  347. Y_ASSERT(0);
  348. CancelWait();
  349. }
  350. int Send(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data, int crc32, TGUID* packetGuid, EPacketPriority pp) override {
  351. if (addr.Port == 0) {
  352. // shortcut for broken addresses
  353. if (packetGuid && packetGuid->IsEmpty())
  354. CreateGuid(packetGuid);
  355. int reqId = GetTransferId();
  356. FailedSend(reqId);
  357. return reqId;
  358. }
  359. TTransferKey transferKey;
  360. transferKey.Address = addr;
  361. transferKey.Id = GetTransferId();
  362. Y_ASSERT(SendQueue.find(transferKey) == SendQueue.end());
  363. TPeerLink& peerInfo = GetPeerLink(transferKey.Address);
  364. TUdpOutTransfer& xfer = SendQueue[transferKey];
  365. GetWinsockAddr(&xfer.ToAddress, transferKey.Address);
  366. xfer.Crc32 = crc32;
  367. xfer.PacketPriority = pp;
  368. if (!packetGuid || packetGuid->IsEmpty()) {
  369. CreateGuid(&xfer.PacketGuid);
  370. if (packetGuid)
  371. *packetGuid = xfer.PacketGuid;
  372. } else {
  373. xfer.PacketGuid = *packetGuid;
  374. }
  375. xfer.Data.Reset(data.Release());
  376. xfer.AttachStats(&PendingDataStats);
  377. xfer.AckTracker.AttachCongestionControl(peerInfo.UdpCongestion.Get());
  378. bool isSentOverIB = false;
  379. // we don't support priorities (=service levels in IB terms) currently
  380. // so send only PP_NORMAL traffic over IB
  381. if (pp == PP_NORMAL && peerInfo.IBPeer.Get() && xfer.Data->GetSharedData() == nullptr) {
  382. TIBMsgHandle hndl = IB->Send(peerInfo.IBPeer, xfer.Data.Get(), xfer.PacketGuid);
  383. if (hndl >= 0) {
  384. IBKeyToTransferKey[hndl] = transferKey;
  385. isSentOverIB = true;
  386. } else {
  387. // so we failed to use IB, ibPeer is either not connected yet or failed
  388. if (peerInfo.IBPeer->GetState() == IIBPeer::FAILED) {
  389. //printf("Disconnect failed IB peer\n");
  390. peerInfo.IBPeer = nullptr;
  391. }
  392. }
  393. }
  394. if (!isSentOverIB) {
  395. AddToSendOrder(transferKey, pp);
  396. }
  397. return transferKey.Id;
  398. }
  399. bool GetSendResult(TSendResult* res) override {
  400. if (SendResults.empty()) {
  401. if (IB.Get()) {
  402. TIBSendResult sr;
  403. if (IB->GetSendResult(&sr)) {
  404. TIBtoTransferKeyHash::iterator z = IBKeyToTransferKey.find(sr.Handle);
  405. if (z == IBKeyToTransferKey.end()) {
  406. Y_ABORT_UNLESS(0, "unknown handle returned from IB");
  407. }
  408. TTransferKey transferKey = z->second;
  409. IBKeyToTransferKey.erase(z);
  410. TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
  411. if (i == SendQueue.end()) {
  412. Y_ABORT_UNLESS(0, "IBKeyToTransferKey refers nonexisting xfer");
  413. }
  414. if (sr.Success) {
  415. TUdpOutTransfer& xfer = i->second;
  416. xfer.AckTracker.MarkAlive(); // do we really need this?
  417. *res = TSendResult(transferKey.Id, sr.Success);
  418. SendQueue.erase(i);
  419. return true;
  420. } else {
  421. //printf("IB send failed, fall back to regular network\n");
  422. // Houston, we got a problem
  423. // IB failed to send, try to use regular network
  424. TUdpOutTransfer& xfer = i->second;
  425. AddToSendOrder(transferKey, xfer.PacketPriority);
  426. }
  427. }
  428. }
  429. return false;
  430. }
  431. *res = SendResults.front();
  432. SendResults.pop_front();
  433. return true;
  434. }
  435. void Step() override;
  436. void IBStep() override;
  437. void Wait(float seconds) override {
  438. if (seconds < 1e-3)
  439. seconds = 0;
  440. if (seconds > MaxWaitTime)
  441. seconds = MaxWaitTime;
  442. if (IBIdleTime < 0.010) {
  443. seconds = 0;
  444. }
  445. if (seconds == 0) {
  446. ThreadYield();
  447. } else {
  448. AtomicAdd(IsWaiting, 1);
  449. if (seconds > MaxWaitTime2)
  450. seconds = MaxWaitTime2;
  451. MaxWaitTime2 = DEFAULT_MAX_WAIT_TIME;
  452. if (seconds == 0) {
  453. ThreadYield();
  454. } else {
  455. if (IB.Get()) {
  456. for (float done = 0; done < seconds;) {
  457. float deltaSleep = Min(seconds - done, 0.002f);
  458. s.Wait(deltaSleep);
  459. NHPTimer::STime tChk;
  460. NHPTimer::GetTime(&tChk);
  461. if (IB->Step(tChk)) {
  462. IBIdleTime = 0;
  463. break;
  464. }
  465. done += deltaSleep;
  466. }
  467. } else {
  468. s.Wait(seconds);
  469. }
  470. }
  471. AtomicAdd(IsWaiting, -1);
  472. }
  473. }
  474. void CancelWait() override {
  475. MaxWaitTime2 = 0;
  476. if (AtomicAdd(IsWaiting, 0) == 1) {
  477. s.SendSelfFakePacket();
  478. }
  479. }
  480. void GetPendingDataSize(TRequesterPendingDataStats* res) override {
  481. *res = PendingDataStats;
  482. #ifndef NDEBUG
  483. TRequesterPendingDataStats chk;
  484. for (TUdpOutXferHash::const_iterator i = SendQueue.begin(); i != SendQueue.end(); ++i) {
  485. TRopeDataPacket* pckt = i->second.Data.Get();
  486. if (pckt) {
  487. chk.OutDataSize += pckt->GetSize();
  488. ++chk.OutCount;
  489. }
  490. }
  491. for (TUdpInXferHash::const_iterator i = RecvQueue.begin(); i != RecvQueue.end(); ++i) {
  492. const TUdpInTransfer& tr = i->second;
  493. for (int p = 0; p < tr.GetPacketCount(); ++p) {
  494. if (tr.GetPacket(p)) {
  495. chk.InpDataSize += tr.PacketSize;
  496. }
  497. }
  498. ++chk.InpCount;
  499. }
  500. Y_ASSERT(memcmp(&chk, res, sizeof(chk)) == 0);
  501. #endif
  502. }
  503. TString GetDebugInfo() override;
  504. TString GetPeerLinkDebug(const TPeerLinkHash& ch);
  505. void Kill(const TUdpAddress& addr) override;
  506. TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override;
  507. };
  508. bool TUdpHost::Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
  509. if (s.IsValid()) {
  510. Y_ASSERT(0);
  511. return false;
  512. }
  513. s.Open(socket);
  514. if (!s.IsValid())
  515. return false;
  516. if (IBDetection)
  517. IB = CreateIBClientServer();
  518. NHPTimer::GetTime(&CurrentT);
  519. return true;
  520. }
  521. static bool HasAllPackets(const TUdpInTransfer& res) {
  522. if (!res.HasLastPacket)
  523. return false;
  524. for (int i = res.GetPacketCount() - 1; i >= 0; --i) {
  525. if (!res.GetPacket(i))
  526. return false;
  527. }
  528. return true;
  529. }
  // Acks are grouped: one ack record is two ints -- the acknowledged packet id, then a bit mask
  // for the 32 packets preceding it (bit k set => packet id - k - 1 was received).
  constexpr int SIZEOF_ACK = 8;
  532. static int WriteAck(TUdpInTransfer* p, int* dst, int maxAcks) {
  533. int ackCount = 0;
  534. if (p->NewPacketsToAck.size() > 1)
  535. Sort(p->NewPacketsToAck.begin(), p->NewPacketsToAck.end());
  536. int lastAcked = 0;
  537. for (size_t idx = 0; idx < p->NewPacketsToAck.size(); ++idx) {
  538. int pkt = p->NewPacketsToAck[idx];
  539. if (idx == p->NewPacketsToAck.size() - 1 || pkt > lastAcked + 30) {
  540. *dst++ = pkt;
  541. int bitMask = 0;
  542. int backPackets = Min(pkt, 32);
  543. for (int k = 0; k < backPackets; ++k) {
  544. if (p->GetPacket(pkt - k - 1))
  545. bitMask |= 1 << k;
  546. }
  547. *dst++ = bitMask;
  548. if (++ackCount >= maxAcks)
  549. break;
  550. lastAcked = pkt;
  551. //printf("sending ack %d (mask %x)\n", pkt, bitMask);
  552. }
  553. }
  554. p->NewPacketsToAck.clear();
  555. return ackCount;
  556. }
  557. static void AckPacket(TUdpOutTransfer* p, int pkt, float deltaT, bool updateRTT) {
  558. if (pkt < 0 || pkt >= p->PacketCount) {
  559. Y_ASSERT(0);
  560. return;
  561. }
  562. p->AckTracker.Ack(pkt, deltaT, updateRTT);
  563. }
  564. static void ReadAcks(TUdpOutTransfer* p, const int* acks, int ackCount, float deltaT) {
  565. for (int i = 0; i < ackCount; ++i) {
  566. int pkt = *acks++;
  567. int bitMask = *acks++;
  568. bool updateRTT = i == ackCount - 1; // update RTT using only last packet in the pack
  569. AckPacket(p, pkt, deltaT, updateRTT);
  570. for (int k = 0; k < 32; ++k) {
  571. if (bitMask & (1 << k))
  572. AckPacket(p, pkt - k - 1, deltaT, false);
  573. }
  574. }
  575. }
  576. using namespace NNetlibaSocket::NNetliba;
  577. const ui64 KILL_PASSPHRASE1 = 0x98ff9cefb11d9a4cul;
  578. const ui64 KILL_PASSPHRASE2 = 0xf7754c29e0be95eaul;
  579. template <class T>
  580. inline T Read(char** data) {
  581. T res = ReadUnaligned<T>(*data);
  582. *data += sizeof(T);
  583. return res;
  584. }
  585. template <class T>
  586. inline void Write(char** data, T res) {
  587. WriteUnaligned<T>(*data, res);
  588. *data += sizeof(T);
  589. }
  590. static void RequireResend(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
  591. char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
  592. Write(&pktData, transferId);
  593. Write(&pktData, (char)ACK_RESEND);
  594. Write(&pktData, attempt);
  595. s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
  596. }
  597. static void RequireResendNoShmem(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, int attempt) {
  598. char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
  599. Write(&pktData, transferId);
  600. Write(&pktData, (char)ACK_RESEND_NOSHMEM);
  601. Write(&pktData, attempt);
  602. s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
  603. }
  604. static void AckComplete(const TNetSocket& s, const sockaddr_in6& toAddress, int transferId, const TGUID& packetGuid, int packetId) {
  605. char buf[100], *pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
  606. Write(&pktData, transferId);
  607. Write(&pktData, (char)ACK_COMPLETE);
  608. Write(&pktData, packetGuid);
  609. Write(&pktData, packetId); // we need packetId to update RTT
  610. s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
  611. }
  612. static void SendPing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
  613. char pktBuf[UDP_PACKET_SIZE_FULL];
  614. char* pktData = pktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
  615. if (NSan::MSanIsOn()) {
  616. Zero(pktBuf);
  617. }
  618. Write(&pktData, (int)0);
  619. Write(&pktData, (char)PING);
  620. Write(&pktData, selfNetworkOrderPort);
  621. s.SendTo(pktBuf, UDP_PACKET_SIZE_FULL, toAddress, FF_DONT_FRAG);
  622. }
  623. // not MTU discovery, just figure out IB address of the peer
  624. static void SendFakePing(TNetSocket& s, const sockaddr_in6& toAddress, int selfNetworkOrderPort) {
  625. char buf[100];
  626. char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
  627. Write(&pktData, (int)0);
  628. Write(&pktData, (char)PING);
  629. Write(&pktData, selfNetworkOrderPort);
  630. s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
  631. }
  632. void TUdpHost::SendData(TList<TTransferKey>* order, float deltaT1, bool needCheckAlive) {
  633. for (TList<TTransferKey>::iterator z = order->begin(); z != order->end();) {
  634. // pick connection to send
  635. const TTransferKey& transferKey = *z;
  636. TUdpOutXferHash::iterator i = SendQueue.find(transferKey);
  637. if (i == SendQueue.end()) {
  638. z = order->erase(z);
  639. continue;
  640. }
  641. ++z;
  642. // perform sending
  643. int transferId = transferKey.Id;
  644. TUdpOutTransfer& xfer = i->second;
  645. if (!xfer.AckTracker.IsInitialized()) {
  646. TIntrusivePtr<TCongestionControl> congestion = xfer.AckTracker.GetCongestionControl();
  647. Y_ASSERT(congestion.Get() != nullptr);
  648. if (!congestion->IsKnownMTU()) {
  649. TLameMTUDiscovery* md = congestion->GetMTUDiscovery();
  650. if (md->IsTimedOut()) {
  651. congestion->SetMTU(UDP_PACKET_SIZE_SMALL);
  652. } else {
  653. if (md->CanSend()) {
  654. SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
  655. md->PingSent();
  656. }
  657. continue;
  658. }
  659. }
  660. // try to use large mtu, we could have selected small mtu due to connectivity problems
  661. if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL || IB.Get() != nullptr) {
  662. // recheck every ~50mb
  663. int chkDenom = (50000000 / xfer.Data->GetSize()) | 1;
  664. if ((NetAckRnd() % chkDenom) == 0) {
  665. //printf("send rechecking ping\n");
  666. if (congestion->GetMTU() == UDP_PACKET_SIZE_SMALL) {
  667. SendPing(s, xfer.ToAddress, s.GetNetworkOrderPort());
  668. } else {
  669. SendFakePing(s, xfer.ToAddress, s.GetNetworkOrderPort());
  670. }
  671. }
  672. }
  673. xfer.PacketSize = congestion->GetMTU();
  674. xfer.LastPacketSize = xfer.Data->GetSize() % xfer.PacketSize;
  675. xfer.PacketCount = xfer.Data->GetSize() / xfer.PacketSize + 1;
  676. xfer.AckTracker.SetPacketCount(xfer.PacketCount);
  677. }
  678. xfer.AckTracker.Step(deltaT1);
  679. MaxWaitTime = Min(MaxWaitTime, xfer.AckTracker.GetTimeToNextPacketTimeout());
  680. if (needCheckAlive && !xfer.AckTracker.IsAlive()) {
  681. FailedSend(transferId);
  682. SendQueue.erase(i);
  683. continue;
  684. }
  685. bool sendBufferOverflow = false;
  686. while (xfer.AckTracker.CanSend()) {
  687. NHPTimer::STime tCopy = CurrentT;
  688. float deltaT2 = (float)NHPTimer::GetTimePassed(&tCopy);
  689. deltaT2 = ClampVal(deltaT2, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
  690. int pkt = xfer.AckTracker.GetPacketToSend(deltaT2);
  691. if (pkt == -1) {
  692. break;
  693. }
  694. int dataSize = xfer.PacketSize;
  695. if (pkt == xfer.PacketCount - 1)
  696. dataSize = xfer.LastPacketSize;
  697. char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
  698. Write(&pktData, transferId);
  699. char pktType = xfer.PacketSize == UDP_PACKET_SIZE ? DATA : DATA_SMALL;
  700. TSharedMemory* shm = xfer.Data->GetSharedData();
  701. if (shm) {
  702. if (pktType == DATA)
  703. pktType = DATA_SHMEM;
  704. else
  705. pktType = DATA_SMALL_SHMEM;
  706. }
  707. Write(&pktData, pktType);
  708. Write(&pktData, xfer.Attempt);
  709. Write(&pktData, pkt);
  710. if (pkt == 0) {
  711. Write(&pktData, xfer.PacketGuid);
  712. Write(&pktData, xfer.Crc32);
  713. if (shm) {
  714. Write(&pktData, shm->GetId());
  715. Write(&pktData, shm->GetSize());
  716. }
  717. }
  718. TBlockChainIterator dataReader(xfer.Data->GetChain());
  719. dataReader.Seek(pkt * xfer.PacketSize);
  720. dataReader.Read(pktData, dataSize);
  721. pktData += dataSize;
  722. int sendSize = (int)(pktData - PktBuf);
  723. TNetSocket::ESendError sendErr = s.SendTo(PktBuf, sendSize, xfer.ToAddress, FF_ALLOW_FRAG);
  724. if (sendErr != TNetSocket::SEND_OK) {
  725. if (sendErr == TNetSocket::SEND_NO_ROUTE_TO_HOST) {
  726. FailedSend(transferId);
  727. SendQueue.erase(i);
  728. break;
  729. } else {
  730. // most probably out of send buffer space (or something terrible has happened)
  731. xfer.AckTracker.AddToResend(pkt);
  732. sendBufferOverflow = true;
  733. MaxWaitTime = 0;
  734. //printf("failed send\n");
  735. break;
  736. }
  737. }
  738. }
  739. if (sendBufferOverflow)
  740. break;
  741. }
  742. }
  743. void TUdpHost::RecvCycle() {
  744. for (;;) {
  745. sockaddr_in6 fromAddress;
  746. int rv = RecvBuf.GetBufSize();
  747. bool recvOk = s.RecvFrom(RecvBuf.GetDataPtr(), &rv, &fromAddress);
  748. if (!recvOk)
  749. break;
  750. NHPTimer::STime tCopy = CurrentT;
  751. float deltaT = (float)NHPTimer::GetTimePassed(&tCopy);
  752. deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
  753. //int fromIP = fromAddress.sin_addr.s_addr;
  754. TTransferKey k;
  755. char* pktData = RecvBuf.GetDataPtr() + UDP_LOW_LEVEL_HEADER_SIZE;
  756. GetUdpAddress(&k.Address, fromAddress);
  757. k.Id = Read<int>(&pktData);
  758. int transferId = k.Id;
  759. int cmd = Read<char>(&pktData);
  760. Y_ASSERT(cmd == (int)*(RecvBuf.GetDataPtr() + CMD_POS));
  761. switch (cmd) {
  762. case DATA:
  763. case DATA_SMALL:
  764. case DATA_SHMEM:
  765. case DATA_SMALL_SHMEM: {
  766. int attempt = Read<int>(&pktData);
  767. int packetId = Read<int>(&pktData);
  768. //printf("data packet %d (trans ID = %d)\n", packetId, transferId);
  769. TUdpCompleteInXferHash::iterator itCompl = RecvCompleted.find(k);
  770. if (itCompl != RecvCompleted.end()) {
  771. Y_ASSERT(RecvQueue.find(k) == RecvQueue.end());
  772. const TUdpCompleteInTransfer& complete = itCompl->second;
  773. bool sendAckComplete = true;
  774. if (packetId == 0) {
  775. // check packet GUID
  776. char* tmpPktData = pktData;
  777. TGUID packetGuid;
  778. packetGuid = Read<TGUID>(&tmpPktData);
  779. if (packetGuid != complete.PacketGuid) {
  780. // we are receiving new data with the same transferId
  781. // in this case we have to flush all the information about previous transfer
  782. // and start over
  783. //printf("same transferId for a different packet\n");
  784. RecvCompleted.erase(itCompl);
  785. sendAckComplete = false;
  786. }
  787. }
  788. if (sendAckComplete) {
  789. AckComplete(s, fromAddress, transferId, complete.PacketGuid, packetId);
  790. break;
  791. }
  792. }
  793. TUdpInXferHash::iterator rq = RecvQueue.find(k);
  794. if (rq == RecvQueue.end()) {
  795. //printf("new input transfer\n");
  796. TUdpInTransfer& res = RecvQueue[k];
  797. res.ToAddress = fromAddress;
  798. res.Attempt = attempt;
  799. res.Congestion = GetPeerLink(k.Address).UdpCongestion.Get();
  800. res.PacketSize = 0;
  801. res.HasLastPacket = false;
  802. res.AttachStats(&PendingDataStats);
  803. rq = RecvQueue.find(k);
  804. Y_ASSERT(rq != RecvQueue.end());
  805. }
  806. TUdpInTransfer& res = rq->second;
  807. res.Congestion->MarkAlive();
  808. res.TimeSinceLastRecv = 0;
  809. if (packetId == 0) {
  810. TGUID packetGuid;
  811. packetGuid = Read<TGUID>(&pktData);
  812. int crc32 = Read<int>(&pktData);
  813. res.Crc32 = crc32;
  814. res.PacketGuid = packetGuid;
  815. if (cmd == DATA_SHMEM || cmd == DATA_SMALL_SHMEM) {
  816. // link to attached shared memory
  817. TGUID shmemId = Read<TGUID>(&pktData);
  818. int shmemSize = Read<int>(&pktData);
  819. if (res.SharedData.Get() == nullptr) {
  820. res.SharedData = new TSharedMemory;
  821. if (!res.SharedData->Open(shmemId, shmemSize)) {
  822. res.SharedData = nullptr;
  823. RequireResendNoShmem(s, res.ToAddress, transferId, res.Attempt);
  824. break;
  825. }
  826. }
  827. }
  828. }
  829. if (attempt != res.Attempt) {
  830. RequireResend(s, res.ToAddress, transferId, res.Attempt);
  831. break;
  832. } else {
  833. if (res.PacketSize == 0) {
  834. res.PacketSize = (cmd == DATA || cmd == DATA_SHMEM ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL);
  835. } else {
  836. // check that all data is of same size
  837. Y_ASSERT(cmd == DATA || cmd == DATA_SMALL);
  838. Y_ASSERT(res.PacketSize == (cmd == DATA ? UDP_PACKET_SIZE : UDP_PACKET_SIZE_SMALL));
  839. }
  840. int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
  841. Y_ASSERT(dataSize <= res.PacketSize);
  842. if (dataSize > res.PacketSize)
  843. break; // mem overrun protection
  844. if (packetId >= res.GetPacketCount())
  845. res.SetPacketCount(packetId + 1);
  846. {
  847. TUdpRecvPacket* pkt = nullptr;
  848. if (res.PacketSize == UDP_PACKET_SIZE_SMALL) {
  849. // save memory by using smaller buffer at the cost of additional memcpy
  850. pkt = RecvBuf.CreateNewSmallPacket(dataSize);
  851. memcpy(pkt->Data, pktData, dataSize);
  852. pkt->DataStart = 0;
  853. pkt->DataSize = dataSize;
  854. } else {
  855. int dataStart = (int)(pktData - RecvBuf.GetDataPtr()); // data offset in the packet
  856. pkt = RecvBuf.ExtractPacket();
  857. pkt->DataStart = dataStart;
  858. pkt->DataSize = dataSize;
  859. }
  860. // calc packet sum, will be used to calc whole message crc
  861. pkt->BlockSum = TIncrementalChecksumCalcer::CalcBlockSum(pkt->Data + pkt->DataStart, pkt->DataSize);
  862. res.AssignPacket(packetId, pkt);
  863. }
  864. if (dataSize != res.PacketSize) {
  865. res.LastPacketSize = dataSize;
  866. res.HasLastPacket = true;
  867. }
  868. if (HasAllPackets(res)) {
  869. //printf("received\n");
  870. TRequest* out = new TRequest;
  871. out->Address = k.Address;
  872. out->Guid = res.PacketGuid;
  873. TIncrementalChecksumCalcer incCS;
  874. int packetCount = res.GetPacketCount();
  875. out->Data.Reset(new TRopeDataPacket);
  876. for (int i = 0; i < packetCount; ++i) {
  877. TUdpRecvPacket* pkt = res.ExtractPacket(i);
  878. Y_ASSERT(pkt->DataSize == ((i == packetCount - 1) ? res.LastPacketSize : res.PacketSize));
  879. out->Data->AddBlock((char*)pkt, pkt->Data + pkt->DataStart, pkt->DataSize);
  880. incCS.AddBlockSum(pkt->BlockSum, pkt->DataSize);
  881. }
  882. out->Data->AttachSharedData(res.SharedData);
  883. res.EraseAllPackets();
  884. int crc32 = incCS.CalcChecksum(); // CalcChecksum(out->Data->GetChain());
  885. #ifdef SIMULATE_NETWORK_FAILURES
  886. bool crcOk = crc32 == res.Crc32 ? (RandomNumber<size_t>() % 10) != 0 : false;
  887. #else
  888. bool crcOk = crc32 == res.Crc32;
  889. #endif
  890. if (crcOk) {
  891. ReceivedList.push_back(out);
  892. Y_ASSERT(RecvCompleted.find(k) == RecvCompleted.end());
  893. TUdpCompleteInTransfer& complete = RecvCompleted[k];
  894. RecvCompletedQueue.push_back(k);
  895. complete.PacketGuid = res.PacketGuid;
  896. AckComplete(s, res.ToAddress, transferId, complete.PacketGuid, packetId);
  897. RecvQueue.erase(rq);
  898. } else {
  899. //printf("crc failed, require resend\n");
  900. delete out;
  901. ++res.Attempt;
  902. res.NewPacketsToAck.clear();
  903. RequireResend(s, res.ToAddress, transferId, res.Attempt);
  904. }
  905. } else {
  906. res.NewPacketsToAck.push_back(packetId);
  907. }
  908. }
  909. } break;
  910. case ACK: {
  911. TUdpOutXferHash::iterator i = SendQueue.find(k);
  912. if (i == SendQueue.end())
  913. break;
  914. TUdpOutTransfer& xfer = i->second;
  915. if (!xfer.AckTracker.IsInitialized())
  916. break;
  917. xfer.AckTracker.MarkAlive();
  918. int attempt = Read<int>(&pktData);
  919. Y_ASSERT(attempt <= xfer.Attempt);
  920. if (attempt != xfer.Attempt)
  921. break;
  922. ReadAcks(&xfer, (int*)pktData, (int)(RecvBuf.GetDataPtr() + rv - pktData) / SIZEOF_ACK, deltaT);
  923. break;
  924. }
  925. case ACK_COMPLETE: {
  926. TUdpOutXferHash::iterator i = SendQueue.find(k);
  927. if (i == SendQueue.end())
  928. break;
  929. TUdpOutTransfer& xfer = i->second;
  930. xfer.AckTracker.MarkAlive();
  931. TGUID packetGuid;
  932. packetGuid = Read<TGUID>(&pktData);
  933. int packetId = Read<int>(&pktData);
  934. if (packetGuid == xfer.PacketGuid) {
  935. xfer.AckTracker.Ack(packetId, deltaT, true); // update RTT
  936. xfer.AckTracker.AckAll(); // acking packets is required, otherwise they will be treated as lost (look AckTracker destructor)
  937. SucceededSend(transferId);
  938. SendQueue.erase(i);
  939. } else {
  940. // peer asserts that he has received this packet but packetGuid is wrong
  941. // try to resend everything
  942. // ++xfer.Attempt; // should not do this, only sender can modify attempt number, otherwise cycle is possible with out of order packets
  943. xfer.AckTracker.Resend();
  944. }
  945. break;
  946. } break;
  947. case ACK_RESEND: {
  948. TUdpOutXferHash::iterator i = SendQueue.find(k);
  949. if (i == SendQueue.end())
  950. break;
  951. TUdpOutTransfer& xfer = i->second;
  952. xfer.AckTracker.MarkAlive();
  953. int attempt = Read<int>(&pktData);
  954. if (xfer.Attempt != attempt) {
  955. // reset current tranfser & initialize new one
  956. xfer.Attempt = attempt;
  957. xfer.AckTracker.Resend();
  958. }
  959. break;
  960. }
  961. case ACK_RESEND_NOSHMEM: {
  962. // abort execution here
  963. // failed to open shmem on recv side, need to transmit data without using shmem
  964. Y_ABORT_UNLESS(0, "not implemented yet");
  965. break;
  966. }
  967. case PING: {
  968. sockaddr_in6 trueFromAddress = fromAddress;
  969. int port = Read<int>(&pktData);
  970. Y_ASSERT(trueFromAddress.sin6_family == AF_INET6);
  971. trueFromAddress.sin6_port = port;
  972. // can not set MTU for fromAddress here since asymmetrical mtu is possible
  973. char* pktData2 = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
  974. Write(&pktData2, (int)0);
  975. Write(&pktData2, (char)PONG);
  976. if (IB.Get()) {
  977. const TIBConnectInfo& ibConnectInfo = IB->GetConnectInfo();
  978. Write(&pktData2, ibConnectInfo);
  979. Write(&pktData2, trueFromAddress);
  980. }
  981. s.SendTo(PktBuf, pktData2 - PktBuf, trueFromAddress, FF_ALLOW_FRAG);
  982. break;
  983. }
  984. case PONG: {
  985. TPeerLink& peerInfo = GetPeerLink(k.Address);
  986. peerInfo.UdpCongestion->SetMTU(UDP_PACKET_SIZE);
  987. int dataSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
  988. if (dataSize == sizeof(TIBConnectInfo) + sizeof(sockaddr_in6)) {
  989. if (IB.Get() != nullptr && peerInfo.IBPeer.Get() == nullptr) {
  990. TIBConnectInfo info = Read<TIBConnectInfo>(&pktData);
  991. sockaddr_in6 myAddress = Read<sockaddr_in6>(&pktData);
  992. TUdpAddress myUdpAddress;
  993. GetUdpAddress(&myUdpAddress, myAddress);
  994. peerInfo.IBPeer = IB->ConnectPeer(info, k.Address, myUdpAddress);
  995. }
  996. }
  997. break;
  998. }
  999. case KILL: {
  1000. ui64 p1 = Read<ui64>(&pktData);
  1001. ui64 p2 = Read<ui64>(&pktData);
  1002. int restSize = (int)(RecvBuf.GetDataPtr() + rv - pktData);
  1003. if (restSize == 0 && p1 == KILL_PASSPHRASE1 && p2 == KILL_PASSPHRASE2) {
  1004. abort();
  1005. }
  1006. break;
  1007. }
  1008. default:
  1009. Y_ASSERT(0);
  1010. break;
  1011. }
  1012. }
  1013. }
  1014. void TUdpHost::IBStep() {
  1015. if (IB.Get()) {
  1016. NHPTimer::STime tChk = CurrentT;
  1017. float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
  1018. if (IB->Step(tChk)) {
  1019. IBIdleTime = -chkDeltaT;
  1020. }
  1021. }
  1022. }
  1023. void TUdpHost::Step() {
  1024. if (IB.Get()) {
  1025. NHPTimer::STime tChk = CurrentT;
  1026. float chkDeltaT = (float)NHPTimer::GetTimePassed(&tChk);
  1027. if (IB->Step(tChk)) {
  1028. IBIdleTime = -chkDeltaT;
  1029. }
  1030. if (chkDeltaT < 0.0005) {
  1031. return;
  1032. }
  1033. }
  1034. if (UseTOSforAcks) {
  1035. s.SetTOS(0x20);
  1036. } else {
  1037. s.SetTOS(0);
  1038. }
  1039. RecvCycle();
  1040. float deltaT = (float)NHPTimer::GetTimePassed(&CurrentT);
  1041. deltaT = ClampVal(deltaT, 0.0f, UDP_TRANSFER_TIMEOUT / 3);
  1042. MaxWaitTime = DEFAULT_MAX_WAIT_TIME;
  1043. IBIdleTime += deltaT;
  1044. bool needCheckAlive = false;
  1045. // update alive ports
  1046. const float INACTIVE_CONGESTION_UPDATE_INTERVAL = 1;
  1047. TimeSinceCongestionHistoryUpdate += deltaT;
  1048. if (TimeSinceCongestionHistoryUpdate > INACTIVE_CONGESTION_UPDATE_INTERVAL) {
  1049. for (TPeerLinkHash::iterator i = CongestionTrackHistory.begin(); i != CongestionTrackHistory.end();) {
  1050. TPeerLink& pl = i->second;
  1051. if (!pl.UpdateSleep(TimeSinceCongestionHistoryUpdate)) {
  1052. TPeerLinkHash::iterator k = i++;
  1053. CongestionTrackHistory.erase(k);
  1054. needCheckAlive = true;
  1055. } else {
  1056. ++i;
  1057. }
  1058. }
  1059. TimeSinceCongestionHistoryUpdate = 0;
  1060. }
  1061. for (TPeerLinkHash::iterator i = CongestionTrack.begin(); i != CongestionTrack.end();) {
  1062. const TUdpAddress& addr = i->first;
  1063. TPeerLink& pl = i->second;
  1064. if (pl.UdpCongestion->GetTransferCount() == 0) {
  1065. pl.StartSleep(addr, &MaxWaitTime);
  1066. CongestionTrackHistory[i->first] = i->second;
  1067. TPeerLinkHash::iterator k = i++;
  1068. CongestionTrack.erase(k);
  1069. } else if (!pl.Update(deltaT, addr, &MaxWaitTime)) {
  1070. TPeerLinkHash::iterator k = i++;
  1071. CongestionTrack.erase(k);
  1072. needCheckAlive = true;
  1073. } else {
  1074. ++i;
  1075. }
  1076. }
  1077. // send acks on received data
  1078. for (TUdpInXferHash::iterator i = RecvQueue.begin(); i != RecvQueue.end();) {
  1079. const TTransferKey& transKey = i->first;
  1080. int transferId = transKey.Id;
  1081. TUdpInTransfer& xfer = i->second;
  1082. xfer.TimeSinceLastRecv += deltaT;
  1083. if (xfer.TimeSinceLastRecv > UDP_MAX_INPUT_DATA_WAIT || (needCheckAlive && !xfer.Congestion->IsAlive())) {
  1084. TUdpInXferHash::iterator k = i++;
  1085. RecvQueue.erase(k);
  1086. continue;
  1087. }
  1088. Y_ASSERT(RecvCompleted.find(i->first) == RecvCompleted.end()); // state "Complete & incomplete" is incorrect
  1089. if (!xfer.NewPacketsToAck.empty()) {
  1090. char* pktData = PktBuf + UDP_LOW_LEVEL_HEADER_SIZE;
  1091. Write(&pktData, transferId);
  1092. Write(&pktData, (char)ACK);
  1093. Write(&pktData, xfer.Attempt);
  1094. int acks = WriteAck(&xfer, (int*)pktData, (int)(xfer.PacketSize - (pktData - PktBuf)) / SIZEOF_ACK);
  1095. pktData += acks * SIZEOF_ACK;
  1096. s.SendTo(PktBuf, (int)(pktData - PktBuf), xfer.ToAddress, FF_ALLOW_FRAG);
  1097. }
  1098. ++i;
  1099. }
  1100. if (UseTOSforAcks) {
  1101. s.SetTOS(0x60);
  1102. }
  1103. // send data for outbound connections
  1104. SendData(&SendOrderHighPrior, deltaT, needCheckAlive);
  1105. SendData(&SendOrder, deltaT, needCheckAlive);
  1106. SendData(&SendOrderLow, deltaT, needCheckAlive);
  1107. // roll send order to avoid exotic problems with lots of peers and high traffic
  1108. SendOrderHighPrior.splice(SendOrderHighPrior.end(), SendOrderHighPrior, SendOrderHighPrior.begin());
  1109. //SendOrder.splice(SendOrder.end(), SendOrder, SendOrder.begin()); // sending data in order has lower delay and shorter queue
  1110. // clean completed queue
  1111. TimeSinceCompletedQueueClean += deltaT;
  1112. if (TimeSinceCompletedQueueClean > UDP_TRANSFER_TIMEOUT * 1.5) {
  1113. for (size_t i = 0; i < KeepCompletedQueue.size(); ++i) {
  1114. TUdpCompleteInXferHash::iterator k = RecvCompleted.find(KeepCompletedQueue[i]);
  1115. if (k != RecvCompleted.end())
  1116. RecvCompleted.erase(k);
  1117. }
  1118. KeepCompletedQueue.clear();
  1119. KeepCompletedQueue.swap(RecvCompletedQueue);
  1120. TimeSinceCompletedQueueClean = 0;
  1121. }
  1122. }
  1123. TString TUdpHost::GetPeerLinkDebug(const TPeerLinkHash& ch) {
  1124. TString res;
  1125. char buf[1000];
  1126. for (const auto& i : ch) {
  1127. const TUdpAddress& ip = i.first;
  1128. const TCongestionControl& cc = *i.second.UdpCongestion;
  1129. IIBPeer* ibPeer = i.second.IBPeer.Get();
  1130. snprintf(buf, sizeof(buf), "%s\tIB: %d, RTT: %g Timeout: %g Window: %g MaxWin: %g FailRate: %g TimeSinceLastRecv: %g Transfers: %d MTU: %d\n",
  1131. GetAddressAsString(ip).c_str(),
  1132. ibPeer ? ibPeer->GetState() : -1,
  1133. cc.GetRTT() * 1000, cc.GetTimeout() * 1000, cc.GetWindow(), cc.GetMaxWindow(), cc.GetFailRate(),
  1134. cc.GetTimeSinceLastRecv() * 1000, cc.GetTransferCount(), cc.GetMTU());
  1135. res += buf;
  1136. }
  1137. return res;
  1138. }
  1139. TString TUdpHost::GetDebugInfo() {
  1140. TString res;
  1141. char buf[1000];
  1142. snprintf(buf, sizeof(buf), "Receiving %d msgs, sending %d high prior, %d regular msgs, %d low prior msgs\n",
  1143. RecvQueue.ysize(), (int)SendOrderHighPrior.size(), (int)SendOrder.size(), (int)SendOrderLow.size());
  1144. res += buf;
  1145. TRequesterPendingDataStats pds;
  1146. GetPendingDataSize(&pds);
  1147. snprintf(buf, sizeof(buf), "Pending data size: %" PRIu64 "\n", pds.InpDataSize + pds.OutDataSize);
  1148. res += buf;
  1149. snprintf(buf, sizeof(buf), " in packets: %d, size %" PRIu64 "\n", pds.InpCount, pds.InpDataSize);
  1150. res += buf;
  1151. snprintf(buf, sizeof(buf), " out packets: %d, size %" PRIu64 "\n", pds.OutCount, pds.OutDataSize);
  1152. res += buf;
  1153. res += "\nCongestion info:\n";
  1154. res += GetPeerLinkDebug(CongestionTrack);
  1155. res += "\nCongestion info history:\n";
  1156. res += GetPeerLinkDebug(CongestionTrackHistory);
  1157. return res;
  1158. }
  1159. static void SendKill(const TNetSocket& s, const sockaddr_in6& toAddress) {
  1160. char buf[100];
  1161. char* pktData = buf + UDP_LOW_LEVEL_HEADER_SIZE;
  1162. Write(&pktData, (int)0);
  1163. Write(&pktData, (char)KILL);
  1164. Write(&pktData, KILL_PASSPHRASE1);
  1165. Write(&pktData, KILL_PASSPHRASE2);
  1166. s.SendTo(buf, (int)(pktData - buf), toAddress, FF_ALLOW_FRAG);
  1167. }
  1168. void TUdpHost::Kill(const TUdpAddress& addr) {
  1169. sockaddr_in6 target;
  1170. GetWinsockAddr(&target, addr);
  1171. SendKill(s, target);
  1172. }
  1173. TIntrusivePtr<IPeerQueueStats> TUdpHost::GetQueueStats(const TUdpAddress& addr) {
  1174. TQueueStatsHash::iterator zq = PeerQueueStats.find(addr);
  1175. if (zq != PeerQueueStats.end()) {
  1176. return zq->second.Get();
  1177. }
  1178. TPeerQueueStats* res = new TPeerQueueStats;
  1179. PeerQueueStats[addr] = res;
  1180. // attach to existing congestion tracker
  1181. TPeerLinkHash::iterator z;
  1182. z = CongestionTrack.find(addr);
  1183. if (z != CongestionTrack.end()) {
  1184. z->second.UdpCongestion->AttachQueueStats(res);
  1185. }
  1186. z = CongestionTrackHistory.find(addr);
  1187. if (z != CongestionTrackHistory.end()) {
  1188. z->second.UdpCongestion->AttachQueueStats(res);
  1189. }
  1190. return res;
  1191. }
  1192. //////////////////////////////////////////////////////////////////////////
  1193. TIntrusivePtr<IUdpHost> CreateUdpHost(int port) {
  1194. TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateBestRecvSocket();
  1195. socket->Open(port);
  1196. if (!socket->IsValid())
  1197. return nullptr;
  1198. return CreateUdpHost(socket);
  1199. }
  1200. TIntrusivePtr<IUdpHost> CreateUdpHost(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
  1201. if (!InitLocalIPList()) {
  1202. Y_ASSERT(0 && "Can not determine self IP address");
  1203. return nullptr;
  1204. }
  1205. TIntrusivePtr<TUdpHost> res = new TUdpHost;
  1206. if (!res->Start(socket))
  1207. return nullptr;
  1208. return res.Get();
  1209. }
  1210. void SetUdpMaxBandwidthPerIP(float f) {
  1211. f = Max(0.0f, f);
  1212. TCongestionControl::MaxPacketRate = f / UDP_PACKET_SIZE;
  1213. }
  1214. void SetUdpSlowStart(bool enable) {
  1215. TCongestionControl::StartWindowSize = enable ? 0.5f : 3;
  1216. }
  1217. void DisableIBDetection() {
  1218. IBDetection = false;
  1219. }
  1220. }