client.cpp 46 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468
  1. #include "client.h"
  2. #include "batch_request_impl.h"
  3. #include "client_reader.h"
  4. #include "client_writer.h"
  5. #include "file_reader.h"
  6. #include "file_writer.h"
  7. #include "format_hints.h"
  8. #include "init.h"
  9. #include "lock.h"
  10. #include "operation.h"
  11. #include "retry_transaction.h"
  12. #include "retryful_writer.h"
  13. #include "transaction.h"
  14. #include "transaction_pinger.h"
  15. #include "yt_poller.h"
  16. #include <yt/cpp/mapreduce/common/helpers.h>
  17. #include <yt/cpp/mapreduce/common/retry_lib.h>
  18. #include <yt/cpp/mapreduce/http/helpers.h>
  19. #include <yt/cpp/mapreduce/http/http.h>
  20. #include <yt/cpp/mapreduce/http/http_client.h>
  21. #include <yt/cpp/mapreduce/http/requests.h>
  22. #include <yt/cpp/mapreduce/http/retry_request.h>
  23. #include <yt/cpp/mapreduce/interface/config.h>
  24. #include <yt/cpp/mapreduce/interface/client.h>
  25. #include <yt/cpp/mapreduce/interface/error_codes.h>
  26. #include <yt/cpp/mapreduce/interface/fluent.h>
  27. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  28. #include <yt/cpp/mapreduce/interface/skiff_row.h>
  29. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  30. #include <yt/cpp/mapreduce/io/yamr_table_writer.h>
  31. #include <yt/cpp/mapreduce/io/node_table_reader.h>
  32. #include <yt/cpp/mapreduce/io/node_table_writer.h>
  33. #include <yt/cpp/mapreduce/io/proto_table_reader.h>
  34. #include <yt/cpp/mapreduce/io/proto_table_writer.h>
  35. #include <yt/cpp/mapreduce/io/skiff_row_table_reader.h>
  36. #include <yt/cpp/mapreduce/io/proto_helpers.h>
  37. #include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
  38. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  39. #include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h>
  40. #include <yt/yt/core/ytree/fluent.h>
  41. #include <library/cpp/json/json_reader.h>
  42. #include <util/generic/algorithm.h>
  43. #include <util/string/type.h>
  44. #include <util/system/env.h>
  45. using namespace NYT::NDetail::NRawClient;
  46. namespace NYT {
  47. ////////////////////////////////////////////////////////////////////////////////
  48. namespace NDetail {
  49. ////////////////////////////////////////////////////////////////////////////////
  50. namespace {
  51. ////////////////////////////////////////////////////////////////////////////////
  52. THashMap<TString, TString> ParseProxyUrlAliasingRules(TString envConfig)
  53. {
  54. if (envConfig.empty()) {
  55. return {};
  56. }
  57. return NYTree::ConvertTo<THashMap<TString, TString>>(NYson::TYsonString(envConfig));
  58. }
  59. void ApplyProxyUrlAliasingRules(TString& url)
  60. {
  61. static auto rules = ParseProxyUrlAliasingRules(GetEnv("YT_PROXY_URL_ALIASING_CONFIG"));
  62. if (auto ruleIt = rules.find(url); ruleIt != rules.end()) {
  63. url = ruleIt->second;
  64. }
  65. }
  66. ////////////////////////////////////////////////////////////////////////////////
  67. } // namespace
  68. ////////////////////////////////////////////////////////////////////////////////
  69. TClientBase::TClientBase(
  70. const TClientContext& context,
  71. const TTransactionId& transactionId,
  72. IClientRetryPolicyPtr retryPolicy)
  73. : Context_(context)
  74. , TransactionId_(transactionId)
  75. , ClientRetryPolicy_(std::move(retryPolicy))
  76. { }
  77. ITransactionPtr TClientBase::StartTransaction(
  78. const TStartTransactionOptions& options)
  79. {
  80. return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options);
  81. }
  82. TNodeId TClientBase::Create(
  83. const TYPath& path,
  84. ENodeType type,
  85. const TCreateOptions& options)
  86. {
  87. return NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, type, options);
  88. }
  89. void TClientBase::Remove(
  90. const TYPath& path,
  91. const TRemoveOptions& options)
  92. {
  93. return NRawClient::Remove(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  94. }
  95. bool TClientBase::Exists(
  96. const TYPath& path,
  97. const TExistsOptions& options)
  98. {
  99. return NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  100. }
  101. TNode TClientBase::Get(
  102. const TYPath& path,
  103. const TGetOptions& options)
  104. {
  105. return NRawClient::Get(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  106. }
  107. void TClientBase::Set(
  108. const TYPath& path,
  109. const TNode& value,
  110. const TSetOptions& options)
  111. {
  112. NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
  113. }
  114. void TClientBase::MultisetAttributes(
  115. const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options)
  116. {
  117. NRawClient::MultisetAttributes(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
  118. }
  119. TNode::TListType TClientBase::List(
  120. const TYPath& path,
  121. const TListOptions& options)
  122. {
  123. return NRawClient::List(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  124. }
  125. TNodeId TClientBase::Copy(
  126. const TYPath& sourcePath,
  127. const TYPath& destinationPath,
  128. const TCopyOptions& options)
  129. {
  130. try {
  131. return NRawClient::CopyInsideMasterCell(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
  132. } catch (const TErrorResponse& e) {
  133. if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
  134. // Do transaction for cross cell copying.
  135. std::function<TNodeId(ITransactionPtr)> lambda = [this, &sourcePath, &destinationPath, &options](ITransactionPtr transaction) {
  136. return NRawClient::CopyWithoutRetries(Context_, transaction->GetId(), sourcePath, destinationPath, options);
  137. };
  138. return RetryTransactionWithPolicy<TNodeId>(
  139. this,
  140. lambda,
  141. ClientRetryPolicy_->CreatePolicyForGenericRequest()
  142. );
  143. } else {
  144. throw;
  145. }
  146. }
  147. }
  148. TNodeId TClientBase::Move(
  149. const TYPath& sourcePath,
  150. const TYPath& destinationPath,
  151. const TMoveOptions& options)
  152. {
  153. try {
  154. return NRawClient::MoveInsideMasterCell(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
  155. } catch (const TErrorResponse& e) {
  156. if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
  157. // Do transaction for cross cell moving.
  158. std::function<TNodeId(ITransactionPtr)> lambda = [this, &sourcePath, &destinationPath, &options](ITransactionPtr transaction) {
  159. return NRawClient::MoveWithoutRetries(Context_, transaction->GetId(), sourcePath, destinationPath, options);
  160. };
  161. return RetryTransactionWithPolicy<TNodeId>(
  162. this,
  163. lambda,
  164. ClientRetryPolicy_->CreatePolicyForGenericRequest()
  165. );
  166. } else {
  167. throw;
  168. }
  169. }
  170. }
  171. TNodeId TClientBase::Link(
  172. const TYPath& targetPath,
  173. const TYPath& linkPath,
  174. const TLinkOptions& options)
  175. {
  176. return NRawClient::Link(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, targetPath, linkPath, options);
  177. }
  178. void TClientBase::Concatenate(
  179. const TVector<TRichYPath>& sourcePaths,
  180. const TRichYPath& destinationPath,
  181. const TConcatenateOptions& options)
  182. {
  183. std::function<void(ITransactionPtr)> lambda = [&sourcePaths, &destinationPath, &options, this](ITransactionPtr transaction) {
  184. if (!options.Append_ && !sourcePaths.empty() && !transaction->Exists(destinationPath.Path_)) {
  185. auto typeNode = transaction->Get(CanonizeYPath(sourcePaths.front()).Path_ + "/@type");
  186. auto type = FromString<ENodeType>(typeNode.AsString());
  187. transaction->Create(destinationPath.Path_, type, TCreateOptions().IgnoreExisting(true));
  188. }
  189. NRawClient::Concatenate(this->Context_, transaction->GetId(), sourcePaths, destinationPath, options);
  190. };
  191. RetryTransactionWithPolicy(this, lambda, ClientRetryPolicy_->CreatePolicyForGenericRequest());
  192. }
  193. TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path)
  194. {
  195. return NRawClient::CanonizeYPath(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path);
  196. }
  197. TVector<TTableColumnarStatistics> TClientBase::GetTableColumnarStatistics(
  198. const TVector<TRichYPath>& paths,
  199. const TGetTableColumnarStatisticsOptions& options)
  200. {
  201. return NRawClient::GetTableColumnarStatistics(
  202. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  203. Context_,
  204. TransactionId_,
  205. paths,
  206. options);
  207. }
  208. TMultiTablePartitions TClientBase::GetTablePartitions(
  209. const TVector<TRichYPath>& paths,
  210. const TGetTablePartitionsOptions& options)
  211. {
  212. return NRawClient::GetTablePartitions(
  213. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  214. Context_,
  215. TransactionId_,
  216. paths,
  217. options);
  218. }
  219. TMaybe<TYPath> TClientBase::GetFileFromCache(
  220. const TString& md5Signature,
  221. const TYPath& cachePath,
  222. const TGetFileFromCacheOptions& options)
  223. {
  224. return NRawClient::GetFileFromCache(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, md5Signature, cachePath, options);
  225. }
  226. TYPath TClientBase::PutFileToCache(
  227. const TYPath& filePath,
  228. const TString& md5Signature,
  229. const TYPath& cachePath,
  230. const TPutFileToCacheOptions& options)
  231. {
  232. return NRawClient::PutFileToCache(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, filePath, md5Signature, cachePath, options);
  233. }
  234. IFileReaderPtr TClientBase::CreateBlobTableReader(
  235. const TYPath& path,
  236. const TKey& key,
  237. const TBlobTableReaderOptions& options)
  238. {
  239. return new TBlobTableReader(
  240. path,
  241. key,
  242. ClientRetryPolicy_,
  243. GetTransactionPinger(),
  244. Context_,
  245. TransactionId_,
  246. options);
  247. }
  248. IFileReaderPtr TClientBase::CreateFileReader(
  249. const TRichYPath& path,
  250. const TFileReaderOptions& options)
  251. {
  252. return new TFileReader(
  253. CanonizeYPath(path),
  254. ClientRetryPolicy_,
  255. GetTransactionPinger(),
  256. Context_,
  257. TransactionId_,
  258. options);
  259. }
  260. IFileWriterPtr TClientBase::CreateFileWriter(
  261. const TRichYPath& path,
  262. const TFileWriterOptions& options)
  263. {
  264. auto realPath = CanonizeYPath(path);
  265. if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
  266. NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE,
  267. TCreateOptions().IgnoreExisting(true));
  268. }
  269. return new TFileWriter(realPath, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
  270. }
  271. TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
  272. const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options)
  273. {
  274. const Message* prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(&descriptor);
  275. return new TTableWriter<::google::protobuf::Message>(CreateProtoWriter(path, options, prototype));
  276. }
  277. TRawTableReaderPtr TClientBase::CreateRawReader(
  278. const TRichYPath& path,
  279. const TFormat& format,
  280. const TTableReaderOptions& options)
  281. {
  282. return CreateClientReader(path, format, options).Get();
  283. }
  284. TRawTableWriterPtr TClientBase::CreateRawWriter(
  285. const TRichYPath& path,
  286. const TFormat& format,
  287. const TTableWriterOptions& options)
  288. {
  289. return ::MakeIntrusive<TRetryfulWriter>(
  290. ClientRetryPolicy_,
  291. GetTransactionPinger(),
  292. Context_,
  293. TransactionId_,
  294. GetWriteTableCommand(Context_.Config->ApiVersion),
  295. format,
  296. CanonizeYPath(path),
  297. options).Get();
  298. }
  299. IOperationPtr TClientBase::DoMap(
  300. const TMapOperationSpec& spec,
  301. ::TIntrusivePtr<IStructuredJob> mapper,
  302. const TOperationOptions& options)
  303. {
  304. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  305. auto prepareOperation = [
  306. this_ = ::TIntrusivePtr(this),
  307. operation,
  308. spec,
  309. mapper,
  310. options
  311. ] () {
  312. ExecuteMap(
  313. operation,
  314. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  315. spec,
  316. mapper,
  317. options);
  318. };
  319. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  320. }
  321. IOperationPtr TClientBase::RawMap(
  322. const TRawMapOperationSpec& spec,
  323. ::TIntrusivePtr<IRawJob> mapper,
  324. const TOperationOptions& options)
  325. {
  326. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  327. auto prepareOperation = [
  328. this_=::TIntrusivePtr(this),
  329. operation,
  330. spec,
  331. mapper,
  332. options
  333. ] () {
  334. ExecuteRawMap(
  335. operation,
  336. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  337. spec,
  338. mapper,
  339. options);
  340. };
  341. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  342. }
  343. IOperationPtr TClientBase::DoReduce(
  344. const TReduceOperationSpec& spec,
  345. ::TIntrusivePtr<IStructuredJob> reducer,
  346. const TOperationOptions& options)
  347. {
  348. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  349. auto prepareOperation = [
  350. this_=::TIntrusivePtr(this),
  351. operation,
  352. spec,
  353. reducer,
  354. options
  355. ] () {
  356. ExecuteReduce(
  357. operation,
  358. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  359. spec,
  360. reducer,
  361. options);
  362. };
  363. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  364. }
  365. IOperationPtr TClientBase::RawReduce(
  366. const TRawReduceOperationSpec& spec,
  367. ::TIntrusivePtr<IRawJob> reducer,
  368. const TOperationOptions& options)
  369. {
  370. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  371. auto prepareOperation = [
  372. this_=::TIntrusivePtr(this),
  373. operation,
  374. spec,
  375. reducer,
  376. options
  377. ] () {
  378. ExecuteRawReduce(
  379. operation,
  380. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  381. spec,
  382. reducer,
  383. options);
  384. };
  385. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  386. }
  387. IOperationPtr TClientBase::DoJoinReduce(
  388. const TJoinReduceOperationSpec& spec,
  389. ::TIntrusivePtr<IStructuredJob> reducer,
  390. const TOperationOptions& options)
  391. {
  392. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  393. auto prepareOperation = [
  394. this_=::TIntrusivePtr(this),
  395. operation,
  396. spec,
  397. reducer,
  398. options
  399. ] () {
  400. ExecuteJoinReduce(
  401. operation,
  402. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  403. spec,
  404. reducer,
  405. options);
  406. };
  407. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  408. }
  409. IOperationPtr TClientBase::RawJoinReduce(
  410. const TRawJoinReduceOperationSpec& spec,
  411. ::TIntrusivePtr<IRawJob> reducer,
  412. const TOperationOptions& options)
  413. {
  414. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  415. auto prepareOperation = [
  416. this_=::TIntrusivePtr(this),
  417. operation,
  418. spec,
  419. reducer,
  420. options
  421. ] () {
  422. ExecuteRawJoinReduce(
  423. operation,
  424. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  425. spec,
  426. reducer,
  427. options);
  428. };
  429. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  430. }
  431. IOperationPtr TClientBase::DoMapReduce(
  432. const TMapReduceOperationSpec& spec,
  433. ::TIntrusivePtr<IStructuredJob> mapper,
  434. ::TIntrusivePtr<IStructuredJob> reduceCombiner,
  435. ::TIntrusivePtr<IStructuredJob> reducer,
  436. const TOperationOptions& options)
  437. {
  438. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  439. auto prepareOperation = [
  440. this_=::TIntrusivePtr(this),
  441. operation,
  442. spec,
  443. mapper,
  444. reduceCombiner,
  445. reducer,
  446. options
  447. ] () {
  448. ExecuteMapReduce(
  449. operation,
  450. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  451. spec,
  452. mapper,
  453. reduceCombiner,
  454. reducer,
  455. options);
  456. };
  457. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  458. }
  459. IOperationPtr TClientBase::RawMapReduce(
  460. const TRawMapReduceOperationSpec& spec,
  461. ::TIntrusivePtr<IRawJob> mapper,
  462. ::TIntrusivePtr<IRawJob> reduceCombiner,
  463. ::TIntrusivePtr<IRawJob> reducer,
  464. const TOperationOptions& options)
  465. {
  466. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  467. auto prepareOperation = [
  468. this_=::TIntrusivePtr(this),
  469. operation,
  470. spec,
  471. mapper,
  472. reduceCombiner,
  473. reducer,
  474. options
  475. ] () {
  476. ExecuteRawMapReduce(
  477. operation,
  478. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  479. spec,
  480. mapper,
  481. reduceCombiner,
  482. reducer,
  483. options);
  484. };
  485. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  486. }
  487. IOperationPtr TClientBase::Sort(
  488. const TSortOperationSpec& spec,
  489. const TOperationOptions& options)
  490. {
  491. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  492. auto prepareOperation = [
  493. this_ = ::TIntrusivePtr(this),
  494. operation,
  495. spec,
  496. options
  497. ] () {
  498. ExecuteSort(
  499. operation,
  500. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  501. spec,
  502. options);
  503. };
  504. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  505. }
  506. IOperationPtr TClientBase::Merge(
  507. const TMergeOperationSpec& spec,
  508. const TOperationOptions& options)
  509. {
  510. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  511. auto prepareOperation = [
  512. this_ = ::TIntrusivePtr(this),
  513. operation,
  514. spec,
  515. options
  516. ] () {
  517. ExecuteMerge(
  518. operation,
  519. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  520. spec,
  521. options);
  522. };
  523. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  524. }
  525. IOperationPtr TClientBase::Erase(
  526. const TEraseOperationSpec& spec,
  527. const TOperationOptions& options)
  528. {
  529. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  530. auto prepareOperation = [
  531. this_ = ::TIntrusivePtr(this),
  532. operation,
  533. spec,
  534. options
  535. ] () {
  536. ExecuteErase(
  537. operation,
  538. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  539. spec,
  540. options);
  541. };
  542. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  543. }
  544. IOperationPtr TClientBase::RemoteCopy(
  545. const TRemoteCopyOperationSpec& spec,
  546. const TOperationOptions& options)
  547. {
  548. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  549. auto prepareOperation = [
  550. this_ = ::TIntrusivePtr(this),
  551. operation,
  552. spec,
  553. options
  554. ] () {
  555. ExecuteRemoteCopy(
  556. operation,
  557. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  558. spec,
  559. options);
  560. };
  561. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  562. }
  563. IOperationPtr TClientBase::RunVanilla(
  564. const TVanillaOperationSpec& spec,
  565. const TOperationOptions& options)
  566. {
  567. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  568. auto prepareOperation = [
  569. this_ = ::TIntrusivePtr(this),
  570. operation,
  571. spec,
  572. options
  573. ] () {
  574. ExecuteVanilla(
  575. operation,
  576. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  577. spec,
  578. options);
  579. };
  580. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  581. }
  582. IOperationPtr TClientBase::AttachOperation(const TOperationId& operationId)
  583. {
  584. auto operation = ::MakeIntrusive<TOperation>(operationId, GetParentClientImpl());
  585. operation->GetBriefState(); // check that operation exists
  586. return operation;
  587. }
  588. EOperationBriefState TClientBase::CheckOperation(const TOperationId& operationId)
  589. {
  590. return NYT::NDetail::CheckOperation(ClientRetryPolicy_, Context_, operationId);
  591. }
  592. void TClientBase::AbortOperation(const TOperationId& operationId)
  593. {
  594. NRawClient::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId);
  595. }
  596. void TClientBase::CompleteOperation(const TOperationId& operationId)
  597. {
  598. NRawClient::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId);
  599. }
  600. void TClientBase::WaitForOperation(const TOperationId& operationId)
  601. {
  602. NYT::NDetail::WaitForOperation(ClientRetryPolicy_, Context_, operationId);
  603. }
  604. void TClientBase::AlterTable(
  605. const TYPath& path,
  606. const TAlterTableOptions& options)
  607. {
  608. NRawClient::AlterTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  609. }
  610. ::TIntrusivePtr<TClientReader> TClientBase::CreateClientReader(
  611. const TRichYPath& path,
  612. const TFormat& format,
  613. const TTableReaderOptions& options,
  614. bool useFormatFromTableAttributes)
  615. {
  616. return ::MakeIntrusive<TClientReader>(
  617. CanonizeYPath(path),
  618. ClientRetryPolicy_,
  619. GetTransactionPinger(),
  620. Context_,
  621. TransactionId_,
  622. format,
  623. options,
  624. useFormatFromTableAttributes);
  625. }
  626. THolder<TClientWriter> TClientBase::CreateClientWriter(
  627. const TRichYPath& path,
  628. const TFormat& format,
  629. const TTableWriterOptions& options)
  630. {
  631. auto realPath = CanonizeYPath(path);
  632. if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
  633. NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE,
  634. TCreateOptions().IgnoreExisting(true));
  635. }
  636. return MakeHolder<TClientWriter>(
  637. realPath,
  638. ClientRetryPolicy_,
  639. GetTransactionPinger(),
  640. Context_,
  641. TransactionId_,
  642. format,
  643. options
  644. );
  645. }
  646. ::TIntrusivePtr<INodeReaderImpl> TClientBase::CreateNodeReader(
  647. const TRichYPath& path, const TTableReaderOptions& options)
  648. {
  649. auto format = TFormat::YsonBinary();
  650. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  651. // Skiff is disabled here because of large header problem (see https://st.yandex-team.ru/YT-6926).
  652. // Revert this code to r3614168 when it is fixed.
  653. return new TNodeTableReader(
  654. CreateClientReader(path, format, options));
  655. }
  656. ::TIntrusivePtr<IYaMRReaderImpl> TClientBase::CreateYaMRReader(
  657. const TRichYPath& path, const TTableReaderOptions& options)
  658. {
  659. return new TYaMRTableReader(
  660. CreateClientReader(path, TFormat::YaMRLenval(), options, /* useFormatFromTableAttributes = */ true));
  661. }
  662. ::TIntrusivePtr<IProtoReaderImpl> TClientBase::CreateProtoReader(
  663. const TRichYPath& path,
  664. const TTableReaderOptions& options,
  665. const Message* prototype)
  666. {
  667. TVector<const ::google::protobuf::Descriptor*> descriptors;
  668. descriptors.push_back(prototype->GetDescriptor());
  669. if (Context_.Config->UseClientProtobuf) {
  670. return new TProtoTableReader(
  671. CreateClientReader(path, TFormat::YsonBinary(), options),
  672. std::move(descriptors));
  673. } else {
  674. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  675. return new TLenvalProtoTableReader(
  676. CreateClientReader(path, format, options),
  677. std::move(descriptors));
  678. }
  679. }
  680. ::TIntrusivePtr<ISkiffRowReaderImpl> TClientBase::CreateSkiffRowReader(
  681. const TRichYPath& path,
  682. const TTableReaderOptions& options,
  683. const ISkiffRowSkipperPtr& skipper,
  684. const NSkiff::TSkiffSchemaPtr& schema)
  685. {
  686. auto skiffOptions = TCreateSkiffSchemaOptions().HasRangeIndex(true);
  687. auto resultSchema = NYT::NDetail::CreateSkiffSchema(TVector{schema}, skiffOptions);
  688. return new TSkiffRowTableReader(
  689. CreateClientReader(path, NYT::NDetail::CreateSkiffFormat(resultSchema), options),
  690. resultSchema,
  691. {skipper},
  692. std::move(skiffOptions));
  693. }
  694. ::TIntrusivePtr<INodeWriterImpl> TClientBase::CreateNodeWriter(
  695. const TRichYPath& path, const TTableWriterOptions& options)
  696. {
  697. auto format = TFormat::YsonBinary();
  698. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  699. return new TNodeTableWriter(
  700. CreateClientWriter(path, format, options));
  701. }
  702. ::TIntrusivePtr<IYaMRWriterImpl> TClientBase::CreateYaMRWriter(
  703. const TRichYPath& path, const TTableWriterOptions& options)
  704. {
  705. auto format = TFormat::YaMRLenval();
  706. ApplyFormatHints<TYaMRRow>(&format, options.FormatHints_);
  707. return new TYaMRTableWriter(
  708. CreateClientWriter(path, format, options));
  709. }
  710. ::TIntrusivePtr<IProtoWriterImpl> TClientBase::CreateProtoWriter(
  711. const TRichYPath& path,
  712. const TTableWriterOptions& options,
  713. const Message* prototype)
  714. {
  715. TVector<const ::google::protobuf::Descriptor*> descriptors;
  716. descriptors.push_back(prototype->GetDescriptor());
  717. auto pathWithSchema = path;
  718. if (options.InferSchema_.GetOrElse(Context_.Config->InferTableSchema) && !path.Schema_) {
  719. pathWithSchema.Schema(CreateTableSchema(*prototype->GetDescriptor()));
  720. }
  721. if (Context_.Config->UseClientProtobuf) {
  722. auto format = TFormat::YsonBinary();
  723. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  724. return new TProtoTableWriter(
  725. CreateClientWriter(pathWithSchema, format, options),
  726. std::move(descriptors));
  727. } else {
  728. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  729. ApplyFormatHints<::google::protobuf::Message>(&format, options.FormatHints_);
  730. return new TLenvalProtoTableWriter(
  731. CreateClientWriter(pathWithSchema, format, options),
  732. std::move(descriptors));
  733. }
  734. }
  735. TBatchRequestPtr TClientBase::CreateBatchRequest()
  736. {
  737. return MakeIntrusive<TBatchRequest>(TransactionId_, GetParentClientImpl());
  738. }
  739. IClientPtr TClientBase::GetParentClient()
  740. {
  741. return GetParentClientImpl();
  742. }
  743. const TClientContext& TClientBase::GetContext() const
  744. {
  745. return Context_;
  746. }
  747. const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
  748. {
  749. return ClientRetryPolicy_;
  750. }
  751. ////////////////////////////////////////////////////////////////////////////////
  752. TTransaction::TTransaction(
  753. TClientPtr parentClient,
  754. const TClientContext& context,
  755. const TTransactionId& parentTransactionId,
  756. const TStartTransactionOptions& options)
  757. : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy())
  758. , TransactionPinger_(parentClient->GetTransactionPinger())
  759. , PingableTx_(
  760. MakeHolder<TPingableTransaction>(
  761. parentClient->GetRetryPolicy(),
  762. context,
  763. parentTransactionId,
  764. TransactionPinger_->GetChildTxPinger(),
  765. options))
  766. , ParentClient_(parentClient)
  767. {
  768. TransactionId_ = PingableTx_->GetId();
  769. }
  770. TTransaction::TTransaction(
  771. TClientPtr parentClient,
  772. const TClientContext& context,
  773. const TTransactionId& transactionId,
  774. const TAttachTransactionOptions& options)
  775. : TClientBase(context, transactionId, parentClient->GetRetryPolicy())
  776. , TransactionPinger_(parentClient->GetTransactionPinger())
  777. , PingableTx_(
  778. new TPingableTransaction(
  779. parentClient->GetRetryPolicy(),
  780. context,
  781. transactionId,
  782. parentClient->GetTransactionPinger()->GetChildTxPinger(),
  783. options))
  784. , ParentClient_(parentClient)
  785. { }
  786. const TTransactionId& TTransaction::GetId() const
  787. {
  788. return TransactionId_;
  789. }
  790. ILockPtr TTransaction::Lock(
  791. const TYPath& path,
  792. ELockMode mode,
  793. const TLockOptions& options)
  794. {
  795. auto lockId = NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, mode, options);
  796. return ::MakeIntrusive<TLock>(lockId, GetParentClientImpl(), options.Waitable_);
  797. }
  798. void TTransaction::Unlock(
  799. const TYPath& path,
  800. const TUnlockOptions& options)
  801. {
  802. NRawClient::Unlock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  803. }
  804. void TTransaction::Commit()
  805. {
  806. PingableTx_->Commit();
  807. }
  808. void TTransaction::Abort()
  809. {
  810. PingableTx_->Abort();
  811. }
  812. void TTransaction::Ping()
  813. {
  814. PingTx(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_);
  815. }
  816. void TTransaction::Detach()
  817. {
  818. PingableTx_->Detach();
  819. }
  820. ITransactionPingerPtr TTransaction::GetTransactionPinger()
  821. {
  822. return TransactionPinger_;
  823. }
  824. TClientPtr TTransaction::GetParentClientImpl()
  825. {
  826. return ParentClient_;
  827. }
  828. ////////////////////////////////////////////////////////////////////////////////
  829. TClient::TClient(
  830. const TClientContext& context,
  831. const TTransactionId& globalId,
  832. IClientRetryPolicyPtr retryPolicy)
  833. : TClientBase(context, globalId, retryPolicy)
  834. , TransactionPinger_(nullptr)
  835. { }
  836. TClient::~TClient() = default;
  837. ITransactionPtr TClient::AttachTransaction(
  838. const TTransactionId& transactionId,
  839. const TAttachTransactionOptions& options)
  840. {
  841. CheckShutdown();
  842. return MakeIntrusive<TTransaction>(this, Context_, transactionId, options);
  843. }
  844. void TClient::MountTable(
  845. const TYPath& path,
  846. const TMountTableOptions& options)
  847. {
  848. CheckShutdown();
  849. THttpHeader header("POST", "mount_table");
  850. SetTabletParams(header, path, options);
  851. if (options.CellId_) {
  852. header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
  853. }
  854. header.AddParameter("freeze", options.Freeze_);
  855. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  856. }
  857. void TClient::UnmountTable(
  858. const TYPath& path,
  859. const TUnmountTableOptions& options)
  860. {
  861. CheckShutdown();
  862. THttpHeader header("POST", "unmount_table");
  863. SetTabletParams(header, path, options);
  864. header.AddParameter("force", options.Force_);
  865. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  866. }
  867. void TClient::RemountTable(
  868. const TYPath& path,
  869. const TRemountTableOptions& options)
  870. {
  871. CheckShutdown();
  872. THttpHeader header("POST", "remount_table");
  873. SetTabletParams(header, path, options);
  874. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  875. }
  876. void TClient::FreezeTable(
  877. const TYPath& path,
  878. const TFreezeTableOptions& options)
  879. {
  880. CheckShutdown();
  881. NRawClient::FreezeTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, options);
  882. }
  883. void TClient::UnfreezeTable(
  884. const TYPath& path,
  885. const TUnfreezeTableOptions& options)
  886. {
  887. CheckShutdown();
  888. NRawClient::UnfreezeTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, options);
  889. }
  890. void TClient::ReshardTable(
  891. const TYPath& path,
  892. const TVector<TKey>& keys,
  893. const TReshardTableOptions& options)
  894. {
  895. CheckShutdown();
  896. THttpHeader header("POST", "reshard_table");
  897. SetTabletParams(header, path, options);
  898. header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
  899. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  900. }
  901. void TClient::ReshardTable(
  902. const TYPath& path,
  903. i64 tabletCount,
  904. const TReshardTableOptions& options)
  905. {
  906. CheckShutdown();
  907. THttpHeader header("POST", "reshard_table");
  908. SetTabletParams(header, path, options);
  909. header.AddParameter("tablet_count", tabletCount);
  910. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  911. }
  912. void TClient::InsertRows(
  913. const TYPath& path,
  914. const TNode::TListType& rows,
  915. const TInsertRowsOptions& options)
  916. {
  917. CheckShutdown();
  918. THttpHeader header("PUT", "insert_rows");
  919. header.SetInputFormat(TFormat::YsonBinary());
  920. // TODO: use corresponding raw request
  921. header.MergeParameters(SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
  922. auto body = NodeListToYsonString(rows);
  923. TRequestConfig config;
  924. config.IsHeavy = true;
  925. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config);
  926. }
  927. void TClient::DeleteRows(
  928. const TYPath& path,
  929. const TNode::TListType& keys,
  930. const TDeleteRowsOptions& options)
  931. {
  932. CheckShutdown();
  933. return NRawClient::DeleteRows(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, keys, options);
  934. }
  935. void TClient::TrimRows(
  936. const TYPath& path,
  937. i64 tabletIndex,
  938. i64 rowCount,
  939. const TTrimRowsOptions& options)
  940. {
  941. CheckShutdown();
  942. THttpHeader header("POST", "trim_rows");
  943. header.AddParameter("trimmed_row_count", rowCount);
  944. header.AddParameter("tablet_index", tabletIndex);
  945. // TODO: use corresponding raw request
  946. header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
  947. TRequestConfig config;
  948. config.IsHeavy = true;
  949. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  950. }
  951. TNode::TListType TClient::LookupRows(
  952. const TYPath& path,
  953. const TNode::TListType& keys,
  954. const TLookupRowsOptions& options)
  955. {
  956. CheckShutdown();
  957. Y_UNUSED(options);
  958. THttpHeader header("PUT", "lookup_rows");
  959. header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion));
  960. header.SetInputFormat(TFormat::YsonBinary());
  961. header.SetOutputFormat(TFormat::YsonBinary());
  962. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  963. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  964. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  965. })
  966. .Item("keep_missing_rows").Value(options.KeepMissingRows_)
  967. .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) {
  968. fluent.Item("versioned").Value(*options.Versioned_);
  969. })
  970. .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) {
  971. fluent.Item("column_names").Value(*options.Columns_);
  972. })
  973. .EndMap());
  974. auto body = NodeListToYsonString(keys);
  975. TRequestConfig config;
  976. config.IsHeavy = true;
  977. auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config);
  978. return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList();
  979. }
  980. TNode::TListType TClient::SelectRows(
  981. const TString& query,
  982. const TSelectRowsOptions& options)
  983. {
  984. CheckShutdown();
  985. THttpHeader header("GET", "select_rows");
  986. header.SetInputFormat(TFormat::YsonBinary());
  987. header.SetOutputFormat(TFormat::YsonBinary());
  988. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  989. .Item("query").Value(query)
  990. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  991. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  992. })
  993. .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  994. fluent.Item("input_row_limit").Value(*options.InputRowLimit_);
  995. })
  996. .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  997. fluent.Item("output_row_limit").Value(*options.OutputRowLimit_);
  998. })
  999. .Item("range_expansion_limit").Value(options.RangeExpansionLimit_)
  1000. .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_)
  1001. .Item("verbose_logging").Value(options.VerboseLogging_)
  1002. .Item("enable_code_cache").Value(options.EnableCodeCache_)
  1003. .EndMap());
  1004. TRequestConfig config;
  1005. config.IsHeavy = true;
  1006. auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  1007. return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList();
  1008. }
  1009. void TClient::AlterTableReplica(const TReplicaId& replicaId, const TAlterTableReplicaOptions& options)
  1010. {
  1011. CheckShutdown();
  1012. NRawClient::AlterTableReplica(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, replicaId, options);
  1013. }
  1014. ui64 TClient::GenerateTimestamp()
  1015. {
  1016. CheckShutdown();
  1017. THttpHeader header("GET", "generate_timestamp");
  1018. TRequestConfig config;
  1019. config.IsHeavy = true;
  1020. auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  1021. return NodeFromYsonString(requestResult.Response).AsUint64();
  1022. }
  1023. TAuthorizationInfo TClient::WhoAmI()
  1024. {
  1025. CheckShutdown();
  1026. THttpHeader header("GET", "auth/whoami", /* isApi = */ false);
  1027. auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  1028. TAuthorizationInfo result;
  1029. NJson::TJsonValue jsonValue;
  1030. bool ok = NJson::ReadJsonTree(requestResult.Response, &jsonValue, /* throwOnError = */ true);
  1031. Y_ABORT_UNLESS(ok);
  1032. result.Login = jsonValue["login"].GetString();
  1033. result.Realm = jsonValue["realm"].GetString();
  1034. return result;
  1035. }
  1036. TOperationAttributes TClient::GetOperation(
  1037. const TOperationId& operationId,
  1038. const TGetOperationOptions& options)
  1039. {
  1040. CheckShutdown();
  1041. return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1042. }
  1043. TOperationAttributes TClient::GetOperation(
  1044. const TString& alias,
  1045. const TGetOperationOptions& options)
  1046. {
  1047. CheckShutdown();
  1048. return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, alias, options);
  1049. }
  1050. TListOperationsResult TClient::ListOperations(
  1051. const TListOperationsOptions& options)
  1052. {
  1053. CheckShutdown();
  1054. return NRawClient::ListOperations(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, options);
  1055. }
  1056. void TClient::UpdateOperationParameters(
  1057. const TOperationId& operationId,
  1058. const TUpdateOperationParametersOptions& options)
  1059. {
  1060. CheckShutdown();
  1061. return NRawClient::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1062. }
  1063. TJobAttributes TClient::GetJob(
  1064. const TOperationId& operationId,
  1065. const TJobId& jobId,
  1066. const TGetJobOptions& options)
  1067. {
  1068. CheckShutdown();
  1069. return NRawClient::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, jobId, options);
  1070. }
  1071. TListJobsResult TClient::ListJobs(
  1072. const TOperationId& operationId,
  1073. const TListJobsOptions& options)
  1074. {
  1075. CheckShutdown();
  1076. return NRawClient::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1077. }
  1078. IFileReaderPtr TClient::GetJobInput(
  1079. const TJobId& jobId,
  1080. const TGetJobInputOptions& options)
  1081. {
  1082. CheckShutdown();
  1083. return NRawClient::GetJobInput(Context_, jobId, options);
  1084. }
  1085. IFileReaderPtr TClient::GetJobFailContext(
  1086. const TOperationId& operationId,
  1087. const TJobId& jobId,
  1088. const TGetJobFailContextOptions& options)
  1089. {
  1090. CheckShutdown();
  1091. return NRawClient::GetJobFailContext(Context_, operationId, jobId, options);
  1092. }
  1093. IFileReaderPtr TClient::GetJobStderr(
  1094. const TOperationId& operationId,
  1095. const TJobId& jobId,
  1096. const TGetJobStderrOptions& options)
  1097. {
  1098. CheckShutdown();
  1099. return NRawClient::GetJobStderr(Context_, operationId, jobId, options);
  1100. }
  1101. std::vector<TJobTraceEvent> TClient::GetJobTrace(
  1102. const TOperationId& operationId,
  1103. const TGetJobTraceOptions& options)
  1104. {
  1105. CheckShutdown();
  1106. return NRawClient::GetJobTrace(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1107. }
  1108. TNode::TListType TClient::SkyShareTable(
  1109. const std::vector<TYPath>& tablePaths,
  1110. const TSkyShareTableOptions& options)
  1111. {
  1112. CheckShutdown();
  1113. return NRawClient::SkyShareTable(
  1114. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1115. Context_,
  1116. tablePaths,
  1117. options);
  1118. }
  1119. TCheckPermissionResponse TClient::CheckPermission(
  1120. const TString& user,
  1121. EPermission permission,
  1122. const TYPath& path,
  1123. const TCheckPermissionOptions& options)
  1124. {
  1125. CheckShutdown();
  1126. return NRawClient::CheckPermission(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, user, permission, path, options);
  1127. }
  1128. TVector<TTabletInfo> TClient::GetTabletInfos(
  1129. const TYPath& path,
  1130. const TVector<int>& tabletIndexes,
  1131. const TGetTabletInfosOptions& options)
  1132. {
  1133. CheckShutdown();
  1134. return NRawClient::GetTabletInfos(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, tabletIndexes, options);
  1135. }
  1136. void TClient::SuspendOperation(
  1137. const TOperationId& operationId,
  1138. const TSuspendOperationOptions& options)
  1139. {
  1140. CheckShutdown();
  1141. NRawClient::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1142. }
  1143. void TClient::ResumeOperation(
  1144. const TOperationId& operationId,
  1145. const TResumeOperationOptions& options)
  1146. {
  1147. CheckShutdown();
  1148. NRawClient::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1149. }
  1150. TYtPoller& TClient::GetYtPoller()
  1151. {
  1152. auto g = Guard(Lock_);
  1153. if (!YtPoller_) {
  1154. CheckShutdown();
  1155. // We don't use current client and create new client because YtPoller_ might use
  1156. // this client during current client shutdown.
  1157. // That might lead to incrementing of current client refcount and double delete of current client object.
  1158. YtPoller_ = MakeHolder<TYtPoller>(Context_, ClientRetryPolicy_);
  1159. }
  1160. return *YtPoller_;
  1161. }
  1162. void TClient::Shutdown()
  1163. {
  1164. auto g = Guard(Lock_);
  1165. if (!Shutdown_.exchange(true) && YtPoller_) {
  1166. YtPoller_->Stop();
  1167. }
  1168. }
  1169. ITransactionPingerPtr TClient::GetTransactionPinger()
  1170. {
  1171. auto g = Guard(Lock_);
  1172. if (!TransactionPinger_) {
  1173. TransactionPinger_ = CreateTransactionPinger(Context_.Config);
  1174. }
  1175. return TransactionPinger_;
  1176. }
  1177. TClientPtr TClient::GetParentClientImpl()
  1178. {
  1179. return this;
  1180. }
  1181. template <class TOptions>
  1182. void TClient::SetTabletParams(
  1183. THttpHeader& header,
  1184. const TYPath& path,
  1185. const TOptions& options)
  1186. {
  1187. header.AddPath(AddPathPrefix(path, Context_.Config->Prefix));
  1188. if (options.FirstTabletIndex_) {
  1189. header.AddParameter("first_tablet_index", *options.FirstTabletIndex_);
  1190. }
  1191. if (options.LastTabletIndex_) {
  1192. header.AddParameter("last_tablet_index", *options.LastTabletIndex_);
  1193. }
  1194. }
  1195. void TClient::CheckShutdown() const
  1196. {
  1197. if (Shutdown_) {
  1198. ythrow TApiUsageError() << "Call client's methods after shutdown";
  1199. }
  1200. }
  1201. TClientPtr CreateClientImpl(
  1202. const TString& serverName,
  1203. const TCreateClientOptions& options)
  1204. {
  1205. TClientContext context;
  1206. context.Config = options.Config_ ? options.Config_ : TConfig::Get();
  1207. context.TvmOnly = options.TvmOnly_;
  1208. context.ProxyAddress = options.ProxyAddress_;
  1209. context.ServerName = serverName;
  1210. ApplyProxyUrlAliasingRules(context.ServerName);
  1211. if (context.ServerName.find('.') == TString::npos &&
  1212. context.ServerName.find(':') == TString::npos &&
  1213. context.ServerName.find("localhost") == TString::npos)
  1214. {
  1215. context.ServerName += ".yt.yandex.net";
  1216. }
  1217. static constexpr char httpUrlSchema[] = "http://";
  1218. static constexpr char httpsUrlSchema[] = "https://";
  1219. if (options.UseTLS_) {
  1220. context.UseTLS = *options.UseTLS_;
  1221. } else {
  1222. context.UseTLS = context.ServerName.StartsWith(httpsUrlSchema);
  1223. }
  1224. if (context.ServerName.StartsWith(httpUrlSchema)) {
  1225. if (context.UseTLS) {
  1226. ythrow TApiUsageError() << "URL schema doesn't match UseTLS option";
  1227. }
  1228. context.ServerName.erase(0, sizeof(httpUrlSchema) - 1);
  1229. }
  1230. if (context.ServerName.StartsWith(httpsUrlSchema)) {
  1231. if (!context.UseTLS) {
  1232. ythrow TApiUsageError() << "URL schema doesn't match UseTLS option";
  1233. }
  1234. context.ServerName.erase(0, sizeof(httpsUrlSchema) - 1);
  1235. }
  1236. if (context.ServerName.find(':') == TString::npos) {
  1237. context.ServerName = CreateHostNameWithPort(context.ServerName, context);
  1238. }
  1239. if (options.TvmOnly_) {
  1240. context.ServerName = Format("tvm.%v", context.ServerName);
  1241. }
  1242. if (context.UseTLS || options.UseCoreHttpClient_) {
  1243. context.HttpClient = NHttpClient::CreateCoreHttpClient(context.UseTLS, context.Config);
  1244. } else {
  1245. context.HttpClient = NHttpClient::CreateDefaultHttpClient();
  1246. }
  1247. context.Token = context.Config->Token;
  1248. if (options.Token_) {
  1249. context.Token = options.Token_;
  1250. } else if (options.TokenPath_) {
  1251. context.Token = TConfig::LoadTokenFromFile(options.TokenPath_);
  1252. } else if (options.ServiceTicketAuth_) {
  1253. context.ServiceTicketAuth = options.ServiceTicketAuth_;
  1254. }
  1255. context.ImpersonationUser = options.ImpersonationUser_;
  1256. if (context.Token) {
  1257. TConfig::ValidateToken(context.Token);
  1258. }
  1259. auto globalTxId = GetGuid(context.Config->GlobalTxId);
  1260. auto retryConfigProvider = options.RetryConfigProvider_;
  1261. if (!retryConfigProvider) {
  1262. retryConfigProvider = CreateDefaultRetryConfigProvider();
  1263. }
  1264. EnsureInitialized();
  1265. return new TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
  1266. }
  1267. ////////////////////////////////////////////////////////////////////////////////
  1268. } // namespace NDetail
  1269. ////////////////////////////////////////////////////////////////////////////////
  1270. IClientPtr CreateClient(
  1271. const TString& serverName,
  1272. const TCreateClientOptions& options)
  1273. {
  1274. return NDetail::CreateClientImpl(serverName, options);
  1275. }
  1276. IClientPtr CreateClientFromEnv(const TCreateClientOptions& options)
  1277. {
  1278. auto serverName = GetEnv("YT_PROXY");
  1279. if (!serverName) {
  1280. ythrow yexception() << "YT_PROXY is not set";
  1281. }
  1282. return CreateClient(serverName, options);
  1283. }
  1284. ////////////////////////////////////////////////////////////////////////////////
  1285. } // namespace NYT