tcp.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676
  1. #include "tcp.h"
  2. #include "details.h"
  3. #include "factory.h"
  4. #include "location.h"
  5. #include "pipequeue.h"
  6. #include "utils.h"
  7. #include <library/cpp/coroutine/listener/listen.h>
  8. #include <library/cpp/coroutine/engine/events.h>
  9. #include <library/cpp/coroutine/engine/sockpool.h>
  10. #include <library/cpp/dns/cache.h>
  11. #include <util/ysaveload.h>
  12. #include <util/generic/buffer.h>
  13. #include <util/generic/guid.h>
  14. #include <util/generic/hash.h>
  15. #include <util/generic/intrlist.h>
  16. #include <util/generic/ptr.h>
  17. #include <util/generic/vector.h>
  18. #include <util/system/yassert.h>
  19. #include <util/system/unaligned_mem.h>
  20. #include <util/stream/buffered.h>
  21. #include <util/stream/mem.h>
  22. using namespace NDns;
  23. using namespace NNeh;
  24. using TNehMessage = TMessage;
  25. template <>
  26. struct TSerializer<TGUID> {
  27. static inline void Save(IOutputStream* out, const TGUID& g) {
  28. out->Write(&g.dw, sizeof(g.dw));
  29. }
  30. static inline void Load(IInputStream* in, TGUID& g) {
  31. in->Load(&g.dw, sizeof(g.dw));
  32. }
  33. };
  34. namespace {
  35. namespace NNehTCP {
  36. typedef IOutputStream::TPart TPart;
  37. static inline ui64 LocalGuid(const TGUID& g) {
  38. return ReadUnaligned<ui64>(g.dw);
  39. }
  40. static inline TString LoadStroka(IInputStream& input, size_t len) {
  41. TString tmp;
  42. tmp.ReserveAndResize(len);
  43. input.Load(tmp.begin(), tmp.size());
  44. return tmp;
  45. }
  46. struct TParts: public TVector<TPart> {
  47. template <class T>
  48. inline void Push(const T& t) {
  49. Push(TPart(t));
  50. }
  51. inline void Push(const TPart& part) {
  52. if (part.len) {
  53. push_back(part);
  54. }
  55. }
  56. inline void Clear() noexcept {
  57. clear();
  58. }
  59. };
  60. template <class T>
  61. struct TMessageQueue {
  62. inline TMessageQueue(TContExecutor* e)
  63. : Ev(e)
  64. {
  65. }
  66. template <class TPtr>
  67. inline void Enqueue(TPtr p) noexcept {
  68. L.PushBack(p.Release());
  69. Ev.Signal();
  70. }
  71. template <class TPtr>
  72. inline bool Dequeue(TPtr& p) noexcept {
  73. do {
  74. if (TryDequeue(p)) {
  75. return true;
  76. }
  77. } while (Ev.WaitI() != ECANCELED);
  78. return false;
  79. }
  80. template <class TPtr>
  81. inline bool TryDequeue(TPtr& p) noexcept {
  82. if (L.Empty()) {
  83. return false;
  84. }
  85. p.Reset(L.PopFront());
  86. return true;
  87. }
  88. inline TContExecutor* Executor() const noexcept {
  89. return Ev.Executor();
  90. }
  91. TIntrusiveListWithAutoDelete<T, TDelete> L;
  92. TContSimpleEvent Ev;
  93. };
  94. template <class Q, class C>
  95. inline bool Dequeue(Q& q, C& c, size_t len) {
  96. typename C::value_type t;
  97. size_t slen = 0;
  98. if (q.Dequeue(t)) {
  99. slen += t->Length();
  100. c.push_back(t);
  101. while (slen < len && q.TryDequeue(t)) {
  102. slen += t->Length();
  103. c.push_back(t);
  104. }
  105. return true;
  106. }
  107. return false;
  108. }
  109. struct TServer: public IRequester, public TContListener::ICallBack {
  110. struct TLink;
  111. typedef TIntrusivePtr<TLink> TLinkRef;
  112. struct TResponce: public TIntrusiveListItem<TResponce> {
  113. inline TResponce(const TLinkRef& link, TData& data, TStringBuf reqid)
  114. : Link(link)
  115. {
  116. Data.swap(data);
  117. TMemoryOutput out(Buf, sizeof(Buf));
  118. ::Save(&out, (ui32)(reqid.size() + Data.size()));
  119. out.Write(reqid.data(), reqid.size());
  120. Y_ASSERT(reqid.size() == 16);
  121. Len = out.Buf() - Buf;
  122. }
  123. inline void Serialize(TParts& parts) {
  124. parts.Push(TStringBuf(Buf, Len));
  125. parts.Push(TStringBuf(Data.data(), Data.size()));
  126. }
  127. inline size_t Length() const noexcept {
  128. return Len + Data.size();
  129. }
  130. TLinkRef Link;
  131. TData Data;
  132. char Buf[32];
  133. size_t Len;
  134. };
  135. typedef TAutoPtr<TResponce> TResponcePtr;
  136. struct TRequest: public IRequest {
  137. inline TRequest(const TLinkRef& link, IInputStream& in, size_t len)
  138. : Link(link)
  139. {
  140. Buf.Proceed(len);
  141. in.Load(Buf.Data(), Buf.Size());
  142. if ((ServiceBegin() - Buf.Data()) + ServiceLen() > Buf.Size()) {
  143. throw yexception() << "invalid request (service len)";
  144. }
  145. }
  146. TStringBuf Scheme() const override {
  147. return TStringBuf("tcp");
  148. }
  149. TString RemoteHost() const override {
  150. return Link->RemoteHost;
  151. }
  152. TStringBuf Service() const override {
  153. return TStringBuf(ServiceBegin(), ServiceLen());
  154. }
  155. TStringBuf Data() const override {
  156. return TStringBuf(Service().end(), Buf.End());
  157. }
  158. TStringBuf RequestId() const override {
  159. return TStringBuf(Buf.Data(), 16);
  160. }
  161. bool Canceled() const override {
  162. //TODO
  163. return false;
  164. }
  165. void SendReply(TData& data) override {
  166. Link->P->Schedule(new TResponce(Link, data, RequestId()));
  167. }
  168. void SendError(TResponseError, const TString&) override {
  169. // TODO
  170. }
  171. size_t ServiceLen() const noexcept {
  172. const char* ptr = RequestId().end();
  173. return *(ui32*)ptr;
  174. }
  175. const char* ServiceBegin() const noexcept {
  176. return RequestId().end() + sizeof(ui32);
  177. }
  178. TBuffer Buf;
  179. TLinkRef Link;
  180. };
  181. struct TLink: public TAtomicRefCount<TLink> {
  182. inline TLink(TServer* parent, const TAcceptFull& a)
  183. : P(parent)
  184. , MQ(Executor())
  185. {
  186. S.Swap(*a.S);
  187. SetNoDelay(S, true);
  188. RemoteHost = PrintHostByRfc(*GetPeerAddr(S));
  189. TLinkRef self(this);
  190. Executor()->Create<TLink, &TLink::RecvCycle>(this, "recv");
  191. Executor()->Create<TLink, &TLink::SendCycle>(this, "send");
  192. Executor()->Running()->Yield();
  193. }
  194. inline void Enqueue(TResponcePtr res) {
  195. MQ.Enqueue(res);
  196. }
  197. inline TContExecutor* Executor() const noexcept {
  198. return P->E.Get();
  199. }
  200. void SendCycle(TCont* c) {
  201. TLinkRef self(this);
  202. try {
  203. DoSendCycle(c);
  204. } catch (...) {
  205. Cdbg << "neh/tcp/1: " << CurrentExceptionMessage() << Endl;
  206. }
  207. }
  208. inline void DoSendCycle(TCont* c) {
  209. TVector<TResponcePtr> responses;
  210. TParts parts;
  211. while (Dequeue(MQ, responses, 7000)) {
  212. for (size_t i = 0; i < responses.size(); ++i) {
  213. responses[i]->Serialize(parts);
  214. }
  215. {
  216. TContIOVector iovec(parts.data(), parts.size());
  217. NCoro::WriteVectorI(c, S, &iovec);
  218. }
  219. parts.Clear();
  220. responses.clear();
  221. }
  222. }
  223. void RecvCycle(TCont* c) {
  224. TLinkRef self(this);
  225. try {
  226. DoRecvCycle(c);
  227. } catch (...) {
  228. if (!c->Cancelled()) {
  229. Cdbg << "neh/tcp/2: " << CurrentExceptionMessage() << Endl;
  230. }
  231. }
  232. }
  233. inline void DoRecvCycle(TCont* c) {
  234. TContIO io(S, c);
  235. TBufferedInput input(&io, 8192 * 4);
  236. while (true) {
  237. ui32 len;
  238. try {
  239. ::Load(&input, len);
  240. } catch (TLoadEOF&) {
  241. return;
  242. }
  243. P->CB->OnRequest(new TRequest(this, input, len));
  244. }
  245. }
  246. TServer* P;
  247. TMessageQueue<TResponce> MQ;
  248. TSocketHolder S;
  249. TString RemoteHost;
  250. };
  251. inline TServer(IOnRequest* cb, ui16 port)
  252. : CB(cb)
  253. , Addr(port)
  254. {
  255. Thrs.push_back(Spawn<TServer, &TServer::Run>(this));
  256. }
  257. ~TServer() override {
  258. Schedule(nullptr);
  259. for (size_t i = 0; i < Thrs.size(); ++i) {
  260. Thrs[i]->Join();
  261. }
  262. }
  263. void Run() {
  264. E = MakeHolder<TContExecutor>(RealStackSize(32000));
  265. THolder<TContListener> L(new TContListener(this, E.Get(), TContListener::TOptions().SetDeferAccept(true)));
  266. //SetHighestThreadPriority();
  267. L->Bind(Addr);
  268. E->Create<TServer, &TServer::RunDispatcher>(this, "dispatcher");
  269. L->Listen();
  270. E->Execute();
  271. }
  272. void OnAcceptFull(const TAcceptFull& a) override {
  273. //I love such code
  274. new TLink(this, a);
  275. }
  276. void OnError() override {
  277. Cerr << CurrentExceptionMessage() << Endl;
  278. }
  279. inline void Schedule(TResponcePtr res) {
  280. PQ.EnqueueSafe(res);
  281. }
  282. void RunDispatcher(TCont* c) {
  283. while (true) {
  284. TResponcePtr res;
  285. PQ.DequeueSafe(c, res);
  286. if (!res) {
  287. break;
  288. }
  289. TLinkRef link = res->Link;
  290. link->Enqueue(res);
  291. }
  292. c->Executor()->Abort();
  293. }
  294. THolder<TContExecutor> E;
  295. IOnRequest* CB;
  296. TNetworkAddress Addr;
  297. TOneConsumerPipeQueue<TResponce> PQ;
  298. TVector<TThreadRef> Thrs;
  299. };
  300. struct TClient {
  301. struct TRequest: public TIntrusiveListItem<TRequest> {
  302. inline TRequest(const TSimpleHandleRef& hndl, const TNehMessage& msg)
  303. : Hndl(hndl)
  304. , Msg(msg)
  305. , Loc(Msg.Addr)
  306. , RI(CachedThrResolve(TResolveInfo(Loc.Host, Loc.GetPort())))
  307. {
  308. CreateGuid(&Guid);
  309. }
  310. inline void Serialize(TParts& parts) {
  311. TMemoryOutput out(Buf, sizeof(Buf));
  312. ::Save(&out, (ui32)MsgLen());
  313. ::Save(&out, Guid);
  314. ::Save(&out, (ui32) Loc.Service.size());
  315. if (Loc.Service.size() > out.Avail()) {
  316. parts.Push(TStringBuf(Buf, out.Buf()));
  317. parts.Push(Loc.Service);
  318. } else {
  319. out.Write(Loc.Service.data(), Loc.Service.size());
  320. parts.Push(TStringBuf(Buf, out.Buf()));
  321. }
  322. parts.Push(Msg.Data);
  323. }
  324. inline size_t Length() const noexcept {
  325. return sizeof(ui32) + MsgLen();
  326. }
  327. inline size_t MsgLen() const noexcept {
  328. return sizeof(Guid.dw) + sizeof(ui32) + Loc.Service.size() + Msg.Data.size();
  329. }
  330. void OnError(const TString& errText) {
  331. Hndl->NotifyError(errText);
  332. }
  333. TSimpleHandleRef Hndl;
  334. TNehMessage Msg;
  335. TGUID Guid;
  336. const TParsedLocation Loc;
  337. const TResolvedHost* RI;
  338. char Buf[128];
  339. };
  340. typedef TAutoPtr<TRequest> TRequestPtr;
  341. struct TChannel {
  342. struct TLink: public TIntrusiveListItem<TLink>, public TSimpleRefCount<TLink> {
  343. inline TLink(TChannel* parent)
  344. : P(parent)
  345. {
  346. Executor()->Create<TLink, &TLink::SendCycle>(this, "send");
  347. }
  348. void SendCycle(TCont* c) {
  349. TIntrusivePtr<TLink> self(this);
  350. try {
  351. DoSendCycle(c);
  352. OnError("shutdown");
  353. } catch (...) {
  354. OnError(CurrentExceptionMessage());
  355. }
  356. Unlink();
  357. }
  358. inline void DoSendCycle(TCont* c) {
  359. if (int ret = NCoro::ConnectI(c, S, P->RI->Addr)) {
  360. ythrow TSystemError(ret) << "can't connect";
  361. }
  362. SetNoDelay(S, true);
  363. Executor()->Create<TLink, &TLink::RecvCycle>(this, "recv");
  364. TVector<TRequestPtr> reqs;
  365. TParts parts;
  366. while (Dequeue(P->Q, reqs, 7000)) {
  367. for (size_t i = 0; i < reqs.size(); ++i) {
  368. TRequestPtr& req = reqs[i];
  369. req->Serialize(parts);
  370. InFly[LocalGuid(req->Guid)] = req;
  371. }
  372. {
  373. TContIOVector vec(parts.data(), parts.size());
  374. NCoro::WriteVectorI(c, S, &vec);
  375. }
  376. reqs.clear();
  377. parts.Clear();
  378. }
  379. }
  380. void RecvCycle(TCont* c) {
  381. TIntrusivePtr<TLink> self(this);
  382. try {
  383. DoRecvCycle(c);
  384. OnError("service close connection");
  385. } catch (...) {
  386. OnError(CurrentExceptionMessage());
  387. }
  388. }
  389. inline void DoRecvCycle(TCont* c) {
  390. TContIO io(S, c);
  391. TBufferedInput input(&io, 8192 * 4);
  392. while (true) {
  393. ui32 len;
  394. TGUID g;
  395. try {
  396. ::Load(&input, len);
  397. } catch (TLoadEOF&) {
  398. return;
  399. }
  400. ::Load(&input, g);
  401. const TString data(LoadStroka(input, len - sizeof(g.dw)));
  402. TInFly::iterator it = InFly.find(LocalGuid(g));
  403. if (it == InFly.end()) {
  404. continue;
  405. }
  406. TRequestPtr req = it->second;
  407. InFly.erase(it);
  408. req->Hndl->NotifyResponse(data);
  409. }
  410. }
  411. inline TContExecutor* Executor() const noexcept {
  412. return P->Q.Executor();
  413. }
  414. void OnError(const TString& errText) {
  415. for (auto& it : InFly) {
  416. it.second->OnError(errText);
  417. }
  418. InFly.clear();
  419. TRequestPtr req;
  420. while (P->Q.TryDequeue(req)) {
  421. req->OnError(errText);
  422. }
  423. }
  424. TChannel* P;
  425. TSocketHolder S;
  426. typedef THashMap<ui64, TRequestPtr> TInFly;
  427. TInFly InFly;
  428. };
  429. inline TChannel(TContExecutor* e, const TResolvedHost* ri)
  430. : Q(e)
  431. , RI(ri)
  432. {
  433. }
  434. inline void Enqueue(TRequestPtr req) {
  435. Q.Enqueue(req);
  436. if (Links.Empty()) {
  437. for (size_t i = 0; i < 1; ++i) {
  438. SpawnLink();
  439. }
  440. }
  441. }
  442. inline void SpawnLink() {
  443. Links.PushBack(new TLink(this));
  444. }
  445. TMessageQueue<TRequest> Q;
  446. TIntrusiveList<TLink> Links;
  447. const TResolvedHost* RI;
  448. };
  449. typedef TAutoPtr<TChannel> TChannelPtr;
  450. inline TClient() {
  451. Thr = Spawn<TClient, &TClient::RunExecutor>(this);
  452. }
  453. inline ~TClient() {
  454. Reqs.Enqueue(nullptr);
  455. Thr->Join();
  456. }
  457. inline THandleRef Schedule(const TNehMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) {
  458. TSimpleHandleRef ret(new TSimpleHandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss)));
  459. Reqs.Enqueue(new TRequest(ret, msg));
  460. return ret.Get();
  461. }
  462. void RunExecutor() {
  463. //SetHighestThreadPriority();
  464. TContExecutor e(RealStackSize(32000));
  465. e.Create<TClient, &TClient::RunDispatcher>(this, "dispatcher");
  466. e.Execute();
  467. }
  468. void RunDispatcher(TCont* c) {
  469. TRequestPtr req;
  470. while (true) {
  471. Reqs.DequeueSafe(c, req);
  472. if (!req) {
  473. break;
  474. }
  475. TChannelPtr& ch = Channels.Get(req->RI->Id);
  476. if (!ch) {
  477. ch.Reset(new TChannel(c->Executor(), req->RI));
  478. }
  479. ch->Enqueue(req);
  480. }
  481. c->Executor()->Abort();
  482. }
  483. TThreadRef Thr;
  484. TOneConsumerPipeQueue<TRequest> Reqs;
  485. TSocketMap<TChannelPtr> Channels;
  486. };
  487. struct TMultiClient {
  488. inline TMultiClient()
  489. : Next(0)
  490. {
  491. for (size_t i = 0; i < 2; ++i) {
  492. Clients.push_back(new TClient());
  493. }
  494. }
  495. inline THandleRef Schedule(const TNehMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) {
  496. return Clients[AtomicIncrement(Next) % Clients.size()]->Schedule(msg, fallback, ss);
  497. }
  498. TVector<TAutoPtr<TClient>> Clients;
  499. TAtomic Next;
  500. };
  501. #if 0
  502. static inline TMultiClient* Client() {
  503. return Singleton<NNehTCP::TMultiClient>();
  504. }
  505. #else
  506. static inline TClient* Client() {
  507. return Singleton<NNehTCP::TClient>();
  508. }
  509. #endif
  510. class TTcpProtocol: public IProtocol {
  511. public:
  512. inline TTcpProtocol() {
  513. InitNetworkSubSystem();
  514. }
  515. IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
  516. return new TServer(cb, loc.GetPort());
  517. }
  518. THandleRef ScheduleRequest(const TNehMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
  519. return Client()->Schedule(msg, fallback, ss);
  520. }
  521. TStringBuf Scheme() const noexcept override {
  522. return TStringBuf("tcp");
  523. }
  524. };
  525. }
  526. }
  527. IProtocol* NNeh::TcpProtocol() {
  528. return Singleton<NNehTCP::TTcpProtocol>();
  529. }