client.cpp 49 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584
  1. #include "client.h"
  2. #include "batch_request_impl.h"
  3. #include "client_reader.h"
  4. #include "client_writer.h"
  5. #include "file_reader.h"
  6. #include "file_writer.h"
  7. #include "format_hints.h"
  8. #include "init.h"
  9. #include "lock.h"
  10. #include "operation.h"
  11. #include "retryful_writer.h"
  12. #include "transaction.h"
  13. #include "transaction_pinger.h"
  14. #include "yt_poller.h"
  15. #include <yt/cpp/mapreduce/common/helpers.h>
  16. #include <yt/cpp/mapreduce/common/retry_lib.h>
  17. #include <yt/cpp/mapreduce/http/helpers.h>
  18. #include <yt/cpp/mapreduce/http/http.h>
  19. #include <yt/cpp/mapreduce/http/http_client.h>
  20. #include <yt/cpp/mapreduce/http/requests.h>
  21. #include <yt/cpp/mapreduce/http/retry_request.h>
  22. #include <yt/cpp/mapreduce/interface/config.h>
  23. #include <yt/cpp/mapreduce/interface/client.h>
  24. #include <yt/cpp/mapreduce/interface/error_codes.h>
  25. #include <yt/cpp/mapreduce/interface/fluent.h>
  26. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  27. #include <yt/cpp/mapreduce/interface/skiff_row.h>
  28. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  29. #include <yt/cpp/mapreduce/io/yamr_table_writer.h>
  30. #include <yt/cpp/mapreduce/io/node_table_reader.h>
  31. #include <yt/cpp/mapreduce/io/node_table_writer.h>
  32. #include <yt/cpp/mapreduce/io/proto_table_reader.h>
  33. #include <yt/cpp/mapreduce/io/proto_table_writer.h>
  34. #include <yt/cpp/mapreduce/io/skiff_row_table_reader.h>
  35. #include <yt/cpp/mapreduce/io/proto_helpers.h>
  36. #include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
  37. #include <yt/cpp/mapreduce/http_client/raw_client.h>
  38. #include <yt/cpp/mapreduce/http_client/raw_requests.h>
  39. #include <yt/yt/core/ytree/fluent.h>
  40. #include <library/cpp/json/json_reader.h>
  41. #include <util/generic/algorithm.h>
  42. #include <util/string/type.h>
  43. #include <util/system/env.h>
  44. namespace NYT {
  45. ////////////////////////////////////////////////////////////////////////////////
  46. namespace NDetail {
  47. ////////////////////////////////////////////////////////////////////////////////
  48. namespace {
  49. ////////////////////////////////////////////////////////////////////////////////
  50. THashMap<TString, TString> ParseProxyUrlAliasingRules(TString envConfig)
  51. {
  52. if (envConfig.empty()) {
  53. return {};
  54. }
  55. return NYTree::ConvertTo<THashMap<TString, TString>>(NYson::TYsonString(envConfig));
  56. }
  57. void ApplyProxyUrlAliasingRules(TString& url)
  58. {
  59. static auto rules = ParseProxyUrlAliasingRules(GetEnv("YT_PROXY_URL_ALIASING_CONFIG"));
  60. if (auto ruleIt = rules.find(url); ruleIt != rules.end()) {
  61. url = ruleIt->second;
  62. }
  63. }
  64. ////////////////////////////////////////////////////////////////////////////////
  65. } // namespace
  66. ////////////////////////////////////////////////////////////////////////////////
  67. TClientBase::TClientBase(
  68. IRawClientPtr rawClient,
  69. const TClientContext& context,
  70. const TTransactionId& transactionId,
  71. IClientRetryPolicyPtr retryPolicy)
  72. : RawClient_(std::move(rawClient))
  73. , Context_(context)
  74. , TransactionId_(transactionId)
  75. , ClientRetryPolicy_(std::move(retryPolicy))
  76. { }
  77. ITransactionPtr TClientBase::StartTransaction(
  78. const TStartTransactionOptions& options)
  79. {
  80. return MakeIntrusive<TTransaction>(RawClient_, GetParentClientImpl(), Context_, TransactionId_, options);
  81. }
  82. TNodeId TClientBase::Create(
  83. const TYPath& path,
  84. ENodeType type,
  85. const TCreateOptions& options)
  86. {
  87. return RequestWithRetry<TNodeId>(
  88. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  89. [this, &path, &type, &options] (TMutationId& mutationId) {
  90. return RawClient_->Create(mutationId, TransactionId_, path, type, options);
  91. });
  92. }
  93. void TClientBase::Remove(
  94. const TYPath& path,
  95. const TRemoveOptions& options)
  96. {
  97. RequestWithRetry<void>(
  98. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  99. [this, &path, &options] (TMutationId& mutationId) {
  100. RawClient_->Remove(mutationId, TransactionId_, path, options);
  101. });
  102. }
  103. bool TClientBase::Exists(
  104. const TYPath& path,
  105. const TExistsOptions& options)
  106. {
  107. return RequestWithRetry<bool>(
  108. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  109. [this, &path, &options] (TMutationId /*mutationId*/) {
  110. return RawClient_->Exists(TransactionId_, path, options);
  111. });
  112. }
  113. TNode TClientBase::Get(
  114. const TYPath& path,
  115. const TGetOptions& options)
  116. {
  117. return RequestWithRetry<TNode>(
  118. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  119. [this, &path, &options] (TMutationId /*mutationId*/) {
  120. return RawClient_->Get(TransactionId_, path, options);
  121. });
  122. }
  123. void TClientBase::Set(
  124. const TYPath& path,
  125. const TNode& value,
  126. const TSetOptions& options)
  127. {
  128. RequestWithRetry<void>(
  129. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  130. [this, &path, &value, &options] (TMutationId& mutationId) {
  131. RawClient_->Set(mutationId, TransactionId_, path, value, options);
  132. });
  133. }
  134. void TClientBase::MultisetAttributes(
  135. const TYPath& path,
  136. const TNode::TMapType& value,
  137. const TMultisetAttributesOptions& options)
  138. {
  139. RequestWithRetry<void>(
  140. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  141. [this, &path, &value, &options] (TMutationId& mutationId) {
  142. RawClient_->MultisetAttributes(mutationId, TransactionId_, path, value, options);
  143. });
  144. }
  145. TNode::TListType TClientBase::List(
  146. const TYPath& path,
  147. const TListOptions& options)
  148. {
  149. return RequestWithRetry<TNode::TListType>(
  150. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  151. [this, &path, &options] (TMutationId /*mutationId*/) {
  152. return RawClient_->List(TransactionId_, path, options);
  153. });
  154. }
  155. TNodeId TClientBase::Copy(
  156. const TYPath& sourcePath,
  157. const TYPath& destinationPath,
  158. const TCopyOptions& options)
  159. {
  160. try {
  161. return RequestWithRetry<TNodeId>(
  162. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  163. [this, &sourcePath, &destinationPath, &options] (TMutationId& mutationId) {
  164. return RawClient_->CopyInsideMasterCell(mutationId, TransactionId_, sourcePath, destinationPath, options);
  165. });
  166. } catch (const TErrorResponse& e) {
  167. if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
  168. // Do transaction for cross cell copying.
  169. return RequestWithRetry<TNodeId>(
  170. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  171. [this, &sourcePath, &destinationPath, &options] (TMutationId /*mutationId*/) {
  172. auto transaction = StartTransaction(TStartTransactionOptions());
  173. auto nodeId = RawClient_->CopyWithoutRetries(transaction->GetId(), sourcePath, destinationPath, options);
  174. transaction->Commit();
  175. return nodeId;
  176. });
  177. } else {
  178. throw;
  179. }
  180. }
  181. }
  182. TNodeId TClientBase::Move(
  183. const TYPath& sourcePath,
  184. const TYPath& destinationPath,
  185. const TMoveOptions& options)
  186. {
  187. try {
  188. return RequestWithRetry<TNodeId>(
  189. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  190. [this, &sourcePath, &destinationPath, &options] (TMutationId& mutationId) {
  191. return RawClient_->MoveInsideMasterCell(mutationId, TransactionId_, sourcePath, destinationPath, options);
  192. });
  193. } catch (const TErrorResponse& e) {
  194. if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
  195. // Do transaction for cross cell moving.
  196. return RequestWithRetry<TNodeId>(
  197. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  198. [this, &sourcePath, &destinationPath, &options] (TMutationId /*mutationId*/) {
  199. auto transaction = StartTransaction(TStartTransactionOptions());
  200. auto nodeId = RawClient_->MoveWithoutRetries(transaction->GetId(), sourcePath, destinationPath, options);
  201. transaction->Commit();
  202. return nodeId;
  203. });
  204. } else {
  205. throw;
  206. }
  207. }
  208. }
  209. TNodeId TClientBase::Link(
  210. const TYPath& targetPath,
  211. const TYPath& linkPath,
  212. const TLinkOptions& options)
  213. {
  214. return RequestWithRetry<TNodeId>(
  215. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  216. [this, &targetPath, &linkPath, &options] (TMutationId& mutationId) {
  217. return RawClient_->Link(mutationId, TransactionId_, targetPath, linkPath, options);
  218. });
  219. }
  220. void TClientBase::Concatenate(
  221. const TVector<TRichYPath>& sourcePaths,
  222. const TRichYPath& destinationPath,
  223. const TConcatenateOptions& options)
  224. {
  225. RequestWithRetry<void>(
  226. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  227. [this, &sourcePaths, &destinationPath, &options] (TMutationId /*mutationId*/) {
  228. auto transaction = StartTransaction(TStartTransactionOptions());
  229. if (!options.Append_ && !sourcePaths.empty() && !transaction->Exists(destinationPath.Path_)) {
  230. auto typeNode = transaction->Get(CanonizeYPath(sourcePaths.front()).Path_ + "/@type");
  231. auto type = FromString<ENodeType>(typeNode.AsString());
  232. transaction->Create(destinationPath.Path_, type, TCreateOptions().IgnoreExisting(true));
  233. }
  234. RawClient_->Concatenate(transaction->GetId(), sourcePaths, destinationPath, options);
  235. transaction->Commit();
  236. });
  237. }
  238. TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path)
  239. {
  240. return NRawClient::CanonizeYPath(RawClient_, path);
  241. }
  242. TVector<TTableColumnarStatistics> TClientBase::GetTableColumnarStatistics(
  243. const TVector<TRichYPath>& paths,
  244. const TGetTableColumnarStatisticsOptions& options)
  245. {
  246. return RequestWithRetry<TVector<TTableColumnarStatistics>>(
  247. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  248. [this, &paths, &options] (TMutationId /*mutationId*/) {
  249. return RawClient_->GetTableColumnarStatistics(TransactionId_, paths, options);
  250. });
  251. }
  252. TMultiTablePartitions TClientBase::GetTablePartitions(
  253. const TVector<TRichYPath>& paths,
  254. const TGetTablePartitionsOptions& options)
  255. {
  256. return RequestWithRetry<TMultiTablePartitions>(
  257. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  258. [this, &paths, &options] (TMutationId /*mutationId*/) {
  259. return RawClient_->GetTablePartitions(TransactionId_, paths, options);
  260. });
  261. }
  262. TMaybe<TYPath> TClientBase::GetFileFromCache(
  263. const TString& md5Signature,
  264. const TYPath& cachePath,
  265. const TGetFileFromCacheOptions& options)
  266. {
  267. return RequestWithRetry<TMaybe<TYPath>>(
  268. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  269. [this, &md5Signature, &cachePath, &options] (TMutationId /*mutationId*/) {
  270. return RawClient_->GetFileFromCache(TransactionId_, md5Signature, cachePath, options);
  271. });
  272. }
  273. TYPath TClientBase::PutFileToCache(
  274. const TYPath& filePath,
  275. const TString& md5Signature,
  276. const TYPath& cachePath,
  277. const TPutFileToCacheOptions& options)
  278. {
  279. return RequestWithRetry<TYPath>(
  280. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  281. [this, &filePath, &md5Signature, &cachePath, &options] (TMutationId /*mutationId*/) {
  282. return RawClient_->PutFileToCache(TransactionId_, filePath, md5Signature, cachePath, options);
  283. });
  284. }
  285. IFileReaderPtr TClientBase::CreateBlobTableReader(
  286. const TYPath& path,
  287. const TKey& key,
  288. const TBlobTableReaderOptions& options)
  289. {
  290. return new TBlobTableReader(
  291. path,
  292. key,
  293. RawClient_,
  294. ClientRetryPolicy_,
  295. GetTransactionPinger(),
  296. Context_,
  297. TransactionId_,
  298. options);
  299. }
  300. IFileReaderPtr TClientBase::CreateFileReader(
  301. const TRichYPath& path,
  302. const TFileReaderOptions& options)
  303. {
  304. return new TFileReader(
  305. CanonizeYPath(path),
  306. RawClient_,
  307. ClientRetryPolicy_,
  308. GetTransactionPinger(),
  309. Context_,
  310. TransactionId_,
  311. options);
  312. }
  313. IFileWriterPtr TClientBase::CreateFileWriter(
  314. const TRichYPath& path,
  315. const TFileWriterOptions& options)
  316. {
  317. auto realPath = CanonizeYPath(path);
  318. auto exists = RequestWithRetry<bool>(
  319. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  320. [this, &realPath] (TMutationId /*mutationId*/) {
  321. return RawClient_->Exists(TransactionId_, realPath.Path_);
  322. });
  323. if (!exists) {
  324. RequestWithRetry<void>(
  325. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  326. [this, &realPath] (TMutationId& mutationId) {
  327. RawClient_->Create(mutationId, TransactionId_, realPath.Path_, NT_FILE, TCreateOptions().IgnoreExisting(true));
  328. });
  329. }
  330. return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
  331. }
  332. TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
  333. const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options)
  334. {
  335. const Message* prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(&descriptor);
  336. return new TTableWriter<::google::protobuf::Message>(CreateProtoWriter(path, options, prototype));
  337. }
  338. TRawTableReaderPtr TClientBase::CreateRawReader(
  339. const TRichYPath& path,
  340. const TFormat& format,
  341. const TTableReaderOptions& options)
  342. {
  343. return CreateClientReader(path, format, options).Get();
  344. }
  345. TRawTableWriterPtr TClientBase::CreateRawWriter(
  346. const TRichYPath& path,
  347. const TFormat& format,
  348. const TTableWriterOptions& options)
  349. {
  350. return ::MakeIntrusive<TRetryfulWriter>(
  351. RawClient_,
  352. ClientRetryPolicy_,
  353. GetTransactionPinger(),
  354. Context_,
  355. TransactionId_,
  356. GetWriteTableCommand(Context_.Config->ApiVersion),
  357. format,
  358. CanonizeYPath(path),
  359. options).Get();
  360. }
  361. IOperationPtr TClientBase::DoMap(
  362. const TMapOperationSpec& spec,
  363. ::TIntrusivePtr<IStructuredJob> mapper,
  364. const TOperationOptions& options)
  365. {
  366. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  367. auto prepareOperation = [
  368. this_ = ::TIntrusivePtr(this),
  369. operation,
  370. spec,
  371. mapper,
  372. options
  373. ] () {
  374. ExecuteMap(
  375. operation,
  376. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  377. spec,
  378. mapper,
  379. options);
  380. };
  381. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  382. }
  383. IOperationPtr TClientBase::RawMap(
  384. const TRawMapOperationSpec& spec,
  385. ::TIntrusivePtr<IRawJob> mapper,
  386. const TOperationOptions& options)
  387. {
  388. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  389. auto prepareOperation = [
  390. this_=::TIntrusivePtr(this),
  391. operation,
  392. spec,
  393. mapper,
  394. options
  395. ] () {
  396. ExecuteRawMap(
  397. operation,
  398. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  399. spec,
  400. mapper,
  401. options);
  402. };
  403. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  404. }
  405. IOperationPtr TClientBase::DoReduce(
  406. const TReduceOperationSpec& spec,
  407. ::TIntrusivePtr<IStructuredJob> reducer,
  408. const TOperationOptions& options)
  409. {
  410. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  411. auto prepareOperation = [
  412. this_=::TIntrusivePtr(this),
  413. operation,
  414. spec,
  415. reducer,
  416. options
  417. ] () {
  418. ExecuteReduce(
  419. operation,
  420. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  421. spec,
  422. reducer,
  423. options);
  424. };
  425. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  426. }
  427. IOperationPtr TClientBase::RawReduce(
  428. const TRawReduceOperationSpec& spec,
  429. ::TIntrusivePtr<IRawJob> reducer,
  430. const TOperationOptions& options)
  431. {
  432. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  433. auto prepareOperation = [
  434. this_=::TIntrusivePtr(this),
  435. operation,
  436. spec,
  437. reducer,
  438. options
  439. ] () {
  440. ExecuteRawReduce(
  441. operation,
  442. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  443. spec,
  444. reducer,
  445. options);
  446. };
  447. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  448. }
  449. IOperationPtr TClientBase::DoJoinReduce(
  450. const TJoinReduceOperationSpec& spec,
  451. ::TIntrusivePtr<IStructuredJob> reducer,
  452. const TOperationOptions& options)
  453. {
  454. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  455. auto prepareOperation = [
  456. this_=::TIntrusivePtr(this),
  457. operation,
  458. spec,
  459. reducer,
  460. options
  461. ] () {
  462. ExecuteJoinReduce(
  463. operation,
  464. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  465. spec,
  466. reducer,
  467. options);
  468. };
  469. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  470. }
  471. IOperationPtr TClientBase::RawJoinReduce(
  472. const TRawJoinReduceOperationSpec& spec,
  473. ::TIntrusivePtr<IRawJob> reducer,
  474. const TOperationOptions& options)
  475. {
  476. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  477. auto prepareOperation = [
  478. this_=::TIntrusivePtr(this),
  479. operation,
  480. spec,
  481. reducer,
  482. options
  483. ] () {
  484. ExecuteRawJoinReduce(
  485. operation,
  486. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  487. spec,
  488. reducer,
  489. options);
  490. };
  491. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  492. }
  493. IOperationPtr TClientBase::DoMapReduce(
  494. const TMapReduceOperationSpec& spec,
  495. ::TIntrusivePtr<IStructuredJob> mapper,
  496. ::TIntrusivePtr<IStructuredJob> reduceCombiner,
  497. ::TIntrusivePtr<IStructuredJob> reducer,
  498. const TOperationOptions& options)
  499. {
  500. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  501. auto prepareOperation = [
  502. this_=::TIntrusivePtr(this),
  503. operation,
  504. spec,
  505. mapper,
  506. reduceCombiner,
  507. reducer,
  508. options
  509. ] () {
  510. ExecuteMapReduce(
  511. operation,
  512. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  513. spec,
  514. mapper,
  515. reduceCombiner,
  516. reducer,
  517. options);
  518. };
  519. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  520. }
  521. IOperationPtr TClientBase::RawMapReduce(
  522. const TRawMapReduceOperationSpec& spec,
  523. ::TIntrusivePtr<IRawJob> mapper,
  524. ::TIntrusivePtr<IRawJob> reduceCombiner,
  525. ::TIntrusivePtr<IRawJob> reducer,
  526. const TOperationOptions& options)
  527. {
  528. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  529. auto prepareOperation = [
  530. this_=::TIntrusivePtr(this),
  531. operation,
  532. spec,
  533. mapper,
  534. reduceCombiner,
  535. reducer,
  536. options
  537. ] () {
  538. ExecuteRawMapReduce(
  539. operation,
  540. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  541. spec,
  542. mapper,
  543. reduceCombiner,
  544. reducer,
  545. options);
  546. };
  547. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  548. }
  549. IOperationPtr TClientBase::Sort(
  550. const TSortOperationSpec& spec,
  551. const TOperationOptions& options)
  552. {
  553. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  554. auto prepareOperation = [
  555. this_ = ::TIntrusivePtr(this),
  556. operation,
  557. spec,
  558. options
  559. ] () {
  560. ExecuteSort(
  561. operation,
  562. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  563. spec,
  564. options);
  565. };
  566. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  567. }
  568. IOperationPtr TClientBase::Merge(
  569. const TMergeOperationSpec& spec,
  570. const TOperationOptions& options)
  571. {
  572. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  573. auto prepareOperation = [
  574. this_ = ::TIntrusivePtr(this),
  575. operation,
  576. spec,
  577. options
  578. ] () {
  579. ExecuteMerge(
  580. operation,
  581. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  582. spec,
  583. options);
  584. };
  585. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  586. }
  587. IOperationPtr TClientBase::Erase(
  588. const TEraseOperationSpec& spec,
  589. const TOperationOptions& options)
  590. {
  591. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  592. auto prepareOperation = [
  593. this_ = ::TIntrusivePtr(this),
  594. operation,
  595. spec,
  596. options
  597. ] () {
  598. ExecuteErase(
  599. operation,
  600. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  601. spec,
  602. options);
  603. };
  604. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  605. }
  606. IOperationPtr TClientBase::RemoteCopy(
  607. const TRemoteCopyOperationSpec& spec,
  608. const TOperationOptions& options)
  609. {
  610. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  611. auto prepareOperation = [
  612. this_ = ::TIntrusivePtr(this),
  613. operation,
  614. spec,
  615. options
  616. ] () {
  617. ExecuteRemoteCopy(
  618. operation,
  619. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  620. spec,
  621. options);
  622. };
  623. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  624. }
  625. IOperationPtr TClientBase::RunVanilla(
  626. const TVanillaOperationSpec& spec,
  627. const TOperationOptions& options)
  628. {
  629. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  630. auto prepareOperation = [
  631. this_ = ::TIntrusivePtr(this),
  632. operation,
  633. spec,
  634. options
  635. ] () {
  636. ExecuteVanilla(
  637. operation,
  638. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  639. spec,
  640. options);
  641. };
  642. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  643. }
  644. IOperationPtr TClientBase::AttachOperation(const TOperationId& operationId)
  645. {
  646. auto operation = ::MakeIntrusive<TOperation>(operationId, GetParentClientImpl());
  647. operation->GetBriefState(); // check that operation exists
  648. return operation;
  649. }
  650. EOperationBriefState TClientBase::CheckOperation(const TOperationId& operationId)
  651. {
  652. return NYT::NDetail::CheckOperation(RawClient_, ClientRetryPolicy_, operationId);
  653. }
  654. void TClientBase::AbortOperation(const TOperationId& operationId)
  655. {
  656. RequestWithRetry<void>(
  657. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  658. [this, &operationId] (TMutationId& mutationId) {
  659. RawClient_->AbortOperation(mutationId, operationId);
  660. });
  661. }
  662. void TClientBase::CompleteOperation(const TOperationId& operationId)
  663. {
  664. RequestWithRetry<void>(
  665. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  666. [this, &operationId] (TMutationId& mutationId) {
  667. RawClient_->CompleteOperation(mutationId, operationId);
  668. });
  669. }
  670. void TClientBase::WaitForOperation(const TOperationId& operationId)
  671. {
  672. NYT::NDetail::WaitForOperation(ClientRetryPolicy_, RawClient_, Context_, operationId);
  673. }
  674. void TClientBase::AlterTable(
  675. const TYPath& path,
  676. const TAlterTableOptions& options)
  677. {
  678. RequestWithRetry<void>(
  679. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  680. [this, &path, &options] (TMutationId& mutationId) {
  681. RawClient_->AlterTable(mutationId, TransactionId_, path, options);
  682. });
  683. }
  684. ::TIntrusivePtr<TClientReader> TClientBase::CreateClientReader(
  685. const TRichYPath& path,
  686. const TFormat& format,
  687. const TTableReaderOptions& options,
  688. bool useFormatFromTableAttributes)
  689. {
  690. return ::MakeIntrusive<TClientReader>(
  691. CanonizeYPath(path),
  692. RawClient_,
  693. ClientRetryPolicy_,
  694. GetTransactionPinger(),
  695. Context_,
  696. TransactionId_,
  697. format,
  698. options,
  699. useFormatFromTableAttributes);
  700. }
  701. THolder<TClientWriter> TClientBase::CreateClientWriter(
  702. const TRichYPath& path,
  703. const TFormat& format,
  704. const TTableWriterOptions& options)
  705. {
  706. auto realPath = CanonizeYPath(path);
  707. auto exists = RequestWithRetry<bool>(
  708. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  709. [this, &realPath] (TMutationId /*mutationId*/) {
  710. return RawClient_->Exists(TransactionId_, realPath.Path_);
  711. });
  712. if (!exists) {
  713. RequestWithRetry<void>(
  714. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  715. [this, &realPath] (TMutationId& mutataionId) {
  716. RawClient_->Create(mutataionId, TransactionId_, realPath.Path_, NT_TABLE, TCreateOptions().IgnoreExisting(true));
  717. });
  718. }
  719. return MakeHolder<TClientWriter>(
  720. realPath,
  721. RawClient_,
  722. ClientRetryPolicy_,
  723. GetTransactionPinger(),
  724. Context_,
  725. TransactionId_,
  726. format,
  727. options
  728. );
  729. }
  730. ::TIntrusivePtr<INodeReaderImpl> TClientBase::CreateNodeReader(
  731. const TRichYPath& path, const TTableReaderOptions& options)
  732. {
  733. auto format = TFormat::YsonBinary();
  734. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  735. // Skiff is disabled here because of large header problem (see https://st.yandex-team.ru/YT-6926).
  736. // Revert this code to r3614168 when it is fixed.
  737. return new TNodeTableReader(
  738. CreateClientReader(path, format, options));
  739. }
  740. ::TIntrusivePtr<IYaMRReaderImpl> TClientBase::CreateYaMRReader(
  741. const TRichYPath& path, const TTableReaderOptions& options)
  742. {
  743. return new TYaMRTableReader(
  744. CreateClientReader(path, TFormat::YaMRLenval(), options, /* useFormatFromTableAttributes = */ true));
  745. }
  746. ::TIntrusivePtr<IProtoReaderImpl> TClientBase::CreateProtoReader(
  747. const TRichYPath& path,
  748. const TTableReaderOptions& options,
  749. const Message* prototype)
  750. {
  751. TVector<const ::google::protobuf::Descriptor*> descriptors;
  752. descriptors.push_back(prototype->GetDescriptor());
  753. if (Context_.Config->UseClientProtobuf) {
  754. return new TProtoTableReader(
  755. CreateClientReader(path, TFormat::YsonBinary(), options),
  756. std::move(descriptors));
  757. } else {
  758. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  759. return new TLenvalProtoTableReader(
  760. CreateClientReader(path, format, options),
  761. std::move(descriptors));
  762. }
  763. }
  764. ::TIntrusivePtr<ISkiffRowReaderImpl> TClientBase::CreateSkiffRowReader(
  765. const TRichYPath& path,
  766. const TTableReaderOptions& options,
  767. const ISkiffRowSkipperPtr& skipper,
  768. const NSkiff::TSkiffSchemaPtr& schema)
  769. {
  770. auto skiffOptions = TCreateSkiffSchemaOptions().HasRangeIndex(true);
  771. auto resultSchema = NYT::NDetail::CreateSkiffSchema(TVector{schema}, skiffOptions);
  772. return new TSkiffRowTableReader(
  773. CreateClientReader(path, NYT::NDetail::CreateSkiffFormat(resultSchema), options),
  774. resultSchema,
  775. {skipper},
  776. std::move(skiffOptions));
  777. }
  778. ::TIntrusivePtr<INodeWriterImpl> TClientBase::CreateNodeWriter(
  779. const TRichYPath& path, const TTableWriterOptions& options)
  780. {
  781. auto format = TFormat::YsonBinary();
  782. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  783. return new TNodeTableWriter(
  784. CreateClientWriter(path, format, options));
  785. }
  786. ::TIntrusivePtr<IYaMRWriterImpl> TClientBase::CreateYaMRWriter(
  787. const TRichYPath& path, const TTableWriterOptions& options)
  788. {
  789. auto format = TFormat::YaMRLenval();
  790. ApplyFormatHints<TYaMRRow>(&format, options.FormatHints_);
  791. return new TYaMRTableWriter(
  792. CreateClientWriter(path, format, options));
  793. }
  794. ::TIntrusivePtr<IProtoWriterImpl> TClientBase::CreateProtoWriter(
  795. const TRichYPath& path,
  796. const TTableWriterOptions& options,
  797. const Message* prototype)
  798. {
  799. TVector<const ::google::protobuf::Descriptor*> descriptors;
  800. descriptors.push_back(prototype->GetDescriptor());
  801. auto pathWithSchema = path;
  802. if (options.InferSchema_.GetOrElse(Context_.Config->InferTableSchema) && !path.Schema_) {
  803. pathWithSchema.Schema(CreateTableSchema(*prototype->GetDescriptor()));
  804. }
  805. if (Context_.Config->UseClientProtobuf) {
  806. auto format = TFormat::YsonBinary();
  807. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  808. return new TProtoTableWriter(
  809. CreateClientWriter(pathWithSchema, format, options),
  810. std::move(descriptors));
  811. } else {
  812. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  813. ApplyFormatHints<::google::protobuf::Message>(&format, options.FormatHints_);
  814. return new TLenvalProtoTableWriter(
  815. CreateClientWriter(pathWithSchema, format, options),
  816. std::move(descriptors));
  817. }
  818. }
  819. TBatchRequestPtr TClientBase::CreateBatchRequest()
  820. {
  821. return MakeIntrusive<TBatchRequest>(TransactionId_, GetParentClientImpl());
  822. }
  823. IClientPtr TClientBase::GetParentClient()
  824. {
  825. return GetParentClientImpl();
  826. }
  827. IRawClientPtr TClientBase::GetRawClient() const
  828. {
  829. return RawClient_;
  830. }
  831. const TClientContext& TClientBase::GetContext() const
  832. {
  833. return Context_;
  834. }
  835. const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
  836. {
  837. return ClientRetryPolicy_;
  838. }
  839. ////////////////////////////////////////////////////////////////////////////////
  840. TTransaction::TTransaction(
  841. const IRawClientPtr& rawClient,
  842. TClientPtr parentClient,
  843. const TClientContext& context,
  844. const TTransactionId& parentTransactionId,
  845. const TStartTransactionOptions& options)
  846. : TClientBase(rawClient, context, parentTransactionId, parentClient->GetRetryPolicy())
  847. , TransactionPinger_(parentClient->GetTransactionPinger())
  848. , PingableTx_(
  849. std::make_unique<TPingableTransaction>(
  850. rawClient,
  851. parentClient->GetRetryPolicy(),
  852. context,
  853. parentTransactionId,
  854. TransactionPinger_->GetChildTxPinger(),
  855. options))
  856. , ParentClient_(parentClient)
  857. {
  858. TransactionId_ = PingableTx_->GetId();
  859. }
  860. TTransaction::TTransaction(
  861. const IRawClientPtr& rawClient,
  862. TClientPtr parentClient,
  863. const TClientContext& context,
  864. const TTransactionId& transactionId,
  865. const TAttachTransactionOptions& options)
  866. : TClientBase(rawClient, context, transactionId, parentClient->GetRetryPolicy())
  867. , TransactionPinger_(parentClient->GetTransactionPinger())
  868. , PingableTx_(
  869. new TPingableTransaction(
  870. rawClient,
  871. parentClient->GetRetryPolicy(),
  872. context,
  873. transactionId,
  874. parentClient->GetTransactionPinger()->GetChildTxPinger(),
  875. options))
  876. , ParentClient_(parentClient)
  877. { }
  878. const TTransactionId& TTransaction::GetId() const
  879. {
  880. return TransactionId_;
  881. }
  882. ILockPtr TTransaction::Lock(
  883. const TYPath& path,
  884. ELockMode mode,
  885. const TLockOptions& options)
  886. {
  887. auto lockId = RequestWithRetry<TLockId>(
  888. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  889. [this, &path, &mode, &options] (TMutationId& mutationId) {
  890. return RawClient_->Lock(mutationId, TransactionId_, path, mode, options);
  891. });
  892. return ::MakeIntrusive<TLock>(lockId, GetParentClientImpl(), options.Waitable_);
  893. }
  894. void TTransaction::Unlock(
  895. const TYPath& path,
  896. const TUnlockOptions& options)
  897. {
  898. RequestWithRetry<void>(
  899. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  900. [this, &path, &options] (TMutationId& mutationId) {
  901. RawClient_->Unlock(mutationId, TransactionId_, path, options);
  902. });
  903. }
  904. void TTransaction::Commit()
  905. {
  906. PingableTx_->Commit();
  907. }
  908. void TTransaction::Abort()
  909. {
  910. PingableTx_->Abort();
  911. }
  912. void TTransaction::Ping()
  913. {
  914. RequestWithRetry<void>(
  915. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  916. [this] (TMutationId /*mutationId*/) {
  917. RawClient_->PingTransaction(TransactionId_);
  918. });
  919. }
  920. void TTransaction::Detach()
  921. {
  922. PingableTx_->Detach();
  923. }
  924. ITransactionPingerPtr TTransaction::GetTransactionPinger()
  925. {
  926. return TransactionPinger_;
  927. }
  928. TClientPtr TTransaction::GetParentClientImpl()
  929. {
  930. return ParentClient_;
  931. }
  932. ////////////////////////////////////////////////////////////////////////////////
  933. TClient::TClient(
  934. IRawClientPtr rawClient,
  935. const TClientContext& context,
  936. const TTransactionId& globalId,
  937. IClientRetryPolicyPtr retryPolicy)
  938. : TClientBase(std::move(rawClient), context, globalId, retryPolicy)
  939. , TransactionPinger_(nullptr)
  940. { }
  941. TClient::~TClient() = default;
  942. ITransactionPtr TClient::AttachTransaction(
  943. const TTransactionId& transactionId,
  944. const TAttachTransactionOptions& options)
  945. {
  946. CheckShutdown();
  947. return MakeIntrusive<TTransaction>(RawClient_, this, Context_, transactionId, options);
  948. }
  949. void TClient::MountTable(
  950. const TYPath& path,
  951. const TMountTableOptions& options)
  952. {
  953. CheckShutdown();
  954. RequestWithRetry<void>(
  955. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  956. [this, &path, &options] (TMutationId& mutationId) {
  957. RawClient_->MountTable(mutationId, path, options);
  958. });
  959. }
  960. void TClient::UnmountTable(
  961. const TYPath& path,
  962. const TUnmountTableOptions& options)
  963. {
  964. CheckShutdown();
  965. RequestWithRetry<void>(
  966. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  967. [this, &path, &options] (TMutationId& mutationId) {
  968. RawClient_->UnmountTable(mutationId, path, options);
  969. });
  970. }
  971. void TClient::RemountTable(
  972. const TYPath& path,
  973. const TRemountTableOptions& options)
  974. {
  975. CheckShutdown();
  976. RequestWithRetry<void>(
  977. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  978. [this, &path, &options] (TMutationId& mutationId) {
  979. RawClient_->RemountTable(mutationId, path, options);
  980. });
  981. }
  982. void TClient::FreezeTable(
  983. const TYPath& path,
  984. const TFreezeTableOptions& options)
  985. {
  986. CheckShutdown();
  987. RequestWithRetry<void>(
  988. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  989. [this, &path, &options] (TMutationId /*mutationId*/) {
  990. RawClient_->FreezeTable(path, options);
  991. });
  992. }
  993. void TClient::UnfreezeTable(
  994. const TYPath& path,
  995. const TUnfreezeTableOptions& options)
  996. {
  997. CheckShutdown();
  998. RequestWithRetry<void>(
  999. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1000. [this, &path, &options] (TMutationId /*mutationId*/) {
  1001. RawClient_->UnfreezeTable(path, options);
  1002. });
  1003. }
  1004. void TClient::ReshardTable(
  1005. const TYPath& path,
  1006. const TVector<TKey>& keys,
  1007. const TReshardTableOptions& options)
  1008. {
  1009. CheckShutdown();
  1010. RequestWithRetry<void>(
  1011. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1012. [this, &path, &keys, &options] (TMutationId& mutationId) {
  1013. RawClient_->ReshardTableByPivotKeys(mutationId, path, keys, options);
  1014. });
  1015. }
  1016. void TClient::ReshardTable(
  1017. const TYPath& path,
  1018. i64 tabletCount,
  1019. const TReshardTableOptions& options)
  1020. {
  1021. CheckShutdown();
  1022. RequestWithRetry<void>(
  1023. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1024. [this, &path, tabletCount, &options] (TMutationId& mutationId) {
  1025. RawClient_->ReshardTableByTabletCount(mutationId, path, tabletCount, options);
  1026. });
  1027. }
  1028. void TClient::InsertRows(
  1029. const TYPath& path,
  1030. const TNode::TListType& rows,
  1031. const TInsertRowsOptions& options)
  1032. {
  1033. CheckShutdown();
  1034. RequestWithRetry<void>(
  1035. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1036. [this, &path, &rows, &options] (TMutationId /*mutationId*/) {
  1037. RawClient_->InsertRows(path, rows, options);
  1038. });
  1039. }
  1040. void TClient::DeleteRows(
  1041. const TYPath& path,
  1042. const TNode::TListType& keys,
  1043. const TDeleteRowsOptions& options)
  1044. {
  1045. CheckShutdown();
  1046. RequestWithRetry<void>(
  1047. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1048. [this, &path, &keys, &options] (TMutationId /*mutationId*/) {
  1049. RawClient_->DeleteRows(path, keys, options);
  1050. });
  1051. }
  1052. void TClient::TrimRows(
  1053. const TYPath& path,
  1054. i64 tabletIndex,
  1055. i64 rowCount,
  1056. const TTrimRowsOptions& options)
  1057. {
  1058. CheckShutdown();
  1059. RequestWithRetry<void>(
  1060. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1061. [this, &path, tabletIndex, rowCount, &options] (TMutationId /*mutationId*/) {
  1062. RawClient_->TrimRows(path, tabletIndex, rowCount, options);
  1063. });
  1064. }
  1065. TNode::TListType TClient::LookupRows(
  1066. const TYPath& path,
  1067. const TNode::TListType& keys,
  1068. const TLookupRowsOptions& options)
  1069. {
  1070. CheckShutdown();
  1071. return RequestWithRetry<TNode::TListType>(
  1072. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1073. [this, &path, &keys, &options] (TMutationId /*mutationId*/) {
  1074. return RawClient_->LookupRows(path, keys, options);
  1075. });
  1076. }
  1077. TNode::TListType TClient::SelectRows(
  1078. const TString& query,
  1079. const TSelectRowsOptions& options)
  1080. {
  1081. CheckShutdown();
  1082. return RequestWithRetry<TNode::TListType>(
  1083. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1084. [this, &query, &options] (TMutationId /*mutationId*/) {
  1085. return RawClient_->SelectRows(query, options);
  1086. });
  1087. }
  1088. void TClient::AlterTableReplica(const TReplicaId& replicaId, const TAlterTableReplicaOptions& options)
  1089. {
  1090. CheckShutdown();
  1091. RequestWithRetry<void>(
  1092. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1093. [this, &replicaId, &options] (TMutationId& mutationId) {
  1094. RawClient_->AlterTableReplica(mutationId, replicaId, options);
  1095. });
  1096. }
  1097. ui64 TClient::GenerateTimestamp()
  1098. {
  1099. CheckShutdown();
  1100. return RequestWithRetry<ui64>(
  1101. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1102. [this] (TMutationId /*mutationId*/) {
  1103. return RawClient_->GenerateTimestamp();
  1104. });
  1105. }
  1106. TAuthorizationInfo TClient::WhoAmI()
  1107. {
  1108. CheckShutdown();
  1109. return RequestWithRetry<TAuthorizationInfo>(
  1110. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1111. [this] (TMutationId /*mutationId*/) {
  1112. return NRawClient::WhoAmI(Context_);
  1113. });
  1114. }
  1115. TOperationAttributes TClient::GetOperation(
  1116. const TOperationId& operationId,
  1117. const TGetOperationOptions& options)
  1118. {
  1119. CheckShutdown();
  1120. return RequestWithRetry<TOperationAttributes>(
  1121. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1122. [this, &operationId, &options] (TMutationId /*mutationId*/) {
  1123. return RawClient_->GetOperation(operationId, options);
  1124. });
  1125. }
  1126. TOperationAttributes TClient::GetOperation(
  1127. const TString& alias,
  1128. const TGetOperationOptions& options)
  1129. {
  1130. CheckShutdown();
  1131. return RequestWithRetry<TOperationAttributes>(
  1132. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1133. [this, &alias, &options] (TMutationId /*mutationId*/) {
  1134. return RawClient_->GetOperation(alias, options);
  1135. });
  1136. }
  1137. TListOperationsResult TClient::ListOperations(const TListOperationsOptions& options)
  1138. {
  1139. CheckShutdown();
  1140. return RequestWithRetry<TListOperationsResult>(
  1141. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1142. [this, &options] (TMutationId /*mutationId*/) {
  1143. return RawClient_->ListOperations(options);
  1144. });
  1145. }
  1146. void TClient::UpdateOperationParameters(
  1147. const TOperationId& operationId,
  1148. const TUpdateOperationParametersOptions& options)
  1149. {
  1150. CheckShutdown();
  1151. RequestWithRetry<void>(
  1152. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1153. [this, &operationId, &options] (TMutationId /*mutationId*/) {
  1154. RawClient_->UpdateOperationParameters(operationId, options);
  1155. });
  1156. }
  1157. TJobAttributes TClient::GetJob(
  1158. const TOperationId& operationId,
  1159. const TJobId& jobId,
  1160. const TGetJobOptions& options)
  1161. {
  1162. CheckShutdown();
  1163. auto result = RequestWithRetry<NYson::TYsonString>(
  1164. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1165. [this, &operationId, &jobId, &options] (TMutationId /*mutationId*/) {
  1166. return RawClient_->GetJob(operationId, jobId, options);
  1167. });
  1168. return NRawClient::ParseJobAttributes(NodeFromYsonString(result.AsStringBuf()));
  1169. }
  1170. TListJobsResult TClient::ListJobs(
  1171. const TOperationId& operationId,
  1172. const TListJobsOptions& options)
  1173. {
  1174. CheckShutdown();
  1175. return RequestWithRetry<TListJobsResult>(
  1176. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1177. [this, &operationId, &options] (TMutationId /*mutationId*/) {
  1178. return RawClient_->ListJobs(operationId, options);
  1179. });
  1180. }
  1181. IFileReaderPtr TClient::GetJobInput(
  1182. const TJobId& jobId,
  1183. const TGetJobInputOptions& options)
  1184. {
  1185. CheckShutdown();
  1186. return RawClient_->GetJobInput(jobId, options);
  1187. }
  1188. IFileReaderPtr TClient::GetJobFailContext(
  1189. const TOperationId& operationId,
  1190. const TJobId& jobId,
  1191. const TGetJobFailContextOptions& options)
  1192. {
  1193. CheckShutdown();
  1194. return RawClient_->GetJobFailContext(operationId, jobId, options);
  1195. }
  1196. IFileReaderPtr TClient::GetJobStderr(
  1197. const TOperationId& operationId,
  1198. const TJobId& jobId,
  1199. const TGetJobStderrOptions& options)
  1200. {
  1201. CheckShutdown();
  1202. return RawClient_->GetJobStderr(operationId, jobId, options);
  1203. }
  1204. std::vector<TJobTraceEvent> TClient::GetJobTrace(
  1205. const TOperationId& operationId,
  1206. const TGetJobTraceOptions& options)
  1207. {
  1208. CheckShutdown();
  1209. return RequestWithRetry<std::vector<TJobTraceEvent>>(
  1210. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1211. [this, &operationId, &options] (TMutationId /*mutationId*/) {
  1212. return RawClient_->GetJobTrace(operationId, options);
  1213. });
  1214. }
  1215. TNode::TListType TClient::SkyShareTable(
  1216. const std::vector<TYPath>& tablePaths,
  1217. const TSkyShareTableOptions& options)
  1218. {
  1219. CheckShutdown();
  1220. // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu
  1221. // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK).
  1222. NHttpClient::IHttpResponsePtr response;
  1223. do {
  1224. response = RequestWithRetry<NHttpClient::IHttpResponsePtr>(
  1225. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1226. [this, &tablePaths, &options] (TMutationId /*mutationId*/) {
  1227. return NRawClient::SkyShareTable(Context_, tablePaths, options);
  1228. });
  1229. TWaitProxy::Get()->Sleep(TDuration::Seconds(5));
  1230. } while (response->GetStatusCode() != 200);
  1231. if (options.KeyColumns_) {
  1232. return NodeFromJsonString(response->GetResponse())["torrents"].AsList();
  1233. } else {
  1234. TNode torrent;
  1235. torrent["key"] = TNode::CreateList();
  1236. torrent["rbtorrent"] = response->GetResponse();
  1237. return TNode::TListType{torrent};
  1238. }
  1239. }
  1240. TCheckPermissionResponse TClient::CheckPermission(
  1241. const TString& user,
  1242. EPermission permission,
  1243. const TYPath& path,
  1244. const TCheckPermissionOptions& options)
  1245. {
  1246. CheckShutdown();
  1247. return RequestWithRetry<TCheckPermissionResponse>(
  1248. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1249. [this, &user, &permission, &path, &options] (TMutationId /*mutationId*/) {
  1250. return RawClient_->CheckPermission(user, permission, path, options);
  1251. });
  1252. }
  1253. TVector<TTabletInfo> TClient::GetTabletInfos(
  1254. const TYPath& path,
  1255. const TVector<int>& tabletIndexes,
  1256. const TGetTabletInfosOptions& options)
  1257. {
  1258. CheckShutdown();
  1259. return RequestWithRetry<TVector<TTabletInfo>>(
  1260. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1261. [this, &path, &tabletIndexes, &options] (TMutationId /*mutationId*/) {
  1262. return RawClient_->GetTabletInfos(path, tabletIndexes, options);
  1263. });
  1264. }
  1265. void TClient::SuspendOperation(
  1266. const TOperationId& operationId,
  1267. const TSuspendOperationOptions& options)
  1268. {
  1269. CheckShutdown();
  1270. RequestWithRetry<void>(
  1271. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1272. [this, &operationId, &options] (TMutationId& mutationId) {
  1273. RawClient_->SuspendOperation(mutationId, operationId, options);
  1274. });
  1275. }
  1276. void TClient::ResumeOperation(
  1277. const TOperationId& operationId,
  1278. const TResumeOperationOptions& options)
  1279. {
  1280. CheckShutdown();
  1281. RequestWithRetry<void>(
  1282. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1283. [this, &operationId, &options] (TMutationId& mutationId) {
  1284. RawClient_->ResumeOperation(mutationId, operationId, options);
  1285. });
  1286. }
  1287. TYtPoller& TClient::GetYtPoller()
  1288. {
  1289. auto g = Guard(Lock_);
  1290. if (!YtPoller_) {
  1291. CheckShutdown();
  1292. // We don't use current client and create new client because YtPoller_ might use
  1293. // this client during current client shutdown.
  1294. // That might lead to incrementing of current client refcount and double delete of current client object.
  1295. YtPoller_ = std::make_unique<TYtPoller>(Context_, ClientRetryPolicy_);
  1296. }
  1297. return *YtPoller_;
  1298. }
  1299. void TClient::Shutdown()
  1300. {
  1301. auto g = Guard(Lock_);
  1302. if (!Shutdown_.exchange(true) && YtPoller_) {
  1303. YtPoller_->Stop();
  1304. }
  1305. }
  1306. ITransactionPingerPtr TClient::GetTransactionPinger()
  1307. {
  1308. auto g = Guard(Lock_);
  1309. if (!TransactionPinger_) {
  1310. TransactionPinger_ = CreateTransactionPinger(Context_.Config);
  1311. }
  1312. return TransactionPinger_;
  1313. }
  1314. TClientPtr TClient::GetParentClientImpl()
  1315. {
  1316. return this;
  1317. }
  1318. void TClient::CheckShutdown() const
  1319. {
  1320. if (Shutdown_) {
  1321. ythrow TApiUsageError() << "Call client's methods after shutdown";
  1322. }
  1323. }
  1324. TClientPtr CreateClientImpl(
  1325. const TString& serverName,
  1326. const TCreateClientOptions& options)
  1327. {
  1328. TClientContext context;
  1329. context.Config = options.Config_ ? options.Config_ : TConfig::Get();
  1330. context.TvmOnly = options.TvmOnly_;
  1331. context.ProxyAddress = options.ProxyAddress_;
  1332. context.ServerName = serverName;
  1333. ApplyProxyUrlAliasingRules(context.ServerName);
  1334. if (context.ServerName.find('.') == TString::npos &&
  1335. context.ServerName.find(':') == TString::npos &&
  1336. context.ServerName.find("localhost") == TString::npos)
  1337. {
  1338. context.ServerName += ".yt.yandex.net";
  1339. }
  1340. static constexpr char httpUrlSchema[] = "http://";
  1341. static constexpr char httpsUrlSchema[] = "https://";
  1342. if (options.UseTLS_) {
  1343. context.UseTLS = *options.UseTLS_;
  1344. } else {
  1345. context.UseTLS = context.ServerName.StartsWith(httpsUrlSchema);
  1346. }
  1347. if (context.ServerName.StartsWith(httpUrlSchema)) {
  1348. if (context.UseTLS) {
  1349. ythrow TApiUsageError() << "URL schema doesn't match UseTLS option";
  1350. }
  1351. context.ServerName.erase(0, sizeof(httpUrlSchema) - 1);
  1352. }
  1353. if (context.ServerName.StartsWith(httpsUrlSchema)) {
  1354. if (!context.UseTLS) {
  1355. ythrow TApiUsageError() << "URL schema doesn't match UseTLS option";
  1356. }
  1357. context.ServerName.erase(0, sizeof(httpsUrlSchema) - 1);
  1358. }
  1359. if (context.ServerName.find(':') == TString::npos) {
  1360. context.ServerName = CreateHostNameWithPort(context.ServerName, context);
  1361. }
  1362. if (options.TvmOnly_) {
  1363. context.ServerName = Format("tvm.%v", context.ServerName);
  1364. }
  1365. if (context.UseTLS || options.UseCoreHttpClient_) {
  1366. context.HttpClient = NHttpClient::CreateCoreHttpClient(context.UseTLS, context.Config);
  1367. } else {
  1368. context.HttpClient = NHttpClient::CreateDefaultHttpClient();
  1369. }
  1370. context.Token = context.Config->Token;
  1371. if (options.Token_) {
  1372. context.Token = options.Token_;
  1373. } else if (options.TokenPath_) {
  1374. context.Token = TConfig::LoadTokenFromFile(options.TokenPath_);
  1375. } else if (options.ServiceTicketAuth_) {
  1376. context.ServiceTicketAuth = options.ServiceTicketAuth_;
  1377. }
  1378. context.ImpersonationUser = options.ImpersonationUser_;
  1379. if (context.Token) {
  1380. TConfig::ValidateToken(context.Token);
  1381. }
  1382. auto globalTxId = GetGuid(context.Config->GlobalTxId);
  1383. auto retryConfigProvider = options.RetryConfigProvider_;
  1384. if (!retryConfigProvider) {
  1385. retryConfigProvider = CreateDefaultRetryConfigProvider();
  1386. }
  1387. auto rawClient = MakeIntrusive<THttpRawClient>(context);
  1388. EnsureInitialized();
  1389. return new TClient(
  1390. std::move(rawClient),
  1391. context,
  1392. globalTxId,
  1393. CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
  1394. }
  1395. ////////////////////////////////////////////////////////////////////////////////
  1396. } // namespace NDetail
  1397. ////////////////////////////////////////////////////////////////////////////////
  1398. IClientPtr CreateClient(
  1399. const TString& serverName,
  1400. const TCreateClientOptions& options)
  1401. {
  1402. return NDetail::CreateClientImpl(serverName, options);
  1403. }
  1404. IClientPtr CreateClientFromEnv(const TCreateClientOptions& options)
  1405. {
  1406. auto serverName = GetEnv("YT_PROXY");
  1407. if (!serverName) {
  1408. ythrow yexception() << "YT_PROXY is not set";
  1409. }
  1410. return CreateClient(serverName, options);
  1411. }
  1412. ////////////////////////////////////////////////////////////////////////////////
  1413. } // namespace NYT