// worker_factory.h
  1. #pragma once
  2. #include <yql/essentials/public/purecalc/common/interface.h>
  3. #include "processor_mode.h"
  4. #include <util/generic/ptr.h>
  5. #include <yql/essentials/ast/yql_expr.h>
  6. #include <yql/essentials/core/yql_user_data.h>
  7. #include <yql/essentials/minikql/mkql_function_registry.h>
  8. #include <yql/essentials/core/yql_type_annotation.h>
  9. #include <utility>
  10. namespace NYql {
  11. namespace NPureCalc {
  12. struct TWorkerFactoryOptions {
  13. IProgramFactoryPtr Factory;
  14. const TInputSpecBase& InputSpec;
  15. const TOutputSpecBase& OutputSpec;
  16. TStringBuf Query;
  17. TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry;
  18. IModuleResolver::TPtr ModuleResolver;
  19. const TUserDataTable& UserData;
  20. const THashMap<TString, TString>& Modules;
  21. TString LLVMSettings;
  22. EBlockEngineMode BlockEngineMode;
  23. IOutputStream* ExprOutputStream;
  24. NKikimr::NUdf::ICountersProvider* CountersProvider_;
  25. ETranslationMode TranslationMode_;
  26. ui16 SyntaxVersion_;
  27. ui64 NativeYtTypeFlags_;
  28. TMaybe<ui64> DeterministicTimeProviderSeed_;
  29. bool UseSystemColumns;
  30. bool UseWorkerPool;
  31. TWorkerFactoryOptions(
  32. IProgramFactoryPtr Factory,
  33. const TInputSpecBase& InputSpec,
  34. const TOutputSpecBase& OutputSpec,
  35. TStringBuf Query,
  36. TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry,
  37. IModuleResolver::TPtr ModuleResolver,
  38. const TUserDataTable& UserData,
  39. const THashMap<TString, TString>& Modules,
  40. TString LLVMSettings,
  41. EBlockEngineMode BlockEngineMode,
  42. IOutputStream* ExprOutputStream,
  43. NKikimr::NUdf::ICountersProvider* CountersProvider,
  44. ETranslationMode translationMode,
  45. ui16 syntaxVersion,
  46. ui64 nativeYtTypeFlags,
  47. TMaybe<ui64> deterministicTimeProviderSeed,
  48. bool useSystemColumns,
  49. bool useWorkerPool
  50. )
  51. : Factory(std::move(Factory))
  52. , InputSpec(InputSpec)
  53. , OutputSpec(OutputSpec)
  54. , Query(Query)
  55. , FuncRegistry(std::move(FuncRegistry))
  56. , ModuleResolver(std::move(ModuleResolver))
  57. , UserData(UserData)
  58. , Modules(Modules)
  59. , LLVMSettings(std::move(LLVMSettings))
  60. , BlockEngineMode(BlockEngineMode)
  61. , ExprOutputStream(ExprOutputStream)
  62. , CountersProvider_(CountersProvider)
  63. , TranslationMode_(translationMode)
  64. , SyntaxVersion_(syntaxVersion)
  65. , NativeYtTypeFlags_(nativeYtTypeFlags)
  66. , DeterministicTimeProviderSeed_(deterministicTimeProviderSeed)
  67. , UseSystemColumns(useSystemColumns)
  68. , UseWorkerPool(useWorkerPool)
  69. {
  70. }
  71. };
  72. template <typename TBase>
  73. class TWorkerFactory: public TBase {
  74. private:
  75. IProgramFactoryPtr Factory_;
  76. protected:
  77. TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry_;
  78. const TUserDataTable& UserData_;
  79. TExprContext ExprContext_;
  80. TExprNode::TPtr ExprRoot_;
  81. TString SerializedProgram_;
  82. TVector<const TStructExprType*> InputTypes_;
  83. TVector<const TStructExprType*> OriginalInputTypes_;
  84. TVector<const TStructExprType*> RawInputTypes_;
  85. const TTypeAnnotationNode* OutputType_;
  86. const TTypeAnnotationNode* RawOutputType_;
  87. TVector<THashSet<TString>> AllColumns_;
  88. TVector<THashSet<TString>> UsedColumns_;
  89. TString LLVMSettings_;
  90. EBlockEngineMode BlockEngineMode_;
  91. IOutputStream* ExprOutputStream_;
  92. NKikimr::NUdf::ICountersProvider* CountersProvider_;
  93. ui64 NativeYtTypeFlags_;
  94. TMaybe<ui64> DeterministicTimeProviderSeed_;
  95. bool UseSystemColumns_;
  96. bool UseWorkerPool_;
  97. TVector<THolder<IWorker>> WorkerPool_;
  98. public:
  99. TWorkerFactory(TWorkerFactoryOptions, EProcessorMode);
  100. public:
  101. NYT::TNode MakeInputSchema(ui32) const override;
  102. NYT::TNode MakeInputSchema() const override;
  103. NYT::TNode MakeOutputSchema() const override;
  104. NYT::TNode MakeOutputSchema(ui32) const override;
  105. NYT::TNode MakeOutputSchema(TStringBuf) const override;
  106. NYT::TNode MakeFullOutputSchema() const override;
  107. const THashSet<TString>& GetUsedColumns(ui32 inputIndex) const override;
  108. const THashSet<TString>& GetUsedColumns() const override;
  109. TIssues GetIssues() const override;
  110. TString GetCompiledProgram() override;
  111. protected:
  112. void ReturnWorker(IWorker* worker) override;
  113. private:
  114. TExprNode::TPtr Compile(TStringBuf query,
  115. ETranslationMode mode,
  116. IModuleResolver::TPtr moduleResolver,
  117. ui16 syntaxVersion,
  118. const THashMap<TString, TString>& modules,
  119. const TInputSpecBase& inputSpec,
  120. const TOutputSpecBase& outputSpec,
  121. EProcessorMode processorMode);
  122. };
  123. class TPullStreamWorkerFactory final: public TWorkerFactory<IPullStreamWorkerFactory> {
  124. public:
  125. explicit TPullStreamWorkerFactory(TWorkerFactoryOptions options)
  126. : TWorkerFactory(std::move(options), EProcessorMode::PullStream)
  127. {
  128. }
  129. public:
  130. TWorkerHolder<IPullStreamWorker> MakeWorker() override;
  131. };
  132. class TPullListWorkerFactory final: public TWorkerFactory<IPullListWorkerFactory> {
  133. public:
  134. explicit TPullListWorkerFactory(TWorkerFactoryOptions options)
  135. : TWorkerFactory(std::move(options), EProcessorMode::PullList)
  136. {
  137. }
  138. public:
  139. TWorkerHolder<IPullListWorker> MakeWorker() override;
  140. };
  141. class TPushStreamWorkerFactory final: public TWorkerFactory<IPushStreamWorkerFactory> {
  142. public:
  143. explicit TPushStreamWorkerFactory(TWorkerFactoryOptions options)
  144. : TWorkerFactory(std::move(options), EProcessorMode::PushStream)
  145. {
  146. }
  147. public:
  148. TWorkerHolder<IPushStreamWorker> MakeWorker() override;
  149. };
  150. }
  151. }