client.cpp 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362
  1. #include "client.h"
  2. #include "batch_request_impl.h"
  3. #include "client_reader.h"
  4. #include "client_writer.h"
  5. #include "file_reader.h"
  6. #include "file_writer.h"
  7. #include "format_hints.h"
  8. #include "lock.h"
  9. #include "operation.h"
  10. #include "retry_transaction.h"
  11. #include "retryful_writer.h"
  12. #include "transaction.h"
  13. #include "transaction_pinger.h"
  14. #include "yt_poller.h"
  15. #include <yt/cpp/mapreduce/common/helpers.h>
  16. #include <yt/cpp/mapreduce/common/retry_lib.h>
  17. #include <yt/cpp/mapreduce/http/helpers.h>
  18. #include <yt/cpp/mapreduce/http/http.h>
  19. #include <yt/cpp/mapreduce/http/http_client.h>
  20. #include <yt/cpp/mapreduce/http/requests.h>
  21. #include <yt/cpp/mapreduce/http/retry_request.h>
  22. #include <yt/cpp/mapreduce/interface/config.h>
  23. #include <yt/cpp/mapreduce/interface/client.h>
  24. #include <yt/cpp/mapreduce/interface/fluent.h>
  25. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  26. #include <yt/cpp/mapreduce/interface/skiff_row.h>
  27. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  28. #include <yt/cpp/mapreduce/io/yamr_table_writer.h>
  29. #include <yt/cpp/mapreduce/io/node_table_reader.h>
  30. #include <yt/cpp/mapreduce/io/node_table_writer.h>
  31. #include <yt/cpp/mapreduce/io/proto_table_reader.h>
  32. #include <yt/cpp/mapreduce/io/proto_table_writer.h>
  33. #include <yt/cpp/mapreduce/io/skiff_row_table_reader.h>
  34. #include <yt/cpp/mapreduce/io/proto_helpers.h>
  35. #include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
  36. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  37. #include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h>
  38. #include <library/cpp/json/json_reader.h>
  39. #include <util/generic/algorithm.h>
  40. #include <util/string/type.h>
  41. #include <util/system/env.h>
  42. #include <exception>
  43. using namespace NYT::NDetail::NRawClient;
  44. namespace NYT {
  45. ////////////////////////////////////////////////////////////////////////////////
  46. namespace NDetail {
  47. ////////////////////////////////////////////////////////////////////////////////
  48. TClientBase::TClientBase(
  49. const TClientContext& context,
  50. const TTransactionId& transactionId,
  51. IClientRetryPolicyPtr retryPolicy)
  52. : Context_(context)
  53. , TransactionId_(transactionId)
  54. , ClientRetryPolicy_(std::move(retryPolicy))
  55. { }
  56. ITransactionPtr TClientBase::StartTransaction(
  57. const TStartTransactionOptions& options)
  58. {
  59. return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options);
  60. }
  61. TNodeId TClientBase::Create(
  62. const TYPath& path,
  63. ENodeType type,
  64. const TCreateOptions& options)
  65. {
  66. return NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, type, options);
  67. }
  68. void TClientBase::Remove(
  69. const TYPath& path,
  70. const TRemoveOptions& options)
  71. {
  72. return NRawClient::Remove(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  73. }
  74. bool TClientBase::Exists(
  75. const TYPath& path,
  76. const TExistsOptions& options)
  77. {
  78. return NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  79. }
  80. TNode TClientBase::Get(
  81. const TYPath& path,
  82. const TGetOptions& options)
  83. {
  84. return NRawClient::Get(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  85. }
  86. void TClientBase::Set(
  87. const TYPath& path,
  88. const TNode& value,
  89. const TSetOptions& options)
  90. {
  91. NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
  92. }
  93. void TClientBase::MultisetAttributes(
  94. const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options)
  95. {
  96. NRawClient::MultisetAttributes(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
  97. }
  98. TNode::TListType TClientBase::List(
  99. const TYPath& path,
  100. const TListOptions& options)
  101. {
  102. return NRawClient::List(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  103. }
  104. TNodeId TClientBase::Copy(
  105. const TYPath& sourcePath,
  106. const TYPath& destinationPath,
  107. const TCopyOptions& options)
  108. {
  109. return NRawClient::Copy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
  110. }
  111. TNodeId TClientBase::Move(
  112. const TYPath& sourcePath,
  113. const TYPath& destinationPath,
  114. const TMoveOptions& options)
  115. {
  116. return NRawClient::Move(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
  117. }
  118. TNodeId TClientBase::Link(
  119. const TYPath& targetPath,
  120. const TYPath& linkPath,
  121. const TLinkOptions& options)
  122. {
  123. return NRawClient::Link(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, targetPath, linkPath, options);
  124. }
  125. void TClientBase::Concatenate(
  126. const TVector<TRichYPath>& sourcePaths,
  127. const TRichYPath& destinationPath,
  128. const TConcatenateOptions& options)
  129. {
  130. std::function<void(ITransactionPtr)> lambda = [&sourcePaths, &destinationPath, &options, this](ITransactionPtr transaction) {
  131. if (!options.Append_ && !sourcePaths.empty() && !transaction->Exists(destinationPath.Path_)) {
  132. auto typeNode = transaction->Get(CanonizeYPath(sourcePaths.front()).Path_ + "/@type");
  133. auto type = FromString<ENodeType>(typeNode.AsString());
  134. transaction->Create(destinationPath.Path_, type, TCreateOptions().IgnoreExisting(true));
  135. }
  136. NRawClient::Concatenate(this->Context_, transaction->GetId(), sourcePaths, destinationPath, options);
  137. };
  138. RetryTransactionWithPolicy(this, lambda, ClientRetryPolicy_->CreatePolicyForGenericRequest());
  139. }
  140. TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path)
  141. {
  142. return NRawClient::CanonizeYPath(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path);
  143. }
  144. TVector<TTableColumnarStatistics> TClientBase::GetTableColumnarStatistics(
  145. const TVector<TRichYPath>& paths,
  146. const TGetTableColumnarStatisticsOptions& options)
  147. {
  148. return NRawClient::GetTableColumnarStatistics(
  149. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  150. Context_,
  151. TransactionId_,
  152. paths,
  153. options);
  154. }
  155. TMultiTablePartitions TClientBase::GetTablePartitions(
  156. const TVector<TRichYPath>& paths,
  157. const TGetTablePartitionsOptions& options)
  158. {
  159. return NRawClient::GetTablePartitions(
  160. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  161. Context_,
  162. TransactionId_,
  163. paths,
  164. options);
  165. }
  166. TMaybe<TYPath> TClientBase::GetFileFromCache(
  167. const TString& md5Signature,
  168. const TYPath& cachePath,
  169. const TGetFileFromCacheOptions& options)
  170. {
  171. return NRawClient::GetFileFromCache(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, md5Signature, cachePath, options);
  172. }
  173. TYPath TClientBase::PutFileToCache(
  174. const TYPath& filePath,
  175. const TString& md5Signature,
  176. const TYPath& cachePath,
  177. const TPutFileToCacheOptions& options)
  178. {
  179. return NRawClient::PutFileToCache(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, filePath, md5Signature, cachePath, options);
  180. }
  181. IFileReaderPtr TClientBase::CreateBlobTableReader(
  182. const TYPath& path,
  183. const TKey& key,
  184. const TBlobTableReaderOptions& options)
  185. {
  186. return new TBlobTableReader(
  187. path,
  188. key,
  189. ClientRetryPolicy_,
  190. GetTransactionPinger(),
  191. Context_,
  192. TransactionId_,
  193. options);
  194. }
  195. IFileReaderPtr TClientBase::CreateFileReader(
  196. const TRichYPath& path,
  197. const TFileReaderOptions& options)
  198. {
  199. return new TFileReader(
  200. CanonizeYPath(path),
  201. ClientRetryPolicy_,
  202. GetTransactionPinger(),
  203. Context_,
  204. TransactionId_,
  205. options);
  206. }
  207. IFileWriterPtr TClientBase::CreateFileWriter(
  208. const TRichYPath& path,
  209. const TFileWriterOptions& options)
  210. {
  211. auto realPath = CanonizeYPath(path);
  212. if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
  213. NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE,
  214. TCreateOptions().IgnoreExisting(true));
  215. }
  216. return new TFileWriter(realPath, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
  217. }
  218. TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
  219. const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options)
  220. {
  221. const Message* prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(&descriptor);
  222. return new TTableWriter<::google::protobuf::Message>(CreateProtoWriter(path, options, prototype));
  223. }
  224. TRawTableReaderPtr TClientBase::CreateRawReader(
  225. const TRichYPath& path,
  226. const TFormat& format,
  227. const TTableReaderOptions& options)
  228. {
  229. return CreateClientReader(path, format, options).Get();
  230. }
  231. TRawTableWriterPtr TClientBase::CreateRawWriter(
  232. const TRichYPath& path,
  233. const TFormat& format,
  234. const TTableWriterOptions& options)
  235. {
  236. return ::MakeIntrusive<TRetryfulWriter>(
  237. ClientRetryPolicy_,
  238. GetTransactionPinger(),
  239. Context_,
  240. TransactionId_,
  241. GetWriteTableCommand(Context_.Config->ApiVersion),
  242. format,
  243. CanonizeYPath(path),
  244. options).Get();
  245. }
  246. IOperationPtr TClientBase::DoMap(
  247. const TMapOperationSpec& spec,
  248. ::TIntrusivePtr<IStructuredJob> mapper,
  249. const TOperationOptions& options)
  250. {
  251. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  252. auto prepareOperation = [
  253. this_ = ::TIntrusivePtr(this),
  254. operation,
  255. spec,
  256. mapper,
  257. options
  258. ] () {
  259. ExecuteMap(
  260. operation,
  261. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  262. spec,
  263. mapper,
  264. options);
  265. };
  266. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  267. }
  268. IOperationPtr TClientBase::RawMap(
  269. const TRawMapOperationSpec& spec,
  270. ::TIntrusivePtr<IRawJob> mapper,
  271. const TOperationOptions& options)
  272. {
  273. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  274. auto prepareOperation = [
  275. this_=::TIntrusivePtr(this),
  276. operation,
  277. spec,
  278. mapper,
  279. options
  280. ] () {
  281. ExecuteRawMap(
  282. operation,
  283. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  284. spec,
  285. mapper,
  286. options);
  287. };
  288. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  289. }
  290. IOperationPtr TClientBase::DoReduce(
  291. const TReduceOperationSpec& spec,
  292. ::TIntrusivePtr<IStructuredJob> reducer,
  293. const TOperationOptions& options)
  294. {
  295. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  296. auto prepareOperation = [
  297. this_=::TIntrusivePtr(this),
  298. operation,
  299. spec,
  300. reducer,
  301. options
  302. ] () {
  303. ExecuteReduce(
  304. operation,
  305. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  306. spec,
  307. reducer,
  308. options);
  309. };
  310. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  311. }
  312. IOperationPtr TClientBase::RawReduce(
  313. const TRawReduceOperationSpec& spec,
  314. ::TIntrusivePtr<IRawJob> reducer,
  315. const TOperationOptions& options)
  316. {
  317. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  318. auto prepareOperation = [
  319. this_=::TIntrusivePtr(this),
  320. operation,
  321. spec,
  322. reducer,
  323. options
  324. ] () {
  325. ExecuteRawReduce(
  326. operation,
  327. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  328. spec,
  329. reducer,
  330. options);
  331. };
  332. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  333. }
  334. IOperationPtr TClientBase::DoJoinReduce(
  335. const TJoinReduceOperationSpec& spec,
  336. ::TIntrusivePtr<IStructuredJob> reducer,
  337. const TOperationOptions& options)
  338. {
  339. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  340. auto prepareOperation = [
  341. this_=::TIntrusivePtr(this),
  342. operation,
  343. spec,
  344. reducer,
  345. options
  346. ] () {
  347. ExecuteJoinReduce(
  348. operation,
  349. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  350. spec,
  351. reducer,
  352. options);
  353. };
  354. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  355. }
  356. IOperationPtr TClientBase::RawJoinReduce(
  357. const TRawJoinReduceOperationSpec& spec,
  358. ::TIntrusivePtr<IRawJob> reducer,
  359. const TOperationOptions& options)
  360. {
  361. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  362. auto prepareOperation = [
  363. this_=::TIntrusivePtr(this),
  364. operation,
  365. spec,
  366. reducer,
  367. options
  368. ] () {
  369. ExecuteRawJoinReduce(
  370. operation,
  371. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  372. spec,
  373. reducer,
  374. options);
  375. };
  376. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  377. }
  378. IOperationPtr TClientBase::DoMapReduce(
  379. const TMapReduceOperationSpec& spec,
  380. ::TIntrusivePtr<IStructuredJob> mapper,
  381. ::TIntrusivePtr<IStructuredJob> reduceCombiner,
  382. ::TIntrusivePtr<IStructuredJob> reducer,
  383. const TOperationOptions& options)
  384. {
  385. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  386. auto prepareOperation = [
  387. this_=::TIntrusivePtr(this),
  388. operation,
  389. spec,
  390. mapper,
  391. reduceCombiner,
  392. reducer,
  393. options
  394. ] () {
  395. ExecuteMapReduce(
  396. operation,
  397. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  398. spec,
  399. mapper,
  400. reduceCombiner,
  401. reducer,
  402. options);
  403. };
  404. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  405. }
  406. IOperationPtr TClientBase::RawMapReduce(
  407. const TRawMapReduceOperationSpec& spec,
  408. ::TIntrusivePtr<IRawJob> mapper,
  409. ::TIntrusivePtr<IRawJob> reduceCombiner,
  410. ::TIntrusivePtr<IRawJob> reducer,
  411. const TOperationOptions& options)
  412. {
  413. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  414. auto prepareOperation = [
  415. this_=::TIntrusivePtr(this),
  416. operation,
  417. spec,
  418. mapper,
  419. reduceCombiner,
  420. reducer,
  421. options
  422. ] () {
  423. ExecuteRawMapReduce(
  424. operation,
  425. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  426. spec,
  427. mapper,
  428. reduceCombiner,
  429. reducer,
  430. options);
  431. };
  432. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  433. }
  434. IOperationPtr TClientBase::Sort(
  435. const TSortOperationSpec& spec,
  436. const TOperationOptions& options)
  437. {
  438. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  439. auto prepareOperation = [
  440. this_ = ::TIntrusivePtr(this),
  441. operation,
  442. spec,
  443. options
  444. ] () {
  445. ExecuteSort(
  446. operation,
  447. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  448. spec,
  449. options);
  450. };
  451. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  452. }
  453. IOperationPtr TClientBase::Merge(
  454. const TMergeOperationSpec& spec,
  455. const TOperationOptions& options)
  456. {
  457. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  458. auto prepareOperation = [
  459. this_ = ::TIntrusivePtr(this),
  460. operation,
  461. spec,
  462. options
  463. ] () {
  464. ExecuteMerge(
  465. operation,
  466. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  467. spec,
  468. options);
  469. };
  470. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  471. }
  472. IOperationPtr TClientBase::Erase(
  473. const TEraseOperationSpec& spec,
  474. const TOperationOptions& options)
  475. {
  476. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  477. auto prepareOperation = [
  478. this_ = ::TIntrusivePtr(this),
  479. operation,
  480. spec,
  481. options
  482. ] () {
  483. ExecuteErase(
  484. operation,
  485. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  486. spec,
  487. options);
  488. };
  489. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  490. }
  491. IOperationPtr TClientBase::RemoteCopy(
  492. const TRemoteCopyOperationSpec& spec,
  493. const TOperationOptions& options)
  494. {
  495. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  496. auto prepareOperation = [
  497. this_ = ::TIntrusivePtr(this),
  498. operation,
  499. spec,
  500. options
  501. ] () {
  502. ExecuteRemoteCopy(
  503. operation,
  504. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  505. spec,
  506. options);
  507. };
  508. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  509. }
  510. IOperationPtr TClientBase::RunVanilla(
  511. const TVanillaOperationSpec& spec,
  512. const TOperationOptions& options)
  513. {
  514. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  515. auto prepareOperation = [
  516. this_ = ::TIntrusivePtr(this),
  517. operation,
  518. spec,
  519. options
  520. ] () {
  521. ExecuteVanilla(
  522. operation,
  523. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  524. spec,
  525. options);
  526. };
  527. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  528. }
  529. IOperationPtr TClientBase::AttachOperation(const TOperationId& operationId)
  530. {
  531. auto operation = ::MakeIntrusive<TOperation>(operationId, GetParentClientImpl());
  532. operation->GetBriefState(); // check that operation exists
  533. return operation;
  534. }
  535. EOperationBriefState TClientBase::CheckOperation(const TOperationId& operationId)
  536. {
  537. return NYT::NDetail::CheckOperation(ClientRetryPolicy_, Context_, operationId);
  538. }
  539. void TClientBase::AbortOperation(const TOperationId& operationId)
  540. {
  541. NRawClient::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId);
  542. }
  543. void TClientBase::CompleteOperation(const TOperationId& operationId)
  544. {
  545. NRawClient::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId);
  546. }
  547. void TClientBase::WaitForOperation(const TOperationId& operationId)
  548. {
  549. NYT::NDetail::WaitForOperation(ClientRetryPolicy_, Context_, operationId);
  550. }
  551. void TClientBase::AlterTable(
  552. const TYPath& path,
  553. const TAlterTableOptions& options)
  554. {
  555. NRawClient::AlterTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  556. }
  557. ::TIntrusivePtr<TClientReader> TClientBase::CreateClientReader(
  558. const TRichYPath& path,
  559. const TFormat& format,
  560. const TTableReaderOptions& options,
  561. bool useFormatFromTableAttributes)
  562. {
  563. return ::MakeIntrusive<TClientReader>(
  564. CanonizeYPath(path),
  565. ClientRetryPolicy_,
  566. GetTransactionPinger(),
  567. Context_,
  568. TransactionId_,
  569. format,
  570. options,
  571. useFormatFromTableAttributes);
  572. }
  573. THolder<TClientWriter> TClientBase::CreateClientWriter(
  574. const TRichYPath& path,
  575. const TFormat& format,
  576. const TTableWriterOptions& options)
  577. {
  578. auto realPath = CanonizeYPath(path);
  579. if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
  580. NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE,
  581. TCreateOptions().IgnoreExisting(true));
  582. }
  583. return MakeHolder<TClientWriter>(
  584. realPath,
  585. ClientRetryPolicy_,
  586. GetTransactionPinger(),
  587. Context_,
  588. TransactionId_,
  589. format,
  590. options
  591. );
  592. }
  593. ::TIntrusivePtr<INodeReaderImpl> TClientBase::CreateNodeReader(
  594. const TRichYPath& path, const TTableReaderOptions& options)
  595. {
  596. auto format = TFormat::YsonBinary();
  597. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  598. // Skiff is disabled here because of large header problem (see https://st.yandex-team.ru/YT-6926).
  599. // Revert this code to r3614168 when it is fixed.
  600. return new TNodeTableReader(
  601. CreateClientReader(path, format, options));
  602. }
  603. ::TIntrusivePtr<IYaMRReaderImpl> TClientBase::CreateYaMRReader(
  604. const TRichYPath& path, const TTableReaderOptions& options)
  605. {
  606. return new TYaMRTableReader(
  607. CreateClientReader(path, TFormat::YaMRLenval(), options, /* useFormatFromTableAttributes = */ true));
  608. }
  609. ::TIntrusivePtr<IProtoReaderImpl> TClientBase::CreateProtoReader(
  610. const TRichYPath& path,
  611. const TTableReaderOptions& options,
  612. const Message* prototype)
  613. {
  614. TVector<const ::google::protobuf::Descriptor*> descriptors;
  615. descriptors.push_back(prototype->GetDescriptor());
  616. if (Context_.Config->UseClientProtobuf) {
  617. return new TProtoTableReader(
  618. CreateClientReader(path, TFormat::YsonBinary(), options),
  619. std::move(descriptors));
  620. } else {
  621. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  622. return new TLenvalProtoTableReader(
  623. CreateClientReader(path, format, options),
  624. std::move(descriptors));
  625. }
  626. }
  627. ::TIntrusivePtr<ISkiffRowReaderImpl> TClientBase::CreateSkiffRowReader(
  628. const TRichYPath& path,
  629. const TTableReaderOptions& options,
  630. const ISkiffRowSkipperPtr& skipper,
  631. const NSkiff::TSkiffSchemaPtr& schema)
  632. {
  633. auto skiffOptions = TCreateSkiffSchemaOptions().HasRangeIndex(true);
  634. auto resultSchema = NYT::NDetail::CreateSkiffSchema(TVector{schema}, skiffOptions);
  635. return new TSkiffRowTableReader(
  636. CreateClientReader(path, NYT::NDetail::CreateSkiffFormat(resultSchema), options),
  637. resultSchema,
  638. {skipper},
  639. std::move(skiffOptions));
  640. }
  641. ::TIntrusivePtr<INodeWriterImpl> TClientBase::CreateNodeWriter(
  642. const TRichYPath& path, const TTableWriterOptions& options)
  643. {
  644. auto format = TFormat::YsonBinary();
  645. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  646. return new TNodeTableWriter(
  647. CreateClientWriter(path, format, options));
  648. }
  649. ::TIntrusivePtr<IYaMRWriterImpl> TClientBase::CreateYaMRWriter(
  650. const TRichYPath& path, const TTableWriterOptions& options)
  651. {
  652. auto format = TFormat::YaMRLenval();
  653. ApplyFormatHints<TYaMRRow>(&format, options.FormatHints_);
  654. return new TYaMRTableWriter(
  655. CreateClientWriter(path, format, options));
  656. }
  657. ::TIntrusivePtr<IProtoWriterImpl> TClientBase::CreateProtoWriter(
  658. const TRichYPath& path,
  659. const TTableWriterOptions& options,
  660. const Message* prototype)
  661. {
  662. TVector<const ::google::protobuf::Descriptor*> descriptors;
  663. descriptors.push_back(prototype->GetDescriptor());
  664. auto pathWithSchema = path;
  665. if (options.InferSchema_.GetOrElse(Context_.Config->InferTableSchema) && !path.Schema_) {
  666. pathWithSchema.Schema(CreateTableSchema(*prototype->GetDescriptor()));
  667. }
  668. if (Context_.Config->UseClientProtobuf) {
  669. auto format = TFormat::YsonBinary();
  670. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  671. return new TProtoTableWriter(
  672. CreateClientWriter(pathWithSchema, format, options),
  673. std::move(descriptors));
  674. } else {
  675. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  676. ApplyFormatHints<::google::protobuf::Message>(&format, options.FormatHints_);
  677. return new TLenvalProtoTableWriter(
  678. CreateClientWriter(pathWithSchema, format, options),
  679. std::move(descriptors));
  680. }
  681. }
  682. TBatchRequestPtr TClientBase::CreateBatchRequest()
  683. {
  684. return MakeIntrusive<TBatchRequest>(TransactionId_, GetParentClientImpl());
  685. }
  686. IClientPtr TClientBase::GetParentClient()
  687. {
  688. return GetParentClientImpl();
  689. }
  690. const TClientContext& TClientBase::GetContext() const
  691. {
  692. return Context_;
  693. }
  694. const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
  695. {
  696. return ClientRetryPolicy_;
  697. }
  698. ////////////////////////////////////////////////////////////////////////////////
  699. TTransaction::TTransaction(
  700. TClientPtr parentClient,
  701. const TClientContext& context,
  702. const TTransactionId& parentTransactionId,
  703. const TStartTransactionOptions& options)
  704. : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy())
  705. , TransactionPinger_(parentClient->GetTransactionPinger())
  706. , PingableTx_(
  707. MakeHolder<TPingableTransaction>(
  708. parentClient->GetRetryPolicy(),
  709. context,
  710. parentTransactionId,
  711. TransactionPinger_->GetChildTxPinger(),
  712. options))
  713. , ParentClient_(parentClient)
  714. {
  715. TransactionId_ = PingableTx_->GetId();
  716. }
  717. TTransaction::TTransaction(
  718. TClientPtr parentClient,
  719. const TClientContext& context,
  720. const TTransactionId& transactionId,
  721. const TAttachTransactionOptions& options)
  722. : TClientBase(context, transactionId, parentClient->GetRetryPolicy())
  723. , TransactionPinger_(parentClient->GetTransactionPinger())
  724. , PingableTx_(
  725. new TPingableTransaction(
  726. parentClient->GetRetryPolicy(),
  727. context,
  728. transactionId,
  729. parentClient->GetTransactionPinger()->GetChildTxPinger(),
  730. options))
  731. , ParentClient_(parentClient)
  732. { }
  733. const TTransactionId& TTransaction::GetId() const
  734. {
  735. return TransactionId_;
  736. }
  737. ILockPtr TTransaction::Lock(
  738. const TYPath& path,
  739. ELockMode mode,
  740. const TLockOptions& options)
  741. {
  742. auto lockId = NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, mode, options);
  743. return ::MakeIntrusive<TLock>(lockId, GetParentClientImpl(), options.Waitable_);
  744. }
  745. void TTransaction::Unlock(
  746. const TYPath& path,
  747. const TUnlockOptions& options)
  748. {
  749. NRawClient::Unlock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  750. }
  751. void TTransaction::Commit()
  752. {
  753. PingableTx_->Commit();
  754. }
  755. void TTransaction::Abort()
  756. {
  757. PingableTx_->Abort();
  758. }
  759. void TTransaction::Ping()
  760. {
  761. PingTx(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_);
  762. }
  763. void TTransaction::Detach()
  764. {
  765. PingableTx_->Detach();
  766. }
  767. ITransactionPingerPtr TTransaction::GetTransactionPinger()
  768. {
  769. return TransactionPinger_;
  770. }
  771. TClientPtr TTransaction::GetParentClientImpl()
  772. {
  773. return ParentClient_;
  774. }
  775. ////////////////////////////////////////////////////////////////////////////////
  776. TClient::TClient(
  777. const TClientContext& context,
  778. const TTransactionId& globalId,
  779. IClientRetryPolicyPtr retryPolicy)
  780. : TClientBase(context, globalId, retryPolicy)
  781. , TransactionPinger_(nullptr)
  782. { }
// Defaulted destructor, defined out of line in this translation unit.
// NOTE(review): presumably kept here so that member types only need to be
// complete in this file rather than in the header -- confirm against the header.
TClient::~TClient() = default;
  784. ITransactionPtr TClient::AttachTransaction(
  785. const TTransactionId& transactionId,
  786. const TAttachTransactionOptions& options)
  787. {
  788. CheckShutdown();
  789. return MakeIntrusive<TTransaction>(this, Context_, transactionId, options);
  790. }
  791. void TClient::MountTable(
  792. const TYPath& path,
  793. const TMountTableOptions& options)
  794. {
  795. CheckShutdown();
  796. THttpHeader header("POST", "mount_table");
  797. SetTabletParams(header, path, options);
  798. if (options.CellId_) {
  799. header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
  800. }
  801. header.AddParameter("freeze", options.Freeze_);
  802. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  803. }
  804. void TClient::UnmountTable(
  805. const TYPath& path,
  806. const TUnmountTableOptions& options)
  807. {
  808. CheckShutdown();
  809. THttpHeader header("POST", "unmount_table");
  810. SetTabletParams(header, path, options);
  811. header.AddParameter("force", options.Force_);
  812. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  813. }
  814. void TClient::RemountTable(
  815. const TYPath& path,
  816. const TRemountTableOptions& options)
  817. {
  818. CheckShutdown();
  819. THttpHeader header("POST", "remount_table");
  820. SetTabletParams(header, path, options);
  821. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  822. }
  823. void TClient::FreezeTable(
  824. const TYPath& path,
  825. const TFreezeTableOptions& options)
  826. {
  827. CheckShutdown();
  828. NRawClient::FreezeTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, options);
  829. }
  830. void TClient::UnfreezeTable(
  831. const TYPath& path,
  832. const TUnfreezeTableOptions& options)
  833. {
  834. CheckShutdown();
  835. NRawClient::UnfreezeTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, options);
  836. }
  837. void TClient::ReshardTable(
  838. const TYPath& path,
  839. const TVector<TKey>& keys,
  840. const TReshardTableOptions& options)
  841. {
  842. CheckShutdown();
  843. THttpHeader header("POST", "reshard_table");
  844. SetTabletParams(header, path, options);
  845. header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
  846. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  847. }
  848. void TClient::ReshardTable(
  849. const TYPath& path,
  850. i64 tabletCount,
  851. const TReshardTableOptions& options)
  852. {
  853. CheckShutdown();
  854. THttpHeader header("POST", "reshard_table");
  855. SetTabletParams(header, path, options);
  856. header.AddParameter("tablet_count", tabletCount);
  857. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  858. }
  859. void TClient::InsertRows(
  860. const TYPath& path,
  861. const TNode::TListType& rows,
  862. const TInsertRowsOptions& options)
  863. {
  864. CheckShutdown();
  865. THttpHeader header("PUT", "insert_rows");
  866. header.SetInputFormat(TFormat::YsonBinary());
  867. // TODO: use corresponding raw request
  868. header.MergeParameters(SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
  869. auto body = NodeListToYsonString(rows);
  870. TRequestConfig config;
  871. config.IsHeavy = true;
  872. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config);
  873. }
  874. void TClient::DeleteRows(
  875. const TYPath& path,
  876. const TNode::TListType& keys,
  877. const TDeleteRowsOptions& options)
  878. {
  879. CheckShutdown();
  880. return NRawClient::DeleteRows(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, keys, options);
  881. }
  882. void TClient::TrimRows(
  883. const TYPath& path,
  884. i64 tabletIndex,
  885. i64 rowCount,
  886. const TTrimRowsOptions& options)
  887. {
  888. CheckShutdown();
  889. THttpHeader header("POST", "trim_rows");
  890. header.AddParameter("trimmed_row_count", rowCount);
  891. header.AddParameter("tablet_index", tabletIndex);
  892. // TODO: use corresponding raw request
  893. header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
  894. TRequestConfig config;
  895. config.IsHeavy = true;
  896. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  897. }
  898. TNode::TListType TClient::LookupRows(
  899. const TYPath& path,
  900. const TNode::TListType& keys,
  901. const TLookupRowsOptions& options)
  902. {
  903. CheckShutdown();
  904. Y_UNUSED(options);
  905. THttpHeader header("PUT", "lookup_rows");
  906. header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion));
  907. header.SetInputFormat(TFormat::YsonBinary());
  908. header.SetOutputFormat(TFormat::YsonBinary());
  909. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  910. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  911. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  912. })
  913. .Item("keep_missing_rows").Value(options.KeepMissingRows_)
  914. .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) {
  915. fluent.Item("versioned").Value(*options.Versioned_);
  916. })
  917. .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) {
  918. fluent.Item("column_names").Value(*options.Columns_);
  919. })
  920. .EndMap());
  921. auto body = NodeListToYsonString(keys);
  922. TRequestConfig config;
  923. config.IsHeavy = true;
  924. auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config);
  925. return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList();
  926. }
  927. TNode::TListType TClient::SelectRows(
  928. const TString& query,
  929. const TSelectRowsOptions& options)
  930. {
  931. CheckShutdown();
  932. THttpHeader header("GET", "select_rows");
  933. header.SetInputFormat(TFormat::YsonBinary());
  934. header.SetOutputFormat(TFormat::YsonBinary());
  935. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  936. .Item("query").Value(query)
  937. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  938. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  939. })
  940. .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  941. fluent.Item("input_row_limit").Value(*options.InputRowLimit_);
  942. })
  943. .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  944. fluent.Item("output_row_limit").Value(*options.OutputRowLimit_);
  945. })
  946. .Item("range_expansion_limit").Value(options.RangeExpansionLimit_)
  947. .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_)
  948. .Item("verbose_logging").Value(options.VerboseLogging_)
  949. .Item("enable_code_cache").Value(options.EnableCodeCache_)
  950. .EndMap());
  951. TRequestConfig config;
  952. config.IsHeavy = true;
  953. auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  954. return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList();
  955. }
  956. void TClient::AlterTableReplica(const TReplicaId& replicaId, const TAlterTableReplicaOptions& options)
  957. {
  958. CheckShutdown();
  959. NRawClient::AlterTableReplica(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, replicaId, options);
  960. }
  961. ui64 TClient::GenerateTimestamp()
  962. {
  963. CheckShutdown();
  964. THttpHeader header("GET", "generate_timestamp");
  965. TRequestConfig config;
  966. config.IsHeavy = true;
  967. auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  968. return NodeFromYsonString(requestResult.Response).AsUint64();
  969. }
  970. TAuthorizationInfo TClient::WhoAmI()
  971. {
  972. CheckShutdown();
  973. THttpHeader header("GET", "auth/whoami", /* isApi = */ false);
  974. auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  975. TAuthorizationInfo result;
  976. NJson::TJsonValue jsonValue;
  977. bool ok = NJson::ReadJsonTree(requestResult.Response, &jsonValue, /* throwOnError = */ true);
  978. Y_ABORT_UNLESS(ok);
  979. result.Login = jsonValue["login"].GetString();
  980. result.Realm = jsonValue["realm"].GetString();
  981. return result;
  982. }
  983. TOperationAttributes TClient::GetOperation(
  984. const TOperationId& operationId,
  985. const TGetOperationOptions& options)
  986. {
  987. CheckShutdown();
  988. return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  989. }
  990. TListOperationsResult TClient::ListOperations(
  991. const TListOperationsOptions& options)
  992. {
  993. CheckShutdown();
  994. return NRawClient::ListOperations(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, options);
  995. }
  996. void TClient::UpdateOperationParameters(
  997. const TOperationId& operationId,
  998. const TUpdateOperationParametersOptions& options)
  999. {
  1000. CheckShutdown();
  1001. return NRawClient::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1002. }
  1003. TJobAttributes TClient::GetJob(
  1004. const TOperationId& operationId,
  1005. const TJobId& jobId,
  1006. const TGetJobOptions& options)
  1007. {
  1008. CheckShutdown();
  1009. return NRawClient::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, jobId, options);
  1010. }
  1011. TListJobsResult TClient::ListJobs(
  1012. const TOperationId& operationId,
  1013. const TListJobsOptions& options)
  1014. {
  1015. CheckShutdown();
  1016. return NRawClient::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1017. }
  1018. IFileReaderPtr TClient::GetJobInput(
  1019. const TJobId& jobId,
  1020. const TGetJobInputOptions& options)
  1021. {
  1022. CheckShutdown();
  1023. return NRawClient::GetJobInput(Context_, jobId, options);
  1024. }
  1025. IFileReaderPtr TClient::GetJobFailContext(
  1026. const TOperationId& operationId,
  1027. const TJobId& jobId,
  1028. const TGetJobFailContextOptions& options)
  1029. {
  1030. CheckShutdown();
  1031. return NRawClient::GetJobFailContext(Context_, operationId, jobId, options);
  1032. }
  1033. IFileReaderPtr TClient::GetJobStderr(
  1034. const TOperationId& operationId,
  1035. const TJobId& jobId,
  1036. const TGetJobStderrOptions& options)
  1037. {
  1038. CheckShutdown();
  1039. return NRawClient::GetJobStderr(Context_, operationId, jobId, options);
  1040. }
  1041. TNode::TListType TClient::SkyShareTable(
  1042. const std::vector<TYPath>& tablePaths,
  1043. const TSkyShareTableOptions& options)
  1044. {
  1045. CheckShutdown();
  1046. return NRawClient::SkyShareTable(
  1047. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1048. Context_,
  1049. tablePaths,
  1050. options);
  1051. }
  1052. TCheckPermissionResponse TClient::CheckPermission(
  1053. const TString& user,
  1054. EPermission permission,
  1055. const TYPath& path,
  1056. const TCheckPermissionOptions& options)
  1057. {
  1058. CheckShutdown();
  1059. return NRawClient::CheckPermission(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, user, permission, path, options);
  1060. }
  1061. TVector<TTabletInfo> TClient::GetTabletInfos(
  1062. const TYPath& path,
  1063. const TVector<int>& tabletIndexes,
  1064. const TGetTabletInfosOptions& options)
  1065. {
  1066. CheckShutdown();
  1067. return NRawClient::GetTabletInfos(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, tabletIndexes, options);
  1068. }
  1069. void TClient::SuspendOperation(
  1070. const TOperationId& operationId,
  1071. const TSuspendOperationOptions& options)
  1072. {
  1073. CheckShutdown();
  1074. NRawClient::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1075. }
  1076. void TClient::ResumeOperation(
  1077. const TOperationId& operationId,
  1078. const TResumeOperationOptions& options)
  1079. {
  1080. CheckShutdown();
  1081. NRawClient::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1082. }
  1083. TYtPoller& TClient::GetYtPoller()
  1084. {
  1085. auto g = Guard(Lock_);
  1086. if (!YtPoller_) {
  1087. CheckShutdown();
  1088. // We don't use current client and create new client because YtPoller_ might use
  1089. // this client during current client shutdown.
  1090. // That might lead to incrementing of current client refcount and double delete of current client object.
  1091. YtPoller_ = MakeHolder<TYtPoller>(Context_, ClientRetryPolicy_);
  1092. }
  1093. return *YtPoller_;
  1094. }
  1095. void TClient::Shutdown()
  1096. {
  1097. auto g = Guard(Lock_);
  1098. if (!Shutdown_.exchange(true) && YtPoller_) {
  1099. YtPoller_->Stop();
  1100. }
  1101. }
  1102. ITransactionPingerPtr TClient::GetTransactionPinger()
  1103. {
  1104. auto g = Guard(Lock_);
  1105. if (!TransactionPinger_) {
  1106. TransactionPinger_ = CreateTransactionPinger(Context_.Config);
  1107. }
  1108. return TransactionPinger_;
  1109. }
  1110. TClientPtr TClient::GetParentClientImpl()
  1111. {
  1112. return this;
  1113. }
  1114. template <class TOptions>
  1115. void TClient::SetTabletParams(
  1116. THttpHeader& header,
  1117. const TYPath& path,
  1118. const TOptions& options)
  1119. {
  1120. header.AddPath(AddPathPrefix(path, Context_.Config->Prefix));
  1121. if (options.FirstTabletIndex_) {
  1122. header.AddParameter("first_tablet_index", *options.FirstTabletIndex_);
  1123. }
  1124. if (options.LastTabletIndex_) {
  1125. header.AddParameter("last_tablet_index", *options.LastTabletIndex_);
  1126. }
  1127. }
  1128. void TClient::CheckShutdown() const
  1129. {
  1130. if (Shutdown_) {
  1131. ythrow TApiUsageError() << "Call client's methods after shutdown";
  1132. }
  1133. }
  1134. TClientPtr CreateClientImpl(
  1135. const TString& serverName,
  1136. const TCreateClientOptions& options)
  1137. {
  1138. TClientContext context;
  1139. context.Config = options.Config_ ? options.Config_ : TConfig::Get();
  1140. context.TvmOnly = options.TvmOnly_;
  1141. context.UseTLS = options.UseTLS_;
  1142. context.ServerName = serverName;
  1143. if (serverName.find('.') == TString::npos &&
  1144. serverName.find(':') == TString::npos)
  1145. {
  1146. context.ServerName += ".yt.yandex.net";
  1147. }
  1148. if (serverName.find(':') == TString::npos) {
  1149. context.ServerName = CreateHostNameWithPort(context.ServerName, context);
  1150. }
  1151. if (options.TvmOnly_) {
  1152. context.ServerName = Format("tvm.%v", context.ServerName);
  1153. }
  1154. if (options.UseTLS_ || options.UseCoreHttpClient_) {
  1155. context.HttpClient = NHttpClient::CreateCoreHttpClient(options.UseTLS_, context.Config);
  1156. } else {
  1157. context.HttpClient = NHttpClient::CreateDefaultHttpClient();
  1158. }
  1159. context.Token = context.Config->Token;
  1160. if (options.Token_) {
  1161. context.Token = options.Token_;
  1162. } else if (options.TokenPath_) {
  1163. context.Token = TConfig::LoadTokenFromFile(options.TokenPath_);
  1164. } else if (options.ServiceTicketAuth_) {
  1165. context.ServiceTicketAuth = options.ServiceTicketAuth_;
  1166. }
  1167. context.ImpersonationUser = options.ImpersonationUser_;
  1168. if (context.Token) {
  1169. TConfig::ValidateToken(context.Token);
  1170. }
  1171. auto globalTxId = GetGuid(context.Config->GlobalTxId);
  1172. auto retryConfigProvider = options.RetryConfigProvider_;
  1173. if (!retryConfigProvider) {
  1174. retryConfigProvider = CreateDefaultRetryConfigProvider();
  1175. }
  1176. return new NDetail::TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
  1177. }
  1178. ////////////////////////////////////////////////////////////////////////////////
  1179. } // namespace NDetail
  1180. ////////////////////////////////////////////////////////////////////////////////
  1181. IClientPtr CreateClient(
  1182. const TString& serverName,
  1183. const TCreateClientOptions& options)
  1184. {
  1185. return NDetail::CreateClientImpl(serverName, options);
  1186. }
  1187. IClientPtr CreateClientFromEnv(const TCreateClientOptions& options)
  1188. {
  1189. auto serverName = GetEnv("YT_PROXY");
  1190. if (!serverName) {
  1191. ythrow yexception() << "YT_PROXY is not set";
  1192. }
  1193. return NDetail::CreateClientImpl(serverName, options);
  1194. }
  1195. ////////////////////////////////////////////////////////////////////////////////
  1196. } // namespace NYT