// worker_factory.h
  1. #pragma once
  2. #include <yql/essentials/public/purecalc/common/interface.h>
  3. #include "processor_mode.h"
  4. #include <util/generic/ptr.h>
  5. #include <yql/essentials/ast/yql_expr.h>
  6. #include <yql/essentials/core/yql_user_data.h>
  7. #include <yql/essentials/minikql/mkql_function_registry.h>
  8. #include <yql/essentials/core/yql_type_annotation.h>
  9. #include <utility>
  10. namespace NYql {
  11. namespace NPureCalc {
  12. struct TWorkerFactoryOptions {
  13. IProgramFactoryPtr Factory;
  14. const TInputSpecBase& InputSpec;
  15. const TOutputSpecBase& OutputSpec;
  16. TStringBuf Query;
  17. TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry;
  18. IModuleResolver::TPtr ModuleResolver;
  19. const TUserDataTable& UserData;
  20. const THashMap<TString, TString>& Modules;
  21. TString LLVMSettings;
  22. EBlockEngineMode BlockEngineMode;
  23. IOutputStream* ExprOutputStream;
  24. NKikimr::NUdf::ICountersProvider* CountersProvider_;
  25. ETranslationMode TranslationMode_;
  26. ui16 SyntaxVersion_;
  27. ui64 NativeYtTypeFlags_;
  28. TMaybe<ui64> DeterministicTimeProviderSeed_;
  29. bool UseSystemColumns;
  30. bool UseWorkerPool;
  31. bool UseAntlr4;
  32. TWorkerFactoryOptions(
  33. IProgramFactoryPtr Factory,
  34. const TInputSpecBase& InputSpec,
  35. const TOutputSpecBase& OutputSpec,
  36. TStringBuf Query,
  37. TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry,
  38. IModuleResolver::TPtr ModuleResolver,
  39. const TUserDataTable& UserData,
  40. const THashMap<TString, TString>& Modules,
  41. TString LLVMSettings,
  42. EBlockEngineMode BlockEngineMode,
  43. IOutputStream* ExprOutputStream,
  44. NKikimr::NUdf::ICountersProvider* CountersProvider,
  45. ETranslationMode translationMode,
  46. ui16 syntaxVersion,
  47. ui64 nativeYtTypeFlags,
  48. TMaybe<ui64> deterministicTimeProviderSeed,
  49. bool useSystemColumns,
  50. bool useWorkerPool,
  51. bool useAntlr4
  52. )
  53. : Factory(std::move(Factory))
  54. , InputSpec(InputSpec)
  55. , OutputSpec(OutputSpec)
  56. , Query(Query)
  57. , FuncRegistry(std::move(FuncRegistry))
  58. , ModuleResolver(std::move(ModuleResolver))
  59. , UserData(UserData)
  60. , Modules(Modules)
  61. , LLVMSettings(std::move(LLVMSettings))
  62. , BlockEngineMode(BlockEngineMode)
  63. , ExprOutputStream(ExprOutputStream)
  64. , CountersProvider_(CountersProvider)
  65. , TranslationMode_(translationMode)
  66. , SyntaxVersion_(syntaxVersion)
  67. , NativeYtTypeFlags_(nativeYtTypeFlags)
  68. , DeterministicTimeProviderSeed_(deterministicTimeProviderSeed)
  69. , UseSystemColumns(useSystemColumns)
  70. , UseWorkerPool(useWorkerPool)
  71. , UseAntlr4(useAntlr4)
  72. {
  73. }
  74. };
  75. template <typename TBase>
  76. class TWorkerFactory: public TBase {
  77. private:
  78. IProgramFactoryPtr Factory_;
  79. protected:
  80. TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry_;
  81. const TUserDataTable& UserData_;
  82. TExprContext ExprContext_;
  83. TExprNode::TPtr ExprRoot_;
  84. TString SerializedProgram_;
  85. TVector<const TStructExprType*> InputTypes_;
  86. TVector<const TStructExprType*> OriginalInputTypes_;
  87. TVector<const TStructExprType*> RawInputTypes_;
  88. const TTypeAnnotationNode* OutputType_;
  89. const TTypeAnnotationNode* RawOutputType_;
  90. TVector<THashSet<TString>> AllColumns_;
  91. TVector<THashSet<TString>> UsedColumns_;
  92. TString LLVMSettings_;
  93. EBlockEngineMode BlockEngineMode_;
  94. IOutputStream* ExprOutputStream_;
  95. NKikimr::NUdf::ICountersProvider* CountersProvider_;
  96. ui64 NativeYtTypeFlags_;
  97. TMaybe<ui64> DeterministicTimeProviderSeed_;
  98. bool UseSystemColumns_;
  99. bool UseWorkerPool_;
  100. TVector<THolder<IWorker>> WorkerPool_;
  101. public:
  102. TWorkerFactory(TWorkerFactoryOptions, EProcessorMode);
  103. public:
  104. NYT::TNode MakeInputSchema(ui32) const override;
  105. NYT::TNode MakeInputSchema() const override;
  106. NYT::TNode MakeOutputSchema() const override;
  107. NYT::TNode MakeOutputSchema(ui32) const override;
  108. NYT::TNode MakeOutputSchema(TStringBuf) const override;
  109. NYT::TNode MakeFullOutputSchema() const override;
  110. const THashSet<TString>& GetUsedColumns(ui32 inputIndex) const override;
  111. const THashSet<TString>& GetUsedColumns() const override;
  112. TIssues GetIssues() const override;
  113. TString GetCompiledProgram() override;
  114. protected:
  115. void ReturnWorker(IWorker* worker) override;
  116. private:
  117. TExprNode::TPtr Compile(TStringBuf query,
  118. ETranslationMode mode,
  119. IModuleResolver::TPtr moduleResolver,
  120. ui16 syntaxVersion,
  121. const THashMap<TString, TString>& modules,
  122. const TInputSpecBase& inputSpec,
  123. const TOutputSpecBase& outputSpec,
  124. bool useAntlr4,
  125. EProcessorMode processorMode);
  126. };
  127. class TPullStreamWorkerFactory final: public TWorkerFactory<IPullStreamWorkerFactory> {
  128. public:
  129. explicit TPullStreamWorkerFactory(TWorkerFactoryOptions options)
  130. : TWorkerFactory(std::move(options), EProcessorMode::PullStream)
  131. {
  132. }
  133. public:
  134. TWorkerHolder<IPullStreamWorker> MakeWorker() override;
  135. };
  136. class TPullListWorkerFactory final: public TWorkerFactory<IPullListWorkerFactory> {
  137. public:
  138. explicit TPullListWorkerFactory(TWorkerFactoryOptions options)
  139. : TWorkerFactory(std::move(options), EProcessorMode::PullList)
  140. {
  141. }
  142. public:
  143. TWorkerHolder<IPullListWorker> MakeWorker() override;
  144. };
  145. class TPushStreamWorkerFactory final: public TWorkerFactory<IPushStreamWorkerFactory> {
  146. public:
  147. explicit TPushStreamWorkerFactory(TWorkerFactoryOptions options)
  148. : TWorkerFactory(std::move(options), EProcessorMode::PushStream)
  149. {
  150. }
  151. public:
  152. TWorkerHolder<IPushStreamWorker> MakeWorker() override;
  153. };
  154. }
  155. }