interconnect.h 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #pragma once
  2. #include <library/cpp/actors/core/actorsystem.h>
  3. #include <library/cpp/actors/core/interconnect.h>
  4. #include <util/generic/map.h>
  5. #include <util/network/address.h>
  6. namespace NActors {
  7. struct TInterconnectGlobalState: public TThrRefBase {
  8. TString SelfAddress;
  9. ui32 SelfPort;
  10. TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time)
  11. };
  12. struct TInterconnectProxySetup: public TThrRefBase {
  13. // synchronous (session -> proxy)
  14. struct IProxy : TNonCopyable {
  15. virtual ~IProxy() {
  16. }
  17. virtual void ActivateSession(const TActorContext& ctx) = 0; // session activated
  18. virtual void DetachSession(const TActorContext& ctx) = 0; // session is dead
  19. };
  20. // synchronous (proxy -> session)
  21. struct ISession : TNonCopyable {
  22. virtual ~ISession() {
  23. }
  24. virtual void DetachSession(const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // kill yourself
  25. virtual void ForwardPacket(TAutoPtr<IEventHandle>& ev, const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // receive packet for forward
  26. virtual void Connect(const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // begin connection
  27. virtual bool ReceiveIncomingSession(TAutoPtr<IEventHandle>& ev, const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // handle incoming session, if returns true - then session is dead and must be recreated with new one
  28. };
  29. ui32 DestinationNode;
  30. TString StaticAddress; // if set - would be used as main destination address
  31. int StaticPort;
  32. TIntrusivePtr<TInterconnectGlobalState> GlobalState;
  33. virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls
  34. virtual TActorSetupCmd CreateAcceptor() = 0;
  35. };
  36. struct TNameserverSetup {
  37. TActorId ServiceID;
  38. TIntrusivePtr<TInterconnectGlobalState> GlobalState;
  39. };
  40. struct TTableNameserverSetup: public TThrRefBase {
  41. struct TNodeInfo {
  42. TString Address;
  43. TString Host;
  44. TString ResolveHost;
  45. ui16 Port;
  46. TNodeLocation Location;
  47. TString& first;
  48. ui16& second;
  49. TNodeInfo()
  50. : first(Address)
  51. , second(Port)
  52. {
  53. }
  54. TNodeInfo(const TNodeInfo&) = default;
  55. // for testing purposes only
  56. TNodeInfo(const TString& address, const TString& host, ui16 port)
  57. : TNodeInfo()
  58. {
  59. Address = address;
  60. Host = host;
  61. ResolveHost = host;
  62. Port = port;
  63. }
  64. TNodeInfo(const TString& address,
  65. const TString& host,
  66. const TString& resolveHost,
  67. ui16 port,
  68. const TNodeLocation& location)
  69. : TNodeInfo()
  70. {
  71. Address = address;
  72. Host = host;
  73. ResolveHost = resolveHost;
  74. Port = port;
  75. Location = location;
  76. }
  77. // for testing purposes only
  78. TNodeInfo& operator=(const std::pair<TString, ui32>& pr) {
  79. Address = pr.first;
  80. Host = pr.first;
  81. ResolveHost = pr.first;
  82. Port = pr.second;
  83. return *this;
  84. }
  85. TNodeInfo& operator=(const TNodeInfo& ni) {
  86. Address = ni.Address;
  87. Host = ni.Host;
  88. ResolveHost = ni.ResolveHost;
  89. Port = ni.Port;
  90. Location = ni.Location;
  91. return *this;
  92. }
  93. };
  94. TMap<ui32, TNodeInfo> StaticNodeTable;
  95. bool IsEntriesUnique() const;
  96. };
  97. struct TNodeRegistrarSetup {
  98. TActorId ServiceID;
  99. TIntrusivePtr<TInterconnectGlobalState> GlobalState;
  100. };
  101. TActorId GetNameserviceActorId();
  102. /**
  103. * Const table-lookup based name service
  104. */
  105. IActor* CreateNameserverTable(
  106. const TIntrusivePtr<TTableNameserverSetup>& setup,
  107. ui32 poolId = 0);
  108. /**
  109. * Name service which can be paired with external discovery service.
  110. * Copies information from setup on the start (table may be empty).
  111. * Handles TEvNodesInfo to change list of known nodes.
  112. *
  113. * If PendingPeriod is not zero, wait for unknown nodeId
  114. */
  115. IActor* CreateDynamicNameserver(
  116. const TIntrusivePtr<TTableNameserverSetup>& setup,
  117. const TDuration& pendingPeriod = TDuration::Zero(),
  118. ui32 poolId = 0);
  119. /**
  120. * Creates an actor that resolves host/port and replies with either:
  121. *
  122. * - TEvLocalNodeInfo on success
  123. * - TEvResolveError on errors
  124. *
  125. * Optional defaultAddress may be used as fallback.
  126. */
  127. IActor* CreateResolveActor(
  128. const TString& host, ui16 port, ui32 nodeId, const TString& defaultAddress,
  129. const TActorId& replyTo, const TActorId& replyFrom, TInstant deadline);
  130. inline IActor* CreateResolveActor(
  131. ui32 nodeId, const TTableNameserverSetup::TNodeInfo& nodeInfo,
  132. const TActorId& replyTo, const TActorId& replyFrom, TInstant deadline)
  133. {
  134. return CreateResolveActor(nodeInfo.ResolveHost, nodeInfo.Port, nodeId, nodeInfo.Address,
  135. replyTo, replyFrom, deadline);
  136. }
  137. /**
  138. * Creates an actor that resolves host/port and replies with either:
  139. *
  140. * - TEvAddressInfo on success
  141. * - TEvResolveError on errors
  142. */
  143. IActor* CreateResolveActor(
  144. const TString& host, ui16 port,
  145. const TActorId& replyTo, const TActorId& replyFrom, TInstant deadline);
  146. }