flow_controlled_queue.cpp 7.1 KB


  1. #include "flow_controlled_queue.h"
  2. #include <library/cpp/actors/core/interconnect.h>
  3. #include <library/cpp/actors/core/hfunc.h>
  4. #include <library/cpp/actors/util/datetime.h>
  5. #include <util/generic/deque.h>
  6. #include <util/datetime/cputimer.h>
  7. #include <util/generic/algorithm.h>
  8. namespace NActors {
  9. class TFlowControlledRequestQueue;
  10. class TFlowControlledRequestActor : public IActorCallback {
  11. TFlowControlledRequestQueue * const QueueActor;
  12. void HandleReply(TAutoPtr<IEventHandle> &ev);
  13. void HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev);
  14. public:
  15. const TActorId Source;
  16. const ui64 Cookie;
  17. const ui32 Flags;
  18. const ui64 StartCounter;
  19. TFlowControlledRequestActor(ui32 activity, TFlowControlledRequestQueue *queue, TActorId source, ui64 cookie, ui32 flags)
  20. : IActorCallback(static_cast<TReceiveFunc>(&TFlowControlledRequestActor::StateWait), activity)
  21. , QueueActor(queue)
  22. , Source(source)
  23. , Cookie(cookie)
  24. , Flags(flags)
  25. , StartCounter(GetCycleCountFast())
  26. {}
  27. STATEFN(StateWait) {
  28. switch (ev->GetTypeRewrite()) {
  29. hFunc(TEvents::TEvUndelivered, HandleUndelivered);
  30. default:
  31. HandleReply(ev);
  32. }
  33. }
  34. TDuration AccumulatedLatency() const {
  35. const ui64 cc = GetCycleCountFast() - StartCounter;
  36. return CyclesToDuration(cc);
  37. }
  38. using IActor::PassAway;
  39. };
  40. class TFlowControlledRequestQueue : public IActorCallback {
  41. const TActorId Target;
  42. const TFlowControlledQueueConfig Config;
  43. TDeque<THolder<IEventHandle>> UnhandledRequests;
  44. TDeque<TFlowControlledRequestActor *> RegisteredRequests;
  45. bool Subscribed = false;
  46. TDuration MinimalSeenLatency;
  47. bool CanRegister() {
  48. const ui64 inFly = RegisteredRequests.size();
  49. if (inFly <= Config.MinAllowedInFly) // <= for handling minAllowed == 0
  50. return true;
  51. if (inFly >= Config.MaxAllowedInFly)
  52. return false;
  53. if (Config.TargetDynamicRate) {
  54. if (const ui64 dynMax = MinimalSeenLatency.MicroSeconds() * Config.TargetDynamicRate / 1000000) {
  55. if (inFly >= dynMax)
  56. return false;
  57. }
  58. }
  59. const TDuration currentLatency = RegisteredRequests.front()->AccumulatedLatency();
  60. if (currentLatency <= Config.MinTrackedLatency)
  61. return true;
  62. if (currentLatency <= MinimalSeenLatency * Config.LatencyFactor)
  63. return true;
  64. return false;
  65. }
  66. void HandleForwardedEvent(TAutoPtr<IEventHandle> &ev) {
  67. if (CanRegister()) {
  68. RegisterReqActor(ev);
  69. } else {
  70. UnhandledRequests.emplace_back(ev.Release());
  71. }
  72. }
  73. void RegisterReqActor(THolder<IEventHandle> ev) {
  74. TFlowControlledRequestActor *reqActor = new TFlowControlledRequestActor(ActivityType, this, ev->Sender, ev->Cookie, ev->Flags);
  75. const TActorId reqActorId = RegisterWithSameMailbox(reqActor);
  76. RegisteredRequests.emplace_back(reqActor);
  77. if (!Subscribed && (Target.NodeId() != SelfId().NodeId())) {
  78. Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvSubscribe(), IEventHandle::FlagTrackDelivery);
  79. Subscribed = true;
  80. }
  81. TActivationContext::Send(new IEventHandle(Target, reqActorId, ev.Get()->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, ev->Cookie));
  82. }
  83. void PumpQueue() {
  84. while (RegisteredRequests && RegisteredRequests.front() == nullptr)
  85. RegisteredRequests.pop_front();
  86. while (UnhandledRequests && CanRegister()) {
  87. RegisterReqActor(std::move(UnhandledRequests.front()));
  88. UnhandledRequests.pop_front();
  89. }
  90. }
  91. void HandleDisconnected() {
  92. Subscribed = false;
  93. const ui32 nodeid = Target.NodeId();
  94. for (TFlowControlledRequestActor *reqActor : RegisteredRequests) {
  95. if (reqActor) {
  96. if (reqActor->Flags & IEventHandle::FlagSubscribeOnSession) {
  97. TActivationContext::Send(
  98. new IEventHandle(reqActor->Source, TActorId(), new TEvInterconnect::TEvNodeDisconnected(nodeid), 0, reqActor->Cookie)
  99. );
  100. }
  101. reqActor->PassAway();
  102. }
  103. }
  104. RegisteredRequests.clear();
  105. for (auto &ev : UnhandledRequests) {
  106. const auto reason = TEvents::TEvUndelivered::Disconnected;
  107. if (ev->Flags & IEventHandle::FlagTrackDelivery) {
  108. TActivationContext::Send(
  109. new IEventHandle(ev->Sender, ev->Recipient, new TEvents::TEvUndelivered(ev->GetTypeRewrite(), reason), 0, ev->Cookie)
  110. );
  111. }
  112. }
  113. UnhandledRequests.clear();
  114. }
  115. void HandlePoison() {
  116. HandleDisconnected();
  117. if (SelfId().NodeId() != Target.NodeId())
  118. Send(TActivationContext::InterconnectProxy(Target.NodeId()), new TEvents::TEvUnsubscribe());
  119. PassAway();
  120. }
  121. public:
  122. template <class TEnum>
  123. TFlowControlledRequestQueue(TActorId target, const TEnum activity, const TFlowControlledQueueConfig &config)
  124. : IActorCallback(static_cast<TReceiveFunc>(&TFlowControlledRequestQueue::StateWork), activity)
  125. , Target(target)
  126. , Config(config)
  127. , MinimalSeenLatency(TDuration::Seconds(1))
  128. {}
  129. STATEFN(StateWork) {
  130. switch (ev->GetTypeRewrite()) {
  131. cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleDisconnected);
  132. IgnoreFunc(TEvInterconnect::TEvNodeConnected);
  133. cFunc(TEvents::TEvUndelivered::EventType, HandleDisconnected);
  134. cFunc(TEvents::TEvPoison::EventType, HandlePoison);
  135. default:
  136. HandleForwardedEvent(ev);
  137. }
  138. }
  139. void HandleRequestReply(TAutoPtr<IEventHandle> &ev, TFlowControlledRequestActor *reqActor) {
  140. auto it = Find(RegisteredRequests, reqActor);
  141. if (it == RegisteredRequests.end())
  142. return;
  143. TActivationContext::Send(ev->Forward(reqActor->Source).Release());
  144. const TDuration reqLatency = reqActor->AccumulatedLatency();
  145. if (reqLatency < MinimalSeenLatency)
  146. MinimalSeenLatency = reqLatency;
  147. *it = nullptr;
  148. PumpQueue();
  149. }
  150. void HandleRequestUndelivered(TEvents::TEvUndelivered::TPtr &ev, TFlowControlledRequestActor *reqActor) {
  151. auto it = Find(RegisteredRequests, reqActor);
  152. if (it == RegisteredRequests.end())
  153. return;
  154. TActivationContext::Send(ev->Forward(reqActor->Source).Release());
  155. *it = nullptr;
  156. PumpQueue();
  157. }
  158. };
  159. void TFlowControlledRequestActor::HandleReply(TAutoPtr<IEventHandle> &ev) {
  160. QueueActor->HandleRequestReply(ev, this);
  161. PassAway();
  162. }
  163. void TFlowControlledRequestActor::HandleUndelivered(TEvents::TEvUndelivered::TPtr &ev) {
  164. QueueActor->HandleRequestUndelivered(ev, this);
  165. PassAway();
  166. }
  167. template <class TEnum>
  168. IActor* CreateFlowControlledRequestQueue(TActorId targetId, const TEnum activity, const TFlowControlledQueueConfig &config) {
  169. return new TFlowControlledRequestQueue(targetId, activity, config);
  170. }
  171. }