multiclient.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. #include "multiclient.h"
  2. #include "utils.h"
  3. #include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>
  4. #include <atomic>
  5. namespace {
  6. using namespace NNeh;
  7. struct TCompareDeadline {
  8. template <class T>
  9. static inline bool Compare(const T& l, const T& r) noexcept {
  10. return l.Deadline() < r.Deadline() || (l.Deadline() == r.Deadline() && &l < &r);
  11. }
  12. };
  13. class TMultiClient: public IMultiClient, public TThrRefBase {
  14. class TRequestSupervisor: public TRbTreeItem<TRequestSupervisor, TCompareDeadline>, public IOnRecv, public TThrRefBase, public TNonCopyable {
  15. private:
  16. TRequestSupervisor() {
  17. } //disable
  18. public:
  19. inline TRequestSupervisor(const TRequest& request, TMultiClient* mc) noexcept
  20. : MC_(mc)
  21. , Request_(request)
  22. , Maked_(0)
  23. , FinishOnMakeRequest_(0)
  24. , Handled_(0)
  25. , Dequeued_(false)
  26. {
  27. }
  28. inline TInstant Deadline() const noexcept {
  29. return Request_.Deadline;
  30. }
  31. //not thread safe (can be called at some time from TMultiClient::Request() and TRequestSupervisor::OnNotify())
  32. void OnMakeRequest(THandleRef h) noexcept {
  33. //request can be mark as maked only once, so only one/first call set handle
  34. if (AtomicCas(&Maked_, 1, 0)) {
  35. H_.Swap(h);
  36. //[paranoid mode on] make sure handle be initiated before return
  37. AtomicSet(FinishOnMakeRequest_, 1);
  38. } else {
  39. while (!AtomicGet(FinishOnMakeRequest_)) {
  40. SpinLockPause();
  41. }
  42. //[paranoid mode off]
  43. }
  44. }
  45. void FillEvent(TEvent& ev) noexcept {
  46. ev.Hndl = H_;
  47. FillEventUserData(ev);
  48. }
  49. void FillEventUserData(TEvent& ev) noexcept {
  50. ev.UserData = Request_.UserData;
  51. }
  52. void ResetRequest() noexcept { //destroy keepaliving cross-ref TRequestSupervisor<->THandle
  53. H_.Drop();
  54. }
  55. //method OnProcessRequest() & OnProcessResponse() executed from Wait() context (thread)
  56. void OnEndProcessRequest() {
  57. Dequeued_ = true;
  58. if (Y_UNLIKELY(IsHandled())) {
  59. ResetRequest(); //race - response already handled before processing request from queue
  60. } else {
  61. MC_->RegisterRequest(this);
  62. }
  63. }
  64. void OnEndProcessResponse() {
  65. if (Y_LIKELY(Dequeued_)) {
  66. UnLink();
  67. ResetRequest();
  68. } //else request yet not dequeued/registered, so we not need unlink request
  69. //(when we later dequeue request OnEndProcessRequest()...IsHandled() return true and we reset request)
  70. }
  71. //IOnRecv interface
  72. void OnNotify(THandle& h) override {
  73. if (Y_LIKELY(MarkAsHandled())) {
  74. THandleRef hr(&h);
  75. OnMakeRequest(hr); //fix race with receiving response before return control from NNeh::Request()
  76. MC_->ScheduleResponse(this, hr);
  77. }
  78. }
  79. void OnRecv(THandle&) noexcept override {
  80. UnRef();
  81. }
  82. void OnEnd() noexcept override {
  83. UnRef();
  84. }
  85. //
  86. //request can be handled only once, so only one/first call MarkAsHandled() return true
  87. bool MarkAsHandled() noexcept {
  88. return AtomicCas(&Handled_, 1, 0);
  89. }
  90. bool IsHandled() const noexcept {
  91. return AtomicGet(Handled_);
  92. }
  93. private:
  94. TIntrusivePtr<TMultiClient> MC_;
  95. TRequest Request_;
  96. THandleRef H_;
  97. TAtomic Maked_;
  98. TAtomic FinishOnMakeRequest_;
  99. TAtomic Handled_;
  100. bool Dequeued_;
  101. };
  102. typedef TRbTree<TRequestSupervisor, TCompareDeadline> TRequestsSupervisors;
  103. typedef TIntrusivePtr<TRequestSupervisor> TRequestSupervisorRef;
  104. public:
  105. TMultiClient()
  106. : Interrupt_(false)
  107. , NearDeadline_(TInstant::Max().GetValue())
  108. , E_(::TSystemEvent::rAuto)
  109. , Shutdown_(false)
  110. {
  111. }
  112. struct TResetRequest {
  113. inline void operator()(TRequestSupervisor& rs) const noexcept {
  114. rs.ResetRequest();
  115. }
  116. };
  117. void Shutdown() {
  118. //reset THandleRef's for all exist supervisors and jobs queue (+prevent creating new)
  119. //- so we break crossref-chain, which prevent destroy this object THande->TRequestSupervisor->TMultiClient)
  120. Shutdown_ = true;
  121. RS_.ForEachNoOrder(TResetRequest());
  122. RS_.Clear();
  123. CleanQueue();
  124. }
  125. private:
  126. class IJob {
  127. public:
  128. virtual ~IJob() {
  129. }
  130. virtual bool Process(TEvent&) = 0;
  131. virtual void Cancel() = 0;
  132. };
  133. typedef TAutoPtr<IJob> TJobPtr;
  134. class TNewRequest: public IJob {
  135. public:
  136. TNewRequest(TRequestSupervisorRef& rs)
  137. : RS_(rs)
  138. {
  139. }
  140. private:
  141. bool Process(TEvent&) override {
  142. RS_->OnEndProcessRequest();
  143. return false;
  144. }
  145. void Cancel() override {
  146. RS_->ResetRequest();
  147. }
  148. TRequestSupervisorRef RS_;
  149. };
  150. class TNewResponse: public IJob {
  151. public:
  152. TNewResponse(TRequestSupervisor* rs, THandleRef& h) noexcept
  153. : RS_(rs)
  154. , H_(h)
  155. {
  156. }
  157. private:
  158. bool Process(TEvent& ev) override {
  159. ev.Type = TEvent::Response;
  160. ev.Hndl = H_;
  161. RS_->FillEventUserData(ev);
  162. RS_->OnEndProcessResponse();
  163. return true;
  164. }
  165. void Cancel() override {
  166. RS_->ResetRequest();
  167. }
  168. TRequestSupervisorRef RS_;
  169. THandleRef H_;
  170. };
  171. public:
  172. THandleRef Request(const TRequest& request) override {
  173. TIntrusivePtr<TRequestSupervisor> rs(new TRequestSupervisor(request, this));
  174. THandleRef h;
  175. try {
  176. rs->Ref();
  177. h = NNeh::Request(request.Msg, rs.Get());
  178. //accurately handle race when processing new request event
  179. //(we already can receive response (call OnNotify) before we schedule info about new request here)
  180. } catch (...) {
  181. rs->UnRef();
  182. throw;
  183. }
  184. rs->OnMakeRequest(h);
  185. ScheduleRequest(rs, h, request.Deadline);
  186. return h;
  187. }
  188. bool Wait(TEvent& ev, const TInstant deadline_ = TInstant::Max()) override {
  189. while (!Interrupt_) {
  190. TInstant deadline = deadline_;
  191. const TInstant now = TInstant::Now();
  192. if (deadline != TInstant::Max() && now >= deadline) {
  193. break;
  194. }
  195. { //process jobs queue (requests/responses info)
  196. TAutoPtr<IJob> j;
  197. while (JQ_.Dequeue(&j)) {
  198. if (j->Process(ev)) {
  199. return true;
  200. }
  201. }
  202. }
  203. if (!RS_.Empty()) {
  204. TRequestSupervisor* nearRS = &*RS_.Begin();
  205. if (nearRS->Deadline() <= now) {
  206. if (!nearRS->MarkAsHandled()) {
  207. //race with notify, - now in queue must exist response job for this request
  208. continue;
  209. }
  210. ev.Type = TEvent::Timeout;
  211. nearRS->FillEvent(ev);
  212. nearRS->ResetRequest();
  213. nearRS->UnLink();
  214. return true;
  215. }
  216. deadline = Min(nearRS->Deadline(), deadline);
  217. }
  218. if (SetNearDeadline(deadline)) {
  219. continue; //update deadline to more far time, so need re-check queue for avoiding race
  220. }
  221. E_.WaitD(deadline);
  222. }
  223. Interrupt_ = false;
  224. return false;
  225. }
  226. void Interrupt() override {
  227. Interrupt_ = true;
  228. Signal();
  229. }
  230. size_t QueueSize() override {
  231. return JQ_.Size();
  232. }
  233. private:
  234. void Signal() {
  235. //TODO:try optimize - hack with skipping signaling if not have waiters (reduce mutex usage)
  236. E_.Signal();
  237. }
  238. void ScheduleRequest(TIntrusivePtr<TRequestSupervisor>& rs, const THandleRef& h, const TInstant& deadline) {
  239. TJobPtr j(new TNewRequest(rs));
  240. JQ_.Enqueue(j);
  241. if (!h->Signalled()) {
  242. if (deadline.GetValue() < GetNearDeadline_()) {
  243. Signal();
  244. }
  245. }
  246. }
  247. void ScheduleResponse(TRequestSupervisor* rs, THandleRef& h) {
  248. TJobPtr j(new TNewResponse(rs, h));
  249. JQ_.Enqueue(j);
  250. if (Y_UNLIKELY(Shutdown_)) {
  251. CleanQueue();
  252. } else {
  253. Signal();
  254. }
  255. }
  256. //return true, if deadline re-installed to more late time
  257. bool SetNearDeadline(const TInstant& deadline) {
  258. bool deadlineMovedFurther = deadline.GetValue() > GetNearDeadline_();
  259. SetNearDeadline_(deadline.GetValue());
  260. return deadlineMovedFurther;
  261. }
  262. //used only from Wait()
  263. void RegisterRequest(TRequestSupervisor* rs) {
  264. if (rs->Deadline() != TInstant::Max()) {
  265. RS_.Insert(rs);
  266. } else {
  267. rs->ResetRequest(); //prevent blocking destruction 'endless' requests
  268. }
  269. }
  270. void CleanQueue() {
  271. TAutoPtr<IJob> j;
  272. while (JQ_.Dequeue(&j)) {
  273. j->Cancel();
  274. }
  275. }
  276. private:
  277. void SetNearDeadline_(const TInstant::TValue& v) noexcept {
  278. TGuard<TAdaptiveLock> g(NDLock_);
  279. NearDeadline_.store(v, std::memory_order_release);
  280. }
  281. TInstant::TValue GetNearDeadline_() const noexcept {
  282. TGuard<TAdaptiveLock> g(NDLock_);
  283. return NearDeadline_.load(std::memory_order_acquire);
  284. }
  285. NNeh::TAutoLockFreeQueue<IJob> JQ_;
  286. TAtomicBool Interrupt_;
  287. TRequestsSupervisors RS_;
  288. TAdaptiveLock NDLock_;
  289. std::atomic<TInstant::TValue> NearDeadline_;
  290. ::TSystemEvent E_;
  291. TAtomicBool Shutdown_;
  292. };
  293. class TMultiClientAutoShutdown: public IMultiClient {
  294. public:
  295. TMultiClientAutoShutdown()
  296. : MC_(new TMultiClient())
  297. {
  298. }
  299. ~TMultiClientAutoShutdown() override {
  300. MC_->Shutdown();
  301. }
  302. size_t QueueSize() override {
  303. return MC_->QueueSize();
  304. }
  305. private:
  306. THandleRef Request(const TRequest& req) override {
  307. return MC_->Request(req);
  308. }
  309. bool Wait(TEvent& ev, TInstant deadline = TInstant::Max()) override {
  310. return MC_->Wait(ev, deadline);
  311. }
  312. void Interrupt() override {
  313. return MC_->Interrupt();
  314. }
  315. private:
  316. TIntrusivePtr<TMultiClient> MC_;
  317. };
  318. }
  319. TMultiClientPtr NNeh::CreateMultiClient() {
  320. return new TMultiClientAutoShutdown();
  321. }