flow_controlled_queue.cpp 7.0 KB


  1. #include "flow_controlled_queue.h"
  2. #include <library/cpp/actors/core/interconnect.h>
  3. #include <library/cpp/actors/core/hfunc.h>
  4. #include <library/cpp/actors/util/datetime.h>
  5. #include <util/generic/deque.h>
  6. #include <util/datetime/cputimer.h>
  7. #include <util/generic/algorithm.h>
  8. namespace NActors {
  9. class TFlowControlledRequestQueue;
  10. class TFlowControlledRequestActor : public IActor {
  11. TFlowControlledRequestQueue * const QueueActor;
  12. void HandleReply(TAutoPtr<IEventHandle> &ev);
  13. void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev);
  14. public:
  15. const TActorId Source;
  16. const ui64 Cookie;
  17. const ui32 Flags;
  18. const ui64 StartCounter;
  19. TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags)
  20. : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity)
  21. , QueueActor(queue)
  22. , Source(source)
  23. , Cookie(cookie)
  24. , Flags(flags)
  25. , StartCounter(GetCycleCountFast())
  26. {}
  27. STATEFN(StateWait) {
  28. switch (ev->GetTypeRewrite()) {
  29. hFunc(TEvents::TEvUndelivered, HandleUndelivered);
  30. default:
  31. HandleReply(ev);
  32. }
  33. }
  34. TDuration AccumulatedLatency() const {
  35. const ui64 cc = GetCycleCountFast() - StartCounter;
  36. return CyclesToDuration(cc);
  37. }
  38. using IActor::PassAway;
  39. };
  40. class TFlowControlledRequestQueue : public IActor {
  41. const TActorId Target;
  42. const TFlowControlledQueueConfig Config;
  43. TDeque<THolder<IEventHandle>> UnhandledRequests;
  44. TDeque<TFlowControlledRequestActor *> RegisteredRequests;
  45. bool Subscribed = false;
  46. TDuration MinimalSeenLatency;
  47. bool CanRegister() {
  48. const ui64 inFly = RegisteredRequests.size();
  49. if (inFly <= Config.MinAllowedInFly) // <= for handling minAllowed == 0
  50. return true;
  51. if (inFly >= Config.MaxAllowedInFly)
  52. return false;
  53. if (Config.TargetDynamicRate) {
  54. if (const ui64 dynMax = MinimalSeenLatency.MicroSeconds() * Config.TargetDynamicRate / 1000000) {
  55. if (inFly >= dynMax)
  56. return false;
  57. }
  58. }
  59. const TDuration currentLatency = RegisteredRequests.front()->AccumulatedLatency();
  60. if (currentLatency <= Config.MinTrackedLatency)
  61. return true;
  62. if (currentLatency <= MinimalSeenLatency * Config.LatencyFactor)
  63. return true;
  64. return false;
  65. }
  66. void HandleForwardedEvent(TAutoPtr<IEventHandle> &ev) {
  67. if (CanRegister()) {
  68. RegisterReqActor(ev);
  69. } else {
  70. UnhandledRequests.emplace_back(ev.Release());
  71. }
  72. }
  73. void RegisterReqActor(THolder<IEventHandle> ev) {
  74. TFlowControlledRequestActor *reqActor = new TFlowControlledRequestActor(ActivityType, this, ev->Sender, ev->Cookie, ev->Flags);
  75. const TActorId reqActorId = RegisterWithSameMailbox(reqActor);
  76. RegisteredRequests.emplace_back(reqActor);
  77. if (!Subscribed && (Target.NodeId() != SelfId().NodeId())) {
  78. Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery);
  79. Subscribed = true;
  80. }
  81. TActivationContext::Send(new IEventHandle(Target, reqActorId, ev->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
  82. }
  83. void PumpQueue() {
  84. while (RegisteredRequests && RegisteredRequests.front() == nullptr)
  85. RegisteredRequests.pop_front();
  86. while (UnhandledRequests && CanRegister()) {
  87. RegisterReqActor(std::move(UnhandledRequests.front()));
  88. UnhandledRequests.pop_front();
  89. }
  90. }
  91. void HandleDisconnected() {
  92. Subscribed = false;
  93. const ui32 nodeid = Target.NodeId();
  94. for (TFlowControlledRequestActor *reqActor : RegisteredRequests) {
  95. if (reqActor) {
  96. if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) {
  97. TActivationContext::Send(
  98. new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
  99. );
  100. }
  101. reqActor->PassAway();
  102. }
  103. }
  104. RegisteredRequests.clear();
  105. for (auto &ev : UnhandledRequests) {
  106. const auto reason = TEvents::TEvUndelivered::Disconnected;
  107. if (ev->Flags & IEventHandle::FlagTrackDelivery) {
  108. TActivationContext::Send(
  109. new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
  110. );
  111. }
  112. }
  113. UnhandledRequests.clear();
  114. }
  115. void HandlePoison() {
  116. HandleDisconnected();
  117. if (SelfId().NodeId() != Target.NodeId())
  118. Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvUnsubscribe());
  119. PassAway();
  120. }
  121. public:
  122. TFlowControlledRequestQueue(TActorId target, ui32 activity, const TFlowControlledQueueConfig &config)
  123. : IActor(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity)
  124. , Target(target)
  125. , Config(config)
  126. , MinimalSeenLatency(TDuration::Seconds(1))
  127. {}
  128. STATEFN(StateWork) {
  129. switch (ev->GetTypeRewrite()) {
  130. cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected);
  131. IgnoreFunc(TEvInterconnect::TEvNodeConnected);
  132. cFunc(TEvents::TEvUndelivered::EventType, HandleDisconnected);
  133. cFunc(TEvents::TEvPoison::EventType, HandlePoison);
  134. default:
  135. HandleForwardedEvent(ev);
  136. }
  137. }
  138. void HandleRequestReply(TAutoPtr<IEventHandle> &ev, TFlowControlledRequestActor *reqActor) {
  139. auto it = Find(RegisteredRequests, reqActor);
  140. if (it == RegisteredRequests.end())
  141. return;
  142. TActivationContext::Send(ev->Forward(reqActor->Source));
  143. const TDuration reqLatency = reqActor->AccumulatedLatency();
  144. if (reqLatency < MinimalSeenLatency)
  145. MinimalSeenLatency = reqLatency;
  146. *it = nullptr;
  147. PumpQueue();
  148. }
  149. void HandleRequestUndelivered(TEvents::TEvUndelivered::TPtr &ev, TFlowControlledRequestActor *reqActor) {
  150. auto it = Find(RegisteredRequests, reqActor);
  151. if (it == RegisteredRequests.end())
  152. return;
  153. TActivationContext::Send(ev->Forward(reqActor->Source));
  154. *it = nullptr;
  155. PumpQueue();
  156. }
  157. };
  158. void TFlowControlledRequestActor::HandleReply(TAutoPtr<IEventHandle> &ev) {
  159. QueueActor->HandleRequestReply(ev, this);
  160. PassAway();
  161. }
  162. void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev) {
  163. QueueActor->HandleRequestUndelivered(ev, this);
  164. PassAway();
  165. }
  166. IActor* CreateFlowControlledRequestQueue(TActorId targetId, ui32 activity, const TFlowControlledQueueConfig &config) {
  167. return new TFlowControlledRequestQueue(targetId, activity, config);
  168. }
  169. }