inproc.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. #include "inproc.h"
  2. #include "details.h"
  3. #include "neh.h"
  4. #include "location.h"
  5. #include "utils.h"
  6. #include "factory.h"
  7. #include <util/generic/ptr.h>
  8. #include <util/generic/string.h>
  9. #include <util/generic/singleton.h>
  10. #include <util/stream/output.h>
  11. #include <util/string/cast.h>
  12. using namespace NNeh;
  13. namespace {
  14. const TString canceled = "canceled";
  15. struct TInprocHandle: public TNotifyHandle {
  16. inline TInprocHandle(const TMessage& msg, IOnRecv* r, TStatCollector* sc) noexcept
  17. : TNotifyHandle(r, msg, sc)
  18. , Canceled_(false)
  19. , NotifyCnt_(0)
  20. {
  21. }
  22. bool MessageSendedCompletely() const noexcept override {
  23. return true;
  24. }
  25. void Cancel() noexcept override {
  26. THandle::Cancel(); //inform stat collector
  27. Canceled_ = true;
  28. try {
  29. if (MarkReplied()) {
  30. NotifyError(new TError(canceled, TError::Cancelled));
  31. }
  32. } catch (...) {
  33. Cdbg << "inproc canc. " << CurrentExceptionMessage() << Endl;
  34. }
  35. }
  36. inline void SendReply(const TString& resp) {
  37. if (MarkReplied()) {
  38. NotifyResponse(resp);
  39. }
  40. }
  41. inline void SendError(const TString& details) {
  42. if (MarkReplied()) {
  43. NotifyError(new TError{details, TError::ProtocolSpecific, 1});
  44. }
  45. }
  46. void Disable() {
  47. F_ = nullptr;
  48. MarkReplied();
  49. }
  50. inline bool Canceled() const noexcept {
  51. return Canceled_;
  52. }
  53. private:
  54. //return true when mark first reply
  55. inline bool MarkReplied() {
  56. return AtomicAdd(NotifyCnt_, 1) == 1;
  57. }
  58. private:
  59. TAtomicBool Canceled_;
  60. TAtomic NotifyCnt_;
  61. };
  62. typedef TIntrusivePtr<TInprocHandle> TInprocHandleRef;
  63. class TInprocLocation: public TParsedLocation {
  64. public:
  65. TInprocLocation(const TStringBuf& addr)
  66. : TParsedLocation(addr)
  67. {
  68. Service.Split('?', InprocService, InprocId);
  69. }
  70. TStringBuf InprocService;
  71. TStringBuf InprocId;
  72. };
  73. class TRequest: public IRequest {
  74. public:
  75. TRequest(const TInprocHandleRef& hndl)
  76. : Location(hndl->Message().Addr)
  77. , Handle_(hndl)
  78. {
  79. }
  80. TStringBuf Scheme() const override {
  81. return TStringBuf("inproc");
  82. }
  83. TString RemoteHost() const override {
  84. return TString();
  85. }
  86. TStringBuf Service() const override {
  87. return Location.InprocService;
  88. }
  89. TStringBuf Data() const override {
  90. return Handle_->Message().Data;
  91. }
  92. TStringBuf RequestId() const override {
  93. return Location.InprocId;
  94. }
  95. bool Canceled() const override {
  96. return Handle_->Canceled();
  97. }
  98. void SendReply(TData& data) override {
  99. Handle_->SendReply(TString(data.data(), data.size()));
  100. }
  101. void SendError(TResponseError, const TString& details) override {
  102. Handle_->SendError(details);
  103. }
  104. const TMessage Request;
  105. const TInprocLocation Location;
  106. private:
  107. TInprocHandleRef Handle_;
  108. };
  109. class TInprocRequester: public IRequester {
  110. public:
  111. TInprocRequester(IOnRequest*& rqcb)
  112. : RegisteredCallback_(rqcb)
  113. {
  114. }
  115. ~TInprocRequester() override {
  116. RegisteredCallback_ = nullptr;
  117. }
  118. private:
  119. IOnRequest*& RegisteredCallback_;
  120. };
  121. class TInprocRequesterStg: public IProtocol {
  122. public:
  123. inline TInprocRequesterStg() {
  124. V_.resize(1 + (size_t)Max<ui16>());
  125. }
  126. IRequesterRef CreateRequester(IOnRequest* cb, const TParsedLocation& loc) override {
  127. IOnRequest*& rqcb = Find(loc);
  128. if (!rqcb) {
  129. rqcb = cb;
  130. } else if (rqcb != cb) {
  131. ythrow yexception() << "shit happen - already registered";
  132. }
  133. return new TInprocRequester(rqcb);
  134. }
  135. THandleRef ScheduleRequest(const TMessage& msg, IOnRecv* fallback, TServiceStatRef& ss) override {
  136. TInprocHandleRef hndl(new TInprocHandle(msg, fallback, !ss ? nullptr : new TStatCollector(ss)));
  137. try {
  138. TAutoPtr<TRequest> req(new TRequest(hndl));
  139. if (IOnRequest* cb = Find(req->Location)) {
  140. cb->OnRequest(req.Release());
  141. } else {
  142. throw yexception() << TStringBuf("not found inproc location");
  143. }
  144. } catch (...) {
  145. hndl->Disable();
  146. throw;
  147. }
  148. return THandleRef(hndl.Get());
  149. }
  150. TStringBuf Scheme() const noexcept override {
  151. return TStringBuf("inproc");
  152. }
  153. private:
  154. static inline ui16 Id(const TParsedLocation& loc) {
  155. return loc.GetPort();
  156. }
  157. inline IOnRequest*& Find(const TParsedLocation& loc) {
  158. return Find(Id(loc));
  159. }
  160. inline IOnRequest*& Find(ui16 id) {
  161. return V_[id];
  162. }
  163. private:
  164. TVector<IOnRequest*> V_;
  165. };
  166. }
  167. IProtocol* NNeh::InProcProtocol() {
  168. return Singleton<TInprocRequesterStg>();
  169. }