// udp.cpp (20 KB)
  1. #include "udp.h"
  2. #include "details.h"
  3. #include "neh.h"
  4. #include "location.h"
  5. #include "utils.h"
  6. #include "factory.h"
  7. #include <library/cpp/dns/cache.h>
  8. #include <util/network/socket.h>
  9. #include <util/network/address.h>
  10. #include <util/generic/deque.h>
  11. #include <util/generic/hash.h>
  12. #include <util/generic/string.h>
  13. #include <util/generic/buffer.h>
  14. #include <util/generic/singleton.h>
  15. #include <util/digest/murmur.h>
  16. #include <util/random/random.h>
  17. #include <util/ysaveload.h>
  18. #include <util/system/thread.h>
  19. #include <util/system/pipe.h>
  20. #include <util/system/error.h>
  21. #include <util/stream/mem.h>
  22. #include <util/stream/buffer.h>
  23. #include <util/string/cast.h>
  24. using namespace NNeh;
  25. using namespace NDns;
  26. using namespace NAddr;
  27. namespace {
  28. namespace NUdp {
  29. enum EPacketType {
  30. PT_REQUEST = 1,
  31. PT_RESPONSE = 2,
  32. PT_STOP = 3,
  33. PT_TIMEOUT = 4
  34. };
  35. struct TUdpHandle: public TNotifyHandle {
  36. inline TUdpHandle(IOnRecv* r, const TMessage& msg, TStatCollector* sc) noexcept
  37. : TNotifyHandle(r, msg, sc)
  38. {
  39. }
  40. void Cancel() noexcept override {
  41. THandle::Cancel(); //inform stat collector
  42. }
  43. bool MessageSendedCompletely() const noexcept override {
  44. //TODO
  45. return true;
  46. }
  47. };
  48. static inline IRemoteAddrPtr GetSendAddr(SOCKET s) {
  49. IRemoteAddrPtr local = GetSockAddr(s);
  50. const sockaddr* addr = local->Addr();
  51. switch (addr->sa_family) {
  52. case AF_INET: {
  53. const TIpAddress a = *(const sockaddr_in*)addr;
  54. return MakeHolder<TIPv4Addr>(TIpAddress(InetToHost(INADDR_LOOPBACK), a.Port()));
  55. }
  56. case AF_INET6: {
  57. sockaddr_in6 a = *(const sockaddr_in6*)addr;
  58. a.sin6_addr = in6addr_loopback;
  59. return MakeHolder<TIPv6Addr>(a);
  60. }
  61. }
  62. ythrow yexception() << "unsupported";
  63. }
  64. typedef ui32 TCheckSum;
  65. static inline TString GenerateGuid() {
  66. const ui64 res[2] = {
  67. RandomNumber<ui64>(), RandomNumber<ui64>()};
  68. return TString((const char*)res, sizeof(res));
  69. }
  70. static inline TCheckSum Sum(const TStringBuf& s) noexcept {
  71. return HostToInet(MurmurHash<TCheckSum>(s.data(), s.size()));
  72. }
  73. struct TPacket;
  74. template <class T>
  75. static inline void Serialize(TPacket& p, const T& t);
  76. struct TPacket {
  77. inline TPacket(IRemoteAddrPtr addr)
  78. : Addr(std::move(addr))
  79. {
  80. }
  81. template <class T>
  82. inline TPacket(const T& t, IRemoteAddrPtr addr)
  83. : Addr(std::move(addr))
  84. {
  85. NUdp::Serialize(*this, t);
  86. }
  87. inline TPacket(TSocketHolder& s, TBuffer& tmp) {
  88. TAutoPtr<TOpaqueAddr> addr(new TOpaqueAddr());
  89. retry_on_intr : {
  90. const int rv = recvfrom(s, tmp.Data(), tmp.size(), MSG_WAITALL, addr->MutableAddr(), addr->LenPtr());
  91. if (rv < 0) {
  92. int err = LastSystemError();
  93. if (err == EAGAIN || err == EWOULDBLOCK) {
  94. Data.Resize(sizeof(TCheckSum) + 1);
  95. *(Data.data() + sizeof(TCheckSum)) = static_cast<char>(PT_TIMEOUT);
  96. } else if (err == EINTR) {
  97. goto retry_on_intr;
  98. } else {
  99. ythrow TSystemError() << "recv failed";
  100. }
  101. } else {
  102. Data.Append(tmp.Data(), (size_t)rv);
  103. Addr.Reset(addr.Release());
  104. CheckSign();
  105. }
  106. }
  107. }
  108. inline void SendTo(TSocketHolder& s) {
  109. Sign();
  110. if (sendto(s, Data.data(), Data.size(), 0, Addr->Addr(), Addr->Len()) < 0) {
  111. Cdbg << LastSystemErrorText() << Endl;
  112. }
  113. }
  114. IRemoteAddrPtr Addr;
  115. TBuffer Data;
  116. inline void Sign() {
  117. const TCheckSum sum = CalcSign();
  118. memcpy(Data.Data(), &sum, sizeof(sum));
  119. }
  120. inline char Type() const noexcept {
  121. return *(Data.data() + sizeof(TCheckSum));
  122. }
  123. inline void CheckSign() const {
  124. if (Data.size() < 16) {
  125. ythrow yexception() << "small packet";
  126. }
  127. if (StoredSign() != CalcSign()) {
  128. ythrow yexception() << "bad checksum";
  129. }
  130. }
  131. inline TCheckSum StoredSign() const noexcept {
  132. TCheckSum sum;
  133. memcpy(&sum, Data.Data(), sizeof(sum));
  134. return sum;
  135. }
  136. inline TCheckSum CalcSign() const noexcept {
  137. return Sum(Body());
  138. }
  139. inline TStringBuf Body() const noexcept {
  140. return TStringBuf(Data.data() + sizeof(TCheckSum), Data.End());
  141. }
  142. };
  143. typedef TAutoPtr<TPacket> TPacketRef;
  144. class TPacketInput: public TMemoryInput {
  145. public:
  146. inline TPacketInput(const TPacket& p)
  147. : TMemoryInput(p.Body().data(), p.Body().size())
  148. {
  149. }
  150. };
  151. class TPacketOutput: public TBufferOutput {
  152. public:
  153. inline TPacketOutput(TPacket& p)
  154. : TBufferOutput(p.Data)
  155. {
  156. p.Data.Proceed(sizeof(TCheckSum));
  157. }
  158. };
  159. template <class T>
  160. static inline void Serialize(TPacketOutput* out, const T& t) {
  161. Save(out, t.Type());
  162. t.Serialize(out);
  163. }
  164. template <class T>
  165. static inline void Serialize(TPacket& p, const T& t) {
  166. TPacketOutput out(p);
  167. NUdp::Serialize(&out, t);
  168. }
  169. namespace NPrivate {
  170. template <class T>
  171. static inline void Deserialize(TPacketInput* in, T& t) {
  172. char type;
  173. Load(in, type);
  174. if (type != t.Type()) {
  175. ythrow yexception() << "unsupported packet";
  176. }
  177. t.Deserialize(in);
  178. }
  179. template <class T>
  180. static inline void Deserialize(const TPacket& p, T& t) {
  181. TPacketInput in(p);
  182. Deserialize(&in, t);
  183. }
  184. }
  185. struct TRequestPacket {
  186. TString Guid;
  187. TString Service;
  188. TString Data;
  189. inline TRequestPacket(const TPacket& p) {
  190. NPrivate::Deserialize(p, *this);
  191. }
  192. inline TRequestPacket(const TString& srv, const TString& data)
  193. : Guid(GenerateGuid())
  194. , Service(srv)
  195. , Data(data)
  196. {
  197. }
  198. inline char Type() const noexcept {
  199. return static_cast<char>(PT_REQUEST);
  200. }
  201. inline void Serialize(TPacketOutput* out) const {
  202. Save(out, Guid);
  203. Save(out, Service);
  204. Save(out, Data);
  205. }
  206. inline void Deserialize(TPacketInput* in) {
  207. Load(in, Guid);
  208. Load(in, Service);
  209. Load(in, Data);
  210. }
  211. };
  212. template <class TStore>
  213. struct TResponsePacket {
  214. TString Guid;
  215. TStore Data;
  216. inline TResponsePacket(const TString& guid, TStore& data)
  217. : Guid(guid)
  218. {
  219. Data.swap(data);
  220. }
  221. inline TResponsePacket(const TPacket& p) {
  222. NPrivate::Deserialize(p, *this);
  223. }
  224. inline char Type() const noexcept {
  225. return static_cast<char>(PT_RESPONSE);
  226. }
  227. inline void Serialize(TPacketOutput* out) const {
  228. Save(out, Guid);
  229. Save(out, Data);
  230. }
  231. inline void Deserialize(TPacketInput* in) {
  232. Load(in, Guid);
  233. Load(in, Data);
  234. }
  235. };
  236. struct TStopPacket {
  237. inline char Type() const noexcept {
  238. return static_cast<char>(PT_STOP);
  239. }
  240. inline void Serialize(TPacketOutput* out) const {
  241. Save(out, TString("stop packet"));
  242. }
  243. };
  244. struct TBindError: public TSystemError {
  245. };
  246. struct TSocketDescr {
  247. inline TSocketDescr(TSocketHolder& s, int family)
  248. : S(s.Release())
  249. , Family(family)
  250. {
  251. }
  252. TSocketHolder S;
  253. int Family;
  254. };
  255. typedef TAutoPtr<TSocketDescr> TSocketRef;
  256. typedef TVector<TSocketRef> TSockets;
  257. static inline void CreateSocket(TSocketHolder& s, const IRemoteAddr& addr) {
  258. TSocketHolder res(socket(addr.Addr()->sa_family, SOCK_DGRAM, IPPROTO_UDP));
  259. if (!res) {
  260. ythrow TSystemError() << "can not create socket";
  261. }
  262. FixIPv6ListenSocket(res);
  263. if (bind(res, addr.Addr(), addr.Len()) != 0) {
  264. ythrow TBindError() << "can not bind " << PrintHostAndPort(addr);
  265. }
  266. res.Swap(s);
  267. }
  268. static inline void CreateSockets(TSockets& s, ui16 port) {
  269. TNetworkAddress addr(port);
  270. for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
  271. TSocketHolder res;
  272. CreateSocket(res, TAddrInfo(&*it));
  273. s.push_back(new TSocketDescr(res, it->ai_family));
  274. }
  275. }
  276. static inline void CreateSocketsOnRandomPort(TSockets& s) {
  277. while (true) {
  278. try {
  279. TSockets tmp;
  280. CreateSockets(tmp, 5000 + (RandomNumber<ui16>() % 1000));
  281. tmp.swap(s);
  282. return;
  283. } catch (const TBindError&) {
  284. }
  285. }
  286. }
  287. typedef ui64 TTimeStamp;
  288. static inline TTimeStamp TimeStamp() noexcept {
  289. return GetCycleCount() >> 31;
  290. }
  291. struct TRequestDescr: public TIntrusiveListItem<TRequestDescr> {
  292. inline TRequestDescr(const TString& guid, const TNotifyHandleRef& hndl, const TMessage& msg)
  293. : Guid(guid)
  294. , Hndl(hndl)
  295. , Msg(msg)
  296. , TS(TimeStamp())
  297. {
  298. }
  299. TString Guid;
  300. TNotifyHandleRef Hndl;
  301. TMessage Msg;
  302. TTimeStamp TS;
  303. };
  304. typedef TAutoPtr<TRequestDescr> TRequestDescrRef;
  305. class TProto {
  306. class TRequest: public IRequest, public TRequestPacket {
  307. public:
  308. inline TRequest(TPacket& p, TProto* parent)
  309. : TRequestPacket(p)
  310. , Addr_(std::move(p.Addr))
  311. , H_(PrintHostByRfc(*Addr_))
  312. , P_(parent)
  313. {
  314. }
  315. TStringBuf Scheme() const override {
  316. return TStringBuf("udp");
  317. }
  318. TString RemoteHost() const override {
  319. return H_;
  320. }
  321. TStringBuf Service() const override {
  322. return ((TRequestPacket&)(*this)).Service;
  323. }
  324. TStringBuf Data() const override {
  325. return ((TRequestPacket&)(*this)).Data;
  326. }
  327. TStringBuf RequestId() const override {
  328. return ((TRequestPacket&)(*this)).Guid;
  329. }
  330. bool Canceled() const override {
  331. //TODO ?
  332. return false;
  333. }
  334. void SendReply(TData& data) override {
  335. P_->Schedule(new TPacket(TResponsePacket<TData>(Guid, data), std::move(Addr_)));
  336. }
  337. void SendError(TResponseError, const TString&) override {
  338. // TODO
  339. }
  340. private:
  341. IRemoteAddrPtr Addr_;
  342. TString H_;
  343. TProto* P_;
  344. };
  345. public:
  346. inline TProto(IOnRequest* cb, TSocketHolder& s)
  347. : CB_(cb)
  348. , ToSendEv_(TSystemEvent::rAuto)
  349. , S_(s.Release())
  350. {
  351. SetSocketTimeout(S_, 10);
  352. Thrs_.push_back(Spawn<TProto, &TProto::ExecuteRecv>(this));
  353. Thrs_.push_back(Spawn<TProto, &TProto::ExecuteSend>(this));
  354. }
  355. inline ~TProto() {
  356. Schedule(new TPacket(TStopPacket(), GetSendAddr(S_)));
  357. for (size_t i = 0; i < Thrs_.size(); ++i) {
  358. Thrs_[i]->Join();
  359. }
  360. }
  361. inline TPacketRef Recv() {
  362. TBuffer tmp;
  363. tmp.Resize(128 * 1024);
  364. while (true) {
  365. try {
  366. return new TPacket(S_, tmp);
  367. } catch (...) {
  368. Cdbg << CurrentExceptionMessage() << Endl;
  369. continue;
  370. }
  371. }
  372. }
  373. typedef THashMap<TString, TRequestDescrRef> TInFlyBase;
  374. struct TInFly: public TInFlyBase, public TIntrusiveList<TRequestDescr> {
  375. typedef TInFlyBase::iterator TIter;
  376. typedef TInFlyBase::const_iterator TContsIter;
  377. inline void Insert(TRequestDescrRef& d) {
  378. PushBack(d.Get());
  379. (*this)[d->Guid] = d;
  380. }
  381. inline void EraseStale() noexcept {
  382. const TTimeStamp now = TimeStamp();
  383. for (TIterator it = Begin(); (it != End()) && (it->TS < now) && ((now - it->TS) > 120);) {
  384. it->Hndl->NotifyError("request timeout");
  385. TString safe_key = (it++)->Guid;
  386. erase(safe_key);
  387. }
  388. }
  389. };
  390. inline void ExecuteRecv() {
  391. SetHighestThreadPriority();
  392. TInFly infly;
  393. while (true) {
  394. TPacketRef p = Recv();
  395. switch (static_cast<EPacketType>(p->Type())) {
  396. case PT_REQUEST:
  397. if (CB_) {
  398. CB_->OnRequest(new TRequest(*p, this));
  399. } else {
  400. //skip request in case of client
  401. }
  402. break;
  403. case PT_RESPONSE: {
  404. CancelStaleRequests(infly);
  405. TResponsePacket<TString> rp(*p);
  406. TInFly::TIter it = static_cast<TInFlyBase&>(infly).find(rp.Guid);
  407. if (it == static_cast<TInFlyBase&>(infly).end()) {
  408. break;
  409. }
  410. const TRequestDescrRef& d = it->second;
  411. d->Hndl->NotifyResponse(rp.Data);
  412. infly.erase(it);
  413. break;
  414. }
  415. case PT_STOP:
  416. Schedule(nullptr);
  417. return;
  418. case PT_TIMEOUT:
  419. CancelStaleRequests(infly);
  420. break;
  421. }
  422. }
  423. }
  424. inline void ExecuteSend() {
  425. SetHighestThreadPriority();
  426. while (true) {
  427. TPacketRef p;
  428. while (!ToSend_.Dequeue(&p)) {
  429. ToSendEv_.Wait();
  430. }
  431. //shutdown
  432. if (!p) {
  433. return;
  434. }
  435. p->SendTo(S_);
  436. }
  437. }
  438. inline void Schedule(TPacketRef p) {
  439. ToSend_.Enqueue(p);
  440. ToSendEv_.Signal();
  441. }
  442. inline void Schedule(TRequestDescrRef dsc, TPacketRef p) {
  443. ScheduledReqs_.Enqueue(dsc);
  444. Schedule(p);
  445. }
  446. protected:
  447. void CancelStaleRequests(TInFly& infly) {
  448. TRequestDescrRef d;
  449. while (ScheduledReqs_.Dequeue(&d)) {
  450. infly.Insert(d);
  451. }
  452. infly.EraseStale();
  453. }
  454. IOnRequest* CB_;
  455. NNeh::TAutoLockFreeQueue<TPacket> ToSend_;
  456. NNeh::TAutoLockFreeQueue<TRequestDescr> ScheduledReqs_;
  457. TSystemEvent ToSendEv_;
  458. TSocketHolder S_;
  459. TVector<TThreadRef> Thrs_;
  460. };
  461. class TProtos {
  462. public:
  463. inline TProtos() {
  464. TSockets s;
  465. CreateSocketsOnRandomPort(s);
  466. Init(nullptr, s);
  467. }
  468. inline TProtos(IOnRequest* cb, ui16 port) {
  469. TSockets s;
  470. CreateSockets(s, port);
  471. Init(cb, s);
  472. }
  473. static inline TProtos* Instance() {
  474. return Singleton<TProtos>();
  475. }
  476. inline void Schedule(const TMessage& msg, const TNotifyHandleRef& hndl) {
  477. TParsedLocation loc(msg.Addr);
  478. const TNetworkAddress* addr = &CachedThrResolve(TResolveInfo(loc.Host, loc.GetPort()))->Addr;
  479. for (TNetworkAddress::TIterator ai = addr->Begin(); ai != addr->End(); ++ai) {
  480. TProto* proto = Find(ai->ai_family);
  481. if (proto) {
  482. TRequestPacket rp(ToString(loc.Service), msg.Data);
  483. TRequestDescrRef rd(new TRequestDescr(rp.Guid, hndl, msg));
  484. IRemoteAddrPtr raddr(new TAddrInfo(&*ai));
  485. TPacketRef p(new TPacket(rp, std::move(raddr)));
  486. proto->Schedule(rd, p);
  487. return;
  488. }
  489. }
  490. ythrow yexception() << "unsupported protocol family";
  491. }
  492. private:
  493. inline void Init(IOnRequest* cb, TSockets& s) {
  494. for (auto& it : s) {
  495. P_[it->Family] = new TProto(cb, it->S);
  496. }
  497. }
  498. inline TProto* Find(int family) const {
  499. TProtoStorage::const_iterator it = P_.find(family);
  500. if (it == P_.end()) {
  501. return nullptr;
  502. }
  503. return it->second.Get();
  504. }
  505. private:
  506. typedef TAutoPtr<TProto> TProtoRef;
  507. typedef THashMap<int, TProtoRef> TProtoStorage;
  508. TProtoStorage P_;
  509. };
  510. class TRequester: public IRequester, public TProtos {
  511. public:
  512. inline TRequester(IOnRequest* cb, ui16 port)
  513. : TProtos(cb, port)
  514. {
  515. }
  516. };
  517. class TProtocol: public IProtocol {
  518. public:
  519. IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
  520. return new TRequester(cb, loc.GetPort());
  521. }
  522. THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
  523. TNotifyHandleRef ret(new TUdpHandle(fallback, msg, !ss ? nullptr : new TStatCollector(ss)));
  524. TProtos::Instance()->Schedule(msg, ret);
  525. return ret.Get();
  526. }
  527. TStringBuf Scheme() const noexcept override {
  528. return TStringBuf("udp");
  529. }
  530. };
  531. }
  532. }
  533. IProtocol* NNeh::UdpProtocol() {
  534. return Singleton<NUdp::TProtocol>();
  535. }