rpc.h 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. #pragma once
  2. #include <util/generic/vector.h>
  3. #include <util/generic/ptr.h>
  4. #include <util/generic/string.h>
  5. #include <util/generic/strbuf.h>
  6. #include <util/generic/maybe.h>
  7. #include <util/stream/output.h>
  8. #include <util/datetime/base.h>
  9. #include <functional>
  10. namespace NNeh {
  11. using TData = TVector<char>;
  12. class TDataSaver: public TData, public IOutputStream {
  13. public:
  14. TDataSaver() = default;
  15. ~TDataSaver() override = default;
  16. TDataSaver(TDataSaver&&) noexcept = default;
  17. TDataSaver& operator=(TDataSaver&&) noexcept = default;
  18. void DoWrite(const void* buf, size_t len) override {
  19. insert(end(), (const char*)buf, (const char*)buf + len);
  20. }
  21. };
  22. class IRequest {
  23. public:
  24. IRequest()
  25. : ArrivalTime_(TInstant::Now())
  26. {
  27. }
  28. virtual ~IRequest() = default;
  29. virtual TStringBuf Scheme() const = 0;
  30. virtual TString RemoteHost() const = 0; //IP-literal / IPv4address / reg-name()
  31. virtual TStringBuf Service() const = 0;
  32. virtual TStringBuf Data() const = 0;
  33. virtual TStringBuf RequestId() const = 0;
  34. virtual bool Canceled() const = 0;
  35. virtual void SendReply(TData& data) = 0;
  36. enum TResponseError {
  37. BadRequest, // bad request data - http_code 400
  38. Forbidden, // forbidden request - http_code 403
  39. NotExistService, // not found request handler - http_code 404
  40. TooManyRequests, // too many requests for the handler - http_code 429
  41. InternalError, // s...amthing happen - http_code 500
  42. NotImplemented, // not implemented - http_code 501
  43. BadGateway, // remote backend not available - http_code 502
  44. ServiceUnavailable, // overload - http_code 503
  45. BandwidthLimitExceeded, // 5xx version of 429
  46. MaxResponseError // count error types
  47. };
  48. virtual void SendError(TResponseError err, const TString& details = TString()) = 0;
  49. virtual TInstant ArrivalTime() const {
  50. return ArrivalTime_;
  51. }
  52. private:
  53. TInstant ArrivalTime_;
  54. };
  55. using IRequestRef = TAutoPtr<IRequest>;
  56. struct IOnRequest {
  57. virtual void OnRequest(IRequestRef req) = 0;
  58. };
  59. class TRequestOut: public TDataSaver {
  60. public:
  61. inline TRequestOut(IRequest* req)
  62. : Req_(req)
  63. {
  64. }
  65. ~TRequestOut() override {
  66. try {
  67. Finish();
  68. } catch (...) {
  69. }
  70. }
  71. void DoFinish() override {
  72. if (Req_) {
  73. Req_->SendReply(*this);
  74. Req_ = nullptr;
  75. }
  76. }
  77. private:
  78. IRequest* Req_;
  79. };
  80. struct IRequester {
  81. virtual ~IRequester() = default;
  82. };
  83. using IRequesterRef = TAtomicSharedPtr<IRequester>;
  84. struct IService: public TThrRefBase {
  85. virtual void ServeRequest(const IRequestRef& request) = 0;
  86. };
  87. using IServiceRef = TIntrusivePtr<IService>;
  88. using TServiceFunction = std::function<void(const IRequestRef&)>;
  89. IServiceRef Wrap(const TServiceFunction& func);
  90. class IServices {
  91. public:
  92. virtual ~IServices() = default;
  93. /// use current thread and run #threads-1 in addition
  94. virtual void Loop(size_t threads) = 0;
  95. /// run #threads and return control
  96. virtual void ForkLoop(size_t threads) = 0;
  97. /// send stopping request and wait stopping all services
  98. virtual void SyncStopFork() = 0;
  99. /// send stopping request and return control (async call)
  100. virtual void Stop() = 0;
  101. /// just listen, don't start any threads
  102. virtual void Listen() = 0;
  103. inline IServices& Add(const TString& service, IServiceRef srv) {
  104. DoAdd(service, srv);
  105. return *this;
  106. }
  107. inline IServices& Add(const TString& service, const TServiceFunction& func) {
  108. return Add(service, Wrap(func));
  109. }
  110. template <class T>
  111. inline IServices& Add(const TString& service, T& t) {
  112. return this->Add(service, std::bind(&T::ServeRequest, std::ref(t), std::placeholders::_1));
  113. }
  114. template <class T, void (T::*M)(const IRequestRef&)>
  115. inline IServices& Add(const TString& service, T& t) {
  116. return this->Add(service, std::bind(M, std::ref(t), std::placeholders::_1));
  117. }
  118. private:
  119. virtual void DoAdd(const TString& service, IServiceRef srv) = 0;
  120. };
  121. using IServicesRef = TAutoPtr<IServices>;
  122. using TCheck = std::function<TMaybe<IRequest::TResponseError>(const IRequestRef&)>;
  123. IServicesRef CreateLoop();
  124. // if request fails check it will be cancelled
  125. IServicesRef CreateLoop(TCheck check);
  126. }