client.cpp 51 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639
  1. #include "client.h"
  2. #include "batch_request_impl.h"
  3. #include "client_reader.h"
  4. #include "client_writer.h"
  5. #include "file_reader.h"
  6. #include "file_writer.h"
  7. #include "format_hints.h"
  8. #include "init.h"
  9. #include "lock.h"
  10. #include "operation.h"
  11. #include "retryful_writer.h"
  12. #include "transaction.h"
  13. #include "transaction_pinger.h"
  14. #include "yt_poller.h"
  15. #include <yt/cpp/mapreduce/common/helpers.h>
  16. #include <yt/cpp/mapreduce/common/retry_lib.h>
  17. #include <yt/cpp/mapreduce/http/helpers.h>
  18. #include <yt/cpp/mapreduce/http/http.h>
  19. #include <yt/cpp/mapreduce/http/http_client.h>
  20. #include <yt/cpp/mapreduce/http/requests.h>
  21. #include <yt/cpp/mapreduce/http/retry_request.h>
  22. #include <yt/cpp/mapreduce/interface/config.h>
  23. #include <yt/cpp/mapreduce/interface/client.h>
  24. #include <yt/cpp/mapreduce/interface/error_codes.h>
  25. #include <yt/cpp/mapreduce/interface/fluent.h>
  26. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  27. #include <yt/cpp/mapreduce/interface/skiff_row.h>
  28. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  29. #include <yt/cpp/mapreduce/io/yamr_table_writer.h>
  30. #include <yt/cpp/mapreduce/io/node_table_reader.h>
  31. #include <yt/cpp/mapreduce/io/node_table_writer.h>
  32. #include <yt/cpp/mapreduce/io/proto_table_reader.h>
  33. #include <yt/cpp/mapreduce/io/proto_table_writer.h>
  34. #include <yt/cpp/mapreduce/io/skiff_row_table_reader.h>
  35. #include <yt/cpp/mapreduce/io/proto_helpers.h>
  36. #include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
  37. #include <yt/cpp/mapreduce/http_client/raw_client.h>
  38. #include <yt/cpp/mapreduce/http_client/raw_requests.h>
  39. #include <yt/yt/core/ytree/fluent.h>
  40. #include <library/cpp/json/json_reader.h>
  41. #include <util/generic/algorithm.h>
  42. #include <util/string/type.h>
  43. #include <util/system/env.h>
  44. namespace NYT {
  45. ////////////////////////////////////////////////////////////////////////////////
  46. namespace NDetail {
  47. ////////////////////////////////////////////////////////////////////////////////
  48. namespace {
  49. ////////////////////////////////////////////////////////////////////////////////
  50. THashMap<TString, TString> ParseProxyUrlAliasingRules(TString envConfig)
  51. {
  52. if (envConfig.empty()) {
  53. return {};
  54. }
  55. return NYTree::ConvertTo<THashMap<TString, TString>>(NYson::TYsonString(envConfig));
  56. }
  57. void ApplyProxyUrlAliasingRules(TString& url)
  58. {
  59. static auto rules = ParseProxyUrlAliasingRules(GetEnv("YT_PROXY_URL_ALIASING_CONFIG"));
  60. if (auto ruleIt = rules.find(url); ruleIt != rules.end()) {
  61. url = ruleIt->second;
  62. }
  63. }
  64. ////////////////////////////////////////////////////////////////////////////////
  65. } // namespace
  66. ////////////////////////////////////////////////////////////////////////////////
  67. TClientBase::TClientBase(
  68. IRawClientPtr rawClient,
  69. const TClientContext& context,
  70. const TTransactionId& transactionId,
  71. IClientRetryPolicyPtr retryPolicy)
  72. : RawClient_(std::move(rawClient))
  73. , Context_(context)
  74. , TransactionId_(transactionId)
  75. , ClientRetryPolicy_(std::move(retryPolicy))
  76. { }
  77. ITransactionPtr TClientBase::StartTransaction(
  78. const TStartTransactionOptions& options)
  79. {
  80. return MakeIntrusive<TTransaction>(RawClient_, GetParentClientImpl(), Context_, TransactionId_, options);
  81. }
  82. TNodeId TClientBase::Create(
  83. const TYPath& path,
  84. ENodeType type,
  85. const TCreateOptions& options)
  86. {
  87. return RequestWithRetry<TNodeId>(
  88. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  89. [this, &path, &type, &options] (TMutationId& mutationId) {
  90. return RawClient_->Create(mutationId, TransactionId_, path, type, options);
  91. });
  92. }
  93. void TClientBase::Remove(
  94. const TYPath& path,
  95. const TRemoveOptions& options)
  96. {
  97. RequestWithRetry<void>(
  98. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  99. [this, &path, &options] (TMutationId& mutationId) {
  100. RawClient_->Remove(mutationId, TransactionId_, path, options);
  101. });
  102. }
  103. bool TClientBase::Exists(
  104. const TYPath& path,
  105. const TExistsOptions& options)
  106. {
  107. return RequestWithRetry<bool>(
  108. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  109. [this, &path, &options] (TMutationId /*mutationId*/) {
  110. return RawClient_->Exists(TransactionId_, path, options);
  111. });
  112. }
  113. TNode TClientBase::Get(
  114. const TYPath& path,
  115. const TGetOptions& options)
  116. {
  117. return RequestWithRetry<TNode>(
  118. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  119. [this, &path, &options] (TMutationId /*mutationId*/) {
  120. return RawClient_->Get(TransactionId_, path, options);
  121. });
  122. }
  123. void TClientBase::Set(
  124. const TYPath& path,
  125. const TNode& value,
  126. const TSetOptions& options)
  127. {
  128. RequestWithRetry<void>(
  129. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  130. [this, &path, &value, &options] (TMutationId& mutationId) {
  131. RawClient_->Set(mutationId, TransactionId_, path, value, options);
  132. });
  133. }
  134. void TClientBase::MultisetAttributes(
  135. const TYPath& path,
  136. const TNode::TMapType& value,
  137. const TMultisetAttributesOptions& options)
  138. {
  139. RequestWithRetry<void>(
  140. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  141. [this, &path, &value, &options] (TMutationId& mutationId) {
  142. RawClient_->MultisetAttributes(mutationId, TransactionId_, path, value, options);
  143. });
  144. }
  145. TNode::TListType TClientBase::List(
  146. const TYPath& path,
  147. const TListOptions& options)
  148. {
  149. return RequestWithRetry<TNode::TListType>(
  150. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  151. [this, &path, &options] (TMutationId /*mutationId*/) {
  152. return RawClient_->List(TransactionId_, path, options);
  153. });
  154. }
  155. TNodeId TClientBase::Copy(
  156. const TYPath& sourcePath,
  157. const TYPath& destinationPath,
  158. const TCopyOptions& options)
  159. {
  160. try {
  161. return RequestWithRetry<TNodeId>(
  162. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  163. [this, &sourcePath, &destinationPath, &options] (TMutationId& mutationId) {
  164. return RawClient_->CopyInsideMasterCell(mutationId, TransactionId_, sourcePath, destinationPath, options);
  165. });
  166. } catch (const TErrorResponse& e) {
  167. if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
  168. // Do transaction for cross cell copying.
  169. return RequestWithRetry<TNodeId>(
  170. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  171. [this, &sourcePath, &destinationPath, &options] (TMutationId /*mutationId*/) {
  172. auto transaction = StartTransaction(TStartTransactionOptions());
  173. auto nodeId = RawClient_->CopyWithoutRetries(transaction->GetId(), sourcePath, destinationPath, options);
  174. transaction->Commit();
  175. return nodeId;
  176. });
  177. } else {
  178. throw;
  179. }
  180. }
  181. }
  182. TNodeId TClientBase::Move(
  183. const TYPath& sourcePath,
  184. const TYPath& destinationPath,
  185. const TMoveOptions& options)
  186. {
  187. try {
  188. return RequestWithRetry<TNodeId>(
  189. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  190. [this, &sourcePath, &destinationPath, &options] (TMutationId& mutationId) {
  191. return RawClient_->MoveInsideMasterCell(mutationId, TransactionId_, sourcePath, destinationPath, options);
  192. });
  193. } catch (const TErrorResponse& e) {
  194. if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
  195. // Do transaction for cross cell moving.
  196. return RequestWithRetry<TNodeId>(
  197. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  198. [this, &sourcePath, &destinationPath, &options] (TMutationId /*mutationId*/) {
  199. auto transaction = StartTransaction(TStartTransactionOptions());
  200. auto nodeId = RawClient_->MoveWithoutRetries(transaction->GetId(), sourcePath, destinationPath, options);
  201. transaction->Commit();
  202. return nodeId;
  203. });
  204. } else {
  205. throw;
  206. }
  207. }
  208. }
  209. TNodeId TClientBase::Link(
  210. const TYPath& targetPath,
  211. const TYPath& linkPath,
  212. const TLinkOptions& options)
  213. {
  214. return RequestWithRetry<TNodeId>(
  215. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  216. [this, &targetPath, &linkPath, &options] (TMutationId& mutationId) {
  217. return RawClient_->Link(mutationId, TransactionId_, targetPath, linkPath, options);
  218. });
  219. }
  220. void TClientBase::Concatenate(
  221. const TVector<TRichYPath>& sourcePaths,
  222. const TRichYPath& destinationPath,
  223. const TConcatenateOptions& options)
  224. {
  225. Y_ABORT_IF(options.MaxBatchSize_ <= 0);
  226. ITransactionPtr outerTransaction;
  227. IClientBase* outerClient;
  228. if (std::ssize(sourcePaths) > options.MaxBatchSize_) {
  229. outerTransaction = StartTransaction(TStartTransactionOptions());
  230. outerClient = outerTransaction.Get();
  231. } else {
  232. outerClient = this;
  233. }
  234. TVector<TRichYPath> batch;
  235. for (ssize_t i = 0; i < std::ssize(sourcePaths); i += options.MaxBatchSize_) {
  236. auto begin = sourcePaths.begin() + i;
  237. auto end = sourcePaths.begin() + std::min(i + options.MaxBatchSize_, std::ssize(sourcePaths));
  238. batch.assign(begin, end);
  239. bool firstBatch = (i == 0);
  240. RequestWithRetry<void>(
  241. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  242. [this, &batch, &destinationPath, &options, outerClient, firstBatch] (TMutationId /*mutationId*/) {
  243. auto transaction = outerClient->StartTransaction(TStartTransactionOptions());
  244. if (firstBatch && !options.Append_ && !batch.empty() && !transaction->Exists(destinationPath.Path_)) {
  245. auto typeNode = transaction->Get(transaction->CanonizeYPath(batch.front()).Path_ + "/@type");
  246. auto type = FromString<ENodeType>(typeNode.AsString());
  247. transaction->Create(destinationPath.Path_, type, TCreateOptions().IgnoreExisting(true));
  248. }
  249. TConcatenateOptions currentOptions = options;
  250. if (!firstBatch) {
  251. currentOptions.Append_ = true;
  252. }
  253. RawClient_->Concatenate(transaction->GetId(), batch, destinationPath, currentOptions);
  254. transaction->Commit();
  255. });
  256. }
  257. if (outerTransaction) {
  258. outerTransaction->Commit();
  259. }
  260. }
  261. TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path)
  262. {
  263. return NRawClient::CanonizeYPath(RawClient_, path);
  264. }
  265. TVector<TTableColumnarStatistics> TClientBase::GetTableColumnarStatistics(
  266. const TVector<TRichYPath>& paths,
  267. const TGetTableColumnarStatisticsOptions& options)
  268. {
  269. return RequestWithRetry<TVector<TTableColumnarStatistics>>(
  270. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  271. [this, &paths, &options] (TMutationId /*mutationId*/) {
  272. return RawClient_->GetTableColumnarStatistics(TransactionId_, paths, options);
  273. });
  274. }
  275. TMultiTablePartitions TClientBase::GetTablePartitions(
  276. const TVector<TRichYPath>& paths,
  277. const TGetTablePartitionsOptions& options)
  278. {
  279. return RequestWithRetry<TMultiTablePartitions>(
  280. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  281. [this, &paths, &options] (TMutationId /*mutationId*/) {
  282. return RawClient_->GetTablePartitions(TransactionId_, paths, options);
  283. });
  284. }
  285. TMaybe<TYPath> TClientBase::GetFileFromCache(
  286. const TString& md5Signature,
  287. const TYPath& cachePath,
  288. const TGetFileFromCacheOptions& options)
  289. {
  290. return RequestWithRetry<TMaybe<TYPath>>(
  291. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  292. [this, &md5Signature, &cachePath, &options] (TMutationId /*mutationId*/) {
  293. return RawClient_->GetFileFromCache(TransactionId_, md5Signature, cachePath, options);
  294. });
  295. }
  296. TYPath TClientBase::PutFileToCache(
  297. const TYPath& filePath,
  298. const TString& md5Signature,
  299. const TYPath& cachePath,
  300. const TPutFileToCacheOptions& options)
  301. {
  302. return RequestWithRetry<TYPath>(
  303. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  304. [this, &filePath, &md5Signature, &cachePath, &options] (TMutationId /*mutationId*/) {
  305. return RawClient_->PutFileToCache(TransactionId_, filePath, md5Signature, cachePath, options);
  306. });
  307. }
  308. IFileReaderPtr TClientBase::CreateBlobTableReader(
  309. const TYPath& path,
  310. const TKey& key,
  311. const TBlobTableReaderOptions& options)
  312. {
  313. return new TBlobTableReader(
  314. path,
  315. key,
  316. RawClient_,
  317. ClientRetryPolicy_,
  318. GetTransactionPinger(),
  319. Context_,
  320. TransactionId_,
  321. options);
  322. }
  323. IFileReaderPtr TClientBase::CreateFileReader(
  324. const TRichYPath& path,
  325. const TFileReaderOptions& options)
  326. {
  327. return new TFileReader(
  328. CanonizeYPath(path),
  329. RawClient_,
  330. ClientRetryPolicy_,
  331. GetTransactionPinger(),
  332. Context_,
  333. TransactionId_,
  334. options);
  335. }
  336. IFileWriterPtr TClientBase::CreateFileWriter(
  337. const TRichYPath& path,
  338. const TFileWriterOptions& options)
  339. {
  340. auto realPath = CanonizeYPath(path);
  341. auto exists = RequestWithRetry<bool>(
  342. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  343. [this, &realPath] (TMutationId /*mutationId*/) {
  344. return RawClient_->Exists(TransactionId_, realPath.Path_);
  345. });
  346. if (!exists) {
  347. RequestWithRetry<void>(
  348. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  349. [this, &realPath] (TMutationId& mutationId) {
  350. RawClient_->Create(mutationId, TransactionId_, realPath.Path_, NT_FILE, TCreateOptions().IgnoreExisting(true));
  351. });
  352. }
  353. return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
  354. }
  355. TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
  356. const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options)
  357. {
  358. const Message* prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(&descriptor);
  359. return new TTableWriter<::google::protobuf::Message>(CreateProtoWriter(path, options, prototype));
  360. }
  361. TRawTableReaderPtr TClientBase::CreateRawReader(
  362. const TRichYPath& path,
  363. const TFormat& format,
  364. const TTableReaderOptions& options)
  365. {
  366. return CreateClientReader(path, format, options).Get();
  367. }
  368. TRawTableWriterPtr TClientBase::CreateRawWriter(
  369. const TRichYPath& path,
  370. const TFormat& format,
  371. const TTableWriterOptions& options)
  372. {
  373. return ::MakeIntrusive<TRetryfulWriter>(
  374. RawClient_,
  375. ClientRetryPolicy_,
  376. GetTransactionPinger(),
  377. Context_,
  378. TransactionId_,
  379. GetWriteTableCommand(Context_.Config->ApiVersion),
  380. format,
  381. CanonizeYPath(path),
  382. options).Get();
  383. }
  384. IOperationPtr TClientBase::DoMap(
  385. const TMapOperationSpec& spec,
  386. ::TIntrusivePtr<IStructuredJob> mapper,
  387. const TOperationOptions& options)
  388. {
  389. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  390. auto prepareOperation = [
  391. this_ = ::TIntrusivePtr(this),
  392. operation,
  393. spec,
  394. mapper,
  395. options
  396. ] () {
  397. ExecuteMap(
  398. operation,
  399. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  400. spec,
  401. mapper,
  402. options);
  403. };
  404. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  405. }
  406. IOperationPtr TClientBase::RawMap(
  407. const TRawMapOperationSpec& spec,
  408. ::TIntrusivePtr<IRawJob> mapper,
  409. const TOperationOptions& options)
  410. {
  411. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  412. auto prepareOperation = [
  413. this_=::TIntrusivePtr(this),
  414. operation,
  415. spec,
  416. mapper,
  417. options
  418. ] () {
  419. ExecuteRawMap(
  420. operation,
  421. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  422. spec,
  423. mapper,
  424. options);
  425. };
  426. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  427. }
  428. IOperationPtr TClientBase::DoReduce(
  429. const TReduceOperationSpec& spec,
  430. ::TIntrusivePtr<IStructuredJob> reducer,
  431. const TOperationOptions& options)
  432. {
  433. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  434. auto prepareOperation = [
  435. this_=::TIntrusivePtr(this),
  436. operation,
  437. spec,
  438. reducer,
  439. options
  440. ] () {
  441. ExecuteReduce(
  442. operation,
  443. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  444. spec,
  445. reducer,
  446. options);
  447. };
  448. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  449. }
  450. IOperationPtr TClientBase::RawReduce(
  451. const TRawReduceOperationSpec& spec,
  452. ::TIntrusivePtr<IRawJob> reducer,
  453. const TOperationOptions& options)
  454. {
  455. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  456. auto prepareOperation = [
  457. this_=::TIntrusivePtr(this),
  458. operation,
  459. spec,
  460. reducer,
  461. options
  462. ] () {
  463. ExecuteRawReduce(
  464. operation,
  465. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  466. spec,
  467. reducer,
  468. options);
  469. };
  470. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  471. }
  472. IOperationPtr TClientBase::DoJoinReduce(
  473. const TJoinReduceOperationSpec& spec,
  474. ::TIntrusivePtr<IStructuredJob> reducer,
  475. const TOperationOptions& options)
  476. {
  477. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  478. auto prepareOperation = [
  479. this_=::TIntrusivePtr(this),
  480. operation,
  481. spec,
  482. reducer,
  483. options
  484. ] () {
  485. ExecuteJoinReduce(
  486. operation,
  487. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  488. spec,
  489. reducer,
  490. options);
  491. };
  492. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  493. }
  494. IOperationPtr TClientBase::RawJoinReduce(
  495. const TRawJoinReduceOperationSpec& spec,
  496. ::TIntrusivePtr<IRawJob> reducer,
  497. const TOperationOptions& options)
  498. {
  499. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  500. auto prepareOperation = [
  501. this_=::TIntrusivePtr(this),
  502. operation,
  503. spec,
  504. reducer,
  505. options
  506. ] () {
  507. ExecuteRawJoinReduce(
  508. operation,
  509. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  510. spec,
  511. reducer,
  512. options);
  513. };
  514. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  515. }
  516. IOperationPtr TClientBase::DoMapReduce(
  517. const TMapReduceOperationSpec& spec,
  518. ::TIntrusivePtr<IStructuredJob> mapper,
  519. ::TIntrusivePtr<IStructuredJob> reduceCombiner,
  520. ::TIntrusivePtr<IStructuredJob> reducer,
  521. const TOperationOptions& options)
  522. {
  523. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  524. auto prepareOperation = [
  525. this_=::TIntrusivePtr(this),
  526. operation,
  527. spec,
  528. mapper,
  529. reduceCombiner,
  530. reducer,
  531. options
  532. ] () {
  533. ExecuteMapReduce(
  534. operation,
  535. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  536. spec,
  537. mapper,
  538. reduceCombiner,
  539. reducer,
  540. options);
  541. };
  542. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  543. }
  544. IOperationPtr TClientBase::RawMapReduce(
  545. const TRawMapReduceOperationSpec& spec,
  546. ::TIntrusivePtr<IRawJob> mapper,
  547. ::TIntrusivePtr<IRawJob> reduceCombiner,
  548. ::TIntrusivePtr<IRawJob> reducer,
  549. const TOperationOptions& options)
  550. {
  551. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  552. auto prepareOperation = [
  553. this_=::TIntrusivePtr(this),
  554. operation,
  555. spec,
  556. mapper,
  557. reduceCombiner,
  558. reducer,
  559. options
  560. ] () {
  561. ExecuteRawMapReduce(
  562. operation,
  563. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  564. spec,
  565. mapper,
  566. reduceCombiner,
  567. reducer,
  568. options);
  569. };
  570. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  571. }
  572. IOperationPtr TClientBase::Sort(
  573. const TSortOperationSpec& spec,
  574. const TOperationOptions& options)
  575. {
  576. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  577. auto prepareOperation = [
  578. this_ = ::TIntrusivePtr(this),
  579. operation,
  580. spec,
  581. options
  582. ] () {
  583. ExecuteSort(
  584. operation,
  585. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  586. spec,
  587. options);
  588. };
  589. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  590. }
  591. IOperationPtr TClientBase::Merge(
  592. const TMergeOperationSpec& spec,
  593. const TOperationOptions& options)
  594. {
  595. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  596. auto prepareOperation = [
  597. this_ = ::TIntrusivePtr(this),
  598. operation,
  599. spec,
  600. options
  601. ] () {
  602. ExecuteMerge(
  603. operation,
  604. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  605. spec,
  606. options);
  607. };
  608. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  609. }
  610. IOperationPtr TClientBase::Erase(
  611. const TEraseOperationSpec& spec,
  612. const TOperationOptions& options)
  613. {
  614. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  615. auto prepareOperation = [
  616. this_ = ::TIntrusivePtr(this),
  617. operation,
  618. spec,
  619. options
  620. ] () {
  621. ExecuteErase(
  622. operation,
  623. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  624. spec,
  625. options);
  626. };
  627. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  628. }
  629. IOperationPtr TClientBase::RemoteCopy(
  630. const TRemoteCopyOperationSpec& spec,
  631. const TOperationOptions& options)
  632. {
  633. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  634. auto prepareOperation = [
  635. this_ = ::TIntrusivePtr(this),
  636. operation,
  637. spec,
  638. options
  639. ] () {
  640. ExecuteRemoteCopy(
  641. operation,
  642. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  643. spec,
  644. options);
  645. };
  646. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  647. }
  648. IOperationPtr TClientBase::RunVanilla(
  649. const TVanillaOperationSpec& spec,
  650. const TOperationOptions& options)
  651. {
  652. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  653. auto prepareOperation = [
  654. this_ = ::TIntrusivePtr(this),
  655. operation,
  656. spec,
  657. options
  658. ] () {
  659. ExecuteVanilla(
  660. operation,
  661. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  662. spec,
  663. options);
  664. };
  665. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  666. }
  667. IOperationPtr TClientBase::AttachOperation(const TOperationId& operationId)
  668. {
  669. auto operation = ::MakeIntrusive<TOperation>(operationId, GetParentClientImpl());
  670. operation->GetBriefState(); // check that operation exists
  671. return operation;
  672. }
  673. EOperationBriefState TClientBase::CheckOperation(const TOperationId& operationId)
  674. {
  675. return NYT::NDetail::CheckOperation(RawClient_, ClientRetryPolicy_, operationId);
  676. }
  677. void TClientBase::AbortOperation(const TOperationId& operationId)
  678. {
  679. RequestWithRetry<void>(
  680. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  681. [this, &operationId] (TMutationId& mutationId) {
  682. RawClient_->AbortOperation(mutationId, operationId);
  683. });
  684. }
  685. void TClientBase::CompleteOperation(const TOperationId& operationId)
  686. {
  687. RequestWithRetry<void>(
  688. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  689. [this, &operationId] (TMutationId& mutationId) {
  690. RawClient_->CompleteOperation(mutationId, operationId);
  691. });
  692. }
  693. void TClientBase::WaitForOperation(const TOperationId& operationId)
  694. {
  695. NYT::NDetail::WaitForOperation(ClientRetryPolicy_, RawClient_, Context_, operationId);
  696. }
  697. void TClientBase::AlterTable(
  698. const TYPath& path,
  699. const TAlterTableOptions& options)
  700. {
  701. RequestWithRetry<void>(
  702. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  703. [this, &path, &options] (TMutationId& mutationId) {
  704. RawClient_->AlterTable(mutationId, TransactionId_, path, options);
  705. });
  706. }
  707. ::TIntrusivePtr<TClientReader> TClientBase::CreateClientReader(
  708. const TRichYPath& path,
  709. const TFormat& format,
  710. const TTableReaderOptions& options,
  711. bool useFormatFromTableAttributes)
  712. {
  713. return ::MakeIntrusive<TClientReader>(
  714. CanonizeYPath(path),
  715. RawClient_,
  716. ClientRetryPolicy_,
  717. GetTransactionPinger(),
  718. Context_,
  719. TransactionId_,
  720. format,
  721. options,
  722. useFormatFromTableAttributes);
  723. }
  724. THolder<TClientWriter> TClientBase::CreateClientWriter(
  725. const TRichYPath& path,
  726. const TFormat& format,
  727. const TTableWriterOptions& options)
  728. {
  729. auto realPath = CanonizeYPath(path);
  730. auto exists = RequestWithRetry<bool>(
  731. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  732. [this, &realPath] (TMutationId /*mutationId*/) {
  733. return RawClient_->Exists(TransactionId_, realPath.Path_);
  734. });
  735. if (!exists) {
  736. RequestWithRetry<void>(
  737. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  738. [this, &realPath] (TMutationId& mutataionId) {
  739. RawClient_->Create(mutataionId, TransactionId_, realPath.Path_, NT_TABLE, TCreateOptions().IgnoreExisting(true));
  740. });
  741. }
  742. return MakeHolder<TClientWriter>(
  743. realPath,
  744. RawClient_,
  745. ClientRetryPolicy_,
  746. GetTransactionPinger(),
  747. Context_,
  748. TransactionId_,
  749. format,
  750. options
  751. );
  752. }
  753. ::TIntrusivePtr<INodeReaderImpl> TClientBase::CreateNodeReader(
  754. const TRichYPath& path, const TTableReaderOptions& options)
  755. {
  756. auto format = TFormat::YsonBinary();
  757. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  758. // Skiff is disabled here because of large header problem (see https://st.yandex-team.ru/YT-6926).
  759. // Revert this code to r3614168 when it is fixed.
  760. return new TNodeTableReader(
  761. CreateClientReader(path, format, options));
  762. }
  763. ::TIntrusivePtr<IYaMRReaderImpl> TClientBase::CreateYaMRReader(
  764. const TRichYPath& path, const TTableReaderOptions& options)
  765. {
  766. return new TYaMRTableReader(
  767. CreateClientReader(path, TFormat::YaMRLenval(), options, /* useFormatFromTableAttributes = */ true));
  768. }
  769. ::TIntrusivePtr<IProtoReaderImpl> TClientBase::CreateProtoReader(
  770. const TRichYPath& path,
  771. const TTableReaderOptions& options,
  772. const Message* prototype)
  773. {
  774. TVector<const ::google::protobuf::Descriptor*> descriptors;
  775. descriptors.push_back(prototype->GetDescriptor());
  776. if (Context_.Config->UseClientProtobuf) {
  777. return new TProtoTableReader(
  778. CreateClientReader(path, TFormat::YsonBinary(), options),
  779. std::move(descriptors));
  780. } else {
  781. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  782. return new TLenvalProtoTableReader(
  783. CreateClientReader(path, format, options),
  784. std::move(descriptors));
  785. }
  786. }
  787. ::TIntrusivePtr<ISkiffRowReaderImpl> TClientBase::CreateSkiffRowReader(
  788. const TRichYPath& path,
  789. const TTableReaderOptions& options,
  790. const ISkiffRowSkipperPtr& skipper,
  791. const NSkiff::TSkiffSchemaPtr& schema)
  792. {
  793. auto skiffOptions = TCreateSkiffSchemaOptions().HasRangeIndex(true);
  794. auto resultSchema = NYT::NDetail::CreateSkiffSchema(TVector{schema}, skiffOptions);
  795. return new TSkiffRowTableReader(
  796. CreateClientReader(path, NYT::NDetail::CreateSkiffFormat(resultSchema), options),
  797. resultSchema,
  798. {skipper},
  799. std::move(skiffOptions));
  800. }
  801. ::TIntrusivePtr<INodeWriterImpl> TClientBase::CreateNodeWriter(
  802. const TRichYPath& path, const TTableWriterOptions& options)
  803. {
  804. auto format = TFormat::YsonBinary();
  805. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  806. return new TNodeTableWriter(
  807. CreateClientWriter(path, format, options));
  808. }
  809. ::TIntrusivePtr<IYaMRWriterImpl> TClientBase::CreateYaMRWriter(
  810. const TRichYPath& path, const TTableWriterOptions& options)
  811. {
  812. auto format = TFormat::YaMRLenval();
  813. ApplyFormatHints<TYaMRRow>(&format, options.FormatHints_);
  814. return new TYaMRTableWriter(
  815. CreateClientWriter(path, format, options));
  816. }
  817. ::TIntrusivePtr<IProtoWriterImpl> TClientBase::CreateProtoWriter(
  818. const TRichYPath& path,
  819. const TTableWriterOptions& options,
  820. const Message* prototype)
  821. {
  822. TVector<const ::google::protobuf::Descriptor*> descriptors;
  823. descriptors.push_back(prototype->GetDescriptor());
  824. auto pathWithSchema = path;
  825. if (options.InferSchema_.GetOrElse(Context_.Config->InferTableSchema) && !path.Schema_) {
  826. pathWithSchema.Schema(CreateTableSchema(*prototype->GetDescriptor()));
  827. }
  828. if (Context_.Config->UseClientProtobuf) {
  829. auto format = TFormat::YsonBinary();
  830. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  831. return new TProtoTableWriter(
  832. CreateClientWriter(pathWithSchema, format, options),
  833. std::move(descriptors));
  834. } else {
  835. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  836. ApplyFormatHints<::google::protobuf::Message>(&format, options.FormatHints_);
  837. return new TLenvalProtoTableWriter(
  838. CreateClientWriter(pathWithSchema, format, options),
  839. std::move(descriptors));
  840. }
  841. }
  842. TBatchRequestPtr TClientBase::CreateBatchRequest()
  843. {
  844. return MakeIntrusive<TBatchRequest>(TransactionId_, GetParentClientImpl());
  845. }
  846. IRawClientPtr TClientBase::GetRawClient() const
  847. {
  848. return RawClient_;
  849. }
  850. const TClientContext& TClientBase::GetContext() const
  851. {
  852. return Context_;
  853. }
  854. const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
  855. {
  856. return ClientRetryPolicy_;
  857. }
  858. ////////////////////////////////////////////////////////////////////////////////
  859. TTransaction::TTransaction(
  860. const IRawClientPtr& rawClient,
  861. TClientPtr parentClient,
  862. const TClientContext& context,
  863. const TTransactionId& parentTransactionId,
  864. const TStartTransactionOptions& options)
  865. : TClientBase(rawClient, context, parentTransactionId, parentClient->GetRetryPolicy())
  866. , TransactionPinger_(parentClient->GetTransactionPinger())
  867. , PingableTx_(
  868. std::make_unique<TPingableTransaction>(
  869. rawClient,
  870. parentClient->GetRetryPolicy(),
  871. context,
  872. parentTransactionId,
  873. TransactionPinger_->GetChildTxPinger(),
  874. options))
  875. , ParentClient_(parentClient)
  876. {
  877. TransactionId_ = PingableTx_->GetId();
  878. }
  879. TTransaction::TTransaction(
  880. const IRawClientPtr& rawClient,
  881. TClientPtr parentClient,
  882. const TClientContext& context,
  883. const TTransactionId& transactionId,
  884. const TAttachTransactionOptions& options)
  885. : TClientBase(rawClient, context, transactionId, parentClient->GetRetryPolicy())
  886. , TransactionPinger_(parentClient->GetTransactionPinger())
  887. , PingableTx_(
  888. new TPingableTransaction(
  889. rawClient,
  890. parentClient->GetRetryPolicy(),
  891. context,
  892. transactionId,
  893. parentClient->GetTransactionPinger()->GetChildTxPinger(),
  894. options))
  895. , ParentClient_(parentClient)
  896. { }
  897. const TTransactionId& TTransaction::GetId() const
  898. {
  899. return TransactionId_;
  900. }
  901. ILockPtr TTransaction::Lock(
  902. const TYPath& path,
  903. ELockMode mode,
  904. const TLockOptions& options)
  905. {
  906. auto lockId = RequestWithRetry<TLockId>(
  907. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  908. [this, &path, &mode, &options] (TMutationId& mutationId) {
  909. return RawClient_->Lock(mutationId, TransactionId_, path, mode, options);
  910. });
  911. return ::MakeIntrusive<TLock>(lockId, GetParentClientImpl(), options.Waitable_);
  912. }
  913. void TTransaction::Unlock(
  914. const TYPath& path,
  915. const TUnlockOptions& options)
  916. {
  917. RequestWithRetry<void>(
  918. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  919. [this, &path, &options] (TMutationId& mutationId) {
  920. RawClient_->Unlock(mutationId, TransactionId_, path, options);
  921. });
  922. }
  923. void TTransaction::Commit()
  924. {
  925. PingableTx_->Commit();
  926. }
  927. void TTransaction::Abort()
  928. {
  929. PingableTx_->Abort();
  930. }
  931. void TTransaction::Ping()
  932. {
  933. RequestWithRetry<void>(
  934. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  935. [this] (TMutationId /*mutationId*/) {
  936. RawClient_->PingTransaction(TransactionId_);
  937. });
  938. }
  939. void TTransaction::Detach()
  940. {
  941. PingableTx_->Detach();
  942. }
  943. ITransactionPingerPtr TTransaction::GetTransactionPinger()
  944. {
  945. return TransactionPinger_;
  946. }
  947. IClientPtr TTransaction::GetParentClient(bool ignoreGlobalTx)
  948. {
  949. return GetParentClientImpl()->GetParentClient(ignoreGlobalTx);
  950. }
  951. TClientPtr TTransaction::GetParentClientImpl()
  952. {
  953. return ParentClient_;
  954. }
  955. ////////////////////////////////////////////////////////////////////////////////
  956. TClient::TClient(
  957. IRawClientPtr rawClient,
  958. const TClientContext& context,
  959. const TTransactionId& globalId,
  960. IClientRetryPolicyPtr retryPolicy)
  961. : TClientBase(std::move(rawClient), context, globalId, retryPolicy)
  962. , TransactionPinger_(nullptr)
  963. { }
  964. TClient::~TClient() = default;
  965. ITransactionPtr TClient::AttachTransaction(
  966. const TTransactionId& transactionId,
  967. const TAttachTransactionOptions& options)
  968. {
  969. CheckShutdown();
  970. return MakeIntrusive<TTransaction>(RawClient_, this, Context_, transactionId, options);
  971. }
  972. void TClient::MountTable(
  973. const TYPath& path,
  974. const TMountTableOptions& options)
  975. {
  976. CheckShutdown();
  977. RequestWithRetry<void>(
  978. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  979. [this, &path, &options] (TMutationId& mutationId) {
  980. RawClient_->MountTable(mutationId, path, options);
  981. });
  982. }
  983. void TClient::UnmountTable(
  984. const TYPath& path,
  985. const TUnmountTableOptions& options)
  986. {
  987. CheckShutdown();
  988. RequestWithRetry<void>(
  989. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  990. [this, &path, &options] (TMutationId& mutationId) {
  991. RawClient_->UnmountTable(mutationId, path, options);
  992. });
  993. }
  994. void TClient::RemountTable(
  995. const TYPath& path,
  996. const TRemountTableOptions& options)
  997. {
  998. CheckShutdown();
  999. RequestWithRetry<void>(
  1000. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1001. [this, &path, &options] (TMutationId& mutationId) {
  1002. RawClient_->RemountTable(mutationId, path, options);
  1003. });
  1004. }
  1005. void TClient::FreezeTable(
  1006. const TYPath& path,
  1007. const TFreezeTableOptions& options)
  1008. {
  1009. CheckShutdown();
  1010. RequestWithRetry<void>(
  1011. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1012. [this, &path, &options] (TMutationId /*mutationId*/) {
  1013. RawClient_->FreezeTable(path, options);
  1014. });
  1015. }
  1016. void TClient::UnfreezeTable(
  1017. const TYPath& path,
  1018. const TUnfreezeTableOptions& options)
  1019. {
  1020. CheckShutdown();
  1021. RequestWithRetry<void>(
  1022. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1023. [this, &path, &options] (TMutationId /*mutationId*/) {
  1024. RawClient_->UnfreezeTable(path, options);
  1025. });
  1026. }
  1027. void TClient::ReshardTable(
  1028. const TYPath& path,
  1029. const TVector<TKey>& keys,
  1030. const TReshardTableOptions& options)
  1031. {
  1032. CheckShutdown();
  1033. RequestWithRetry<void>(
  1034. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1035. [this, &path, &keys, &options] (TMutationId& mutationId) {
  1036. RawClient_->ReshardTableByPivotKeys(mutationId, path, keys, options);
  1037. });
  1038. }
  1039. void TClient::ReshardTable(
  1040. const TYPath& path,
  1041. i64 tabletCount,
  1042. const TReshardTableOptions& options)
  1043. {
  1044. CheckShutdown();
  1045. RequestWithRetry<void>(
  1046. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1047. [this, &path, tabletCount, &options] (TMutationId& mutationId) {
  1048. RawClient_->ReshardTableByTabletCount(mutationId, path, tabletCount, options);
  1049. });
  1050. }
  1051. void TClient::InsertRows(
  1052. const TYPath& path,
  1053. const TNode::TListType& rows,
  1054. const TInsertRowsOptions& options)
  1055. {
  1056. CheckShutdown();
  1057. RequestWithRetry<void>(
  1058. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1059. [this, &path, &rows, &options] (TMutationId /*mutationId*/) {
  1060. NRawClient::InsertRows(Context_, path, rows, options);
  1061. });
  1062. }
  1063. void TClient::DeleteRows(
  1064. const TYPath& path,
  1065. const TNode::TListType& keys,
  1066. const TDeleteRowsOptions& options)
  1067. {
  1068. CheckShutdown();
  1069. RequestWithRetry<void>(
  1070. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1071. [this, &path, &keys, &options] (TMutationId /*mutationId*/) {
  1072. NRawClient::DeleteRows(Context_, path, keys, options);
  1073. });
  1074. }
  1075. void TClient::TrimRows(
  1076. const TYPath& path,
  1077. i64 tabletIndex,
  1078. i64 rowCount,
  1079. const TTrimRowsOptions& options)
  1080. {
  1081. CheckShutdown();
  1082. RequestWithRetry<void>(
  1083. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1084. [this, &path, tabletIndex, rowCount, &options] (TMutationId /*mutationId*/) {
  1085. RawClient_->TrimRows(path, tabletIndex, rowCount, options);
  1086. });
  1087. }
  1088. TNode::TListType TClient::LookupRows(
  1089. const TYPath& path,
  1090. const TNode::TListType& keys,
  1091. const TLookupRowsOptions& options)
  1092. {
  1093. CheckShutdown();
  1094. return RequestWithRetry<TNode::TListType>(
  1095. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1096. [this, &path, &keys, &options] (TMutationId /*mutationId*/) {
  1097. return NRawClient::LookupRows(Context_, path, keys, options);
  1098. });
  1099. }
  1100. TNode::TListType TClient::SelectRows(
  1101. const TString& query,
  1102. const TSelectRowsOptions& options)
  1103. {
  1104. CheckShutdown();
  1105. return RequestWithRetry<TNode::TListType>(
  1106. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1107. [this, &query, &options] (TMutationId /*mutationId*/) {
  1108. return RawClient_->SelectRows(query, options);
  1109. });
  1110. }
  1111. void TClient::AlterTableReplica(const TReplicaId& replicaId, const TAlterTableReplicaOptions& options)
  1112. {
  1113. CheckShutdown();
  1114. RequestWithRetry<void>(
  1115. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1116. [this, &replicaId, &options] (TMutationId& mutationId) {
  1117. RawClient_->AlterTableReplica(mutationId, replicaId, options);
  1118. });
  1119. }
  1120. ui64 TClient::GenerateTimestamp()
  1121. {
  1122. CheckShutdown();
  1123. return RequestWithRetry<ui64>(
  1124. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1125. [this] (TMutationId /*mutationId*/) {
  1126. return RawClient_->GenerateTimestamp();
  1127. });
  1128. }
  1129. TAuthorizationInfo TClient::WhoAmI()
  1130. {
  1131. CheckShutdown();
  1132. return RequestWithRetry<TAuthorizationInfo>(
  1133. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1134. [this] (TMutationId /*mutationId*/) {
  1135. return NRawClient::WhoAmI(Context_);
  1136. });
  1137. }
  1138. TOperationAttributes TClient::GetOperation(
  1139. const TOperationId& operationId,
  1140. const TGetOperationOptions& options)
  1141. {
  1142. CheckShutdown();
  1143. return RequestWithRetry<TOperationAttributes>(
  1144. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1145. [this, &operationId, &options] (TMutationId /*mutationId*/) {
  1146. return RawClient_->GetOperation(operationId, options);
  1147. });
  1148. }
  1149. TOperationAttributes TClient::GetOperation(
  1150. const TString& alias,
  1151. const TGetOperationOptions& options)
  1152. {
  1153. CheckShutdown();
  1154. return RequestWithRetry<TOperationAttributes>(
  1155. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1156. [this, &alias, &options] (TMutationId /*mutationId*/) {
  1157. return RawClient_->GetOperation(alias, options);
  1158. });
  1159. }
  1160. TListOperationsResult TClient::ListOperations(const TListOperationsOptions& options)
  1161. {
  1162. CheckShutdown();
  1163. return RequestWithRetry<TListOperationsResult>(
  1164. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1165. [this, &options] (TMutationId /*mutationId*/) {
  1166. return RawClient_->ListOperations(options);
  1167. });
  1168. }
  1169. void TClient::UpdateOperationParameters(
  1170. const TOperationId& operationId,
  1171. const TUpdateOperationParametersOptions& options)
  1172. {
  1173. CheckShutdown();
  1174. RequestWithRetry<void>(
  1175. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1176. [this, &operationId, &options] (TMutationId /*mutationId*/) {
  1177. RawClient_->UpdateOperationParameters(operationId, options);
  1178. });
  1179. }
  1180. TJobAttributes TClient::GetJob(
  1181. const TOperationId& operationId,
  1182. const TJobId& jobId,
  1183. const TGetJobOptions& options)
  1184. {
  1185. CheckShutdown();
  1186. auto result = RequestWithRetry<NYson::TYsonString>(
  1187. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1188. [this, &operationId, &jobId, &options] (TMutationId /*mutationId*/) {
  1189. return RawClient_->GetJob(operationId, jobId, options);
  1190. });
  1191. return NRawClient::ParseJobAttributes(NodeFromYsonString(result.AsStringBuf()));
  1192. }
  1193. TListJobsResult TClient::ListJobs(
  1194. const TOperationId& operationId,
  1195. const TListJobsOptions& options)
  1196. {
  1197. CheckShutdown();
  1198. return RequestWithRetry<TListJobsResult>(
  1199. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1200. [this, &operationId, &options] (TMutationId /*mutationId*/) {
  1201. return RawClient_->ListJobs(operationId, options);
  1202. });
  1203. }
  1204. IFileReaderPtr TClient::GetJobInput(
  1205. const TJobId& jobId,
  1206. const TGetJobInputOptions& options)
  1207. {
  1208. CheckShutdown();
  1209. return RawClient_->GetJobInput(jobId, options);
  1210. }
  1211. IFileReaderPtr TClient::GetJobFailContext(
  1212. const TOperationId& operationId,
  1213. const TJobId& jobId,
  1214. const TGetJobFailContextOptions& options)
  1215. {
  1216. CheckShutdown();
  1217. return RawClient_->GetJobFailContext(operationId, jobId, options);
  1218. }
  1219. IFileReaderPtr TClient::GetJobStderr(
  1220. const TOperationId& operationId,
  1221. const TJobId& jobId,
  1222. const TGetJobStderrOptions& options)
  1223. {
  1224. CheckShutdown();
  1225. return RawClient_->GetJobStderr(operationId, jobId, options);
  1226. }
  1227. std::vector<TJobTraceEvent> TClient::GetJobTrace(
  1228. const TOperationId& operationId,
  1229. const TGetJobTraceOptions& options)
  1230. {
  1231. CheckShutdown();
  1232. return RequestWithRetry<std::vector<TJobTraceEvent>>(
  1233. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1234. [this, &operationId, &options] (TMutationId /*mutationId*/) {
  1235. return RawClient_->GetJobTrace(operationId, options);
  1236. });
  1237. }
  1238. TNode::TListType TClient::SkyShareTable(
  1239. const std::vector<TYPath>& tablePaths,
  1240. const TSkyShareTableOptions& options)
  1241. {
  1242. CheckShutdown();
  1243. // As documented at https://wiki.yandex-team.ru/yt/userdoc/blob_tables/#shag3.sozdajomrazdachu
  1244. // first request returns HTTP status code 202 (Accepted). And we need retrying until we have 200 (OK).
  1245. NHttpClient::IHttpResponsePtr response;
  1246. do {
  1247. response = RequestWithRetry<NHttpClient::IHttpResponsePtr>(
  1248. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1249. [this, &tablePaths, &options] (TMutationId /*mutationId*/) {
  1250. return NRawClient::SkyShareTable(Context_, tablePaths, options);
  1251. });
  1252. TWaitProxy::Get()->Sleep(TDuration::Seconds(5));
  1253. } while (response->GetStatusCode() != 200);
  1254. if (options.KeyColumns_) {
  1255. return NodeFromJsonString(response->GetResponse())["torrents"].AsList();
  1256. } else {
  1257. TNode torrent;
  1258. torrent["key"] = TNode::CreateList();
  1259. torrent["rbtorrent"] = response->GetResponse();
  1260. return TNode::TListType{torrent};
  1261. }
  1262. }
  1263. TCheckPermissionResponse TClient::CheckPermission(
  1264. const TString& user,
  1265. EPermission permission,
  1266. const TYPath& path,
  1267. const TCheckPermissionOptions& options)
  1268. {
  1269. CheckShutdown();
  1270. return RequestWithRetry<TCheckPermissionResponse>(
  1271. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1272. [this, &user, &permission, &path, &options] (TMutationId /*mutationId*/) {
  1273. return RawClient_->CheckPermission(user, permission, path, options);
  1274. });
  1275. }
  1276. TVector<TTabletInfo> TClient::GetTabletInfos(
  1277. const TYPath& path,
  1278. const TVector<int>& tabletIndexes,
  1279. const TGetTabletInfosOptions& options)
  1280. {
  1281. CheckShutdown();
  1282. return RequestWithRetry<TVector<TTabletInfo>>(
  1283. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1284. [this, &path, &tabletIndexes, &options] (TMutationId /*mutationId*/) {
  1285. return RawClient_->GetTabletInfos(path, tabletIndexes, options);
  1286. });
  1287. }
  1288. void TClient::SuspendOperation(
  1289. const TOperationId& operationId,
  1290. const TSuspendOperationOptions& options)
  1291. {
  1292. CheckShutdown();
  1293. RequestWithRetry<void>(
  1294. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1295. [this, &operationId, &options] (TMutationId& mutationId) {
  1296. RawClient_->SuspendOperation(mutationId, operationId, options);
  1297. });
  1298. }
  1299. void TClient::ResumeOperation(
  1300. const TOperationId& operationId,
  1301. const TResumeOperationOptions& options)
  1302. {
  1303. CheckShutdown();
  1304. RequestWithRetry<void>(
  1305. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1306. [this, &operationId, &options] (TMutationId& mutationId) {
  1307. RawClient_->ResumeOperation(mutationId, operationId, options);
  1308. });
  1309. }
  1310. TYtPoller& TClient::GetYtPoller()
  1311. {
  1312. auto g = Guard(Lock_);
  1313. if (!YtPoller_) {
  1314. CheckShutdown();
  1315. // We don't use current client and create new client because YtPoller_ might use
  1316. // this client during current client shutdown.
  1317. // That might lead to incrementing of current client refcount and double delete of current client object.
  1318. YtPoller_ = std::make_unique<TYtPoller>(RawClient_->Clone(), Context_.Config, ClientRetryPolicy_);
  1319. }
  1320. return *YtPoller_;
  1321. }
  1322. void TClient::Shutdown()
  1323. {
  1324. auto g = Guard(Lock_);
  1325. if (!Shutdown_.exchange(true) && YtPoller_) {
  1326. YtPoller_->Stop();
  1327. }
  1328. }
  1329. ITransactionPingerPtr TClient::GetTransactionPinger()
  1330. {
  1331. auto g = Guard(Lock_);
  1332. if (!TransactionPinger_) {
  1333. TransactionPinger_ = CreateTransactionPinger(Context_.Config);
  1334. }
  1335. return TransactionPinger_;
  1336. }
  1337. TClientPtr TClient::GetParentClientImpl()
  1338. {
  1339. return this;
  1340. }
  1341. IClientPtr TClient::GetParentClient(bool ignoreGlobalTx)
  1342. {
  1343. if (!TransactionId_.IsEmpty() && ignoreGlobalTx) {
  1344. return MakeIntrusive<TClient>(
  1345. RawClient_,
  1346. Context_,
  1347. TTransactionId(),
  1348. ClientRetryPolicy_
  1349. );
  1350. } else {
  1351. return this;
  1352. }
  1353. }
  1354. void TClient::CheckShutdown() const
  1355. {
  1356. if (Shutdown_) {
  1357. ythrow TApiUsageError() << "Call client's methods after shutdown";
  1358. }
  1359. }
  1360. TClientContext CreateClientContext(
  1361. const TString& serverName,
  1362. const TCreateClientOptions& options)
  1363. {
  1364. TClientContext context;
  1365. context.Config = options.Config_ ? options.Config_ : TConfig::Get();
  1366. context.TvmOnly = options.TvmOnly_;
  1367. context.ProxyAddress = options.ProxyAddress_;
  1368. context.ServerName = serverName;
  1369. ApplyProxyUrlAliasingRules(context.ServerName);
  1370. if (context.ServerName.find('.') == TString::npos &&
  1371. context.ServerName.find(':') == TString::npos &&
  1372. context.ServerName.find("localhost") == TString::npos)
  1373. {
  1374. context.ServerName += ".yt.yandex.net";
  1375. }
  1376. static constexpr char httpUrlSchema[] = "http://";
  1377. static constexpr char httpsUrlSchema[] = "https://";
  1378. if (options.UseTLS_) {
  1379. context.UseTLS = *options.UseTLS_;
  1380. } else {
  1381. context.UseTLS = context.ServerName.StartsWith(httpsUrlSchema);
  1382. }
  1383. if (context.ServerName.StartsWith(httpUrlSchema)) {
  1384. if (context.UseTLS) {
  1385. ythrow TApiUsageError() << "URL schema doesn't match UseTLS option";
  1386. }
  1387. context.ServerName.erase(0, sizeof(httpUrlSchema) - 1);
  1388. }
  1389. if (context.ServerName.StartsWith(httpsUrlSchema)) {
  1390. if (!context.UseTLS) {
  1391. ythrow TApiUsageError() << "URL schema doesn't match UseTLS option";
  1392. }
  1393. context.ServerName.erase(0, sizeof(httpsUrlSchema) - 1);
  1394. }
  1395. if (context.ServerName.find(':') == TString::npos) {
  1396. context.ServerName = CreateHostNameWithPort(context.ServerName, context);
  1397. }
  1398. if (options.TvmOnly_) {
  1399. context.ServerName = Format("tvm.%v", context.ServerName);
  1400. }
  1401. if (options.ProxyRole_) {
  1402. context.Config->Hosts = "hosts?role=" + *options.ProxyRole_;
  1403. }
  1404. if (context.UseTLS || options.UseCoreHttpClient_) {
  1405. context.HttpClient = NHttpClient::CreateCoreHttpClient(context.UseTLS, context.Config);
  1406. } else {
  1407. context.HttpClient = NHttpClient::CreateDefaultHttpClient();
  1408. }
  1409. context.Token = context.Config->Token;
  1410. if (options.Token_) {
  1411. context.Token = options.Token_;
  1412. } else if (options.TokenPath_) {
  1413. context.Token = TConfig::LoadTokenFromFile(options.TokenPath_);
  1414. } else if (options.ServiceTicketAuth_) {
  1415. context.ServiceTicketAuth = options.ServiceTicketAuth_;
  1416. }
  1417. context.ImpersonationUser = options.ImpersonationUser_;
  1418. if (context.Token) {
  1419. TConfig::ValidateToken(context.Token);
  1420. }
  1421. return context;
  1422. }
  1423. TClientPtr CreateClientImpl(
  1424. const TString& serverName,
  1425. const TCreateClientOptions& options)
  1426. {
  1427. auto context = CreateClientContext(serverName, options);
  1428. auto globalTxId = GetGuid(context.Config->GlobalTxId);
  1429. auto retryConfigProvider = options.RetryConfigProvider_;
  1430. if (!retryConfigProvider) {
  1431. retryConfigProvider = CreateDefaultRetryConfigProvider();
  1432. }
  1433. auto rawClient = MakeIntrusive<THttpRawClient>(context);
  1434. EnsureInitialized();
  1435. return new TClient(
  1436. std::move(rawClient),
  1437. context,
  1438. globalTxId,
  1439. CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
  1440. }
  1441. ////////////////////////////////////////////////////////////////////////////////
  1442. } // namespace NDetail
  1443. ////////////////////////////////////////////////////////////////////////////////
  1444. IClientPtr CreateClient(
  1445. const TString& serverName,
  1446. const TCreateClientOptions& options)
  1447. {
  1448. return NDetail::CreateClientImpl(serverName, options);
  1449. }
  1450. IClientPtr CreateClientFromEnv(const TCreateClientOptions& options)
  1451. {
  1452. auto serverName = GetEnv("YT_PROXY");
  1453. if (!serverName) {
  1454. ythrow yexception() << "YT_PROXY is not set";
  1455. }
  1456. return CreateClient(serverName, options);
  1457. }
  1458. ////////////////////////////////////////////////////////////////////////////////
  1459. } // namespace NYT