// worker_factory.cpp (24 KB)
  1. #include "worker_factory.h"
  2. #include "type_from_schema.h"
  3. #include "worker.h"
  4. #include "compile_mkql.h"
  5. #include <yql/essentials/sql/sql.h>
  6. #include <yql/essentials/sql/v1/sql.h>
  7. //FIXME {
  8. #include <yql/essentials/sql/v1/lexer/antlr3/lexer.h>
  9. #include <yql/essentials/sql/v1/lexer/antlr3_ansi/lexer.h>
  10. #include <yql/essentials/sql/v1/proto_parser/antlr3/proto_parser.h>
  11. #include <yql/essentials/sql/v1/proto_parser/antlr3_ansi/proto_parser.h>
  12. //}
  13. #include <yql/essentials/sql/v1/lexer/antlr4/lexer.h>
  14. #include <yql/essentials/sql/v1/lexer/antlr4_ansi/lexer.h>
  15. #include <yql/essentials/sql/v1/proto_parser/antlr4/proto_parser.h>
  16. #include <yql/essentials/sql/v1/proto_parser/antlr4_ansi/proto_parser.h>
  17. #include <yql/essentials/parser/pg_wrapper/interface/parser.h>
  18. #include <yql/essentials/ast/yql_expr.h>
  19. #include <yql/essentials/core/yql_expr_optimize.h>
  20. #include <yql/essentials/core/yql_type_helpers.h>
  21. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  22. #include <yql/essentials/providers/common/codec/yql_codec.h>
  23. #include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  24. #include <yql/essentials/providers/common/arrow_resolve/yql_simple_arrow_resolver.h>
  25. #include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
  26. #include <yql/essentials/providers/common/provider/yql_provider.h>
  27. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  28. #include <yql/essentials/providers/config/yql_config_provider.h>
  29. #include <yql/essentials/minikql/mkql_node.h>
  30. #include <yql/essentials/minikql/mkql_node_serialization.h>
  31. #include <yql/essentials/minikql/mkql_alloc.h>
  32. #include <yql/essentials/minikql/aligned_page_pool.h>
  33. #include <yql/essentials/core/services/yql_transform_pipeline.h>
  34. #include <yql/essentials/public/purecalc/common/names.h>
  35. #include <yql/essentials/public/purecalc/common/transformations/type_annotation.h>
  36. #include <yql/essentials/public/purecalc/common/transformations/align_output_schema.h>
  37. #include <yql/essentials/public/purecalc/common/transformations/extract_used_columns.h>
  38. #include <yql/essentials/public/purecalc/common/transformations/output_columns_filter.h>
  39. #include <yql/essentials/public/purecalc/common/transformations/replace_table_reads.h>
  40. #include <yql/essentials/public/purecalc/common/transformations/root_to_blocks.h>
  41. #include <yql/essentials/public/purecalc/common/transformations/utils.h>
  42. #include <yql/essentials/utils/log/log.h>
  43. #include <util/stream/trace.h>
  44. using namespace NYql;
  45. using namespace NYql::NPureCalc;
  46. template <typename TBase>
  47. TWorkerFactory<TBase>::TWorkerFactory(TWorkerFactoryOptions options, EProcessorMode processorMode)
  48. : Factory_(std::move(options.Factory))
  49. , FuncRegistry_(std::move(options.FuncRegistry))
  50. , UserData_(std::move(options.UserData))
  51. , LLVMSettings_(std::move(options.LLVMSettings))
  52. , BlockEngineMode_(options.BlockEngineMode)
  53. , ExprOutputStream_(options.ExprOutputStream)
  54. , CountersProvider_(options.CountersProvider_)
  55. , NativeYtTypeFlags_(options.NativeYtTypeFlags_)
  56. , DeterministicTimeProviderSeed_(options.DeterministicTimeProviderSeed_)
  57. , UseSystemColumns_(options.UseSystemColumns)
  58. , UseWorkerPool_(options.UseWorkerPool)
  59. {
  60. // Prepare input struct types and extract all column names from inputs
  61. const auto& inputSchemas = options.InputSpec.GetSchemas();
  62. const auto& allVirtualColumns = options.InputSpec.GetAllVirtualColumns();
  63. YQL_ENSURE(inputSchemas.size() == allVirtualColumns.size());
  64. const auto inputsCount = inputSchemas.size();
  65. for (ui32 i = 0; i < inputsCount; ++i) {
  66. const auto* originalInputType = MakeTypeFromSchema(inputSchemas[i], ExprContext_);
  67. if (!ValidateInputSchema(originalInputType, ExprContext_)) {
  68. ythrow TCompileError("", GetIssues().ToString()) << "invalid schema for #" << i << " input";
  69. }
  70. const auto* originalStructType = originalInputType->template Cast<TStructExprType>();
  71. const auto* structType = ExtendStructType(originalStructType, allVirtualColumns[i], ExprContext_);
  72. InputTypes_.push_back(structType);
  73. OriginalInputTypes_.push_back(originalStructType);
  74. RawInputTypes_.push_back(originalStructType);
  75. auto& columnsSet = AllColumns_.emplace_back();
  76. for (const auto* structItem : structType->GetItems()) {
  77. columnsSet.insert(TString(structItem->GetName()));
  78. if (!UseSystemColumns_ && structItem->GetName().StartsWith(PurecalcSysColumnsPrefix)) {
  79. ythrow TCompileError("", GetIssues().ToString())
  80. << "#" << i << " input provides system column " << structItem->GetName()
  81. << ", but it is forbidden by options";
  82. }
  83. }
  84. }
  85. // Prepare output type
  86. auto outputSchema = options.OutputSpec.GetSchema();
  87. if (!outputSchema.IsNull()) {
  88. OutputType_ = MakeTypeFromSchema(outputSchema, ExprContext_);
  89. if (!ValidateOutputSchema(OutputType_, ExprContext_)) {
  90. ythrow TCompileError("", GetIssues().ToString()) << "invalid output schema";
  91. }
  92. } else {
  93. OutputType_ = nullptr;
  94. }
  95. RawOutputType_ = OutputType_;
  96. // Translate
  97. if (options.TranslationMode_ == ETranslationMode::Mkql) {
  98. SerializedProgram_ = TString{options.Query};
  99. } else {
  100. ExprRoot_ = Compile(options.Query, options.TranslationMode_,
  101. options.ModuleResolver, options.SyntaxVersion_, options.Modules,
  102. options.InputSpec, options.OutputSpec, options.UseAntlr4, processorMode);
  103. RawOutputType_ = GetSequenceItemType(ExprRoot_->Pos(), ExprRoot_->GetTypeAnn(), true, ExprContext_);
  104. // Deduce output type if it wasn't provided by output spec
  105. if (!OutputType_) {
  106. OutputType_ = RawOutputType_;
  107. // XXX: Tweak the obtained expression type, is the spec supports blocks:
  108. // 1. Remove "_yql_block_length" attribute, since it's for internal usage.
  109. // 2. Strip block container from the type to store its internal type.
  110. if (options.OutputSpec.AcceptsBlocks()) {
  111. Y_ENSURE(OutputType_->GetKind() == ETypeAnnotationKind::Struct);
  112. OutputType_ = UnwrapBlockStruct(OutputType_->Cast<TStructExprType>(), ExprContext_);
  113. }
  114. }
  115. if (!OutputType_) {
  116. ythrow TCompileError("", GetIssues().ToString()) << "cannot deduce output schema";
  117. }
  118. }
  119. }
  120. template <typename TBase>
  121. TExprNode::TPtr TWorkerFactory<TBase>::Compile(
  122. TStringBuf query,
  123. ETranslationMode mode,
  124. IModuleResolver::TPtr factoryModuleResolver,
  125. ui16 syntaxVersion,
  126. const THashMap<TString, TString>& modules,
  127. const TInputSpecBase& inputSpec,
  128. const TOutputSpecBase& outputSpec,
  129. bool useAntlr4,
  130. EProcessorMode processorMode
  131. ) {
  132. if (mode == ETranslationMode::PG && processorMode != EProcessorMode::PullList) {
  133. ythrow TCompileError("", "") << "only PullList mode is compatible to PostgreSQL syntax";
  134. }
  135. // Prepare type annotation context
  136. TTypeAnnotationContextPtr typeContext;
  137. IModuleResolver::TPtr moduleResolver = factoryModuleResolver ? factoryModuleResolver->CreateMutableChild() : nullptr;
  138. typeContext = MakeIntrusive<TTypeAnnotationContext>();
  139. typeContext->RandomProvider = CreateDefaultRandomProvider();
  140. typeContext->TimeProvider = DeterministicTimeProviderSeed_ ?
  141. CreateDeterministicTimeProvider(*DeterministicTimeProviderSeed_) :
  142. CreateDefaultTimeProvider();
  143. typeContext->UdfResolver = NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get());
  144. typeContext->ArrowResolver = MakeSimpleArrowResolver(*FuncRegistry_.Get());
  145. typeContext->UserDataStorage = MakeIntrusive<TUserDataStorage>(nullptr, UserData_, nullptr, nullptr);
  146. typeContext->Modules = moduleResolver;
  147. typeContext->BlockEngineMode = BlockEngineMode_;
  148. auto configProvider = CreateConfigProvider(*typeContext, nullptr, "");
  149. typeContext->AddDataSource(ConfigProviderName, configProvider);
  150. typeContext->Initialize(ExprContext_);
  151. if (auto modules = dynamic_cast<TModuleResolver*>(moduleResolver.get())) {
  152. modules->AttachUserData(typeContext->UserDataStorage);
  153. }
  154. // Parse SQL/s-expr into AST
  155. TAstParseResult astRes;
  156. if (mode == ETranslationMode::SQL || mode == ETranslationMode::PG) {
  157. NSQLTranslation::TTranslationSettings settings;
  158. typeContext->DeprecatedSQL = (syntaxVersion == 0);
  159. if (mode == ETranslationMode::PG) {
  160. settings.PgParser = true;
  161. }
  162. settings.SyntaxVersion = syntaxVersion;
  163. settings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
  164. settings.EmitReadsForExists = true;
  165. settings.Antlr4Parser = useAntlr4;
  166. settings.Mode = NSQLTranslation::ESqlMode::LIMITED_VIEW;
  167. settings.DefaultCluster = PurecalcDefaultCluster;
  168. settings.ClusterMapping[settings.DefaultCluster] = PurecalcDefaultService;
  169. settings.ModuleMapping = modules;
  170. settings.EnableGenericUdfs = true;
  171. settings.File = "generated.sql";
  172. settings.Flags = {
  173. "AnsiOrderByLimitInUnionAll",
  174. "AnsiRankForNullableKeys",
  175. "DisableAnsiOptionalAs",
  176. "DisableCoalesceJoinKeysOnQualifiedAll",
  177. "DisableUnorderedSubqueries",
  178. "FlexibleTypes"
  179. };
  180. if (BlockEngineMode_ != EBlockEngineMode::Disable) {
  181. settings.Flags.insert("EmitAggApply");
  182. }
  183. for (const auto& [key, block] : UserData_) {
  184. TStringBuf alias(key.Alias());
  185. if (block.Usage.Test(EUserDataBlockUsage::Library) && !alias.StartsWith("/lib")) {
  186. alias.SkipPrefix("/home/");
  187. settings.Libraries.emplace(alias);
  188. }
  189. }
  190. NSQLTranslationV1::TLexers lexers;
  191. lexers.Antlr3 = NSQLTranslationV1::MakeAntlr3LexerFactory();
  192. lexers.Antlr3Ansi = NSQLTranslationV1::MakeAntlr3AnsiLexerFactory();
  193. lexers.Antlr4 = NSQLTranslationV1::MakeAntlr4LexerFactory();
  194. lexers.Antlr4Ansi = NSQLTranslationV1::MakeAntlr4AnsiLexerFactory();
  195. NSQLTranslationV1::TParsers parsers;
  196. parsers.Antlr3 = NSQLTranslationV1::MakeAntlr3ParserFactory();
  197. parsers.Antlr3Ansi = NSQLTranslationV1::MakeAntlr3AnsiParserFactory();
  198. parsers.Antlr4 = NSQLTranslationV1::MakeAntlr4ParserFactory();
  199. parsers.Antlr4Ansi = NSQLTranslationV1::MakeAntlr4AnsiParserFactory();
  200. NSQLTranslation::TTranslators translators(
  201. nullptr,
  202. NSQLTranslationV1::MakeTranslator(lexers, parsers),
  203. NSQLTranslationPG::MakeTranslator()
  204. );
  205. astRes = SqlToYql(translators, TString(query), settings);
  206. } else {
  207. astRes = ParseAst(TString(query));
  208. }
  209. ExprContext_.IssueManager.AddIssues(astRes.Issues);
  210. if (!astRes.IsOk()) {
  211. ythrow TCompileError(TString(query), GetIssues().ToString()) << "failed to parse " << mode;
  212. }
  213. if (ETraceLevel::TRACE_DETAIL <= StdDbgLevel()) {
  214. Cdbg << "Before optimization:" << Endl;
  215. astRes.Root->PrettyPrintTo(Cdbg, TAstPrintFlags::PerLine | TAstPrintFlags::ShortQuote | TAstPrintFlags::AdaptArbitraryContent);
  216. }
  217. // Translate AST into expression
  218. TExprNode::TPtr exprRoot;
  219. if (!CompileExpr(*astRes.Root, exprRoot, ExprContext_, moduleResolver.get(), nullptr, 0, syntaxVersion)) {
  220. TStringStream astStr;
  221. astRes.Root->PrettyPrintTo(astStr, TAstPrintFlags::ShortQuote | TAstPrintFlags::PerLine);
  222. ythrow TCompileError(astStr.Str(), GetIssues().ToString()) << "failed to compile";
  223. }
  224. // Prepare transformation pipeline
  225. THolder<IGraphTransformer> calcTransformer = CreateFunctorTransformer([&](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx)
  226. -> IGraphTransformer::TStatus
  227. {
  228. output = input;
  229. auto valueNode = input->HeadPtr();
  230. auto peepHole = MakePeepholeOptimization(typeContext);
  231. auto status = SyncTransform(*peepHole, valueNode, ctx);
  232. if (status != IGraphTransformer::TStatus::Ok) {
  233. return status;
  234. }
  235. TStringStream out;
  236. NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true);
  237. writer.OnBeginMap();
  238. writer.OnKeyedItem("Data");
  239. TWorkerGraph graph(
  240. valueNode,
  241. ctx,
  242. {},
  243. *FuncRegistry_,
  244. UserData_,
  245. {},
  246. {},
  247. {},
  248. valueNode->GetTypeAnn(),
  249. valueNode->GetTypeAnn(),
  250. LLVMSettings_,
  251. CountersProvider_,
  252. NativeYtTypeFlags_,
  253. DeterministicTimeProviderSeed_
  254. );
  255. with_lock (graph.ScopedAlloc_) {
  256. const auto value = graph.ComputationGraph_->GetValue();
  257. NCommon::WriteYsonValue(writer, value, const_cast<NKikimr::NMiniKQL::TType*>(graph.OutputType_), nullptr);
  258. }
  259. writer.OnEndMap();
  260. auto ysonAtom = ctx.NewAtom(TPositionHandle(), out.Str());
  261. input->SetResult(std::move(ysonAtom));
  262. return IGraphTransformer::TStatus::Ok;
  263. });
  264. const TString& selfName = TString(inputSpec.ProvidesBlocks()
  265. ? PurecalcBlockInputCallableName
  266. : PurecalcInputCallableName);
  267. TTransformationPipeline pipeline(typeContext);
  268. pipeline.Add(MakeTableReadsReplacer(InputTypes_, UseSystemColumns_, processorMode, selfName),
  269. "ReplaceTableReads", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  270. "Replace reads from tables");
  271. pipeline.AddServiceTransformers();
  272. pipeline.AddPreTypeAnnotation();
  273. pipeline.AddExpressionEvaluation(*FuncRegistry_, calcTransformer.Get());
  274. pipeline.AddIOAnnotation();
  275. pipeline.AddTypeAnnotationTransformer(MakeTypeAnnotationTransformer(typeContext, InputTypes_, RawInputTypes_, processorMode, selfName));
  276. pipeline.AddPostTypeAnnotation();
  277. pipeline.Add(CreateFunctorTransformer(
  278. [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  279. return OptimizeExpr(input, output, [](const TExprNode::TPtr& node, TExprContext&) -> TExprNode::TPtr {
  280. if (node->IsCallable("Unordered") && node->Child(0)->IsCallable({
  281. PurecalcInputCallableName, PurecalcBlockInputCallableName
  282. })) {
  283. return node->ChildPtr(0);
  284. }
  285. return node;
  286. }, ctx, TOptimizeExprSettings(nullptr));
  287. }), "Unordered", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  288. "Unordered optimizations");
  289. pipeline.Add(CreateFunctorTransformer(
  290. [&](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  291. return OptimizeExpr(input, output, [](const TExprNode::TPtr& node, TExprContext&) -> TExprNode::TPtr {
  292. if (node->IsCallable("Right!") && node->Head().IsCallable("Cons!")) {
  293. return node->Head().ChildPtr(1);
  294. }
  295. return node;
  296. }, ctx, TOptimizeExprSettings(nullptr));
  297. }), "Cons", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  298. "Cons optimizations");
  299. pipeline.Add(MakeOutputColumnsFilter(outputSpec.GetOutputColumnsFilter()),
  300. "Filter", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  301. "Filter output columns");
  302. pipeline.Add(MakeRootToBlocks(outputSpec.AcceptsBlocks(), processorMode),
  303. "RootToBlocks", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  304. "Rewrite the root if the output spec accepts blocks");
  305. pipeline.Add(MakeOutputAligner(OutputType_, outputSpec.AcceptsBlocks(), processorMode),
  306. "Convert", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  307. "Align return type of the program to output schema");
  308. pipeline.AddCommonOptimization();
  309. pipeline.AddFinalCommonOptimization();
  310. pipeline.Add(MakeUsedColumnsExtractor(&UsedColumns_, AllColumns_),
  311. "ExtractColumns", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  312. "Extract used columns");
  313. pipeline.Add(MakePeepholeOptimization(typeContext),
  314. "PeepHole", EYqlIssueCode::TIssuesIds_EIssueCode_DEFAULT_ERROR,
  315. "Peephole optimizations");
  316. pipeline.AddCheckExecution(false);
  317. // Apply optimizations
  318. auto transformer = pipeline.Build();
  319. auto status = SyncTransform(*transformer, exprRoot, ExprContext_);
  320. auto transformStats = transformer->GetStatistics();
  321. TStringStream out;
  322. NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Pretty);
  323. NCommon::TransformerStatsToYson("", transformStats, writer);
  324. YQL_CLOG(DEBUG, Core) << "Transform stats: " << out.Str();
  325. if (status == IGraphTransformer::TStatus::Error) {
  326. ythrow TCompileError("", GetIssues().ToString()) << "Failed to optimize";
  327. }
  328. IOutputStream* exprOut = nullptr;
  329. if (ExprOutputStream_) {
  330. exprOut = ExprOutputStream_;
  331. } else if (ETraceLevel::TRACE_DETAIL <= StdDbgLevel()) {
  332. exprOut = &Cdbg;
  333. }
  334. if (exprOut) {
  335. *exprOut << "After optimization:" << Endl;
  336. ConvertToAst(*exprRoot, ExprContext_, 0, true).Root
  337. ->PrettyPrintTo(*exprOut, TAstPrintFlags::PerLine
  338. | TAstPrintFlags::ShortQuote
  339. | TAstPrintFlags::AdaptArbitraryContent);
  340. }
  341. return exprRoot;
  342. }
  343. template <typename TBase>
  344. NYT::TNode TWorkerFactory<TBase>::MakeInputSchema(ui32 inputIndex) const {
  345. Y_ENSURE(
  346. inputIndex < InputTypes_.size(),
  347. "invalid input index (" << inputIndex << ") in MakeInputSchema call");
  348. return NCommon::TypeToYsonNode(InputTypes_[inputIndex]);
  349. }
  350. template <typename TBase>
  351. NYT::TNode TWorkerFactory<TBase>::MakeInputSchema() const {
  352. Y_ENSURE(
  353. InputTypes_.size() == 1,
  354. "MakeInputSchema() can be used only with single-input programs");
  355. return NCommon::TypeToYsonNode(InputTypes_[0]);
  356. }
  357. template <typename TBase>
  358. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema() const {
  359. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  360. Y_ENSURE(
  361. OutputType_->GetKind() == ETypeAnnotationKind::Struct,
  362. "MakeOutputSchema() cannot be used with multi-output programs");
  363. return NCommon::TypeToYsonNode(OutputType_);
  364. }
  365. template <typename TBase>
  366. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema(ui32 index) const {
  367. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  368. Y_ENSURE(
  369. OutputType_->GetKind() == ETypeAnnotationKind::Variant,
  370. "MakeOutputSchema(ui32) cannot be used with single-output programs");
  371. auto vtype = OutputType_->template Cast<TVariantExprType>();
  372. Y_ENSURE(
  373. vtype->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple,
  374. "MakeOutputSchema(ui32) cannot be used to process variants over struct");
  375. auto ttype = vtype->GetUnderlyingType()->template Cast<TTupleExprType>();
  376. Y_ENSURE(
  377. index < ttype->GetSize(),
  378. "Invalid table index " << index);
  379. return NCommon::TypeToYsonNode(ttype->GetItems()[index]);
  380. }
  381. template <typename TBase>
  382. NYT::TNode TWorkerFactory<TBase>::MakeOutputSchema(TStringBuf tableName) const {
  383. Y_ENSURE(OutputType_, "MakeOutputSchema() cannot be used with precompiled programs");
  384. Y_ENSURE(
  385. OutputType_->GetKind() == ETypeAnnotationKind::Variant,
  386. "MakeOutputSchema(TStringBuf) cannot be used with single-output programs");
  387. auto vtype = OutputType_->template Cast<TVariantExprType>();
  388. Y_ENSURE(
  389. vtype->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Struct,
  390. "MakeOutputSchema(TStringBuf) cannot be used to process variants over tuple");
  391. auto stype = vtype->GetUnderlyingType()->template Cast<TStructExprType>();
  392. auto index = stype->FindItem(tableName);
  393. Y_ENSURE(
  394. index.Defined(),
  395. "Invalid table index " << TString{tableName}.Quote());
  396. return NCommon::TypeToYsonNode(stype->GetItems()[*index]->GetItemType());
  397. }
  398. template <typename TBase>
  399. NYT::TNode TWorkerFactory<TBase>::MakeFullOutputSchema() const {
  400. Y_ENSURE(OutputType_, "MakeFullOutputSchema() cannot be used with precompiled programs");
  401. return NCommon::TypeToYsonNode(OutputType_);
  402. }
  403. template <typename TBase>
  404. const THashSet<TString>& TWorkerFactory<TBase>::GetUsedColumns(ui32 inputIndex) const {
  405. Y_ENSURE(
  406. inputIndex < UsedColumns_.size(),
  407. "invalid input index (" << inputIndex << ") in GetUsedColumns call");
  408. return UsedColumns_[inputIndex];
  409. }
  410. template <typename TBase>
  411. const THashSet<TString>& TWorkerFactory<TBase>::GetUsedColumns() const {
  412. Y_ENSURE(
  413. UsedColumns_.size() == 1,
  414. "GetUsedColumns() can be used only with single-input programs");
  415. return UsedColumns_[0];
  416. }
  417. template <typename TBase>
  418. TIssues TWorkerFactory<TBase>::GetIssues() const {
  419. auto issues = ExprContext_.IssueManager.GetCompletedIssues();
  420. CheckFatalIssues(issues);
  421. return issues;
  422. }
  423. template <typename TBase>
  424. TString TWorkerFactory<TBase>::GetCompiledProgram() {
  425. if (ExprRoot_) {
  426. NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),
  427. FuncRegistry_->SupportsSizedAllocators());
  428. NKikimr::NMiniKQL::TTypeEnvironment env(alloc);
  429. auto rootNode = CompileMkql(ExprRoot_, ExprContext_, *FuncRegistry_, env, UserData_);
  430. return NKikimr::NMiniKQL::SerializeRuntimeNode(rootNode, env);
  431. }
  432. return SerializedProgram_;
  433. }
  434. template <typename TBase>
  435. void TWorkerFactory<TBase>::ReturnWorker(IWorker* worker) {
  436. THolder<IWorker> tmp(worker);
  437. if (UseWorkerPool_) {
  438. WorkerPool_.push_back(std::move(tmp));
  439. }
  440. }
  441. #define DEFINE_WORKER_MAKER(MODE) \
  442. TWorkerHolder<I##MODE##Worker> T##MODE##WorkerFactory::MakeWorker() { \
  443. if (!WorkerPool_.empty()) { \
  444. auto res = std::move(WorkerPool_.back()); \
  445. WorkerPool_.pop_back(); \
  446. return TWorkerHolder<I##MODE##Worker>((I##MODE##Worker *)res.Release()); \
  447. } \
  448. return TWorkerHolder<I##MODE##Worker>(new T##MODE##Worker( \
  449. weak_from_this(), \
  450. ExprRoot_, \
  451. ExprContext_, \
  452. SerializedProgram_, \
  453. *FuncRegistry_, \
  454. UserData_, \
  455. InputTypes_, \
  456. OriginalInputTypes_, \
  457. RawInputTypes_, \
  458. OutputType_, \
  459. RawOutputType_, \
  460. LLVMSettings_, \
  461. CountersProvider_, \
  462. NativeYtTypeFlags_, \
  463. DeterministicTimeProviderSeed_ \
  464. )); \
  465. }
  466. DEFINE_WORKER_MAKER(PullStream)
  467. DEFINE_WORKER_MAKER(PullList)
  468. DEFINE_WORKER_MAKER(PushStream)
  469. namespace NYql {
  470. namespace NPureCalc {
  471. template
  472. class TWorkerFactory<IPullStreamWorkerFactory>;
  473. template
  474. class TWorkerFactory<IPullListWorkerFactory>;
  475. template
  476. class TWorkerFactory<IPushStreamWorkerFactory>;
  477. }
  478. }