client.h 15 KB


  1. #pragma once
  2. #include "client_reader.h"
  3. #include "client_writer.h"
  4. #include "transaction_pinger.h"
  5. #include <yt/cpp/mapreduce/interface/client.h>
  6. #include <yt/cpp/mapreduce/interface/raw_client.h>
  7. #include <yt/cpp/mapreduce/http/context.h>
  8. #include <yt/cpp/mapreduce/http/requests.h>
  9. namespace NYT {
  10. namespace NDetail {
  11. ////////////////////////////////////////////////////////////////////////////////
  12. class TYtPoller;
  13. class TClientBase;
  14. using TClientBasePtr = ::TIntrusivePtr<TClientBase>;
  15. class TClient;
  16. using TClientPtr = ::TIntrusivePtr<TClient>;
  17. ////////////////////////////////////////////////////////////////////////////////
  18. class TClientBase
  19. : virtual public IClientBase
  20. {
  21. public:
  22. TClientBase(
  23. IRawClientPtr rawClient,
  24. const TClientContext& context,
  25. const TTransactionId& transactionId,
  26. IClientRetryPolicyPtr retryPolicy);
  27. ITransactionPtr StartTransaction(
  28. const TStartTransactionOptions& options) override;
  29. // cypress
  30. TNodeId Create(
  31. const TYPath& path,
  32. ENodeType type,
  33. const TCreateOptions& options) override;
  34. void Remove(
  35. const TYPath& path,
  36. const TRemoveOptions& options) override;
  37. bool Exists(
  38. const TYPath& path,
  39. const TExistsOptions& options) override;
  40. TNode Get(
  41. const TYPath& path,
  42. const TGetOptions& options) override;
  43. void Set(
  44. const TYPath& path,
  45. const TNode& value,
  46. const TSetOptions& options) override;
  47. void MultisetAttributes(
  48. const TYPath& path,
  49. const TNode::TMapType& value,
  50. const TMultisetAttributesOptions& options) override;
  51. TNode::TListType List(
  52. const TYPath& path,
  53. const TListOptions& options) override;
  54. TNodeId Copy(
  55. const TYPath& sourcePath,
  56. const TYPath& destinationPath,
  57. const TCopyOptions& options) override;
  58. TNodeId Move(
  59. const TYPath& sourcePath,
  60. const TYPath& destinationPath,
  61. const TMoveOptions& options) override;
  62. TNodeId Link(
  63. const TYPath& targetPath,
  64. const TYPath& linkPath,
  65. const TLinkOptions& options) override;
  66. void Concatenate(
  67. const TVector<TRichYPath>& sourcePaths,
  68. const TRichYPath& destinationPath,
  69. const TConcatenateOptions& options) override;
  70. TRichYPath CanonizeYPath(const TRichYPath& path) override;
  71. TVector<TTableColumnarStatistics> GetTableColumnarStatistics(
  72. const TVector<TRichYPath>& paths,
  73. const TGetTableColumnarStatisticsOptions& options) override;
  74. TMultiTablePartitions GetTablePartitions(
  75. const TVector<TRichYPath>& paths,
  76. const TGetTablePartitionsOptions& options) override;
  77. TMaybe<TYPath> GetFileFromCache(
  78. const TString& md5Signature,
  79. const TYPath& cachePath,
  80. const TGetFileFromCacheOptions& options = TGetFileFromCacheOptions()) override;
  81. TYPath PutFileToCache(
  82. const TYPath& filePath,
  83. const TString& md5Signature,
  84. const TYPath& cachePath,
  85. const TPutFileToCacheOptions& options = TPutFileToCacheOptions()) override;
  86. IFileReaderPtr CreateFileReader(
  87. const TRichYPath& path,
  88. const TFileReaderOptions& options) override;
  89. IFileWriterPtr CreateFileWriter(
  90. const TRichYPath& path,
  91. const TFileWriterOptions& options) override;
  92. TTableWriterPtr<::google::protobuf::Message> CreateTableWriter(
  93. const TRichYPath& path,
  94. const ::google::protobuf::Descriptor& descriptor,
  95. const TTableWriterOptions& options) override;
  96. TRawTableReaderPtr CreateRawReader(
  97. const TRichYPath& path,
  98. const TFormat& format,
  99. const TTableReaderOptions& options) override;
  100. TRawTableWriterPtr CreateRawWriter(
  101. const TRichYPath& path,
  102. const TFormat& format,
  103. const TTableWriterOptions& options) override;
  104. IFileReaderPtr CreateBlobTableReader(
  105. const TYPath& path,
  106. const TKey& key,
  107. const TBlobTableReaderOptions& options) override;
  108. // operations
  109. IOperationPtr DoMap(
  110. const TMapOperationSpec& spec,
  111. ::TIntrusivePtr<IStructuredJob> mapper,
  112. const TOperationOptions& options) override;
  113. IOperationPtr RawMap(
  114. const TRawMapOperationSpec& spec,
  115. ::TIntrusivePtr<IRawJob> mapper,
  116. const TOperationOptions& options) override;
  117. IOperationPtr DoReduce(
  118. const TReduceOperationSpec& spec,
  119. ::TIntrusivePtr<IStructuredJob> reducer,
  120. const TOperationOptions& options) override;
  121. IOperationPtr RawReduce(
  122. const TRawReduceOperationSpec& spec,
  123. ::TIntrusivePtr<IRawJob> mapper,
  124. const TOperationOptions& options) override;
  125. IOperationPtr DoJoinReduce(
  126. const TJoinReduceOperationSpec& spec,
  127. ::TIntrusivePtr<IStructuredJob> reducer,
  128. const TOperationOptions& options) override;
  129. IOperationPtr RawJoinReduce(
  130. const TRawJoinReduceOperationSpec& spec,
  131. ::TIntrusivePtr<IRawJob> mapper,
  132. const TOperationOptions& options) override;
  133. IOperationPtr DoMapReduce(
  134. const TMapReduceOperationSpec& spec,
  135. ::TIntrusivePtr<IStructuredJob> mapper,
  136. ::TIntrusivePtr<IStructuredJob> reduceCombiner,
  137. ::TIntrusivePtr<IStructuredJob> reducer,
  138. const TOperationOptions& options) override;
  139. IOperationPtr RawMapReduce(
  140. const TRawMapReduceOperationSpec& spec,
  141. ::TIntrusivePtr<IRawJob> mapper,
  142. ::TIntrusivePtr<IRawJob> reduceCombiner,
  143. ::TIntrusivePtr<IRawJob> reducer,
  144. const TOperationOptions& options) override;
  145. IOperationPtr Sort(
  146. const TSortOperationSpec& spec,
  147. const TOperationOptions& options) override;
  148. IOperationPtr Merge(
  149. const TMergeOperationSpec& spec,
  150. const TOperationOptions& options) override;
  151. IOperationPtr Erase(
  152. const TEraseOperationSpec& spec,
  153. const TOperationOptions& options) override;
  154. IOperationPtr RemoteCopy(
  155. const TRemoteCopyOperationSpec& spec,
  156. const TOperationOptions& options = TOperationOptions()) override;
  157. IOperationPtr RunVanilla(
  158. const TVanillaOperationSpec& spec,
  159. const TOperationOptions& options = TOperationOptions()) override;
  160. IOperationPtr AttachOperation(const TOperationId& operationId) override;
  161. EOperationBriefState CheckOperation(const TOperationId& operationId) override;
  162. void AbortOperation(const TOperationId& operationId) override;
  163. void CompleteOperation(const TOperationId& operationId) override;
  164. void WaitForOperation(const TOperationId& operationId) override;
  165. void AlterTable(
  166. const TYPath& path,
  167. const TAlterTableOptions& options) override;
  168. TBatchRequestPtr CreateBatchRequest() override;
  169. IClientPtr GetParentClient() override;
  170. IRawClientPtr GetRawClient() const;
  171. const TClientContext& GetContext() const;
  172. const IClientRetryPolicyPtr& GetRetryPolicy() const;
  173. virtual ITransactionPingerPtr GetTransactionPinger() = 0;
  174. protected:
  175. virtual TClientPtr GetParentClientImpl() = 0;
  176. protected:
  177. const IRawClientPtr RawClient_;
  178. const TClientContext Context_;
  179. TTransactionId TransactionId_;
  180. IClientRetryPolicyPtr ClientRetryPolicy_;
  181. private:
  182. ::TIntrusivePtr<TClientReader> CreateClientReader(
  183. const TRichYPath& path,
  184. const TFormat& format,
  185. const TTableReaderOptions& options,
  186. bool useFormatFromTableAttributes = false);
  187. THolder<TClientWriter> CreateClientWriter(
  188. const TRichYPath& path,
  189. const TFormat& format,
  190. const TTableWriterOptions& options);
  191. ::TIntrusivePtr<INodeReaderImpl> CreateNodeReader(
  192. const TRichYPath& path, const TTableReaderOptions& options) override;
  193. ::TIntrusivePtr<IYaMRReaderImpl> CreateYaMRReader(
  194. const TRichYPath& path, const TTableReaderOptions& options) override;
  195. ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
  196. const TRichYPath& path,
  197. const TTableReaderOptions& options,
  198. const Message* prototype) override;
  199. ::TIntrusivePtr<ISkiffRowReaderImpl> CreateSkiffRowReader(
  200. const TRichYPath& path,
  201. const TTableReaderOptions& options,
  202. const ISkiffRowSkipperPtr& skipper,
  203. const NSkiff::TSkiffSchemaPtr& schema) override;
  204. ::TIntrusivePtr<INodeWriterImpl> CreateNodeWriter(
  205. const TRichYPath& path, const TTableWriterOptions& options) override;
  206. ::TIntrusivePtr<IYaMRWriterImpl> CreateYaMRWriter(
  207. const TRichYPath& path, const TTableWriterOptions& options) override;
  208. ::TIntrusivePtr<IProtoWriterImpl> CreateProtoWriter(
  209. const TRichYPath& path,
  210. const TTableWriterOptions& options,
  211. const Message* prototype) override;
  212. };
  213. ////////////////////////////////////////////////////////////////////////////////
  214. class TTransaction
  215. : public ITransaction
  216. , public TClientBase
  217. {
  218. public:
  219. //
  220. // Start a new transaction.
  221. TTransaction(
  222. const IRawClientPtr& rawClient,
  223. TClientPtr parentClient,
  224. const TClientContext& context,
  225. const TTransactionId& parentTransactionId,
  226. const TStartTransactionOptions& options);
  227. //
  228. // Attach an existing transaction.
  229. TTransaction(
  230. const IRawClientPtr& rawClient,
  231. TClientPtr parentClient,
  232. const TClientContext& context,
  233. const TTransactionId& transactionId,
  234. const TAttachTransactionOptions& options);
  235. const TTransactionId& GetId() const override;
  236. ILockPtr Lock(
  237. const TYPath& path,
  238. ELockMode mode,
  239. const TLockOptions& options) override;
  240. void Unlock(
  241. const TYPath& path,
  242. const TUnlockOptions& options) override;
  243. void Commit() override;
  244. void Abort() override;
  245. void Ping() override;
  246. void Detach() override;
  247. ITransactionPingerPtr GetTransactionPinger() override;
  248. protected:
  249. TClientPtr GetParentClientImpl() override;
  250. private:
  251. ITransactionPingerPtr TransactionPinger_;
  252. std::unique_ptr<TPingableTransaction> PingableTx_;
  253. TClientPtr ParentClient_;
  254. };
  255. ////////////////////////////////////////////////////////////////////////////////
  256. class TClient
  257. : public IClient
  258. , public TClientBase
  259. {
  260. public:
  261. TClient(
  262. IRawClientPtr rawClient,
  263. const TClientContext& context,
  264. const TTransactionId& globalId,
  265. IClientRetryPolicyPtr retryPolicy);
  266. ~TClient();
  267. ITransactionPtr AttachTransaction(
  268. const TTransactionId& transactionId,
  269. const TAttachTransactionOptions& options) override;
  270. void MountTable(
  271. const TYPath& path,
  272. const TMountTableOptions& options) override;
  273. void UnmountTable(
  274. const TYPath& path,
  275. const TUnmountTableOptions& options) override;
  276. void RemountTable(
  277. const TYPath& path,
  278. const TRemountTableOptions& options) override;
  279. void FreezeTable(
  280. const TYPath& path,
  281. const TFreezeTableOptions& options) override;
  282. void UnfreezeTable(
  283. const TYPath& path,
  284. const TUnfreezeTableOptions& options) override;
  285. void ReshardTable(
  286. const TYPath& path,
  287. const TVector<TKey>& keys,
  288. const TReshardTableOptions& options) override;
  289. void ReshardTable(
  290. const TYPath& path,
  291. i64 tabletCount,
  292. const TReshardTableOptions& options) override;
  293. void InsertRows(
  294. const TYPath& path,
  295. const TNode::TListType& rows,
  296. const TInsertRowsOptions& options) override;
  297. void DeleteRows(
  298. const TYPath& path,
  299. const TNode::TListType& keys,
  300. const TDeleteRowsOptions& options) override;
  301. void TrimRows(
  302. const TYPath& path,
  303. i64 tabletIndex,
  304. i64 rowCount,
  305. const TTrimRowsOptions& options) override;
  306. TNode::TListType LookupRows(
  307. const TYPath& path,
  308. const TNode::TListType& keys,
  309. const TLookupRowsOptions& options) override;
  310. TNode::TListType SelectRows(
  311. const TString& query,
  312. const TSelectRowsOptions& options) override;
  313. void AlterTableReplica(
  314. const TReplicaId& replicaId,
  315. const TAlterTableReplicaOptions& alterTableReplicaOptions) override;
  316. ui64 GenerateTimestamp() override;
  317. TAuthorizationInfo WhoAmI() override;
  318. TOperationAttributes GetOperation(
  319. const TOperationId& operationId,
  320. const TGetOperationOptions& options) override;
  321. TOperationAttributes GetOperation(
  322. const TString& alias,
  323. const TGetOperationOptions& options) override;
  324. TListOperationsResult ListOperations(
  325. const TListOperationsOptions& options) override;
  326. void UpdateOperationParameters(
  327. const TOperationId& operationId,
  328. const TUpdateOperationParametersOptions& options) override;
  329. TJobAttributes GetJob(
  330. const TOperationId& operationId,
  331. const TJobId& jobId,
  332. const TGetJobOptions& options) override;
  333. TListJobsResult ListJobs(
  334. const TOperationId& operationId,
  335. const TListJobsOptions& options = TListJobsOptions()) override;
  336. IFileReaderPtr GetJobInput(
  337. const TJobId& jobId,
  338. const TGetJobInputOptions& options = TGetJobInputOptions()) override;
  339. IFileReaderPtr GetJobFailContext(
  340. const TOperationId& operationId,
  341. const TJobId& jobId,
  342. const TGetJobFailContextOptions& options = TGetJobFailContextOptions()) override;
  343. IFileReaderPtr GetJobStderr(
  344. const TOperationId& operationId,
  345. const TJobId& jobId,
  346. const TGetJobStderrOptions& options = TGetJobStderrOptions()) override;
  347. std::vector<TJobTraceEvent> GetJobTrace(
  348. const TOperationId& operationId,
  349. const TGetJobTraceOptions& options = TGetJobTraceOptions()) override;
  350. TNode::TListType SkyShareTable(
  351. const std::vector<TYPath>& tablePaths,
  352. const TSkyShareTableOptions& options = TSkyShareTableOptions()) override;
  353. TCheckPermissionResponse CheckPermission(
  354. const TString& user,
  355. EPermission permission,
  356. const TYPath& path,
  357. const TCheckPermissionOptions& options) override;
  358. TVector<TTabletInfo> GetTabletInfos(
  359. const TYPath& path,
  360. const TVector<int>& tabletIndexes,
  361. const TGetTabletInfosOptions& options) override;
  362. void SuspendOperation(
  363. const TOperationId& operationId,
  364. const TSuspendOperationOptions& options) override;
  365. void ResumeOperation(
  366. const TOperationId& operationId,
  367. const TResumeOperationOptions& options) override;
  368. void Shutdown() override;
  369. ITransactionPingerPtr GetTransactionPinger() override;
  370. // Helper methods
  371. TYtPoller& GetYtPoller();
  372. protected:
  373. TClientPtr GetParentClientImpl() override;
  374. private:
  375. void CheckShutdown() const;
  376. private:
  377. ITransactionPingerPtr TransactionPinger_;
  378. std::atomic<bool> Shutdown_ = false;
  379. TMutex Lock_;
  380. std::unique_ptr<TYtPoller> YtPoller_;
  381. };
  382. ////////////////////////////////////////////////////////////////////////////////
  383. TClientContext CreateClientContext(
  384. const TString& serverName,
  385. const TCreateClientOptions& options);
  386. TClientPtr CreateClientImpl(
  387. const TString& serverName,
  388. const TCreateClientOptions& options = {});
  389. ////////////////////////////////////////////////////////////////////////////////
  390. } // namespace NDetail
  391. } // namespace NYT