#pragma once
// NOTE(review): this text appears to have lost everything that was written
// inside angle brackets: the #include directives below have no header name,
// and template argument lists are missing (e.g. `template class`,
// `TIntrusivePtr`, `THashMap&`, `TVector>`, `TWorkerHolder`). Restore them
// from the original file before compiling; the comments below describe only
// what is still visible.
#include
#include "processor_mode.h"
#include
#include
#include
#include
#include
#include
namespace NYql {
namespace NPureCalc {
// Bundle of arguments handed to TWorkerFactory's constructor (and, through
// it, to the concrete factories declared at the bottom of this header).
//
// Lifetime: InputSpec, OutputSpec, UserData and Modules are reference
// members, so this struct does not own them; the referenced objects must
// stay alive for as long as the options object is used.
// NOTE(review): Query is a TStringBuf, presumably a non-owning view with the
// same lifetime requirement -- confirm.
// ExprOutputStream and CountersProvider_ are raw pointers; whether null is
// accepted cannot be told from this header -- confirm in the implementation.
//
// Naming: some public fields carry a trailing underscore and some do not.
// They are part of the public interface, so they are left as they are.
struct TWorkerFactoryOptions {
    IProgramFactoryPtr Factory;
    const TInputSpecBase& InputSpec;
    const TOutputSpecBase& OutputSpec;
    TStringBuf Query;
    TIntrusivePtr FuncRegistry;
    IModuleResolver::TPtr ModuleResolver;
    const TUserDataTable& UserData;
    const THashMap& Modules;
    TString LLVMSettings;
    EBlockEngineMode BlockEngineMode;
    IOutputStream* ExprOutputStream;
    NKikimr::NUdf::ICountersProvider* CountersProvider_;
    ETranslationMode TranslationMode_;
    ui16 SyntaxVersion_;
    ui64 NativeYtTypeFlags_;
    TMaybe DeterministicTimeProviderSeed_;
    bool UseSystemColumns;
    bool UseWorkerPool;
    // Initializes every member from the argument in the same position.
    // Factory, FuncRegistry, ModuleResolver and LLVMSettings are moved from
    // their by-value parameters; the reference members are bound to the
    // caller's objects; everything else is copied.
    TWorkerFactoryOptions(
        IProgramFactoryPtr Factory,
        const TInputSpecBase& InputSpec,
        const TOutputSpecBase& OutputSpec,
        TStringBuf Query,
        TIntrusivePtr FuncRegistry,
        IModuleResolver::TPtr ModuleResolver,
        const TUserDataTable& UserData,
        const THashMap& Modules,
        TString LLVMSettings,
        EBlockEngineMode BlockEngineMode,
        IOutputStream* ExprOutputStream,
        NKikimr::NUdf::ICountersProvider* CountersProvider,
        ETranslationMode translationMode,
        ui16 syntaxVersion,
        ui64 nativeYtTypeFlags,
        TMaybe deterministicTimeProviderSeed,
        bool useSystemColumns,
        bool useWorkerPool
    )
        // Several parameters share their name with the member they
        // initialize. Inside the parentheses the name refers to the
        // parameter (it shadows the member); the name in front of the
        // parentheses is always the member.
        : Factory(std::move(Factory))
        , InputSpec(InputSpec)
        , OutputSpec(OutputSpec)
        , Query(Query)
        , FuncRegistry(std::move(FuncRegistry))
        , ModuleResolver(std::move(ModuleResolver))
        , UserData(UserData)
        , Modules(Modules)
        , LLVMSettings(std::move(LLVMSettings))
        , BlockEngineMode(BlockEngineMode)
        , ExprOutputStream(ExprOutputStream)
        , CountersProvider_(CountersProvider)
        , TranslationMode_(translationMode)
        , SyntaxVersion_(syntaxVersion)
        , NativeYtTypeFlags_(nativeYtTypeFlags)
        , DeterministicTimeProviderSeed_(deterministicTimeProviderSeed)
        , UseSystemColumns(useSystemColumns)
        , UseWorkerPool(useWorkerPool)
    {
    }
};
// Common base for the concrete worker factories below. It derives from its
// template parameter TBase; the `override` specifiers on the member
// functions require TBase to declare matching virtual functions.
// All member functions are only declared here; their definitions are not
// part of this header text.
template class TWorkerFactory: public TBase {
private:
    IProgramFactoryPtr Factory_;
protected:
    // State available to derived factories. How each field is filled in is
    // decided by the out-of-line constructor, which is not visible here.
    TIntrusivePtr FuncRegistry_;
    // Reference member: the referenced table must outlive this factory.
    const TUserDataTable& UserData_;
    TExprContext ExprContext_;
    TExprNode::TPtr ExprRoot_;
    TString SerializedProgram_;
    TVector InputTypes_;
    TVector OriginalInputTypes_;
    TVector RawInputTypes_;
    const TTypeAnnotationNode* OutputType_;
    const TTypeAnnotationNode* RawOutputType_;
    TVector> AllColumns_;
    TVector> UsedColumns_;
    TString LLVMSettings_;
    EBlockEngineMode BlockEngineMode_;
    IOutputStream* ExprOutputStream_;
    NKikimr::NUdf::ICountersProvider* CountersProvider_;
    ui64 NativeYtTypeFlags_;
    TMaybe DeterministicTimeProviderSeed_;
    bool UseSystemColumns_;
    bool UseWorkerPool_;
    // NOTE(review): presumably holds workers handed back through
    // ReturnWorker() when UseWorkerPool_ is set -- confirm in the
    // implementation.
    TVector> WorkerPool_;
public:
    // Takes the options by value together with the processor mode chosen by
    // the concrete factory.
    TWorkerFactory(TWorkerFactoryOptions, EProcessorMode);
public:
    // Schema and introspection accessors required by TBase.
    NYT::TNode MakeInputSchema(ui32) const override;
    NYT::TNode MakeInputSchema() const override;
    NYT::TNode MakeOutputSchema() const override;
    NYT::TNode MakeOutputSchema(ui32) const override;
    NYT::TNode MakeOutputSchema(TStringBuf) const override;
    NYT::TNode MakeFullOutputSchema() const override;
    const THashSet& GetUsedColumns(ui32 inputIndex) const override;
    const THashSet& GetUsedColumns() const override;
    TIssues GetIssues() const override;
    // Unlike the accessors above, this one is not const-qualified.
    TString GetCompiledProgram() override;
protected:
    void ReturnWorker(IWorker* worker) override;
private:
    // The class keeps no members for the options' TranslationMode_,
    // SyntaxVersion_ or Modules; this helper receives those values as
    // arguments instead (presumably from the constructor -- confirm).
    TExprNode::TPtr Compile(TStringBuf query, ETranslationMode mode, IModuleResolver::TPtr moduleResolver, ui16 syntaxVersion, const THashMap& modules, const TInputSpecBase& inputSpec, const TOutputSpecBase& outputSpec, EProcessorMode processorMode);
};
// Concrete factory that constructs its TWorkerFactory base with
// EProcessorMode::PullStream. The options are taken by value and moved into
// the base; `explicit` rules out implicit conversion from
// TWorkerFactoryOptions. MakeWorker() is defined elsewhere.
class TPullStreamWorkerFactory final: public TWorkerFactory {
public:
    explicit TPullStreamWorkerFactory(TWorkerFactoryOptions options)
        : TWorkerFactory(std::move(options), EProcessorMode::PullStream)
    {
    }
public:
    TWorkerHolder MakeWorker() override;
};
// Same shape as TPullStreamWorkerFactory, but the base is constructed with
// EProcessorMode::PullList.
class TPullListWorkerFactory final: public TWorkerFactory {
public:
    explicit TPullListWorkerFactory(TWorkerFactoryOptions options)
        : TWorkerFactory(std::move(options), EProcessorMode::PullList)
    {
    }
public:
    TWorkerHolder MakeWorker() override;
};
// Same shape as TPullStreamWorkerFactory, but the base is constructed with
// EProcessorMode::PushStream.
class TPushStreamWorkerFactory final: public TWorkerFactory {
public:
    explicit TPushStreamWorkerFactory(TWorkerFactoryOptions options)
        : TWorkerFactory(std::move(options), EProcessorMode::PushStream)
    {
    }
public:
    TWorkerHolder MakeWorker() override;
};
}
}