client.cpp 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460
  1. #include "client.h"
  2. #include "batch_request_impl.h"
  3. #include "client_reader.h"
  4. #include "client_writer.h"
  5. #include "file_reader.h"
  6. #include "file_writer.h"
  7. #include "format_hints.h"
  8. #include "lock.h"
  9. #include "operation.h"
  10. #include "retry_transaction.h"
  11. #include "retryful_writer.h"
  12. #include "transaction.h"
  13. #include "transaction_pinger.h"
  14. #include "yt_poller.h"
  15. #include <yt/cpp/mapreduce/common/helpers.h>
  16. #include <yt/cpp/mapreduce/common/retry_lib.h>
  17. #include <yt/cpp/mapreduce/http/helpers.h>
  18. #include <yt/cpp/mapreduce/http/http.h>
  19. #include <yt/cpp/mapreduce/http/http_client.h>
  20. #include <yt/cpp/mapreduce/http/requests.h>
  21. #include <yt/cpp/mapreduce/http/retry_request.h>
  22. #include <yt/cpp/mapreduce/interface/config.h>
  23. #include <yt/cpp/mapreduce/interface/client.h>
  24. #include <yt/cpp/mapreduce/interface/error_codes.h>
  25. #include <yt/cpp/mapreduce/interface/fluent.h>
  26. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  27. #include <yt/cpp/mapreduce/interface/skiff_row.h>
  28. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  29. #include <yt/cpp/mapreduce/io/yamr_table_writer.h>
  30. #include <yt/cpp/mapreduce/io/node_table_reader.h>
  31. #include <yt/cpp/mapreduce/io/node_table_writer.h>
  32. #include <yt/cpp/mapreduce/io/proto_table_reader.h>
  33. #include <yt/cpp/mapreduce/io/proto_table_writer.h>
  34. #include <yt/cpp/mapreduce/io/skiff_row_table_reader.h>
  35. #include <yt/cpp/mapreduce/io/proto_helpers.h>
  36. #include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
  37. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  38. #include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h>
  39. #include <yt/yt/core/ytree/fluent.h>
  40. #include <library/cpp/json/json_reader.h>
  41. #include <util/generic/algorithm.h>
  42. #include <util/string/type.h>
  43. #include <util/system/env.h>
  44. #include <exception>
  45. using namespace NYT::NDetail::NRawClient;
  46. namespace NYT {
  47. ////////////////////////////////////////////////////////////////////////////////
  48. namespace NDetail {
  49. ////////////////////////////////////////////////////////////////////////////////
  50. namespace {
  51. ////////////////////////////////////////////////////////////////////////////////
  52. THashMap<TString, TString> ParseProxyUrlAliasingRules(TString envConfig)
  53. {
  54. if (envConfig.empty()) {
  55. return {};
  56. }
  57. return NYTree::ConvertTo<THashMap<TString, TString>>(NYson::TYsonString(envConfig));
  58. }
  59. void ApplyProxyUrlAliasingRules(TString& url)
  60. {
  61. static auto rules = ParseProxyUrlAliasingRules(GetEnv("YT_PROXY_URL_ALIASING_CONFIG"));
  62. if (auto ruleIt = rules.find(url); ruleIt != rules.end()) {
  63. url = ruleIt->second;
  64. }
  65. }
  66. ////////////////////////////////////////////////////////////////////////////////
  67. } // namespace
  68. ////////////////////////////////////////////////////////////////////////////////
  69. TClientBase::TClientBase(
  70. const TClientContext& context,
  71. const TTransactionId& transactionId,
  72. IClientRetryPolicyPtr retryPolicy)
  73. : Context_(context)
  74. , TransactionId_(transactionId)
  75. , ClientRetryPolicy_(std::move(retryPolicy))
  76. { }
  77. ITransactionPtr TClientBase::StartTransaction(
  78. const TStartTransactionOptions& options)
  79. {
  80. return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options);
  81. }
  82. TNodeId TClientBase::Create(
  83. const TYPath& path,
  84. ENodeType type,
  85. const TCreateOptions& options)
  86. {
  87. return NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, type, options);
  88. }
  89. void TClientBase::Remove(
  90. const TYPath& path,
  91. const TRemoveOptions& options)
  92. {
  93. return NRawClient::Remove(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  94. }
  95. bool TClientBase::Exists(
  96. const TYPath& path,
  97. const TExistsOptions& options)
  98. {
  99. return NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  100. }
  101. TNode TClientBase::Get(
  102. const TYPath& path,
  103. const TGetOptions& options)
  104. {
  105. return NRawClient::Get(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  106. }
  107. void TClientBase::Set(
  108. const TYPath& path,
  109. const TNode& value,
  110. const TSetOptions& options)
  111. {
  112. NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
  113. }
  114. void TClientBase::MultisetAttributes(
  115. const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options)
  116. {
  117. NRawClient::MultisetAttributes(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
  118. }
  119. TNode::TListType TClientBase::List(
  120. const TYPath& path,
  121. const TListOptions& options)
  122. {
  123. return NRawClient::List(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  124. }
  125. TNodeId TClientBase::Copy(
  126. const TYPath& sourcePath,
  127. const TYPath& destinationPath,
  128. const TCopyOptions& options)
  129. {
  130. try {
  131. return NRawClient::CopyInsideMasterCell(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
  132. } catch (const TErrorResponse& e) {
  133. if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
  134. // Do transaction for cross cell copying.
  135. std::function<TNodeId(ITransactionPtr)> lambda = [this, &sourcePath, &destinationPath, &options](ITransactionPtr transaction) {
  136. return NRawClient::CopyWithoutRetries(Context_, transaction->GetId(), sourcePath, destinationPath, options);
  137. };
  138. return RetryTransactionWithPolicy<TNodeId>(
  139. this,
  140. lambda,
  141. ClientRetryPolicy_->CreatePolicyForGenericRequest()
  142. );
  143. } else {
  144. throw;
  145. }
  146. }
  147. }
  148. TNodeId TClientBase::Move(
  149. const TYPath& sourcePath,
  150. const TYPath& destinationPath,
  151. const TMoveOptions& options)
  152. {
  153. try {
  154. return NRawClient::MoveInsideMasterCell(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, sourcePath, destinationPath, options);
  155. } catch (const TErrorResponse& e) {
  156. if (e.GetError().ContainsErrorCode(NClusterErrorCodes::NObjectClient::CrossCellAdditionalPath)) {
  157. // Do transaction for cross cell moving.
  158. std::function<TNodeId(ITransactionPtr)> lambda = [this, &sourcePath, &destinationPath, &options](ITransactionPtr transaction) {
  159. return NRawClient::MoveWithoutRetries(Context_, transaction->GetId(), sourcePath, destinationPath, options);
  160. };
  161. return RetryTransactionWithPolicy<TNodeId>(
  162. this,
  163. lambda,
  164. ClientRetryPolicy_->CreatePolicyForGenericRequest()
  165. );
  166. } else {
  167. throw;
  168. }
  169. }
  170. }
  171. TNodeId TClientBase::Link(
  172. const TYPath& targetPath,
  173. const TYPath& linkPath,
  174. const TLinkOptions& options)
  175. {
  176. return NRawClient::Link(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, targetPath, linkPath, options);
  177. }
  178. void TClientBase::Concatenate(
  179. const TVector<TRichYPath>& sourcePaths,
  180. const TRichYPath& destinationPath,
  181. const TConcatenateOptions& options)
  182. {
  183. std::function<void(ITransactionPtr)> lambda = [&sourcePaths, &destinationPath, &options, this](ITransactionPtr transaction) {
  184. if (!options.Append_ && !sourcePaths.empty() && !transaction->Exists(destinationPath.Path_)) {
  185. auto typeNode = transaction->Get(CanonizeYPath(sourcePaths.front()).Path_ + "/@type");
  186. auto type = FromString<ENodeType>(typeNode.AsString());
  187. transaction->Create(destinationPath.Path_, type, TCreateOptions().IgnoreExisting(true));
  188. }
  189. NRawClient::Concatenate(this->Context_, transaction->GetId(), sourcePaths, destinationPath, options);
  190. };
  191. RetryTransactionWithPolicy(this, lambda, ClientRetryPolicy_->CreatePolicyForGenericRequest());
  192. }
  193. TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path)
  194. {
  195. return NRawClient::CanonizeYPath(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path);
  196. }
  197. TVector<TTableColumnarStatistics> TClientBase::GetTableColumnarStatistics(
  198. const TVector<TRichYPath>& paths,
  199. const TGetTableColumnarStatisticsOptions& options)
  200. {
  201. return NRawClient::GetTableColumnarStatistics(
  202. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  203. Context_,
  204. TransactionId_,
  205. paths,
  206. options);
  207. }
  208. TMultiTablePartitions TClientBase::GetTablePartitions(
  209. const TVector<TRichYPath>& paths,
  210. const TGetTablePartitionsOptions& options)
  211. {
  212. return NRawClient::GetTablePartitions(
  213. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  214. Context_,
  215. TransactionId_,
  216. paths,
  217. options);
  218. }
  219. TMaybe<TYPath> TClientBase::GetFileFromCache(
  220. const TString& md5Signature,
  221. const TYPath& cachePath,
  222. const TGetFileFromCacheOptions& options)
  223. {
  224. return NRawClient::GetFileFromCache(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, md5Signature, cachePath, options);
  225. }
  226. TYPath TClientBase::PutFileToCache(
  227. const TYPath& filePath,
  228. const TString& md5Signature,
  229. const TYPath& cachePath,
  230. const TPutFileToCacheOptions& options)
  231. {
  232. return NRawClient::PutFileToCache(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, filePath, md5Signature, cachePath, options);
  233. }
  234. IFileReaderPtr TClientBase::CreateBlobTableReader(
  235. const TYPath& path,
  236. const TKey& key,
  237. const TBlobTableReaderOptions& options)
  238. {
  239. return new TBlobTableReader(
  240. path,
  241. key,
  242. ClientRetryPolicy_,
  243. GetTransactionPinger(),
  244. Context_,
  245. TransactionId_,
  246. options);
  247. }
  248. IFileReaderPtr TClientBase::CreateFileReader(
  249. const TRichYPath& path,
  250. const TFileReaderOptions& options)
  251. {
  252. return new TFileReader(
  253. CanonizeYPath(path),
  254. ClientRetryPolicy_,
  255. GetTransactionPinger(),
  256. Context_,
  257. TransactionId_,
  258. options);
  259. }
  260. IFileWriterPtr TClientBase::CreateFileWriter(
  261. const TRichYPath& path,
  262. const TFileWriterOptions& options)
  263. {
  264. auto realPath = CanonizeYPath(path);
  265. if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
  266. NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE,
  267. TCreateOptions().IgnoreExisting(true));
  268. }
  269. return new TFileWriter(realPath, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
  270. }
  271. TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
  272. const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options)
  273. {
  274. const Message* prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(&descriptor);
  275. return new TTableWriter<::google::protobuf::Message>(CreateProtoWriter(path, options, prototype));
  276. }
  277. TRawTableReaderPtr TClientBase::CreateRawReader(
  278. const TRichYPath& path,
  279. const TFormat& format,
  280. const TTableReaderOptions& options)
  281. {
  282. return CreateClientReader(path, format, options).Get();
  283. }
  284. TRawTableWriterPtr TClientBase::CreateRawWriter(
  285. const TRichYPath& path,
  286. const TFormat& format,
  287. const TTableWriterOptions& options)
  288. {
  289. return ::MakeIntrusive<TRetryfulWriter>(
  290. ClientRetryPolicy_,
  291. GetTransactionPinger(),
  292. Context_,
  293. TransactionId_,
  294. GetWriteTableCommand(Context_.Config->ApiVersion),
  295. format,
  296. CanonizeYPath(path),
  297. options).Get();
  298. }
  299. IOperationPtr TClientBase::DoMap(
  300. const TMapOperationSpec& spec,
  301. ::TIntrusivePtr<IStructuredJob> mapper,
  302. const TOperationOptions& options)
  303. {
  304. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  305. auto prepareOperation = [
  306. this_ = ::TIntrusivePtr(this),
  307. operation,
  308. spec,
  309. mapper,
  310. options
  311. ] () {
  312. ExecuteMap(
  313. operation,
  314. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  315. spec,
  316. mapper,
  317. options);
  318. };
  319. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  320. }
  321. IOperationPtr TClientBase::RawMap(
  322. const TRawMapOperationSpec& spec,
  323. ::TIntrusivePtr<IRawJob> mapper,
  324. const TOperationOptions& options)
  325. {
  326. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  327. auto prepareOperation = [
  328. this_=::TIntrusivePtr(this),
  329. operation,
  330. spec,
  331. mapper,
  332. options
  333. ] () {
  334. ExecuteRawMap(
  335. operation,
  336. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  337. spec,
  338. mapper,
  339. options);
  340. };
  341. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  342. }
  343. IOperationPtr TClientBase::DoReduce(
  344. const TReduceOperationSpec& spec,
  345. ::TIntrusivePtr<IStructuredJob> reducer,
  346. const TOperationOptions& options)
  347. {
  348. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  349. auto prepareOperation = [
  350. this_=::TIntrusivePtr(this),
  351. operation,
  352. spec,
  353. reducer,
  354. options
  355. ] () {
  356. ExecuteReduce(
  357. operation,
  358. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  359. spec,
  360. reducer,
  361. options);
  362. };
  363. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  364. }
  365. IOperationPtr TClientBase::RawReduce(
  366. const TRawReduceOperationSpec& spec,
  367. ::TIntrusivePtr<IRawJob> reducer,
  368. const TOperationOptions& options)
  369. {
  370. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  371. auto prepareOperation = [
  372. this_=::TIntrusivePtr(this),
  373. operation,
  374. spec,
  375. reducer,
  376. options
  377. ] () {
  378. ExecuteRawReduce(
  379. operation,
  380. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  381. spec,
  382. reducer,
  383. options);
  384. };
  385. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  386. }
  387. IOperationPtr TClientBase::DoJoinReduce(
  388. const TJoinReduceOperationSpec& spec,
  389. ::TIntrusivePtr<IStructuredJob> reducer,
  390. const TOperationOptions& options)
  391. {
  392. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  393. auto prepareOperation = [
  394. this_=::TIntrusivePtr(this),
  395. operation,
  396. spec,
  397. reducer,
  398. options
  399. ] () {
  400. ExecuteJoinReduce(
  401. operation,
  402. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  403. spec,
  404. reducer,
  405. options);
  406. };
  407. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  408. }
  409. IOperationPtr TClientBase::RawJoinReduce(
  410. const TRawJoinReduceOperationSpec& spec,
  411. ::TIntrusivePtr<IRawJob> reducer,
  412. const TOperationOptions& options)
  413. {
  414. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  415. auto prepareOperation = [
  416. this_=::TIntrusivePtr(this),
  417. operation,
  418. spec,
  419. reducer,
  420. options
  421. ] () {
  422. ExecuteRawJoinReduce(
  423. operation,
  424. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  425. spec,
  426. reducer,
  427. options);
  428. };
  429. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  430. }
  431. IOperationPtr TClientBase::DoMapReduce(
  432. const TMapReduceOperationSpec& spec,
  433. ::TIntrusivePtr<IStructuredJob> mapper,
  434. ::TIntrusivePtr<IStructuredJob> reduceCombiner,
  435. ::TIntrusivePtr<IStructuredJob> reducer,
  436. const TOperationOptions& options)
  437. {
  438. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  439. auto prepareOperation = [
  440. this_=::TIntrusivePtr(this),
  441. operation,
  442. spec,
  443. mapper,
  444. reduceCombiner,
  445. reducer,
  446. options
  447. ] () {
  448. ExecuteMapReduce(
  449. operation,
  450. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  451. spec,
  452. mapper,
  453. reduceCombiner,
  454. reducer,
  455. options);
  456. };
  457. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  458. }
  459. IOperationPtr TClientBase::RawMapReduce(
  460. const TRawMapReduceOperationSpec& spec,
  461. ::TIntrusivePtr<IRawJob> mapper,
  462. ::TIntrusivePtr<IRawJob> reduceCombiner,
  463. ::TIntrusivePtr<IRawJob> reducer,
  464. const TOperationOptions& options)
  465. {
  466. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  467. auto prepareOperation = [
  468. this_=::TIntrusivePtr(this),
  469. operation,
  470. spec,
  471. mapper,
  472. reduceCombiner,
  473. reducer,
  474. options
  475. ] () {
  476. ExecuteRawMapReduce(
  477. operation,
  478. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  479. spec,
  480. mapper,
  481. reduceCombiner,
  482. reducer,
  483. options);
  484. };
  485. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  486. }
  487. IOperationPtr TClientBase::Sort(
  488. const TSortOperationSpec& spec,
  489. const TOperationOptions& options)
  490. {
  491. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  492. auto prepareOperation = [
  493. this_ = ::TIntrusivePtr(this),
  494. operation,
  495. spec,
  496. options
  497. ] () {
  498. ExecuteSort(
  499. operation,
  500. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  501. spec,
  502. options);
  503. };
  504. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  505. }
  506. IOperationPtr TClientBase::Merge(
  507. const TMergeOperationSpec& spec,
  508. const TOperationOptions& options)
  509. {
  510. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  511. auto prepareOperation = [
  512. this_ = ::TIntrusivePtr(this),
  513. operation,
  514. spec,
  515. options
  516. ] () {
  517. ExecuteMerge(
  518. operation,
  519. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  520. spec,
  521. options);
  522. };
  523. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  524. }
  525. IOperationPtr TClientBase::Erase(
  526. const TEraseOperationSpec& spec,
  527. const TOperationOptions& options)
  528. {
  529. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  530. auto prepareOperation = [
  531. this_ = ::TIntrusivePtr(this),
  532. operation,
  533. spec,
  534. options
  535. ] () {
  536. ExecuteErase(
  537. operation,
  538. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  539. spec,
  540. options);
  541. };
  542. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  543. }
  544. IOperationPtr TClientBase::RemoteCopy(
  545. const TRemoteCopyOperationSpec& spec,
  546. const TOperationOptions& options)
  547. {
  548. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  549. auto prepareOperation = [
  550. this_ = ::TIntrusivePtr(this),
  551. operation,
  552. spec,
  553. options
  554. ] () {
  555. ExecuteRemoteCopy(
  556. operation,
  557. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  558. spec,
  559. options);
  560. };
  561. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  562. }
  563. IOperationPtr TClientBase::RunVanilla(
  564. const TVanillaOperationSpec& spec,
  565. const TOperationOptions& options)
  566. {
  567. auto operation = ::MakeIntrusive<TOperation>(GetParentClientImpl());
  568. auto prepareOperation = [
  569. this_ = ::TIntrusivePtr(this),
  570. operation,
  571. spec,
  572. options
  573. ] () {
  574. ExecuteVanilla(
  575. operation,
  576. ::MakeIntrusive<TOperationPreparer>(this_->GetParentClientImpl(), this_->TransactionId_),
  577. spec,
  578. options);
  579. };
  580. return ProcessOperation(GetParentClientImpl(), std::move(prepareOperation), std::move(operation), options);
  581. }
  582. IOperationPtr TClientBase::AttachOperation(const TOperationId& operationId)
  583. {
  584. auto operation = ::MakeIntrusive<TOperation>(operationId, GetParentClientImpl());
  585. operation->GetBriefState(); // check that operation exists
  586. return operation;
  587. }
  588. EOperationBriefState TClientBase::CheckOperation(const TOperationId& operationId)
  589. {
  590. return NYT::NDetail::CheckOperation(ClientRetryPolicy_, Context_, operationId);
  591. }
  592. void TClientBase::AbortOperation(const TOperationId& operationId)
  593. {
  594. NRawClient::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId);
  595. }
  596. void TClientBase::CompleteOperation(const TOperationId& operationId)
  597. {
  598. NRawClient::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId);
  599. }
  600. void TClientBase::WaitForOperation(const TOperationId& operationId)
  601. {
  602. NYT::NDetail::WaitForOperation(ClientRetryPolicy_, Context_, operationId);
  603. }
  604. void TClientBase::AlterTable(
  605. const TYPath& path,
  606. const TAlterTableOptions& options)
  607. {
  608. NRawClient::AlterTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  609. }
  610. ::TIntrusivePtr<TClientReader> TClientBase::CreateClientReader(
  611. const TRichYPath& path,
  612. const TFormat& format,
  613. const TTableReaderOptions& options,
  614. bool useFormatFromTableAttributes)
  615. {
  616. return ::MakeIntrusive<TClientReader>(
  617. CanonizeYPath(path),
  618. ClientRetryPolicy_,
  619. GetTransactionPinger(),
  620. Context_,
  621. TransactionId_,
  622. format,
  623. options,
  624. useFormatFromTableAttributes);
  625. }
  626. THolder<TClientWriter> TClientBase::CreateClientWriter(
  627. const TRichYPath& path,
  628. const TFormat& format,
  629. const TTableWriterOptions& options)
  630. {
  631. auto realPath = CanonizeYPath(path);
  632. if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
  633. NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE,
  634. TCreateOptions().IgnoreExisting(true));
  635. }
  636. return MakeHolder<TClientWriter>(
  637. realPath,
  638. ClientRetryPolicy_,
  639. GetTransactionPinger(),
  640. Context_,
  641. TransactionId_,
  642. format,
  643. options
  644. );
  645. }
  646. ::TIntrusivePtr<INodeReaderImpl> TClientBase::CreateNodeReader(
  647. const TRichYPath& path, const TTableReaderOptions& options)
  648. {
  649. auto format = TFormat::YsonBinary();
  650. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  651. // Skiff is disabled here because of large header problem (see https://st.yandex-team.ru/YT-6926).
  652. // Revert this code to r3614168 when it is fixed.
  653. return new TNodeTableReader(
  654. CreateClientReader(path, format, options));
  655. }
  656. ::TIntrusivePtr<IYaMRReaderImpl> TClientBase::CreateYaMRReader(
  657. const TRichYPath& path, const TTableReaderOptions& options)
  658. {
  659. return new TYaMRTableReader(
  660. CreateClientReader(path, TFormat::YaMRLenval(), options, /* useFormatFromTableAttributes = */ true));
  661. }
  662. ::TIntrusivePtr<IProtoReaderImpl> TClientBase::CreateProtoReader(
  663. const TRichYPath& path,
  664. const TTableReaderOptions& options,
  665. const Message* prototype)
  666. {
  667. TVector<const ::google::protobuf::Descriptor*> descriptors;
  668. descriptors.push_back(prototype->GetDescriptor());
  669. if (Context_.Config->UseClientProtobuf) {
  670. return new TProtoTableReader(
  671. CreateClientReader(path, TFormat::YsonBinary(), options),
  672. std::move(descriptors));
  673. } else {
  674. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  675. return new TLenvalProtoTableReader(
  676. CreateClientReader(path, format, options),
  677. std::move(descriptors));
  678. }
  679. }
  680. ::TIntrusivePtr<ISkiffRowReaderImpl> TClientBase::CreateSkiffRowReader(
  681. const TRichYPath& path,
  682. const TTableReaderOptions& options,
  683. const ISkiffRowSkipperPtr& skipper,
  684. const NSkiff::TSkiffSchemaPtr& schema)
  685. {
  686. auto skiffOptions = TCreateSkiffSchemaOptions().HasRangeIndex(true);
  687. auto resultSchema = NYT::NDetail::CreateSkiffSchema(TVector{schema}, skiffOptions);
  688. return new TSkiffRowTableReader(
  689. CreateClientReader(path, NYT::NDetail::CreateSkiffFormat(resultSchema), options),
  690. resultSchema,
  691. {skipper},
  692. std::move(skiffOptions));
  693. }
  694. ::TIntrusivePtr<INodeWriterImpl> TClientBase::CreateNodeWriter(
  695. const TRichYPath& path, const TTableWriterOptions& options)
  696. {
  697. auto format = TFormat::YsonBinary();
  698. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  699. return new TNodeTableWriter(
  700. CreateClientWriter(path, format, options));
  701. }
  702. ::TIntrusivePtr<IYaMRWriterImpl> TClientBase::CreateYaMRWriter(
  703. const TRichYPath& path, const TTableWriterOptions& options)
  704. {
  705. auto format = TFormat::YaMRLenval();
  706. ApplyFormatHints<TYaMRRow>(&format, options.FormatHints_);
  707. return new TYaMRTableWriter(
  708. CreateClientWriter(path, format, options));
  709. }
  710. ::TIntrusivePtr<IProtoWriterImpl> TClientBase::CreateProtoWriter(
  711. const TRichYPath& path,
  712. const TTableWriterOptions& options,
  713. const Message* prototype)
  714. {
  715. TVector<const ::google::protobuf::Descriptor*> descriptors;
  716. descriptors.push_back(prototype->GetDescriptor());
  717. auto pathWithSchema = path;
  718. if (options.InferSchema_.GetOrElse(Context_.Config->InferTableSchema) && !path.Schema_) {
  719. pathWithSchema.Schema(CreateTableSchema(*prototype->GetDescriptor()));
  720. }
  721. if (Context_.Config->UseClientProtobuf) {
  722. auto format = TFormat::YsonBinary();
  723. ApplyFormatHints<TNode>(&format, options.FormatHints_);
  724. return new TProtoTableWriter(
  725. CreateClientWriter(pathWithSchema, format, options),
  726. std::move(descriptors));
  727. } else {
  728. auto format = TFormat::Protobuf({prototype->GetDescriptor()}, Context_.Config->ProtobufFormatWithDescriptors);
  729. ApplyFormatHints<::google::protobuf::Message>(&format, options.FormatHints_);
  730. return new TLenvalProtoTableWriter(
  731. CreateClientWriter(pathWithSchema, format, options),
  732. std::move(descriptors));
  733. }
  734. }
  735. TBatchRequestPtr TClientBase::CreateBatchRequest()
  736. {
  737. return MakeIntrusive<TBatchRequest>(TransactionId_, GetParentClientImpl());
  738. }
  739. IClientPtr TClientBase::GetParentClient()
  740. {
  741. return GetParentClientImpl();
  742. }
  743. const TClientContext& TClientBase::GetContext() const
  744. {
  745. return Context_;
  746. }
  747. const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
  748. {
  749. return ClientRetryPolicy_;
  750. }
  751. ////////////////////////////////////////////////////////////////////////////////
  752. TTransaction::TTransaction(
  753. TClientPtr parentClient,
  754. const TClientContext& context,
  755. const TTransactionId& parentTransactionId,
  756. const TStartTransactionOptions& options)
  757. : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy())
  758. , TransactionPinger_(parentClient->GetTransactionPinger())
  759. , PingableTx_(
  760. MakeHolder<TPingableTransaction>(
  761. parentClient->GetRetryPolicy(),
  762. context,
  763. parentTransactionId,
  764. TransactionPinger_->GetChildTxPinger(),
  765. options))
  766. , ParentClient_(parentClient)
  767. {
  768. TransactionId_ = PingableTx_->GetId();
  769. }
  770. TTransaction::TTransaction(
  771. TClientPtr parentClient,
  772. const TClientContext& context,
  773. const TTransactionId& transactionId,
  774. const TAttachTransactionOptions& options)
  775. : TClientBase(context, transactionId, parentClient->GetRetryPolicy())
  776. , TransactionPinger_(parentClient->GetTransactionPinger())
  777. , PingableTx_(
  778. new TPingableTransaction(
  779. parentClient->GetRetryPolicy(),
  780. context,
  781. transactionId,
  782. parentClient->GetTransactionPinger()->GetChildTxPinger(),
  783. options))
  784. , ParentClient_(parentClient)
  785. { }
  786. const TTransactionId& TTransaction::GetId() const
  787. {
  788. return TransactionId_;
  789. }
  790. ILockPtr TTransaction::Lock(
  791. const TYPath& path,
  792. ELockMode mode,
  793. const TLockOptions& options)
  794. {
  795. auto lockId = NRawClient::Lock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, mode, options);
  796. return ::MakeIntrusive<TLock>(lockId, GetParentClientImpl(), options.Waitable_);
  797. }
  798. void TTransaction::Unlock(
  799. const TYPath& path,
  800. const TUnlockOptions& options)
  801. {
  802. NRawClient::Unlock(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
  803. }
  804. void TTransaction::Commit()
  805. {
  806. PingableTx_->Commit();
  807. }
  808. void TTransaction::Abort()
  809. {
  810. PingableTx_->Abort();
  811. }
  812. void TTransaction::Ping()
  813. {
  814. PingTx(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_);
  815. }
  816. void TTransaction::Detach()
  817. {
  818. PingableTx_->Detach();
  819. }
  820. ITransactionPingerPtr TTransaction::GetTransactionPinger()
  821. {
  822. return TransactionPinger_;
  823. }
  824. TClientPtr TTransaction::GetParentClientImpl()
  825. {
  826. return ParentClient_;
  827. }
  828. ////////////////////////////////////////////////////////////////////////////////
  829. TClient::TClient(
  830. const TClientContext& context,
  831. const TTransactionId& globalId,
  832. IClientRetryPolicyPtr retryPolicy)
  833. : TClientBase(context, globalId, retryPolicy)
  834. , TransactionPinger_(nullptr)
  835. { }
  836. TClient::~TClient() = default;
  837. ITransactionPtr TClient::AttachTransaction(
  838. const TTransactionId& transactionId,
  839. const TAttachTransactionOptions& options)
  840. {
  841. CheckShutdown();
  842. return MakeIntrusive<TTransaction>(this, Context_, transactionId, options);
  843. }
  844. void TClient::MountTable(
  845. const TYPath& path,
  846. const TMountTableOptions& options)
  847. {
  848. CheckShutdown();
  849. THttpHeader header("POST", "mount_table");
  850. SetTabletParams(header, path, options);
  851. if (options.CellId_) {
  852. header.AddParameter("cell_id", GetGuidAsString(*options.CellId_));
  853. }
  854. header.AddParameter("freeze", options.Freeze_);
  855. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  856. }
  857. void TClient::UnmountTable(
  858. const TYPath& path,
  859. const TUnmountTableOptions& options)
  860. {
  861. CheckShutdown();
  862. THttpHeader header("POST", "unmount_table");
  863. SetTabletParams(header, path, options);
  864. header.AddParameter("force", options.Force_);
  865. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  866. }
  867. void TClient::RemountTable(
  868. const TYPath& path,
  869. const TRemountTableOptions& options)
  870. {
  871. CheckShutdown();
  872. THttpHeader header("POST", "remount_table");
  873. SetTabletParams(header, path, options);
  874. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  875. }
  876. void TClient::FreezeTable(
  877. const TYPath& path,
  878. const TFreezeTableOptions& options)
  879. {
  880. CheckShutdown();
  881. NRawClient::FreezeTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, options);
  882. }
  883. void TClient::UnfreezeTable(
  884. const TYPath& path,
  885. const TUnfreezeTableOptions& options)
  886. {
  887. CheckShutdown();
  888. NRawClient::UnfreezeTable(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, options);
  889. }
  890. void TClient::ReshardTable(
  891. const TYPath& path,
  892. const TVector<TKey>& keys,
  893. const TReshardTableOptions& options)
  894. {
  895. CheckShutdown();
  896. THttpHeader header("POST", "reshard_table");
  897. SetTabletParams(header, path, options);
  898. header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys));
  899. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  900. }
  901. void TClient::ReshardTable(
  902. const TYPath& path,
  903. i64 tabletCount,
  904. const TReshardTableOptions& options)
  905. {
  906. CheckShutdown();
  907. THttpHeader header("POST", "reshard_table");
  908. SetTabletParams(header, path, options);
  909. header.AddParameter("tablet_count", tabletCount);
  910. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  911. }
  912. void TClient::InsertRows(
  913. const TYPath& path,
  914. const TNode::TListType& rows,
  915. const TInsertRowsOptions& options)
  916. {
  917. CheckShutdown();
  918. THttpHeader header("PUT", "insert_rows");
  919. header.SetInputFormat(TFormat::YsonBinary());
  920. // TODO: use corresponding raw request
  921. header.MergeParameters(SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
  922. auto body = NodeListToYsonString(rows);
  923. TRequestConfig config;
  924. config.IsHeavy = true;
  925. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config);
  926. }
  927. void TClient::DeleteRows(
  928. const TYPath& path,
  929. const TNode::TListType& keys,
  930. const TDeleteRowsOptions& options)
  931. {
  932. CheckShutdown();
  933. return NRawClient::DeleteRows(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, keys, options);
  934. }
  935. void TClient::TrimRows(
  936. const TYPath& path,
  937. i64 tabletIndex,
  938. i64 rowCount,
  939. const TTrimRowsOptions& options)
  940. {
  941. CheckShutdown();
  942. THttpHeader header("POST", "trim_rows");
  943. header.AddParameter("trimmed_row_count", rowCount);
  944. header.AddParameter("tablet_index", tabletIndex);
  945. // TODO: use corresponding raw request
  946. header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options));
  947. TRequestConfig config;
  948. config.IsHeavy = true;
  949. RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  950. }
  951. TNode::TListType TClient::LookupRows(
  952. const TYPath& path,
  953. const TNode::TListType& keys,
  954. const TLookupRowsOptions& options)
  955. {
  956. CheckShutdown();
  957. Y_UNUSED(options);
  958. THttpHeader header("PUT", "lookup_rows");
  959. header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion));
  960. header.SetInputFormat(TFormat::YsonBinary());
  961. header.SetOutputFormat(TFormat::YsonBinary());
  962. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  963. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  964. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  965. })
  966. .Item("keep_missing_rows").Value(options.KeepMissingRows_)
  967. .DoIf(options.Versioned_.Defined(), [&] (TFluentMap fluent) {
  968. fluent.Item("versioned").Value(*options.Versioned_);
  969. })
  970. .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) {
  971. fluent.Item("column_names").Value(*options.Columns_);
  972. })
  973. .EndMap());
  974. auto body = NodeListToYsonString(keys);
  975. TRequestConfig config;
  976. config.IsHeavy = true;
  977. auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, body, config);
  978. return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList();
  979. }
  980. TNode::TListType TClient::SelectRows(
  981. const TString& query,
  982. const TSelectRowsOptions& options)
  983. {
  984. CheckShutdown();
  985. THttpHeader header("GET", "select_rows");
  986. header.SetInputFormat(TFormat::YsonBinary());
  987. header.SetOutputFormat(TFormat::YsonBinary());
  988. header.MergeParameters(BuildYsonNodeFluently().BeginMap()
  989. .Item("query").Value(query)
  990. .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) {
  991. fluent.Item("timeout").Value(static_cast<i64>(options.Timeout_->MilliSeconds()));
  992. })
  993. .DoIf(options.InputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  994. fluent.Item("input_row_limit").Value(*options.InputRowLimit_);
  995. })
  996. .DoIf(options.OutputRowLimit_.Defined(), [&] (TFluentMap fluent) {
  997. fluent.Item("output_row_limit").Value(*options.OutputRowLimit_);
  998. })
  999. .Item("range_expansion_limit").Value(options.RangeExpansionLimit_)
  1000. .Item("fail_on_incomplete_result").Value(options.FailOnIncompleteResult_)
  1001. .Item("verbose_logging").Value(options.VerboseLogging_)
  1002. .Item("enable_code_cache").Value(options.EnableCodeCache_)
  1003. .EndMap());
  1004. TRequestConfig config;
  1005. config.IsHeavy = true;
  1006. auto result = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  1007. return NodeFromYsonString(result.Response, ::NYson::EYsonType::ListFragment).AsList();
  1008. }
  1009. void TClient::AlterTableReplica(const TReplicaId& replicaId, const TAlterTableReplicaOptions& options)
  1010. {
  1011. CheckShutdown();
  1012. NRawClient::AlterTableReplica(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, replicaId, options);
  1013. }
  1014. ui64 TClient::GenerateTimestamp()
  1015. {
  1016. CheckShutdown();
  1017. THttpHeader header("GET", "generate_timestamp");
  1018. TRequestConfig config;
  1019. config.IsHeavy = true;
  1020. auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header, {}, config);
  1021. return NodeFromYsonString(requestResult.Response).AsUint64();
  1022. }
  1023. TAuthorizationInfo TClient::WhoAmI()
  1024. {
  1025. CheckShutdown();
  1026. THttpHeader header("GET", "auth/whoami", /* isApi = */ false);
  1027. auto requestResult = RetryRequestWithPolicy(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, header);
  1028. TAuthorizationInfo result;
  1029. NJson::TJsonValue jsonValue;
  1030. bool ok = NJson::ReadJsonTree(requestResult.Response, &jsonValue, /* throwOnError = */ true);
  1031. Y_ABORT_UNLESS(ok);
  1032. result.Login = jsonValue["login"].GetString();
  1033. result.Realm = jsonValue["realm"].GetString();
  1034. return result;
  1035. }
  1036. TOperationAttributes TClient::GetOperation(
  1037. const TOperationId& operationId,
  1038. const TGetOperationOptions& options)
  1039. {
  1040. CheckShutdown();
  1041. return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1042. }
  1043. TOperationAttributes TClient::GetOperation(
  1044. const TString& alias,
  1045. const TGetOperationOptions& options)
  1046. {
  1047. CheckShutdown();
  1048. return NRawClient::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, alias, options);
  1049. }
  1050. TListOperationsResult TClient::ListOperations(
  1051. const TListOperationsOptions& options)
  1052. {
  1053. CheckShutdown();
  1054. return NRawClient::ListOperations(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, options);
  1055. }
  1056. void TClient::UpdateOperationParameters(
  1057. const TOperationId& operationId,
  1058. const TUpdateOperationParametersOptions& options)
  1059. {
  1060. CheckShutdown();
  1061. return NRawClient::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1062. }
  1063. TJobAttributes TClient::GetJob(
  1064. const TOperationId& operationId,
  1065. const TJobId& jobId,
  1066. const TGetJobOptions& options)
  1067. {
  1068. CheckShutdown();
  1069. return NRawClient::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, jobId, options);
  1070. }
  1071. TListJobsResult TClient::ListJobs(
  1072. const TOperationId& operationId,
  1073. const TListJobsOptions& options)
  1074. {
  1075. CheckShutdown();
  1076. return NRawClient::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1077. }
  1078. IFileReaderPtr TClient::GetJobInput(
  1079. const TJobId& jobId,
  1080. const TGetJobInputOptions& options)
  1081. {
  1082. CheckShutdown();
  1083. return NRawClient::GetJobInput(Context_, jobId, options);
  1084. }
  1085. IFileReaderPtr TClient::GetJobFailContext(
  1086. const TOperationId& operationId,
  1087. const TJobId& jobId,
  1088. const TGetJobFailContextOptions& options)
  1089. {
  1090. CheckShutdown();
  1091. return NRawClient::GetJobFailContext(Context_, operationId, jobId, options);
  1092. }
  1093. IFileReaderPtr TClient::GetJobStderr(
  1094. const TOperationId& operationId,
  1095. const TJobId& jobId,
  1096. const TGetJobStderrOptions& options)
  1097. {
  1098. CheckShutdown();
  1099. return NRawClient::GetJobStderr(Context_, operationId, jobId, options);
  1100. }
  1101. TNode::TListType TClient::SkyShareTable(
  1102. const std::vector<TYPath>& tablePaths,
  1103. const TSkyShareTableOptions& options)
  1104. {
  1105. CheckShutdown();
  1106. return NRawClient::SkyShareTable(
  1107. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  1108. Context_,
  1109. tablePaths,
  1110. options);
  1111. }
  1112. TCheckPermissionResponse TClient::CheckPermission(
  1113. const TString& user,
  1114. EPermission permission,
  1115. const TYPath& path,
  1116. const TCheckPermissionOptions& options)
  1117. {
  1118. CheckShutdown();
  1119. return NRawClient::CheckPermission(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, user, permission, path, options);
  1120. }
  1121. TVector<TTabletInfo> TClient::GetTabletInfos(
  1122. const TYPath& path,
  1123. const TVector<int>& tabletIndexes,
  1124. const TGetTabletInfosOptions& options)
  1125. {
  1126. CheckShutdown();
  1127. return NRawClient::GetTabletInfos(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path, tabletIndexes, options);
  1128. }
  1129. void TClient::SuspendOperation(
  1130. const TOperationId& operationId,
  1131. const TSuspendOperationOptions& options)
  1132. {
  1133. CheckShutdown();
  1134. NRawClient::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1135. }
  1136. void TClient::ResumeOperation(
  1137. const TOperationId& operationId,
  1138. const TResumeOperationOptions& options)
  1139. {
  1140. CheckShutdown();
  1141. NRawClient::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
  1142. }
  1143. TYtPoller& TClient::GetYtPoller()
  1144. {
  1145. auto g = Guard(Lock_);
  1146. if (!YtPoller_) {
  1147. CheckShutdown();
  1148. // We don't use current client and create new client because YtPoller_ might use
  1149. // this client during current client shutdown.
  1150. // That might lead to incrementing of current client refcount and double delete of current client object.
  1151. YtPoller_ = MakeHolder<TYtPoller>(Context_, ClientRetryPolicy_);
  1152. }
  1153. return *YtPoller_;
  1154. }
  1155. void TClient::Shutdown()
  1156. {
  1157. auto g = Guard(Lock_);
  1158. if (!Shutdown_.exchange(true) && YtPoller_) {
  1159. YtPoller_->Stop();
  1160. }
  1161. }
  1162. ITransactionPingerPtr TClient::GetTransactionPinger()
  1163. {
  1164. auto g = Guard(Lock_);
  1165. if (!TransactionPinger_) {
  1166. TransactionPinger_ = CreateTransactionPinger(Context_.Config);
  1167. }
  1168. return TransactionPinger_;
  1169. }
  1170. TClientPtr TClient::GetParentClientImpl()
  1171. {
  1172. return this;
  1173. }
  1174. template <class TOptions>
  1175. void TClient::SetTabletParams(
  1176. THttpHeader& header,
  1177. const TYPath& path,
  1178. const TOptions& options)
  1179. {
  1180. header.AddPath(AddPathPrefix(path, Context_.Config->Prefix));
  1181. if (options.FirstTabletIndex_) {
  1182. header.AddParameter("first_tablet_index", *options.FirstTabletIndex_);
  1183. }
  1184. if (options.LastTabletIndex_) {
  1185. header.AddParameter("last_tablet_index", *options.LastTabletIndex_);
  1186. }
  1187. }
  1188. void TClient::CheckShutdown() const
  1189. {
  1190. if (Shutdown_) {
  1191. ythrow TApiUsageError() << "Call client's methods after shutdown";
  1192. }
  1193. }
  1194. TClientPtr CreateClientImpl(
  1195. const TString& serverName,
  1196. const TCreateClientOptions& options)
  1197. {
  1198. TClientContext context;
  1199. context.Config = options.Config_ ? options.Config_ : TConfig::Get();
  1200. context.TvmOnly = options.TvmOnly_;
  1201. context.ProxyAddress = options.ProxyAddress_;
  1202. context.ServerName = serverName;
  1203. if (context.ServerName.find('.') == TString::npos &&
  1204. context.ServerName.find(':') == TString::npos &&
  1205. context.ServerName.find("localhost") == TString::npos)
  1206. {
  1207. context.ServerName += ".yt.yandex.net";
  1208. }
  1209. static constexpr char httpUrlSchema[] = "http://";
  1210. static constexpr char httpsUrlSchema[] = "https://";
  1211. if (options.UseTLS_) {
  1212. context.UseTLS = *options.UseTLS_;
  1213. } else {
  1214. context.UseTLS = context.ServerName.StartsWith(httpsUrlSchema);
  1215. }
  1216. if (context.ServerName.StartsWith(httpUrlSchema)) {
  1217. if (context.UseTLS) {
  1218. ythrow TApiUsageError() << "URL schema doesn't match UseTLS option";
  1219. }
  1220. context.ServerName.erase(0, sizeof(httpUrlSchema) - 1);
  1221. }
  1222. if (context.ServerName.StartsWith(httpsUrlSchema)) {
  1223. if (!context.UseTLS) {
  1224. ythrow TApiUsageError() << "URL schema doesn't match UseTLS option";
  1225. }
  1226. context.ServerName.erase(0, sizeof(httpsUrlSchema) - 1);
  1227. }
  1228. if (context.ServerName.find(':') == TString::npos) {
  1229. context.ServerName = CreateHostNameWithPort(context.ServerName, context);
  1230. }
  1231. if (options.TvmOnly_) {
  1232. context.ServerName = Format("tvm.%v", context.ServerName);
  1233. }
  1234. if (context.UseTLS || options.UseCoreHttpClient_) {
  1235. context.HttpClient = NHttpClient::CreateCoreHttpClient(context.UseTLS, context.Config);
  1236. } else {
  1237. context.HttpClient = NHttpClient::CreateDefaultHttpClient();
  1238. }
  1239. context.Token = context.Config->Token;
  1240. if (options.Token_) {
  1241. context.Token = options.Token_;
  1242. } else if (options.TokenPath_) {
  1243. context.Token = TConfig::LoadTokenFromFile(options.TokenPath_);
  1244. } else if (options.ServiceTicketAuth_) {
  1245. context.ServiceTicketAuth = options.ServiceTicketAuth_;
  1246. }
  1247. context.ImpersonationUser = options.ImpersonationUser_;
  1248. if (context.Token) {
  1249. TConfig::ValidateToken(context.Token);
  1250. }
  1251. auto globalTxId = GetGuid(context.Config->GlobalTxId);
  1252. auto retryConfigProvider = options.RetryConfigProvider_;
  1253. if (!retryConfigProvider) {
  1254. retryConfigProvider = CreateDefaultRetryConfigProvider();
  1255. }
  1256. return new NDetail::TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
  1257. }
  1258. ////////////////////////////////////////////////////////////////////////////////
  1259. } // namespace NDetail
  1260. ////////////////////////////////////////////////////////////////////////////////
  1261. IClientPtr CreateClient(
  1262. const TString& serverName,
  1263. const TCreateClientOptions& options)
  1264. {
  1265. return NDetail::CreateClientImpl(serverName, options);
  1266. }
  1267. IClientPtr CreateClientFromEnv(const TCreateClientOptions& options)
  1268. {
  1269. auto serverName = GetEnv("YT_PROXY");
  1270. if (!serverName) {
  1271. ythrow yexception() << "YT_PROXY is not set";
  1272. }
  1273. NDetail::ApplyProxyUrlAliasingRules(serverName);
  1274. return NDetail::CreateClientImpl(serverName, options);
  1275. }
  1276. ////////////////////////////////////////////////////////////////////////////////
  1277. } // namespace NYT