123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508 |
- #include "details.h"
- #include "factory.h"
- #include "http_common.h"
- #include "location.h"
- #include "multi.h"
- #include "netliba.h"
- #include "netliba_udp_http.h"
- #include "lfqueue.h"
- #include "utils.h"
- #include <library/cpp/dns/cache.h>
- #include <util/generic/hash.h>
- #include <util/generic/singleton.h>
- #include <util/generic/vector.h>
- #include <util/generic/yexception.h>
- #include <util/string/cast.h>
- #include <util/system/yassert.h>
- #include <atomic>
- using namespace NDns;
- using namespace NNeh;
- namespace NNeh {
-     // Defaults for the netliba transport options.
-     size_t TNetLibaOptions::ClientThreads = 4;
-     TDuration TNetLibaOptions::AckTailEffect = TDuration::Seconds(30);
-     // Assigns the option called `name` from its textual `value`.
-     // Returns true when `name` matches a known option and false otherwise;
-     // the text-to-value conversion is delegated to FromString<>.
-     bool TNetLibaOptions::Set(TStringBuf name, TStringBuf value) {
-         // Plain comparisons are used instead of a function-local #define:
-         // such a macro is not scoped to the function and would stay defined
-         // for the rest of the translation unit.
-         if (name == TStringBuf("ClientThreads")) {
-             ClientThreads = FromString<size_t>(value);
-         } else if (name == TStringBuf("AckTailEffect")) {
-             AckTailEffect = FromString<TDuration>(value);
-         } else {
-             return false;
-         }
-         return true;
-     }
- }
- namespace {
- namespace NNetLiba {
- using namespace NNetliba;
- using namespace NNehNetliba;
- // Local shorthands: the netliba requester interface and owning pointers
- // for incoming UDP-HTTP requests and responses.
- using INetLibaRequester = NNehNetliba::IRequester;
- using TUdpHttpRequestPtr = TAutoPtr<TUdpHttpRequest>;
- using TUdpHttpResponsePtr = TAutoPtr<TUdpHttpResponse>;
- // Returns the first entry of `addr` whose ai_family equals `family`,
- // or nullptr when no entry matches.
- static inline const addrinfo* FindIPBase(const TNetworkAddress* addr, int family) {
-     TNetworkAddress::TIterator cur = addr->Begin();
-     while (cur != addr->End()) {
-         if (cur->ai_family == family) {
-             return &*cur;
-         }
-         ++cur;
-     }
-     return nullptr;
- }
- // Selects the entry of `addr` to talk to: an AF_INET6 entry when present,
- // otherwise an AF_INET one. Throws yexception when neither family is found.
- // NOTE(review): for an AF_INET match the result still aliases the entry's
- // ai_addr as sockaddr_in6; presumably the consumer checks the address
- // family before reading it - confirm against GetUdpAddress.
- static inline const sockaddr_in6& FindIP(const TNetworkAddress* addr) {
-     const addrinfo* found = FindIPBase(addr, AF_INET6); // IPv6 is preferred
-     if (found == nullptr) {
-         found = FindIPBase(addr, AF_INET);
-         if (found == nullptr) {
-             ythrow yexception() << "ip not supported by " << *addr;
-         }
-     }
-     return *(const sockaddr_in6*)(found->ai_addr);
- }
- // Store of "time of the last request ack", indexed by address id.
- // Times are kept as raw TInstant::TValue numbers inside atomics.
- class TLastAckTimes {
-     // One slot of the store; a freshly constructed slot holds 0.
-     struct TTimeVal {
-         TTimeVal()
-             : Val(0)
-         {
-         }
-         std::atomic<TInstant::TValue> Val;
-     };
- public:
-     // Instance obtained through Singleton<>.
-     static TLastAckTimes& Common() {
-         TLastAckTimes* shared = Singleton<TLastAckTimes>();
-         return *shared;
-     }
-     // Value stored in the slot of `idAddr`.
-     TInstant::TValue Get(size_t idAddr) {
-         TTimeVal& slot = Tm_.Get(idAddr);
-         return slot.Val.load(std::memory_order_acquire);
-     }
-     // Stores the current time into the slot of `idAddr`.
-     void Set(size_t idAddr) {
-         TTimeVal& slot = Tm_.Get(idAddr);
-         slot.Val.store(TInstant::Now().GetValue(), std::memory_order_release);
-     }
- private:
-     NNeh::NHttp::TLockFreeSequence<TTimeVal> Tm_;
- };
- // Client-side handle of one request sent through a netliba requester.
- // Each request gets its own GUID, created in the constructor.
- class TRequest: public TSimpleHandle {
- public:
-     inline TRequest(TIntrusivePtr<INetLibaRequester>& r, size_t idAddr, const TMessage& msg, IOnRecv* cb, TStatCollector* s)
-         : TSimpleHandle(cb, msg, s)
-         , R_(r)
-         , IdAddr_(idAddr)
-         , Notified_(false)
-     {
-         CreateGuid(&Guid_);
-     }
-     // Cancels the handle and asks the requester to cancel the request.
-     void Cancel() noexcept override {
-         TSimpleHandle::Cancel();
-         R_->CancelRequest(Guid_);
-     }
-     // Destination address taken from the message.
-     inline const TString& Addr() const noexcept {
-         return Message().Addr;
-     }
-     inline const TGUID& Guid() const noexcept {
-         return Guid_;
-     }
-     // Marks the request as notified.
-     // Returns true on the first call and false if it was already notified.
-     inline bool SetNotified() noexcept {
-         const bool firstTime = !Notified_;
-         Notified_ = true;
-         return firstTime;
-     }
-     // Called when the request is handed over for sending. If an ack from the
-     // same address id was seen less than AckTailEffect ago, the send is
-     // reported as complete right away (predicted, not observed, completion).
-     void OnSend() {
-         const auto tail = TNetLibaOptions::AckTailEffect.GetValue();
-         if (!tail) {
-             return; // feature disabled
-         }
-         const auto lastAck = TLastAckTimes::Common().Get(IdAddr_);
-         if (lastAck + tail > TInstant::Now().GetValue()) {
-             SetSendComplete();
-         }
-     }
-     // Called when the request was acknowledged: remembers the ack time for
-     // this address id (when AckTailEffect is enabled) and reports the send
-     // as complete.
-     void OnRequestAck() {
-         const bool trackAcks = TNetLibaOptions::AckTailEffect.GetValue() != 0;
-         if (trackAcks) {
-             TLastAckTimes::Common().Set(IdAddr_);
-         }
-         SetSendComplete();
-     }
- private:
-     TIntrusivePtr<INetLibaRequester> R_;
-     size_t IdAddr_;
-     TGUID Guid_;
-     bool Notified_;
- };
- using TRequestRef = TIntrusivePtr<TRequest>;
- // Client side of the transport: a set of netliba requesters, each paired
- // with an events handler that maps request GUIDs back to TRequest handles.
- class TNetLibaBus {
-     // Receives the requester callbacks and keeps the table of requests that
-     // were sent but not answered yet.
-     class TEventsHandler: public IEventsCollector {
-         typedef THashMap<TGUID, TRequestRef, TGUIDHash> TInFly;
-     public:
-         // Registers `req` as in flight. The request is passed through the
-         // lock-free queue Q_ and moved into InFly_ later, by the callbacks.
-         inline void OnSend(TRequestRef& req) {
-             Q_.Enqueue(req);
-             req->OnSend();
-         }
-     private:
-         // Moves everything queued by OnSend() into InFly_.
-         void UpdateInFly() {
-             TRequestRef req;
-             while (Q_.Dequeue(&req)) {
-                 if (!req) {
-                     return;
-                 }
-                 InFly_[req->Guid()] = req;
-             }
-         }
-         void AddRequest(TUdpHttpRequest* req) override {
-             //ignore received requests in client
-             delete req;
-         }
-         // Delivers a response (or an error) to the matching request and
-         // removes it from InFly_. Aborts when the GUID is unknown.
-         void AddResponse(TUdpHttpResponse* resp) override {
-             TUdpHttpResponsePtr ptr(resp);
-             UpdateInFly();
-             TInFly::iterator it = InFly_.find(resp->ReqId);
-             Y_ABORT_UNLESS(it != InFly_.end(), "incorrect incoming message");
-             TRequestRef& req = it->second;
-             if (req->SetNotified()) {
-                 if (resp->Ok == TUdpHttpResponse::OK) {
-                     req->NotifyResponse(TString(resp->Data.data(), resp->Data.size()));
-                 } else if (resp->Ok == TUdpHttpResponse::CANCELED) {
-                     req->NotifyError(new TError(resp->Error, TError::Cancelled));
-                 } else {
-                     req->NotifyError(new TError(resp->Error));
-                 }
-             }
-             InFly_.erase(it);
-         }
-         // Reports a cancel to the request unless it was already notified.
-         // The entry is left in InFly_.
-         void AddCancel(const TGUID& guid) override {
-             UpdateInFly();
-             TInFly::iterator it = InFly_.find(guid);
-             if (it != InFly_.end() && it->second->SetNotified()) {
-                 it->second->NotifyError("Canceled (before ack)");
-             }
-         }
-         // Forwards the ack to the request. Aborts when the GUID is unknown.
-         void AddRequestAck(const TGUID& guid) override {
-             UpdateInFly();
-             TInFly::iterator it = InFly_.find(guid);
-             Y_ABORT_UNLESS(it != InFly_.end(), "incorrect complete notification");
-             it->second->OnRequestAck();
-         }
-     private:
-         TLockFreeQueue<TRequestRef> Q_;
-         TInFly InFly_;
-     };
-     // One requester together with its events handler.
-     struct TClientThread {
-         TClientThread(int physicalCpu)
-             : EH_(new TEventsHandler())
-             , R_(CreateHttpUdpRequester(0, IEventsCollectorRef(EH_.Get()), physicalCpu))
-         {
-             R_->EnableReportRequestAck();
-         }
-         ~TClientThread() {
-             R_->StopNoWait();
-         }
-         TIntrusivePtr<TEventsHandler> EH_;
-         TIntrusivePtr<INetLibaRequester> R_;
-     };
- public:
-     // Creates TNetLibaOptions::ClientThreads clients, but never fewer than one.
-     TNetLibaBus() {
-         // Schedule() selects a client with `Id % Clnt_.size()`; an empty
-         // Clnt_ (ClientThreads option set to 0) would make that a modulo
-         // by zero, so at least one client is always created.
-         const size_t configured = TNetLibaOptions::ClientThreads;
-         const size_t threads = configured ? configured : 1;
-         Clnt_.reserve(threads);
-         for (size_t i = 0; i < threads; ++i) {
-             Clnt_.push_back(new TClientThread(i));
-         }
-     }
-     // Resolves the host of msg.Addr, picks a client by the resolved host id
-     // and sends msg.Data to the service named in the address.
-     // Returns the handle of the created request.
-     inline THandleRef Schedule(const TMessage& msg, IOnRecv* cb, TServiceStatRef& ss) {
-         TParsedLocation loc(msg.Addr);
-         TUdpAddress addr;
-         const TResolvedHost* resHost = CachedResolve(TResolveInfo(loc.Host, loc.GetPort()));
-         GetUdpAddress(&addr, FindIP(&resHost->Addr));
-         TClientThread& clnt = *Clnt_[resHost->Id % Clnt_.size()];
-         TIntrusivePtr<INetLibaRequester> rr = clnt.R_;
-         TRequestRef req(new TRequest(rr, resHost->Id, msg, cb, !ss ? nullptr : new TStatCollector(ss)));
-         // Register with the events handler before sending, so the request
-         // is already queued for InFly_ when callbacks for its GUID arrive.
-         clnt.EH_->OnSend(req);
-         rr->SendRequest(addr, ToString(loc.Service), msg.Data, req->Guid());
-         return THandleRef(req.Get());
-     }
- private:
-     TVector<TAutoPtr<TClientThread>> Clnt_;
- };
- //server
- // Server side: owns a netliba requester bound to a port and turns incoming
- // UDP-HTTP requests into IRequest objects passed to the IOnRequest callback.
- class TRequester: public TThrRefBase {
-     // State shared between the events handler and the TRequest handed to
-     // the user; carries the cancellation flag.
-     struct TSrvRequestState: public TAtomicRefCount<TSrvRequestState> {
-         TSrvRequestState()
-             : Canceled(false)
-         {
-         }
-         TAtomicBool Canceled;
-     };
-     // IRequest implementation over a received TUdpHttpRequest.
-     class TRequest: public IRequest {
-     public:
-         inline TRequest(TAutoPtr<TUdpHttpRequest> req, TIntrusivePtr<TSrvRequestState> state, TRequester* parent)
-             : R_(req)
-             , S_(state)
-             , P_(parent)
-         {
-         }
-         // A request destroyed without a reply is still reported as processed.
-         ~TRequest() override {
-             if (!!P_) {
-                 P_->RequestProcessed(this);
-             }
-         }
-         TStringBuf Scheme() const override {
-             return TStringBuf("netliba");
-         }
-         // Host part of the peer address; computed on first use and cached in H_.
-         TString RemoteHost() const override {
-             if (!H_) {
-                 TUdpAddress tmp(R_->PeerAddress);
-                 tmp.Scope = 0; //discard scope from serialized addr
-                 TString addr = GetAddressAsString(tmp);
-                 TStringBuf host, port;
-                 TStringBuf(addr).RSplit(':', host, port);
-                 H_ = host;
-             }
-             return H_;
-         }
-         TStringBuf Service() const override {
-             return TStringBuf(R_->Url.c_str(), R_->Url.length());
-         }
-         TStringBuf Data() const override {
-             return TStringBuf((const char*)R_->Data.data(), R_->Data.size());
-         }
-         // Raw bytes of the request GUID.
-         TStringBuf RequestId() const override {
-             const TGUID& g = R_->ReqId;
-             return TStringBuf((const char*)g.dw, sizeof(g.dw));
-         }
-         bool Canceled() const override {
-             return S_->Canceled;
-         }
-         // Sends `data` as the response unless the request was canceled.
-         // P_ is swapped out first, so a second call does nothing and the
-         // destructor does not report the request as processed again.
-         void SendReply(TData& data) override {
-             TIntrusivePtr<TRequester> p;
-             p.Swap(P_);
-             if (!!p) {
-                 if (!Canceled()) {
-                     p->R_->SendResponse(R_->ReqId, &data);
-                 }
-                 p->RequestProcessed(this);
-             }
-         }
-         void SendError(TResponseError, const TString&) override {
-             // TODO
-         }
-         inline const TGUID& RequestGuid() const noexcept {
-             return R_->ReqId;
-         }
-     private:
-         TAutoPtr<TUdpHttpRequest> R_;
-         mutable TString H_;
-         TIntrusivePtr<TSrvRequestState> S_;
-         TIntrusivePtr<TRequester> P_;
-     };
-     // Receives the requester callbacks, tracks the state of requests being
-     // processed and forwards new requests to the parent TRequester.
-     class TEventsHandler: public IEventsCollector {
-     public:
-         TEventsHandler(TRequester* parent)
-         {
-             P_.store(parent, std::memory_order_release);
-         }
-         // Queues the GUID of a finished request; its state is removed from
-         // InProcess_ later, by UpdateInProcess().
-         void RequestProcessed(const TRequest* r) {
-             FinishedReqs_.Enqueue(r->RequestGuid());
-         }
-         //thread safe method for disable proxy callbacks to parent (OnRequest(...))
-         void SyncStop() {
-             P_.store(nullptr, std::memory_order_release);
-             // Wait until an AddRequest() call that may still hold the lock
-             // (and use the parent pointer) has left it.
-             while (!RequesterPtrPotector_.TryAcquire()) {
-                 Sleep(TDuration::MicroSeconds(100));
-             }
-             RequesterPtrPotector_.Release();
-         }
-     private:
-         typedef THashMap<TGUID, TIntrusivePtr<TSrvRequestState>, TGUIDHash> TStatesInProcessRequests;
-         // Registers a state for the new request and passes the request to
-         // the parent unless the handler was stopped. Exceptions thrown by
-         // the parent are logged to Cdbg and otherwise ignored.
-         void AddRequest(TUdpHttpRequest* req) override {
-             TUdpHttpRequestPtr ptr(req);
-             // Drop states of already finished requests here as well:
-             // otherwise InProcess_ and FinishedReqs_ are drained only by
-             // AddCancel() and keep growing while no cancels arrive.
-             UpdateInProcess();
-             TSrvRequestState* state = new TSrvRequestState();
-             InProcess_[req->ReqId] = state;
-             try {
-                 TGuard<TSpinLock> m(RequesterPtrPotector_);
-                 if (TRequester* p = P_.load(std::memory_order_acquire)) {
-                     p->OnRequest(ptr, state); //move req. owning to parent
-                 }
-             } catch (...) {
-                 Cdbg << "ignore exc.: " << CurrentExceptionMessage() << Endl;
-             }
-         }
-         void AddResponse(TUdpHttpResponse*) override {
-             Y_ABORT("unexpected response in neh netliba server");
-         }
-         // Sets the Canceled flag of the request if it is still in process.
-         void AddCancel(const TGUID& guid) override {
-             UpdateInProcess();
-             TStatesInProcessRequests::iterator ustate = InProcess_.find(guid);
-             if (ustate != InProcess_.end())
-                 ustate->second->Canceled = true;
-         }
-         void AddRequestAck(const TGUID&) override {
-             Y_ABORT("unexpected acc in neh netliba server");
-         }
-         // Removes the states of all requests reported via RequestProcessed().
-         void UpdateInProcess() {
-             TGUID guid;
-             while (FinishedReqs_.Dequeue(&guid)) {
-                 InProcess_.erase(guid);
-             }
-         }
-     private:
-         TLockFreeStack<TGUID> FinishedReqs_; //processed requests (responded or destroyed)
-         TStatesInProcessRequests InProcess_;
-         TSpinLock RequesterPtrPotector_;
-         std::atomic<TRequester*> P_;
-     };
- public:
-     // Creates the requester on `port` and enables request-cancel reports.
-     inline TRequester(IOnRequest* cb, ui16 port)
-         : CB_(cb)
-         , EH_(new TEventsHandler(this))
-         , R_(CreateHttpUdpRequester(port, EH_.Get()))
-     {
-         R_->EnableReportRequestCancel();
-     }
-     ~TRequester() override {
-         Shutdown();
-     }
-     // Stops the requester and detaches the events handler from this object.
-     // Only the first call has an effect.
-     void Shutdown() noexcept {
-         if (!Shutdown_) {
-             Shutdown_ = true;
-             R_->StopNoWait();
-             EH_->SyncStop();
-         }
-     }
-     // Wraps the received request into a TRequest and passes it to the callback.
-     void OnRequest(TUdpHttpRequestPtr req, TSrvRequestState* state) {
-         CB_->OnRequest(new TRequest(req, state, this));
-     }
-     void RequestProcessed(const TRequest* r) {
-         EH_->RequestProcessed(r);
-     }
- private:
-     IOnRequest* CB_;
-     TIntrusivePtr<TEventsHandler> EH_;
-     TIntrusivePtr<INetLibaRequester> R_;
-     bool Shutdown_ = false;
- };
- typedef TIntrusivePtr<TRequester> TRequesterRef;
- // IRequester wrapper that keeps a reference to a TRequester and calls its
- // Shutdown() when the wrapper is destroyed.
- class TRequesterAutoShutdown: public NNeh::IRequester {
- public:
-     TRequesterAutoShutdown(const TRequesterRef& requester)
-         : R_(requester)
-     {
-     }
-     ~TRequesterAutoShutdown() override {
-         // Shut down explicitly: other references may keep the TRequester
-         // object itself alive after this wrapper is gone.
-         R_->Shutdown();
-     }
- private:
-     TRequesterRef R_;
- };
- // IProtocol implementation for the "netliba" scheme.
- class TProtocol: public IProtocol {
- public:
-     // Scheme name served by this protocol.
-     TStringBuf Scheme() const noexcept override {
-         return TStringBuf("netliba");
-     }
-     // Client side: sends `msg` through the TNetLibaBus obtained via Singleton<>.
-     THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
-         TNetLibaBus* bus = Singleton<TNetLibaBus>();
-         return bus->Schedule(msg, fallback, ss);
-     }
-     // Server side: creates a requester on the port of `loc`; the returned
-     // wrapper shuts the requester down when it is destroyed.
-     NNeh::IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
-         TRequesterRef server(new TRequester(cb, loc.GetPort()));
-         return new TRequesterAutoShutdown(server);
-     }
- };
- }
- }
- // Returns the netliba protocol object obtained via Singleton<>.
- IProtocol* NNeh::NetLibaProtocol() {
-     NNetLiba::TProtocol* protocol = Singleton<NNetLiba::TProtocol>();
-     return protocol;
- }
|