123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510 |
- #pragma once
- #include "client_reader.h"
- #include "client_writer.h"
- #include "transaction_pinger.h"
- #include <yt/cpp/mapreduce/interface/client.h>
- #include <yt/cpp/mapreduce/http/context.h>
- #include <yt/cpp/mapreduce/http/requests.h>
- namespace NYT {
- namespace NDetail {
- ////////////////////////////////////////////////////////////////////////////////
- class TYtPoller; // Forward declaration only; this header uses it just as THolder<TYtPoller> and TYtPoller& inside TClient.
- class TClientBase; // Declared ahead of its definition so that the pointer alias below can name it.
- using TClientBasePtr = ::TIntrusivePtr<TClientBase>; // Intrusive-pointer alias for TClientBase.
- class TClient; // Declared ahead of its definition; TClientBase and TTransaction refer to it through TClientPtr.
- using TClientPtr = ::TIntrusivePtr<TClient>; // Intrusive-pointer alias for TClient.
- ////////////////////////////////////////////////////////////////////////////////
- class TClientBase // Abstract base shared by TTransaction and TClient (both derive from it later in this header).
- : virtual public IClientBase // Virtual inheritance; presumably because ITransaction and IClient also derive from IClientBase (confirm in interface/client.h).
- {
- public:
- TClientBase( // Takes a client context, a transaction id and a retry policy; presumably stored in Context_, TransactionId_ and ClientRetryPolicy_ (definition is not in this header).
- const TClientContext& context,
- const TTransactionId& transactionId,
- IClientRetryPolicyPtr retryPolicy);
- ITransactionPtr StartTransaction( // Override; returns an ITransactionPtr for the given start options.
- const TStartTransactionOptions& options) override;
- // cypress: node-level overrides (Create, Remove, Exists, Get, Set, MultisetAttributes, List, Copy, Move, Link, Concatenate)
- TNodeId Create(
- const TYPath& path,
- ENodeType type,
- const TCreateOptions& options) override;
- void Remove(
- const TYPath& path,
- const TRemoveOptions& options) override;
- bool Exists(
- const TYPath& path,
- const TExistsOptions& options) override;
- TNode Get(
- const TYPath& path,
- const TGetOptions& options) override;
- void Set(
- const TYPath& path,
- const TNode& value,
- const TSetOptions& options) override;
- void MultisetAttributes(
- const TYPath& path,
- const TNode::TMapType& value,
- const TMultisetAttributesOptions& options) override;
- TNode::TListType List(
- const TYPath& path,
- const TListOptions& options) override;
- TNodeId Copy(
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TCopyOptions& options) override;
- TNodeId Move(
- const TYPath& sourcePath,
- const TYPath& destinationPath,
- const TMoveOptions& options) override;
- TNodeId Link(
- const TYPath& targetPath,
- const TYPath& linkPath,
- const TLinkOptions& options) override;
- void Concatenate(
- const TVector<TRichYPath>& sourcePaths,
- const TRichYPath& destinationPath,
- const TConcatenateOptions& options) override;
- TRichYPath CanonizeYPath(const TRichYPath& path) override; // Takes and returns a TRichYPath by value/const reference.
- TVector<TTableColumnarStatistics> GetTableColumnarStatistics( // Returns one vector of statistics for the given list of paths.
- const TVector<TRichYPath>& paths,
- const TGetTableColumnarStatisticsOptions& options) override;
- TMultiTablePartitions GetTablePartitions(
- const TVector<TRichYPath>& paths,
- const TGetTablePartitionsOptions& options) override;
- TMaybe<TYPath> GetFileFromCache( // Returns TMaybe, so the result may be empty.
- const TString& md5Signature,
- const TYPath& cachePath,
- const TGetFileFromCacheOptions& options = TGetFileFromCacheOptions()) override; // NOTE(review): default argument on a virtual override; defaults are selected by the static type at the call site, so keep them in sync with the interface declaration.
- TYPath PutFileToCache(
- const TYPath& filePath,
- const TString& md5Signature,
- const TYPath& cachePath,
- const TPutFileToCacheOptions& options = TPutFileToCacheOptions()) override;
- IFileReaderPtr CreateFileReader( // File and table reader/writer factory overrides start here.
- const TRichYPath& path,
- const TFileReaderOptions& options) override;
- IFileWriterPtr CreateFileWriter(
- const TRichYPath& path,
- const TFileWriterOptions& options) override;
- TTableWriterPtr<::google::protobuf::Message> CreateTableWriter( // Protobuf table writer; the row type is described by the descriptor argument.
- const TRichYPath& path,
- const ::google::protobuf::Descriptor& descriptor,
- const TTableWriterOptions& options) override;
- TRawTableReaderPtr CreateRawReader( // Raw variants take an explicit TFormat.
- const TRichYPath& path,
- const TFormat& format,
- const TTableReaderOptions& options) override;
- TRawTableWriterPtr CreateRawWriter(
- const TRichYPath& path,
- const TFormat& format,
- const TTableWriterOptions& options) override;
- IFileReaderPtr CreateBlobTableReader( // Returns a file reader (IFileReaderPtr) for the given path and key.
- const TYPath& path,
- const TKey& key,
- const TBlobTableReaderOptions& options) override;
- // operations: the DoXxx overloads take structured jobs (IStructuredJob), the RawXxx overloads take raw jobs (IRawJob)
- IOperationPtr DoMap(
- const TMapOperationSpec& spec,
- ::TIntrusivePtr<IStructuredJob> mapper,
- const TOperationOptions& options) override;
- IOperationPtr RawMap(
- const TRawMapOperationSpec& spec,
- ::TIntrusivePtr<IRawJob> mapper,
- const TOperationOptions& options) override;
- IOperationPtr DoReduce(
- const TReduceOperationSpec& spec,
- ::TIntrusivePtr<IStructuredJob> reducer,
- const TOperationOptions& options) override;
- IOperationPtr RawReduce(
- const TRawReduceOperationSpec& spec,
- ::TIntrusivePtr<IRawJob> mapper, // NOTE(review): named mapper although this is the reduce variant (DoReduce uses reducer); likely a copy-paste name, harmless in a declaration.
- const TOperationOptions& options) override;
- IOperationPtr DoJoinReduce(
- const TJoinReduceOperationSpec& spec,
- ::TIntrusivePtr<IStructuredJob> reducer,
- const TOperationOptions& options) override;
- IOperationPtr RawJoinReduce(
- const TRawJoinReduceOperationSpec& spec,
- ::TIntrusivePtr<IRawJob> mapper, // NOTE(review): same naming mismatch as RawReduce (DoJoinReduce uses reducer).
- const TOperationOptions& options) override;
- IOperationPtr DoMapReduce( // Takes three job objects: mapper, reduceCombiner and reducer.
- const TMapReduceOperationSpec& spec,
- ::TIntrusivePtr<IStructuredJob> mapper,
- ::TIntrusivePtr<IStructuredJob> reduceCombiner,
- ::TIntrusivePtr<IStructuredJob> reducer,
- const TOperationOptions& options) override;
- IOperationPtr RawMapReduce(
- const TRawMapReduceOperationSpec& spec,
- ::TIntrusivePtr<IRawJob> mapper,
- ::TIntrusivePtr<IRawJob> reduceCombiner,
- ::TIntrusivePtr<IRawJob> reducer,
- const TOperationOptions& options) override;
- IOperationPtr Sort( // The remaining operation starters take only a spec and options (no job object).
- const TSortOperationSpec& spec,
- const TOperationOptions& options) override;
- IOperationPtr Merge(
- const TMergeOperationSpec& spec,
- const TOperationOptions& options) override;
- IOperationPtr Erase(
- const TEraseOperationSpec& spec,
- const TOperationOptions& options) override;
- IOperationPtr RemoteCopy(
- const TRemoteCopyOperationSpec& spec,
- const TOperationOptions& options = TOperationOptions()) override;
- IOperationPtr RunVanilla(
- const TVanillaOperationSpec& spec,
- const TOperationOptions& options = TOperationOptions()) override;
- IOperationPtr AttachOperation(const TOperationId& operationId) override; // The next five overrides address an existing operation by its TOperationId.
- EOperationBriefState CheckOperation(const TOperationId& operationId) override;
- void AbortOperation(const TOperationId& operationId) override;
- void CompleteOperation(const TOperationId& operationId) override;
- void WaitForOperation(const TOperationId& operationId) override;
- void AlterTable(
- const TYPath& path,
- const TAlterTableOptions& options) override;
- TBatchRequestPtr CreateBatchRequest() override;
- IClientPtr GetParentClient() override; // Public override; presumably implemented via GetParentClientImpl() declared below (confirm in the .cpp).
- const TClientContext& GetContext() const; // Accessor, not marked override; returns a const reference (presumably to Context_).
- const IClientRetryPolicyPtr& GetRetryPolicy() const; // Accessor, not marked override; returns a const reference (presumably to ClientRetryPolicy_).
- virtual ITransactionPingerPtr GetTransactionPinger() = 0; // Pure virtual; overridden by both TTransaction and TClient in this header.
- protected:
- virtual TClientPtr GetParentClientImpl() = 0; // Pure virtual hook; overridden by both TTransaction and TClient in this header.
- protected: // Second consecutive protected section: data members visible to derived classes.
- const TClientContext Context_; // Held by value and const.
- TTransactionId TransactionId_; // Non-const, unlike Context_.
- IClientRetryPolicyPtr ClientRetryPolicy_;
- private:
- ::TIntrusivePtr<TClientReader> CreateClientReader( // Private helper (no override keyword); useFormatFromTableAttributes defaults to false.
- const TRichYPath& path,
- const TFormat& format,
- const TTableReaderOptions& options,
- bool useFormatFromTableAttributes = false);
- THolder<TClientWriter> CreateClientWriter( // Private helper (no override keyword); returns an owning THolder.
- const TRichYPath& path,
- const TFormat& format,
- const TTableWriterOptions& options);
- ::TIntrusivePtr<INodeReaderImpl> CreateNodeReader( // From here on: private overrides returning the Impl reader and writer interfaces (Node, YaMR, Proto, Skiff).
- const TRichYPath& path, const TTableReaderOptions& options) override;
- ::TIntrusivePtr<IYaMRReaderImpl> CreateYaMRReader(
- const TRichYPath& path, const TTableReaderOptions& options) override;
- ::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
- const TRichYPath& path,
- const TTableReaderOptions& options,
- const Message* prototype) override; // NOTE(review): unqualified Message; presumably ::google::protobuf::Message made visible by the interface headers (confirm).
- ::TIntrusivePtr<ISkiffRowReaderImpl> CreateSkiffRowReader(
- const TRichYPath& path,
- const TTableReaderOptions& options,
- const ISkiffRowSkipperPtr& skipper,
- const NSkiff::TSkiffSchemaPtr& schema) override;
- ::TIntrusivePtr<INodeWriterImpl> CreateNodeWriter(
- const TRichYPath& path, const TTableWriterOptions& options) override;
- ::TIntrusivePtr<IYaMRWriterImpl> CreateYaMRWriter(
- const TRichYPath& path, const TTableWriterOptions& options) override;
- ::TIntrusivePtr<IProtoWriterImpl> CreateProtoWriter(
- const TRichYPath& path,
- const TTableWriterOptions& options,
- const Message* prototype) override; // Same unqualified Message as in CreateProtoReader.
- };
- ////////////////////////////////////////////////////////////////////////////////
- class TTransaction // ITransaction implementation built on TClientBase; it overrides both pure virtuals that TClientBase declares.
- : public ITransaction
- , public TClientBase
- {
- public:
- //
- // Start a new transaction.
- TTransaction( // This overload takes the parent transaction id plus TStartTransactionOptions.
- TClientPtr parentClient,
- const TClientContext& context,
- const TTransactionId& parentTransactionId,
- const TStartTransactionOptions& options);
- //
- // Attach an existing transaction.
- TTransaction( // This overload takes the id of the transaction itself plus TAttachTransactionOptions; the overloads differ only in the options type.
- TClientPtr parentClient,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TAttachTransactionOptions& options);
- const TTransactionId& GetId() const override; // Returns a const reference to a transaction id.
- ILockPtr Lock(
- const TYPath& path,
- ELockMode mode,
- const TLockOptions& options) override;
- void Unlock(
- const TYPath& path,
- const TUnlockOptions& options) override;
- void Commit() override; // Commit, Abort, Ping and Detach are argument-less ITransaction overrides.
- void Abort() override;
- void Ping() override;
- void Detach() override;
- ITransactionPingerPtr GetTransactionPinger() override; // Overrides the pure virtual declared in TClientBase.
- protected:
- TClientPtr GetParentClientImpl() override; // Overrides the pure virtual declared in TClientBase; presumably returns ParentClient_ (confirm in the .cpp).
- private:
- ITransactionPingerPtr TransactionPinger_; // Members are initialized in this declaration order and destroyed in reverse (ParentClient_ first); check the .cpp before reordering.
- THolder<TPingableTransaction> PingableTx_; // Owning holder; TPingableTransaction is not declared in this header (presumably comes from transaction_pinger.h or another include).
- TClientPtr ParentClient_; // Intrusive pointer to the parent client passed to the constructors.
- };
- ////////////////////////////////////////////////////////////////////////////////
- class TClient // IClient implementation built on TClientBase; it overrides both pure virtuals that TClientBase declares.
- : public IClient
- , public TClientBase
- {
- public:
- TClient( // Same parameter types as the TClientBase constructor; the transaction id parameter is named globalId here.
- const TClientContext& context,
- const TTransactionId& globalId,
- IClientRetryPolicyPtr retryPolicy);
- ~TClient(); // Declared explicitly; presumably defined out of line because TYtPoller (held in YtPoller_) is only forward-declared in this header. NOTE(review): not marked override.
- ITransactionPtr AttachTransaction(
- const TTransactionId& transactionId,
- const TAttachTransactionOptions& options) override;
- void MountTable( // Table state overrides: MountTable, UnmountTable, RemountTable, FreezeTable, UnfreezeTable, ReshardTable.
- const TYPath& path,
- const TMountTableOptions& options) override;
- void UnmountTable(
- const TYPath& path,
- const TUnmountTableOptions& options) override;
- void RemountTable(
- const TYPath& path,
- const TRemountTableOptions& options) override;
- void FreezeTable(
- const TYPath& path,
- const TFreezeTableOptions& options) override;
- void UnfreezeTable(
- const TYPath& path,
- const TUnfreezeTableOptions& options) override;
- void ReshardTable( // Overload taking an explicit list of keys.
- const TYPath& path,
- const TVector<TKey>& keys,
- const TReshardTableOptions& options) override;
- void ReshardTable( // Overload taking a tablet count instead of keys.
- const TYPath& path,
- i64 tabletCount,
- const TReshardTableOptions& options) override;
- void InsertRows( // Row-level overrides: rows and keys are passed as TNode::TListType.
- const TYPath& path,
- const TNode::TListType& rows,
- const TInsertRowsOptions& options) override;
- void DeleteRows(
- const TYPath& path,
- const TNode::TListType& keys,
- const TDeleteRowsOptions& options) override;
- void TrimRows(
- const TYPath& path,
- i64 tabletIndex,
- i64 rowCount,
- const TTrimRowsOptions& options) override;
- TNode::TListType LookupRows(
- const TYPath& path,
- const TNode::TListType& keys,
- const TLookupRowsOptions& options) override;
- TNode::TListType SelectRows( // Takes a query string rather than a path.
- const TString& query,
- const TSelectRowsOptions& options) override;
- void AlterTableReplica(
- const TReplicaId& replicaId,
- const TAlterTableReplicaOptions& alterTableReplicaOptions) override;
- ui64 GenerateTimestamp() override;
- TAuthorizationInfo WhoAmI() override;
- TOperationAttributes GetOperation( // Overload addressed by operation id.
- const TOperationId& operationId,
- const TGetOperationOptions& options) override;
- TOperationAttributes GetOperation( // Overload addressed by a string alias.
- const TString& alias,
- const TGetOperationOptions& options) override;
- TListOperationsResult ListOperations(
- const TListOperationsOptions& options) override;
- void UpdateOperationParameters(
- const TOperationId& operationId,
- const TUpdateOperationParametersOptions& options) override;
- TJobAttributes GetJob(
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobOptions& options) override;
- TListJobsResult ListJobs(
- const TOperationId& operationId,
- const TListJobsOptions& options = TListJobsOptions()) override; // NOTE(review): default argument on a virtual override; defaults are selected by the static type at the call site, so keep them in sync with the interface declaration.
- IFileReaderPtr GetJobInput( // GetJobInput, GetJobFailContext and GetJobStderr all return a file reader (IFileReaderPtr).
- const TJobId& jobId,
- const TGetJobInputOptions& options = TGetJobInputOptions()) override;
- IFileReaderPtr GetJobFailContext(
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobFailContextOptions& options = TGetJobFailContextOptions()) override;
- IFileReaderPtr GetJobStderr(
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobStderrOptions& options = TGetJobStderrOptions()) override;
- TNode::TListType SkyShareTable(
- const std::vector<TYPath>& tablePaths, // Uses std::vector here, whereas the rest of this header uses TVector.
- const TSkyShareTableOptions& options = TSkyShareTableOptions()) override;
- TCheckPermissionResponse CheckPermission(
- const TString& user,
- EPermission permission,
- const TYPath& path,
- const TCheckPermissionOptions& options) override;
- TVector<TTabletInfo> GetTabletInfos(
- const TYPath& path,
- const TVector<int>& tabletIndexes,
- const TGetTabletInfosOptions& options) override;
- void SuspendOperation(
- const TOperationId& operationId,
- const TSuspendOperationOptions& options) override;
- void ResumeOperation(
- const TOperationId& operationId,
- const TResumeOperationOptions& options) override;
- void Shutdown() override; // Presumably sets Shutdown_, which CheckShutdown() then inspects (confirm in the .cpp).
- ITransactionPingerPtr GetTransactionPinger() override; // Overrides the pure virtual declared in TClientBase.
- // Helper methods (not part of the IClient interface: no override keyword)
- TYtPoller& GetYtPoller(); // Returns a reference; presumably to the poller owned by YtPoller_ (whether it is created lazily is not visible here).
- protected:
- TClientPtr GetParentClientImpl() override; // Overrides the pure virtual declared in TClientBase.
- private:
- template <class TOptions>
- void SetTabletParams( // Member template, declared only; its definition must be visible wherever it is instantiated (presumably the .cpp).
- THttpHeader& header,
- const TYPath& path,
- const TOptions& options);
- void CheckShutdown() const; // Const member; presumably reads Shutdown_ (confirm in the .cpp).
- ITransactionPingerPtr TransactionPinger_;
- std::atomic<bool> Shutdown_ = false; // Starts as false. NOTE(review): <atomic> is not included directly by this header; presumably pulled in transitively.
- TMutex Lock_; // NOTE(review): what this mutex guards is not visible in this header.
- THolder<TYtPoller> YtPoller_; // Owning holder of the forward-declared TYtPoller.
- };
- ////////////////////////////////////////////////////////////////////////////////
- TClientPtr CreateClientImpl( // Free factory function in NYT::NDetail; returns a TClientPtr for the given server name.
- const TString& serverName,
- const TCreateClientOptions& options = TCreateClientOptions()); // options defaults to a default-constructed TCreateClientOptions.
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NDetail
- } // namespace NYT
|