udp_http.cpp 57 KB


  1. #include "stdafx.h"
  2. #include "udp_http.h"
  3. #include "udp_client_server.h"
  4. #include "udp_socket.h"
  5. #include "cpu_affinity.h"
  6. #include <library/cpp/threading/atomic/bool.h>
  7. #include <util/system/hp_timer.h>
  8. #include <util/thread/lfqueue.h>
  9. #include <util/system/thread.h>
  10. #include <util/system/spinlock.h>
  11. #if !defined(_win_)
  12. #include <signal.h>
  13. #include <pthread.h>
  14. #endif
  15. #include "block_chain.h"
  16. #include <util/system/shmat.h>
  17. #include <atomic>
  18. namespace NNetliba {
  19. const float HTTP_TIMEOUT = 15.0f;
  20. const int MIN_SHARED_MEM_PACKET = 1000;
  21. static ::NAtomic::TBool PanicAttack;
  22. static std::atomic<NHPTimer::STime> LastHeartbeat;
  23. static std::atomic<double> HeartbeatTimeout;
  24. static int GetPacketSize(TRequest* req) {
  25. if (req && req->Data.Get())
  26. return req->Data->GetSize();
  27. return 0;
  28. }
  29. static bool IsLocalFast(const TUdpAddress& addr) {
  30. if (addr.IsIPv4()) {
  31. return IsLocalIPv4(addr.GetIPv4());
  32. } else {
  33. return IsLocalIPv6(addr.Network, addr.Interface);
  34. }
  35. }
  36. bool IsLocal(const TUdpAddress& addr) {
  37. InitLocalIPList();
  38. return IsLocalFast(addr);
  39. }
  40. TUdpHttpRequest::~TUdpHttpRequest() {
  41. }
  42. TUdpHttpResponse::~TUdpHttpResponse() {
  43. }
  44. class TRequesterUserQueueSizes: public TThrRefBase {
  45. public:
  46. TAtomic ReqCount, RespCount;
  47. TAtomic ReqQueueSize, RespQueueSize;
  48. TRequesterUserQueueSizes()
  49. : ReqCount(0)
  50. , RespCount(0)
  51. , ReqQueueSize(0)
  52. , RespQueueSize(0)
  53. {
  54. }
  55. };
  56. template <class T>
  57. void EraseList(TLockFreeQueue<T*>* data) {
  58. T* ptr = nullptr;
  59. while (data->Dequeue(&ptr)) {
  60. delete ptr;
  61. }
  62. }
  63. class TRequesterUserQueues: public TThrRefBase {
  64. TIntrusivePtr<TRequesterUserQueueSizes> QueueSizes;
  65. TLockFreeQueue<TUdpHttpRequest*> ReqList;
  66. TLockFreeQueue<TUdpHttpResponse*> ResponseList;
  67. TLockFreeStack<TGUID> CancelList, SendRequestAccList; // any order will do
  68. TMuxEvent AsyncEvent;
  69. void UpdateAsyncSignalState() {
  70. // not sure about this one. Idea is that AsyncEvent.Reset() is a memory barrier
  71. if (ReqList.IsEmpty() && ResponseList.IsEmpty() && CancelList.IsEmpty() && SendRequestAccList.IsEmpty()) {
  72. AsyncEvent.Reset();
  73. if (!ReqList.IsEmpty() || !ResponseList.IsEmpty() || !CancelList.IsEmpty() || !SendRequestAccList.IsEmpty())
  74. AsyncEvent.Signal();
  75. }
  76. }
  77. ~TRequesterUserQueues() override {
  78. EraseList(&ReqList);
  79. EraseList(&ResponseList);
  80. }
  81. public:
  82. TRequesterUserQueues(TRequesterUserQueueSizes* queueSizes)
  83. : QueueSizes(queueSizes)
  84. {
  85. }
  86. TUdpHttpRequest* GetRequest();
  87. TUdpHttpResponse* GetResponse();
  88. bool GetRequestCancel(TGUID* req) {
  89. bool res = CancelList.Dequeue(req);
  90. UpdateAsyncSignalState();
  91. return res;
  92. }
  93. bool GetSendRequestAcc(TGUID* req) {
  94. bool res = SendRequestAccList.Dequeue(req);
  95. UpdateAsyncSignalState();
  96. return res;
  97. }
  98. void AddRequest(TUdpHttpRequest* res) {
  99. AtomicAdd(QueueSizes->ReqCount, 1);
  100. AtomicAdd(QueueSizes->ReqQueueSize, GetPacketSize(res->DataHolder.Get()));
  101. ReqList.Enqueue(res);
  102. AsyncEvent.Signal();
  103. }
  104. void AddResponse(TUdpHttpResponse* res) {
  105. AtomicAdd(QueueSizes->RespCount, 1);
  106. AtomicAdd(QueueSizes->RespQueueSize, GetPacketSize(res->DataHolder.Get()));
  107. ResponseList.Enqueue(res);
  108. AsyncEvent.Signal();
  109. }
  110. void AddCancel(const TGUID& req) {
  111. CancelList.Enqueue(req);
  112. AsyncEvent.Signal();
  113. }
  114. void AddSendRequestAcc(const TGUID& req) {
  115. SendRequestAccList.Enqueue(req);
  116. AsyncEvent.Signal();
  117. }
  118. TMuxEvent& GetAsyncEvent() {
  119. return AsyncEvent;
  120. }
  121. void AsyncSignal() {
  122. AsyncEvent.Signal();
  123. }
  124. };
  125. struct TOutRequestState {
  126. enum EState {
  127. S_SENDING,
  128. S_WAITING,
  129. S_WAITING_PING_SENDING,
  130. S_WAITING_PING_SENT,
  131. S_CANCEL_AFTER_SENDING
  132. };
  133. EState State;
  134. TUdpAddress Address;
  135. double TimePassed;
  136. int PingTransferId;
  137. TIntrusivePtr<TRequesterUserQueues> UserQueues;
  138. TOutRequestState()
  139. : State(S_SENDING)
  140. , TimePassed(0)
  141. , PingTransferId(-1)
  142. {
  143. }
  144. };
  145. struct TInRequestState {
  146. enum EState {
  147. S_WAITING,
  148. S_RESPONSE_SENDING,
  149. S_CANCELED,
  150. };
  151. EState State;
  152. TUdpAddress Address;
  153. TInRequestState()
  154. : State(S_WAITING)
  155. {
  156. }
  157. TInRequestState(const TUdpAddress& address)
  158. : State(S_WAITING)
  159. , Address(address)
  160. {
  161. }
  162. };
  163. enum EHttpPacket {
  164. PKT_REQUEST,
  165. PKT_PING,
  166. PKT_PING_RESPONSE,
  167. PKT_RESPONSE,
  168. PKT_GETDEBUGINFO,
  169. PKT_LOCAL_REQUEST,
  170. PKT_LOCAL_RESPONSE,
  171. PKT_CANCEL,
  172. };
  173. class TUdpHttp: public IRequester {
  174. enum EDir {
  175. DIR_OUT,
  176. DIR_IN
  177. };
  178. struct TTransferPurpose {
  179. EDir Dir;
  180. TGUID Guid;
  181. TTransferPurpose()
  182. : Dir(DIR_OUT)
  183. {
  184. }
  185. TTransferPurpose(EDir dir, TGUID guid)
  186. : Dir(dir)
  187. , Guid(guid)
  188. {
  189. }
  190. };
  191. struct TSendRequest {
  192. TUdpAddress Addr;
  193. TAutoPtr<TRopeDataPacket> Data;
  194. TGUID ReqGuid;
  195. TIntrusivePtr<TWaitResponse> WR;
  196. TIntrusivePtr<TRequesterUserQueues> UserQueues;
  197. ui32 Crc32;
  198. TSendRequest()
  199. : Crc32(0)
  200. {
  201. }
  202. TSendRequest(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket>* data, const TGUID& reqguid, TWaitResponse* wr, TRequesterUserQueues* userQueues)
  203. : Addr(addr)
  204. , Data(*data)
  205. , ReqGuid(reqguid)
  206. , WR(wr)
  207. , UserQueues(userQueues)
  208. , Crc32(CalcChecksum(Data->GetChain()))
  209. {
  210. }
  211. };
  212. struct TSendResponse {
  213. TVector<char> Data;
  214. TGUID ReqGuid;
  215. ui32 DataCrc32;
  216. EPacketPriority Priority;
  217. TSendResponse()
  218. : DataCrc32(0)
  219. , Priority(PP_NORMAL)
  220. {
  221. }
  222. TSendResponse(const TGUID& reqguid, EPacketPriority prior, TVector<char>* data)
  223. : ReqGuid(reqguid)
  224. , DataCrc32(0)
  225. , Priority(prior)
  226. {
  227. if (data && !data->empty()) {
  228. data->swap(Data);
  229. DataCrc32 = TIncrementalChecksumCalcer::CalcBlockSum(&Data[0], Data.ysize());
  230. }
  231. }
  232. };
  233. struct TCancelRequest {
  234. TGUID ReqGuid;
  235. TCancelRequest() = default;
  236. TCancelRequest(const TGUID& reqguid)
  237. : ReqGuid(reqguid)
  238. {
  239. }
  240. };
  241. struct TBreakRequest {
  242. TGUID ReqGuid;
  243. TBreakRequest() = default;
  244. TBreakRequest(const TGUID& reqguid)
  245. : ReqGuid(reqguid)
  246. {
  247. }
  248. };
  249. TThread myThread;
  250. bool KeepRunning, AbortTransactions;
  251. TSpinLock cs;
  252. TSystemEvent HasStarted;
  253. NHPTimer::STime PingsSendT;
  254. TIntrusivePtr<IUdpHost> Host;
  255. TIntrusivePtr<NNetlibaSocket::ISocket> Socket;
  256. typedef THashMap<TGUID, TOutRequestState, TGUIDHash> TOutRequestHash;
  257. typedef THashMap<TGUID, TInRequestState, TGUIDHash> TInRequestHash;
  258. TOutRequestHash OutRequests;
  259. TInRequestHash InRequests;
  260. typedef THashMap<int, TTransferPurpose> TTransferHash;
  261. TTransferHash TransferHash;
  262. typedef THashMap<TGUID, TIntrusivePtr<TWaitResponse>, TGUIDHash> TSyncRequests;
  263. TSyncRequests SyncRequests;
  264. // hold it here to not construct on every DoSends()
  265. typedef THashSet<TGUID, TGUIDHash> TAnticipateCancels;
  266. TAnticipateCancels AnticipateCancels;
  267. TLockFreeQueue<TSendRequest*> SendReqList;
  268. TLockFreeQueue<TSendResponse*> SendRespList;
  269. TLockFreeQueue<TCancelRequest> CancelReqList;
  270. TLockFreeQueue<TBreakRequest> BreakReqList;
  271. TIntrusivePtr<TRequesterUserQueueSizes> QueueSizes;
  272. TIntrusivePtr<TRequesterUserQueues> UserQueues;
  273. struct TStatsRequest: public TThrRefBase {
  274. enum EReq {
  275. PENDING_SIZE,
  276. DEBUG_INFO,
  277. HAS_IN_REQUEST,
  278. GET_PEER_ADDRESS,
  279. GET_PEER_QUEUE_STATS,
  280. };
  281. EReq Req;
  282. TRequesterPendingDataStats PendingDataSize;
  283. TString DebugInfo;
  284. TGUID RequestId;
  285. TUdpAddress PeerAddress;
  286. TIntrusivePtr<IPeerQueueStats> QueueStats;
  287. bool RequestFound;
  288. TSystemEvent Complete;
  289. TStatsRequest(EReq req)
  290. : Req(req)
  291. , RequestFound(false)
  292. {
  293. }
  294. };
  295. TLockFreeQueue<TIntrusivePtr<TStatsRequest>> StatsReqList;
  296. bool ReportRequestCancel;
  297. bool ReportSendRequestAcc;
  298. void FinishRequest(TOutRequestHash::iterator i, TUdpHttpResponse::EResult ok, TAutoPtr<TRequest> data, const char* error = nullptr) {
  299. TOutRequestState& s = i->second;
  300. TUdpHttpResponse* res = new TUdpHttpResponse;
  301. res->DataHolder = data;
  302. res->ReqId = i->first;
  303. res->PeerAddress = s.Address;
  304. res->Ok = ok;
  305. if (ok == TUdpHttpResponse::FAILED)
  306. res->Error = error ? error : "request failed";
  307. else if (ok == TUdpHttpResponse::CANCELED)
  308. res->Error = error ? error : "request cancelled";
  309. TSyncRequests::iterator k = SyncRequests.find(res->ReqId);
  310. if (k != SyncRequests.end()) {
  311. TIntrusivePtr<TWaitResponse>& wr = k->second;
  312. wr->SetResponse(res);
  313. SyncRequests.erase(k);
  314. } else {
  315. s.UserQueues->AddResponse(res);
  316. }
  317. OutRequests.erase(i);
  318. }
  319. int SendWithHighPriority(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data) {
  320. ui32 crc32 = CalcChecksum(data->GetChain());
  321. return Host->Send(addr, data.Release(), crc32, nullptr, PP_HIGH);
  322. }
  323. void ProcessIncomingPackets() {
  324. TVector<TGUID, TCustomAllocator<TGUID>> failedRequests;
  325. for (;;) {
  326. TAutoPtr<TRequest> req = Host->GetRequest();
  327. if (req.Get() == nullptr) {
  328. if (!failedRequests.empty()) {
  329. // we want to handle following sequence of events
  330. // <- send ping
  331. // -> send response over IB
  332. // -> send ping response (no such request) over UDP
  333. // Now if we are lucky enough we can get IB response waiting in the IB receive queue
  334. // at the same time response sender will receive "send complete" from IB
  335. // indeed, IB delivered message (but it was not parsed by ib_cs.cpp yet)
  336. // so after receiving "send response complete" event resposne sender can legally response
  337. // to pings with "no such request"
  338. // but ping responses can be sent over UDP
  339. // So we can run into situation with negative ping response in
  340. // UDP receive queue and response waiting unprocessed in IB receive queue
  341. // to check that there is no response in the IB queue we have to process IB queues
  342. // so we call IBStep()
  343. Host->IBStep();
  344. req = Host->GetRequest();
  345. if (req.Get() == nullptr) {
  346. break;
  347. }
  348. } else {
  349. break;
  350. }
  351. }
  352. TBlockChainIterator reqData(req->Data->GetChain());
  353. char pktType;
  354. reqData.Read(&pktType, 1);
  355. switch (pktType) {
  356. case PKT_REQUEST:
  357. case PKT_LOCAL_REQUEST: {
  358. //printf("recv PKT_REQUEST or PKT_LOCAL_REQUEST\n");
  359. TGUID reqId = req->Guid;
  360. TInRequestHash::iterator z = InRequests.find(reqId);
  361. if (z != InRequests.end()) {
  362. // oops, this request already exists!
  363. // might happen if request can be stored in single packet
  364. // and this packet had source IP broken during transmission and managed to pass crc checks
  365. // since we already reported wrong source address for this request to the user
  366. // the best thing we can do is to stop the program to avoid further complications
  367. // but we just report the accident to stderr
  368. fprintf(stderr, "Jackpot, same request %s received twice from %s and earlier from %s\n",
  369. GetGuidAsString(reqId).c_str(), GetAddressAsString(z->second.Address).c_str(),
  370. GetAddressAsString(req->Address).c_str());
  371. } else {
  372. InRequests[reqId] = TInRequestState(req->Address);
  373. //printf("InReq %s PKT_REQUEST recv ... -> S_WAITING\n", GetGuidAsString(reqId).c_str());
  374. TUdpHttpRequest* res = new TUdpHttpRequest;
  375. res->ReqId = reqId;
  376. res->PeerAddress = req->Address;
  377. res->DataHolder = req;
  378. UserQueues->AddRequest(res);
  379. }
  380. } break;
  381. case PKT_PING: {
  382. //printf("recv PKT_PING\n");
  383. TGUID guid;
  384. reqData.Read(&guid, sizeof(guid));
  385. bool ok = InRequests.find(guid) != InRequests.end();
  386. TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
  387. ms->Write((char)PKT_PING_RESPONSE);
  388. ms->Write(guid);
  389. ms->Write(ok);
  390. SendWithHighPriority(req->Address, ms.Release());
  391. //printf("InReq %s PKT_PING recv Sending PKT_PING_RESPONSE\n", GetGuidAsString(guid).c_str());
  392. //printf("got PKT_PING, responding %d\n", (int)ok);
  393. } break;
  394. case PKT_PING_RESPONSE: {
  395. //printf("recv PKT_PING_RESPONSE\n");
  396. TGUID guid;
  397. bool ok;
  398. reqData.Read(&guid, sizeof(guid));
  399. reqData.Read(&ok, sizeof(ok));
  400. TOutRequestHash::iterator i = OutRequests.find(guid);
  401. if (i == OutRequests.end()) {
  402. ; //Y_ASSERT(0); // actually possible with some packet orders
  403. } else {
  404. if (!ok) {
  405. // can not delete request at this point
  406. // since we can receive failed ping and response at the same moment
  407. // consider sequence: client sends ping, server sends response
  408. // and replies false to ping as reply is sent
  409. // we can not receive failed ping_response earlier then response itself
  410. // but we can receive them simultaneously
  411. failedRequests.push_back(guid);
  412. //printf("OutReq %s PKT_PING_RESPONSE recv no such query -> failed\n", GetGuidAsString(guid).c_str());
  413. } else {
  414. TOutRequestState& s = i->second;
  415. switch (s.State) {
  416. case TOutRequestState::S_WAITING_PING_SENDING: {
  417. Y_ASSERT(s.PingTransferId >= 0);
  418. TTransferHash::iterator k = TransferHash.find(s.PingTransferId);
  419. if (k != TransferHash.end())
  420. TransferHash.erase(k);
  421. else
  422. Y_ASSERT(0);
  423. s.PingTransferId = -1;
  424. s.TimePassed = 0;
  425. s.State = TOutRequestState::S_WAITING;
  426. //printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENDING -> S_WAITING\n", GetGuidAsString(guid).c_str());
  427. } break;
  428. case TOutRequestState::S_WAITING_PING_SENT:
  429. s.TimePassed = 0;
  430. s.State = TOutRequestState::S_WAITING;
  431. //printf("OutReq %s PKT_PING_RESPONSE recv S_WAITING_PING_SENT -> S_WAITING\n", GetGuidAsString(guid).c_str());
  432. break;
  433. default:
  434. Y_ASSERT(0);
  435. break;
  436. }
  437. }
  438. }
  439. } break;
  440. case PKT_RESPONSE:
  441. case PKT_LOCAL_RESPONSE: {
  442. //printf("recv PKT_RESPONSE or PKT_LOCAL_RESPONSE\n");
  443. TGUID guid;
  444. reqData.Read(&guid, sizeof(guid));
  445. TOutRequestHash::iterator i = OutRequests.find(guid);
  446. if (i == OutRequests.end()) {
  447. ; //Y_ASSERT(0); // does happen
  448. //printf("OutReq %s PKT_RESPONSE recv for non-existing req\n", GetGuidAsString(guid).c_str());
  449. } else {
  450. FinishRequest(i, TUdpHttpResponse::OK, req);
  451. //printf("OutReq %s PKT_RESPONSE recv ... -> ok\n", GetGuidAsString(guid).c_str());
  452. }
  453. } break;
  454. case PKT_CANCEL: {
  455. //printf("recv PKT_CANCEL\n");
  456. TGUID guid;
  457. reqData.Read(&guid, sizeof(guid));
  458. TInRequestHash::iterator i = InRequests.find(guid);
  459. if (i == InRequests.end()) {
  460. ; //Y_ASSERT(0); // may happen
  461. //printf("InReq %s PKT_CANCEL recv for non-existing req\n", GetGuidAsString(guid).c_str());
  462. } else {
  463. TInRequestState& s = i->second;
  464. if (s.State != TInRequestState::S_CANCELED && ReportRequestCancel)
  465. UserQueues->AddCancel(guid);
  466. s.State = TInRequestState::S_CANCELED;
  467. //printf("InReq %s PKT_CANCEL recv\n", GetGuidAsString(guid).c_str());
  468. }
  469. } break;
  470. case PKT_GETDEBUGINFO: {
  471. //printf("recv PKT_GETDEBUGINFO\n");
  472. TString dbgInfo = GetDebugInfoLocked();
  473. TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
  474. ms->Write(dbgInfo.c_str(), (int)dbgInfo.size());
  475. SendWithHighPriority(req->Address, ms);
  476. } break;
  477. default:
  478. Y_ASSERT(0);
  479. }
  480. }
  481. // cleanup failed requests
  482. for (size_t k = 0; k < failedRequests.size(); ++k) {
  483. const TGUID& guid = failedRequests[k];
  484. TOutRequestHash::iterator i = OutRequests.find(guid);
  485. if (i != OutRequests.end())
  486. FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: recv no such query");
  487. }
  488. }
  489. void AnalyzeSendResults() {
  490. TSendResult res;
  491. while (Host->GetSendResult(&res)) {
  492. //printf("Send result received\n");
  493. TTransferHash::iterator k1 = TransferHash.find(res.TransferId);
  494. if (k1 != TransferHash.end()) {
  495. const TTransferPurpose& tp = k1->second;
  496. switch (tp.Dir) {
  497. case DIR_OUT: {
  498. TOutRequestHash::iterator i = OutRequests.find(tp.Guid);
  499. if (i != OutRequests.end()) {
  500. const TGUID& reqId = i->first;
  501. TOutRequestState& s = i->second;
  502. switch (s.State) {
  503. case TOutRequestState::S_SENDING:
  504. if (!res.Success) {
  505. FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_SENDING");
  506. //printf("OutReq %s AnalyzeSendResults() S_SENDING -> failed\n", GetGuidAsString(reqId).c_str());
  507. } else {
  508. if (ReportSendRequestAcc) {
  509. if (s.UserQueues.Get()) {
  510. s.UserQueues->AddSendRequestAcc(reqId);
  511. } else {
  512. // waitable request?
  513. TSyncRequests::iterator k2 = SyncRequests.find(reqId);
  514. if (k2 != SyncRequests.end()) {
  515. TIntrusivePtr<TWaitResponse>& wr = k2->second;
  516. wr->SetRequestSent();
  517. }
  518. }
  519. }
  520. s.State = TOutRequestState::S_WAITING;
  521. //printf("OutReq %s AnalyzeSendResults() S_SENDING -> S_WAITING\n", GetGuidAsString(reqId).c_str());
  522. s.TimePassed = 0;
  523. }
  524. break;
  525. case TOutRequestState::S_CANCEL_AFTER_SENDING:
  526. DoSendCancel(s.Address, reqId);
  527. FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request failed: state S_CANCEL_AFTER_SENDING");
  528. break;
  529. case TOutRequestState::S_WAITING:
  530. case TOutRequestState::S_WAITING_PING_SENT:
  531. Y_ASSERT(0);
  532. break;
  533. case TOutRequestState::S_WAITING_PING_SENDING:
  534. Y_ASSERT(s.PingTransferId >= 0 && s.PingTransferId == res.TransferId);
  535. if (!res.Success) {
  536. FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_WAITING_PING_SENDING");
  537. //printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> failed\n", GetGuidAsString(reqId).c_str());
  538. } else {
  539. s.PingTransferId = -1;
  540. s.State = TOutRequestState::S_WAITING_PING_SENT;
  541. //printf("OutReq %s AnalyzeSendResults() S_WAITING_PING_SENDING -> S_WAITING_PING_SENT\n", GetGuidAsString(reqId).c_str());
  542. s.TimePassed = 0;
  543. }
  544. break;
  545. default:
  546. Y_ASSERT(0);
  547. break;
  548. }
  549. }
  550. } break;
  551. case DIR_IN: {
  552. TInRequestHash::iterator i = InRequests.find(tp.Guid);
  553. if (i != InRequests.end()) {
  554. Y_ASSERT(i->second.State == TInRequestState::S_RESPONSE_SENDING || i->second.State == TInRequestState::S_CANCELED);
  555. InRequests.erase(i);
  556. //if (res.Success)
  557. // printf("InReq %s AnalyzeSendResults() ... -> finished\n", GetGuidAsString(tp.Guid).c_str());
  558. //else
  559. // printf("InReq %s AnalyzeSendResults() ... -> failed response send\n", GetGuidAsString(tp.Guid).c_str());
  560. }
  561. } break;
  562. default:
  563. Y_ASSERT(0);
  564. break;
  565. }
  566. TransferHash.erase(k1);
  567. }
  568. }
  569. }
  570. void SendPingsIfNeeded() {
  571. NHPTimer::STime tChk = PingsSendT;
  572. float deltaT = (float)NHPTimer::GetTimePassed(&tChk);
  573. if (deltaT < 0.05) {
  574. return;
  575. }
  576. PingsSendT = tChk;
  577. deltaT = ClampVal(deltaT, 0.0f, HTTP_TIMEOUT / 3);
  578. {
  579. for (TOutRequestHash::iterator i = OutRequests.begin(); i != OutRequests.end();) {
  580. TOutRequestHash::iterator curIt = i++;
  581. TOutRequestState& s = curIt->second;
  582. const TGUID& guid = curIt->first;
  583. switch (s.State) {
  584. case TOutRequestState::S_WAITING:
  585. s.TimePassed += deltaT;
  586. if (s.TimePassed > HTTP_TIMEOUT) {
  587. TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
  588. ms->Write((char)PKT_PING);
  589. ms->Write(guid);
  590. int transId = SendWithHighPriority(s.Address, ms.Release());
  591. TransferHash[transId] = TTransferPurpose(DIR_OUT, guid);
  592. s.State = TOutRequestState::S_WAITING_PING_SENDING;
  593. //printf("OutReq %s SendPingsIfNeeded() S_WAITING -> S_WAITING_PING_SENDING\n", GetGuidAsString(guid).c_str());
  594. s.PingTransferId = transId;
  595. }
  596. break;
  597. case TOutRequestState::S_WAITING_PING_SENT:
  598. s.TimePassed += deltaT;
  599. if (s.TimePassed > HTTP_TIMEOUT) {
  600. //printf("OutReq %s SendPingsIfNeeded() S_WAITING_PING_SENT -> failed\n", GetGuidAsString(guid).c_str());
  601. FinishRequest(curIt, TUdpHttpResponse::FAILED, nullptr, "request failed: http timeout in state S_WAITING_PING_SENT");
  602. }
  603. break;
  604. default:
  605. break;
  606. }
  607. }
  608. }
  609. }
  610. void Step() {
  611. {
  612. TGuard<TSpinLock> lock(cs);
  613. DoSends();
  614. }
  615. Host->Step();
  616. for (TIntrusivePtr<TStatsRequest> req; StatsReqList.Dequeue(&req);) {
  617. switch (req->Req) {
  618. case TStatsRequest::PENDING_SIZE:
  619. Host->GetPendingDataSize(&req->PendingDataSize);
  620. break;
  621. case TStatsRequest::DEBUG_INFO: {
  622. TGuard<TSpinLock> lock(cs);
  623. req->DebugInfo = GetDebugInfoLocked();
  624. } break;
  625. case TStatsRequest::HAS_IN_REQUEST: {
  626. TGuard<TSpinLock> lock(cs);
  627. req->RequestFound = (InRequests.find(req->RequestId) != InRequests.end());
  628. } break;
  629. case TStatsRequest::GET_PEER_ADDRESS: {
  630. TGuard<TSpinLock> lock(cs);
  631. TInRequestHash::const_iterator i = InRequests.find(req->RequestId);
  632. if (i != InRequests.end()) {
  633. req->PeerAddress = i->second.Address;
  634. } else {
  635. TOutRequestHash::const_iterator o = OutRequests.find(req->RequestId);
  636. if (o != OutRequests.end()) {
  637. req->PeerAddress = o->second.Address;
  638. } else {
  639. req->PeerAddress = TUdpAddress();
  640. }
  641. }
  642. } break;
  643. case TStatsRequest::GET_PEER_QUEUE_STATS:
  644. req->QueueStats = Host->GetQueueStats(req->PeerAddress);
  645. break;
  646. default:
  647. Y_ASSERT(0);
  648. break;
  649. }
  650. req->Complete.Signal();
  651. }
  652. {
  653. TGuard<TSpinLock> lock(cs);
  654. DoSends();
  655. ProcessIncomingPackets();
  656. AnalyzeSendResults();
  657. SendPingsIfNeeded();
  658. }
  659. }
  660. void Wait() {
  661. Host->Wait(0.1f);
  662. }
  663. void DoSendCancel(const TUdpAddress& addr, const TGUID& req) {
  664. TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
  665. ms->Write((char)PKT_CANCEL);
  666. ms->Write(req);
  667. SendWithHighPriority(addr, ms);
  668. }
  669. void DoSends() {
  670. {
  671. TBreakRequest rb;
  672. while (BreakReqList.Dequeue(&rb)) {
  673. InRequests.erase(rb.ReqGuid);
  674. }
  675. }
  676. {
  677. // cancelling requests
  678. TCancelRequest rc;
  679. while (CancelReqList.Dequeue(&rc)) {
  680. TOutRequestHash::iterator i = OutRequests.find(rc.ReqGuid);
  681. if (i == OutRequests.end()) {
  682. AnticipateCancels.insert(rc.ReqGuid);
  683. continue; // cancelling non existing request is ok
  684. }
  685. TOutRequestState& s = i->second;
  686. if (s.State == TOutRequestState::S_SENDING) {
  687. // we are in trouble - have not sent request and we already have to cancel it, wait send
  688. s.State = TOutRequestState::S_CANCEL_AFTER_SENDING;
  689. } else {
  690. DoSendCancel(s.Address, rc.ReqGuid);
  691. FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request canceled: notify requested side");
  692. }
  693. }
  694. }
  695. {
  696. // sending replies
  697. for (TSendResponse* rd = nullptr; SendRespList.Dequeue(&rd); delete rd) {
  698. TInRequestHash::iterator i = InRequests.find(rd->ReqGuid);
  699. if (i == InRequests.end()) {
  700. Y_ASSERT(0);
  701. continue;
  702. }
  703. TInRequestState& s = i->second;
  704. if (s.State == TInRequestState::S_CANCELED) {
  705. // need not send response for the canceled request
  706. InRequests.erase(i);
  707. continue;
  708. }
  709. Y_ASSERT(s.State == TInRequestState::S_WAITING);
  710. s.State = TInRequestState::S_RESPONSE_SENDING;
  711. //printf("InReq %s SendResponse() ... -> S_RESPONSE_SENDING (pkt %s)\n", GetGuidAsString(reqId).c_str(), GetGuidAsString(lowPktGuid).c_str());
  712. TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
  713. ui32 crc32 = 0;
  714. int dataSize = rd->Data.ysize();
  715. if (rd->Data.ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(s.Address)) {
  716. TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
  717. if (shm->Create(dataSize)) {
  718. ms->Write((char)PKT_LOCAL_RESPONSE);
  719. ms->Write(rd->ReqGuid);
  720. memcpy(shm->GetPtr(), &rd->Data[0], dataSize);
  721. TVector<char> empty;
  722. rd->Data.swap(empty);
  723. ms->AttachSharedData(shm);
  724. crc32 = CalcChecksum(ms->GetChain());
  725. }
  726. }
  727. if (ms->GetSharedData() == nullptr) {
  728. ms->Write((char)PKT_RESPONSE);
  729. ms->Write(rd->ReqGuid);
  730. // to offload crc calcs from inner thread, crc of data[] is calced outside and passed in DataCrc32
  731. // this means that we are calculating crc when shared memory is used
  732. // it is hard to avoid since in SendResponse() we don't know if shared mem will be used (peer address is not available there)
  733. TIncrementalChecksumCalcer csCalcer;
  734. AddChain(&csCalcer, ms->GetChain());
  735. // here we are replicating the way WriteDestructive serializes data
  736. csCalcer.AddBlock(&dataSize, sizeof(dataSize));
  737. csCalcer.AddBlockSum(rd->DataCrc32, dataSize);
  738. crc32 = csCalcer.CalcChecksum();
  739. ms->WriteDestructive(&rd->Data);
  740. //ui32 chkCrc = CalcChecksum(ms->GetChain()); // can not use since its slow for large responses
  741. //Y_ASSERT(chkCrc == crc32);
  742. }
  743. int transId = Host->Send(s.Address, ms.Release(), crc32, nullptr, rd->Priority);
  744. TransferHash[transId] = TTransferPurpose(DIR_IN, rd->ReqGuid);
  745. }
  746. }
  747. {
  748. // sending requests
  749. for (TSendRequest* rd = nullptr; SendReqList.Dequeue(&rd); delete rd) {
  750. Y_ASSERT(OutRequests.find(rd->ReqGuid) == OutRequests.end());
  751. {
  752. TOutRequestState& s = OutRequests[rd->ReqGuid];
  753. s.State = TOutRequestState::S_SENDING;
  754. s.Address = rd->Addr;
  755. s.UserQueues = rd->UserQueues;
  756. //printf("OutReq %s SendRequest() ... -> S_SENDING\n", GetGuidAsString(guid).c_str());
  757. }
  758. if (rd->WR.Get())
  759. SyncRequests[rd->ReqGuid] = rd->WR;
  760. if (AnticipateCancels.find(rd->ReqGuid) != AnticipateCancels.end()) {
  761. FinishRequest(OutRequests.find(rd->ReqGuid), TUdpHttpResponse::CANCELED, nullptr, "request canceled before transmitting");
  762. } else {
  763. TGUID pktGuid = rd->ReqGuid; // request packet id should match request id
  764. int transId = Host->Send(rd->Addr, rd->Data.Release(), rd->Crc32, &pktGuid, PP_NORMAL);
  765. TransferHash[transId] = TTransferPurpose(DIR_OUT, rd->ReqGuid);
  766. }
  767. }
  768. }
  769. if (!AnticipateCancels.empty()) {
  770. AnticipateCancels.clear();
  771. }
  772. }
  773. public:
  774. void SendRequestImpl(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId,
  775. TWaitResponse* wr, TRequesterUserQueues* userQueues) {
  776. if (data && data->size() > MAX_PACKET_SIZE) {
  777. Y_ABORT_UNLESS(0, "data size is too large");
  778. }
  779. //printf("SendRequest(%s)\n", url.c_str());
  780. if (wr)
  781. wr->SetReqId(reqId);
  782. TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
  783. if (data && data->ysize() > MIN_SHARED_MEM_PACKET && IsLocalFast(addr)) {
  784. int dataSize = data->ysize();
  785. TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
  786. if (shm->Create(dataSize)) {
  787. ms->Write((char)PKT_LOCAL_REQUEST);
  788. ms->WriteStroka(url);
  789. memcpy(shm->GetPtr(), &(*data)[0], dataSize);
  790. TVector<char> empty;
  791. data->swap(empty);
  792. ms->AttachSharedData(shm);
  793. }
  794. }
  795. if (ms->GetSharedData() == nullptr) {
  796. ms->Write((char)PKT_REQUEST);
  797. ms->WriteStroka(url);
  798. ms->WriteDestructive(data);
  799. }
  800. SendReqList.Enqueue(new TSendRequest(addr, &ms, reqId, wr, userQueues));
  801. Host->CancelWait();
  802. }
  803. void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) override {
  804. SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get());
  805. }
  806. void CancelRequest(const TGUID& reqId) override {
  807. CancelReqList.Enqueue(TCancelRequest(reqId));
  808. Host->CancelWait();
  809. }
  810. void BreakRequest(const TGUID& reqId) override {
  811. BreakReqList.Enqueue(TBreakRequest(reqId));
  812. Host->CancelWait();
  813. }
  814. void SendResponseImpl(const TGUID& reqId, EPacketPriority prior, TVector<char>* data) // non-virtual, for direct call from TRequestOps
  815. {
  816. if (data && data->size() > MAX_PACKET_SIZE) {
  817. Y_ABORT_UNLESS(0, "data size is too large");
  818. }
  819. SendRespList.Enqueue(new TSendResponse(reqId, prior, data));
  820. Host->CancelWait();
  821. }
  822. void SendResponse(const TGUID& reqId, TVector<char>* data) override {
  823. SendResponseImpl(reqId, PP_NORMAL, data);
  824. }
  825. void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) override {
  826. SendResponseImpl(reqId, PP_LOW, data);
  827. }
  828. TUdpHttpRequest* GetRequest() override {
  829. return UserQueues->GetRequest();
  830. }
  831. TUdpHttpResponse* GetResponse() override {
  832. return UserQueues->GetResponse();
  833. }
  834. bool GetRequestCancel(TGUID* req) override {
  835. return UserQueues->GetRequestCancel(req);
  836. }
  837. bool GetSendRequestAcc(TGUID* req) override {
  838. return UserQueues->GetSendRequestAcc(req);
  839. }
  840. TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
  841. TIntrusivePtr<TWaitResponse> wr = WaitableRequest(addr, url, data);
  842. wr->Wait();
  843. return wr->GetResponse();
  844. }
  845. TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
  846. TIntrusivePtr<TWaitResponse> wr = new TWaitResponse;
  847. TGUID reqId;
  848. CreateGuid(&reqId);
  849. SendRequestImpl(addr, url, data, reqId, wr.Get(), nullptr);
  850. return wr;
  851. }
  852. TMuxEvent& GetAsyncEvent() override {
  853. return UserQueues->GetAsyncEvent();
  854. }
  855. int GetPort() override {
  856. return Socket.Get() ? Socket->GetPort() : 0;
  857. }
  858. void StopNoWait() override {
  859. AbortTransactions = true;
  860. KeepRunning = false;
  861. UserQueues->AsyncSignal();
  862. // calcel all outgoing requests
  863. TGuard<TSpinLock> lock(cs);
  864. while (!OutRequests.empty()) {
  865. // cancel without informing peer that we are cancelling the request
  866. FinishRequest(OutRequests.begin(), TUdpHttpResponse::CANCELED, nullptr, "request canceled: inside TUdpHttp::StopNoWait()");
  867. }
  868. }
  869. void ExecStatsRequest(TIntrusivePtr<TStatsRequest> req) {
  870. StatsReqList.Enqueue(req);
  871. Host->CancelWait();
  872. req->Complete.Wait();
  873. }
  874. TUdpAddress GetPeerAddress(const TGUID& reqId) override {
  875. TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::GET_PEER_ADDRESS);
  876. req->RequestId = reqId;
  877. ExecStatsRequest(req);
  878. return req->PeerAddress;
  879. }
  880. void GetPendingDataSize(TRequesterPendingDataStats* res) override {
  881. TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::PENDING_SIZE);
  882. ExecStatsRequest(req);
  883. *res = req->PendingDataSize;
  884. }
  885. bool HasRequest(const TGUID& reqId) override {
  886. TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::HAS_IN_REQUEST);
  887. req->RequestId = reqId;
  888. ExecStatsRequest(req);
  889. return req->RequestFound;
  890. }
  891. private:
  892. void FinishOutstandingTransactions() {
  893. // wait all pending requests, all new requests are canceled
  894. while ((!OutRequests.empty() || !InRequests.empty() || !SendRespList.IsEmpty() || !SendReqList.IsEmpty()) && !PanicAttack) {
  895. while (TUdpHttpRequest* req = GetRequest()) {
  896. TInRequestHash::iterator i = InRequests.find(req->ReqId);
  897. //printf("dropping request(%s) (thread %d)\n", req->Url.c_str(), ThreadId());
  898. delete req;
  899. if (i == InRequests.end()) {
  900. Y_ASSERT(0);
  901. continue;
  902. }
  903. InRequests.erase(i);
  904. }
  905. Step();
  906. sleep(0);
  907. }
  908. }
  909. static void* ExecServerThread(void* param) {
  910. BindToSocket(0);
  911. SetHighestThreadPriority();
  912. TUdpHttp* pThis = (TUdpHttp*)param;
  913. pThis->Host = CreateUdpHost(pThis->Socket);
  914. pThis->HasStarted.Signal();
  915. if (!pThis->Host) {
  916. pThis->Socket.Drop();
  917. return nullptr;
  918. }
  919. NHPTimer::GetTime(&pThis->PingsSendT);
  920. while (pThis->KeepRunning && !PanicAttack) {
  921. if (HeartbeatTimeout.load(std::memory_order_acquire) > 0) {
  922. NHPTimer::STime chk = LastHeartbeat.load(std::memory_order_acquire);
  923. double passed = NHPTimer::GetTimePassed(&chk);
  924. if (passed > HeartbeatTimeout.load(std::memory_order_acquire)) {
  925. StopAllNetLibaThreads();
  926. fprintf(stderr, "%s\tTUdpHttp\tWaiting for %0.2f, time limit %0.2f, commit a suicide!11\n", Now().ToStringUpToSeconds().c_str(), passed, HeartbeatTimeout.load(std::memory_order_acquire));
  927. fflush(stderr);
  928. #ifndef _win_
  929. killpg(0, SIGKILL);
  930. #endif
  931. abort();
  932. break;
  933. }
  934. }
  935. pThis->Step();
  936. pThis->Wait();
  937. }
  938. if (!pThis->AbortTransactions && !PanicAttack)
  939. pThis->FinishOutstandingTransactions();
  940. pThis->Host = nullptr;
  941. return nullptr;
  942. }
  943. ~TUdpHttp() override {
  944. if (myThread.Running()) {
  945. KeepRunning = false;
  946. myThread.Join();
  947. }
  948. for (TIntrusivePtr<TStatsRequest> req; StatsReqList.Dequeue(&req);) {
  949. req->Complete.Signal();
  950. }
  951. }
  952. public:
  953. TUdpHttp()
  954. : myThread(TThread::TParams(ExecServerThread, (void*)this).SetName("nl6_udp_host"))
  955. , KeepRunning(true)
  956. , AbortTransactions(false)
  957. , PingsSendT(0)
  958. , ReportRequestCancel(false)
  959. , ReportSendRequestAcc(false)
  960. {
  961. NHPTimer::GetTime(&PingsSendT);
  962. QueueSizes = new TRequesterUserQueueSizes;
  963. UserQueues = new TRequesterUserQueues(QueueSizes.Get());
  964. }
  965. bool Start(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
  966. Y_ASSERT(Host.Get() == nullptr);
  967. Socket = socket;
  968. myThread.Start();
  969. HasStarted.Wait();
  970. if (Host.Get()) {
  971. return true;
  972. }
  973. Socket.Drop();
  974. return false;
  975. }
  976. TString GetDebugInfoLocked() {
  977. TString res = KeepRunning ? "State: running\n" : "State: stopping\n";
  978. res += Host->GetDebugInfo();
  979. char buf[1000];
  980. TRequesterUserQueueSizes* qs = QueueSizes.Get();
  981. snprintf(buf, sizeof(buf), "\nRequest queue %d (%d bytes)\n", (int)AtomicGet(qs->ReqCount), (int)AtomicGet(qs->ReqQueueSize));
  982. res += buf;
  983. snprintf(buf, sizeof(buf), "Response queue %d (%d bytes)\n", (int)AtomicGet(qs->RespCount), (int)AtomicGet(qs->RespQueueSize));
  984. res += buf;
  985. const char* outReqStateNames[] = {
  986. "S_SENDING",
  987. "S_WAITING",
  988. "S_WAITING_PING_SENDING",
  989. "S_WAITING_PING_SENT",
  990. "S_CANCEL_AFTER_SENDING"};
  991. const char* inReqStateNames[] = {
  992. "S_WAITING",
  993. "S_RESPONSE_SENDING",
  994. "S_CANCELED"};
  995. res += "\nOut requests:\n";
  996. for (TOutRequestHash::const_iterator i = OutRequests.begin(); i != OutRequests.end(); ++i) {
  997. const TGUID& gg = i->first;
  998. const TOutRequestState& s = i->second;
  999. bool isSync = SyncRequests.find(gg) != SyncRequests.end();
  1000. snprintf(buf, sizeof(buf), "%s\t%s %s TimePassed: %g %s\n",
  1001. GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), outReqStateNames[s.State],
  1002. s.TimePassed * 1000,
  1003. isSync ? "isSync" : "");
  1004. res += buf;
  1005. }
  1006. res += "\nIn requests:\n";
  1007. for (TInRequestHash::const_iterator i = InRequests.begin(); i != InRequests.end(); ++i) {
  1008. const TGUID& gg = i->first;
  1009. const TInRequestState& s = i->second;
  1010. snprintf(buf, sizeof(buf), "%s\t%s %s\n",
  1011. GetAddressAsString(s.Address).c_str(), GetGuidAsString(gg).c_str(), inReqStateNames[s.State]);
  1012. res += buf;
  1013. }
  1014. return res;
  1015. }
  1016. TString GetDebugInfo() override {
  1017. TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::DEBUG_INFO);
  1018. ExecStatsRequest(req);
  1019. return req->DebugInfo;
  1020. }
  1021. void GetRequestQueueSize(TRequesterQueueStats* res) override {
  1022. TRequesterUserQueueSizes* qs = QueueSizes.Get();
  1023. res->ReqCount = (int)AtomicGet(qs->ReqCount);
  1024. res->RespCount = (int)AtomicGet(qs->RespCount);
  1025. res->ReqQueueSize = (int)AtomicGet(qs->ReqQueueSize);
  1026. res->RespQueueSize = (int)AtomicGet(qs->RespQueueSize);
  1027. }
  1028. TRequesterUserQueueSizes* GetQueueSizes() const {
  1029. return QueueSizes.Get();
  1030. }
  1031. IRequestOps* CreateSubRequester() override;
  1032. void EnableReportRequestCancel() override {
  1033. ReportRequestCancel = true;
  1034. }
  1035. void EnableReportSendRequestAcc() override {
  1036. ReportSendRequestAcc = true;
  1037. }
  1038. TIntrusivePtr<IPeerQueueStats> GetQueueStats(const TUdpAddress& addr) override {
  1039. TIntrusivePtr<TStatsRequest> req = new TStatsRequest(TStatsRequest::GET_PEER_QUEUE_STATS);
  1040. req->PeerAddress = addr;
  1041. ExecStatsRequest(req);
  1042. return req->QueueStats;
  1043. }
  1044. };
  1045. //////////////////////////////////////////////////////////////////////////
  1046. static void ReadShm(TSharedMemory* shm, TVector<char>* data) {
  1047. Y_ASSERT(shm);
  1048. int dataSize = shm->GetSize();
  1049. data->yresize(dataSize);
  1050. memcpy(&(*data)[0], shm->GetPtr(), dataSize);
  1051. }
  1052. static void LoadRequestData(TUdpHttpRequest* res) {
  1053. if (!res)
  1054. return;
  1055. {
  1056. TBlockChainIterator reqData(res->DataHolder->Data->GetChain());
  1057. char pktType;
  1058. reqData.Read(&pktType, 1);
  1059. ReadArr(&reqData, &res->Url);
  1060. if (pktType == PKT_REQUEST) {
  1061. ReadYArr(&reqData, &res->Data);
  1062. } else if (pktType == PKT_LOCAL_REQUEST) {
  1063. ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data);
  1064. } else
  1065. Y_ASSERT(0);
  1066. if (reqData.HasFailed()) {
  1067. Y_ASSERT(0 && "wrong format, memory corruption suspected");
  1068. res->Url = "";
  1069. res->Data.clear();
  1070. }
  1071. }
  1072. res->DataHolder.Reset(nullptr);
  1073. }
  1074. static void LoadResponseData(TUdpHttpResponse* res) {
  1075. if (!res || res->DataHolder.Get() == nullptr)
  1076. return;
  1077. {
  1078. TBlockChainIterator reqData(res->DataHolder->Data->GetChain());
  1079. char pktType;
  1080. reqData.Read(&pktType, 1);
  1081. TGUID guid;
  1082. reqData.Read(&guid, sizeof(guid));
  1083. Y_ASSERT(res->ReqId == guid);
  1084. if (pktType == PKT_RESPONSE) {
  1085. ReadYArr(&reqData, &res->Data);
  1086. } else if (pktType == PKT_LOCAL_RESPONSE) {
  1087. ReadShm(res->DataHolder->Data->GetSharedData(), &res->Data);
  1088. } else
  1089. Y_ASSERT(0);
  1090. if (reqData.HasFailed()) {
  1091. Y_ASSERT(0 && "wrong format, memory corruption suspected");
  1092. res->Ok = TUdpHttpResponse::FAILED;
  1093. res->Data.clear();
  1094. res->Error = "wrong response format";
  1095. }
  1096. }
  1097. res->DataHolder.Reset(nullptr);
  1098. }
  1099. //////////////////////////////////////////////////////////////////////////
  1100. // IRequestOps::TWaitResponse
  1101. TUdpHttpResponse* IRequestOps::TWaitResponse::GetResponse() {
  1102. if (!Response)
  1103. return nullptr;
  1104. TUdpHttpResponse* res = Response;
  1105. Response = nullptr;
  1106. LoadResponseData(res);
  1107. return res;
  1108. }
  1109. void IRequestOps::TWaitResponse::SetResponse(TUdpHttpResponse* r) {
  1110. Y_ASSERT(Response == nullptr || r == nullptr);
  1111. if (r)
  1112. Response = r;
  1113. CompleteEvent.Signal();
  1114. }
  1115. //////////////////////////////////////////////////////////////////////////
  1116. // TRequesterUserQueues
  1117. TUdpHttpRequest* TRequesterUserQueues::GetRequest() {
  1118. TUdpHttpRequest* res = nullptr;
  1119. ReqList.Dequeue(&res);
  1120. if (res) {
  1121. AtomicAdd(QueueSizes->ReqCount, -1);
  1122. AtomicAdd(QueueSizes->ReqQueueSize, -GetPacketSize(res->DataHolder.Get()));
  1123. }
  1124. UpdateAsyncSignalState();
  1125. LoadRequestData(res);
  1126. return res;
  1127. }
  1128. TUdpHttpResponse* TRequesterUserQueues::GetResponse() {
  1129. TUdpHttpResponse* res = nullptr;
  1130. ResponseList.Dequeue(&res);
  1131. if (res) {
  1132. AtomicAdd(QueueSizes->RespCount, -1);
  1133. AtomicAdd(QueueSizes->RespQueueSize, -GetPacketSize(res->DataHolder.Get()));
  1134. }
  1135. UpdateAsyncSignalState();
  1136. LoadResponseData(res);
  1137. return res;
  1138. }
  1139. //////////////////////////////////////////////////////////////////////////
  1140. class TRequestOps: public IRequestOps {
  1141. TIntrusivePtr<TUdpHttp> Requester;
  1142. TIntrusivePtr<TRequesterUserQueues> UserQueues;
  1143. public:
  1144. TRequestOps(TUdpHttp* req)
  1145. : Requester(req)
  1146. {
  1147. UserQueues = new TRequesterUserQueues(req->GetQueueSizes());
  1148. }
  1149. void SendRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data, const TGUID& reqId) override {
  1150. Requester->SendRequestImpl(addr, url, data, reqId, nullptr, UserQueues.Get());
  1151. }
  1152. void CancelRequest(const TGUID& reqId) override {
  1153. Requester->CancelRequest(reqId);
  1154. }
  1155. void BreakRequest(const TGUID& reqId) override {
  1156. Requester->BreakRequest(reqId);
  1157. }
  1158. void SendResponse(const TGUID& reqId, TVector<char>* data) override {
  1159. Requester->SendResponseImpl(reqId, PP_NORMAL, data);
  1160. }
  1161. void SendResponseLowPriority(const TGUID& reqId, TVector<char>* data) override {
  1162. Requester->SendResponseImpl(reqId, PP_LOW, data);
  1163. }
  1164. TUdpHttpRequest* GetRequest() override {
  1165. Y_ASSERT(0);
  1166. //return UserQueues.GetRequest();
  1167. return nullptr; // all requests are routed to the main requester
  1168. }
  1169. TUdpHttpResponse* GetResponse() override {
  1170. return UserQueues->GetResponse();
  1171. }
  1172. bool GetRequestCancel(TGUID*) override {
  1173. Y_ASSERT(0);
  1174. return false; // all request cancels are routed to the main requester
  1175. }
  1176. bool GetSendRequestAcc(TGUID* req) override {
  1177. return UserQueues->GetSendRequestAcc(req);
  1178. }
  1179. // sync mode
  1180. TUdpHttpResponse* Request(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
  1181. return Requester->Request(addr, url, data);
  1182. }
  1183. TIntrusivePtr<TWaitResponse> WaitableRequest(const TUdpAddress& addr, const TString& url, TVector<char>* data) override {
  1184. return Requester->WaitableRequest(addr, url, data);
  1185. }
  1186. //
  1187. TMuxEvent& GetAsyncEvent() override {
  1188. return UserQueues->GetAsyncEvent();
  1189. }
  1190. };
  1191. IRequestOps* TUdpHttp::CreateSubRequester() {
  1192. return new TRequestOps(this);
  1193. }
  1194. //////////////////////////////////////////////////////////////////////////
  1195. void AbortOnFailedRequest(TUdpHttpResponse* answer) {
  1196. if (answer && answer->Ok == TUdpHttpResponse::FAILED) {
  1197. fprintf(stderr, "Failed request to host %s\n", GetAddressAsString(answer->PeerAddress).data());
  1198. fprintf(stderr, "Error description: %s\n", answer->Error.data());
  1199. fflush(nullptr);
  1200. Y_ASSERT(0);
  1201. abort();
  1202. }
  1203. }
  1204. TString GetDebugInfo(const TUdpAddress& addr, double timeout) {
  1205. NHPTimer::STime start;
  1206. NHPTimer::GetTime(&start);
  1207. TIntrusivePtr<IUdpHost> host = CreateUdpHost(0);
  1208. {
  1209. TAutoPtr<TRopeDataPacket> rq = new TRopeDataPacket;
  1210. rq->Write((char)PKT_GETDEBUGINFO);
  1211. ui32 crc32 = CalcChecksum(rq->GetChain());
  1212. host->Send(addr, rq.Release(), crc32, nullptr, PP_HIGH);
  1213. }
  1214. for (;;) {
  1215. TAutoPtr<TRequest> ptr = host->GetRequest();
  1216. if (ptr.Get()) {
  1217. TBlockChainIterator reqData(ptr->Data->GetChain());
  1218. int sz = reqData.GetSize();
  1219. TString res;
  1220. res.resize(sz);
  1221. reqData.Read(res.begin(), sz);
  1222. return res;
  1223. }
  1224. host->Step();
  1225. host->Wait(0.1f);
  1226. NHPTimer::STime now;
  1227. NHPTimer::GetTime(&now);
  1228. if (NHPTimer::GetSeconds(now - start) > timeout) {
  1229. return TString();
  1230. }
  1231. }
  1232. }
  1233. void Kill(const TUdpAddress& addr) {
  1234. TIntrusivePtr<IUdpHost> host = CreateUdpHost(0);
  1235. host->Kill(addr);
  1236. }
  1237. void StopAllNetLibaThreads() {
  1238. PanicAttack = true; // AAAA!!!!
  1239. }
  1240. void SetNetLibaHeartbeatTimeout(double timeoutSec) {
  1241. NetLibaHeartbeat();
  1242. HeartbeatTimeout.store(timeoutSec, std::memory_order_release);
  1243. }
  1244. void NetLibaHeartbeat() {
  1245. NHPTimer::STime now;
  1246. NHPTimer::GetTime(&now);
  1247. LastHeartbeat.store(now, std::memory_order_release);
  1248. }
  1249. IRequester* CreateHttpUdpRequester(int port) {
  1250. if (PanicAttack)
  1251. return nullptr;
  1252. TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateSocket();
  1253. socket->Open(port);
  1254. if (!socket->IsValid())
  1255. return nullptr;
  1256. return CreateHttpUdpRequester(socket);
  1257. }
  1258. IRequester* CreateHttpUdpRequester(const TIntrusivePtr<NNetlibaSocket::ISocket>& socket) {
  1259. if (PanicAttack)
  1260. return nullptr;
  1261. TIntrusivePtr<TUdpHttp> res(new TUdpHttp);
  1262. if (!res->Start(socket))
  1263. return nullptr;
  1264. return res.Release();
  1265. }
  1266. }