worker_factory.cpp 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. #include "worker_factory.h"
  2. #include "type_from_schema.h"
  3. #include "worker.h"
  4. #include "compile_mkql.h"
  5. #include <yql/essentials/sql/sql.h>
  6. #include <yql/essentials/sql/v1/sql.h>
  7. #include <yql/essentials/parser/pg_wrapper/interface/parser.h>
  8. #include <yql/essentials/ast/yql_expr.h>
  9. #include <yql/essentials/core/yql_expr_optimize.h>
  10. #include <yql/essentials/core/yql_type_helpers.h>
  11. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  12. #include <yql/essentials/providers/common/codec/yql_codec.h>
  13. #include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  14. #include <yql/essentials/providers/common/arrow_resolve/yql_simple_arrow_resolver.h>
  15. #include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
  16. #include <yql/essentials/providers/common/provider/yql_provider.h>
  17. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  18. #include <yql/essentials/providers/config/yql_config_provider.h>
  19. #include <yql/essentials/minikql/mkql_node.h>
  20. #include <yql/essentials/minikql/mkql_node_serialization.h>
  21. #include <yql/essentials/minikql/mkql_alloc.h>
  22. #include <yql/essentials/minikql/aligned_page_pool.h>
  23. #include <yql/essentials/core/services/yql_transform_pipeline.h>
  24. #include <yql/essentials/public/purecalc/common/names.h>
  25. #include <yql/essentials/public/purecalc/common/transformations/type_annotation.h>
  26. #include <yql/essentials/public/purecalc/common/transformations/align_output_schema.h>
  27. #include <yql/essentials/public/purecalc/common/transformations/extract_used_columns.h>
  28. #include <yql/essentials/public/purecalc/common/transformations/output_columns_filter.h>
  29. #include <yql/essentials/public/purecalc/common/transformations/replace_table_reads.h>
  30. #include <yql/essentials/public/purecalc/common/transformations/root_to_blocks.h>
  31. #include <yql/essentials/public/purecalc/common/transformations/utils.h>
  32. #include <yql/essentials/utils/log/log.h>
  33. #include <util/stream/trace.h>
  34. using namespace NYql;
  35. using namespace NYql::NPureCalc;
  36. template <typename TBase>
  37. TWorkerFactory<TBase>::TWorkerFactory(TWorkerFactoryOptions options, EProcessorMode processorMode)
  38. : Factory_(std::move(options.Factory))
  39. , FuncRegistry_(std::move(options.FuncRegistry))
  40. , UserData_(std::move(options.UserData))
  41. , LLVMSettings_(std::move(options.LLVMSettings))
  42. , BlockEngineMode_(options.BlockEngineMode)
  43. , ExprOutputStream_(options.ExprOutputStream)
  44. , CountersProvider_(options.CountersProvider_)
  45. , NativeYtTypeFlags_(options.NativeYtTypeFlags_)
  46. , DeterministicTimeProviderSeed_(options.DeterministicTimeProviderSeed_)
  47. , UseSystemColumns_(options.UseSystemColumns)
  48. , UseWorkerPool_(options.UseWorkerPool)
  49. {
  50. // Prepare input struct types and extract all column names from inputs
  51. const auto& inputSchemas = options.InputSpec.GetSchemas();
  52. const auto& allVirtualColumns = options.InputSpec.GetAllVirtualColumns();
  53. YQL_ENSURE(inputSchemas.size() == allVirtualColumns.size());
  54. const auto inputsCount = inputSchemas.size();
  55. for (ui32 i = 0; i < inputsCount; ++i) {
  56. const auto* originalInputType = MakeTypeFromSchema(inputSchemas[i], ExprContext_);
  57. if (!ValidateInputSchema(originalInputType, ExprContext_)) {
  58. ythrow TCompileError("", GetIssues().ToString()) << "invalid schema for #" << i << " input";
  59. }
  60. const auto* originalStructType = originalInputType->template Cast<TStructExprType>();
  61. const auto* structType = ExtendStructType(originalStructType, allVirtualColumns[i], ExprContext_);
  62. InputTypes_.push_back(structType);
  63. OriginalInputTypes_.push_back(originalStructType);
  64. RawInputTypes_.push_back(originalStructType);
  65. auto& columnsSet = AllColumns_.emplace_back();
  66. for (const auto* structItem : structType->GetItems()) {
  67. columnsSet.insert(TString(structItem->GetName()));
  68. if (!UseSystemColumns_ && structItem->GetName().StartsWith(PurecalcSysColumnsPrefix)) {
  69. ythrow TCompileError("", GetIssues().ToString())
  70. << "#" << i << " input provides system column " << structItem->GetName()
  71. << ", but it is forbidden by options";
  72. }
  73. }
  74. }
  75. // Prepare output type
  76. auto outputSchema = options.OutputSpec.GetSchema();
  77. if (!outputSchema.IsNull()) {
  78. OutputType_ = MakeTypeFromSchema(outputSchema, ExprContext_);
  79. if (!ValidateOutputSchema(OutputType_, ExprContext_)) {
  80. ythrow TCompileError("", GetIssues().ToString()) << "invalid output schema";
  81. }
  82. } else {
  83. OutputType_ = nullptr;
  84. }
  85. RawOutputType_ = OutputType_;
  86. // Translate
  87. if (options.TranslationMode_ == ETranslationMode::Mkql) {
  88. SerializedProgram_ = TString{options.Query};
  89. } else {
  90. ExprRoot_ = Compile(options.Query, options.TranslationMode_,
  91. options.ModuleResolver, options.SyntaxVersion_, options.Modules,
  92. options.InputSpec, options.OutputSpec, options.UseAntlr4, processorMode);
  93. RawOutputType_ = GetSequenceItemType(ExprRoot_->Pos(), ExprRoot_->GetTypeAnn(), true, ExprContext_);
  94. // Deduce output type if it wasn't provided by output spec
  95. if (!OutputType_) {
  96. OutputType_ = RawOutputType_;
  97. // XXX: Tweak the obtained expression type, is the spec supports blocks:
  98. // 1. Remove "_yql_block_length" attribute, since it's for internal usage.
  99. // 2. Strip block container from the type to store its internal type.
  100. if (options.OutputSpec.AcceptsBlocks()) {
  101. Y_ENSURE(OutputType_->GetKind() == ETypeAnnotationKind::Struct);
  102. OutputType_ = UnwrapBlockStruct(OutputType_->Cast<TStructExprType>(), ExprContext_);
  103. }
  104. }
  105. if (!OutputType_) {
  106. ythrow TCompileError("", GetIssues().ToString()) << "cannot deduce output schema";
  107. }
  108. }
  109. }
  110. template <typename TBase>
  111. TExprNode::TPtr TWorkerFactory<TBase>::Compile(
  112. TStringBuf query,
  113. ETranslationMode mode,
  114. IModuleResolver::TPtr factoryModuleResolver,
  115. ui16 syntaxVersion,
  116. const THashMap<TString, TString>& modules,
  117. const TInputSpecBase& inputSpec,
  118. const TOutputSpecBase& outputSpec,
  119. bool useAntlr4,
  120. EProcessorMode processorMode
  121. ) {
  122. if (mode == ETranslationMode::PG && processorMode != EProcessorMode::PullList) {
  123. ythrow TCompileError("", "") << "only PullList mode is compatible to PostgreSQL syntax";
  124. }
  125. // Prepare type annotation context
  126. TTypeAnnotationContextPtr typeContext;
  127. IModuleResolver::TPtr moduleResolver = factoryModuleResolver ? factoryModuleResolver->CreateMutableChild() : nullptr;
  128. typeContext = MakeIntrusive<TTypeAnnotationContext>();
  129. typeContext->RandomProvider = CreateDefaultRandomProvider();
  130. typeContext->TimeProvider = DeterministicTimeProviderSeed_ ?
  131. CreateDeterministicTimeProvider(*DeterministicTimeProviderSeed_) :
  132. CreateDefaultTimeProvider();
  133. typeContext->UdfResolver = NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get());
  134. typeContext->ArrowResolver = MakeSimpleArrowResolver(*FuncRegistry_.Get());
  135. typeContext->UserDataStorage = MakeIntrusive<TUserDataStorage>(nullptr, UserData_, nullptr, nullptr);
  136. typeContext->Modules = moduleResolver;
  137. typeContext->BlockEngineMode = BlockEngineMode_;
  138. auto configProvider = CreateConfigProvider(*typeContext, nullptr, "");
  139. typeContext->AddDataSource(ConfigProviderName, configProvider);
  140. typeContext->Initialize(ExprContext_);
  141. if (auto modules = dynamic_cast<TModuleResolver*>(moduleResolver.get())) {
  142. modules->AttachUserData(typeContext->UserDataStorage);
  143. }
  144. // Parse SQL/s-expr into AST
  145. TAstParseResult astRes;
  146. if (mode == ETranslationMode::SQL || mode == ETranslationMode::PG) {
  147. NSQLTranslation::TTranslationSettings settings;
  148. typeContext->DeprecatedSQL = (syntaxVersion == 0);
  149. if (mode == ETranslationMode::PG) {
  150. settings.PgParser = true;
  151. }
  152. settings.SyntaxVersion = syntaxVersion;
  153. settings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
  154. settings.EmitReadsForExists = true;
  155. settings.Antlr4Parser = useAntlr4;
  156. settings.Mode = NSQLTranslation::ESqlMode::LIMITED_VIEW;
  157. settings.DefaultCluster = PurecalcDefaultCluster;
  158. settings.ClusterMapping[settings.DefaultCluster] = PurecalcDefaultService;
  159. settings.ModuleMapping = modules;
  160. settings.EnableGenericUdfs = true;
  161. settings.File = "generated.sql";
  162. settings.Flags = {
  163. "AnsiOrderByLimitInUnionAll",
  164. "AnsiRankForNullableKeys",
  165. "DisableAnsiOptionalAs",
  166. "DisableCoalesceJoinKeysOnQualifiedAll",
  167. "DisableUnorderedSubqueries",
  168. "FlexibleTypes"
  169. };
  170. if (BlockEngineMode_ != EBlockEngineMode::Disable) {
  171. settings.Flags.insert("EmitAggApply");
  172. }
  173. for (const auto& [key, block] : UserData_) {
  174. TStringBuf alias(key.Alias());
  175. if (block.Usage.Test(EUserDataBlockUsage::Library) && !alias.StartsWith("/lib")) {
  176. alias.SkipPrefix("/home/");
  177. settings.Libraries.emplace(alias);
  178. }
  179. }
  180. NSQLTranslation::TTranslators translators(
  181. nullptr,
  182. NSQLTranslationV1::MakeTranslator(),
  183. NSQLTranslationPG::MakeTranslator()
  184. );
  185. astRes = SqlToYql(translators, TString(query), settings);
  186. } else {
  187. astRes = ParseAst(TString(query));
  188. }
  189. ExprContext_.IssueManager.AddIssues(astRes.Issues);
  190. if (!astRes.IsOk()) {
  191. ythrow TCompileError(TString(query), GetIssues().ToString()) << "failed to parse " << mode;
  192. }
  193. if (ETraceLevel::TRACE_DETAIL <= StdDbgLevel()) {
  194. Cdbg << "Before optimization:" << Endl;
  195. astRes.Root->PrettyPrintTo(Cdbg, TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote | TAstPrintFlags::AdaptArbitraryContent);
  196. }
  197. // Translate AST into expression
  198. TExprNode::TPtr exprRoot;
  199. if (!CompileExpr(*astRes.Root, exprRoot, ExprContext_, moduleResolver.get(), nullptr, 0, syntaxVersion)) {
  200. TStringStream astStr;
  201. astRes.Root->PrettyPrintTo(astStr, TAstPrintFlags::ShortQuote | TAstPrintFlags::PerLine);
  202. ythrow TCompileError(astStr.Str(), GetIssues().ToString()) << "failed to compile";
  203. }
  204. // Prepare transformation pipeline
  205. THolder<IGraphTransformer> calcTransformer = CreateFunctorTransformer([&](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx)
  206. -> IGraphTransformer::TStatus
  207. {
  208. output = input;
  209. auto valueNode = input->HeadPtr();
  210. auto peepHole = MakePeepholeOptimization(typeContext);
  211. auto status = SyncTransform(*peepHole, valueNode, ctx);
  212. if (status != IGraphTransformer::TStatus::Ok) {
  213. return status;
  214. }
  215. TStringStream out;
  216. NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true);
  217. writer.OnBeginMap();
  218. writer.OnKeyedItem("Data");
  219. TWorkerGraph graph(
  220. valueNode,
  221. ctx,
  222. {},
  223. *FuncRegistry_,
  224. UserData_,
  225. {},
  226. {},
  227. {},
  228. valueNode->GetTypeAnn(),
  229. valueNode->GetTypeAnn(),
  230. LLVMSettings_,
  231. CountersProvider_,
  232. NativeYtTypeFlags_,
  233. DeterministicTimeProviderSeed_
  234. );
  235. with_lock (graph.ScopedAlloc_) {
  236. const auto value = graph.ComputationGraph_->GetValue();
  237. NCommon::WriteYsonValue(writer, value, const_cast<NKikimr::NMiniKQL::TType*>(graph.OutputType_), nullptr);
  238. }
  239. writer.OnEndMap();
  240. auto ysonAtom = ctx.NewAtom(TPositionHandle(), out.Str());
  241. input->SetResult(std::move(ysonAtom));
  242. return IGraphTransformer::TStatus::Ok;
  243. });
  244. const TString& selfName = TString(inputSpec.ProvidesBlocks()
  245. ? PurecalcBlockInputCallableName
  246. : PurecalcInputCallableName);
  247. TTransformationPipeline pipeline(typeContext);
  248. pipeline.Add(MakeTableReadsReplacer(InputTypes_, UseSystemColumns_, processorMode, selfName),
  249. "ReplaceTableReads", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  250. "Replace reads from tables");
  251. pipeline.AddServiceTransformers();
  252. pipeline.AddPreTypeAnnotation();
  253. pipeline.AddExpressionEvaluation(*FuncRegistry_, calcTransformer.Get());
  254. pipeline.AddIOAnnotation();
  255. pipeline.AddTypeAnnotationTransformer(MakeTypeAnnotationTransformer(typeContext, InputTypes_, RawInputTypes_, processorMode, selfName));
  256. pipeline.AddPostTypeAnnotation();
  257. pipeline.Add(CreateFunctorTransformer(
  258. [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  259. return OptimizeExpr(input, output, [](const TExprNode::TPtr& node, TExprContext&) -> TExprNode::TPtr {
  260. if (node->IsCallable("Unordered") && node->Child(0)->IsCallable({
  261. PurecalcInputCallableName, PurecalcBlockInputCallableName
  262. })) {
  263. return node->ChildPtr(0);
  264. }
  265. return node;
  266. }, ctx, TOptimizeExprSettings(nullptr));
  267. }), "Unordered", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  268. "Unordered optimizations");
  269. pipeline.Add(CreateFunctorTransformer(
  270. [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  271. return OptimizeExpr(input, output, [](const TExprNode::TPtr& node, TExprContext&) -> TExprNode::TPtr {
  272. if (node->IsCallable("Right!") && node->Head().IsCallable("Cons!")) {
  273. return node->Head().ChildPtr(1);
  274. }
  275. return node;
  276. }, ctx, TOptimizeExprSettings(nullptr));
  277. }), "Cons", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  278. "Cons optimizations");
  279. pipeline.Add(MakeOutputColumnsFilter(outputSpec.GetOutputColumnsFilter()),
  280. "Filter", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  281. "Filter output columns");
  282. pipeline.Add(MakeRootToBlocks(outputSpec.AcceptsBlocks(), processorMode),
  283. "RootToBlocks", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  284. "Rewrite the root if the output spec accepts blocks");
  285. pipeline.Add(MakeOutputAligner(OutputType_, outputSpec.AcceptsBlocks(), processorMode),
  286. "Convert", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  287. "Align return type of the program to output schema");
  288. pipeline.AddCommonOptimization();
  289. pipeline.AddFinalCommonOptimization();
  290. pipeline.Add(MakeUsedColumnsExtractor(&UsedColumns_, AllColumns_),
  291. "ExtractColumns", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  292. "Extract used columns");
  293. pipeline.Add(MakePeepholeOptimization(typeContext),
  294. "PeepHole", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  295. "Peephole optimizations");
  296. pipeline.AddCheckExecution(false);
  297. // Apply optimizations
  298. auto transformer = pipeline.Build();
  299. auto status = SyncTransform(*transformer, exprRoot, ExprContext_);
  300. auto transformStats = transformer->GetStatistics();
  301. TStringStream out;
  302. NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Pretty);
  303. NCommon::TransformerStatsToYson("", transformStats, writer);
  304. YQL_CLOG(DEBUG, Core) << "Transform stats: " << out.Str();
  305. if (status == IGraphTransformer::TStatus::Error) {
  306. ythrow TCompileError("", GetIssues().ToString()) << "Failed to optimize";
  307. }
  308. IOutputStream* exprOut = nullptr;
  309. if (ExprOutputStream_) {
  310. exprOut = ExprOutputStream_;
  311. } else if (ETraceLevel::TRACE_DETAIL <= StdDbgLevel()) {
  312. exprOut = &Cdbg;
  313. }
  314. if (exprOut) {
  315. *exprOut << "After optimization:" << Endl;
  316. ConvertToAst(*exprRoot, ExprContext_, 0, true).Root
  317. ->PrettyPrintTo(*exprOut, TAstPrintFlags::PerLine
  318. | TAstPrintFlags::ShortQuote
  319. | TAstPrintFlags::AdaptArbitraryContent);
  320. }
  321. return exprRoot;
  322. }
  323. template <typename TBase>
  324. NYT::TNode TWorkerFactory<TBase>::MakeInputSchema(ui32 inputIndex) const {
  325. Y_ENSURE(
  326. inputIndex < InputTypes_.size(),
  327. "invalid input index (" << inputIndex << ") in MakeInputSchema call");
  328. return NCommon::TypeToYsonNode(InputTypes_[inputIndex]);
  329. }
  330. template <typename TBase>
  331. NYT::TNode TWorkerFactory<TBase>::MakeInputSchema() const {
  332. Y_ENSURE(
  333. InputTypes_.size() == 1,
  334. "MakeInputSchema() can be used only with single-input programs");
  335. return NCommon::TypeToYsonNode(InputTypes_[0]);
  336. }
  337. template <typename TBase>
  338. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema() const {
  339. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  340. Y_ENSURE(
  341. OutputType_->GetKind() == ETypeAnnotationKind::Struct,
  342. "MakeOutputSchema() cannot be used with multi-output programs");
  343. return NCommon::TypeToYsonNode(OutputType_);
  344. }
  345. template <typename TBase>
  346. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema(ui32 index) const {
  347. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  348. Y_ENSURE(
  349. OutputType_->GetKind() == ETypeAnnotationKind::Variant,
  350. "MakeOutputSchema(ui32) cannot be used with single-output programs");
  351. auto vtype = OutputType_->template Cast<TVariantExprType>();
  352. Y_ENSURE(
  353. vtype->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple,
  354. "MakeOutputSchema(ui32) cannot be used to process variants over struct");
  355. auto ttype = vtype->GetUnderlyingType()->template Cast<TTupleExprType>();
  356. Y_ENSURE(
  357. index < ttype->GetSize(),
  358. "Invalid table index " << index);
  359. return NCommon::TypeToYsonNode(ttype->GetItems()[index]);
  360. }
  361. template <typename TBase>
  362. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema(TStringBuf tableName) const {
  363. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  364. Y_ENSURE(
  365. OutputType_->GetKind() == ETypeAnnotationKind::Variant,
  366. "MakeOutputSchema(TStringBuf) cannot be used with single-output programs");
  367. auto vtype = OutputType_->template Cast<TVariantExprType>();
  368. Y_ENSURE(
  369. vtype->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Struct,
  370. "MakeOutputSchema(TStringBuf) cannot be used to process variants over tuple");
  371. auto stype = vtype->GetUnderlyingType()->template Cast<TStructExprType>();
  372. auto index = stype->FindItem(tableName);
  373. Y_ENSURE(
  374. index.Defined(),
  375. "Invalid table index " << TString{tableName}.Quote());
  376. return NCommon::TypeToYsonNode(stype->GetItems()[*index]->GetItemType());
  377. }
  378. template <typename TBase>
  379. NYT::TNode TWorkerFactory<TBase>::MakeFullOutputSchema() const {
  380. Y_ENSURE(OutputType_, "MakeFullOutputSchema() cannot be used with precompiled programs");
  381. return NCommon::TypeToYsonNode(OutputType_);
  382. }
  383. template <typename TBase>
  384. const THashSet<TString>& TWorkerFactory<TBase>::GetUsedColumns(ui32 inputIndex) const {
  385. Y_ENSURE(
  386. inputIndex < UsedColumns_.size(),
  387. "invalid input index (" << inputIndex << ") in GetUsedColumns call");
  388. return UsedColumns_[inputIndex];
  389. }
  390. template <typename TBase>
  391. const THashSet<TString>& TWorkerFactory<TBase>::GetUsedColumns() const {
  392. Y_ENSURE(
  393. UsedColumns_.size() == 1,
  394. "GetUsedColumns() can be used only with single-input programs");
  395. return UsedColumns_[0];
  396. }
  397. template <typename TBase>
  398. TIssues TWorkerFactory<TBase>::GetIssues() const {
  399. auto issues = ExprContext_.IssueManager.GetCompletedIssues();
  400. CheckFatalIssues(issues);
  401. return issues;
  402. }
  403. template <typename TBase>
  404. TString TWorkerFactory<TBase>::GetCompiledProgram() {
  405. if (ExprRoot_) {
  406. NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),
  407. FuncRegistry_->SupportsSizedAllocators());
  408. NKikimr::NMiniKQL::TTypeEnvironment env(alloc);
  409. auto rootNode = CompileMkql(ExprRoot_, ExprContext_, *FuncRegistry_, env, UserData_);
  410. return NKikimr::NMiniKQL::SerializeRuntimeNode(rootNode, env);
  411. }
  412. return SerializedProgram_;
  413. }
  414. template <typename TBase>
  415. void TWorkerFactory<TBase>::ReturnWorker(IWorker* worker) {
  416. THolder<IWorker> tmp(worker);
  417. if (UseWorkerPool_) {
  418. WorkerPool_.push_back(std::move(tmp));
  419. }
  420. }
  421. #define DEFINE_WORKER_MAKER(MODE) \
  422. TWorkerHolder<I##MODE##Worker> T##MODE##WorkerFactory::MakeWorker() { \
  423. if (!WorkerPool_.empty()) { \
  424. auto res = std::move(WorkerPool_.back()); \
  425. WorkerPool_.pop_back(); \
  426. return TWorkerHolder<I##MODE##Worker>((I##MODE##Worker *)res.Release()); \
  427. } \
  428. return TWorkerHolder<I##MODE##Worker>(new T##MODE##Worker( \
  429. weak_from_this(), \
  430. ExprRoot_, \
  431. ExprContext_, \
  432. SerializedProgram_, \
  433. *FuncRegistry_, \
  434. UserData_, \
  435. InputTypes_, \
  436. OriginalInputTypes_, \
  437. RawInputTypes_, \
  438. OutputType_, \
  439. RawOutputType_, \
  440. LLVMSettings_, \
  441. CountersProvider_, \
  442. NativeYtTypeFlags_, \
  443. DeterministicTimeProviderSeed_ \
  444. )); \
  445. }
  446. DEFINE_WORKER_MAKER(PullStream)
  447. DEFINE_WORKER_MAKER(PullList)
  448. DEFINE_WORKER_MAKER(PushStream)
  449. namespace NYql {
  450. namespace NPureCalc {
  451. template
  452. class TWorkerFactory<IPullStreamWorkerFactory>;
  453. template
  454. class TWorkerFactory<IPullListWorkerFactory>;
  455. template
  456. class TWorkerFactory<IPushStreamWorkerFactory>;
  457. }
  458. }