neh.cpp 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #include "neh.h"
  2. #include "details.h"
  3. #include "factory.h"
  4. #include <util/generic/list.h>
  5. #include <util/generic/hash_set.h>
  6. #include <util/digest/numeric.h>
  7. #include <util/string/cast.h>
  8. using namespace NNeh;
  9. namespace {
  10. class TMultiRequester: public IMultiRequester {
  11. struct TOps {
  12. template <class T>
  13. inline bool operator()(const T& l, const T& r) const noexcept {
  14. return l.Get() == r.Get();
  15. }
  16. template <class T>
  17. inline size_t operator()(const T& t) const noexcept {
  18. return NumericHash(t.Get());
  19. }
  20. };
  21. struct TOnComplete {
  22. TMultiRequester* Parent;
  23. bool Signalled;
  24. inline TOnComplete(TMultiRequester* parent)
  25. : Parent(parent)
  26. , Signalled(false)
  27. {
  28. }
  29. inline void operator()(TWaitHandle* wh) {
  30. THandleRef req(static_cast<THandle*>(wh));
  31. Signalled = true;
  32. Parent->OnComplete(req);
  33. }
  34. };
  35. public:
  36. void Add(const THandleRef& req) override {
  37. Reqs_.insert(req);
  38. req->Register(WaitQueue_);
  39. }
  40. void Del(const THandleRef& req) override {
  41. Reqs_.erase(req);
  42. }
  43. bool Wait(THandleRef& req, TInstant deadLine) override {
  44. while (Complete_.empty()) {
  45. if (Reqs_.empty()) {
  46. return false;
  47. }
  48. TOnComplete cb(this);
  49. WaitForMultipleObj(*WaitQueue_, deadLine, cb);
  50. if (!cb.Signalled) {
  51. return false;
  52. }
  53. }
  54. req = *Complete_.begin();
  55. Complete_.pop_front();
  56. return true;
  57. }
  58. bool IsEmpty() const override {
  59. return Reqs_.empty() && Complete_.empty();
  60. }
  61. inline void OnComplete(const THandleRef& req) {
  62. Complete_.push_back(req);
  63. Del(req);
  64. }
  65. private:
  66. typedef THashSet<THandleRef, TOps, TOps> TReqs;
  67. typedef TList<THandleRef> TComplete;
  68. TIntrusivePtr<TWaitQueue> WaitQueue_ = MakeIntrusive<TWaitQueue>();
  69. TReqs Reqs_;
  70. TComplete Complete_;
  71. };
  72. inline IProtocol* ProtocolForMessage(const TMessage& msg) {
  73. return ProtocolFactory()->Protocol(TStringBuf(msg.Addr).Before(':'));
  74. }
  75. }
  76. NNeh::TMessage NNeh::TMessage::FromString(const TStringBuf req) {
  77. TStringBuf addr;
  78. TStringBuf data;
  79. req.Split('?', addr, data);
  80. return TMessage(ToString(addr), ToString(data));
  81. }
  82. namespace {
  83. const TString svcFail = "service status: failed";
  84. }
  85. THandleRef NNeh::Request(const TMessage& msg, IOnRecv* fallback) {
  86. TServiceStatRef ss;
  87. if (TServiceStat::Disabled()) {
  88. return ProtocolForMessage(msg)->ScheduleRequest(msg, fallback, ss);
  89. }
  90. ss = GetServiceStat(msg.Addr);
  91. TServiceStat::EStatus es = ss->GetStatus();
  92. if (es == TServiceStat::Ok) {
  93. return ProtocolForMessage(msg)->ScheduleRequest(msg, fallback, ss);
  94. }
  95. if (es == TServiceStat::ReTry) {
  96. //send empty data request for validating service (update TServiceStat info)
  97. TMessage validator;
  98. validator.Addr = msg.Addr;
  99. ProtocolForMessage(msg)->ScheduleRequest(validator, nullptr, ss);
  100. }
  101. TNotifyHandleRef h(new TNotifyHandle(fallback, msg));
  102. h->NotifyError(new TError(svcFail));
  103. return h.Get();
  104. }
  105. THandleRef NNeh::Request(const TString& req, IOnRecv* fallback) {
  106. return Request(TMessage::FromString(req), fallback);
  107. }
  108. IMultiRequesterRef NNeh::CreateRequester() {
  109. return new TMultiRequester();
  110. }
  111. bool NNeh::SetProtocolOption(TStringBuf protoOption, TStringBuf value) {
  112. return ProtocolFactory()->Protocol(protoOption.Before('/'))->SetOption(protoOption.After('/'), value);
  113. }