123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322 |
- #include "rpc.h"
- #include "rq.h"
- #include "multi.h"
- #include "location.h"
- #include <library/cpp/threading/thread_local/thread_local.h>
- #include <util/generic/hash.h>
- #include <util/thread/factory.h>
- #include <util/system/spinlock.h>
- using namespace NNeh;
- namespace {
- typedef std::pair<TString, IServiceRef> TServiceDescr;
- typedef TVector<TServiceDescr> TServicesBase;
- class TServices: public TServicesBase, public TThrRefBase, public IOnRequest {
- typedef THashMap<TStringBuf, IServiceRef> TSrvs;
- struct TVersionedServiceMap {
- TSrvs Srvs;
- i64 Version = 0;
- };
- struct TFunc: public IThreadFactory::IThreadAble {
- inline TFunc(TServices* parent)
- : Parent(parent)
- {
- }
- void DoExecute() override {
- TThread::SetCurrentThreadName("NehTFunc");
- TVersionedServiceMap mp;
- while (true) {
- IRequestRef req = Parent->RQ_->Next();
- if (!req) {
- break;
- }
- Parent->ServeRequest(mp, req);
- }
- Parent->RQ_->Schedule(nullptr);
- }
- TServices* Parent;
- };
- public:
- inline TServices()
- : RQ_(CreateRequestQueue())
- {
- }
- inline TServices(TCheck check)
- : RQ_(CreateRequestQueue())
- , C_(check)
- {
- }
- inline ~TServices() override {
- LF_.Destroy();
- }
- inline void Add(const TString& service, IServiceRef srv) {
- TGuard<TSpinLock> guard(L_);
- push_back(std::make_pair(service, srv));
- AtomicIncrement(SelfVersion_);
- }
- inline void Listen() {
- Y_ENSURE(!HasLoop_ || !*HasLoop_);
- HasLoop_ = false;
- RR_ = MultiRequester(ListenAddrs(), this);
- }
- inline void Loop(size_t threads) {
- Y_ENSURE(!HasLoop_ || *HasLoop_);
- HasLoop_ = true;
- TIntrusivePtr<TServices> self(this);
- IRequesterRef rr = MultiRequester(ListenAddrs(), this);
- TFunc func(this);
- typedef TAutoPtr<IThreadFactory::IThread> IThreadRef;
- TVector<IThreadRef> thrs;
- for (size_t i = 1; i < threads; ++i) {
- thrs.push_back(SystemThreadFactory()->Run(&func));
- }
- func.Execute();
- for (size_t i = 0; i < thrs.size(); ++i) {
- thrs[i]->Join();
- }
- RQ_->Clear();
- }
- inline void ForkLoop(size_t threads) {
- Y_ENSURE(!HasLoop_ || *HasLoop_);
- HasLoop_ = true;
- //here we can have trouble with binding port(s), so expect exceptions
- IRequesterRef rr = MultiRequester(ListenAddrs(), this);
- LF_.Reset(new TLoopFunc(this, threads, rr));
- }
- inline void Stop() {
- RQ_->Schedule(nullptr);
- }
- inline void SyncStopFork() {
- Stop();
- if (LF_) {
- LF_->SyncStop();
- }
- RQ_->Clear();
- LF_.Destroy();
- }
- void OnRequest(IRequestRef req) override {
- if (C_) {
- if (auto error = C_(req)) {
- req->SendError(*error);
- return;
- }
- }
- if (!*HasLoop_) {
- ServeRequest(LocalMap_.GetRef(), req);
- } else {
- RQ_->Schedule(req);
- }
- }
- private:
- class TLoopFunc: public TFunc {
- public:
- TLoopFunc(TServices* parent, size_t threads, IRequesterRef& rr)
- : TFunc(parent)
- , RR_(rr)
- {
- T_.reserve(threads);
- try {
- for (size_t i = 0; i < threads; ++i) {
- T_.push_back(SystemThreadFactory()->Run(this));
- }
- } catch (...) {
- //paranoid mode on
- SyncStop();
- throw;
- }
- }
- ~TLoopFunc() override {
- try {
- SyncStop();
- } catch (...) {
- Cdbg << TStringBuf("neh rpc ~loop_func: ") << CurrentExceptionMessage() << Endl;
- }
- }
- void SyncStop() {
- if (!T_) {
- return;
- }
- Parent->Stop();
- for (size_t i = 0; i < T_.size(); ++i) {
- T_[i]->Join();
- }
- T_.clear();
- }
- private:
- typedef TAutoPtr<IThreadFactory::IThread> IThreadRef;
- TVector<IThreadRef> T_;
- IRequesterRef RR_;
- };
- inline void ServeRequest(TVersionedServiceMap& mp, IRequestRef req) {
- if (!req) {
- return;
- }
- const TStringBuf name = req->Service();
- TSrvs::const_iterator it = mp.Srvs.find(name);
- if (Y_UNLIKELY(it == mp.Srvs.end())) {
- if (UpdateServices(mp.Srvs, mp.Version)) {
- it = mp.Srvs.find(name);
- }
- }
- if (Y_UNLIKELY(it == mp.Srvs.end())) {
- it = mp.Srvs.find(TStringBuf("*"));
- }
- if (Y_UNLIKELY(it == mp.Srvs.end())) {
- req->SendError(IRequest::NotExistService);
- } else {
- try {
- it->second->ServeRequest(req);
- } catch (...) {
- Cdbg << CurrentExceptionMessage() << Endl;
- }
- }
- }
- inline bool UpdateServices(TSrvs& srvs, i64& version) const {
- if (AtomicGet(SelfVersion_) == version) {
- return false;
- }
- srvs.clear();
- TGuard<TSpinLock> guard(L_);
- for (const auto& it : *this) {
- srvs[TParsedLocation(it.first).Service] = it.second;
- }
- version = AtomicGet(SelfVersion_);
- return true;
- }
- inline TListenAddrs ListenAddrs() const {
- TListenAddrs addrs;
- {
- TGuard<TSpinLock> guard(L_);
- for (const auto& it : *this) {
- addrs.push_back(it.first);
- }
- }
- return addrs;
- }
- TSpinLock L_;
- IRequestQueueRef RQ_;
- THolder<TLoopFunc> LF_;
- TAtomic SelfVersion_ = 1;
- TCheck C_;
- NThreading::TThreadLocalValue<TVersionedServiceMap> LocalMap_;
- IRequesterRef RR_;
- TMaybe<bool> HasLoop_;
- };
- class TServicesFace: public IServices {
- public:
- inline TServicesFace()
- : S_(new TServices())
- {
- }
- inline TServicesFace(TCheck check)
- : S_(new TServices(check))
- {
- }
- void DoAdd(const TString& service, IServiceRef srv) override {
- S_->Add(service, srv);
- }
- void Loop(size_t threads) override {
- S_->Loop(threads);
- }
- void ForkLoop(size_t threads) override {
- S_->ForkLoop(threads);
- }
- void SyncStopFork() override {
- S_->SyncStopFork();
- }
- void Stop() override {
- S_->Stop();
- }
- void Listen() override {
- S_->Listen();
- }
- private:
- TIntrusivePtr<TServices> S_;
- };
- }
- IServiceRef NNeh::Wrap(const TServiceFunction& func) {
- struct TWrapper: public IService {
- inline TWrapper(const TServiceFunction& f)
- : F(f)
- {
- }
- void ServeRequest(const IRequestRef& request) override {
- F(request);
- }
- TServiceFunction F;
- };
- return new TWrapper(func);
- }
- IServicesRef NNeh::CreateLoop() {
- return new TServicesFace();
- }
- IServicesRef NNeh::CreateLoop(TCheck check) {
- return new TServicesFace(check);
- }
|