worker_factory.cpp 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. #include "worker_factory.h"
  2. #include "type_from_schema.h"
  3. #include "worker.h"
  4. #include "compile_mkql.h"
  5. #include <yql/essentials/sql/sql.h>
  6. #include <yql/essentials/ast/yql_expr.h>
  7. #include <yql/essentials/core/yql_expr_optimize.h>
  8. #include <yql/essentials/core/yql_type_helpers.h>
  9. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  10. #include <yql/essentials/providers/common/codec/yql_codec.h>
  11. #include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  12. #include <yql/essentials/providers/common/arrow_resolve/yql_simple_arrow_resolver.h>
  13. #include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
  14. #include <yql/essentials/providers/common/provider/yql_provider.h>
  15. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  16. #include <yql/essentials/providers/config/yql_config_provider.h>
  17. #include <yql/essentials/minikql/mkql_node.h>
  18. #include <yql/essentials/minikql/mkql_node_serialization.h>
  19. #include <yql/essentials/minikql/mkql_alloc.h>
  20. #include <yql/essentials/minikql/aligned_page_pool.h>
  21. #include <yql/essentials/core/services/yql_transform_pipeline.h>
  22. #include <yql/essentials/public/purecalc/common/names.h>
  23. #include <yql/essentials/public/purecalc/common/transformations/type_annotation.h>
  24. #include <yql/essentials/public/purecalc/common/transformations/align_output_schema.h>
  25. #include <yql/essentials/public/purecalc/common/transformations/extract_used_columns.h>
  26. #include <yql/essentials/public/purecalc/common/transformations/output_columns_filter.h>
  27. #include <yql/essentials/public/purecalc/common/transformations/replace_table_reads.h>
  28. #include <yql/essentials/public/purecalc/common/transformations/root_to_blocks.h>
  29. #include <yql/essentials/public/purecalc/common/transformations/utils.h>
  30. #include <yql/essentials/utils/log/log.h>
  31. #include <util/stream/trace.h>
  32. using namespace NYql;
  33. using namespace NYql::NPureCalc;
  34. template <typename TBase>
  35. TWorkerFactory<TBase>::TWorkerFactory(TWorkerFactoryOptions options, EProcessorMode processorMode)
  36. : Factory_(std::move(options.Factory))
  37. , FuncRegistry_(std::move(options.FuncRegistry))
  38. , UserData_(std::move(options.UserData))
  39. , LLVMSettings_(std::move(options.LLVMSettings))
  40. , BlockEngineMode_(options.BlockEngineMode)
  41. , ExprOutputStream_(options.ExprOutputStream)
  42. , CountersProvider_(options.CountersProvider_)
  43. , NativeYtTypeFlags_(options.NativeYtTypeFlags_)
  44. , DeterministicTimeProviderSeed_(options.DeterministicTimeProviderSeed_)
  45. , UseSystemColumns_(options.UseSystemColumns)
  46. , UseWorkerPool_(options.UseWorkerPool)
  47. {
  48. // Prepare input struct types and extract all column names from inputs
  49. const auto& inputSchemas = options.InputSpec.GetSchemas();
  50. const auto& allVirtualColumns = options.InputSpec.GetAllVirtualColumns();
  51. YQL_ENSURE(inputSchemas.size() == allVirtualColumns.size());
  52. const auto inputsCount = inputSchemas.size();
  53. for (ui32 i = 0; i < inputsCount; ++i) {
  54. const auto* originalInputType = MakeTypeFromSchema(inputSchemas[i], ExprContext_);
  55. if (!ValidateInputSchema(originalInputType, ExprContext_)) {
  56. ythrow TCompileError("", GetIssues().ToString()) << "invalid schema for #" << i << " input";
  57. }
  58. const auto* originalStructType = originalInputType->template Cast<TStructExprType>();
  59. const auto* structType = ExtendStructType(originalStructType, allVirtualColumns[i], ExprContext_);
  60. InputTypes_.push_back(structType);
  61. OriginalInputTypes_.push_back(originalStructType);
  62. RawInputTypes_.push_back(originalStructType);
  63. auto& columnsSet = AllColumns_.emplace_back();
  64. for (const auto* structItem : structType->GetItems()) {
  65. columnsSet.insert(TString(structItem->GetName()));
  66. if (!UseSystemColumns_ && structItem->GetName().StartsWith(PurecalcSysColumnsPrefix)) {
  67. ythrow TCompileError("", GetIssues().ToString())
  68. << "#" << i << " input provides system column " << structItem->GetName()
  69. << ", but it is forbidden by options";
  70. }
  71. }
  72. }
  73. // Prepare output type
  74. auto outputSchema = options.OutputSpec.GetSchema();
  75. if (!outputSchema.IsNull()) {
  76. OutputType_ = MakeTypeFromSchema(outputSchema, ExprContext_);
  77. if (!ValidateOutputSchema(OutputType_, ExprContext_)) {
  78. ythrow TCompileError("", GetIssues().ToString()) << "invalid output schema";
  79. }
  80. } else {
  81. OutputType_ = nullptr;
  82. }
  83. RawOutputType_ = OutputType_;
  84. // Translate
  85. if (options.TranslationMode_ == ETranslationMode::Mkql) {
  86. SerializedProgram_ = TString{options.Query};
  87. } else {
  88. ExprRoot_ = Compile(options.Query, options.TranslationMode_,
  89. options.ModuleResolver, options.SyntaxVersion_, options.Modules,
  90. options.InputSpec, options.OutputSpec, options.UseAntlr4, processorMode);
  91. RawOutputType_ = GetSequenceItemType(ExprRoot_->Pos(), ExprRoot_->GetTypeAnn(), true, ExprContext_);
  92. // Deduce output type if it wasn't provided by output spec
  93. if (!OutputType_) {
  94. OutputType_ = RawOutputType_;
  95. // XXX: Tweak the obtained expression type, is the spec supports blocks:
  96. // 1. Remove "_yql_block_length" attribute, since it's for internal usage.
  97. // 2. Strip block container from the type to store its internal type.
  98. if (options.OutputSpec.AcceptsBlocks()) {
  99. Y_ENSURE(OutputType_->GetKind() == ETypeAnnotationKind::Struct);
  100. OutputType_ = UnwrapBlockStruct(OutputType_->Cast<TStructExprType>(), ExprContext_);
  101. }
  102. }
  103. if (!OutputType_) {
  104. ythrow TCompileError("", GetIssues().ToString()) << "cannot deduce output schema";
  105. }
  106. }
  107. }
  108. template <typename TBase>
  109. TExprNode::TPtr TWorkerFactory<TBase>::Compile(
  110. TStringBuf query,
  111. ETranslationMode mode,
  112. IModuleResolver::TPtr factoryModuleResolver,
  113. ui16 syntaxVersion,
  114. const THashMap<TString, TString>& modules,
  115. const TInputSpecBase& inputSpec,
  116. const TOutputSpecBase& outputSpec,
  117. bool useAntlr4,
  118. EProcessorMode processorMode
  119. ) {
  120. if (mode == ETranslationMode::PG && processorMode != EProcessorMode::PullList) {
  121. ythrow TCompileError("", "") << "only PullList mode is compatible to PostgreSQL syntax";
  122. }
  123. // Prepare type annotation context
  124. TTypeAnnotationContextPtr typeContext;
  125. IModuleResolver::TPtr moduleResolver = factoryModuleResolver ? factoryModuleResolver->CreateMutableChild() : nullptr;
  126. typeContext = MakeIntrusive<TTypeAnnotationContext>();
  127. typeContext->RandomProvider = CreateDefaultRandomProvider();
  128. typeContext->TimeProvider = DeterministicTimeProviderSeed_ ?
  129. CreateDeterministicTimeProvider(*DeterministicTimeProviderSeed_) :
  130. CreateDefaultTimeProvider();
  131. typeContext->UdfResolver = NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get());
  132. typeContext->ArrowResolver = MakeSimpleArrowResolver(*FuncRegistry_.Get());
  133. typeContext->UserDataStorage = MakeIntrusive<TUserDataStorage>(nullptr, UserData_, nullptr, nullptr);
  134. typeContext->Modules = moduleResolver;
  135. typeContext->BlockEngineMode = BlockEngineMode_;
  136. auto configProvider = CreateConfigProvider(*typeContext, nullptr, "");
  137. typeContext->AddDataSource(ConfigProviderName, configProvider);
  138. typeContext->Initialize(ExprContext_);
  139. if (auto modules = dynamic_cast<TModuleResolver*>(moduleResolver.get())) {
  140. modules->AttachUserData(typeContext->UserDataStorage);
  141. }
  142. // Parse SQL/s-expr into AST
  143. TAstParseResult astRes;
  144. if (mode == ETranslationMode::SQL || mode == ETranslationMode::PG) {
  145. NSQLTranslation::TTranslationSettings settings;
  146. typeContext->DeprecatedSQL = (syntaxVersion == 0);
  147. if (mode == ETranslationMode::PG) {
  148. settings.PgParser = true;
  149. }
  150. settings.SyntaxVersion = syntaxVersion;
  151. settings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
  152. settings.EmitReadsForExists = true;
  153. settings.Antlr4Parser = useAntlr4;
  154. settings.Mode = NSQLTranslation::ESqlMode::LIMITED_VIEW;
  155. settings.DefaultCluster = PurecalcDefaultCluster;
  156. settings.ClusterMapping[settings.DefaultCluster] = PurecalcDefaultService;
  157. settings.ModuleMapping = modules;
  158. settings.EnableGenericUdfs = true;
  159. settings.File = "generated.sql";
  160. settings.Flags = {
  161. "AnsiOrderByLimitInUnionAll",
  162. "AnsiRankForNullableKeys",
  163. "DisableAnsiOptionalAs",
  164. "DisableCoalesceJoinKeysOnQualifiedAll",
  165. "DisableUnorderedSubqueries",
  166. "FlexibleTypes"
  167. };
  168. if (BlockEngineMode_ != EBlockEngineMode::Disable) {
  169. settings.Flags.insert("EmitAggApply");
  170. }
  171. for (const auto& [key, block] : UserData_) {
  172. TStringBuf alias(key.Alias());
  173. if (block.Usage.Test(EUserDataBlockUsage::Library) && !alias.StartsWith("/lib")) {
  174. alias.SkipPrefix("/home/");
  175. settings.Libraries.emplace(alias);
  176. }
  177. }
  178. astRes = SqlToYql(TString(query), settings);
  179. } else {
  180. astRes = ParseAst(TString(query));
  181. }
  182. ExprContext_.IssueManager.AddIssues(astRes.Issues);
  183. if (!astRes.IsOk()) {
  184. ythrow TCompileError(TString(query), GetIssues().ToString()) << "failed to parse " << mode;
  185. }
  186. if (ETraceLevel::TRACE_DETAIL <= StdDbgLevel()) {
  187. Cdbg << "Before optimization:" << Endl;
  188. astRes.Root->PrettyPrintTo(Cdbg, TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote | TAstPrintFlags::AdaptArbitraryContent);
  189. }
  190. // Translate AST into expression
  191. TExprNode::TPtr exprRoot;
  192. if (!CompileExpr(*astRes.Root, exprRoot, ExprContext_, moduleResolver.get(), nullptr, 0, syntaxVersion)) {
  193. TStringStream astStr;
  194. astRes.Root->PrettyPrintTo(astStr, TAstPrintFlags::ShortQuote | TAstPrintFlags::PerLine);
  195. ythrow TCompileError(astStr.Str(), GetIssues().ToString()) << "failed to compile";
  196. }
  197. // Prepare transformation pipeline
  198. THolder<IGraphTransformer> calcTransformer = CreateFunctorTransformer([&](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx)
  199. -> IGraphTransformer::TStatus
  200. {
  201. output = input;
  202. auto valueNode = input->HeadPtr();
  203. auto peepHole = MakePeepholeOptimization(typeContext);
  204. auto status = SyncTransform(*peepHole, valueNode, ctx);
  205. if (status != IGraphTransformer::TStatus::Ok) {
  206. return status;
  207. }
  208. TStringStream out;
  209. NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true);
  210. writer.OnBeginMap();
  211. writer.OnKeyedItem("Data");
  212. TWorkerGraph graph(
  213. valueNode,
  214. ctx,
  215. {},
  216. *FuncRegistry_,
  217. UserData_,
  218. {},
  219. {},
  220. {},
  221. valueNode->GetTypeAnn(),
  222. valueNode->GetTypeAnn(),
  223. LLVMSettings_,
  224. CountersProvider_,
  225. NativeYtTypeFlags_,
  226. DeterministicTimeProviderSeed_
  227. );
  228. with_lock (graph.ScopedAlloc_) {
  229. const auto value = graph.ComputationGraph_->GetValue();
  230. NCommon::WriteYsonValue(writer, value, const_cast<NKikimr::NMiniKQL::TType*>(graph.OutputType_), nullptr);
  231. }
  232. writer.OnEndMap();
  233. auto ysonAtom = ctx.NewAtom(TPositionHandle(), out.Str());
  234. input->SetResult(std::move(ysonAtom));
  235. return IGraphTransformer::TStatus::Ok;
  236. });
  237. const TString& selfName = TString(inputSpec.ProvidesBlocks()
  238. ? PurecalcBlockInputCallableName
  239. : PurecalcInputCallableName);
  240. TTransformationPipeline pipeline(typeContext);
  241. pipeline.Add(MakeTableReadsReplacer(InputTypes_, UseSystemColumns_, processorMode, selfName),
  242. "ReplaceTableReads", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  243. "Replace reads from tables");
  244. pipeline.AddServiceTransformers();
  245. pipeline.AddPreTypeAnnotation();
  246. pipeline.AddExpressionEvaluation(*FuncRegistry_, calcTransformer.Get());
  247. pipeline.AddIOAnnotation();
  248. pipeline.AddTypeAnnotationTransformer(MakeTypeAnnotationTransformer(typeContext, InputTypes_, RawInputTypes_, processorMode, selfName));
  249. pipeline.AddPostTypeAnnotation();
  250. pipeline.Add(CreateFunctorTransformer(
  251. [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  252. return OptimizeExpr(input, output, [](const TExprNode::TPtr& node, TExprContext&) -> TExprNode::TPtr {
  253. if (node->IsCallable("Unordered") && node->Child(0)->IsCallable({
  254. PurecalcInputCallableName, PurecalcBlockInputCallableName
  255. })) {
  256. return node->ChildPtr(0);
  257. }
  258. return node;
  259. }, ctx, TOptimizeExprSettings(nullptr));
  260. }), "Unordered", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  261. "Unordered optimizations");
  262. pipeline.Add(CreateFunctorTransformer(
  263. [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  264. return OptimizeExpr(input, output, [](const TExprNode::TPtr& node, TExprContext&) -> TExprNode::TPtr {
  265. if (node->IsCallable("Right!") && node->Head().IsCallable("Cons!")) {
  266. return node->Head().ChildPtr(1);
  267. }
  268. return node;
  269. }, ctx, TOptimizeExprSettings(nullptr));
  270. }), "Cons", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  271. "Cons optimizations");
  272. pipeline.Add(MakeOutputColumnsFilter(outputSpec.GetOutputColumnsFilter()),
  273. "Filter", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  274. "Filter output columns");
  275. pipeline.Add(MakeRootToBlocks(outputSpec.AcceptsBlocks(), processorMode),
  276. "RootToBlocks", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  277. "Rewrite the root if the output spec accepts blocks");
  278. pipeline.Add(MakeOutputAligner(OutputType_, outputSpec.AcceptsBlocks(), processorMode),
  279. "Convert", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  280. "Align return type of the program to output schema");
  281. pipeline.AddCommonOptimization();
  282. pipeline.AddFinalCommonOptimization();
  283. pipeline.Add(MakeUsedColumnsExtractor(&UsedColumns_, AllColumns_),
  284. "ExtractColumns", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  285. "Extract used columns");
  286. pipeline.Add(MakePeepholeOptimization(typeContext),
  287. "PeepHole", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  288. "Peephole optimizations");
  289. pipeline.AddCheckExecution(false);
  290. // Apply optimizations
  291. auto transformer = pipeline.Build();
  292. auto status = SyncTransform(*transformer, exprRoot, ExprContext_);
  293. auto transformStats = transformer->GetStatistics();
  294. TStringStream out;
  295. NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Pretty);
  296. NCommon::TransformerStatsToYson("", transformStats, writer);
  297. YQL_CLOG(DEBUG, Core) << "Transform stats: " << out.Str();
  298. if (status == IGraphTransformer::TStatus::Error) {
  299. ythrow TCompileError("", GetIssues().ToString()) << "Failed to optimize";
  300. }
  301. IOutputStream* exprOut = nullptr;
  302. if (ExprOutputStream_) {
  303. exprOut = ExprOutputStream_;
  304. } else if (ETraceLevel::TRACE_DETAIL <= StdDbgLevel()) {
  305. exprOut = &Cdbg;
  306. }
  307. if (exprOut) {
  308. *exprOut << "After optimization:" << Endl;
  309. ConvertToAst(*exprRoot, ExprContext_, 0, true).Root
  310. ->PrettyPrintTo(*exprOut, TAstPrintFlags::PerLine
  311. | TAstPrintFlags::ShortQuote
  312. | TAstPrintFlags::AdaptArbitraryContent);
  313. }
  314. return exprRoot;
  315. }
  316. template <typename TBase>
  317. NYT::TNode TWorkerFactory<TBase>::MakeInputSchema(ui32 inputIndex) const {
  318. Y_ENSURE(
  319. inputIndex < InputTypes_.size(),
  320. "invalid input index (" << inputIndex << ") in MakeInputSchema call");
  321. return NCommon::TypeToYsonNode(InputTypes_[inputIndex]);
  322. }
  323. template <typename TBase>
  324. NYT::TNode TWorkerFactory<TBase>::MakeInputSchema() const {
  325. Y_ENSURE(
  326. InputTypes_.size() == 1,
  327. "MakeInputSchema() can be used only with single-input programs");
  328. return NCommon::TypeToYsonNode(InputTypes_[0]);
  329. }
  330. template <typename TBase>
  331. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema() const {
  332. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  333. Y_ENSURE(
  334. OutputType_->GetKind() == ETypeAnnotationKind::Struct,
  335. "MakeOutputSchema() cannot be used with multi-output programs");
  336. return NCommon::TypeToYsonNode(OutputType_);
  337. }
  338. template <typename TBase>
  339. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema(ui32 index) const {
  340. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  341. Y_ENSURE(
  342. OutputType_->GetKind() == ETypeAnnotationKind::Variant,
  343. "MakeOutputSchema(ui32) cannot be used with single-output programs");
  344. auto vtype = OutputType_->template Cast<TVariantExprType>();
  345. Y_ENSURE(
  346. vtype->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple,
  347. "MakeOutputSchema(ui32) cannot be used to process variants over struct");
  348. auto ttype = vtype->GetUnderlyingType()->template Cast<TTupleExprType>();
  349. Y_ENSURE(
  350. index < ttype->GetSize(),
  351. "Invalid table index " << index);
  352. return NCommon::TypeToYsonNode(ttype->GetItems()[index]);
  353. }
  354. template <typename TBase>
  355. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema(TStringBuf tableName) const {
  356. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  357. Y_ENSURE(
  358. OutputType_->GetKind() == ETypeAnnotationKind::Variant,
  359. "MakeOutputSchema(TStringBuf) cannot be used with single-output programs");
  360. auto vtype = OutputType_->template Cast<TVariantExprType>();
  361. Y_ENSURE(
  362. vtype->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Struct,
  363. "MakeOutputSchema(TStringBuf) cannot be used to process variants over tuple");
  364. auto stype = vtype->GetUnderlyingType()->template Cast<TStructExprType>();
  365. auto index = stype->FindItem(tableName);
  366. Y_ENSURE(
  367. index.Defined(),
  368. "Invalid table index " << TString{tableName}.Quote());
  369. return NCommon::TypeToYsonNode(stype->GetItems()[*index]->GetItemType());
  370. }
  371. template <typename TBase>
  372. NYT::TNode TWorkerFactory<TBase>::MakeFullOutputSchema() const {
  373. Y_ENSURE(OutputType_, "MakeFullOutputSchema() cannot be used with precompiled programs");
  374. return NCommon::TypeToYsonNode(OutputType_);
  375. }
  376. template <typename TBase>
  377. const THashSet<TString>& TWorkerFactory<TBase>::GetUsedColumns(ui32 inputIndex) const {
  378. Y_ENSURE(
  379. inputIndex < UsedColumns_.size(),
  380. "invalid input index (" << inputIndex << ") in GetUsedColumns call");
  381. return UsedColumns_[inputIndex];
  382. }
  383. template <typename TBase>
  384. const THashSet<TString>& TWorkerFactory<TBase>::GetUsedColumns() const {
  385. Y_ENSURE(
  386. UsedColumns_.size() == 1,
  387. "GetUsedColumns() can be used only with single-input programs");
  388. return UsedColumns_[0];
  389. }
  390. template <typename TBase>
  391. TIssues TWorkerFactory<TBase>::GetIssues() const {
  392. auto issues = ExprContext_.IssueManager.GetCompletedIssues();
  393. CheckFatalIssues(issues);
  394. return issues;
  395. }
  396. template <typename TBase>
  397. TString TWorkerFactory<TBase>::GetCompiledProgram() {
  398. if (ExprRoot_) {
  399. NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),
  400. FuncRegistry_->SupportsSizedAllocators());
  401. NKikimr::NMiniKQL::TTypeEnvironment env(alloc);
  402. auto rootNode = CompileMkql(ExprRoot_, ExprContext_, *FuncRegistry_, env, UserData_);
  403. return NKikimr::NMiniKQL::SerializeRuntimeNode(rootNode, env);
  404. }
  405. return SerializedProgram_;
  406. }
  407. template <typename TBase>
  408. void TWorkerFactory<TBase>::ReturnWorker(IWorker* worker) {
  409. THolder<IWorker> tmp(worker);
  410. if (UseWorkerPool_) {
  411. WorkerPool_.push_back(std::move(tmp));
  412. }
  413. }
  414. #define DEFINE_WORKER_MAKER(MODE) \
  415. TWorkerHolder<I##MODE##Worker> T##MODE##WorkerFactory::MakeWorker() { \
  416. if (!WorkerPool_.empty()) { \
  417. auto res = std::move(WorkerPool_.back()); \
  418. WorkerPool_.pop_back(); \
  419. return TWorkerHolder<I##MODE##Worker>((I##MODE##Worker *)res.Release()); \
  420. } \
  421. return TWorkerHolder<I##MODE##Worker>(new T##MODE##Worker( \
  422. weak_from_this(), \
  423. ExprRoot_, \
  424. ExprContext_, \
  425. SerializedProgram_, \
  426. *FuncRegistry_, \
  427. UserData_, \
  428. InputTypes_, \
  429. OriginalInputTypes_, \
  430. RawInputTypes_, \
  431. OutputType_, \
  432. RawOutputType_, \
  433. LLVMSettings_, \
  434. CountersProvider_, \
  435. NativeYtTypeFlags_, \
  436. DeterministicTimeProviderSeed_ \
  437. )); \
  438. }
  439. DEFINE_WORKER_MAKER(PullStream)
  440. DEFINE_WORKER_MAKER(PullList)
  441. DEFINE_WORKER_MAKER(PushStream)
  442. namespace NYql {
  443. namespace NPureCalc {
  444. template
  445. class TWorkerFactory<IPullStreamWorkerFactory>;
  446. template
  447. class TWorkerFactory<IPullListWorkerFactory>;
  448. template
  449. class TWorkerFactory<IPushStreamWorkerFactory>;
  450. }
  451. }