#pragma once #include "client_reader.h" #include "client_writer.h" #include "transaction_pinger.h" #include #include #include namespace NYT { namespace NDetail { //////////////////////////////////////////////////////////////////////////////// class TYtPoller; class TClientBase; using TClientBasePtr = ::TIntrusivePtr; class TClient; using TClientPtr = ::TIntrusivePtr; //////////////////////////////////////////////////////////////////////////////// class TClientBase : virtual public IClientBase { public: TClientBase( const TClientContext& context, const TTransactionId& transactionId, IClientRetryPolicyPtr retryPolicy); ITransactionPtr StartTransaction( const TStartTransactionOptions& options) override; // cypress TNodeId Create( const TYPath& path, ENodeType type, const TCreateOptions& options) override; void Remove( const TYPath& path, const TRemoveOptions& options) override; bool Exists( const TYPath& path, const TExistsOptions& options) override; TNode Get( const TYPath& path, const TGetOptions& options) override; void Set( const TYPath& path, const TNode& value, const TSetOptions& options) override; void MultisetAttributes( const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options) override; TNode::TListType List( const TYPath& path, const TListOptions& options) override; TNodeId Copy( const TYPath& sourcePath, const TYPath& destinationPath, const TCopyOptions& options) override; TNodeId Move( const TYPath& sourcePath, const TYPath& destinationPath, const TMoveOptions& options) override; TNodeId Link( const TYPath& targetPath, const TYPath& linkPath, const TLinkOptions& options) override; void Concatenate( const TVector& sourcePaths, const TRichYPath& destinationPath, const TConcatenateOptions& options) override; TRichYPath CanonizeYPath(const TRichYPath& path) override; TVector GetTableColumnarStatistics( const TVector& paths, const TGetTableColumnarStatisticsOptions& options) override; TMultiTablePartitions GetTablePartitions( const TVector& paths, const TGetTablePartitionsOptions& options) override; TMaybe GetFileFromCache( const TString& md5Signature, const TYPath& cachePath, const TGetFileFromCacheOptions& options = TGetFileFromCacheOptions()) override; TYPath PutFileToCache( const TYPath& filePath, const TString& md5Signature, const TYPath& cachePath, const TPutFileToCacheOptions& options = TPutFileToCacheOptions()) override; IFileReaderPtr CreateFileReader( const TRichYPath& path, const TFileReaderOptions& options) override; IFileWriterPtr CreateFileWriter( const TRichYPath& path, const TFileWriterOptions& options) override; TTableWriterPtr<::google::protobuf::Message> CreateTableWriter( const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options) override; TRawTableReaderPtr CreateRawReader( const TRichYPath& path, const TFormat& format, const TTableReaderOptions& options) override; TRawTableWriterPtr CreateRawWriter( const TRichYPath& path, const TFormat& format, const TTableWriterOptions& options) override; IFileReaderPtr CreateBlobTableReader( const TYPath& path, const TKey& key, const TBlobTableReaderOptions& options) override; // operations IOperationPtr DoMap( const TMapOperationSpec& spec, ::TIntrusivePtr mapper, const TOperationOptions& options) override; IOperationPtr RawMap( const TRawMapOperationSpec& spec, ::TIntrusivePtr mapper, const TOperationOptions& options) override; IOperationPtr DoReduce( const TReduceOperationSpec& spec, ::TIntrusivePtr reducer, const TOperationOptions& options) override; IOperationPtr RawReduce( const TRawReduceOperationSpec& spec, ::TIntrusivePtr mapper, const TOperationOptions& options) override; IOperationPtr DoJoinReduce( const TJoinReduceOperationSpec& spec, ::TIntrusivePtr reducer, const TOperationOptions& options) override; IOperationPtr RawJoinReduce( const TRawJoinReduceOperationSpec& spec, ::TIntrusivePtr mapper, const TOperationOptions& options) override; IOperationPtr DoMapReduce( const TMapReduceOperationSpec& spec, ::TIntrusivePtr mapper, ::TIntrusivePtr reduceCombiner, ::TIntrusivePtr reducer, const TOperationOptions& options) override; IOperationPtr RawMapReduce( const TRawMapReduceOperationSpec& spec, ::TIntrusivePtr mapper, ::TIntrusivePtr reduceCombiner, ::TIntrusivePtr reducer, const TOperationOptions& options) override; IOperationPtr Sort( const TSortOperationSpec& spec, const TOperationOptions& options) override; IOperationPtr Merge( const TMergeOperationSpec& spec, const TOperationOptions& options) override; IOperationPtr Erase( const TEraseOperationSpec& spec, const TOperationOptions& options) override; IOperationPtr RemoteCopy( const TRemoteCopyOperationSpec& spec, const TOperationOptions& options = TOperationOptions()) override; IOperationPtr RunVanilla( const TVanillaOperationSpec& spec, const TOperationOptions& options = TOperationOptions()) override; IOperationPtr AttachOperation(const TOperationId& operationId) override; EOperationBriefState CheckOperation(const TOperationId& operationId) override; void AbortOperation(const TOperationId& operationId) override; void CompleteOperation(const TOperationId& operationId) override; void WaitForOperation(const TOperationId& operationId) override; void AlterTable( const TYPath& path, const TAlterTableOptions& options) override; TBatchRequestPtr CreateBatchRequest() override; IClientPtr GetParentClient() override; const TClientContext& GetContext() const; const IClientRetryPolicyPtr& GetRetryPolicy() const; virtual ITransactionPingerPtr GetTransactionPinger() = 0; protected: virtual TClientPtr GetParentClientImpl() = 0; protected: const TClientContext Context_; TTransactionId TransactionId_; IClientRetryPolicyPtr ClientRetryPolicy_; private: ::TIntrusivePtr CreateClientReader( const TRichYPath& path, const TFormat& format, const TTableReaderOptions& options, bool useFormatFromTableAttributes = false); THolder CreateClientWriter( const TRichYPath& path, const TFormat& format, const TTableWriterOptions& options); ::TIntrusivePtr CreateNodeReader( const TRichYPath& path, const TTableReaderOptions& options) override; ::TIntrusivePtr CreateYaMRReader( const TRichYPath& path, const TTableReaderOptions& options) override; ::TIntrusivePtr CreateProtoReader( const TRichYPath& path, const TTableReaderOptions& options, const Message* prototype) override; ::TIntrusivePtr CreateSkiffRowReader( const TRichYPath& path, const TTableReaderOptions& options, const ISkiffRowSkipperPtr& skipper, const NSkiff::TSkiffSchemaPtr& schema) override; ::TIntrusivePtr CreateNodeWriter( const TRichYPath& path, const TTableWriterOptions& options) override; ::TIntrusivePtr CreateYaMRWriter( const TRichYPath& path, const TTableWriterOptions& options) override; ::TIntrusivePtr CreateProtoWriter( const TRichYPath& path, const TTableWriterOptions& options, const Message* prototype) override; }; //////////////////////////////////////////////////////////////////////////////// class TTransaction : public ITransaction , public TClientBase { public: // // Start a new transaction. TTransaction( TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, const TStartTransactionOptions& options); // // Attach an existing transaction. TTransaction( TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, const TAttachTransactionOptions& options); const TTransactionId& GetId() const override; ILockPtr Lock( const TYPath& path, ELockMode mode, const TLockOptions& options) override; void Unlock( const TYPath& path, const TUnlockOptions& options) override; void Commit() override; void Abort() override; void Ping() override; void Detach() override; ITransactionPingerPtr GetTransactionPinger() override; protected: TClientPtr GetParentClientImpl() override; private: ITransactionPingerPtr TransactionPinger_; THolder PingableTx_; TClientPtr ParentClient_; }; //////////////////////////////////////////////////////////////////////////////// class TClient : public IClient , public TClientBase { public: TClient( const TClientContext& context, const TTransactionId& globalId, IClientRetryPolicyPtr retryPolicy); ~TClient(); ITransactionPtr AttachTransaction( const TTransactionId& transactionId, const TAttachTransactionOptions& options) override; void MountTable( const TYPath& path, const TMountTableOptions& options) override; void UnmountTable( const TYPath& path, const TUnmountTableOptions& options) override; void RemountTable( const TYPath& path, const TRemountTableOptions& options) override; void FreezeTable( const TYPath& path, const TFreezeTableOptions& options) override; void UnfreezeTable( const TYPath& path, const TUnfreezeTableOptions& options) override; void ReshardTable( const TYPath& path, const TVector& keys, const TReshardTableOptions& options) override; void ReshardTable( const TYPath& path, i64 tabletCount, const TReshardTableOptions& options) override; void InsertRows( const TYPath& path, const TNode::TListType& rows, const TInsertRowsOptions& options) override; void DeleteRows( const TYPath& path, const TNode::TListType& keys, const TDeleteRowsOptions& options) override; void TrimRows( const TYPath& path, i64 tabletIndex, i64 rowCount, const TTrimRowsOptions& options) override; TNode::TListType LookupRows( const TYPath& path, const TNode::TListType& keys, const TLookupRowsOptions& options) override; TNode::TListType SelectRows( const TString& query, const TSelectRowsOptions& options) override; void AlterTableReplica( const TReplicaId& replicaId, const TAlterTableReplicaOptions& alterTableReplicaOptions) override; ui64 GenerateTimestamp() override; TAuthorizationInfo WhoAmI() override; TOperationAttributes GetOperation( const TOperationId& operationId, const TGetOperationOptions& options) override; TOperationAttributes GetOperation( const TString& alias, const TGetOperationOptions& options) override; TListOperationsResult ListOperations( const TListOperationsOptions& options) override; void UpdateOperationParameters( const TOperationId& operationId, const TUpdateOperationParametersOptions& options) override; TJobAttributes GetJob( const TOperationId& operationId, const TJobId& jobId, const TGetJobOptions& options) override; TListJobsResult ListJobs( const TOperationId& operationId, const TListJobsOptions& options = TListJobsOptions()) override; IFileReaderPtr GetJobInput( const TJobId& jobId, const TGetJobInputOptions& options = TGetJobInputOptions()) override; IFileReaderPtr GetJobFailContext( const TOperationId& operationId, const TJobId& jobId, const TGetJobFailContextOptions& options = TGetJobFailContextOptions()) override; IFileReaderPtr GetJobStderr( const TOperationId& operationId, const TJobId& jobId, const TGetJobStderrOptions& options = TGetJobStderrOptions()) override; TNode::TListType SkyShareTable( const std::vector& tablePaths, const TSkyShareTableOptions& options = TSkyShareTableOptions()) override; TCheckPermissionResponse CheckPermission( const TString& user, EPermission permission, const TYPath& path, const TCheckPermissionOptions& options) override; TVector GetTabletInfos( const TYPath& path, const TVector& tabletIndexes, const TGetTabletInfosOptions& options) override; void SuspendOperation( const TOperationId& operationId, const TSuspendOperationOptions& options) override; void ResumeOperation( const TOperationId& operationId, const TResumeOperationOptions& options) override; void Shutdown() override; ITransactionPingerPtr GetTransactionPinger() override; // Helper methods TYtPoller& GetYtPoller(); protected: TClientPtr GetParentClientImpl() override; private: template void SetTabletParams( THttpHeader& header, const TYPath& path, const TOptions& options); void CheckShutdown() const; ITransactionPingerPtr TransactionPinger_; std::atomic Shutdown_ = false; TMutex Lock_; THolder YtPoller_; }; //////////////////////////////////////////////////////////////////////////////// TClientPtr CreateClientImpl( const TString& serverName, const TCreateClientOptions& options = TCreateClientOptions()); //////////////////////////////////////////////////////////////////////////////// } // namespace NDetail } // namespace NYT