interconnect_nameserver_dynamic.cpp 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. #include "interconnect.h"
  2. #include "interconnect_impl.h"
  3. #include "interconnect_address.h"
  4. #include "interconnect_nameserver_base.h"
  5. #include "events_local.h"
  6. #include "logging.h"
  7. #include <library/cpp/actors/core/hfunc.h>
  8. #include <library/cpp/actors/core/log.h>
  9. namespace NActors {
  10. class TInterconnectDynamicNameserver
  11. : public TInterconnectNameserverBase<TInterconnectDynamicNameserver>
  12. , public TInterconnectLoggingBase
  13. {
  14. struct TPendingRequest {
  15. TEvInterconnect::TEvResolveNode::TPtr Request;
  16. TInstant Deadline;
  17. TPendingRequest(TEvInterconnect::TEvResolveNode::TPtr request, const TInstant& deadline)
  18. : Request(request), Deadline(deadline)
  19. {
  20. }
  21. };
  22. TMap<ui32, TTableNameserverSetup::TNodeInfo> NodeTable;
  23. TVector<TPendingRequest> PendingRequests;
  24. TDuration PendingPeriod;
  25. void PrintInfo() {
  26. TString logMsg = TStringBuilder() << "Table size: " << NodeTable.size();
  27. for (const auto& [nodeId, node] : NodeTable) {
  28. TString str = TStringBuilder() << "\n > Node " << nodeId << " `" << node.Address << "`:" << node.Port << ", host: " << node.Host << ", resolveHost: " << node.ResolveHost;
  29. logMsg += str;
  30. }
  31. LOG_DEBUG_IC("ICN01", "%s", logMsg.c_str());
  32. }
  33. bool IsNodeUpdated(const ui32 nodeId, const TString& address, const ui32 port) {
  34. bool printInfo = false;
  35. auto it = NodeTable.find(nodeId);
  36. if (it == NodeTable.end()) {
  37. LOG_DEBUG_IC("ICN02", "New node %u `%s`: %u",
  38. nodeId, address.c_str(), port);
  39. printInfo = true;
  40. } else if (it->second.Address != address || it->second.Port != port) {
  41. LOG_DEBUG_IC("ICN03", "Updated node %u `%s`: %u (from `%s`: %u)",
  42. nodeId, address.c_str(), port, it->second.Address.c_str(), it->second.Port);
  43. printInfo = true;
  44. Send(TActivationContext::InterconnectProxy(nodeId), new TEvInterconnect::TEvDisconnect);
  45. }
  46. return printInfo;
  47. }
  48. void DiscardTimedOutRequests(const TActorContext& ctx, ui32 compactionCount = 0) {
  49. auto now = Now();
  50. for (auto& pending : PendingRequests) {
  51. if (pending.Deadline > now) {
  52. LOG_ERROR_IC("ICN06", "Unknown nodeId: %u", pending.Request->Get()->Record.GetNodeId());
  53. auto reply = new TEvLocalNodeInfo;
  54. reply->NodeId = pending.Request->Get()->Record.GetNodeId();
  55. ctx.Send(pending.Request->Sender, reply);
  56. pending.Request.Reset();
  57. compactionCount++;
  58. }
  59. }
  60. if (compactionCount) {
  61. TVector<TPendingRequest> requests;
  62. if (compactionCount < PendingRequests.size()) { // sanity check
  63. requests.reserve(PendingRequests.size() - compactionCount);
  64. }
  65. for (auto& pending : PendingRequests) {
  66. if (pending.Request) {
  67. requests.emplace_back(pending.Request, pending.Deadline);
  68. }
  69. }
  70. PendingRequests.swap(requests);
  71. }
  72. }
  73. void SchedulePeriodic() {
  74. Schedule(TDuration::MilliSeconds(200), new TEvents::TEvWakeup());
  75. }
  76. public:
  77. static constexpr EActivityType ActorActivityType() {
  78. return NAMESERVICE;
  79. }
  80. TInterconnectDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup>& setup, const TDuration& pendingPeriod, ui32 /*resolvePoolId*/ )
  81. : TInterconnectNameserverBase<TInterconnectDynamicNameserver>(&TInterconnectDynamicNameserver::StateFunc, NodeTable)
  82. , NodeTable(setup->StaticNodeTable)
  83. , PendingPeriod(pendingPeriod)
  84. {
  85. Y_VERIFY(setup->IsEntriesUnique());
  86. }
  87. STFUNC(StateFunc) {
  88. try {
  89. switch (ev->GetTypeRewrite()) {
  90. HFunc(TEvInterconnect::TEvResolveNode, Handle);
  91. HFunc(TEvResolveAddress, Handle);
  92. HFunc(TEvInterconnect::TEvListNodes, Handle);
  93. HFunc(TEvInterconnect::TEvGetNode, Handle);
  94. HFunc(TEvInterconnect::TEvNodesInfo, HandleUpdate);
  95. CFunc(TEvents::TEvWakeup::EventType, HandlePeriodic);
  96. }
  97. } catch (...) {
  98. LOG_ERROR_IC("ICN09", "%s", CurrentExceptionMessage().c_str());
  99. }
  100. }
  101. void HandleMissedNodeId(TEvInterconnect::TEvResolveNode::TPtr& ev,
  102. const TActorContext& ctx,
  103. const TInstant& deadline) {
  104. if (PendingPeriod) {
  105. if (PendingRequests.size() == 0) {
  106. SchedulePeriodic();
  107. }
  108. PendingRequests.emplace_back(std::move(ev), Min(deadline, Now() + PendingPeriod));
  109. } else {
  110. LOG_ERROR_IC("ICN07", "Unknown nodeId: %u", ev->Get()->Record.GetNodeId());
  111. TInterconnectNameserverBase::HandleMissedNodeId(ev, ctx, deadline);
  112. }
  113. }
  114. void HandleUpdate(TEvInterconnect::TEvNodesInfo::TPtr& ev,
  115. const TActorContext& ctx) {
  116. auto request = ev->Get();
  117. LOG_DEBUG_IC("ICN04", "Update TEvNodesInfo with sz: %lu ", request->Nodes.size());
  118. bool printInfo = false;
  119. ui32 compactionCount = 0;
  120. for (const auto& node : request->Nodes) {
  121. printInfo |= IsNodeUpdated(node.NodeId, node.Address, node.Port);
  122. NodeTable[node.NodeId] = TTableNameserverSetup::TNodeInfo(
  123. node.Address, node.Host, node.ResolveHost, node.Port, node.Location);
  124. for (auto& pending : PendingRequests) {
  125. if (pending.Request->Get()->Record.GetNodeId() == node.NodeId) {
  126. LOG_DEBUG_IC("ICN05", "Pending nodeId: %u discovered", node.NodeId);
  127. RegisterWithSameMailbox(
  128. CreateResolveActor(node.NodeId, NodeTable[node.NodeId], pending.Request->Sender, SelfId(), pending.Deadline));
  129. pending.Request.Reset();
  130. compactionCount++;
  131. }
  132. }
  133. }
  134. if (printInfo) {
  135. PrintInfo();
  136. }
  137. DiscardTimedOutRequests(ctx, compactionCount);
  138. }
  139. void HandlePeriodic(const TActorContext& ctx) {
  140. DiscardTimedOutRequests(ctx, 0);
  141. if (PendingRequests.size()) {
  142. SchedulePeriodic();
  143. }
  144. }
  145. };
  146. IActor* CreateDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup>& setup,
  147. const TDuration& pendingPeriod,
  148. ui32 poolId) {
  149. return new TInterconnectDynamicNameserver(setup, pendingPeriod, poolId);
  150. }
  151. }