client.h 15 KB


  1. #pragma once
  2. #include "client_reader.h"
  3. #include "client_writer.h"
  4. #include "transaction_pinger.h"
  5. #include <yt/cpp/mapreduce/interface/client.h>
  6. #include <yt/cpp/mapreduce/http/context.h>
  7. #include <yt/cpp/mapreduce/http/requests.h>
  8. namespace NYT {
  9. namespace NDetail {
  10. ////////////////////////////////////////////////////////////////////////////////
  11. class TYtPoller;
  12. class TClientBase;
  13. using TClientBasePtr = ::TIntrusivePtr<TClientBase>;
  14. class TClient;
  15. using TClientPtr = ::TIntrusivePtr<TClient>;
  16. ////////////////////////////////////////////////////////////////////////////////
  17. class TClientBase
  18. : virtual public IClientBase
  19. {
  20. public:
  21. TClientBase(
  22. const TClientContext& context,
  23. const TTransactionId& transactionId,
  24. IClientRetryPolicyPtr retryPolicy);
  25. ITransactionPtr StartTransaction(
  26. const TStartTransactionOptions& options) override;
  27. // cypress
  28. TNodeId Create(
  29. const TYPath& path,
  30. ENodeType type,
  31. const TCreateOptions& options) override;
  32. void Remove(
  33. const TYPath& path,
  34. const TRemoveOptions& options) override;
  35. bool Exists(
  36. const TYPath& path,
  37. const TExistsOptions& options) override;
  38. TNode Get(
  39. const TYPath& path,
  40. const TGetOptions& options) override;
  41. void Set(
  42. const TYPath& path,
  43. const TNode& value,
  44. const TSetOptions& options) override;
  45. void MultisetAttributes(
  46. const TYPath& path,
  47. const TNode::TMapType& value,
  48. const TMultisetAttributesOptions& options) override;
  49. TNode::TListType List(
  50. const TYPath& path,
  51. const TListOptions& options) override;
  52. TNodeId Copy(
  53. const TYPath& sourcePath,
  54. const TYPath& destinationPath,
  55. const TCopyOptions& options) override;
  56. TNodeId Move(
  57. const TYPath& sourcePath,
  58. const TYPath& destinationPath,
  59. const TMoveOptions& options) override;
  60. TNodeId Link(
  61. const TYPath& targetPath,
  62. const TYPath& linkPath,
  63. const TLinkOptions& options) override;
  64. void Concatenate(
  65. const TVector<TRichYPath>& sourcePaths,
  66. const TRichYPath& destinationPath,
  67. const TConcatenateOptions& options) override;
  68. TRichYPath CanonizeYPath(const TRichYPath& path) override;
  69. TVector<TTableColumnarStatistics> GetTableColumnarStatistics(
  70. const TVector<TRichYPath>& paths,
  71. const TGetTableColumnarStatisticsOptions& options) override;
  72. TMultiTablePartitions GetTablePartitions(
  73. const TVector<TRichYPath>& paths,
  74. const TGetTablePartitionsOptions& options) override;
  75. TMaybe<TYPath> GetFileFromCache(
  76. const TString& md5Signature,
  77. const TYPath& cachePath,
  78. const TGetFileFromCacheOptions& options = TGetFileFromCacheOptions()) override;
  79. TYPath PutFileToCache(
  80. const TYPath& filePath,
  81. const TString& md5Signature,
  82. const TYPath& cachePath,
  83. const TPutFileToCacheOptions& options = TPutFileToCacheOptions()) override;
  84. IFileReaderPtr CreateFileReader(
  85. const TRichYPath& path,
  86. const TFileReaderOptions& options) override;
  87. IFileWriterPtr CreateFileWriter(
  88. const TRichYPath& path,
  89. const TFileWriterOptions& options) override;
  90. TTableWriterPtr<::google::protobuf::Message> CreateTableWriter(
  91. const TRichYPath& path,
  92. const ::google::protobuf::Descriptor& descriptor,
  93. const TTableWriterOptions& options) override;
  94. TRawTableReaderPtr CreateRawReader(
  95. const TRichYPath& path,
  96. const TFormat& format,
  97. const TTableReaderOptions& options) override;
  98. TRawTableWriterPtr CreateRawWriter(
  99. const TRichYPath& path,
  100. const TFormat& format,
  101. const TTableWriterOptions& options) override;
  102. IFileReaderPtr CreateBlobTableReader(
  103. const TYPath& path,
  104. const TKey& key,
  105. const TBlobTableReaderOptions& options) override;
  106. // operations
  107. IOperationPtr DoMap(
  108. const TMapOperationSpec& spec,
  109. ::TIntrusivePtr<IStructuredJob> mapper,
  110. const TOperationOptions& options) override;
  111. IOperationPtr RawMap(
  112. const TRawMapOperationSpec& spec,
  113. ::TIntrusivePtr<IRawJob> mapper,
  114. const TOperationOptions& options) override;
  115. IOperationPtr DoReduce(
  116. const TReduceOperationSpec& spec,
  117. ::TIntrusivePtr<IStructuredJob> reducer,
  118. const TOperationOptions& options) override;
  119. IOperationPtr RawReduce(
  120. const TRawReduceOperationSpec& spec,
  121. ::TIntrusivePtr<IRawJob> mapper,
  122. const TOperationOptions& options) override;
  123. IOperationPtr DoJoinReduce(
  124. const TJoinReduceOperationSpec& spec,
  125. ::TIntrusivePtr<IStructuredJob> reducer,
  126. const TOperationOptions& options) override;
  127. IOperationPtr RawJoinReduce(
  128. const TRawJoinReduceOperationSpec& spec,
  129. ::TIntrusivePtr<IRawJob> mapper,
  130. const TOperationOptions& options) override;
  131. IOperationPtr DoMapReduce(
  132. const TMapReduceOperationSpec& spec,
  133. ::TIntrusivePtr<IStructuredJob> mapper,
  134. ::TIntrusivePtr<IStructuredJob> reduceCombiner,
  135. ::TIntrusivePtr<IStructuredJob> reducer,
  136. const TOperationOptions& options) override;
  137. IOperationPtr RawMapReduce(
  138. const TRawMapReduceOperationSpec& spec,
  139. ::TIntrusivePtr<IRawJob> mapper,
  140. ::TIntrusivePtr<IRawJob> reduceCombiner,
  141. ::TIntrusivePtr<IRawJob> reducer,
  142. const TOperationOptions& options) override;
  143. IOperationPtr Sort(
  144. const TSortOperationSpec& spec,
  145. const TOperationOptions& options) override;
  146. IOperationPtr Merge(
  147. const TMergeOperationSpec& spec,
  148. const TOperationOptions& options) override;
  149. IOperationPtr Erase(
  150. const TEraseOperationSpec& spec,
  151. const TOperationOptions& options) override;
  152. IOperationPtr RemoteCopy(
  153. const TRemoteCopyOperationSpec& spec,
  154. const TOperationOptions& options = TOperationOptions()) override;
  155. IOperationPtr RunVanilla(
  156. const TVanillaOperationSpec& spec,
  157. const TOperationOptions& options = TOperationOptions()) override;
  158. IOperationPtr AttachOperation(const TOperationId& operationId) override;
  159. EOperationBriefState CheckOperation(const TOperationId& operationId) override;
  160. void AbortOperation(const TOperationId& operationId) override;
  161. void CompleteOperation(const TOperationId& operationId) override;
  162. void WaitForOperation(const TOperationId& operationId) override;
  163. void AlterTable(
  164. const TYPath& path,
  165. const TAlterTableOptions& options) override;
  166. TBatchRequestPtr CreateBatchRequest() override;
  167. IClientPtr GetParentClient() override;
  168. const TClientContext& GetContext() const;
  169. const IClientRetryPolicyPtr& GetRetryPolicy() const;
  170. virtual ITransactionPingerPtr GetTransactionPinger() = 0;
  171. protected:
  172. virtual TClientPtr GetParentClientImpl() = 0;
  173. protected:
  174. const TClientContext Context_;
  175. TTransactionId TransactionId_;
  176. IClientRetryPolicyPtr ClientRetryPolicy_;
  177. private:
  178. ::TIntrusivePtr<TClientReader> CreateClientReader(
  179. const TRichYPath& path,
  180. const TFormat& format,
  181. const TTableReaderOptions& options,
  182. bool useFormatFromTableAttributes = false);
  183. THolder<TClientWriter> CreateClientWriter(
  184. const TRichYPath& path,
  185. const TFormat& format,
  186. const TTableWriterOptions& options);
  187. ::TIntrusivePtr<INodeReaderImpl> CreateNodeReader(
  188. const TRichYPath& path, const TTableReaderOptions& options) override;
  189. ::TIntrusivePtr<IYaMRReaderImpl> CreateYaMRReader(
  190. const TRichYPath& path, const TTableReaderOptions& options) override;
  191. ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
  192. const TRichYPath& path,
  193. const TTableReaderOptions& options,
  194. const Message* prototype) override;
  195. ::TIntrusivePtr<ISkiffRowReaderImpl> CreateSkiffRowReader(
  196. const TRichYPath& path,
  197. const TTableReaderOptions& options,
  198. const ISkiffRowSkipperPtr& skipper,
  199. const NSkiff::TSkiffSchemaPtr& schema) override;
  200. ::TIntrusivePtr<INodeWriterImpl> CreateNodeWriter(
  201. const TRichYPath& path, const TTableWriterOptions& options) override;
  202. ::TIntrusivePtr<IYaMRWriterImpl> CreateYaMRWriter(
  203. const TRichYPath& path, const TTableWriterOptions& options) override;
  204. ::TIntrusivePtr<IProtoWriterImpl> CreateProtoWriter(
  205. const TRichYPath& path,
  206. const TTableWriterOptions& options,
  207. const Message* prototype) override;
  208. };
  209. ////////////////////////////////////////////////////////////////////////////////
  210. class TTransaction
  211. : public ITransaction
  212. , public TClientBase
  213. {
  214. public:
  215. //
  216. // Start a new transaction.
  217. TTransaction(
  218. TClientPtr parentClient,
  219. const TClientContext& context,
  220. const TTransactionId& parentTransactionId,
  221. const TStartTransactionOptions& options);
  222. //
  223. // Attach an existing transaction.
  224. TTransaction(
  225. TClientPtr parentClient,
  226. const TClientContext& context,
  227. const TTransactionId& transactionId,
  228. const TAttachTransactionOptions& options);
  229. const TTransactionId& GetId() const override;
  230. ILockPtr Lock(
  231. const TYPath& path,
  232. ELockMode mode,
  233. const TLockOptions& options) override;
  234. void Unlock(
  235. const TYPath& path,
  236. const TUnlockOptions& options) override;
  237. void Commit() override;
  238. void Abort() override;
  239. void Ping() override;
  240. void Detach() override;
  241. ITransactionPingerPtr GetTransactionPinger() override;
  242. protected:
  243. TClientPtr GetParentClientImpl() override;
  244. private:
  245. ITransactionPingerPtr TransactionPinger_;
  246. THolder<TPingableTransaction> PingableTx_;
  247. TClientPtr ParentClient_;
  248. };
  249. ////////////////////////////////////////////////////////////////////////////////
  250. class TClient
  251. : public IClient
  252. , public TClientBase
  253. {
  254. public:
  255. TClient(
  256. const TClientContext& context,
  257. const TTransactionId& globalId,
  258. IClientRetryPolicyPtr retryPolicy);
  259. ~TClient();
  260. ITransactionPtr AttachTransaction(
  261. const TTransactionId& transactionId,
  262. const TAttachTransactionOptions& options) override;
  263. void MountTable(
  264. const TYPath& path,
  265. const TMountTableOptions& options) override;
  266. void UnmountTable(
  267. const TYPath& path,
  268. const TUnmountTableOptions& options) override;
  269. void RemountTable(
  270. const TYPath& path,
  271. const TRemountTableOptions& options) override;
  272. void FreezeTable(
  273. const TYPath& path,
  274. const TFreezeTableOptions& options) override;
  275. void UnfreezeTable(
  276. const TYPath& path,
  277. const TUnfreezeTableOptions& options) override;
  278. void ReshardTable(
  279. const TYPath& path,
  280. const TVector<TKey>& keys,
  281. const TReshardTableOptions& options) override;
  282. void ReshardTable(
  283. const TYPath& path,
  284. i64 tabletCount,
  285. const TReshardTableOptions& options) override;
  286. void InsertRows(
  287. const TYPath& path,
  288. const TNode::TListType& rows,
  289. const TInsertRowsOptions& options) override;
  290. void DeleteRows(
  291. const TYPath& path,
  292. const TNode::TListType& keys,
  293. const TDeleteRowsOptions& options) override;
  294. void TrimRows(
  295. const TYPath& path,
  296. i64 tabletIndex,
  297. i64 rowCount,
  298. const TTrimRowsOptions& options) override;
  299. TNode::TListType LookupRows(
  300. const TYPath& path,
  301. const TNode::TListType& keys,
  302. const TLookupRowsOptions& options) override;
  303. TNode::TListType SelectRows(
  304. const TString& query,
  305. const TSelectRowsOptions& options) override;
  306. void AlterTableReplica(
  307. const TReplicaId& replicaId,
  308. const TAlterTableReplicaOptions& alterTableReplicaOptions) override;
  309. ui64 GenerateTimestamp() override;
  310. TAuthorizationInfo WhoAmI() override;
  311. TOperationAttributes GetOperation(
  312. const TOperationId& operationId,
  313. const TGetOperationOptions& options) override;
  314. TOperationAttributes GetOperation(
  315. const TString& alias,
  316. const TGetOperationOptions& options) override;
  317. TListOperationsResult ListOperations(
  318. const TListOperationsOptions& options) override;
  319. void UpdateOperationParameters(
  320. const TOperationId& operationId,
  321. const TUpdateOperationParametersOptions& options) override;
  322. TJobAttributes GetJob(
  323. const TOperationId& operationId,
  324. const TJobId& jobId,
  325. const TGetJobOptions& options) override;
  326. TListJobsResult ListJobs(
  327. const TOperationId& operationId,
  328. const TListJobsOptions& options = TListJobsOptions()) override;
  329. IFileReaderPtr GetJobInput(
  330. const TJobId& jobId,
  331. const TGetJobInputOptions& options = TGetJobInputOptions()) override;
  332. IFileReaderPtr GetJobFailContext(
  333. const TOperationId& operationId,
  334. const TJobId& jobId,
  335. const TGetJobFailContextOptions& options = TGetJobFailContextOptions()) override;
  336. IFileReaderPtr GetJobStderr(
  337. const TOperationId& operationId,
  338. const TJobId& jobId,
  339. const TGetJobStderrOptions& options = TGetJobStderrOptions()) override;
  340. TNode::TListType SkyShareTable(
  341. const std::vector<TYPath>& tablePaths,
  342. const TSkyShareTableOptions& options = TSkyShareTableOptions()) override;
  343. TCheckPermissionResponse CheckPermission(
  344. const TString& user,
  345. EPermission permission,
  346. const TYPath& path,
  347. const TCheckPermissionOptions& options) override;
  348. TVector<TTabletInfo> GetTabletInfos(
  349. const TYPath& path,
  350. const TVector<int>& tabletIndexes,
  351. const TGetTabletInfosOptions& options) override;
  352. void SuspendOperation(
  353. const TOperationId& operationId,
  354. const TSuspendOperationOptions& options) override;
  355. void ResumeOperation(
  356. const TOperationId& operationId,
  357. const TResumeOperationOptions& options) override;
  358. void Shutdown() override;
  359. ITransactionPingerPtr GetTransactionPinger() override;
  360. // Helper methods
  361. TYtPoller& GetYtPoller();
  362. protected:
  363. TClientPtr GetParentClientImpl() override;
  364. private:
  365. template <class TOptions>
  366. void SetTabletParams(
  367. THttpHeader& header,
  368. const TYPath& path,
  369. const TOptions& options);
  370. void CheckShutdown() const;
  371. ITransactionPingerPtr TransactionPinger_;
  372. std::atomic<bool> Shutdown_ = false;
  373. TMutex Lock_;
  374. THolder<TYtPoller> YtPoller_;
  375. };
  376. ////////////////////////////////////////////////////////////////////////////////
  377. TClientPtr CreateClientImpl(
  378. const TString& serverName,
  379. const TCreateClientOptions& options = TCreateClientOptions());
  380. ////////////////////////////////////////////////////////////////////////////////
  381. } // namespace NDetail
  382. } // namespace NYT