channel_scheduler.h 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. #pragma once
  2. #include "interconnect_channel.h"
  3. #include "event_holder_pool.h"
  4. #include <memory>
  5. namespace NActors {
  6. class TChannelScheduler {
  7. const ui32 PeerNodeId;
  8. std::array<std::optional<TEventOutputChannel>, 16> ChannelArray;
  9. THashMap<ui16, TEventOutputChannel> ChannelMap;
  10. std::shared_ptr<IInterconnectMetrics> Metrics;
  11. TEventHolderPool& Pool;
  12. const ui32 MaxSerializedEventSize;
  13. const TSessionParams Params;
  14. struct THeapItem {
  15. TEventOutputChannel *Channel;
  16. ui64 WeightConsumed = 0;
  17. friend bool operator <(const THeapItem& x, const THeapItem& y) {
  18. return x.WeightConsumed > y.WeightConsumed;
  19. }
  20. };
  21. std::vector<THeapItem> Heap;
  22. public:
  23. TChannelScheduler(ui32 peerNodeId, const TChannelsConfig& predefinedChannels,
  24. std::shared_ptr<IInterconnectMetrics> metrics, TEventHolderPool& pool, ui32 maxSerializedEventSize,
  25. TSessionParams params)
  26. : PeerNodeId(peerNodeId)
  27. , Metrics(std::move(metrics))
  28. , Pool(pool)
  29. , MaxSerializedEventSize(maxSerializedEventSize)
  30. , Params(std::move(params))
  31. {
  32. for (const auto& item : predefinedChannels) {
  33. GetOutputChannel(item.first);
  34. }
  35. }
  36. TEventOutputChannel *PickChannelWithLeastConsumedWeight() {
  37. Y_VERIFY(!Heap.empty());
  38. return Heap.front().Channel;
  39. }
  40. void AddToHeap(TEventOutputChannel& channel, ui64 counter) {
  41. if (channel.IsWorking()) {
  42. ui64 weight = channel.WeightConsumedOnPause;
  43. weight -= Min(weight, counter - channel.EqualizeCounterOnPause);
  44. Heap.push_back(THeapItem{&channel, weight});
  45. std::push_heap(Heap.begin(), Heap.end());
  46. }
  47. }
  48. void FinishPick(ui64 weightConsumed, ui64 counter) {
  49. std::pop_heap(Heap.begin(), Heap.end());
  50. auto& item = Heap.back();
  51. item.WeightConsumed += weightConsumed;
  52. if (item.Channel->IsWorking()) { // reschedule
  53. std::push_heap(Heap.begin(), Heap.end());
  54. } else { // remove from heap
  55. item.Channel->EqualizeCounterOnPause = counter;
  56. item.Channel->WeightConsumedOnPause = item.WeightConsumed;
  57. Heap.pop_back();
  58. }
  59. }
  60. TEventOutputChannel& GetOutputChannel(ui16 channel) {
  61. if (channel < ChannelArray.size()) {
  62. auto& res = ChannelArray[channel];
  63. if (Y_UNLIKELY(!res)) {
  64. res.emplace(Pool, channel, PeerNodeId, MaxSerializedEventSize, Metrics,
  65. Params);
  66. }
  67. return *res;
  68. } else {
  69. auto it = ChannelMap.find(channel);
  70. if (Y_UNLIKELY(it == ChannelMap.end())) {
  71. it = ChannelMap.emplace(std::piecewise_construct, std::forward_as_tuple(channel),
  72. std::forward_as_tuple(Pool, channel, PeerNodeId, MaxSerializedEventSize,
  73. Metrics, Params)).first;
  74. }
  75. return it->second;
  76. }
  77. }
  78. ui64 Equalize() {
  79. if (Heap.empty()) {
  80. return 0; // nothing to do here -- no working channels
  81. }
  82. // find the minimum consumed weight among working channels and then adjust weights
  83. ui64 min = Max<ui64>();
  84. for (THeapItem& item : Heap) {
  85. min = Min(min, item.WeightConsumed);
  86. }
  87. for (THeapItem& item : Heap) {
  88. item.WeightConsumed -= min;
  89. }
  90. return min;
  91. }
  92. template<typename TCallback>
  93. void ForEach(TCallback&& callback) {
  94. for (auto& channel : ChannelArray) {
  95. if (channel) {
  96. callback(*channel);
  97. }
  98. }
  99. for (auto& [id, channel] : ChannelMap) {
  100. callback(channel);
  101. }
  102. }
  103. };
  104. } // NActors