netliba_udp_http.cpp

#include "netliba_udp_http.h"
#include "utils.h"

#include <library/cpp/netliba/v6/cpu_affinity.h>
#include <library/cpp/netliba/v6/stdafx.h>
#include <library/cpp/netliba/v6/udp_client_server.h>
#include <library/cpp/netliba/v6/udp_socket.h>
#include <library/cpp/netliba/v6/block_chain.h> // depends on other headers

#include <util/system/hp_timer.h>
#include <util/system/shmat.h>
#include <util/system/spinlock.h>
#include <util/system/thread.h>
#include <util/system/types.h>
#include <util/system/yassert.h>
#include <util/thread/lfqueue.h>

#include <atomic>

#if !defined(_win_)
#include <signal.h>
#include <pthread.h>
#endif
using namespace NNetliba;

namespace {
    const float HTTP_TIMEOUT = 15.0f;
    const size_t MIN_SHARED_MEM_PACKET = 1000;
    const size_t MAX_PACKET_SIZE = 0x70000000;

    NNeh::TAtomicBool PanicAttack;
    std::atomic<NHPTimer::STime> LastHeartbeat;
    std::atomic<double> HeartbeatTimeout;

    bool IsLocal(const TUdpAddress& addr) {
        return addr.IsIPv4() ? IsLocalIPv4(addr.GetIPv4()) : IsLocalIPv6(addr.Network, addr.Interface);
    }

    void StopAllNetLibaThreads() {
        PanicAttack = true; // AAAA!!!!
    }
    void ReadShm(TSharedMemory* shm, TVector<char>* data) {
        Y_ASSERT(shm);
        int dataSize = shm->GetSize();
        data->yresize(dataSize);
        memcpy(&(*data)[0], shm->GetPtr(), dataSize);
    }

    void ReadShm(TSharedMemory* shm, TString* data) {
        Y_ASSERT(shm);
        size_t dataSize = shm->GetSize();
        data->ReserveAndResize(dataSize);
        memcpy(data->begin(), shm->GetPtr(), dataSize);
    }

    template <class T>
    void EraseList(TLockFreeQueue<T*>* data) {
        T* ptr = 0;
        while (data->Dequeue(&ptr)) {
            delete ptr;
        }
    }
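
    // First byte of every packet; identifies how the rest of the packet body is laid out.
    // The *_LOCAL_* variants carry the payload in shared memory instead of in the packet itself.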
    enum EHttpPacket {
        PKT_REQUEST,
        PKT_PING,
        PKT_PING_RESPONSE,
        PKT_RESPONSE,
        PKT_LOCAL_REQUEST,
        PKT_LOCAL_RESPONSE,
        PKT_CANCEL,
    };
}
namespace NNehNetliba {
    TUdpHttpMessage::TUdpHttpMessage(const TGUID& reqId, const TUdpAddress& peerAddr)
        : ReqId(reqId)
        , PeerAddress(peerAddr)
    {
    }

    TUdpHttpRequest::TUdpHttpRequest(TAutoPtr<TRequest>& dataHolder, const TGUID& reqId, const TUdpAddress& peerAddr)
        : TUdpHttpMessage(reqId, peerAddr)
    {
        TBlockChainIterator reqData(dataHolder->Data->GetChain());
        char pktType;
        reqData.Read(&pktType, 1);
        ReadArr(&reqData, &Url);
        if (pktType == PKT_REQUEST) {
            ReadYArr(&reqData, &Data);
        } else if (pktType == PKT_LOCAL_REQUEST) {
            ReadShm(dataHolder->Data->GetSharedData(), &Data);
        } else {
            Y_ASSERT(0);
        }
        if (reqData.HasFailed()) {
            Y_ASSERT(0 && "wrong format, memory corruption suspected");
            Url = "";
            Data.clear();
        }
    }
    TUdpHttpResponse::TUdpHttpResponse(TAutoPtr<TRequest>& dataHolder, const TGUID& reqId, const TUdpAddress& peerAddr, EResult result, const char* error)
        : TUdpHttpMessage(reqId, peerAddr)
        , Ok(result)
    {
        if (result == TUdpHttpResponse::FAILED) {
            Error = error ? error : "request failed";
        } else if (result == TUdpHttpResponse::CANCELED) {
            Error = error ? error : "request cancelled";
        } else {
            TBlockChainIterator reqData(dataHolder->Data->GetChain());
            if (Y_UNLIKELY(reqData.HasFailed())) {
                Y_ASSERT(0 && "wrong format, memory corruption suspected");
                Ok = TUdpHttpResponse::FAILED;
                Data.clear();
                Error = "wrong response format";
            } else {
                char pktType;
                reqData.Read(&pktType, 1);
                TGUID guid;
                reqData.Read(&guid, sizeof(guid));
                Y_ASSERT(ReqId == guid);
                if (pktType == PKT_RESPONSE) {
                    ReadArr<TString>(&reqData, &Data);
                } else if (pktType == PKT_LOCAL_RESPONSE) {
                    ReadShm(dataHolder->Data->GetSharedData(), &Data);
                } else {
                    Y_ASSERT(0);
                }
            }
        }
    }
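
    // Requester implementation: a single worker thread (ExecServerThread) owns the netliba
    // IUdpHost and performs all sends and receives; the public methods only enqueue work into
    // lock-free queues and wake the host, so they can be called from other threads.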
    class TUdpHttp: public IRequester {
        enum EDir {
            DIR_OUT,
            DIR_IN
        };

        struct TInRequestState {
            enum EState {
                S_WAITING,
                S_RESPONSE_SENDING,
                S_CANCELED,
            };

            TInRequestState()
                : State(S_WAITING)
            {
            }
            TInRequestState(const TUdpAddress& address)
                : State(S_WAITING)
                , Address(address)
            {
            }

            EState State;
            TUdpAddress Address;
        };
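
        // State of a request we have sent and are waiting to be answered;
        // TimePassed accumulates between pings, PingTransferId tracks an in-flight PKT_PING transfer.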
        struct TOutRequestState {
            enum EState {
                S_SENDING,
                S_WAITING,
                S_WAITING_PING_SENDING,
                S_WAITING_PING_SENT,
                S_CANCEL_AFTER_SENDING
            };

            TOutRequestState()
                : State(S_SENDING)
                , TimePassed(0)
                , PingTransferId(-1)
            {
            }

            EState State;
            TUdpAddress Address;
            double TimePassed;
            int PingTransferId;
            IEventsCollectorRef EventsCollector;
        };

        struct TTransferPurpose {
            EDir Dir;
            TGUID Guid;

            TTransferPurpose()
                : Dir(DIR_OUT)
            {
            }
            TTransferPurpose(EDir dir, TGUID guid)
                : Dir(dir)
                , Guid(guid)
            {
            }
        };
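
        // Payloads queued by SendRequest()/SendResponse() for the worker thread to pick up in DoSends().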
        struct TSendRequest {
            TSendRequest() = default;
            TSendRequest(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket>* data, const TGUID& reqGuid, const IEventsCollectorRef& eventsCollector)
                : Addr(addr)
                , Data(*data)
                , ReqGuid(reqGuid)
                , EventsCollector(eventsCollector)
                , Crc32(CalcChecksum(Data->GetChain()))
            {
            }

            TUdpAddress Addr;
            TAutoPtr<TRopeDataPacket> Data;
            TGUID ReqGuid;
            IEventsCollectorRef EventsCollector;
            ui32 Crc32;
        };

        struct TSendResponse {
            TSendResponse() = default;
            TSendResponse(const TGUID& reqGuid, EPacketPriority prior, TVector<char>* data)
                : ReqGuid(reqGuid)
                , DataCrc32(0)
                , Priority(prior)
            {
                if (data && !data->empty()) {
                    data->swap(Data);
                    DataCrc32 = TIncrementalChecksumCalcer::CalcBlockSum(&Data[0], Data.ysize());
                }
            }

            TVector<char> Data;
            TGUID ReqGuid;
            ui32 DataCrc32;
            EPacketPriority Priority;
        };

        typedef THashMap<TGUID, TOutRequestState, TGUIDHash> TOutRequestHash;
        typedef THashMap<TGUID, TInRequestState, TGUIDHash> TInRequestHash;
    public:
        TUdpHttp(const IEventsCollectorRef& eventsCollector)
            : MyThread_(ExecServerThread, (void*)this)
            , AbortTransactions_(false)
            , Port_(0)
            , EventCollector_(eventsCollector)
            , ReportRequestCancel_(false)
            , ReporRequestAck_(false)
            , PhysicalCpu_(-1)
        {
        }

        ~TUdpHttp() override {
            if (MyThread_.Running()) {
                AtomicSet(KeepRunning_, 0);
                MyThread_.Join();
            }
        }

        bool Start(int port, int physicalCpu) {
            Y_ASSERT(Host_.Get() == nullptr);
            Port_ = port;
            PhysicalCpu_ = physicalCpu;
            MyThread_.Start();
            HasStarted_.Wait();
            return Host_.Get() != nullptr;
        }

        void EnableReportRequestCancel() override {
            ReportRequestCancel_ = true;
        }

        void EnableReportRequestAck() override {
            ReporRequestAck_ = true;
        }
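
        // For large requests to a local peer the payload is passed via shared memory (PKT_LOCAL_REQUEST);
        // otherwise the url and data are serialized into the packet itself (PKT_REQUEST).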
        void SendRequest(const TUdpAddress& addr, const TString& url, const TString& data, const TGUID& reqId) override {
            Y_ABORT_UNLESS(
                data.size() < MAX_PACKET_SIZE,
                "data size is too large; data.size()=%" PRISZT ", MAX_PACKET_SIZE=%" PRISZT,
                data.size(), MAX_PACKET_SIZE);

            TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
            if (data.size() > MIN_SHARED_MEM_PACKET && IsLocal(addr)) {
                TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
                if (shm->Create(data.size())) {
                    ms->Write((char)PKT_LOCAL_REQUEST);
                    ms->WriteStroka(url);
                    memcpy(shm->GetPtr(), data.begin(), data.size());
                    ms->AttachSharedData(shm);
                }
            }
            if (ms->GetSharedData() == nullptr) {
                ms->Write((char)PKT_REQUEST);
                ms->WriteStroka(url);

                struct TStrokaStorage: public TThrRefBase, public TString {
                    TStrokaStorage(const TString& s)
                        : TString(s)
                    {
                    }
                };
                TStrokaStorage* ss = new TStrokaStorage(data);
                ms->Write((int)ss->size());
                ms->AddBlock(ss, ss->begin(), ss->size());
            }

            SendReqList_.Enqueue(new TSendRequest(addr, &ms, reqId, EventCollector_));
            Host_->CancelWait();
        }

        void CancelRequest(const TGUID& reqId) override {
            CancelReqList_.Enqueue(reqId);
            Host_->CancelWait();
        }

        void SendResponse(const TGUID& reqId, TVector<char>* data) override {
            if (data && data->size() > MAX_PACKET_SIZE) {
                Y_ABORT(
                    "data size is too large; data->size()=%" PRISZT ", MAX_PACKET_SIZE=%" PRISZT,
                    data->size(), MAX_PACKET_SIZE);
            }
            SendRespList_.Enqueue(new TSendResponse(reqId, PP_NORMAL, data));
            Host_->CancelWait();
        }

        void StopNoWait() override {
            AbortTransactions_ = true;
            AtomicSet(KeepRunning_, 0);
            // cancel all outgoing requests
            TGuard<TSpinLock> lock(Spn_);
            while (!OutRequests_.empty()) {
                // cancel without informing the peer that we are cancelling the request
                FinishRequest(OutRequests_.begin(), TUdpHttpResponse::CANCELED, nullptr, "request canceled: inside TUdpHttp::StopNoWait()");
            }
        }
    private:
        void FinishRequest(TOutRequestHash::iterator i, TUdpHttpResponse::EResult ok, TRequestPtr data, const char* error = nullptr) {
            TOutRequestState& s = i->second;
            s.EventsCollector->AddResponse(new TUdpHttpResponse(data, i->first, s.Address, ok, error));
            OutRequests_.erase(i);
        }

        int SendWithHighPriority(const TUdpAddress& addr, TAutoPtr<TRopeDataPacket> data) {
            ui32 crc32 = CalcChecksum(data->GetChain());
            return Host_->Send(addr, data.Release(), crc32, nullptr, PP_HIGH);
        }
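
        // Drains Host_->GetRequest() and dispatches packets by type: new requests, pings,
        // ping responses, responses and cancels; failed pings are collected and finished afterwards.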
        void ProcessIncomingPackets() {
            TVector<TGUID> failedRequests;
            for (;;) {
                TAutoPtr<TRequest> req = Host_->GetRequest();
                if (req.Get() == nullptr)
                    break;

                TBlockChainIterator reqData(req->Data->GetChain());
                char pktType;
                reqData.Read(&pktType, 1);
                switch (pktType) {
                    case PKT_REQUEST:
                    case PKT_LOCAL_REQUEST: {
                        TGUID reqId = req->Guid;
                        TInRequestHash::iterator z = InRequests_.find(reqId);
                        if (z != InRequests_.end()) {
                            // oops, this request already exists!
                            // this can happen if the request fits in a single packet
                            // whose source IP got corrupted in transit and still passed the crc checks;
                            // since we have already reported the wrong source address for this request to the user,
                            // the best thing we could do is stop the program to avoid further complications,
                            // but we just report the incident to stderr
                            fprintf(stderr, "Jackpot, same request %s received twice from %s and earlier from %s\n",
                                    GetGuidAsString(reqId).c_str(), GetAddressAsString(z->second.Address).c_str(),
                                    GetAddressAsString(req->Address).c_str());
                        } else {
                            InRequests_[reqId] = TInRequestState(req->Address);
                            EventCollector_->AddRequest(new TUdpHttpRequest(req, reqId, req->Address));
                        }
                    } break;
                    case PKT_PING: {
                        TGUID guid;
                        reqData.Read(&guid, sizeof(guid));
                        bool ok = InRequests_.find(guid) != InRequests_.end();
                        TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
                        ms->Write((char)PKT_PING_RESPONSE);
                        ms->Write(guid);
                        ms->Write(ok);
                        SendWithHighPriority(req->Address, ms.Release());
                    } break;
                    case PKT_PING_RESPONSE: {
                        TGUID guid;
                        bool ok;
                        reqData.Read(&guid, sizeof(guid));
                        reqData.Read(&ok, sizeof(ok));
                        TOutRequestHash::iterator i = OutRequests_.find(guid);
                        if (i == OutRequests_.end())
                            ; //Y_ASSERT(0); // actually possible with some packet orders
                        else {
                            if (!ok) {
                                // we cannot delete the request at this point,
                                // since a failed ping and the response can arrive at the same moment;
                                // consider the sequence: the client sends a ping, the server sends the response
                                // and replies false to the ping because the reply has already been sent;
                                // the failed ping_response cannot arrive earlier than the response itself,
                                // but the two can arrive simultaneously
                                failedRequests.push_back(guid);
                            } else {
                                TOutRequestState& s = i->second;
                                switch (s.State) {
                                    case TOutRequestState::S_WAITING_PING_SENDING: {
                                        Y_ASSERT(s.PingTransferId >= 0);
                                        TTransferHash::iterator k = TransferHash_.find(s.PingTransferId);
                                        if (k != TransferHash_.end())
                                            TransferHash_.erase(k);
                                        else
                                            Y_ASSERT(0);
                                        s.PingTransferId = -1;
                                        s.TimePassed = 0;
                                        s.State = TOutRequestState::S_WAITING;
                                    } break;
                                    case TOutRequestState::S_WAITING_PING_SENT:
                                        s.TimePassed = 0;
                                        s.State = TOutRequestState::S_WAITING;
                                        break;
                                    default:
                                        Y_ASSERT(0);
                                        break;
                                }
                            }
                        }
                    } break;
                    case PKT_RESPONSE:
                    case PKT_LOCAL_RESPONSE: {
                        TGUID guid;
                        reqData.Read(&guid, sizeof(guid));
                        TOutRequestHash::iterator i = OutRequests_.find(guid);
                        if (i == OutRequests_.end()) {
                            ; //Y_ASSERT(0); // does happen
                        } else {
                            FinishRequest(i, TUdpHttpResponse::OK, req);
                        }
                    } break;
                    case PKT_CANCEL: {
                        TGUID guid;
                        reqData.Read(&guid, sizeof(guid));
                        TInRequestHash::iterator i = InRequests_.find(guid);
                        if (i == InRequests_.end()) {
                            ; //Y_ASSERT(0); // may happen
                        } else {
                            TInRequestState& s = i->second;
                            if (s.State != TInRequestState::S_CANCELED && ReportRequestCancel_)
                                EventCollector_->AddCancel(guid);
                            s.State = TInRequestState::S_CANCELED;
                        }
                    } break;
                    default:
                        Y_ASSERT(0);
                }
            }

            // clean up failed requests
            for (size_t k = 0; k < failedRequests.size(); ++k) {
                const TGUID& guid = failedRequests[k];
                TOutRequestHash::iterator i = OutRequests_.find(guid);
                if (i != OutRequests_.end())
                    FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "failed udp ping");
            }
        }
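
        // Matches completed transfers reported by the host against TransferHash_ and advances
        // the corresponding in/out request state machines.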
        void AnalyzeSendResults() {
            TSendResult res;
            while (Host_->GetSendResult(&res)) {
                TTransferHash::iterator k = TransferHash_.find(res.TransferId);
                if (k != TransferHash_.end()) {
                    const TTransferPurpose& tp = k->second;
                    switch (tp.Dir) {
                        case DIR_OUT: {
                            TOutRequestHash::iterator i = OutRequests_.find(tp.Guid);
                            if (i != OutRequests_.end()) {
                                const TGUID& reqId = i->first;
                                TOutRequestState& s = i->second;
                                switch (s.State) {
                                    case TOutRequestState::S_SENDING:
                                        if (!res.Success) {
                                            FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_SENDING");
                                        } else {
                                            if (ReporRequestAck_ && !!s.EventsCollector) {
                                                s.EventsCollector->AddRequestAck(reqId);
                                            }
                                            s.State = TOutRequestState::S_WAITING;
                                            s.TimePassed = 0;
                                        }
                                        break;
                                    case TOutRequestState::S_CANCEL_AFTER_SENDING:
                                        DoSendCancel(s.Address, reqId);
                                        FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request failed: state S_CANCEL_AFTER_SENDING");
                                        break;
                                    case TOutRequestState::S_WAITING:
                                    case TOutRequestState::S_WAITING_PING_SENT:
                                        Y_ASSERT(0);
                                        break;
                                    case TOutRequestState::S_WAITING_PING_SENDING:
                                        Y_ASSERT(s.PingTransferId >= 0 && s.PingTransferId == res.TransferId);
                                        if (!res.Success) {
                                            FinishRequest(i, TUdpHttpResponse::FAILED, nullptr, "request failed: state S_WAITING_PING_SENDING");
                                        } else {
                                            s.PingTransferId = -1;
                                            s.State = TOutRequestState::S_WAITING_PING_SENT;
                                            s.TimePassed = 0;
                                        }
                                        break;
                                    default:
                                        Y_ASSERT(0);
                                        break;
                                }
                            }
                        } break;
                        case DIR_IN: {
                            TInRequestHash::iterator i = InRequests_.find(tp.Guid);
                            if (i != InRequests_.end()) {
                                Y_ASSERT(i->second.State == TInRequestState::S_RESPONSE_SENDING || i->second.State == TInRequestState::S_CANCELED);
                                InRequests_.erase(i);
                            }
                        } break;
                        default:
                            Y_ASSERT(0);
                            break;
                    }
                    TransferHash_.erase(k);
                }
            }
        }
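
        // Runs at most every 50 ms: requests that have been waiting longer than HTTP_TIMEOUT get a PKT_PING;
        // if the ping round trip also times out, the request is finished as FAILED.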
        void SendPingsIfNeeded() {
            NHPTimer::STime tChk = PingsSendT_;
            float deltaT = (float)NHPTimer::GetTimePassed(&tChk);
            if (deltaT < 0.05) {
                return;
            }
            PingsSendT_ = tChk;
            deltaT = ClampVal(deltaT, 0.0f, HTTP_TIMEOUT / 3);

            {
                for (TOutRequestHash::iterator i = OutRequests_.begin(); i != OutRequests_.end();) {
                    TOutRequestHash::iterator curIt = i++;
                    TOutRequestState& s = curIt->second;
                    const TGUID& guid = curIt->first;
                    switch (s.State) {
                        case TOutRequestState::S_WAITING:
                            s.TimePassed += deltaT;
                            if (s.TimePassed > HTTP_TIMEOUT) {
                                TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
                                ms->Write((char)PKT_PING);
                                ms->Write(guid);
                                int transId = SendWithHighPriority(s.Address, ms.Release());
                                TransferHash_[transId] = TTransferPurpose(DIR_OUT, guid);
                                s.State = TOutRequestState::S_WAITING_PING_SENDING;
                                s.PingTransferId = transId;
                            }
                            break;
                        case TOutRequestState::S_WAITING_PING_SENT:
                            s.TimePassed += deltaT;
                            if (s.TimePassed > HTTP_TIMEOUT) {
                                FinishRequest(curIt, TUdpHttpResponse::FAILED, nullptr, "request failed: http timeout in state S_WAITING_PING_SENT");
                            }
                            break;
                        default:
                            break;
                    }
                }
            }
        }
        void Step() {
            {
                TGuard<TSpinLock> lock(Spn_);
                DoSends();
            }
            Host_->Step();
            {
                TGuard<TSpinLock> lock(Spn_);
                DoSends();
                ProcessIncomingPackets();
                AnalyzeSendResults();
                SendPingsIfNeeded();
            }
        }

        void Wait() {
            Host_->Wait(0.1f);
        }

        void DoSendCancel(const TUdpAddress& addr, const TGUID& req) {
            TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
            ms->Write((char)PKT_CANCEL);
            ms->Write(req);
            SendWithHighPriority(addr, ms);
        }
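
        // Runs on the worker thread under Spn_: drains the cancel, response and request queues
        // filled by the public API and turns them into actual netliba sends.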
        void DoSends() {
            {
                // cancelling requests
                TGUID reqGuid;
                while (CancelReqList_.Dequeue(&reqGuid)) {
                    TOutRequestHash::iterator i = OutRequests_.find(reqGuid);
                    if (i == OutRequests_.end()) {
                        AnticipateCancels_.insert(reqGuid);
                        continue; // cancelling a non-existent request is ok
                    }
                    TOutRequestState& s = i->second;
                    if (s.State == TOutRequestState::S_SENDING) {
                        // we are in trouble: the request has not been sent yet and we already have to cancel it,
                        // so wait for the send to finish
                        s.State = TOutRequestState::S_CANCEL_AFTER_SENDING;
                        s.EventsCollector->AddCancel(i->first);
                    } else {
                        DoSendCancel(s.Address, reqGuid);
                        FinishRequest(i, TUdpHttpResponse::CANCELED, nullptr, "request canceled: notify requested side");
                    }
                }
            }
            {
                // sending replies
                for (TSendResponse* rd = nullptr; SendRespList_.Dequeue(&rd); delete rd) {
                    TInRequestHash::iterator i = InRequests_.find(rd->ReqGuid);
                    if (i == InRequests_.end()) {
                        Y_ASSERT(0);
                        continue;
                    }
                    TInRequestState& s = i->second;
                    if (s.State == TInRequestState::S_CANCELED) {
                        // no need to send a response for a canceled request
                        InRequests_.erase(i);
                        continue;
                    }
                    Y_ASSERT(s.State == TInRequestState::S_WAITING);
                    s.State = TInRequestState::S_RESPONSE_SENDING;

                    TAutoPtr<TRopeDataPacket> ms = new TRopeDataPacket;
                    ui32 crc32 = 0;
                    int dataSize = rd->Data.ysize();
                    if (rd->Data.size() > MIN_SHARED_MEM_PACKET && IsLocal(s.Address)) {
                        TIntrusivePtr<TSharedMemory> shm = new TSharedMemory;
                        if (shm->Create(dataSize)) {
                            ms->Write((char)PKT_LOCAL_RESPONSE);
                            ms->Write(rd->ReqGuid);
                            memcpy(shm->GetPtr(), &rd->Data[0], dataSize);
                            TVector<char> empty;
                            rd->Data.swap(empty);
                            ms->AttachSharedData(shm);
                            crc32 = CalcChecksum(ms->GetChain());
                        }
                    }
                    if (ms->GetSharedData() == nullptr) {
                        ms->Write((char)PKT_RESPONSE);
                        ms->Write(rd->ReqGuid);

                        // to offload crc calculation from the inner thread, the crc of data[] is calculated outside and passed in DataCrc32;
                        // this means that we calculate the crc even when shared memory is used,
                        // which is hard to avoid since in SendResponse() we don't know whether shared memory will be used
                        // (the peer address is not available there)
                        TIncrementalChecksumCalcer csCalcer;
                        AddChain(&csCalcer, ms->GetChain());
                        // here we are replicating the way WriteDestructive serializes data
                        csCalcer.AddBlock(&dataSize, sizeof(dataSize));
                        csCalcer.AddBlockSum(rd->DataCrc32, dataSize);
                        crc32 = csCalcer.CalcChecksum();

                        ms->WriteDestructive(&rd->Data);
                        //ui32 chkCrc = CalcChecksum(ms->GetChain()); // can not use since it is slow for large responses
                        //Y_ASSERT(chkCrc == crc32);
                    }

                    int transId = Host_->Send(s.Address, ms.Release(), crc32, nullptr, rd->Priority);
                    TransferHash_[transId] = TTransferPurpose(DIR_IN, rd->ReqGuid);
                }
            }
            {
                // sending requests
                for (TSendRequest* rd = nullptr; SendReqList_.Dequeue(&rd); delete rd) {
                    Y_ASSERT(OutRequests_.find(rd->ReqGuid) == OutRequests_.end());
                    {
                        TOutRequestState& s = OutRequests_[rd->ReqGuid];
                        s.State = TOutRequestState::S_SENDING;
                        s.Address = rd->Addr;
                        s.EventsCollector = rd->EventsCollector;
                    }

                    if (AnticipateCancels_.find(rd->ReqGuid) != AnticipateCancels_.end()) {
                        FinishRequest(OutRequests_.find(rd->ReqGuid), TUdpHttpResponse::CANCELED, nullptr, "Canceled (before transmit)");
                    } else {
                        TGUID pktGuid = rd->ReqGuid; // the request packet id should match the request id
                        int transId = Host_->Send(rd->Addr, rd->Data.Release(), rd->Crc32, &pktGuid, PP_NORMAL);
                        TransferHash_[transId] = TTransferPurpose(DIR_OUT, rd->ReqGuid);
                    }
                }
            }
            if (!AnticipateCancels_.empty()) {
                AnticipateCancels_.clear();
            }
        }
        void FinishOutstandingTransactions() {
            // wait for all pending requests; all new requests are canceled
            while ((!OutRequests_.empty() || !InRequests_.empty() || !SendRespList_.IsEmpty() || !SendReqList_.IsEmpty()) && !PanicAttack) {
                Step();
                sleep(0);
            }
        }
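
        // Worker thread body: binds to the requested CPU, opens the UDP socket, then loops
        // Step()/Wait() until stopped, a heartbeat timeout fires, or PanicAttack is raised.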
        static void* ExecServerThread(void* param) {
            TUdpHttp* pThis = (TUdpHttp*)param;
            if (pThis->GetPhysicalCpu() >= 0) {
                BindToSocket(pThis->GetPhysicalCpu());
            }
            SetHighestThreadPriority();

            TIntrusivePtr<NNetlibaSocket::ISocket> socket = NNetlibaSocket::CreateSocket();
            socket->Open(pThis->Port_);
            if (socket->IsValid()) {
                pThis->Port_ = socket->GetPort();
                pThis->Host_ = CreateUdpHost(socket);
            } else {
                pThis->Host_ = nullptr;
            }
            pThis->HasStarted_.Signal();
            if (!pThis->Host_)
                return nullptr;

            NHPTimer::GetTime(&pThis->PingsSendT_);
            while (AtomicGet(pThis->KeepRunning_) && !PanicAttack) {
                if (HeartbeatTimeout.load(std::memory_order_acquire) > 0) {
                    NHPTimer::STime chk = LastHeartbeat.load(std::memory_order_acquire);
                    if (NHPTimer::GetTimePassed(&chk) > HeartbeatTimeout.load(std::memory_order_acquire)) {
                        StopAllNetLibaThreads();
#ifndef _win_
                        killpg(0, SIGKILL);
#endif
                        abort();
                        break;
                    }
                }
                pThis->Step();
                pThis->Wait();
            }
            if (!pThis->AbortTransactions_ && !PanicAttack) {
                pThis->FinishOutstandingTransactions();
            }
            pThis->Host_ = nullptr;
            return nullptr;
        }

        int GetPhysicalCpu() const noexcept {
            return PhysicalCpu_;
        }
    private:
        TThread MyThread_;
        TAtomic KeepRunning_ = 1;
        bool AbortTransactions_;
        TSpinLock Spn_;
        TSystemEvent HasStarted_;

        NHPTimer::STime PingsSendT_;

        TIntrusivePtr<IUdpHost> Host_;
        int Port_;
        TOutRequestHash OutRequests_;
        TInRequestHash InRequests_;

        typedef THashMap<int, TTransferPurpose> TTransferHash;
        TTransferHash TransferHash_;

        // kept here to avoid constructing it on every DoSends() call
        typedef THashSet<TGUID, TGUIDHash> TAnticipateCancels;
        TAnticipateCancels AnticipateCancels_;

        TLockFreeQueue<TSendRequest*> SendReqList_;
        TLockFreeQueue<TSendResponse*> SendRespList_;
        TLockFreeQueue<TGUID> CancelReqList_;

        TIntrusivePtr<IEventsCollector> EventCollector_;
        bool ReportRequestCancel_;
        bool ReporRequestAck_;
        int PhysicalCpu_;
    };
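
    // Factory: starts the worker thread and throws if the socket cannot be bound.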
    IRequesterRef CreateHttpUdpRequester(int port, const IEventsCollectorRef& ec, int physicalCpu) {
        TUdpHttp* udpHttp = new TUdpHttp(ec);
        IRequesterRef res(udpHttp);
        if (!udpHttp->Start(port, physicalCpu)) {
            if (port) {
                ythrow yexception() << "netliba can't bind port=" << port;
            } else {
                ythrow yexception() << "netliba can't bind random port";
            }
        }
        return res;
    }
}