client.cpp 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764
  1. #include "client.h"
  2. #include "config.h"
  3. #include "private.h"
  4. #include <yt/yt/client/api/client.h>
  5. #include <yt/yt/client/api/transaction.h>
  6. #include <yt/yt/client/api/dynamic_table_transaction_mixin.h>
  7. #include <yt/yt/client/api/queue_transaction_mixin.h>
  8. #include <yt/yt/client/misc/method_helpers.h>
  9. #include <yt/yt/client/ypath/public.h>
  10. #include <yt/yt/client/object_client/helpers.h>
  11. #include <yt/yt/core/concurrency/periodic_executor.h>
  12. #include <yt/yt/core/net/address.h>
  13. #include <yt/yt/core/net/local_address.h>
  14. #include <yt/yt/core/rpc/dispatcher.h>
  15. #include <yt/yt/core/rpc/helpers.h>
  16. #include <library/cpp/yt/memory/ref.h>
  17. namespace NYT::NClient::NFederated {
  // Makes the NApi names that this file uses unqualified (IClient, ITransaction,
  // the *Options structs, ...) visible inside the NFederated namespace.
  18. using namespace NApi;
  19. ////////////////////////////////////////////////////////////////////////////////
  // File-local reference to the federated client logger.
  20. static const auto& Logger = FederatedClientLogger;
  21. ////////////////////////////////////////////////////////////////////////////////
  // TTransaction (below) takes and stores a TClientPtr before TClient is defined;
  // presumably this macro forward-declares TClient and introduces TClientPtr.
  22. DECLARE_REFCOUNTED_CLASS(TClient)
  23. ////////////////////////////////////////////////////////////////////////////////
  24. std::optional<TString> GetDataCenterByClient(const IClientPtr& client)
  25. {
  26. TListNodeOptions options;
  27. options.MaxSize = 1;
  28. auto items = NConcurrency::WaitFor(client->ListNode(RpcProxiesPath, options))
  29. .ValueOrThrow();
  30. auto itemsList = NYTree::ConvertTo<NYTree::IListNodePtr>(items);
  31. if (!itemsList->GetChildCount()) {
  32. return std::nullopt;
  33. }
  34. auto host = itemsList->GetChildren()[0];
  35. return NNet::InferYPClusterFromHostName(host->GetValue<TString>());
  36. }
  37. class TTransaction
  38. : public virtual ITransaction
  39. , public TDynamicTableTransactionMixin
  40. , public TQueueTransactionMixin
  41. {
  42. public:
  43. TTransaction(TClientPtr client, int clientIndex, ITransactionPtr underlying);
  44. TFuture<ITransactionPtr> StartTransaction(
  45. NTransactionClient::ETransactionType type,
  46. const TTransactionStartOptions& options = {}) override;
  47. TFuture<TUnversionedLookupRowsResult> LookupRows(
  48. const NYPath::TYPath& path,
  49. NTableClient::TNameTablePtr nameTable,
  50. const TSharedRange<NTableClient::TLegacyKey>& keys,
  51. const TLookupRowsOptions& options = {}) override;
  52. TFuture<TSelectRowsResult> SelectRows(
  53. const TString& query,
  54. const TSelectRowsOptions& options = {}) override;
  55. void ModifyRows(
  56. const NYPath::TYPath& path,
  57. NTableClient::TNameTablePtr nameTable,
  58. TSharedRange<TRowModification> modifications,
  59. const TModifyRowsOptions& options) override;
  60. using TQueueTransactionMixin::AdvanceConsumer;
  61. TFuture<void> AdvanceConsumer(
  62. const NYPath::TRichYPath& consumerPath,
  63. const NYPath::TRichYPath& queuePath,
  64. int partitionIndex,
  65. std::optional<i64> oldOffset,
  66. i64 newOffset,
  67. const TAdvanceConsumerOptions& options) override;
  68. TFuture<TTransactionFlushResult> Flush() override;
  69. TFuture<void> Ping(const NApi::TTransactionPingOptions& options = {}) override;
  70. TFuture<TTransactionCommitResult> Commit(const TTransactionCommitOptions& options = TTransactionCommitOptions()) override;
  71. TFuture<void> Abort(const TTransactionAbortOptions& options = TTransactionAbortOptions()) override;
  72. TFuture<TVersionedLookupRowsResult> VersionedLookupRows(
  73. const NYPath::TYPath&, NTableClient::TNameTablePtr,
  74. const TSharedRange<NTableClient::TUnversionedRow>&,
  75. const TVersionedLookupRowsOptions&) override;
  76. TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookupRows(
  77. const std::vector<TMultiLookupSubrequest>&,
  78. const TMultiLookupOptions&) override;
  79. TFuture<NYson::TYsonString> ExplainQuery(const TString&, const TExplainQueryOptions&) override;
  80. TFuture<NYson::TYsonString> GetNode(const NYPath::TYPath&, const TGetNodeOptions&) override;
  81. TFuture<NYson::TYsonString> ListNode(const NYPath::TYPath&, const TListNodeOptions&) override;
  82. TFuture<bool> NodeExists(const NYPath::TYPath&, const TNodeExistsOptions&) override;
  83. TFuture<TPullRowsResult> PullRows(const NYPath::TYPath&, const TPullRowsOptions&) override;
  84. IClientPtr GetClient() const override
  85. {
  86. return Underlying_->GetClient();
  87. }
  88. NTransactionClient::ETransactionType GetType() const override
  89. {
  90. return Underlying_->GetType();
  91. }
  92. NTransactionClient::TTransactionId GetId() const override
  93. {
  94. return Underlying_->GetId();
  95. }
  96. NTransactionClient::TTimestamp GetStartTimestamp() const override
  97. {
  98. return Underlying_->GetStartTimestamp();
  99. }
  100. virtual NTransactionClient::EAtomicity GetAtomicity() const override
  101. {
  102. return Underlying_->GetAtomicity();
  103. }
  104. virtual NTransactionClient::EDurability GetDurability() const override
  105. {
  106. return Underlying_->GetDurability();
  107. }
  108. virtual TDuration GetTimeout() const override
  109. {
  110. return Underlying_->GetTimeout();
  111. }
  112. void Detach() override
  113. {
  114. return Underlying_->Detach();
  115. }
  116. void RegisterAlienTransaction(const ITransactionPtr& transaction) override
  117. {
  118. return Underlying_->RegisterAlienTransaction(transaction);
  119. }
  120. IConnectionPtr GetConnection() override
  121. {
  122. return Underlying_->GetConnection();
  123. }
  124. void SubscribeCommitted(const TCommittedHandler& handler) override
  125. {
  126. Underlying_->SubscribeCommitted(handler);
  127. }
  128. void UnsubscribeCommitted(const TCommittedHandler& handler) override
  129. {
  130. Underlying_->UnsubscribeCommitted(handler);
  131. }
  132. void SubscribeAborted(const TAbortedHandler& handler) override
  133. {
  134. Underlying_->SubscribeAborted(handler);
  135. }
  136. void UnsubscribeAborted(const TAbortedHandler& handler) override
  137. {
  138. Underlying_->UnsubscribeAborted(handler);
  139. }
  140. UNIMPLEMENTED_METHOD(TFuture<void>, SetNode, (const NYPath::TYPath&, const NYson::TYsonString&, const TSetNodeOptions&));
  141. UNIMPLEMENTED_METHOD(TFuture<void>, MultisetAttributesNode, (const NYPath::TYPath&, const NYTree::IMapNodePtr&, const TMultisetAttributesNodeOptions&));
  142. UNIMPLEMENTED_METHOD(TFuture<void>, RemoveNode, (const NYPath::TYPath&, const TRemoveNodeOptions&));
  143. UNIMPLEMENTED_METHOD(TFuture<NCypressClient::TNodeId>, CreateNode, (const NYPath::TYPath&, NObjectClient::EObjectType, const TCreateNodeOptions&));
  144. UNIMPLEMENTED_METHOD(TFuture<TLockNodeResult>, LockNode, (const NYPath::TYPath&, NCypressClient::ELockMode, const TLockNodeOptions&));
  145. UNIMPLEMENTED_METHOD(TFuture<void>, UnlockNode, (const NYPath::TYPath&, const TUnlockNodeOptions&));
  146. UNIMPLEMENTED_METHOD(TFuture<NCypressClient::TNodeId>, CopyNode, (const NYPath::TYPath&, const NYPath::TYPath&, const TCopyNodeOptions&));
  147. UNIMPLEMENTED_METHOD(TFuture<NCypressClient::TNodeId>, MoveNode, (const NYPath::TYPath&, const NYPath::TYPath&, const TMoveNodeOptions&));
  148. UNIMPLEMENTED_METHOD(TFuture<NCypressClient::TNodeId>, LinkNode, (const NYPath::TYPath&, const NYPath::TYPath&, const TLinkNodeOptions&));
  149. UNIMPLEMENTED_METHOD(TFuture<void>, ConcatenateNodes, (const std::vector<NYPath::TRichYPath>&, const NYPath::TRichYPath&, const TConcatenateNodesOptions&));
  150. UNIMPLEMENTED_METHOD(TFuture<void>, ExternalizeNode, (const NYPath::TYPath&, NObjectClient::TCellTag, const TExternalizeNodeOptions&));
  151. UNIMPLEMENTED_METHOD(TFuture<void>, InternalizeNode, (const NYPath::TYPath&, const TInternalizeNodeOptions&));
  152. UNIMPLEMENTED_METHOD(TFuture<NObjectClient::TObjectId>, CreateObject, (NObjectClient::EObjectType, const TCreateObjectOptions&));
  153. UNIMPLEMENTED_METHOD(TFuture<ITableReaderPtr>, CreateTableReader, (const NYPath::TRichYPath&, const TTableReaderOptions&));
  154. UNIMPLEMENTED_METHOD(TFuture<IFileReaderPtr>, CreateFileReader, (const NYPath::TYPath&, const TFileReaderOptions&));
  155. UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateTableWriter, (const NYPath::TRichYPath&, const TTableWriterOptions&));
  156. UNIMPLEMENTED_METHOD(IFileWriterPtr, CreateFileWriter, (const NYPath::TRichYPath&, const TFileWriterOptions&));
  157. UNIMPLEMENTED_METHOD(IJournalReaderPtr, CreateJournalReader, (const NYPath::TYPath&, const TJournalReaderOptions&));
  158. UNIMPLEMENTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const NYPath::TYPath&, const TJournalWriterOptions&));
  159. private:
  160. const TClientPtr Client_;
  161. const int ClientIndex_;
  162. const ITransactionPtr Underlying_;
  163. void OnResult(const TErrorOr<void>& error);
  164. };
  165. DECLARE_REFCOUNTED_TYPE(TTransaction)
  166. ////////////////////////////////////////////////////////////////////////////////
  167. DECLARE_REFCOUNTED_STRUCT(TClientDescription)
  168. struct TClientDescription final
  169. {
  170. TClientDescription(IClientPtr client, int priority)
  171. : Client(std::move(client))
  172. , Priority(priority)
  173. { }
  174. IClientPtr Client;
  175. int Priority;
  176. std::atomic<bool> HasErrors{false};
  177. };
  178. DEFINE_REFCOUNTED_TYPE(TClientDescription)
  179. ////////////////////////////////////////////////////////////////////////////////
  180. class TClient
  181. : public IClient
  182. {
  183. public:
  184. TClient(
  185. const std::vector<IClientPtr>& underlyingClients,
  186. TFederationConfigPtr config);
  187. TFuture<TUnversionedLookupRowsResult> LookupRows(
  188. const NYPath::TYPath& path,
  189. NTableClient::TNameTablePtr nameTable,
  190. const TSharedRange<NTableClient::TLegacyKey>& keys,
  191. const TLookupRowsOptions& options = {}) override;
  192. TFuture<TSelectRowsResult> SelectRows(
  193. const TString& query,
  194. const TSelectRowsOptions& options = {}) override;
  195. TFuture<std::vector<TUnversionedLookupRowsResult>> MultiLookupRows(
  196. const std::vector<TMultiLookupSubrequest>&,
  197. const TMultiLookupOptions&) override;
  198. TFuture<TVersionedLookupRowsResult> VersionedLookupRows(
  199. const NYPath::TYPath&, NTableClient::TNameTablePtr,
  200. const TSharedRange<NTableClient::TUnversionedRow>&,
  201. const TVersionedLookupRowsOptions&) override;
  202. TFuture<TPullRowsResult> PullRows(const NYPath::TYPath&, const TPullRowsOptions&) override;
  203. TFuture<NQueueClient::IQueueRowsetPtr> PullQueue(
  204. const NYPath::TRichYPath&,
  205. i64,
  206. int,
  207. const NQueueClient::TQueueRowBatchReadOptions&,
  208. const TPullQueueOptions&) override;
  209. TFuture<NQueueClient::IQueueRowsetPtr> PullConsumer(
  210. const NYPath::TRichYPath&,
  211. const NYPath::TRichYPath&,
  212. std::optional<i64>,
  213. int,
  214. const NQueueClient::TQueueRowBatchReadOptions&,
  215. const TPullConsumerOptions&) override;
  216. TFuture<ITransactionPtr> StartTransaction(
  217. NTransactionClient::ETransactionType type,
  218. const NApi::TTransactionStartOptions& options) override;
  219. TFuture<NYson::TYsonString> ExplainQuery(const TString&, const TExplainQueryOptions&) override;
  220. TFuture<NYson::TYsonString> GetNode(const NYPath::TYPath&, const TGetNodeOptions&) override;
  221. TFuture<NYson::TYsonString> ListNode(const NYPath::TYPath&, const TListNodeOptions&) override;
  222. TFuture<bool> NodeExists(const NYPath::TYPath&, const TNodeExistsOptions&) override;
  223. TFuture<std::vector<TListQueueConsumerRegistrationsResult>> ListQueueConsumerRegistrations(const std::optional<NYPath::TRichYPath>&, const std::optional<NYPath::TRichYPath>&, const TListQueueConsumerRegistrationsOptions&) override;
  224. const NTabletClient::ITableMountCachePtr& GetTableMountCache() override;
  225. TFuture<std::vector<TTabletInfo>> GetTabletInfos(const NYPath::TYPath&, const std::vector<int>&, const TGetTabletInfosOptions&) override;
  226. TFuture<NChaosClient::TReplicationCardPtr> GetReplicationCard(NChaosClient::TReplicationCardId, const TGetReplicationCardOptions&) override;
  227. const NTransactionClient::ITimestampProviderPtr& GetTimestampProvider() override;
  228. ITransactionPtr AttachTransaction(NTransactionClient::TTransactionId, const TTransactionAttachOptions&) override;
  229. IConnectionPtr GetConnection() override
  230. {
  231. auto [client, _] = GetActiveClient();
  232. return client->GetConnection();
  233. }
  234. std::optional<TStringBuf> GetClusterName(bool fetchIfNull) override
  235. {
  236. auto [client, _] = GetActiveClient();
  237. return client->GetClusterName(fetchIfNull);
  238. }
  239. void Terminate() override
  240. { }
  241. // IClientBase unsupported methods.
  242. UNIMPLEMENTED_METHOD(TFuture<void>, SetNode, (const NYPath::TYPath&, const NYson::TYsonString&, const TSetNodeOptions&));
  243. UNIMPLEMENTED_METHOD(TFuture<void>, MultisetAttributesNode, (const NYPath::TYPath&, const NYTree::IMapNodePtr&, const TMultisetAttributesNodeOptions&));
  244. UNIMPLEMENTED_METHOD(TFuture<void>, RemoveNode, (const NYPath::TYPath&, const TRemoveNodeOptions&));
  245. UNIMPLEMENTED_METHOD(TFuture<NCypressClient::TNodeId>, CreateNode, (const NYPath::TYPath&, NObjectClient::EObjectType, const TCreateNodeOptions&));
  246. UNIMPLEMENTED_METHOD(TFuture<TLockNodeResult>, LockNode, (const NYPath::TYPath&, NCypressClient::ELockMode, const TLockNodeOptions&));
  247. UNIMPLEMENTED_METHOD(TFuture<void>, UnlockNode, (const NYPath::TYPath&, const TUnlockNodeOptions&));
  248. UNIMPLEMENTED_METHOD(TFuture<NCypressClient::TNodeId>, CopyNode, (const NYPath::TYPath&, const NYPath::TYPath&, const TCopyNodeOptions&));
  249. UNIMPLEMENTED_METHOD(TFuture<NCypressClient::TNodeId>, MoveNode, (const NYPath::TYPath&, const NYPath::TYPath&, const TMoveNodeOptions&));
  250. UNIMPLEMENTED_METHOD(TFuture<NCypressClient::TNodeId>, LinkNode, (const NYPath::TYPath&, const NYPath::TYPath&, const TLinkNodeOptions&));
  251. UNIMPLEMENTED_METHOD(TFuture<void>, ConcatenateNodes, (const std::vector<NYPath::TRichYPath>&, const NYPath::TRichYPath&, const TConcatenateNodesOptions&));
  252. UNIMPLEMENTED_METHOD(TFuture<void>, ExternalizeNode, (const NYPath::TYPath&, NObjectClient::TCellTag, const TExternalizeNodeOptions&));
  253. UNIMPLEMENTED_METHOD(TFuture<void>, InternalizeNode, (const NYPath::TYPath&, const TInternalizeNodeOptions&));
  254. UNIMPLEMENTED_METHOD(TFuture<NObjectClient::TObjectId>, CreateObject, (NObjectClient::EObjectType, const TCreateObjectOptions&));
  255. UNIMPLEMENTED_METHOD(TFuture<TQueryResult>, GetQueryResult, (NQueryTrackerClient::TQueryId, i64, const TGetQueryResultOptions&));
  256. // IClient unsupported methods.
  257. UNIMPLEMENTED_METHOD(TFuture<void>, RegisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, bool, const TRegisterQueueConsumerOptions&));
  258. UNIMPLEMENTED_METHOD(TFuture<void>, UnregisterQueueConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, const TUnregisterQueueConsumerOptions&));
  259. UNIMPLEMENTED_METHOD(const NChaosClient::IReplicationCardCachePtr&, GetReplicationCardCache, ());
  260. UNIMPLEMENTED_METHOD(TFuture<void>, MountTable, (const NYPath::TYPath&, const TMountTableOptions&));
  261. UNIMPLEMENTED_METHOD(TFuture<void>, UnmountTable, (const NYPath::TYPath&, const TUnmountTableOptions&));
  262. UNIMPLEMENTED_METHOD(TFuture<void>, RemountTable, (const NYPath::TYPath&, const TRemountTableOptions&));
  263. UNIMPLEMENTED_METHOD(TFuture<void>, FreezeTable, (const NYPath::TYPath&, const TFreezeTableOptions&));
  264. UNIMPLEMENTED_METHOD(TFuture<void>, UnfreezeTable, (const NYPath::TYPath&, const TUnfreezeTableOptions&));
  265. UNIMPLEMENTED_METHOD(TFuture<void>, ReshardTable, (const NYPath::TYPath&, const std::vector<NTableClient::TUnversionedOwningRow>&, const TReshardTableOptions&));
  266. UNIMPLEMENTED_METHOD(TFuture<void>, ReshardTable, (const NYPath::TYPath&, int, const TReshardTableOptions&));
  267. UNIMPLEMENTED_METHOD(TFuture<std::vector<NTabletClient::TTabletActionId>>, ReshardTableAutomatic, (const NYPath::TYPath&, const TReshardTableAutomaticOptions&));
  268. UNIMPLEMENTED_METHOD(TFuture<void>, TrimTable, (const NYPath::TYPath&, int, i64, const TTrimTableOptions&));
  269. UNIMPLEMENTED_METHOD(TFuture<void>, AlterTable, (const NYPath::TYPath&, const TAlterTableOptions&));
  270. UNIMPLEMENTED_METHOD(TFuture<void>, AlterTableReplica, (NTabletClient::TTableReplicaId, const TAlterTableReplicaOptions&));
  271. UNIMPLEMENTED_METHOD(TFuture<void>, AlterReplicationCard, (NChaosClient::TReplicationCardId, const TAlterReplicationCardOptions&));
  272. UNIMPLEMENTED_METHOD(TFuture<std::vector<NTabletClient::TTableReplicaId>>, GetInSyncReplicas, (const NYPath::TYPath&, const NTableClient::TNameTablePtr&, const TSharedRange<NTableClient::TUnversionedRow>&, const TGetInSyncReplicasOptions&));
  273. UNIMPLEMENTED_METHOD(TFuture<std::vector<NTabletClient::TTableReplicaId>>, GetInSyncReplicas, (const NYPath::TYPath&, const TGetInSyncReplicasOptions&));
  274. UNIMPLEMENTED_METHOD(TFuture<TGetTabletErrorsResult>, GetTabletErrors, (const NYPath::TYPath&, const TGetTabletErrorsOptions&));
  275. UNIMPLEMENTED_METHOD(TFuture<std::vector<NTabletClient::TTabletActionId>>, BalanceTabletCells, (const TString&, const std::vector<NYPath::TYPath>&, const TBalanceTabletCellsOptions&));
  276. UNIMPLEMENTED_METHOD(TFuture<TSkynetSharePartsLocationsPtr>, LocateSkynetShare, (const NYPath::TRichYPath&, const TLocateSkynetShareOptions&));
  277. UNIMPLEMENTED_METHOD(TFuture<std::vector<NTableClient::TColumnarStatistics>>, GetColumnarStatistics, (const std::vector<NYPath::TRichYPath>&, const TGetColumnarStatisticsOptions&));
  278. UNIMPLEMENTED_METHOD(TFuture<TMultiTablePartitions>, PartitionTables, (const std::vector<NYPath::TRichYPath>&, const TPartitionTablesOptions&));
  279. UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetTablePivotKeys, (const NYPath::TYPath&, const TGetTablePivotKeysOptions&));
  280. UNIMPLEMENTED_METHOD(TFuture<void>, CreateTableBackup, (const TBackupManifestPtr&, const TCreateTableBackupOptions&));
  281. UNIMPLEMENTED_METHOD(TFuture<void>, RestoreTableBackup, (const TBackupManifestPtr&, const TRestoreTableBackupOptions&));
  282. UNIMPLEMENTED_METHOD(TFuture<void>, TruncateJournal, (const NYPath::TYPath&, i64, const TTruncateJournalOptions&));
  283. UNIMPLEMENTED_METHOD(TFuture<TGetFileFromCacheResult>, GetFileFromCache, (const TString&, const TGetFileFromCacheOptions&));
  284. UNIMPLEMENTED_METHOD(TFuture<TPutFileToCacheResult>, PutFileToCache, (const NYPath::TYPath&, const TString&, const TPutFileToCacheOptions&));
  285. UNIMPLEMENTED_METHOD(TFuture<void>, AddMember, (const TString&, const TString&, const TAddMemberOptions&));
  286. UNIMPLEMENTED_METHOD(TFuture<void>, RemoveMember, (const TString&, const TString&, const TRemoveMemberOptions&));
  287. UNIMPLEMENTED_METHOD(TFuture<TCheckPermissionResponse>, CheckPermission, (const TString&, const NYPath::TYPath&, NYTree::EPermission, const TCheckPermissionOptions&));
  288. UNIMPLEMENTED_METHOD(TFuture<TCheckPermissionByAclResult>, CheckPermissionByAcl, (const std::optional<TString>&, NYTree::EPermission, NYTree::INodePtr, const TCheckPermissionByAclOptions&));
  289. UNIMPLEMENTED_METHOD(TFuture<void>, TransferAccountResources, (const TString&, const TString&, NYTree::INodePtr, const TTransferAccountResourcesOptions&));
  290. UNIMPLEMENTED_METHOD(TFuture<void>, TransferPoolResources, (const TString&, const TString&, const TString&, NYTree::INodePtr, const TTransferPoolResourcesOptions&));
  291. UNIMPLEMENTED_METHOD(TFuture<NScheduler::TOperationId>, StartOperation, (NScheduler::EOperationType, const NYson::TYsonString&, const TStartOperationOptions&));
  292. UNIMPLEMENTED_METHOD(TFuture<void>, AbortOperation, (const NScheduler::TOperationIdOrAlias&, const TAbortOperationOptions&));
  293. UNIMPLEMENTED_METHOD(TFuture<void>, SuspendOperation, (const NScheduler::TOperationIdOrAlias&, const TSuspendOperationOptions&));
  294. UNIMPLEMENTED_METHOD(TFuture<void>, ResumeOperation, (const NScheduler::TOperationIdOrAlias&, const TResumeOperationOptions&));
  295. UNIMPLEMENTED_METHOD(TFuture<void>, CompleteOperation, (const NScheduler::TOperationIdOrAlias&, const TCompleteOperationOptions&));
  296. UNIMPLEMENTED_METHOD(TFuture<void>, UpdateOperationParameters, (const NScheduler::TOperationIdOrAlias&, const NYson::TYsonString&, const TUpdateOperationParametersOptions&));
  297. UNIMPLEMENTED_METHOD(TFuture<TOperation>, GetOperation, (const NScheduler::TOperationIdOrAlias&, const TGetOperationOptions&));
  298. UNIMPLEMENTED_METHOD(TFuture<void>, DumpJobContext, (NJobTrackerClient::TJobId, const NYPath::TYPath&, const TDumpJobContextOptions&));
  299. UNIMPLEMENTED_METHOD(TFuture<NConcurrency::IAsyncZeroCopyInputStreamPtr>, GetJobInput, (NJobTrackerClient::TJobId, const TGetJobInputOptions&));
  300. UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&));
  301. UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&));
  302. UNIMPLEMENTED_METHOD(TFuture<TSharedRef>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
  303. UNIMPLEMENTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&));
  304. UNIMPLEMENTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&));
  305. UNIMPLEMENTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&));
  306. UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJob, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobOptions&));
  307. UNIMPLEMENTED_METHOD(TFuture<void>, AbandonJob, (NJobTrackerClient::TJobId, const TAbandonJobOptions&));
  308. UNIMPLEMENTED_METHOD(TFuture<TPollJobShellResponse>, PollJobShell, (NJobTrackerClient::TJobId, const std::optional<TString>&, const NYson::TYsonString&, const TPollJobShellOptions&));
  309. UNIMPLEMENTED_METHOD(TFuture<void>, AbortJob, (NJobTrackerClient::TJobId, const TAbortJobOptions&));
  310. UNIMPLEMENTED_METHOD(TFuture<TClusterMeta>, GetClusterMeta, (const TGetClusterMetaOptions&));
  311. UNIMPLEMENTED_METHOD(TFuture<void>, CheckClusterLiveness, (const TCheckClusterLivenessOptions&));
  312. UNIMPLEMENTED_METHOD(TFuture<int>, BuildSnapshot, (const TBuildSnapshotOptions&));
  313. UNIMPLEMENTED_METHOD(TFuture<TCellIdToSnapshotIdMap>, BuildMasterSnapshots, (const TBuildMasterSnapshotsOptions&));
  314. UNIMPLEMENTED_METHOD(TFuture<void>, ExitReadOnly, (NObjectClient::TCellId, const TExitReadOnlyOptions&));
  315. UNIMPLEMENTED_METHOD(TFuture<void>, MasterExitReadOnly, (const TMasterExitReadOnlyOptions&));
  316. UNIMPLEMENTED_METHOD(TFuture<void>, DiscombobulateNonvotingPeers, (NObjectClient::TCellId, const TDiscombobulateNonvotingPeersOptions&));
  317. UNIMPLEMENTED_METHOD(TFuture<void>, SwitchLeader, (NObjectClient::TCellId, const TString&, const TSwitchLeaderOptions&));
  318. UNIMPLEMENTED_METHOD(TFuture<void>, ResetStateHash, (NObjectClient::TCellId, const TResetStateHashOptions&));
  319. UNIMPLEMENTED_METHOD(TFuture<void>, GCCollect, (const TGCCollectOptions&));
  320. UNIMPLEMENTED_METHOD(TFuture<void>, KillProcess, (const TString&, const TKillProcessOptions&));
  321. UNIMPLEMENTED_METHOD(TFuture<TString>, WriteCoreDump, (const TString&, const TWriteCoreDumpOptions&));
  322. UNIMPLEMENTED_METHOD(TFuture<TGuid>, WriteLogBarrier, (const TString&, const TWriteLogBarrierOptions&));
  323. UNIMPLEMENTED_METHOD(TFuture<TString>, WriteOperationControllerCoreDump, (NJobTrackerClient::TOperationId, const TWriteOperationControllerCoreDumpOptions&));
  324. UNIMPLEMENTED_METHOD(TFuture<void>, HealExecNode, (const TString&, const THealExecNodeOptions&));
  325. UNIMPLEMENTED_METHOD(TFuture<void>, SuspendCoordinator, (NObjectClient::TCellId, const TSuspendCoordinatorOptions&));
  326. UNIMPLEMENTED_METHOD(TFuture<void>, ResumeCoordinator, (NObjectClient::TCellId, const TResumeCoordinatorOptions&));
  327. UNIMPLEMENTED_METHOD(TFuture<void>, MigrateReplicationCards, (NObjectClient::TCellId, const TMigrateReplicationCardsOptions&));
  328. UNIMPLEMENTED_METHOD(TFuture<void>, SuspendChaosCells, (const std::vector<NObjectClient::TCellId>&, const TSuspendChaosCellsOptions&));
  329. UNIMPLEMENTED_METHOD(TFuture<void>, ResumeChaosCells, (const std::vector<NObjectClient::TCellId>&, const TResumeChaosCellsOptions&));
  330. UNIMPLEMENTED_METHOD(TFuture<void>, SuspendTabletCells, (const std::vector<NObjectClient::TCellId>&, const TSuspendTabletCellsOptions&));
  331. UNIMPLEMENTED_METHOD(TFuture<void>, ResumeTabletCells, (const std::vector<NObjectClient::TCellId>&, const TResumeTabletCellsOptions&));
  332. UNIMPLEMENTED_METHOD(TFuture<void>, UpdateChaosTableReplicaProgress, (NChaosClient::TReplicaId, const TUpdateChaosTableReplicaProgressOptions&));
  333. UNIMPLEMENTED_METHOD(TFuture<TMaintenanceId>, AddMaintenance, (EMaintenanceComponent, const TString&, EMaintenanceType, const TString&, const TAddMaintenanceOptions&));
  334. UNIMPLEMENTED_METHOD(TFuture<TMaintenanceCounts>, RemoveMaintenance, (EMaintenanceComponent, const TString&, const TMaintenanceFilter&, const TRemoveMaintenanceOptions&));
  335. UNIMPLEMENTED_METHOD(TFuture<TDisableChunkLocationsResult>, DisableChunkLocations, (const TString&, const std::vector<TGuid>&, const TDisableChunkLocationsOptions&));
  336. UNIMPLEMENTED_METHOD(TFuture<TDestroyChunkLocationsResult>, DestroyChunkLocations, (const TString&, bool, const std::vector<TGuid>&, const TDestroyChunkLocationsOptions&));
  337. UNIMPLEMENTED_METHOD(TFuture<TResurrectChunkLocationsResult>, ResurrectChunkLocations, (const TString&, const std::vector<TGuid>&, const TResurrectChunkLocationsOptions&));
  338. UNIMPLEMENTED_METHOD(TFuture<TRequestRestartResult>, RequestRestart, (const TString&, const TRequestRestartOptions&));
  339. UNIMPLEMENTED_METHOD(TFuture<void>, SetUserPassword, (const TString&, const TString&, const TString&, const TSetUserPasswordOptions&));
  340. UNIMPLEMENTED_METHOD(TFuture<TIssueTokenResult>, IssueToken, (const TString&, const TString&, const TIssueTokenOptions&));
  341. UNIMPLEMENTED_METHOD(TFuture<void>, RevokeToken, (const TString&, const TString&, const TString&, const TRevokeTokenOptions&));
  342. UNIMPLEMENTED_METHOD(TFuture<TListUserTokensResult>, ListUserTokens, (const TString&, const TString&, const TListUserTokensOptions&));
  343. UNIMPLEMENTED_METHOD(TFuture<NQueryTrackerClient::TQueryId>, StartQuery, (NQueryTrackerClient::EQueryEngine, const TString&, const TStartQueryOptions&));
  344. UNIMPLEMENTED_METHOD(TFuture<void>, AbortQuery, (NQueryTrackerClient::TQueryId, const TAbortQueryOptions&));
  345. UNIMPLEMENTED_METHOD(TFuture<IUnversionedRowsetPtr>, ReadQueryResult, (NQueryTrackerClient::TQueryId, i64, const TReadQueryResultOptions&));
  346. UNIMPLEMENTED_METHOD(TFuture<TQuery>, GetQuery, (NQueryTrackerClient::TQueryId, const TGetQueryOptions&));
  347. UNIMPLEMENTED_METHOD(TFuture<TListQueriesResult>, ListQueries, (const TListQueriesOptions&));
  348. UNIMPLEMENTED_METHOD(TFuture<void>, AlterQuery, (NQueryTrackerClient::TQueryId, const TAlterQueryOptions&));
  349. UNIMPLEMENTED_METHOD(TFuture<NBundleControllerClient::TBundleConfigDescriptorPtr>, GetBundleConfig, (const TString&, const NBundleControllerClient::TGetBundleConfigOptions&));
  350. UNIMPLEMENTED_METHOD(TFuture<void>, SetBundleConfig, (const TString&, const NBundleControllerClient::TBundleTargetConfigPtr&, const NBundleControllerClient::TSetBundleConfigOptions&));
  351. UNIMPLEMENTED_METHOD(TFuture<ITableReaderPtr>, CreateTableReader, (const NYPath::TRichYPath&, const TTableReaderOptions&));
  352. UNIMPLEMENTED_METHOD(TFuture<ITableWriterPtr>, CreateTableWriter, (const NYPath::TRichYPath&, const TTableWriterOptions&));
  353. UNIMPLEMENTED_METHOD(TFuture<IFileReaderPtr>, CreateFileReader, (const NYPath::TYPath&, const TFileReaderOptions&));
  354. UNIMPLEMENTED_METHOD(IFileWriterPtr, CreateFileWriter, (const NYPath::TRichYPath&, const TFileWriterOptions&));
  355. UNIMPLEMENTED_METHOD(IJournalReaderPtr, CreateJournalReader, (const NYPath::TYPath&, const TJournalReaderOptions&));
  356. UNIMPLEMENTED_METHOD(IJournalWriterPtr, CreateJournalWriter, (const NYPath::TYPath&, const TJournalWriterOptions&));
  357. UNIMPLEMENTED_METHOD(TFuture<TGetPipelineSpecResult>, GetPipelineSpec, (const NYPath::TYPath&, const TGetPipelineSpecOptions&));
  358. UNIMPLEMENTED_METHOD(TFuture<TSetPipelineSpecResult>, SetPipelineSpec, (const NYPath::TYPath&, const NYson::TYsonString&, const TSetPipelineSpecOptions&));
  359. UNIMPLEMENTED_METHOD(TFuture<TGetPipelineDynamicSpecResult>, GetPipelineDynamicSpec, (const NYPath::TYPath&, const TGetPipelineDynamicSpecOptions&));
  360. UNIMPLEMENTED_METHOD(TFuture<TSetPipelineDynamicSpecResult>, SetPipelineDynamicSpec, (const NYPath::TYPath&, const NYson::TYsonString&, const TSetPipelineDynamicSpecOptions&));
  361. UNIMPLEMENTED_METHOD(TFuture<void>, StartPipeline, (const NYPath::TYPath&, const TStartPipelineOptions&));
  362. UNIMPLEMENTED_METHOD(TFuture<void>, StopPipeline, (const NYPath::TYPath&, const TStopPipelineOptions&));
  363. UNIMPLEMENTED_METHOD(TFuture<void>, PausePipeline, (const NYPath::TYPath&, const TPausePipelineOptions&));
  364. UNIMPLEMENTED_METHOD(TFuture<TPipelineStatus>, GetPipelineStatus, (const NYPath::TYPath&, const TGetPipelineStatusOptions&));
  365. private:
  366. friend class TTransaction;
  367. struct TActiveClientInfo
  368. {
  369. IClientPtr Client;
  370. int ClientIndex;
  371. };
  372. template <class T>
  373. TFuture<T> DoCall(int retryAttemptCount, const TCallback<TFuture<T>(const IClientPtr&, int)>& callee);
  374. void HandleError(const TErrorOr<void>& error, int clientIndex);
  375. void UpdateActiveClient();
  376. TActiveClientInfo GetActiveClient();
  377. void CheckClustersHealth();
  378. private:
  379. const TFederationConfigPtr Config_;
  380. const NConcurrency::TPeriodicExecutorPtr Executor_;
  381. std::vector<TClientDescriptionPtr> UnderlyingClients_;
  382. IClientPtr ActiveClient_;
  383. std::atomic<int> ActiveClientIndex_;
  384. YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, Lock_);
  385. };
  386. DECLARE_REFCOUNTED_TYPE(TTransaction)
  387. ////////////////////////////////////////////////////////////////////////////////
  388. TTransaction::TTransaction(TClientPtr client, int clientIndex, ITransactionPtr underlying)
  389. : Client_(std::move(client))
  390. , ClientIndex_(clientIndex)
  391. , Underlying_(std::move(underlying))
  392. { }
  393. void TTransaction::OnResult(const TErrorOr<void>& error)
  394. {
  395. if (!error.IsOK()) {
  396. Client_->HandleError(error, ClientIndex_);
  397. }
  398. }
  399. #define TRANSACTION_METHOD_IMPL(ResultType, MethodName, Args) \
  400. TFuture<ResultType> TTransaction::MethodName(Y_METHOD_USED_ARGS_DECLARATION(Args)) \
  401. { \
  402. auto future = Underlying_->MethodName(Y_PASS_METHOD_USED_ARGS(Args)); \
  403. future.Subscribe(BIND(&TTransaction::OnResult, MakeStrong(this))); \
  404. return future; \
  405. } Y_SEMICOLON_GUARD
  406. TRANSACTION_METHOD_IMPL(TUnversionedLookupRowsResult, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TLookupRowsOptions&));
  407. TRANSACTION_METHOD_IMPL(TSelectRowsResult, SelectRows, (const TString&, const TSelectRowsOptions&));
  408. TRANSACTION_METHOD_IMPL(void, Ping, (const NApi::TTransactionPingOptions&));
  409. TRANSACTION_METHOD_IMPL(TTransactionCommitResult, Commit, (const TTransactionCommitOptions&));
  410. TRANSACTION_METHOD_IMPL(void, Abort, (const TTransactionAbortOptions&));
  411. TRANSACTION_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
  412. TRANSACTION_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRows, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
  413. TRANSACTION_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
  414. TRANSACTION_METHOD_IMPL(void, AdvanceConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, int, std::optional<i64>, i64, const TAdvanceConsumerOptions&));
  415. TRANSACTION_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&));
  416. TRANSACTION_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&));
  417. TRANSACTION_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&));
  418. TRANSACTION_METHOD_IMPL(bool, NodeExists, (const NYPath::TYPath&, const TNodeExistsOptions&));
  419. void TTransaction::ModifyRows(
  420. const NYPath::TYPath& path,
  421. NTableClient::TNameTablePtr nameTable,
  422. TSharedRange<TRowModification> modifications,
  423. const TModifyRowsOptions& options)
  424. {
  425. Underlying_->ModifyRows(path, nameTable, modifications, options);
  426. }
  427. TFuture<TTransactionFlushResult> TTransaction::Flush()
  428. {
  429. auto future = Underlying_->Flush();
  430. future.Subscribe(BIND(&TTransaction::OnResult, MakeStrong(this)));
  431. return future;
  432. }
  433. TFuture<ITransactionPtr> TTransaction::StartTransaction(
  434. NTransactionClient::ETransactionType type,
  435. const TTransactionStartOptions& options)
  436. {
  437. return Underlying_->StartTransaction(type, options).ApplyUnique(BIND(
  438. [this, this_ = MakeStrong(this)] (TErrorOr<ITransactionPtr>&& result) -> TErrorOr<ITransactionPtr> {
  439. if (!result.IsOK()) {
  440. Client_->HandleError(result, ClientIndex_);
  441. return result;
  442. } else {
  443. return {New<TTransaction>(Client_, ClientIndex_, result.Value())};
  444. }
  445. }
  446. ));
  447. }
  448. DEFINE_REFCOUNTED_TYPE(TTransaction)
  449. ////////////////////////////////////////////////////////////////////////////////
  450. TClient::TClient(const std::vector<IClientPtr>& underlyingClients, TFederationConfigPtr config)
  451. : Config_(std::move(config))
  452. , Executor_(New<NConcurrency::TPeriodicExecutor>(
  453. NRpc::TDispatcher::Get()->GetLightInvoker(),
  454. BIND(&TClient::CheckClustersHealth, MakeWeak(this)),
  455. Config_->ClusterHealthCheckPeriod))
  456. {
  457. YT_VERIFY(!underlyingClients.empty());
  458. UnderlyingClients_.reserve(underlyingClients.size());
  459. const auto& localDatacenter = NNet::GetLocalYPCluster();
  460. for (const auto& client : underlyingClients) {
  461. int priority = GetDataCenterByClient(client) == localDatacenter ? 1 : 0;
  462. UnderlyingClients_.push_back(New<TClientDescription>(client, priority));
  463. }
  464. std::stable_sort(UnderlyingClients_.begin(), UnderlyingClients_.end(), [](const auto& lhs, const auto& rhs) {
  465. return lhs->Priority > rhs->Priority;
  466. });
  467. ActiveClient_ = UnderlyingClients_[0]->Client;
  468. ActiveClientIndex_ = 0;
  469. Executor_->Start();
  470. }
  471. void TClient::CheckClustersHealth()
  472. {
  473. TCheckClusterLivenessOptions options;
  474. options.CheckCypressRoot = true;
  475. options.CheckTabletCellBundle = Config_->BundleName;
  476. int activeClientIndex = ActiveClientIndex_.load();
  477. std::optional<int> betterClientIndex;
  478. std::vector<TFuture<void>> checks;
  479. checks.reserve(UnderlyingClients_.size());
  480. for (const auto& clientDescription : UnderlyingClients_) {
  481. checks.emplace_back(clientDescription->Client->CheckClusterLiveness(options));
  482. }
  483. for (int index = 0; index < std::ssize(checks); ++index) {
  484. const auto& check = checks[index];
  485. bool hasErrors = !NConcurrency::WaitFor(check).IsOK();
  486. UnderlyingClients_[index]->HasErrors = hasErrors;
  487. if (!betterClientIndex && !hasErrors && index < activeClientIndex) {
  488. betterClientIndex = index;
  489. }
  490. }
  491. if (betterClientIndex && ActiveClientIndex_ == activeClientIndex) {
  492. int newClientIndex = *betterClientIndex;
  493. auto guard = NThreading::WriterGuard(Lock_);
  494. ActiveClient_ = UnderlyingClients_[newClientIndex]->Client;
  495. ActiveClientIndex_ = newClientIndex;
  496. return;
  497. }
  498. // If active cluster is not healthy, try changing it.
  499. if (UnderlyingClients_[activeClientIndex]->HasErrors) {
  500. auto guard = NThreading::WriterGuard(Lock_);
  501. // Check that active client wasn't changed.
  502. if (ActiveClientIndex_ == activeClientIndex && UnderlyingClients_[activeClientIndex]->HasErrors) {
  503. UpdateActiveClient();
  504. }
  505. }
  506. }
  507. template <class T>
  508. TFuture<T> TClient::DoCall(int retryAttemptCount, const TCallback<TFuture<T>(const IClientPtr&, int)>& callee)
  509. {
  510. auto [client, clientIndex] = GetActiveClient();
  511. return callee(client, clientIndex).ApplyUnique(BIND(
  512. [
  513. this,
  514. this_ = MakeStrong(this),
  515. retryAttemptCount,
  516. callee,
  517. clientIndex = clientIndex
  518. ] (TErrorOr<T>&& result) {
  519. if (!result.IsOK()) {
  520. HandleError(result, clientIndex);
  521. if (retryAttemptCount > 1) {
  522. return DoCall<T>(retryAttemptCount - 1, callee);
  523. }
  524. }
  525. return MakeFuture(std::move(result));
  526. }));
  527. }
  528. TFuture<ITransactionPtr> TClient::StartTransaction(
  529. NTransactionClient::ETransactionType type,
  530. const NApi::TTransactionStartOptions& options)
  531. {
  532. auto callee = BIND([this_ = MakeStrong(this), type, options] (const IClientPtr& client, int clientIndex) {
  533. return client->StartTransaction(type, options).ApplyUnique(BIND(
  534. [this_, clientIndex] (ITransactionPtr&& transaction) -> ITransactionPtr {
  535. return New<TTransaction>(std::move(this_), clientIndex, std::move(transaction));
  536. }));
  537. });
  538. return DoCall<ITransactionPtr>(Config_->ClusterRetryAttempts, callee);
  539. }
  540. #define CLIENT_METHOD_IMPL(ResultType, MethodName, Args) \
  541. TFuture<ResultType> TClient::MethodName(Y_METHOD_USED_ARGS_DECLARATION(Args)) \
  542. { \
  543. auto callee = BIND([Y_PASS_METHOD_USED_ARGS(Args)] (const IClientPtr& client, int /*clientIndex*/) { \
  544. return client->MethodName(Y_PASS_METHOD_USED_ARGS(Args)); \
  545. }); \
  546. return DoCall<ResultType>(Config_->ClusterRetryAttempts, callee); \
  547. } Y_SEMICOLON_GUARD
  548. CLIENT_METHOD_IMPL(TUnversionedLookupRowsResult, LookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TLegacyKey>&, const TLookupRowsOptions&));
  549. CLIENT_METHOD_IMPL(TSelectRowsResult, SelectRows, (const TString&, const TSelectRowsOptions&));
  550. CLIENT_METHOD_IMPL(std::vector<TUnversionedLookupRowsResult>, MultiLookupRows, (const std::vector<TMultiLookupSubrequest>&, const TMultiLookupOptions&));
  551. CLIENT_METHOD_IMPL(TVersionedLookupRowsResult, VersionedLookupRows, (const NYPath::TYPath&, NTableClient::TNameTablePtr, const TSharedRange<NTableClient::TUnversionedRow>&, const TVersionedLookupRowsOptions&));
  552. CLIENT_METHOD_IMPL(TPullRowsResult, PullRows, (const NYPath::TYPath&, const TPullRowsOptions&));
  553. CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullQueue, (const NYPath::TRichYPath&, i64, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullQueueOptions&));
  554. CLIENT_METHOD_IMPL(NQueueClient::IQueueRowsetPtr, PullConsumer, (const NYPath::TRichYPath&, const NYPath::TRichYPath&, std::optional<i64>, int, const NQueueClient::TQueueRowBatchReadOptions&, const TPullConsumerOptions&));
  555. CLIENT_METHOD_IMPL(NYson::TYsonString, ExplainQuery, (const TString&, const TExplainQueryOptions&));
  556. CLIENT_METHOD_IMPL(NYson::TYsonString, GetNode, (const NYPath::TYPath&, const TGetNodeOptions&));
  557. CLIENT_METHOD_IMPL(NYson::TYsonString, ListNode, (const NYPath::TYPath&, const TListNodeOptions&));
  558. CLIENT_METHOD_IMPL(bool, NodeExists, (const NYPath::TYPath&, const TNodeExistsOptions&));
  559. CLIENT_METHOD_IMPL(std::vector<TTabletInfo>, GetTabletInfos, (const NYPath::TYPath&, const std::vector<int>&, const TGetTabletInfosOptions&));
  560. CLIENT_METHOD_IMPL(NChaosClient::TReplicationCardPtr, GetReplicationCard, (NChaosClient::TReplicationCardId, const TGetReplicationCardOptions&));
  561. CLIENT_METHOD_IMPL(std::vector<TListQueueConsumerRegistrationsResult>, ListQueueConsumerRegistrations, (const std::optional<NYPath::TRichYPath>&, const std::optional<NYPath::TRichYPath>&, const TListQueueConsumerRegistrationsOptions&));
  562. const NTabletClient::ITableMountCachePtr& TClient::GetTableMountCache()
  563. {
  564. auto [client, _] = GetActiveClient();
  565. return client->GetTableMountCache();
  566. }
  567. const NTransactionClient::ITimestampProviderPtr& TClient::GetTimestampProvider()
  568. {
  569. auto [client, _] = GetActiveClient();
  570. return client->GetTimestampProvider();
  571. }
  572. ITransactionPtr TClient::AttachTransaction(
  573. NTransactionClient::TTransactionId transactionId,
  574. const TTransactionAttachOptions& options)
  575. {
  576. auto transactionClusterTag = NObjectClient::CellTagFromId(transactionId);
  577. for (const auto& clientDescription : UnderlyingClients_) {
  578. const auto& client = clientDescription->Client;
  579. auto clientClusterTag = client->GetConnection()->GetClusterTag();
  580. if (clientClusterTag == transactionClusterTag) {
  581. return client->AttachTransaction(transactionId, options);
  582. }
  583. }
  584. THROW_ERROR_EXCEPTION("No client is known for transaction %v", transactionId);
  585. }
  586. void TClient::HandleError(const TErrorOr<void>& error, int clientIndex)
  587. {
  588. if (!NRpc::IsChannelFailureError(error) && !Config_->RetryAnyError) {
  589. return;
  590. }
  591. UnderlyingClients_[clientIndex]->HasErrors = true;
  592. if (ActiveClientIndex_ != clientIndex) {
  593. return;
  594. }
  595. auto guard = WriterGuard(Lock_);
  596. if (ActiveClientIndex_ != clientIndex) {
  597. return;
  598. }
  599. UpdateActiveClient();
  600. }
  601. void TClient::UpdateActiveClient()
  602. {
  603. VERIFY_WRITER_SPINLOCK_AFFINITY(Lock_);
  604. int activeClientIndex = ActiveClientIndex_.load();
  605. for (int index = 0; index < std::ssize(UnderlyingClients_); ++index) {
  606. const auto& clientDescription = UnderlyingClients_[index];
  607. if (!clientDescription->HasErrors) {
  608. if (activeClientIndex != index) {
  609. YT_LOG_DEBUG("Active client was changed (PreviousClientIndex: %v, NewClientIndex: %v)",
  610. activeClientIndex,
  611. index);
  612. }
  613. ActiveClient_ = clientDescription->Client;
  614. ActiveClientIndex_ = index;
  615. break;
  616. }
  617. }
  618. }
  619. TClient::TActiveClientInfo TClient::GetActiveClient()
  620. {
  621. auto guard = ReaderGuard(Lock_);
  622. YT_LOG_TRACE("Request will be send to the active client (ClientIndex: %v)",
  623. ActiveClientIndex_.load());
  624. return {ActiveClient_, ActiveClientIndex_.load()};
  625. }
  626. DEFINE_REFCOUNTED_TYPE(TClient)
  627. ////////////////////////////////////////////////////////////////////////////////
  628. IClientPtr CreateClient(
  629. std::vector<NApi::IClientPtr> clients,
  630. TFederationConfigPtr config)
  631. {
  632. return New<TClient>(
  633. std::move(clients),
  634. std::move(config));
  635. }
  636. ////////////////////////////////////////////////////////////////////////////////
  637. } // NYT::NClient::NFederated