worker_factory.cpp 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. #include "worker_factory.h"
  2. #include "type_from_schema.h"
  3. #include "worker.h"
  4. #include "compile_mkql.h"
  5. #include <yql/essentials/sql/sql.h>
  6. #include <yql/essentials/ast/yql_expr.h>
  7. #include <yql/essentials/core/yql_expr_optimize.h>
  8. #include <yql/essentials/core/yql_type_helpers.h>
  9. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  10. #include <yql/essentials/providers/common/codec/yql_codec.h>
  11. #include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  12. #include <yql/essentials/providers/common/arrow_resolve/yql_simple_arrow_resolver.h>
  13. #include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
  14. #include <yql/essentials/providers/common/provider/yql_provider.h>
  15. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  16. #include <yql/essentials/providers/config/yql_config_provider.h>
  17. #include <yql/essentials/minikql/mkql_node.h>
  18. #include <yql/essentials/minikql/mkql_node_serialization.h>
  19. #include <yql/essentials/minikql/mkql_alloc.h>
  20. #include <yql/essentials/minikql/aligned_page_pool.h>
  21. #include <yql/essentials/core/services/yql_transform_pipeline.h>
  22. #include <yql/essentials/public/purecalc/common/names.h>
  23. #include <yql/essentials/public/purecalc/common/transformations/type_annotation.h>
  24. #include <yql/essentials/public/purecalc/common/transformations/align_output_schema.h>
  25. #include <yql/essentials/public/purecalc/common/transformations/extract_used_columns.h>
  26. #include <yql/essentials/public/purecalc/common/transformations/output_columns_filter.h>
  27. #include <yql/essentials/public/purecalc/common/transformations/replace_table_reads.h>
  28. #include <yql/essentials/public/purecalc/common/transformations/root_to_blocks.h>
  29. #include <yql/essentials/public/purecalc/common/transformations/utils.h>
  30. #include <yql/essentials/utils/log/log.h>
  31. #include <util/stream/trace.h>
  32. using namespace NYql;
  33. using namespace NYql::NPureCalc;
  34. template <typename TBase>
  35. TWorkerFactory<TBase>::TWorkerFactory(TWorkerFactoryOptions options, EProcessorMode processorMode)
  36. : Factory_(std::move(options.Factory))
  37. , FuncRegistry_(std::move(options.FuncRegistry))
  38. , UserData_(std::move(options.UserData))
  39. , LLVMSettings_(std::move(options.LLVMSettings))
  40. , BlockEngineMode_(options.BlockEngineMode)
  41. , ExprOutputStream_(options.ExprOutputStream)
  42. , CountersProvider_(options.CountersProvider_)
  43. , NativeYtTypeFlags_(options.NativeYtTypeFlags_)
  44. , DeterministicTimeProviderSeed_(options.DeterministicTimeProviderSeed_)
  45. , UseSystemColumns_(options.UseSystemColumns)
  46. , UseWorkerPool_(options.UseWorkerPool)
  47. {
  48. // Prepare input struct types and extract all column names from inputs
  49. const auto& inputSchemas = options.InputSpec.GetSchemas();
  50. const auto& allVirtualColumns = options.InputSpec.GetAllVirtualColumns();
  51. YQL_ENSURE(inputSchemas.size() == allVirtualColumns.size());
  52. const auto inputsCount = inputSchemas.size();
  53. for (ui32 i = 0; i < inputsCount; ++i) {
  54. const auto* originalInputType = MakeTypeFromSchema(inputSchemas[i], ExprContext_);
  55. if (!ValidateInputSchema(originalInputType, ExprContext_)) {
  56. ythrow TCompileError("", GetIssues().ToString()) << "invalid schema for #" << i << " input";
  57. }
  58. const auto* originalStructType = originalInputType->template Cast<TStructExprType>();
  59. const auto* structType = ExtendStructType(originalStructType, allVirtualColumns[i], ExprContext_);
  60. InputTypes_.push_back(structType);
  61. OriginalInputTypes_.push_back(originalStructType);
  62. RawInputTypes_.push_back(originalStructType);
  63. auto& columnsSet = AllColumns_.emplace_back();
  64. for (const auto* structItem : structType->GetItems()) {
  65. columnsSet.insert(TString(structItem->GetName()));
  66. if (!UseSystemColumns_ && structItem->GetName().StartsWith(PurecalcSysColumnsPrefix)) {
  67. ythrow TCompileError("", GetIssues().ToString())
  68. << "#" << i << " input provides system column " << structItem->GetName()
  69. << ", but it is forbidden by options";
  70. }
  71. }
  72. }
  73. // Prepare output type
  74. auto outputSchema = options.OutputSpec.GetSchema();
  75. if (!outputSchema.IsNull()) {
  76. OutputType_ = MakeTypeFromSchema(outputSchema, ExprContext_);
  77. if (!ValidateOutputSchema(OutputType_, ExprContext_)) {
  78. ythrow TCompileError("", GetIssues().ToString()) << "invalid output schema";
  79. }
  80. } else {
  81. OutputType_ = nullptr;
  82. }
  83. RawOutputType_ = OutputType_;
  84. // Translate
  85. if (options.TranslationMode_ == ETranslationMode::Mkql) {
  86. SerializedProgram_ = TString{options.Query};
  87. } else {
  88. ExprRoot_ = Compile(options.Query, options.TranslationMode_,
  89. options.ModuleResolver, options.SyntaxVersion_, options.Modules,
  90. options.InputSpec, options.OutputSpec, processorMode);
  91. RawOutputType_ = GetSequenceItemType(ExprRoot_->Pos(), ExprRoot_->GetTypeAnn(), true, ExprContext_);
  92. // Deduce output type if it wasn't provided by output spec
  93. if (!OutputType_) {
  94. OutputType_ = RawOutputType_;
  95. // XXX: Tweak the obtained expression type, is the spec supports blocks:
  96. // 1. Remove "_yql_block_length" attribute, since it's for internal usage.
  97. // 2. Strip block container from the type to store its internal type.
  98. if (options.OutputSpec.AcceptsBlocks()) {
  99. Y_ENSURE(OutputType_->GetKind() == ETypeAnnotationKind::Struct);
  100. OutputType_ = UnwrapBlockStruct(OutputType_->Cast<TStructExprType>(), ExprContext_);
  101. }
  102. }
  103. if (!OutputType_) {
  104. ythrow TCompileError("", GetIssues().ToString()) << "cannot deduce output schema";
  105. }
  106. }
  107. }
  108. template <typename TBase>
  109. TExprNode::TPtr TWorkerFactory<TBase>::Compile(
  110. TStringBuf query,
  111. ETranslationMode mode,
  112. IModuleResolver::TPtr factoryModuleResolver,
  113. ui16 syntaxVersion,
  114. const THashMap<TString, TString>& modules,
  115. const TInputSpecBase& inputSpec,
  116. const TOutputSpecBase& outputSpec,
  117. EProcessorMode processorMode
  118. ) {
  119. if (mode == ETranslationMode::PG && processorMode != EProcessorMode::PullList) {
  120. ythrow TCompileError("", "") << "only PullList mode is compatible to PostgreSQL syntax";
  121. }
  122. // Prepare type annotation context
  123. TTypeAnnotationContextPtr typeContext;
  124. IModuleResolver::TPtr moduleResolver = factoryModuleResolver ? factoryModuleResolver->CreateMutableChild() : nullptr;
  125. typeContext = MakeIntrusive<TTypeAnnotationContext>();
  126. typeContext->RandomProvider = CreateDefaultRandomProvider();
  127. typeContext->TimeProvider = DeterministicTimeProviderSeed_ ?
  128. CreateDeterministicTimeProvider(*DeterministicTimeProviderSeed_) :
  129. CreateDefaultTimeProvider();
  130. typeContext->UdfResolver = NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get());
  131. typeContext->ArrowResolver = MakeSimpleArrowResolver(*FuncRegistry_.Get());
  132. typeContext->UserDataStorage = MakeIntrusive<TUserDataStorage>(nullptr, UserData_, nullptr, nullptr);
  133. typeContext->Modules = moduleResolver;
  134. typeContext->BlockEngineMode = BlockEngineMode_;
  135. auto configProvider = CreateConfigProvider(*typeContext, nullptr, "");
  136. typeContext->AddDataSource(ConfigProviderName, configProvider);
  137. typeContext->Initialize(ExprContext_);
  138. if (auto modules = dynamic_cast<TModuleResolver*>(moduleResolver.get())) {
  139. modules->AttachUserData(typeContext->UserDataStorage);
  140. }
  141. // Parse SQL/s-expr into AST
  142. TAstParseResult astRes;
  143. if (mode == ETranslationMode::SQL || mode == ETranslationMode::PG) {
  144. NSQLTranslation::TTranslationSettings settings;
  145. typeContext->DeprecatedSQL = (syntaxVersion == 0);
  146. if (mode == ETranslationMode::PG) {
  147. settings.PgParser = true;
  148. }
  149. settings.SyntaxVersion = syntaxVersion;
  150. settings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
  151. settings.EmitReadsForExists = true;
  152. settings.Mode = NSQLTranslation::ESqlMode::LIMITED_VIEW;
  153. settings.DefaultCluster = PurecalcDefaultCluster;
  154. settings.ClusterMapping[settings.DefaultCluster] = PurecalcDefaultService;
  155. settings.ModuleMapping = modules;
  156. settings.EnableGenericUdfs = true;
  157. settings.File = "generated.sql";
  158. settings.Flags = {
  159. "AnsiOrderByLimitInUnionAll",
  160. "AnsiRankForNullableKeys",
  161. "DisableAnsiOptionalAs",
  162. "DisableCoalesceJoinKeysOnQualifiedAll",
  163. "DisableUnorderedSubqueries",
  164. "FlexibleTypes"
  165. };
  166. if (BlockEngineMode_ != EBlockEngineMode::Disable) {
  167. settings.Flags.insert("EmitAggApply");
  168. }
  169. for (const auto& [key, block] : UserData_) {
  170. TStringBuf alias(key.Alias());
  171. if (block.Usage.Test(EUserDataBlockUsage::Library) && !alias.StartsWith("/lib")) {
  172. alias.SkipPrefix("/home/");
  173. settings.Libraries.emplace(alias);
  174. }
  175. }
  176. astRes = SqlToYql(TString(query), settings);
  177. } else {
  178. astRes = ParseAst(TString(query));
  179. }
  180. ExprContext_.IssueManager.AddIssues(astRes.Issues);
  181. if (!astRes.IsOk()) {
  182. ythrow TCompileError(TString(query), GetIssues().ToString()) << "failed to parse " << mode;
  183. }
  184. if (ETraceLevel::TRACE_DETAIL <= StdDbgLevel()) {
  185. Cdbg << "Before optimization:" << Endl;
  186. astRes.Root->PrettyPrintTo(Cdbg, TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote | TAstPrintFlags::AdaptArbitraryContent);
  187. }
  188. // Translate AST into expression
  189. TExprNode::TPtr exprRoot;
  190. if (!CompileExpr(*astRes.Root, exprRoot, ExprContext_, moduleResolver.get(), nullptr, 0, syntaxVersion)) {
  191. TStringStream astStr;
  192. astRes.Root->PrettyPrintTo(astStr, TAstPrintFlags::ShortQuote | TAstPrintFlags::PerLine);
  193. ythrow TCompileError(astStr.Str(), GetIssues().ToString()) << "failed to compile";
  194. }
  195. // Prepare transformation pipeline
  196. THolder<IGraphTransformer> calcTransformer = CreateFunctorTransformer([&](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx)
  197. -> IGraphTransformer::TStatus
  198. {
  199. output = input;
  200. auto valueNode = input->HeadPtr();
  201. auto peepHole = MakePeepholeOptimization(typeContext);
  202. auto status = SyncTransform(*peepHole, valueNode, ctx);
  203. if (status != IGraphTransformer::TStatus::Ok) {
  204. return status;
  205. }
  206. TStringStream out;
  207. NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true);
  208. writer.OnBeginMap();
  209. writer.OnKeyedItem("Data");
  210. TWorkerGraph graph(
  211. valueNode,
  212. ctx,
  213. {},
  214. *FuncRegistry_,
  215. UserData_,
  216. {},
  217. {},
  218. {},
  219. valueNode->GetTypeAnn(),
  220. valueNode->GetTypeAnn(),
  221. LLVMSettings_,
  222. CountersProvider_,
  223. NativeYtTypeFlags_,
  224. DeterministicTimeProviderSeed_
  225. );
  226. with_lock (graph.ScopedAlloc_) {
  227. const auto value = graph.ComputationGraph_->GetValue();
  228. NCommon::WriteYsonValue(writer, value, const_cast<NKikimr::NMiniKQL::TType*>(graph.OutputType_), nullptr);
  229. }
  230. writer.OnEndMap();
  231. auto ysonAtom = ctx.NewAtom(TPositionHandle(), out.Str());
  232. input->SetResult(std::move(ysonAtom));
  233. return IGraphTransformer::TStatus::Ok;
  234. });
  235. const TString& selfName = TString(inputSpec.ProvidesBlocks()
  236. ? PurecalcBlockInputCallableName
  237. : PurecalcInputCallableName);
  238. TTransformationPipeline pipeline(typeContext);
  239. pipeline.Add(MakeTableReadsReplacer(InputTypes_, UseSystemColumns_, processorMode, selfName),
  240. "ReplaceTableReads", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  241. "Replace reads from tables");
  242. pipeline.AddServiceTransformers();
  243. pipeline.AddPreTypeAnnotation();
  244. pipeline.AddExpressionEvaluation(*FuncRegistry_, calcTransformer.Get());
  245. pipeline.AddIOAnnotation();
  246. pipeline.AddTypeAnnotationTransformer(MakeTypeAnnotationTransformer(typeContext, InputTypes_, RawInputTypes_, processorMode, selfName));
  247. pipeline.AddPostTypeAnnotation();
  248. pipeline.Add(CreateFunctorTransformer(
  249. [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  250. return OptimizeExpr(input, output, [](const TExprNode::TPtr& node, TExprContext&) -> TExprNode::TPtr {
  251. if (node->IsCallable("Unordered") && node->Child(0)->IsCallable({
  252. PurecalcInputCallableName, PurecalcBlockInputCallableName
  253. })) {
  254. return node->ChildPtr(0);
  255. }
  256. return node;
  257. }, ctx, TOptimizeExprSettings(nullptr));
  258. }), "Unordered", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  259. "Unordered optimizations");
  260. pipeline.Add(CreateFunctorTransformer(
  261. [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  262. return OptimizeExpr(input, output, [](const TExprNode::TPtr& node, TExprContext&) -> TExprNode::TPtr {
  263. if (node->IsCallable("Right!") && node->Head().IsCallable("Cons!")) {
  264. return node->Head().ChildPtr(1);
  265. }
  266. return node;
  267. }, ctx, TOptimizeExprSettings(nullptr));
  268. }), "Cons", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  269. "Cons optimizations");
  270. pipeline.Add(MakeOutputColumnsFilter(outputSpec.GetOutputColumnsFilter()),
  271. "Filter", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  272. "Filter output columns");
  273. pipeline.Add(MakeRootToBlocks(outputSpec.AcceptsBlocks(), processorMode),
  274. "RootToBlocks", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  275. "Rewrite the root if the output spec accepts blocks");
  276. pipeline.Add(MakeOutputAligner(OutputType_, outputSpec.AcceptsBlocks(), processorMode),
  277. "Convert", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  278. "Align return type of the program to output schema");
  279. pipeline.AddCommonOptimization();
  280. pipeline.AddFinalCommonOptimization();
  281. pipeline.Add(MakeUsedColumnsExtractor(&UsedColumns_, AllColumns_),
  282. "ExtractColumns", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  283. "Extract used columns");
  284. pipeline.Add(MakePeepholeOptimization(typeContext),
  285. "PeepHole", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  286. "Peephole optimizations");
  287. pipeline.AddCheckExecution(false);
  288. // Apply optimizations
  289. auto transformer = pipeline.Build();
  290. auto status = SyncTransform(*transformer, exprRoot, ExprContext_);
  291. auto transformStats = transformer->GetStatistics();
  292. TStringStream out;
  293. NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Pretty);
  294. NCommon::TransformerStatsToYson("", transformStats, writer);
  295. YQL_CLOG(DEBUG, Core) << "Transform stats: " << out.Str();
  296. if (status == IGraphTransformer::TStatus::Error) {
  297. ythrow TCompileError("", GetIssues().ToString()) << "Failed to optimize";
  298. }
  299. IOutputStream* exprOut = nullptr;
  300. if (ExprOutputStream_) {
  301. exprOut = ExprOutputStream_;
  302. } else if (ETraceLevel::TRACE_DETAIL <= StdDbgLevel()) {
  303. exprOut = &Cdbg;
  304. }
  305. if (exprOut) {
  306. *exprOut << "After optimization:" << Endl;
  307. ConvertToAst(*exprRoot, ExprContext_, 0, true).Root
  308. ->PrettyPrintTo(*exprOut, TAstPrintFlags::PerLine
  309. | TAstPrintFlags::ShortQuote
  310. | TAstPrintFlags::AdaptArbitraryContent);
  311. }
  312. return exprRoot;
  313. }
  314. template <typename TBase>
  315. NYT::TNode TWorkerFactory<TBase>::MakeInputSchema(ui32 inputIndex) const {
  316. Y_ENSURE(
  317. inputIndex < InputTypes_.size(),
  318. "invalid input index (" << inputIndex << ") in MakeInputSchema call");
  319. return NCommon::TypeToYsonNode(InputTypes_[inputIndex]);
  320. }
  321. template <typename TBase>
  322. NYT::TNode TWorkerFactory<TBase>::MakeInputSchema() const {
  323. Y_ENSURE(
  324. InputTypes_.size() == 1,
  325. "MakeInputSchema() can be used only with single-input programs");
  326. return NCommon::TypeToYsonNode(InputTypes_[0]);
  327. }
  328. template <typename TBase>
  329. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema() const {
  330. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  331. Y_ENSURE(
  332. OutputType_->GetKind() == ETypeAnnotationKind::Struct,
  333. "MakeOutputSchema() cannot be used with multi-output programs");
  334. return NCommon::TypeToYsonNode(OutputType_);
  335. }
  336. template <typename TBase>
  337. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema(ui32 index) const {
  338. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  339. Y_ENSURE(
  340. OutputType_->GetKind() == ETypeAnnotationKind::Variant,
  341. "MakeOutputSchema(ui32) cannot be used with single-output programs");
  342. auto vtype = OutputType_->template Cast<TVariantExprType>();
  343. Y_ENSURE(
  344. vtype->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple,
  345. "MakeOutputSchema(ui32) cannot be used to process variants over struct");
  346. auto ttype = vtype->GetUnderlyingType()->template Cast<TTupleExprType>();
  347. Y_ENSURE(
  348. index < ttype->GetSize(),
  349. "Invalid table index " << index);
  350. return NCommon::TypeToYsonNode(ttype->GetItems()[index]);
  351. }
  352. template <typename TBase>
  353. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema(TStringBuf tableName) const {
  354. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  355. Y_ENSURE(
  356. OutputType_->GetKind() == ETypeAnnotationKind::Variant,
  357. "MakeOutputSchema(TStringBuf) cannot be used with single-output programs");
  358. auto vtype = OutputType_->template Cast<TVariantExprType>();
  359. Y_ENSURE(
  360. vtype->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Struct,
  361. "MakeOutputSchema(TStringBuf) cannot be used to process variants over tuple");
  362. auto stype = vtype->GetUnderlyingType()->template Cast<TStructExprType>();
  363. auto index = stype->FindItem(tableName);
  364. Y_ENSURE(
  365. index.Defined(),
  366. "Invalid table index " << TString{tableName}.Quote());
  367. return NCommon::TypeToYsonNode(stype->GetItems()[*index]->GetItemType());
  368. }
  369. template <typename TBase>
  370. NYT::TNode TWorkerFactory<TBase>::MakeFullOutputSchema() const {
  371. Y_ENSURE(OutputType_, "MakeFullOutputSchema() cannot be used with precompiled programs");
  372. return NCommon::TypeToYsonNode(OutputType_);
  373. }
  374. template <typename TBase>
  375. const THashSet<TString>& TWorkerFactory<TBase>::GetUsedColumns(ui32 inputIndex) const {
  376. Y_ENSURE(
  377. inputIndex < UsedColumns_.size(),
  378. "invalid input index (" << inputIndex << ") in GetUsedColumns call");
  379. return UsedColumns_[inputIndex];
  380. }
  381. template <typename TBase>
  382. const THashSet<TString>& TWorkerFactory<TBase>::GetUsedColumns() const {
  383. Y_ENSURE(
  384. UsedColumns_.size() == 1,
  385. "GetUsedColumns() can be used only with single-input programs");
  386. return UsedColumns_[0];
  387. }
  388. template <typename TBase>
  389. TIssues TWorkerFactory<TBase>::GetIssues() const {
  390. auto issues = ExprContext_.IssueManager.GetCompletedIssues();
  391. CheckFatalIssues(issues);
  392. return issues;
  393. }
  394. template <typename TBase>
  395. TString TWorkerFactory<TBase>::GetCompiledProgram() {
  396. if (ExprRoot_) {
  397. NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),
  398. FuncRegistry_->SupportsSizedAllocators());
  399. NKikimr::NMiniKQL::TTypeEnvironment env(alloc);
  400. auto rootNode = CompileMkql(ExprRoot_, ExprContext_, *FuncRegistry_, env, UserData_);
  401. return NKikimr::NMiniKQL::SerializeRuntimeNode(rootNode, env);
  402. }
  403. return SerializedProgram_;
  404. }
  405. template <typename TBase>
  406. void TWorkerFactory<TBase>::ReturnWorker(IWorker* worker) {
  407. THolder<IWorker> tmp(worker);
  408. if (UseWorkerPool_) {
  409. WorkerPool_.push_back(std::move(tmp));
  410. }
  411. }
  412. #define DEFINE_WORKER_MAKER(MODE) \
  413. TWorkerHolder<I##MODE##Worker> T##MODE##WorkerFactory::MakeWorker() { \
  414. if (!WorkerPool_.empty()) { \
  415. auto res = std::move(WorkerPool_.back()); \
  416. WorkerPool_.pop_back(); \
  417. return TWorkerHolder<I##MODE##Worker>((I##MODE##Worker *)res.Release()); \
  418. } \
  419. return TWorkerHolder<I##MODE##Worker>(new T##MODE##Worker( \
  420. weak_from_this(), \
  421. ExprRoot_, \
  422. ExprContext_, \
  423. SerializedProgram_, \
  424. *FuncRegistry_, \
  425. UserData_, \
  426. InputTypes_, \
  427. OriginalInputTypes_, \
  428. RawInputTypes_, \
  429. OutputType_, \
  430. RawOutputType_, \
  431. LLVMSettings_, \
  432. CountersProvider_, \
  433. NativeYtTypeFlags_, \
  434. DeterministicTimeProviderSeed_ \
  435. )); \
  436. }
  437. DEFINE_WORKER_MAKER(PullStream)
  438. DEFINE_WORKER_MAKER(PullList)
  439. DEFINE_WORKER_MAKER(PushStream)
  440. namespace NYql {
  441. namespace NPureCalc {
  442. template
  443. class TWorkerFactory<IPullStreamWorkerFactory>;
  444. template
  445. class TWorkerFactory<IPullListWorkerFactory>;
  446. template
  447. class TWorkerFactory<IPushStreamWorkerFactory>;
  448. }
  449. }