client.cpp 46 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456
  1. #include "client.h"
  2. #include "batch_request_impl.h"
  3. #include "client_reader.h"
  4. #include "client_writer.h"
  5. #include "file_reader.h"
  6. #include "file_writer.h"
  7. #include "format_hints.h"
  8. #include "lock.h"
  9. #include "operation.h"
  10. #include "retry_transaction.h"
  11. #include "retryful_writer.h"
  12. #include "transaction.h"
  13. #include "transaction_pinger.h"
  14. #include "yt_poller.h"
  15. #include <yt/cpp/mapreduce/common/helpers.h>
  16. #include <yt/cpp/mapreduce/common/retry_lib.h>
  17. #include <yt/cpp/mapreduce/http/helpers.h>
  18. #include <yt/cpp/mapreduce/http/http.h>
  19. #include <yt/cpp/mapreduce/http/http_client.h>
  20. #include <yt/cpp/mapreduce/http/requests.h>
  21. #include <yt/cpp/mapreduce/http/retry_request.h>
  22. #include <yt/cpp/mapreduce/interface/config.h>
  23. #include <yt/cpp/mapreduce/interface/client.h>
  24. #include <yt/cpp/mapreduce/interface/error_codes.h>
  25. #include <yt/cpp/mapreduce/interface/fluent.h>
  26. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  27. #include <yt/cpp/mapreduce/interface/skiff_row.h>
  28. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  29. #include <yt/cpp/mapreduce/io/yamr_table_writer.h>
  30. #include <yt/cpp/mapreduce/io/node_table_reader.h>
  31. #include <yt/cpp/mapreduce/io/node_table_writer.h>
  32. #include <yt/cpp/mapreduce/io/proto_table_reader.h>
  33. #include <yt/cpp/mapreduce/io/proto_table_writer.h>
  34. #include <yt/cpp/mapreduce/io/skiff_row_table_reader.h>
  35. #include <yt/cpp/mapreduce/io/proto_helpers.h>
  36. #include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
  37. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  38. #include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h>
  39. #include <yt/yt/core/ytree/fluent.h>
  40. #include <library/cpp/json/json_reader.h>
  41. #include <util/generic/algorithm.h>
  42. #include <util/string/type.h>
  43. #include <util/system/env.h>
  44. using namespace NYT::NDetail::NRawClient;
  45. namespace NYT {
  46. ////////////////////////////////////////////////////////////////////////////////
  47. namespace NDetail {
  48. ////////////////////////////////////////////////////////////////////////////////
  49. namespace {
  50. ////////////////////////////////////////////////////////////////////////////////
  51. THashMap<TString, TString> ParseProxyUrlAliasingRules(TString envConfig)
  52. {
  53. if (envConfig.empty()) {
  54. return {};
  55. }
  56. return NYTree::ConvertTo<THashMap<TString, TString>>(NYson::TYsonString(envConfig));
  57. }
  58. void ApplyProxyUrlAliasingRules(TString& url)
  59. {
  60. static auto rules = ParseProxyUrlAliasingRules(GetEnv("YT_PROXY_URL_ALIASING_CONFIG"));
  61. if (auto ruleIt = rules.find(url); ruleIt != rules.end()) {
  62. url = ruleIt->second;
  63. }
  64. }
  65. ////////////////////////////////////////////////////////////////////////////////
  66. } // namespace
  67. ////////////////////////////////////////////////////////////////////////////////
  68. TClientBase::TClientBase(
  69. const TClientContext& context,
  70. const TTransactionId& transactionId,
  71. IClientRetryPolicyPtr retryPolicy)
  72. : Context_(context)
  73. , TransactionId_(transactionId)
  74. , ClientRetryPolicy_(std::move(retryPolicy))
  75. { }
  76. ITransactionPtr TClientBase::StartTransaction(
  77. const TStartTransactionOptions& options)
  78. {
  79. return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options);
  80. }
  81. TNodeId TClientBase::Create(
  82. const TYPath& path,
  83. ENodeType type,
  84. const TCreateOptions& options)
  85. {
  86. return NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, type, options);
  87. }
  88. void TClientBase::Remove(
  89. const TYPath& path,
  90. const TRemoveOptions& options)
  91. {
  92. return NRawClient::Remove(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  93. }
  94. bool TClientBase::Exists(
  95. const TYPath& path,
  96. const TExistsOptions& options)
  97. {
  98. return NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  99. }
  100. TNode TClientBase::Get(
  101. const TYPath& path,
  102. const TGetOptions& options)
  103. {
  104. return NRawClient::Get(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  105. }
  106. void TClientBase::Set(
  107. const TYPath& path,
  108. const TNode& value,
  109. const TSetOptions& options)
  110. {
  111. NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
  112. }
  113. void TClientBase::MultisetAttributes(
  114. const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options)
  115. {
  116. NRawClient::MultisetAttributes(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
  117. }
  118. TNode::TListType TClientBase::List(
  119. const TYPath& path,
  120. const TListOptions& options)
  121. {
  122. return NRawClient::List(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  123. }
  124. TNodeId TClientBase::Copy(
  125. const TYPath& sourcePath,
  126. const TYPath& destinationPath,
  127. const TCopyOptions& options)
  128. {
  129. try {
  130. return NRawClient::CopyInsideMasterCell(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
  131. } catch (const TErrorResponse& e) {
  132. if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
  133. // Do transaction for cross cell copying.
  134. std::function<TNodeId(ITransactionPtr)> lambda = [this, &sourcePath, &destinationPath, &options](ITransactionPtr transaction) {
  135. return NRawClient::CopyWithoutRetries(Context_, transaction->GetId(), sourcePath, destinationPath, options);
  136. };
  137. return RetryTransactionWithPolicy<TNodeId>(
  138. this,
  139. lambda,
  140. ClientRetryPolicy_->CreatePolicyForGenericRequest()
  141. );
  142. } else {
  143. throw;
  144. }
  145. }
  146. }
  147. TNodeId TClientBase::Move(
  148. const TYPath& sourcePath,
  149. const TYPath& destinationPath,
  150. const TMoveOptions& options)
  151. {
  152. try {
  153. return NRawClient::MoveInsideMasterCell(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
  154. } catch (const TErrorResponse& e) {
  155. if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
  156. // Do transaction for cross cell moving.
  157. std::function<TNodeId(ITransactionPtr)> lambda = [this, &sourcePath, &destinationPath, &options](ITransactionPtr transaction) {
  158. return NRawClient::MoveWithoutRetries(Context_, transaction->GetId(), sourcePath, destinationPath, options);
  159. };
  160. return RetryTransactionWithPolicy<TNodeId>(
  161. this,
  162. lambda,
  163. ClientRetryPolicy_->CreatePolicyForGenericRequest()
  164. );
  165. } else {
  166. throw;
  167. }
  168. }
  169. }
  170. TNodeId TClientBase::Link(
  171. const TYPath& targetPath,
  172. const TYPath& linkPath,
  173. const TLinkOptions& options)
  174. {
  175. return NRawClient::Link(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, targetPath, linkPath, options);
  176. }
  177. void TClientBase::Concatenate(
  178. const TVector<TRichYPath>& sourcePaths,
  179. const TRichYPath& destinationPath,
  180. const TConcatenateOptions& options)
  181. {
  182. std::function<void(ITransactionPtr)> lambda = [&sourcePaths, &destinationPath, &options, this](ITransactionPtr transaction) {
  183. if (!options.Append_ && !sourcePaths.empty() && !transaction->Exists(destinationPath.Path_)) {
  184. auto typeNode = transaction->Get(CanonizeYPath(sourcePaths.front()).Path_ + "/@type");
  185. auto type = FromString<ENodeType>(typeNode.AsString());
  186. transaction->Create(destinationPath.Path_, type, TCreateOptions().IgnoreExisting(true));
  187. }
  188. NRawClient::Concatenate(this->Context_, transaction->GetId(), sourcePaths, destinationPath, options);
  189. };
  190. RetryTransactionWithPolicy(this, lambda, ClientRetryPolicy_->CreatePolicyForGenericRequest());
  191. }
  192. TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path)
  193. {
  194. return NRawClient::CanonizeYPath(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path);
  195. }
  196. TVector<TTableColumnarStatistics> TClientBase::GetTableColumnarStatistics(
  197. const TVector<TRichYPath>& paths,
  198. const TGetTableColumnarStatisticsOptions& options)
  199. {
  200. return NRawClient::GetTableColumnarStatistics(
  201. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  202. Context_,
  203. TransactionId_,
  204. paths,
  205. options);
  206. }
  207. TMultiTablePartitions TClientBase::GetTablePartitions(
  208. const TVector<TRichYPath>& paths,
  209. const TGetTablePartitionsOptions& options)
  210. {
  211. return NRawClient::GetTablePartitions(
  212. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  213. Context_,
  214. TransactionId_,
  215. paths,
  216. options);
  217. }
  218. TMaybe<TYPath> TClientBase::GetFileFromCache(
  219. const TString& md5Signature,
  220. const TYPath& cachePath,
  221. const TGetFileFromCacheOptions& options)
  222. {
  223. return NRawClient::GetFileFromCache(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, md5Signature, cachePath, options);
  224. }
  225. TYPath TClientBase::PutFileToCache(
  226. const TYPath& filePath,
  227. const TString& md5Signature,
  228. const TYPath& cachePath,
  229. const TPutFileToCacheOptions& options)
  230. {
  231. return NRawClient::PutFileToCache(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, filePath, md5Signature, cachePath, options);
  232. }
  233. IFileReaderPtr TClientBase::CreateBlobTableReader(
  234. const TYPath& path,
  235. const TKey& key,
  236. const TBlobTableReaderOptions& options)
  237. {
  238. return new TBlobTableReader(
  239. path,
  240. key,
  241. ClientRetryPolicy_,
  242. GetTransactionPinger(),
  243. Context_,
  244. TransactionId_,
  245. options);
  246. }
  247. IFileReaderPtr TClientBase::CreateFileReader(
  248. const TRichYPath& path,
  249. const TFileReaderOptions& options)
  250. {
  251. return new TFileReader(
  252. CanonizeYPath(path),
  253. ClientRetryPolicy_,
  254. GetTransactionPinger(),
  255. Context_,
  256. TransactionId_,
  257. options);
  258. }
  259. IFileWriterPtr TClientBase::CreateFileWriter(
  260. const TRichYPath& path,
  261. const TFileWriterOptions& options)
  262. {
  263. auto realPath = CanonizeYPath(path);
  264. if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
  265. NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE,
  266. TCreateOptions().IgnoreExisting(true));
  267. }
  268. return new TFileWriter(realPath, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
  269. }
  270. TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
  271. const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options)
  272. {
  273. const Message* prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(&descriptor);
  274. return new TTableWriter<::google::protobuf::Message>(CreateProtoWriter(path, options, prototype));
  275. }
  276. TRawTableReaderPtr TClientBase::CreateRawReader(
  277. const TRichYPath& path,
  278. const TFormat& format,
  279. const TTableReaderOptions& options)
  280. {
  281. return CreateClientReader(path, format, options).Get();
  282. }
  283. TRawTableWriterPtr TClientBase::CreateRawWriter(
  284. const TRichYPath& path,
  285. const TFormat& format,
  286. const TTableWriterOptions& options)
  287. {
  288. return ::MakeIntrusive<TRetryfulWriter>(
  289. ClientRetryPolicy_,
  290. GetTransactionPinger(),
  291. Context_,
  292. TransactionId_,
  293. GetWriteTableCommand(Context_.Config->ApiVersion),
  294. format,
  295. CanonizeYPath(path),
  296. options).Get();
  297. }
  298. IOperationPtr TClientBase::DoMap(
  299. const TMapOperationSpec& spec,
  300. ::TIntrusivePtr<IStructuredJob> mapper,
  301. const TOperationOptions& options)
  302. {
  303. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  304. auto prepareOperation = [
  305. this_ = ::TIntrusivePtr(this),
  306. operation,
  307. spec,
  308. mapper,
  309. options
  310. ] () {
  311. ExecuteMap(
  312. operation,
  313. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  314. spec,
  315. mapper,
  316. options);
  317. };
  318. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  319. }
  320. IOperationPtr TClientBase::RawMap(
  321. const TRawMapOperationSpec& spec,
  322. ::TIntrusivePtr<IRawJob> mapper,
  323. const TOperationOptions& options)
  324. {
  325. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  326. auto prepareOperation = [
  327. this_=::TIntrusivePtr(this),
  328. operation,
  329. spec,
  330. mapper,
  331. options
  332. ] () {
  333. ExecuteRawMap(
  334. operation,
  335. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  336. spec,
  337. mapper,
  338. options);
  339. };
  340. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  341. }
  342. IOperationPtr TClientBase::DoReduce(
  343. const TReduceOperationSpec& spec,
  344. ::TIntrusivePtr<IStructuredJob> reducer,
  345. const TOperationOptions& options)
  346. {
  347. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  348. auto prepareOperation = [
  349. this_=::TIntrusivePtr(this),
  350. operation,
  351. spec,
  352. reducer,
  353. options
  354. ] () {
  355. ExecuteReduce(
  356. operation,
  357. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  358. spec,
  359. reducer,
  360. options);
  361. };
  362. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  363. }
  364. IOperationPtr TClientBase::RawReduce(
  365. const TRawReduceOperationSpec& spec,
  366. ::TIntrusivePtr<IRawJob> reducer,
  367. const TOperationOptions& options)
  368. {
  369. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  370. auto prepareOperation = [
  371. this_=::TIntrusivePtr(this),
  372. operation,
  373. spec,
  374. reducer,
  375. options
  376. ] () {
  377. ExecuteRawReduce(
  378. operation,
  379. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  380. spec,
  381. reducer,
  382. options);
  383. };
  384. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  385. }
  386. IOperationPtr TClientBase::DoJoinReduce(
  387. const TJoinReduceOperationSpec& spec,
  388. ::TIntrusivePtr<IStructuredJob> reducer,
  389. const TOperationOptions& options)
  390. {
  391. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  392. auto prepareOperation = [
  393. this_=::TIntrusivePtr(this),
  394. operation,
  395. spec,
  396. reducer,
  397. options
  398. ] () {
  399. ExecuteJoinReduce(
  400. operation,
  401. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  402. spec,
  403. reducer,
  404. options);
  405. };
  406. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  407. }
  408. IOperationPtr TClientBase::RawJoinReduce(
  409. const TRawJoinReduceOperationSpec& spec,
  410. ::TIntrusivePtr<IRawJob> reducer,
  411. const TOperationOptions& options)
  412. {
  413. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  414. auto prepareOperation = [
  415. this_=::TIntrusivePtr(this),
  416. operation,
  417. spec,
  418. reducer,
  419. options
  420. ] () {
  421. ExecuteRawJoinReduce(
  422. operation,
  423. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  424. spec,
  425. reducer,
  426. options);
  427. };
  428. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  429. }
  430. IOperationPtr TClientBase::DoMapReduce(
  431. const TMapReduceOperationSpec& spec,
  432. ::TIntrusivePtr<IStructuredJob> mapper,
  433. ::TIntrusivePtr<IStructuredJob> reduceCombiner,
  434. ::TIntrusivePtr<IStructuredJob> reducer,
  435. const TOperationOptions& options)
  436. {
  437. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  438. auto prepareOperation = [
  439. this_=::TIntrusivePtr(this),
  440. operation,
  441. spec,
  442. mapper,
  443. reduceCombiner,
  444. reducer,
  445. options
  446. ] () {
  447. ExecuteMapReduce(
  448. operation,
  449. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  450. spec,
  451. mapper,
  452. reduceCombiner,
  453. reducer,
  454. options);
  455. };
  456. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  457. }
  458. IOperationPtr TClientBase::RawMapReduce(
  459. const TRawMapReduceOperationSpec& spec,
  460. ::TIntrusivePtr<IRawJob> mapper,
  461. ::TIntrusivePtr<IRawJob> reduceCombiner,
  462. ::TIntrusivePtr<IRawJob> reducer,
  463. const TOperationOptions& options)
  464. {
  465. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  466. auto prepareOperation = [
  467. this_=::TIntrusivePtr(this),
  468. operation,
  469. spec,
  470. mapper,
  471. reduceCombiner,
  472. reducer,
  473. options
  474. ] () {
  475. ExecuteRawMapReduce(
  476. operation,
  477. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  478. spec,
  479. mapper,
  480. reduceCombiner,
  481. reducer,
  482. options);
  483. };
  484. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  485. }
  486. IOperationPtr TClientBase::Sort(
  487. const TSortOperationSpec& spec,
  488. const TOperationOptions& options)
  489. {
  490. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  491. auto prepareOperation = [
  492. this_ = ::TIntrusivePtr(this),
  493. operation,
  494. spec,
  495. options
  496. ] () {
  497. ExecuteSort(
  498. operation,
  499. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  500. spec,
  501. options);
  502. };
  503. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  504. }
  505. IOperationPtr TClientBase::Merge(
  506. const TMergeOperationSpec& spec,
  507. const TOperationOptions& options)
  508. {
  509. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  510. auto prepareOperation = [
  511. this_ = ::TIntrusivePtr(this),
  512. operation,
  513. spec,
  514. options
  515. ] () {
  516. ExecuteMerge(
  517. operation,
  518. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  519. spec,
  520. options);
  521. };
  522. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  523. }
  524. IOperationPtr TClientBase::Erase(
  525. const TEraseOperationSpec& spec,
  526. const TOperationOptions& options)
  527. {
  528. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  529. auto prepareOperation = [
  530. this_ = ::TIntrusivePtr(this),
  531. operation,
  532. spec,
  533. options
  534. ] () {
  535. ExecuteErase(
  536. operation,
  537. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  538. spec,
  539. options);
  540. };
  541. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  542. }
  543. IOperationPtr TClientBase::RemoteCopy(
  544. const TRemoteCopyOperationSpec& spec,
  545. const TOperationOptions& options)
  546. {
  547. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  548. auto prepareOperation = [
  549. this_ = ::TIntrusivePtr(this),
  550. operation,
  551. spec,
  552. options
  553. ] () {
  554. ExecuteRemoteCopy(
  555. operation,
  556. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  557. spec,
  558. options);
  559. };
  560. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  561. }
  562. IOperationPtr TClientBase::RunVanilla(
  563. const TVanillaOperationSpec& spec,
  564. const TOperationOptions& options)
  565. {
  566. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  567. auto prepareOperation = [
  568. this_ = ::TIntrusivePtr(this),
  569. operation,
  570. spec,
  571. options
  572. ] () {
  573. ExecuteVanilla(
  574. operation,
  575. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  576. spec,
  577. options);
  578. };
  579. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  580. }
  581. IOperationPtr TClientBase::AttachOperation(const TOperationId& operationId)
  582. {
  583. auto operation = ::MakeIntrusive<TOperation>(operationId, GetParentClientImpl());
  584. operation->GetBriefState(); // check that operation exists
  585. return operation;
  586. }
  587. EOperationBriefState TClientBase::CheckOperation(const TOperationId& operationId)
  588. {
  589. return NYT::NDetail::CheckOperation(ClientRetryPolicy_, Context_, operationId);
  590. }
  591. void TClientBase::AbortOperation(const TOperationId& operationId)
  592. {
  593. NRawClient::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId);
  594. }
  595. void TClientBase::CompleteOperation(const TOperationId& operationId)
  596. {
  597. NRawClient::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId);
  598. }
  599. void TClientBase::WaitForOperation(const TOperationId& operationId)
  600. {
  601. NYT::NDetail::WaitForOperation(ClientRetryPolicy_, Context_, operationId);
  602. }
  603. void TClientBase::AlterTable(
  604. const TYPath& path,
  605. const TAlterTableOptions& options)
  606. {
  607. NRawClient::AlterTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  608. }
  609. ::TIntrusivePtr<TClientReader> TClientBase::CreateClientReader(
  610. const TRichYPath& path,
  611. const TFormat& format,
  612. const TTableReaderOptions& options,
  613. bool useFormatFromTableAttributes)
  614. {
  615. return ::MakeIntrusive<TClientReader>(
  616. CanonizeYPath(path),
  617. ClientRetryPolicy_,
  618. GetTransactionPinger(),
  619. Context_,
  620. TransactionId_,
  621. format,
  622. options,
  623. useFormatFromTableAttributes);
  624. }
  625. THolder<TClientWriter> TClientBase::CreateClientWriter(
  626. const TRichYPath& path,
  627. const TFormat& format,
  628. const TTableWriterOptions& options)
  629. {
  630. auto realPath = CanonizeYPath(path);
  631. if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
  632. NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE,
  633. TCreateOptions().IgnoreExisting(true));
  634. }
  635. return MakeHolder<TClientWriter>(
  636. realPath,
  637. ClientRetryPolicy_,
  638. GetTransactionPinger(),
  639. Context_,
  640. TransactionId_,
  641. format,
  642. options
  643. );
  644. }
  645. ::TIntrusivePtr<INodeReaderImpl> TClientBase::CreateNodeReader(
  646. const TRichYPath& path, const TTableReaderOptions& options)
  647. {
  648. auto format = TFormat::YsonBinary();
  649. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  650. // Skiff is disabled here because of large header problem (see https://st.yandex-team.ru/YT-6926).
  651. // Revert this code to r3614168 when it is fixed.
  652. return new TNodeTableReader(
  653. CreateClientReader(path, format, options));
  654. }
  655. ::TIntrusivePtr<IYaMRReaderImpl> TClientBase::CreateYaMRReader(
  656. const TRichYPath& path, const TTableReaderOptions& options)
  657. {
  658. return new TYaMRTableReader(
  659. CreateClientReader(path, TFormat::YaMRLenval(), options, /* useFormatFromTableAttributes = */ true));
  660. }
  661. ::TIntrusivePtr<IProtoReaderImpl> TClientBase::CreateProtoReader(
  662. const TRichYPath& path,
  663. const TTableReaderOptions& options,
  664. const Message* prototype)
  665. {
  666. TVector<const ::google::protobuf::Descriptor*> descriptors;
  667. descriptors.push_back(prototype->GetDescriptor());
  668. if (Context_.Config->UseClientProtobuf) {
  669. return new TProtoTableReader(
  670. CreateClientReader(path, TFormat::YsonBinary(), options),
  671. std::move(descriptors));
  672. } else {
  673. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  674. return new TLenvalProtoTableReader(
  675. CreateClientReader(path, format, options),
  676. std::move(descriptors));
  677. }
  678. }
  679. ::TIntrusivePtr<ISkiffRowReaderImpl> TClientBase::CreateSkiffRowReader(
  680. const TRichYPath& path,
  681. const TTableReaderOptions& options,
  682. const ISkiffRowSkipperPtr& skipper,
  683. const NSkiff::TSkiffSchemaPtr& schema)
  684. {
  685. auto skiffOptions = TCreateSkiffSchemaOptions().HasRangeIndex(true);
  686. auto resultSchema = NYT::NDetail::CreateSkiffSchema(TVector{schema}, skiffOptions);
  687. return new TSkiffRowTableReader(
  688. CreateClientReader(path, NYT::NDetail::CreateSkiffFormat(resultSchema), options),
  689. resultSchema,
  690. {skipper},
  691. std::move(skiffOptions));
  692. }
  693. ::TIntrusivePtr<INodeWriterImpl> TClientBase::CreateNodeWriter(
  694. const TRichYPath& path, const TTableWriterOptions& options)
  695. {
  696. auto format = TFormat::YsonBinary();
  697. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  698. return new TNodeTableWriter(
  699. CreateClientWriter(path, format, options));
  700. }
  701. ::TIntrusivePtr<IYaMRWriterImpl> TClientBase::CreateYaMRWriter(
  702. const TRichYPath& path, const TTableWriterOptions& options)
  703. {
  704. auto format = TFormat::YaMRLenval();
  705. ApplyFormatHints<TYaMRRow>(&format, options.FormatHints_);
  706. return new TYaMRTableWriter(
  707. CreateClientWriter(path, format, options));
  708. }
  709. ::TIntrusivePtr<IProtoWriterImpl> TClientBase::CreateProtoWriter(
  710. const TRichYPath& path,
  711. const TTableWriterOptions& options,
  712. const Message* prototype)
  713. {
  714. TVector<const ::google::protobuf::Descriptor*> descriptors;
  715. descriptors.push_back(prototype->GetDescriptor());
  716. auto pathWithSchema = path;
  717. if (options.InferSchema_.GetOrElse(Context_.Config->InferTableSchema) && !path.Schema_) {
  718. pathWithSchema.Schema(CreateTableSchema(*prototype->GetDescriptor()));
  719. }
  720. if (Context_.Config->UseClientProtobuf) {
  721. auto format = TFormat::YsonBinary();
  722. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  723. return new TProtoTableWriter(
  724. CreateClientWriter(pathWithSchema, format, options),
  725. std::move(descriptors));
  726. } else {
  727. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  728. ApplyFormatHints<::google::protobuf::Message>(&format, options.FormatHints_);
  729. return new TLenvalProtoTableWriter(
  730. CreateClientWriter(pathWithSchema, format, options),
  731. std::move(descriptors));
  732. }
  733. }
  734. TBatchRequestPtr TClientBase::CreateBatchRequest()
  735. {
  736. return MakeIntrusive<TBatchRequest>(TransactionId_, GetParentClientImpl());
  737. }
  738. IClientPtr TClientBase::GetParentClient()
  739. {
  740. return GetParentClientImpl();
  741. }
  742. const TClientContext& TClientBase::GetContext() const
  743. {
  744. return Context_;
  745. }
  746. const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
  747. {
  748. return ClientRetryPolicy_;
  749. }
  750. ////////////////////////////////////////////////////////////////////////////////
  751. TTransaction::TTransaction(
  752. TClientPtr parentClient,
  753. const TClientContext& context,
  754. const TTransactionId& parentTransactionId,
  755. const TStartTransactionOptions& options)
  756. : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy())
  757. , TransactionPinger_(parentClient->GetTransactionPinger())
  758. , PingableTx_(
  759. MakeHolder<TPingableTransaction>(
  760. parentClient->GetRetryPolicy(),
  761. context,
  762. parentTransactionId,
  763. TransactionPinger_->GetChildTxPinger(),
  764. options))
  765. , ParentClient_(parentClient)
  766. {
  767. TransactionId_ = PingableTx_->GetId();
  768. }
  769. TTransaction::TTransaction(
  770. TClientPtr parentClient,
  771. const TClientContext& context,
  772. const TTransactionId& transactionId,
  773. const TAttachTransactionOptions& options)
  774. : TClientBase(context, transactionId, parentClient->GetRetryPolicy())
  775. , TransactionPinger_(parentClient->GetTransactionPinger())
  776. , PingableTx_(
  777. new TPingableTransaction(
  778. parentClient->GetRetryPolicy(),
  779. context,
  780. transactionId,
  781. parentClient->GetTransactionPinger()->GetChildTxPinger(),
  782. options))
  783. , ParentClient_(parentClient)
  784. { }
  785. const TTransactionId& TTransaction::GetId() const
  786. {
  787. return TransactionId_;
  788. }
  789. ILockPtr TTransaction::Lock(
  790. const TYPath& path,
  791. ELockMode mode,
  792. const TLockOptions& options)
  793. {
  794. auto lockId = NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, mode, options);
  795. return ::MakeIntrusive<TLock>(lockId, GetParentClientImpl(), options.Waitable_);
  796. }
  797. void TTransaction::Unlock(
  798. const TYPath& path,
  799. const TUnlockOptions& options)
  800. {
  801. NRawClient::Unlock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  802. }
  803. void TTransaction::Commit()
  804. {
  805. PingableTx_->Commit();
  806. }
  807. void TTransaction::Abort()
  808. {
  809. PingableTx_->Abort();
  810. }
  811. void TTransaction::Ping()
  812. {
  813. PingTx(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_);
  814. }
  815. void TTransaction::Detach()
  816. {
  817. PingableTx_->Detach();
  818. }
  819. ITransactionPingerPtr TTransaction::GetTransactionPinger()
  820. {
  821. return TransactionPinger_;
  822. }
  823. TClientPtr TTransaction::GetParentClientImpl()
  824. {
  825. return ParentClient_;
  826. }
  827. ////////////////////////////////////////////////////////////////////////////////
  828. TClient::TClient(
  829. const TClientContext& context,
  830. const TTransactionId& globalId,
  831. IClientRetryPolicyPtr retryPolicy)
  832. : TClientBase(context, globalId, retryPolicy)
  833. , TransactionPinger_(nullptr)
  834. { }
  835. TClient::~TClient() = default;
  836. ITransactionPtr TClient::AttachTransaction(
  837. const TTransactionId& transactionId,
  838. const TAttachTransactionOptions& options)
  839. {
  840. CheckShutdown();
  841. return MakeIntrusive<TTransaction>(this, Context_, transactionId, options);
  842. }
  843. void TClient::MountTable(
  844. const TYPath& path,
  845. const TMountTableOptions& options)
  846. {
  847. CheckShutdown();
  848. THttpHeader header("POST", "mount_table");
  849. SetTabletParams(header, path, options);
  850. if (options.CellId_) {
  851. header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
  852. }
  853. header.AddParameter("freeze", options.Freeze_);
  854. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  855. }
  856. void TClient::UnmountTable(
  857. const TYPath& path,
  858. const TUnmountTableOptions& options)
  859. {
  860. CheckShutdown();
  861. THttpHeader header("POST", "unmount_table");
  862. SetTabletParams(header, path, options);
  863. header.AddParameter("force", options.Force_);
  864. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  865. }
  866. void TClient::RemountTable(
  867. const TYPath& path,
  868. const TRemountTableOptions& options)
  869. {
  870. CheckShutdown();
  871. THttpHeader header("POST", "remount_table");
  872. SetTabletParams(header, path, options);
  873. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  874. }
  875. void TClient::FreezeTable(
  876. const TYPath& path,
  877. const TFreezeTableOptions& options)
  878. {
  879. CheckShutdown();
  880. NRawClient::FreezeTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, options);
  881. }
  882. void TClient::UnfreezeTable(
  883. const TYPath& path,
  884. const TUnfreezeTableOptions& options)
  885. {
  886. CheckShutdown();
  887. NRawClient::UnfreezeTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, options);
  888. }
  889. void TClient::ReshardTable(
  890. const TYPath& path,
  891. const TVector<TKey>& keys,
  892. const TReshardTableOptions& options)
  893. {
  894. CheckShutdown();
  895. THttpHeader header("POST", "reshard_table");
  896. SetTabletParams(header, path, options);
  897. header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
  898. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  899. }
  900. void TClient::ReshardTable(
  901. const TYPath& path,
  902. i64 tabletCount,
  903. const TReshardTableOptions& options)
  904. {
  905. CheckShutdown();
  906. THttpHeader header("POST", "reshard_table");
  907. SetTabletParams(header, path, options);
  908. header.AddParameter("tablet_count", tabletCount);
  909. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  910. }
  911. void TClient::InsertRows(
  912. const TYPath& path,
  913. const TNode::TListType& rows,
  914. const TInsertRowsOptions& options)
  915. {
  916. CheckShutdown();
  917. THttpHeader header("PUT", "insert_rows");
  918. header.SetInputFormat(TFormat::YsonBinary());
  919. // TODO: use corresponding raw request
  920. header.MergeParameters(SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
  921. auto body = NodeListToYsonString(rows);
  922. TRequestConfig config;
  923. config.IsHeavy = true;
  924. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config);
  925. }
  926. void TClient::DeleteRows(
  927. const TYPath& path,
  928. const TNode::TListType& keys,
  929. const TDeleteRowsOptions& options)
  930. {
  931. CheckShutdown();
  932. return NRawClient::DeleteRows(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, keys, options);
  933. }
  934. void TClient::TrimRows(
  935. const TYPath& path,
  936. i64 tabletIndex,
  937. i64 rowCount,
  938. const TTrimRowsOptions& options)
  939. {
  940. CheckShutdown();
  941. THttpHeader header("POST", "trim_rows");
  942. header.AddParameter("trimmed_row_count", rowCount);
  943. header.AddParameter("tablet_index", tabletIndex);
  944. // TODO: use corresponding raw request
  945. header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
  946. TRequestConfig config;
  947. config.IsHeavy = true;
  948. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  949. }
  950. TNode::TListType TClient::LookupRows(
  951. const TYPath& path,
  952. const TNode::TListType& keys,
  953. const TLookupRowsOptions& options)
  954. {
  955. CheckShutdown();
  956. Y_UNUSED(options);
  957. THttpHeader header("PUT", "lookup_rows");
  958. header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion));
  959. header.SetInputFormat(TFormat::YsonBinary());
  960. header.SetOutputFormat(TFormat::YsonBinary());
  961. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  962. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  963. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  964. })
  965. .Item("keep_missing_rows").Value(options.KeepMissingRows_)
  966. .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) {
  967. fluent.Item("versioned").Value(*options.Versioned_);
  968. })
  969. .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) {
  970. fluent.Item("column_names").Value(*options.Columns_);
  971. })
  972. .EndMap());
  973. auto body = NodeListToYsonString(keys);
  974. TRequestConfig config;
  975. config.IsHeavy = true;
  976. auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config);
  977. return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList();
  978. }
  979. TNode::TListType TClient::SelectRows(
  980. const TString& query,
  981. const TSelectRowsOptions& options)
  982. {
  983. CheckShutdown();
  984. THttpHeader header("GET", "select_rows");
  985. header.SetInputFormat(TFormat::YsonBinary());
  986. header.SetOutputFormat(TFormat::YsonBinary());
  987. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  988. .Item("query").Value(query)
  989. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  990. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  991. })
  992. .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  993. fluent.Item("input_row_limit").Value(*options.InputRowLimit_);
  994. })
  995. .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  996. fluent.Item("output_row_limit").Value(*options.OutputRowLimit_);
  997. })
  998. .Item("range_expansion_limit").Value(options.RangeExpansionLimit_)
  999. .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_)
  1000. .Item("verbose_logging").Value(options.VerboseLogging_)
  1001. .Item("enable_code_cache").Value(options.EnableCodeCache_)
  1002. .EndMap());
  1003. TRequestConfig config;
  1004. config.IsHeavy = true;
  1005. auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  1006. return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList();
  1007. }
  1008. void TClient::AlterTableReplica(const TReplicaId& replicaId, const TAlterTableReplicaOptions& options)
  1009. {
  1010. CheckShutdown();
  1011. NRawClient::AlterTableReplica(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, replicaId, options);
  1012. }
  1013. ui64 TClient::GenerateTimestamp()
  1014. {
  1015. CheckShutdown();
  1016. THttpHeader header("GET", "generate_timestamp");
  1017. TRequestConfig config;
  1018. config.IsHeavy = true;
  1019. auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  1020. return NodeFromYsonString(requestResult.Response).AsUint64();
  1021. }
  1022. TAuthorizationInfo TClient::WhoAmI()
  1023. {
  1024. CheckShutdown();
  1025. THttpHeader header("GET", "auth/whoami", /* isApi = */ false);
  1026. auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  1027. TAuthorizationInfo result;
  1028. NJson::TJsonValue jsonValue;
  1029. bool ok = NJson::ReadJsonTree(requestResult.Response, &jsonValue, /* throwOnError = */ true);
  1030. Y_ABORT_UNLESS(ok);
  1031. result.Login = jsonValue["login"].GetString();
  1032. result.Realm = jsonValue["realm"].GetString();
  1033. return result;
  1034. }
  1035. TOperationAttributes TClient::GetOperation(
  1036. const TOperationId& operationId,
  1037. const TGetOperationOptions& options)
  1038. {
  1039. CheckShutdown();
  1040. return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1041. }
  1042. TOperationAttributes TClient::GetOperation(
  1043. const TString& alias,
  1044. const TGetOperationOptions& options)
  1045. {
  1046. CheckShutdown();
  1047. return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, alias, options);
  1048. }
  1049. TListOperationsResult TClient::ListOperations(
  1050. const TListOperationsOptions& options)
  1051. {
  1052. CheckShutdown();
  1053. return NRawClient::ListOperations(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, options);
  1054. }
  1055. void TClient::UpdateOperationParameters(
  1056. const TOperationId& operationId,
  1057. const TUpdateOperationParametersOptions& options)
  1058. {
  1059. CheckShutdown();
  1060. return NRawClient::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1061. }
  1062. TJobAttributes TClient::GetJob(
  1063. const TOperationId& operationId,
  1064. const TJobId& jobId,
  1065. const TGetJobOptions& options)
  1066. {
  1067. CheckShutdown();
  1068. return NRawClient::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, jobId, options);
  1069. }
  1070. TListJobsResult TClient::ListJobs(
  1071. const TOperationId& operationId,
  1072. const TListJobsOptions& options)
  1073. {
  1074. CheckShutdown();
  1075. return NRawClient::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1076. }
  1077. IFileReaderPtr TClient::GetJobInput(
  1078. const TJobId& jobId,
  1079. const TGetJobInputOptions& options)
  1080. {
  1081. CheckShutdown();
  1082. return NRawClient::GetJobInput(Context_, jobId, options);
  1083. }
  1084. IFileReaderPtr TClient::GetJobFailContext(
  1085. const TOperationId& operationId,
  1086. const TJobId& jobId,
  1087. const TGetJobFailContextOptions& options)
  1088. {
  1089. CheckShutdown();
  1090. return NRawClient::GetJobFailContext(Context_, operationId, jobId, options);
  1091. }
  1092. IFileReaderPtr TClient::GetJobStderr(
  1093. const TOperationId& operationId,
  1094. const TJobId& jobId,
  1095. const TGetJobStderrOptions& options)
  1096. {
  1097. CheckShutdown();
  1098. return NRawClient::GetJobStderr(Context_, operationId, jobId, options);
  1099. }
  1100. TNode::TListType TClient::SkyShareTable(
  1101. const std::vector<TYPath>& tablePaths,
  1102. const TSkyShareTableOptions& options)
  1103. {
  1104. CheckShutdown();
  1105. return NRawClient::SkyShareTable(
  1106. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1107. Context_,
  1108. tablePaths,
  1109. options);
  1110. }
  1111. TCheckPermissionResponse TClient::CheckPermission(
  1112. const TString& user,
  1113. EPermission permission,
  1114. const TYPath& path,
  1115. const TCheckPermissionOptions& options)
  1116. {
  1117. CheckShutdown();
  1118. return NRawClient::CheckPermission(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, user, permission, path, options);
  1119. }
  1120. TVector<TTabletInfo> TClient::GetTabletInfos(
  1121. const TYPath& path,
  1122. const TVector<int>& tabletIndexes,
  1123. const TGetTabletInfosOptions& options)
  1124. {
  1125. CheckShutdown();
  1126. return NRawClient::GetTabletInfos(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, tabletIndexes, options);
  1127. }
  1128. void TClient::SuspendOperation(
  1129. const TOperationId& operationId,
  1130. const TSuspendOperationOptions& options)
  1131. {
  1132. CheckShutdown();
  1133. NRawClient::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1134. }
  1135. void TClient::ResumeOperation(
  1136. const TOperationId& operationId,
  1137. const TResumeOperationOptions& options)
  1138. {
  1139. CheckShutdown();
  1140. NRawClient::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1141. }
  1142. TYtPoller& TClient::GetYtPoller()
  1143. {
  1144. auto g = Guard(Lock_);
  1145. if (!YtPoller_) {
  1146. CheckShutdown();
  1147. // We don't use current client and create new client because YtPoller_ might use
  1148. // this client during current client shutdown.
  1149. // That might lead to incrementing of current client refcount and double delete of current client object.
  1150. YtPoller_ = MakeHolder<TYtPoller>(Context_, ClientRetryPolicy_);
  1151. }
  1152. return *YtPoller_;
  1153. }
  1154. void TClient::Shutdown()
  1155. {
  1156. auto g = Guard(Lock_);
  1157. if (!Shutdown_.exchange(true) && YtPoller_) {
  1158. YtPoller_->Stop();
  1159. }
  1160. }
  1161. ITransactionPingerPtr TClient::GetTransactionPinger()
  1162. {
  1163. auto g = Guard(Lock_);
  1164. if (!TransactionPinger_) {
  1165. TransactionPinger_ = CreateTransactionPinger(Context_.Config);
  1166. }
  1167. return TransactionPinger_;
  1168. }
  1169. TClientPtr TClient::GetParentClientImpl()
  1170. {
  1171. return this;
  1172. }
  1173. template <class TOptions>
  1174. void TClient::SetTabletParams(
  1175. THttpHeader& header,
  1176. const TYPath& path,
  1177. const TOptions& options)
  1178. {
  1179. header.AddPath(AddPathPrefix(path, Context_.Config->Prefix));
  1180. if (options.FirstTabletIndex_) {
  1181. header.AddParameter("first_tablet_index", *options.FirstTabletIndex_);
  1182. }
  1183. if (options.LastTabletIndex_) {
  1184. header.AddParameter("last_tablet_index", *options.LastTabletIndex_);
  1185. }
  1186. }
  1187. void TClient::CheckShutdown() const
  1188. {
  1189. if (Shutdown_) {
  1190. ythrow TApiUsageError() << "Call client's methods after shutdown";
  1191. }
  1192. }
  1193. TClientPtr CreateClientImpl(
  1194. const TString& serverName,
  1195. const TCreateClientOptions& options)
  1196. {
  1197. TClientContext context;
  1198. context.Config = options.Config_ ? options.Config_ : TConfig::Get();
  1199. context.TvmOnly = options.TvmOnly_;
  1200. context.ProxyAddress = options.ProxyAddress_;
  1201. context.ServerName = serverName;
  1202. ApplyProxyUrlAliasingRules(context.ServerName);
  1203. if (context.ServerName.find('.') == TString::npos &&
  1204. context.ServerName.find(':') == TString::npos &&
  1205. context.ServerName.find("localhost") == TString::npos)
  1206. {
  1207. context.ServerName += ".yt.yandex.net";
  1208. }
  1209. static constexpr char httpUrlSchema[] = "http://";
  1210. static constexpr char httpsUrlSchema[] = "https://";
  1211. if (options.UseTLS_) {
  1212. context.UseTLS = *options.UseTLS_;
  1213. } else {
  1214. context.UseTLS = context.ServerName.StartsWith(httpsUrlSchema);
  1215. }
  1216. if (context.ServerName.StartsWith(httpUrlSchema)) {
  1217. if (context.UseTLS) {
  1218. ythrow TApiUsageError() << "URL schema doesn't match UseTLS option";
  1219. }
  1220. context.ServerName.erase(0, sizeof(httpUrlSchema) - 1);
  1221. }
  1222. if (context.ServerName.StartsWith(httpsUrlSchema)) {
  1223. if (!context.UseTLS) {
  1224. ythrow TApiUsageError() << "URL schema doesn't match UseTLS option";
  1225. }
  1226. context.ServerName.erase(0, sizeof(httpsUrlSchema) - 1);
  1227. }
  1228. if (context.ServerName.find(':') == TString::npos) {
  1229. context.ServerName = CreateHostNameWithPort(context.ServerName, context);
  1230. }
  1231. if (options.TvmOnly_) {
  1232. context.ServerName = Format("tvm.%v", context.ServerName);
  1233. }
  1234. if (context.UseTLS || options.UseCoreHttpClient_) {
  1235. context.HttpClient = NHttpClient::CreateCoreHttpClient(context.UseTLS, context.Config);
  1236. } else {
  1237. context.HttpClient = NHttpClient::CreateDefaultHttpClient();
  1238. }
  1239. context.Token = context.Config->Token;
  1240. if (options.Token_) {
  1241. context.Token = options.Token_;
  1242. } else if (options.TokenPath_) {
  1243. context.Token = TConfig::LoadTokenFromFile(options.TokenPath_);
  1244. } else if (options.ServiceTicketAuth_) {
  1245. context.ServiceTicketAuth = options.ServiceTicketAuth_;
  1246. }
  1247. context.ImpersonationUser = options.ImpersonationUser_;
  1248. if (context.Token) {
  1249. TConfig::ValidateToken(context.Token);
  1250. }
  1251. auto globalTxId = GetGuid(context.Config->GlobalTxId);
  1252. auto retryConfigProvider = options.RetryConfigProvider_;
  1253. if (!retryConfigProvider) {
  1254. retryConfigProvider = CreateDefaultRetryConfigProvider();
  1255. }
  1256. return new NDetail::TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
  1257. }
  1258. ////////////////////////////////////////////////////////////////////////////////
  1259. } // namespace NDetail
  1260. ////////////////////////////////////////////////////////////////////////////////
  1261. IClientPtr CreateClient(
  1262. const TString& serverName,
  1263. const TCreateClientOptions& options)
  1264. {
  1265. return NDetail::CreateClientImpl(serverName, options);
  1266. }
  1267. IClientPtr CreateClientFromEnv(const TCreateClientOptions& options)
  1268. {
  1269. auto serverName = GetEnv("YT_PROXY");
  1270. if (!serverName) {
  1271. ythrow yexception() << "YT_PROXY is not set";
  1272. }
  1273. return NDetail::CreateClientImpl(serverName, options);
  1274. }
  1275. ////////////////////////////////////////////////////////////////////////////////
  1276. } // namespace NYT