netliba.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. #include "details.h"
  2. #include "factory.h"
  3. #include "http_common.h"
  4. #include "location.h"
  5. #include "multi.h"
  6. #include "netliba.h"
  7. #include "netliba_udp_http.h"
  8. #include "lfqueue.h"
  9. #include "utils.h"
  10. #include <library/cpp/dns/cache.h>
  11. #include <util/generic/hash.h>
  12. #include <util/generic/singleton.h>
  13. #include <util/generic/vector.h>
  14. #include <util/generic/yexception.h>
  15. #include <util/string/cast.h>
  16. #include <util/system/yassert.h>
  17. #include <atomic>
  18. using namespace NDns;
  19. using namespace NNeh;
  20. namespace NNeh {
  21. size_t TNetLibaOptions::ClientThreads = 4;
  22. TDuration TNetLibaOptions::AckTailEffect = TDuration::Seconds(30);
  23. bool TNetLibaOptions::Set(TStringBuf name, TStringBuf value) {
  24. #define NETLIBA_TRY_SET(optType, optName) \
  25. if (name == TStringBuf(#optName)) { \
  26. optName = FromString<optType>(value); \
  27. }
  28. NETLIBA_TRY_SET(size_t, ClientThreads)
  29. else NETLIBA_TRY_SET(TDuration, AckTailEffect) else {
  30. return false;
  31. }
  32. return true;
  33. }
  34. }
  35. namespace {
  36. namespace NNetLiba {
  37. using namespace NNetliba;
  38. using namespace NNehNetliba;
  39. typedef NNehNetliba::IRequester INetLibaRequester;
  40. typedef TAutoPtr<TUdpHttpRequest> TUdpHttpRequestPtr;
  41. typedef TAutoPtr<TUdpHttpResponse> TUdpHttpResponsePtr;
  42. static inline const addrinfo* FindIPBase(const TNetworkAddress* addr, int family) {
  43. for (TNetworkAddress::TIterator it = addr->Begin(); it != addr->End(); ++it) {
  44. if (it->ai_family == family) {
  45. return &*it;
  46. }
  47. }
  48. return nullptr;
  49. }
  50. static inline const sockaddr_in6& FindIP(const TNetworkAddress* addr) {
  51. //prefer ipv6
  52. const addrinfo* ret = FindIPBase(addr, AF_INET6);
  53. if (!ret) {
  54. ret = FindIPBase(addr, AF_INET);
  55. }
  56. if (!ret) {
  57. ythrow yexception() << "ip not supported by " << *addr;
  58. }
  59. return *(const sockaddr_in6*)(ret->ai_addr);
  60. }
  61. class TLastAckTimes {
  62. struct TTimeVal {
  63. TTimeVal()
  64. : Val(0)
  65. {
  66. }
  67. std::atomic<TInstant::TValue> Val;
  68. };
  69. public:
  70. TInstant::TValue Get(size_t idAddr) {
  71. return Tm_.Get(idAddr).Val.load(std::memory_order_acquire);
  72. }
  73. void Set(size_t idAddr) {
  74. Tm_.Get(idAddr).Val.store(TInstant::Now().GetValue(), std::memory_order_release);
  75. }
  76. static TLastAckTimes& Common() {
  77. return *Singleton<TLastAckTimes>();
  78. }
  79. private:
  80. NNeh::NHttp::TLockFreeSequence<TTimeVal> Tm_;
  81. };
  82. class TRequest: public TSimpleHandle {
  83. public:
  84. inline TRequest(TIntrusivePtr<INetLibaRequester>& r, size_t idAddr, const TMessage& msg, IOnRecv* cb, TStatCollector* s)
  85. : TSimpleHandle(cb, msg, s)
  86. , R_(r)
  87. , IdAddr_(idAddr)
  88. , Notified_(false)
  89. {
  90. CreateGuid(&Guid_);
  91. }
  92. void Cancel() noexcept override {
  93. TSimpleHandle::Cancel();
  94. R_->CancelRequest(Guid_);
  95. }
  96. inline const TString& Addr() const noexcept {
  97. return Message().Addr;
  98. }
  99. inline const TGUID& Guid() const noexcept {
  100. return Guid_;
  101. }
  102. //return false if already notifie
  103. inline bool SetNotified() noexcept {
  104. bool ret = Notified_;
  105. Notified_ = true;
  106. return !ret;
  107. }
  108. void OnSend() {
  109. if (TNetLibaOptions::AckTailEffect.GetValue() && TLastAckTimes::Common().Get(IdAddr_) + TNetLibaOptions::AckTailEffect.GetValue() > TInstant::Now().GetValue()) {
  110. //fake(predicted) completing detection
  111. SetSendComplete();
  112. }
  113. }
  114. void OnRequestAck() {
  115. if (TNetLibaOptions::AckTailEffect.GetValue()) {
  116. TLastAckTimes::Common().Set(IdAddr_);
  117. }
  118. SetSendComplete();
  119. }
  120. private:
  121. TIntrusivePtr<INetLibaRequester> R_;
  122. size_t IdAddr_;
  123. TGUID Guid_;
  124. bool Notified_;
  125. };
  126. typedef TIntrusivePtr<TRequest> TRequestRef;
  127. class TNetLibaBus {
  128. class TEventsHandler: public IEventsCollector {
  129. typedef THashMap<TGUID, TRequestRef, TGUIDHash> TInFly;
  130. public:
  131. inline void OnSend(TRequestRef& req) {
  132. Q_.Enqueue(req);
  133. req->OnSend();
  134. }
  135. private:
  136. void UpdateInFly() {
  137. TRequestRef req;
  138. while (Q_.Dequeue(&req)) {
  139. if (!req) {
  140. return;
  141. }
  142. InFly_[req->Guid()] = req;
  143. }
  144. }
  145. void AddRequest(TUdpHttpRequest* req) override {
  146. //ignore received requests in client
  147. delete req;
  148. }
  149. void AddResponse(TUdpHttpResponse* resp) override {
  150. TUdpHttpResponsePtr ptr(resp);
  151. UpdateInFly();
  152. TInFly::iterator it = InFly_.find(resp->ReqId);
  153. Y_ABORT_UNLESS(it != InFly_.end(), "incorrect incoming message");
  154. TRequestRef& req = it->second;
  155. if (req->SetNotified()) {
  156. if (resp->Ok == TUdpHttpResponse::OK) {
  157. req->NotifyResponse(TString(resp->Data.data(), resp->Data.size()));
  158. } else {
  159. if (resp->Ok == TUdpHttpResponse::CANCELED) {
  160. req->NotifyError(new TError(resp->Error, TError::Cancelled));
  161. } else {
  162. req->NotifyError(new TError(resp->Error));
  163. }
  164. }
  165. }
  166. InFly_.erase(it);
  167. }
  168. void AddCancel(const TGUID& guid) override {
  169. UpdateInFly();
  170. TInFly::iterator it = InFly_.find(guid);
  171. if (it != InFly_.end() && it->second->SetNotified()) {
  172. it->second->NotifyError("Canceled (before ack)");
  173. }
  174. }
  175. void AddRequestAck(const TGUID& guid) override {
  176. UpdateInFly();
  177. TInFly::iterator it = InFly_.find(guid);
  178. Y_ABORT_UNLESS(it != InFly_.end(), "incorrect complete notification");
  179. it->second->OnRequestAck();
  180. }
  181. private:
  182. TLockFreeQueue<TRequestRef> Q_;
  183. TInFly InFly_;
  184. };
  185. struct TClientThread {
  186. TClientThread(int physicalCpu)
  187. : EH_(new TEventsHandler())
  188. , R_(CreateHttpUdpRequester(0, IEventsCollectorRef(EH_.Get()), physicalCpu))
  189. {
  190. R_->EnableReportRequestAck();
  191. }
  192. ~TClientThread() {
  193. R_->StopNoWait();
  194. }
  195. TIntrusivePtr<TEventsHandler> EH_;
  196. TIntrusivePtr<INetLibaRequester> R_;
  197. };
  198. public:
  199. TNetLibaBus() {
  200. for (size_t i = 0; i < TNetLibaOptions::ClientThreads; ++i) {
  201. Clnt_.push_back(new TClientThread(i));
  202. }
  203. }
  204. inline THandleRef Schedule(const TMessage& msg, IOnRecv* cb, TServiceStatRef& ss) {
  205. TParsedLocation loc(msg.Addr);
  206. TUdpAddress addr;
  207. const TResolvedHost* resHost = CachedResolve(TResolveInfo(loc.Host, loc.GetPort()));
  208. GetUdpAddress(&addr, FindIP(&resHost->Addr));
  209. TClientThread& clnt = *Clnt_[resHost->Id % Clnt_.size()];
  210. TIntrusivePtr<INetLibaRequester> rr = clnt.R_;
  211. TRequestRef req(new TRequest(rr, resHost->Id, msg, cb, !ss ? nullptr : new TStatCollector(ss)));
  212. clnt.EH_->OnSend(req);
  213. rr->SendRequest(addr, ToString(loc.Service), msg.Data, req->Guid());
  214. return THandleRef(req.Get());
  215. }
  216. private:
  217. TVector<TAutoPtr<TClientThread>> Clnt_;
  218. };
  219. //server
  220. class TRequester: public TThrRefBase {
  221. struct TSrvRequestState: public TAtomicRefCount<TSrvRequestState> {
  222. TSrvRequestState()
  223. : Canceled(false)
  224. {
  225. }
  226. TAtomicBool Canceled;
  227. };
  228. class TRequest: public IRequest {
  229. public:
  230. inline TRequest(TAutoPtr<TUdpHttpRequest> req, TIntrusivePtr<TSrvRequestState> state, TRequester* parent)
  231. : R_(req)
  232. , S_(state)
  233. , P_(parent)
  234. {
  235. }
  236. ~TRequest() override {
  237. if (!!P_) {
  238. P_->RequestProcessed(this);
  239. }
  240. }
  241. TStringBuf Scheme() const override {
  242. return TStringBuf("netliba");
  243. }
  244. TString RemoteHost() const override {
  245. if (!H_) {
  246. TUdpAddress tmp(R_->PeerAddress);
  247. tmp.Scope = 0; //discard scope from serialized addr
  248. TString addr = GetAddressAsString(tmp);
  249. TStringBuf host, port;
  250. TStringBuf(addr).RSplit(':', host, port);
  251. H_ = host;
  252. }
  253. return H_;
  254. }
  255. TStringBuf Service() const override {
  256. return TStringBuf(R_->Url.c_str(), R_->Url.length());
  257. }
  258. TStringBuf Data() const override {
  259. return TStringBuf((const char*)R_->Data.data(), R_->Data.size());
  260. }
  261. TStringBuf RequestId() const override {
  262. const TGUID& g = R_->ReqId;
  263. return TStringBuf((const char*)g.dw, sizeof(g.dw));
  264. }
  265. bool Canceled() const override {
  266. return S_->Canceled;
  267. }
  268. void SendReply(TData& data) override {
  269. TIntrusivePtr<TRequester> p;
  270. p.Swap(P_);
  271. if (!!p) {
  272. if (!Canceled()) {
  273. p->R_->SendResponse(R_->ReqId, &data);
  274. }
  275. p->RequestProcessed(this);
  276. }
  277. }
  278. void SendError(TResponseError, const TString&) override {
  279. // TODO
  280. }
  281. inline const TGUID& RequestGuid() const noexcept {
  282. return R_->ReqId;
  283. }
  284. private:
  285. TAutoPtr<TUdpHttpRequest> R_;
  286. mutable TString H_;
  287. TIntrusivePtr<TSrvRequestState> S_;
  288. TIntrusivePtr<TRequester> P_;
  289. };
  290. class TEventsHandler: public IEventsCollector {
  291. public:
  292. TEventsHandler(TRequester* parent)
  293. {
  294. P_.store(parent, std::memory_order_release);
  295. }
  296. void RequestProcessed(const TRequest* r) {
  297. FinishedReqs_.Enqueue(r->RequestGuid());
  298. }
  299. //thread safe method for disable proxy callbacks to parent (OnRequest(...))
  300. void SyncStop() {
  301. P_.store(nullptr, std::memory_order_release);
  302. while (!RequesterPtrPotector_.TryAcquire()) {
  303. Sleep(TDuration::MicroSeconds(100));
  304. }
  305. RequesterPtrPotector_.Release();
  306. }
  307. private:
  308. typedef THashMap<TGUID, TIntrusivePtr<TSrvRequestState>, TGUIDHash> TStatesInProcessRequests;
  309. void AddRequest(TUdpHttpRequest* req) override {
  310. TUdpHttpRequestPtr ptr(req);
  311. TSrvRequestState* state = new TSrvRequestState();
  312. InProcess_[req->ReqId] = state;
  313. try {
  314. TGuard<TSpinLock> m(RequesterPtrPotector_);
  315. if (TRequester* p = P_.load(std::memory_order_acquire)) {
  316. p->OnRequest(ptr, state); //move req. owning to parent
  317. }
  318. } catch (...) {
  319. Cdbg << "ignore exc.: " << CurrentExceptionMessage() << Endl;
  320. }
  321. }
  322. void AddResponse(TUdpHttpResponse*) override {
  323. Y_ABORT("unexpected response in neh netliba server");
  324. }
  325. void AddCancel(const TGUID& guid) override {
  326. UpdateInProcess();
  327. TStatesInProcessRequests::iterator ustate = InProcess_.find(guid);
  328. if (ustate != InProcess_.end())
  329. ustate->second->Canceled = true;
  330. }
  331. void AddRequestAck(const TGUID&) override {
  332. Y_ABORT("unexpected acc in neh netliba server");
  333. }
  334. void UpdateInProcess() {
  335. TGUID guid;
  336. while (FinishedReqs_.Dequeue(&guid)) {
  337. InProcess_.erase(guid);
  338. }
  339. }
  340. private:
  341. TLockFreeStack<TGUID> FinishedReqs_; //processed requests (responded or destroyed)
  342. TStatesInProcessRequests InProcess_;
  343. TSpinLock RequesterPtrPotector_;
  344. std::atomic<TRequester*> P_;
  345. };
  346. public:
  347. inline TRequester(IOnRequest* cb, ui16 port)
  348. : CB_(cb)
  349. , EH_(new TEventsHandler(this))
  350. , R_(CreateHttpUdpRequester(port, EH_.Get()))
  351. {
  352. R_->EnableReportRequestCancel();
  353. }
  354. ~TRequester() override {
  355. Shutdown();
  356. }
  357. void Shutdown() noexcept {
  358. if (!Shutdown_) {
  359. Shutdown_ = true;
  360. R_->StopNoWait();
  361. EH_->SyncStop();
  362. }
  363. }
  364. void OnRequest(TUdpHttpRequestPtr req, TSrvRequestState* state) {
  365. CB_->OnRequest(new TRequest(req, state, this));
  366. }
  367. void RequestProcessed(const TRequest* r) {
  368. EH_->RequestProcessed(r);
  369. }
  370. private:
  371. IOnRequest* CB_;
  372. TIntrusivePtr<TEventsHandler> EH_;
  373. TIntrusivePtr<INetLibaRequester> R_;
  374. bool Shutdown_ = false;
  375. };
  376. typedef TIntrusivePtr<TRequester> TRequesterRef;
  377. class TRequesterAutoShutdown: public NNeh::IRequester {
  378. public:
  379. TRequesterAutoShutdown(const TRequesterRef& r)
  380. : R_(r)
  381. {
  382. }
  383. ~TRequesterAutoShutdown() override {
  384. R_->Shutdown();
  385. }
  386. private:
  387. TRequesterRef R_;
  388. };
  389. class TProtocol: public IProtocol {
  390. public:
  391. THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
  392. return Singleton<TNetLibaBus>()->Schedule(msg, fallback, ss);
  393. }
  394. NNeh::IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
  395. TRequesterRef r(new TRequester(cb, loc.GetPort()));
  396. return new TRequesterAutoShutdown(r);
  397. }
  398. TStringBuf Scheme() const noexcept override {
  399. return TStringBuf("netliba");
  400. }
  401. };
  402. }
  403. }
  404. IProtocol* NNeh::NetLibaProtocol() {
  405. return Singleton<NNetLiba::TProtocol>();
  406. }