#include "sql.h"
#include "source.h"
#include "context.h"
#include "match_recognize.h"

// NOTE(review): the targets of the three includes below, and template argument lists
// throughout this file (e.g. on TMaybe, TVector, TSet, std::pair, static_cast, dynamic_cast,
// MakeIntrusive), are absent from the reviewed text. They are left untouched here;
// restore them from version control.
#include
#include
#include

using namespace NYql;

namespace NSQLTranslationV1 {

/// Node that binds an inner source to an alias: DoInit() produces
/// `let <Alias> (block ...)` from the built source and its input tables.
class TSubqueryNode: public INode {
public:
    TSubqueryNode(TSourcePtr&& source, const TString& alias, bool inSubquery, int ensureTupleSize, TScopedStatePtr scoped)
        : INode(source->GetPos())
        , Source(std::move(source))
        , Alias(alias)
        , InSubquery(inSubquery)
        , EnsureTupleSize(ensureTupleSize)
        , Scoped(scoped)
    {
        YQL_ENSURE(!Alias.empty());
    }

    ISource* GetSource() override {
        return Source.Get();
    }

    /// Initializes and builds the inner source; an outer source must not be supplied.
    bool DoInit(TContext& ctx, ISource* src) override {
        YQL_ENSURE(!src, "Source not expected for subquery node");
        Source->UseAsInner();
        if (!Source->Init(ctx, nullptr)) {
            return false;
        }

        TTableList tableList;
        Source->GetInputTables(tableList);

        auto tables = BuildInputTables(Pos, tableList, InSubquery, Scoped);
        if (!tables->Init(ctx, Source.Get())) {
            return false;
        }

        auto source = Source->Build(ctx);
        if (!source) {
            return false;
        }
        // -1 means "no tuple size check requested".
        if (EnsureTupleSize != -1) {
            source = Y("EnsureTupleSize", source, Q(ToString(EnsureTupleSize)));
        }

        Node = Y("let", Alias, Y("block", Q(L(tables, Y("return", Q(Y("world", source)))))));
        IsUsed = true;
        return true;
    }

    void DoUpdateState() const override {
        State.Set(ENodeState::Const, true);
    }

    bool UsedSubquery() const override {
        return IsUsed;
    }

    TAstNode* Translate(TContext& ctx) const override {
        Y_DEBUG_ABORT_UNLESS(Node);
        return Node->Translate(ctx);
    }

    const TString* SubqueryAlias() const override {
        return &Alias;
    }

    TPtr DoClone() const final {
        return new TSubqueryNode(Source->CloneSource(), Alias, InSubquery, EnsureTupleSize, Scoped);
    }

protected:
    TSourcePtr Source;
    TNodePtr Node;
    const TString Alias;
    const bool InSubquery;
    const int EnsureTupleSize;
    bool IsUsed = false;
    TScopedStatePtr Scoped;
};

TNodePtr BuildSubquery(TSourcePtr source, const TString& alias, bool inSubquery, int ensureTupleSize, TScopedStatePtr scoped) {
    return new TSubqueryNode(std::move(source), alias, inSubquery, ensureTupleSize, scoped);
}

/// Node adapter over an ISource. When used inside an expression as a subquery it is
/// reduced to a single value; optionally the result is wrapped in a block with input tables.
class TSourceNode: public INode {
public:
    TSourceNode(TPosition pos, TSourcePtr&& source, bool checkExist, bool withTables)
        : INode(pos)
        , Source(std::move(source))
        , CheckExist(checkExist)
        , WithTables(withTables)
    {}

    ISource* GetSource() override {
        return Source.Get();
    }

    bool DoInit(TContext& ctx, ISource* src) override {
        if (AsInner) {
            Source->UseAsInner();
        }
        if (!Source->Init(ctx, src)) {
            return false;
        }
        Node = Source->Build(ctx);
        if (!Node) {
            return false;
        }
        if (src) {
            if (IsSubquery()) { /// should be not used?
                auto columnsPtr = Source->GetColumns();
                if (columnsPtr && (columnsPtr->All || columnsPtr->QualifiedAll || columnsPtr->List.size() == 1)) {
                    Node = Y("SingleMember", Y("SqlAccess", Q("dict"), Y("Take", Node, Y("Uint64", Q("1"))), Y("Uint64", Q("0"))));
                } else {
                    ctx.Error(Pos) << "Source used in expression should contain one concrete column";
                    if (RefPos) {
                        ctx.Error(*RefPos) << "Source is used here";
                    }

                    return false;
                }
            }
            src->AddDependentSource(Source.Get());
        }
        if (Node && WithTables) {
            TTableList tableList;
            Source->GetInputTables(tableList);

            TNodePtr inputTables(BuildInputTables(ctx.Pos(), tableList, IsSubquery(), ctx.Scoped));
            if (!inputTables->Init(ctx, Source.Get())) {
                return false;
            }

            auto blockContent = inputTables;
            blockContent = L(blockContent, Y("return", Node));
            Node = Y("block", Q(blockContent));
        }
        return true;
    }

    bool IsSubquery() const {
        return !AsInner && Source->IsSelect() && !CheckExist;
    }

    void DoUpdateState() const override {
        State.Set(ENodeState::Const, IsSubquery());
    }

    TAstNode* Translate(TContext& ctx) const override {
        Y_DEBUG_ABORT_UNLESS(Node);
        return Node->Translate(ctx);
    }

    TPtr DoClone() const final {
        return new TSourceNode(Pos, Source->CloneSource(), CheckExist, WithTables);
    }

protected:
    TSourcePtr Source;
    TNodePtr Node;
    bool CheckExist;
    bool WithTables;
};

TNodePtr BuildSourceNode(TPosition pos, TSourcePtr source, bool checkExist, bool withTables) {
    return new TSourceNode(pos, std::move(source), checkExist, withTables);
}

/// Placeholder source: reports an error for column references, filters, grouping,
/// aggregation and window functions. Build() yields `AsList(AsStruct())`.
class TFakeSource: public ISource {
public:
    TFakeSource(TPosition pos, bool missingFrom, bool inSubquery)
        : ISource(pos)
        , MissingFrom(missingFrom)
        , InSubquery(inSubquery)
    {}

    bool IsFake() const override {
        return true;
    }

    TMaybe AddColumn(TContext& ctx, TColumnNode& column) override {
        // TODO: fix column reference scope - with proper scopes error below should happen earlier
        if (column.CanBeType()) {
            return true;
        }
        ctx.Error(Pos) << (MissingFrom ? "Column references are not allowed without FROM" : "Source does not allow column references");
        ctx.Error(column.GetPos()) << "Column reference "
            << (column.GetColumnName() ? "'" + *column.GetColumnName() + "'" : "(expr)");
        return {};
    }

    bool AddFilter(TContext& ctx, TNodePtr filter) override {
        // Report at the filter position when a filter node is available.
        auto pos = filter ? filter->GetPos() : Pos;
        ctx.Error(pos) << (MissingFrom ? "Filtering is not allowed without FROM" : "Source does not allow filtering");
        return false;
    }

    TNodePtr Build(TContext& ctx) override {
        Y_UNUSED(ctx);
        auto ret = Y("AsList", Y("AsStruct"));
        if (InSubquery) {
            return Y("WithWorld", ret, "world");
        } else {
            return ret;
        }
    }

    bool AddGroupKey(TContext& ctx, const TString& column) override {
        Y_UNUSED(column);
        ctx.Error(Pos) << "Grouping is not allowed " << (MissingFrom ? "without FROM" : "in this context");
        return false;
    }

    bool AddAggregation(TContext& ctx, TAggregationPtr aggr) override {
        YQL_ENSURE(aggr);
        ctx.Error(aggr->GetPos()) << "Aggregation is not allowed " << (MissingFrom ? "without FROM" : "in this context");
        return false;
    }

    bool AddAggregationOverWindow(TContext& ctx, const TString& windowName, TAggregationPtr func) override {
        Y_UNUSED(windowName);
        YQL_ENSURE(func);
        ctx.Error(func->GetPos()) << "Aggregation is not allowed " << (MissingFrom ? "without FROM" : "in this context");
        return false;
    }

    bool AddFuncOverWindow(TContext& ctx, const TString& windowName, TNodePtr func) override {
        Y_UNUSED(windowName);
        YQL_ENSURE(func);
        ctx.Error(func->GetPos()) << "Window functions are not allowed " << (MissingFrom ? "without FROM" : "in this context");
        return false;
    }

    TWindowSpecificationPtr FindWindowSpecification(TContext& ctx, const TString& windowName) const override {
        Y_UNUSED(windowName);
        ctx.Error(Pos) << "Window and aggregation functions are not allowed " << (MissingFrom ? "without FROM" : "in this context");
        return {};
    }

    bool IsGroupByColumn(const TString& column) const override {
        Y_UNUSED(column);
        return false;
    }

    TNodePtr BuildFilter(TContext& ctx, const TString& label) override {
        Y_UNUSED(ctx);
        Y_UNUSED(label);
        return nullptr;
    }

    std::pair BuildAggregation(const TString& label, TContext& ctx) override {
        Y_UNUSED(label);
        Y_UNUSED(ctx);
        return { nullptr, true };
    }

    TPtr DoClone() const final {
        return new TFakeSource(Pos, MissingFrom, InSubquery);
    }

private:
    const bool MissingFrom;
    const bool InSubquery;
};

TSourcePtr BuildFakeSource(TPosition pos, bool missingFrom, bool inSubquery) {
    return new TFakeSource(pos, missingFrom, inSubquery);
}

/// Source backed by an expression node; the node is initialized against a fake source
/// and may be wrapped into `ToList` and/or `TableSource` on Build().
class TNodeSource: public ISource {
public:
    TNodeSource(TPosition pos, const TNodePtr& node, bool wrapToList, bool wrapByTableSource)
        : ISource(pos)
        , Node(node)
        , WrapToList(wrapToList)
        , WrapByTableSource(wrapByTableSource)
    {
        YQL_ENSURE(Node);
        FakeSource = BuildFakeSource(pos);
    }

    bool ShouldUseSourceAsColumn(const TString& source) const final {
        return source && source != GetLabel();
    }

    TMaybe AddColumn(TContext& ctx, TColumnNode& column) final {
        Y_UNUSED(ctx);
        Y_UNUSED(column);
        return true;
    }

    bool DoInit(TContext& ctx, ISource* src) final {
        if (!Node->Init(ctx, FakeSource.Get())) {
            return false;
        }
        return ISource::DoInit(ctx, src);
    }

    TNodePtr Build(TContext& /*ctx*/) final {
        auto nodeAst = AstNode(Node);
        if (WrapToList) {
            nodeAst = Y("ToList", nodeAst);
        }
        if (WrapByTableSource) {
            nodeAst = Y("TableSource", nodeAst);
        }
        return nodeAst;
    }

    TPtr DoClone() const final {
        return new TNodeSource(Pos, SafeClone(Node), WrapToList, WrapByTableSource);
    }

private:
    TNodePtr Node;
    const bool WrapToList;
    const bool WrapByTableSource;
    TSourcePtr FakeSource;
};

TSourcePtr BuildNodeSource(TPosition pos, const TNodePtr& node, bool wrapToList, bool wrapByTableSource) {
    return new TNodeSource(pos, node, wrapToList, wrapByTableSource);
}

/// Base for sources that forward column/stream/order queries to another ISource
/// held through a non-owning raw pointer.
class IProxySource: public ISource {
protected:
    IProxySource(TPosition pos, ISource* src)
        : ISource(pos)
        , Source(src)
    {}

    void AllColumns() override {
        Y_DEBUG_ABORT_UNLESS(Source);
        return Source->AllColumns();
    }

    const TColumns* GetColumns() const override {
        Y_DEBUG_ABORT_UNLESS(Source);
        return Source->GetColumns();
    }

    void GetInputTables(TTableList& tableList) const override {
        if (Source) {
            Source->GetInputTables(tableList);
        }

        ISource::GetInputTables(tableList);
    }

    TMaybe AddColumn(TContext& ctx, TColumnNode& column) override {
        Y_DEBUG_ABORT_UNLESS(Source);
        // Temporarily give the underlying source this proxy's label, then restore it.
        const TString label(Source->GetLabel());
        Source->SetLabel(Label);
        const auto ret = Source->AddColumn(ctx, column);
        Source->SetLabel(label);
        return ret;
    }

    bool ShouldUseSourceAsColumn(const TString& source) const override {
        return Source->ShouldUseSourceAsColumn(source);
    }

    bool IsStream() const override {
        Y_DEBUG_ABORT_UNLESS(Source);
        return Source->IsStream();
    }

    EOrderKind GetOrderKind() const override {
        Y_DEBUG_ABORT_UNLESS(Source);
        return Source->GetOrderKind();
    }

    TWriteSettings GetWriteSettings() const override {
        Y_DEBUG_ABORT_UNLESS(Source);
        return Source->GetWriteSettings();
    }

protected:
    void SetSource(ISource* source) {
        Source = source;
    }

    ISource* Source;
};

/// Base for sources that keep their own TColumns set.
class IRealSource: public ISource {
protected:
    IRealSource(TPosition pos)
        : ISource(pos)
    {
    }

    void AllColumns() override {
        Columns.SetAll();
    }

    const TColumns* GetColumns() const override {
        return &Columns;
    }

    /// Validates a column reference against this source's label and column set.
    /// Returns an empty value on mismatch (reporting an error only for reliable columns).
    TMaybe AddColumn(TContext& ctx, TColumnNode& column) override {
        const auto& label = *column.GetSourceName();
        const auto& source = GetLabel();
        // The correlation name must be empty, equal to the label, or a "<label>:" prefix of it.
        if (!label.empty() && label != source && !(source.StartsWith(label) && source[label.size()] == ':')) {
            if (column.IsReliable()) {
                ctx.Error(column.GetPos()) << "Unknown correlation name: " << label;
            }
            return {};
        }
        if (column.IsAsterisk()) {
            return true;
        }
        const auto* name = column.GetColumnName();
        if (name && !column.CanBeType() && !Columns.IsColumnPossible(ctx, *name) && !IsAlias(EExprSeat::GroupBy, *name) && !IsAlias(EExprSeat::DistinctAggr, *name)) {
            if (column.IsReliable()) {
                TStringBuilder sb;
                sb << "Column " << *name << " is not in source column set";
                if (const auto mistype = FindColumnMistype(*name)) {
                    sb << ". Did you mean " << mistype.GetRef() << "?";
                }
                ctx.Error(column.GetPos()) << sb;
            }
            return {};
        }
        return true;
    }

    /// Looks for a similarly named column: real columns first, then artificial ones,
    /// then whatever the base class suggests.
    TMaybe FindColumnMistype(const TString& name) const override {
        auto result = FindMistypeIn(Columns.Real, name);
        if (!result) {
            // Assign to the outer variable: a nested declaration here would shadow it
            // and the artificial-column match would be dropped.
            result = FindMistypeIn(Columns.Artificial, name);
        }
        return result ? result : ISource::FindColumnMistype(name);
    }

protected:
    TColumns Columns;
};

/// Interface for sources able to append projection/window/distinct stages to a block.
class IComposableSource : private TNonCopyable {
public:
    virtual ~IComposableSource() = default;
    virtual void BuildProjectWindowDistinct(TNodePtr& blocks, TContext& ctx, bool ordered) = 0;
};

using TComposableSourcePtr = TIntrusivePtr;

/// Combines two or more sources into a single `Mux` expression.
class TMuxSource: public ISource {
public:
    TMuxSource(TPosition pos, TVector&& sources)
        : ISource(pos)
        , Sources(std::move(sources))
    {
        YQL_ENSURE(Sources.size() > 1);
    }

    void AllColumns() final {
        for (auto& source: Sources) {
            source->AllColumns();
        }
    }

    const TColumns* GetColumns() const final {
        // Columns are equal in all sources. Return from the first one
        return Sources.front()->GetColumns();
    }

    void GetInputTables(TTableList& tableList) const final {
        for (auto& source: Sources) {
            source->GetInputTables(tableList);
        }
        ISource::GetInputTables(tableList);
    }

    bool IsStream() const final {
        return AnyOf(Sources, [] (const TSourcePtr& s) { return s->IsStream(); });
    }

    bool DoInit(TContext& ctx, ISource* src) final {
        for (auto& source: Sources) {
            if (AsInner) {
                source->UseAsInner();
            }

            if (src) {
                src->AddDependentSource(source.Get());
            }
            if (!source->Init(ctx, src)) {
                return false;
            }
            if (!source->InitFilters(ctx)) {
                return false;
            }
        }
        return true;
    }

    TMaybe AddColumn(TContext& ctx, TColumnNode& column) final {
        for (auto& source: Sources) {
            if (!source->AddColumn(ctx, column)) {
                return {};
            }
        }
        return true;
    }

    /// Builds every input under a fresh name, applies its filter (and system column
    /// removal when enabled) and returns the `Mux` over all of them.
    /// Returns nullptr if any input fails to build.
    TNodePtr Build(TContext& ctx) final {
        TNodePtr block;
        auto muxArgs = Y();
        for (auto& source: Sources) {
            auto input = source->Build(ctx);
            if (!input) {
                // Do not embed a null node into the AST; propagate the failure instead.
                return nullptr;
            }
            auto ref = ctx.MakeName("src");
            muxArgs->Add(ref);
            if (block) {
                block = L(block, Y("let", ref, input));
            } else {
                block = Y(Y("let", ref, input));
            }
            auto filter = source->BuildFilter(ctx, ref);
            if (filter) {
                block = L(block, Y("let", ref, filter));
            }
            if (ctx.EnableSystemColumns) {
                block = L(block, Y("let", ref, Y("RemoveSystemMembers", ref)));
            }
        }
        return GroundWithExpr(block, Y("Mux", Q(muxArgs)));
    }

    bool AddFilter(TContext& ctx, TNodePtr filter) final {
        Y_UNUSED(filter);
        ctx.Error() << "Filter is not allowed for multiple sources";
        return false;
    }

    TPtr DoClone() const final {
        return new TMuxSource(Pos, CloneContainer(Sources));
    }

protected:
    TVector Sources;
};

TSourcePtr BuildMuxSource(TPosition pos, TVector&& sources) {
    return new TMuxSource(pos, std::move(sources));
}

/// Source that refers to a named subquery by its alias (optionally to one tuple element).
class TSubqueryRefNode: public IRealSource {
public:
    TSubqueryRefNode(const TNodePtr& subquery, const TString& alias, int tupleIndex)
        : IRealSource(subquery->GetPos())
        , Subquery(subquery)
        , Alias(alias)
        , TupleIndex(tupleIndex)
    {
        YQL_ENSURE(subquery->GetSource());
    }

    ISource* GetSource() override {
        return this;
    }

    bool DoInit(TContext& ctx, ISource* src) override {
        // independent subquery should not connect source
        Subquery->UseAsInner();
        if (!Subquery->Init(ctx, nullptr)) {
            return false;
        }
        Columns = *Subquery->GetSource()->GetColumns();
        Node = BuildAtom(Pos, Alias, TNodeFlags::Default);
        // -1 means the whole subquery result is referenced rather than a tuple element.
        if (TupleIndex != -1) {
            Node = Y("Nth", Node, Q(ToString(TupleIndex)));
        }
        if (!Node->Init(ctx, src)) {
            return false;
        }
        if (src && Subquery->GetSource()->IsSelect()) {
            // Used inside an expression: the select must expose a single column.
            if (Columns.All || Columns.QualifiedAll || Columns.List.size() == 1) {
                Node = Y("SingleMember", Y("SqlAccess", Q("dict"), Y("Take", Node, Y("Uint64", Q("1"))), Y("Uint64", Q("0"))));
            } else {
                ctx.Error(Pos) << "Source used in expression should contain one concrete column";
                if (RefPos) {
                    ctx.Error(*RefPos) << "Source is used here";
                }

                return false;
            }
        }
        TNodePtr sample;
        if (!BuildSamplingLambda(sample)) {
            return false;
        } else if (sample) {
            Node = Y("block", Q(Y(Y("let", Node, Y("OrderedFlatMap", Node, sample)), Y("return", Node))));
        }
        return true;
    }

    TNodePtr Build(TContext& ctx) override {
        Y_UNUSED(ctx);
        return Node;
    }

    bool SetSamplingOptions(
            TContext& ctx,
            TPosition pos,
            ESampleClause sampleClause,
            ESampleMode mode,
            TNodePtr samplingRate,
            TNodePtr samplingSeed) override {
        if (mode == ESampleMode::System) {
            ctx.Error(pos) << "only Bernoulli sampling mode is supported for subqueries";
            return false;
        }
        if (samplingSeed) {
            ctx.Error(pos) << "'Repeatable' keyword is not supported for subqueries";
            return false;
        }
        return SetSamplingRate(ctx, sampleClause, samplingRate);
    }

    bool IsStream() const override {
        return Subquery->GetSource()->IsStream();
    }

    void DoUpdateState() const override {
        State.Set(ENodeState::Const, true);
    }

    TAstNode* Translate(TContext& ctx) const override {
        Y_DEBUG_ABORT_UNLESS(Node);
        return Node->Translate(ctx);
    }

    TPtr DoClone() const final {
        return new TSubqueryRefNode(Subquery, Alias, TupleIndex);
    }

protected:
    TNodePtr Subquery;
    const TString Alias;
    const int TupleIndex;
    TNodePtr Node;
};

TNodePtr BuildSubqueryRef(TNodePtr subquery, const TString& alias, int tupleIndex) {
    return new TSubqueryRefNode(std::move(subquery), alias, tupleIndex);
}

/// Source whose initialization always fails with a "named subquery in library" error.
class TInvalidSubqueryRefNode: public ISource {
public:
    TInvalidSubqueryRefNode(TPosition pos)
        : ISource(pos)
        , Pos(pos)
    {
    }

    bool DoInit(TContext& ctx, ISource* src) override {
        Y_UNUSED(src);
        ctx.Error(Pos) << "Named subquery can not be used as a top level statement in libraries";
        return false;
    }

    TNodePtr Build(TContext& ctx) override {
        Y_UNUSED(ctx);
        return {};
    }

    TPtr DoClone() const final {
        return new TInvalidSubqueryRefNode(Pos);
    }

protected:
    const TPosition Pos;
};

TNodePtr BuildInvalidSubqueryRef(TPosition subqueryPos) {
    return new TInvalidSubqueryRefNode(subqueryPos);
}

/// Source that reads a single table reference; labelled with the explicit label or,
/// when that is empty, the table's short name.
class TTableSource: public IRealSource {
public:
    TTableSource(TPosition pos, const TTableRef& table, const TString& label)
        : IRealSource(pos)
        , Table(table)
        , FakeSource(BuildFakeSource(pos))
    {
        SetLabel(label.empty() ? Table.ShortName() : label);
    }

    void GetInputTables(TTableList& tableList) const override {
        tableList.push_back(Table);
        ISource::GetInputTables(tableList);
    }

    bool ShouldUseSourceAsColumn(const TString& source) const override {
        const auto& label = GetLabel();
        return source && source != label && !(label.StartsWith(source) && label[source.size()] == ':');
    }

    TMaybe AddColumn(TContext& ctx, TColumnNode& column) override {
        Columns.Add(column.GetColumnName(), column.GetCountHint(), column.IsArtificial(), column.IsReliable());
        if (!IRealSource::AddColumn(ctx, column)) {
            return {};
        }
        // NOTE(review): success is reported as `false` here while other sources return `true`;
        // presumably intentional per the ISource::AddColumn contract - confirm against the declaration.
        return false;
    }

    /// Appends a `sample` option (mode, rate, seed) to the table options.
    bool SetSamplingOptions(
            TContext& ctx,
            TPosition pos,
            ESampleClause sampleClause,
            ESampleMode mode,
            TNodePtr samplingRate,
            TNodePtr samplingSeed) override {
        TString modeName;
        if (!samplingSeed) {
            samplingSeed = Y("Int32", Q("0"));
        }
        if (ESampleClause::Sample == sampleClause) {
            YQL_ENSURE(ESampleMode::Bernoulli == mode, "Internal logic error");
        }
        switch (mode) {
            case ESampleMode::Bernoulli:
                modeName = "bernoulli";
                break;
            case ESampleMode::System:
                modeName = "system";
                break;
        }

        if (!samplingRate->Init(ctx, FakeSource.Get())) {
            return false;
        }

        samplingRate = PrepareSamplingRate(pos, sampleClause, samplingRate);

        auto sampleSettings = Q(Y(Q(modeName), Y("EvaluateAtom", Y("ToString", samplingRate)), Y("EvaluateAtom", Y("ToString", samplingSeed))));
        auto sampleOption = Q(Y(Q("sample"), sampleSettings));
        if (Table.Options) {
            if (!Table.Options->Init(ctx, this)) {
                return false;
            }
            Table.Options = L(Table.Options, sampleOption);
        } else {
            Table.Options = Y(sampleOption);
        }
        return true;
    }

    bool SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints) override {
        Y_UNUSED(ctx);
        TTableHints merged = contextHints;
        MergeHints(merged, hints);
        Table.Options = BuildInputOptions(pos, merged);
        return true;
    }

    bool SetViewName(TContext& ctx, TPosition pos, const TString& view) override {
        return Table.Keys->SetViewName(ctx, pos, view);
    }

    TNodePtr Build(TContext& ctx) override {
        if (!Table.Keys->Init(ctx, nullptr)) {
            return nullptr;
        }
        return AstNode(Table.RefName);
    }

    bool IsStream() const override {
        return IsStreamingService(Table.Service);
    }

    TPtr DoClone() const final {
        return new TTableSource(Pos, Table, GetLabel());
    }

    bool IsTableSource() const override {
        return true;
    }

protected:
    TTableRef Table;

private:
    const TSourcePtr FakeSource;
};

TSourcePtr BuildTableSource(TPosition pos, const TTableRef& table, const TString& label) {
    return new TTableSource(pos, table, label);
}

/// Proxy over the source of an arbitrary node. Sampling, view and hint settings are
/// recorded first and applied to the resolved source during DoInit().
class TInnerSource: public IProxySource {
public:
    TInnerSource(TPosition pos, TNodePtr node, const TString& service, const TDeferredAtom& cluster, const TString& label)
        : IProxySource(pos, nullptr)
        , Node(node)
        , Service(service)
        , Cluster(cluster)
    {
        SetLabel(label);
    }

    bool SetSamplingOptions(TContext& ctx, TPosition pos, ESampleClause sampleClause, ESampleMode mode, TNodePtr samplingRate, TNodePtr samplingSeed) override {
        Y_UNUSED(ctx);
        SamplingPos = pos;
        SamplingClause = sampleClause;
        SamplingMode = mode;
        SamplingRate = samplingRate;
        SamplingSeed = samplingSeed;
        return true;
    }

    bool SetTableHints(TContext& ctx, TPosition pos, const TTableHints& hints, const TTableHints& contextHints) override {
        Y_UNUSED(ctx);
        HintsPos = pos;
        Hints = hints;
        ContextHints = contextHints;
        return true;
    }

    bool SetViewName(TContext& ctx, TPosition pos, const TString& view) override {
        Y_UNUSED(ctx);
        ViewPos = pos;
        View = view;
        return true;
    }

    bool ShouldUseSourceAsColumn(const TString& source) const override {
        return source && source != GetLabel();
    }

    TMaybe AddColumn(TContext& ctx, TColumnNode& column) override {
        const TString* columnName = column.GetColumnName();
        if (columnName && IsExprAlias(*columnName)) {
            return true;
        }
        return IProxySource::AddColumn(ctx, column);
    }

    /// Resolves the underlying source (from the node itself or from the expression),
    /// applies the recorded settings and initializes it.
    bool DoInit(TContext& ctx, ISource* initSrc) override {
        Y_UNUSED(initSrc);
        auto source = Node->GetSource();
        if (!source) {
            NewSource = TryMakeSourceFromExpression(Pos, ctx, Service, Cluster, Node);
            source = NewSource.Get();
        }

        if (!source) {
            ctx.Error(Pos) << "Invalid inner source node";
            return false;
        }

        if (SamplingPos) {
            if (!source->SetSamplingOptions(ctx, *SamplingPos, SamplingClause, SamplingMode, SamplingRate, SamplingSeed)) {
                return false;
            }
        }

        if (ViewPos) {
            if (!source->SetViewName(ctx, *ViewPos, View)) {
                return false;
            }
        }

        if (HintsPos) {
            if (!source->SetTableHints(ctx, *HintsPos, Hints, ContextHints)) {
                return false;
            }
        }

        source->SetLabel(Label);
        if (!NewSource) {
            Node->UseAsInner();
            if (!Node->Init(ctx, nullptr)) {
                return false;
            }
        }

        SetSource(source);
        if (NewSource && !NewSource->Init(ctx, nullptr)) {
            return false;
        }

        return ISource::DoInit(ctx, source);
    }

    TNodePtr Build(TContext& ctx) override {
        return NewSource ? NewSource->Build(ctx) : Node;
    }

    bool IsStream() const override {
        auto source = Node->GetSource();
        if (source) {
            return source->IsStream();
        }
        // NewSource will be built later in DoInit->TryMakeSourceFromExpression
        // where Service will be used in all situations
        // let's detect IsStream by Service value
        return IsStreamingService(Service);
    }

    TPtr DoClone() const final {
        return new TInnerSource(Pos, SafeClone(Node), Service, Cluster, GetLabel());
    }

protected:
    TNodePtr Node;
    TString Service;
    TDeferredAtom Cluster;
    TSourcePtr NewSource;

private:
    // Sampling settings; the clause/mode fields are only read when SamplingPos is set.
    TMaybe SamplingPos;
    ESampleClause SamplingClause;
    ESampleMode SamplingMode;
    TNodePtr SamplingRate;
    TNodePtr SamplingSeed;

    TMaybe ViewPos;
    TString View;

    TMaybe HintsPos;
    TTableHints Hints;
    TTableHints ContextHints;
};

TSourcePtr BuildInnerSource(TPosition pos, TNodePtr node, const TString& service, const TDeferredAtom& cluster, const TString& label) {
    return new TInnerSource(pos, node, service, cluster, label);
}

/// Checks that `expr` may be used in the given SQL construction (its name is used in
/// error messages). With `assume` set only plain columns are accepted.
static bool IsComparableExpression(TContext& ctx, const TNodePtr& expr, bool assume, const char* sqlConstruction) {
    if (assume && !expr->IsPlainColumn()) {
        ctx.Error(expr->GetPos()) << "Only column names can be used in " << sqlConstruction;
        return false;
    }

    if (expr->IsConstant()) {
        ctx.Error(expr->GetPos()) << "Unable to " << sqlConstruction << " constant expression";
        return false;
    }
    if (expr->IsAggregated() && !expr->HasState(ENodeState::AggregationKey)) {
        ctx.Error(expr->GetPos()) << "Unable to " << sqlConstruction << " aggregated values";
        return false;
    }
    if (expr->IsPlainColumn()) {
        return true;
    }
    if (expr->GetOpName().empty()) {
        ctx.Error(expr->GetPos()) << "You should use in " << sqlConstruction << " column name, qualified field, callable function or expression";
        return false;
    }
    return true;
}

/// \todo move to reduce.cpp? or mapreduce.cpp?
/// Source implementing REDUCE: partitions the input by keys and applies a UDF.
class TReduceSource: public IRealSource {
public:
    TReduceSource(TPosition pos,
        ReduceMode mode,
        TSourcePtr source,
        TVector&& orderBy,
        TVector&& keys,
        TVector&& args,
        TNodePtr udf,
        TNodePtr having,
        const TWriteSettings& settings,
        const TVector& assumeOrderBy,
        bool listCall)
        : IRealSource(pos)
        , Mode(mode)
        , Source(std::move(source))
        , OrderBy(std::move(orderBy))
        , Keys(std::move(keys))
        , Args(std::move(args))
        , Udf(udf)
        , Having(having)
        , Settings(settings)
        , AssumeOrderBy(assumeOrderBy)
        , ListCall(listCall)
    {
        YQL_ENSURE(!Keys.empty());
        YQL_ENSURE(Source);
    }

    void GetInputTables(TTableList& tableList) const override {
        Source->GetInputTables(tableList);
        ISource::GetInputTables(tableList);
    }

    bool DoInit(TContext& ctx, ISource* src) final {
        if (AsInner) {
            Source->UseAsInner();
        }

        YQL_ENSURE(!src);
        if (!Source->Init(ctx, src)) {
            return false;
        }
        if (!Source->InitFilters(ctx)) {
            return false;
        }
        // From here on expressions are resolved against the input source.
        src = Source.Get();
        for (auto& key: Keys) {
            if (!key->Init(ctx, src)) {
                return false;
            }
            auto keyNamePtr = key->GetColumnName();
            YQL_ENSURE(keyNamePtr);
            if (!src->AddGroupKey(ctx, *keyNamePtr)) {
                return false;
            }
        }
        if (Having && !Having->Init(ctx, nullptr)) {
            return false;
        }

        /// SIN: verify reduce one argument
        if (Args.size() != 1) {
            ctx.Error(Pos) << "REDUCE requires exactly one UDF argument";
            return false;
        }
        if (!Args[0]->Init(ctx, src)) {
            return false;
        }

        for (const auto& orderSpec: OrderBy) {
            if (!orderSpec->OrderExpr->Init(ctx, src)) {
                return false;
            }
        }

        if (!Udf->Init(ctx, src)) {
            return false;
        }

        if (Udf->GetLabel().empty()) {
            Columns.SetAll();
        } else {
            Columns.Add(&Udf->GetLabel(), false);
        }

        // ASSUME ORDER BY expressions are resolved under the input source's label;
        // the own label is restored afterwards on both success and failure.
        const auto label = GetLabel();
        bool assumeOrderByValid = true;
        for (const auto& sortSpec: AssumeOrderBy) {
            auto& expr = sortSpec->OrderExpr;
            SetLabel(Source->GetLabel());
            if (!expr->Init(ctx, this) || !IsComparableExpression(ctx, expr, true, "ASSUME ORDER BY")) {
                assumeOrderByValid = false;
                break;
            }
        }
        SetLabel(label);

        return assumeOrderByValid;
    }

    TNodePtr Build(TContext& ctx) final {
        auto input = Source->Build(ctx);
        if (!input) {
            return nullptr;
        }

        // A single key is extracted as a value, several keys as a quoted tuple.
        auto keysTuple = Y();
        if (Keys.size() == 1) {
            keysTuple = Y("Member", "row", BuildQuotedAtom(Pos, *Keys.back()->GetColumnName()));
        } else {
            for (const auto& key: Keys) {
                keysTuple = L(keysTuple, Y("Member", "row", BuildQuotedAtom(Pos, *key->GetColumnName())));
            }
            keysTuple = Q(keysTuple);
        }
        auto extractKey = Y("SqlExtractKey", "row", BuildLambda(Pos, Y("row"), keysTuple));
        auto extractKeyLambda = BuildLambda(Pos, Y("row"), extractKey);

        TNodePtr processPartitions;
        if (ListCall) {
            if (Mode != ReduceMode::ByAll) {
                ctx.Error(Pos) << "TableRows() must be used only with USING ALL";
                return nullptr;
            }

            TNodePtr expr = BuildAtom(Pos, "partitionStream");
            processPartitions = Y("SqlReduce", "partitionStream", BuildQuotedAtom(Pos, "byAllList", TNodeFlags::Default), Udf, expr);
        } else {
            switch (Mode) {
                case ReduceMode::ByAll: {
                    auto columnPtr = Args[0]->GetColumnName();
                    TNodePtr expr = BuildAtom(Pos, "partitionStream");
                    // Unless the argument is "*", map every row of each partition through it.
                    if (!columnPtr || *columnPtr != "*") {
                        expr = Y("Map", "partitionStream", BuildLambda(Pos, Y("keyPair"), Q(L(Y(),
                            Y("Nth", "keyPair", Q(ToString("0"))),
                            Y("Map", Y("Nth", "keyPair", Q(ToString("1"))), BuildLambda(Pos, Y("row"), Args[0]))))));
                    }
                    processPartitions = Y("SqlReduce", "partitionStream", BuildQuotedAtom(Pos, "byAll", TNodeFlags::Default), Udf, expr);
                    break;
                }
                case ReduceMode::ByPartition: {
                    processPartitions = Y("SqlReduce", "partitionStream", extractKeyLambda, Udf,
                        BuildLambda(Pos, Y("row"), Args[0]));
                    break;
                }
                default:
                    YQL_ENSURE(false, "Unexpected REDUCE mode");
            }
        }

        TNodePtr sortDirection;
        TNodePtr sortKeySelector;
        FillSortParts(OrderBy, sortDirection, sortKeySelector);
        if (!OrderBy.empty()) {
            sortKeySelector = BuildLambda(Pos, Y("row"), Y("SqlExtractKey", "row", sortKeySelector));
        }

        auto partitionByKey = Y(!ListCall && Mode == ReduceMode::ByAll ? "PartitionByKey" : "PartitionsByKeys", "core", extractKeyLambda,
            sortDirection, sortKeySelector, BuildLambda(Pos, Y("partitionStream"), processPartitions));

        auto inputLabel = ListCall ? "inputRowsList" : "core";
        auto block(Y(Y("let", inputLabel, input)));
        auto filter = Source->BuildFilter(ctx, inputLabel);
        if (filter) {
            block = L(block, Y("let", inputLabel, filter));
        }
        if (ListCall) {
            block = L(block, Y("let", "core", "inputRowsList"));
        }

        if (ctx.EnableSystemColumns) {
            block = L(block, Y("let", "core", Y("RemoveSystemMembers", "core")));
        }
        block = L(block, Y("let", "core", Y("AutoDemux", partitionByKey)));
        if (Having) {
            block = L(block, Y("let", "core",
                Y("Filter", "core", BuildLambda(Pos, Y("row"), Y("Coalesce", Having, Y("Bool", Q("false")))))
            ));
        }
        return Y("block", Q(L(block, Y("return", "core"))));
    }

    TNodePtr BuildSort(TContext& ctx, const TString& label) override {
        Y_UNUSED(ctx);
        if (AssumeOrderBy.empty()) {
            return nullptr;
        }

        return Y("let", label, BuildSortSpec(AssumeOrderBy, label, false, true));
    }

    EOrderKind GetOrderKind() const override {
        return AssumeOrderBy.empty() ? EOrderKind::None : EOrderKind::Assume;
    }

    TWriteSettings GetWriteSettings() const final {
        return Settings;
    }

    bool HasSelectResult() const final {
        return !Settings.Discard;
    }

    TPtr DoClone() const final {
        return new TReduceSource(Pos, Mode, Source->CloneSource(), CloneContainer(OrderBy),
                CloneContainer(Keys), CloneContainer(Args), SafeClone(Udf), SafeClone(Having), Settings,
                CloneContainer(AssumeOrderBy), ListCall);
    }

private:
    ReduceMode Mode;
    TSourcePtr Source;
    TVector OrderBy;
    TVector Keys;
    TVector Args;
    TNodePtr Udf;
    TNodePtr Having;
    const TWriteSettings Settings;
    TVector AssumeOrderBy;
    const bool ListCall;
};

TSourcePtr BuildReduce(TPosition pos,
    ReduceMode mode,
    TSourcePtr source,
    TVector&& orderBy,
    TVector&& keys,
    TVector&& args,
    TNodePtr udf,
    TNodePtr having,
    const TWriteSettings& settings,
    const TVector& assumeOrderBy,
    bool listCall) {
    return new TReduceSource(pos, mode, std::move(source), std::move(orderBy), std::move(keys), std::move(args), udf, having, settings, assumeOrderBy, listCall);
}

namespace {

/// Initializes a grouping expression against `src` and, when it is an unlabelled column,
/// stores its (possibly source-qualified) name in `keyColumn`; otherwise `keyColumn` stays empty.
/// `where` names the SQL clause in the error message.
bool InitAndGetGroupKey(TContext& ctx, const TNodePtr& expr, ISource* src, TStringBuf where, TString& keyColumn) {
    keyColumn.clear();

    YQL_ENSURE(src);
    const bool isJoin = src->GetJoin();

    if (!expr->Init(ctx, src)) {
        return false;
    }

    auto keyNamePtr = expr->GetColumnName();
    if (keyNamePtr && expr->GetLabel().empty()) {
        keyColumn = *keyNamePtr;
        auto sourceNamePtr = expr->GetSourceName();
        auto columnNode = expr->GetColumnNode();
        // Over a join, non-artificial columns must carry a correlation name unless they are GROUP BY aliases.
        if (isJoin && (!columnNode || !columnNode->IsArtificial())) {
            if (!sourceNamePtr || sourceNamePtr->empty()) {
                if (!src->IsAlias(EExprSeat::GroupBy, keyColumn)) {
                    ctx.Error(expr->GetPos()) << "Columns in " << where << " should have correlation name, error in key: " << keyColumn;
                    return false;
                }
            } else {
                keyColumn = DotJoin(*sourceNamePtr, keyColumn);
            }
        }
    }

    return true;
}

}

/// Select composed of several subselects over one shared input, combined with `UnionAll`.
class TCompositeSelect: public IRealSource {
public:
    TCompositeSelect(TPosition pos, TSourcePtr source, TSourcePtr originalSource, const TWriteSettings& settings)
        : IRealSource(pos)
        , Source(std::move(source))
        , OriginalSource(std::move(originalSource))
        , Settings(settings)
    {
        YQL_ENSURE(Source);
    }

    void SetSubselects(TVector&& subselects, TVector&& grouping, TVector&& groupByExpr) {
        Subselects = std::move(subselects);
        Grouping = std::move(grouping);
        GroupByExpr = std::move(groupByExpr);
        Y_DEBUG_ABORT_UNLESS(Subselects.size() > 1);
    }

    void GetInputTables(TTableList& tableList) const override {
        for (const auto& select: Subselects) {
            select->GetInputTables(tableList);
        }
        ISource::GetInputTables(tableList);
    }

    bool DoInit(TContext& ctx, ISource* src) override {
        if (AsInner) {
            Source->UseAsInner();
        }

        if (src) {
            src->AddDependentSource(Source.Get());
        }
        if (!Source->Init(ctx, src)) {
            return false;
        }
        if (!Source->InitFilters(ctx)) {
            return false;
        }

        if (!CalculateGroupingCols(ctx, src)) {
            return false;
        }

        auto origSrc = OriginalSource.Get();
        if (!origSrc->Init(ctx, src)) {
            return false;
        }

        if (origSrc->IsFlattenByColumns() || origSrc->IsFlattenColumns()) {
            Flatten = origSrc->IsFlattenByColumns() ?
                origSrc->BuildFlattenByColumns("row") :
                origSrc->BuildFlattenColumns("row");
            if (!Flatten || !Flatten->Init(ctx, src)) {
                return false;
            }
        }

        if (origSrc->IsFlattenByExprs()) {
            for (auto& expr : static_cast(origSrc)->Expressions(EExprSeat::FlattenByExpr)) {
                if (!expr->Init(ctx, origSrc)) {
                    return false;
                }
            }
            PreFlattenMap = origSrc->BuildPreFlattenMap(ctx);
            if (!PreFlattenMap) {
                return false;
            }
        }

        for (const auto& select: Subselects) {
            select->SetLabel(Label);
            if (AsInner) {
                select->UseAsInner();
            }

            if (!select->Init(ctx, Source.Get())) {
                return false;
            }
        }

        // Every subselect must report the same number of GROUPING() columns.
        TMaybe groupingColumnsCount;
        size_t idx = 0;
        for (const auto& select : Subselects) {
            size_t count = select->GetGroupingColumnsCount();
            if (!groupingColumnsCount.Defined()) {
                groupingColumnsCount = count;
            } else if (*groupingColumnsCount != count) {
                ctx.Error(select->GetPos()) << TStringBuilder() << "Mismatch GROUPING() column count in composite select input #"
                    << idx << ": expected " << *groupingColumnsCount << ", got: " << count << ". Please submit bug report";
                return false;
            }
            ++idx;
        }
        return true;
    }

    TMaybe AddColumn(TContext& ctx, TColumnNode& column) override {
        for (const auto& select: Subselects) {
            if (!select->AddColumn(ctx, column)) {
                return {};
            }
        }
        return true;
    }

    /// Builds the shared input ("composite"), the `UnionAll` of all subselects ("core"),
    /// and lets the first subselect append projection/window/distinct stages.
    /// Returns nullptr when the input or any subselect fails to build.
    TNodePtr Build(TContext& ctx) override {
        auto input = Source->Build(ctx);
        if (!input) {
            // Do not embed a null node into the AST; propagate the failure instead.
            return nullptr;
        }
        auto block(Y(Y("let", "composite", input)));

        // NOTE(review): the flag is named `ordered` but is assigned from UseUnordered();
        // confirm the intended polarity against TContext.
        bool ordered = ctx.UseUnordered(*this);
        if (PreFlattenMap) {
            block = L(block, Y("let", "composite", Y(ordered ? "OrderedFlatMap" : "FlatMap", "composite", BuildLambda(Pos, Y("row"), PreFlattenMap))));
        }
        if (Flatten) {
            block = L(block, Y("let", "composite", Y(ordered ? "OrderedFlatMap" : "FlatMap", "composite", BuildLambda(Pos, Y("row"), Flatten, "res"))));
        }
        auto filter = Source->BuildFilter(ctx, "composite");
        if (filter) {
            block = L(block, Y("let", "composite", filter));
        }

        TNodePtr compositeNode = Y("UnionAll");
        for (const auto& select: Subselects) {
            YQL_ENSURE(dynamic_cast(select.Get()));
            auto addNode = select->Build(ctx);
            if (!addNode) {
                return nullptr;
            }
            compositeNode->Add(addNode);
        }

        block = L(block, Y("let", "core", compositeNode));
        YQL_ENSURE(!Subselects.empty());
        dynamic_cast(Subselects.front().Get())->BuildProjectWindowDistinct(block, ctx, false);
        return Y("block", Q(L(block, Y("return", "core"))));
    }

    bool IsGroupByColumn(const TString& column) const override {
        YQL_ENSURE(!GroupingCols.empty());
        return GroupingCols.contains(column);
    }

    const TSet& GetGroupingCols() const {
        return GroupingCols;
    }

    TNodePtr BuildSort(TContext& ctx, const TString& label) override {
        return Subselects.front()->BuildSort(ctx, label);
    }

    EOrderKind GetOrderKind() const override {
        return Subselects.front()->GetOrderKind();
    }

    const TColumns* GetColumns() const override {
        return Subselects.front()->GetColumns();
    }

    ISource* RealSource() const {
        return Source.Get();
    }

    TWriteSettings GetWriteSettings() const override {
        return Settings;
    }

    bool HasSelectResult() const override {
        return !Settings.Discard;
    }

    TNodePtr DoClone() const final {
        auto newSource = MakeIntrusive(Pos, Source->CloneSource(), OriginalSource->CloneSource(), Settings);
        newSource->SetSubselects(CloneContainer(Subselects), CloneContainer(Grouping), CloneContainer(GroupByExpr));
        return newSource;
    }

private:
    /// Fills GroupingCols from the grouping expressions, resolved against a clone of the
    /// original source. All expressions are visited so that every error gets reported.
    bool CalculateGroupingCols(TContext& ctx, ISource* initSrc) {
        auto origSrc = OriginalSource->CloneSource();
        if (!origSrc->Init(ctx, initSrc)) {
            return false;
        }

        bool hasError = false;
        for (auto& expr: GroupByExpr) {
            if (!expr->Init(ctx, origSrc.Get()) || !IsComparableExpression(ctx, expr, false, "GROUP BY")) {
                hasError = true;
            }
        }
        if (!origSrc->AddExpressions(ctx, GroupByExpr, EExprSeat::GroupBy)) {
            hasError = true;
        }

        YQL_ENSURE(!Grouping.empty());
        for (auto& grouping : Grouping) {
            TString keyColumn;
            if (!InitAndGetGroupKey(ctx, grouping, origSrc.Get(), "grouping sets", keyColumn)) {
                hasError = true;
            } else if (!keyColumn.empty()) {
                GroupingCols.insert(keyColumn);
            }
        }

        return !hasError;
    }

    TSourcePtr Source;
    TSourcePtr OriginalSource;
    TNodePtr Flatten;
    TNodePtr PreFlattenMap;
    const TWriteSettings Settings;
    TVector Subselects;
    TVector Grouping;
    TVector GroupByExpr;
    TSet GroupingCols;
};

namespace {
    /// Returns the column name, prefixed with the source name when the column uses its source.
    TString FullColumnName(const TColumnNode& column) {
        YQL_ENSURE(column.GetColumnName());
        TString columnName = *column.GetColumnName();
        if (column.IsUseSource()) {
            columnName = DotJoin(*column.GetSourceName(), columnName);
        }
        return columnName;
    }
}

/// \todo simplify class
class TSelectCore: public IRealSource, public IComposableSource {
public:
    TSelectCore(
        TPosition pos,
        TSourcePtr source,
        const TVector& groupByExpr,
        const TVector& groupBy,
        bool compactGroupBy,
        const TString& groupBySuffix,
        bool assumeSorted,
        const TVector& orderBy,
        TNodePtr having,
        const TWinSpecs& winSpecs,
        TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
        const TVector& terms,
        bool distinct,
        const TVector& without,
        bool selectStream,
        const TWriteSettings& settings,
        TColumnsSets&& uniqueSets,
        TColumnsSets&& distinctSets
    )
        : IRealSource(pos)
        , Source(std::move(source))
        , GroupByExpr(groupByExpr)
        , GroupBy(groupBy)
        , AssumeSorted(assumeSorted)
        , CompactGroupBy(compactGroupBy)
        , GroupBySuffix(groupBySuffix)
        , OrderBy(orderBy)
        , Having(having)
        , WinSpecs(winSpecs)
        , Terms(terms)
        , Without(without)
        , Distinct(distinct)
        , LegacyHoppingWindowSpec(legacyHoppingWindowSpec)
        , SelectStream(selectStream)
        , Settings(settings)
        , UniqueSets(std::move(uniqueSets))
        , DistinctSets(std::move(distinctSets))
    {
    }

    void AllColumns() override {
        if (!OrderByInit) {
            Columns.SetAll();
        }
    }

    void GetInputTables(TTableList& tableList) const override {
        Source->GetInputTables(tableList);
ISource::GetInputTables(tableList); } size_t GetGroupingColumnsCount() const override { return Source->GetGroupingColumnsCount(); } bool DoInit(TContext& ctx, ISource* initSrc) override { if (AsInner) { Source->UseAsInner(); } if (!Source->Init(ctx, initSrc)) { return false; } if (SelectStream && !Source->IsStream()) { ctx.Error(Pos) << "SELECT STREAM is unsupported for non-streaming sources"; return false; } auto src = Source.Get(); bool hasError = false; if (src->IsFlattenByExprs()) { for (auto& expr : static_cast(src)->Expressions(EExprSeat::FlattenByExpr)) { if (!expr->Init(ctx, src)) { hasError = true; continue; } } } if (hasError) { return false; } src->SetCompactGroupBy(CompactGroupBy); src->SetGroupBySuffix(GroupBySuffix); for (auto& term: Terms) { term->CollectPreaggregateExprs(ctx, *src, DistinctAggrExpr); } if (Having) { Having->CollectPreaggregateExprs(ctx, *src, DistinctAggrExpr); } for (auto& expr: GroupByExpr) { if (auto sessionWindow = dynamic_cast(expr.Get())) { if (Source->IsStream()) { ctx.Error(Pos) << "SessionWindow is unsupported for streaming sources"; return false; } sessionWindow->MarkValid(); } if (auto hoppingWindow = dynamic_cast(expr.Get())) { hoppingWindow->MarkValid(); } // need to collect and Init() preaggregated exprs before calling Init() on GROUP BY expression TVector distinctAggrsInGroupBy; expr->CollectPreaggregateExprs(ctx, *src, distinctAggrsInGroupBy); for (auto& distinct : distinctAggrsInGroupBy) { if (!distinct->Init(ctx, src)) { return false; } } DistinctAggrExpr.insert(DistinctAggrExpr.end(), distinctAggrsInGroupBy.begin(), distinctAggrsInGroupBy.end()); if (!expr->Init(ctx, src) || !IsComparableExpression(ctx, expr, false, "GROUP BY")) { hasError = true; } } if (hasError || !src->AddExpressions(ctx, GroupByExpr, EExprSeat::GroupBy)) { return false; } for (auto& expr: DistinctAggrExpr) { if (!expr->Init(ctx, src)) { hasError = true; } } if (hasError || !src->AddExpressions(ctx, DistinctAggrExpr, EExprSeat::DistinctAggr)) 
{ return false; } /// grouped expressions are available in filters if (!Source->InitFilters(ctx)) { return false; } for (auto& expr: GroupBy) { TString usedColumn; if (!InitAndGetGroupKey(ctx, expr, src, "GROUP BY", usedColumn)) { hasError = true; } else if (usedColumn) { if (!src->AddGroupKey(ctx, usedColumn)) { hasError = true; } } } if (hasError) { return false; } if (Having && !Having->Init(ctx, src)) { return false; } src->AddWindowSpecs(WinSpecs); const bool isJoin = Source->GetJoin(); if (!InitSelect(ctx, src, isJoin, hasError)) { return false; } src->FinishColumns(); auto aggRes = src->BuildAggregation("core", ctx); if (!aggRes.second) { return false; } Aggregate = aggRes.first; if (src->IsFlattenByColumns() || src->IsFlattenColumns()) { Flatten = src->IsFlattenByColumns() ? src->BuildFlattenByColumns("row") : src->BuildFlattenColumns("row"); if (!Flatten || !Flatten->Init(ctx, src)) { return false; } } if (src->IsFlattenByExprs()) { PreFlattenMap = src->BuildPreFlattenMap(ctx); if (!PreFlattenMap) { return false; } } if (GroupByExpr || DistinctAggrExpr) { PreaggregatedMap = src->BuildPreaggregatedMap(ctx); if (!PreaggregatedMap) { return false; } } if (Aggregate) { if (!Aggregate->Init(ctx, src)) { return false; } if (Having) { Aggregate = Y( "Filter", Aggregate, BuildLambda(Pos, Y("row"), Y("Coalesce", Having, Y("Bool", Q("false")))) ); } } else if (Having) { if (Distinct) { Aggregate = Y( "Filter", "core", BuildLambda(Pos, Y("row"), Y("Coalesce", Having, Y("Bool", Q("false")))) ); ctx.Warning(Having->GetPos(), TIssuesIds::YQL_HAVING_WITHOUT_AGGREGATION_IN_SELECT_DISTINCT) << "The usage of HAVING without aggregations with SELECT DISTINCT is non-standard and will stop working soon. 
Please use WHERE instead."; } else { ctx.Error(Having->GetPos()) << "HAVING with meaning GROUP BY () should be with aggregation function."; return false; } } else if (!Distinct && !GroupBy.empty()) { ctx.Error(Pos) << "No aggregations were specified"; return false; } if (hasError) { return false; } if (src->IsCalcOverWindow()) { if (src->IsExprSeat(EExprSeat::WindowPartitionBy, EExprType::WithExpression)) { PrewindowMap = src->BuildPrewindowMap(ctx); if (!PrewindowMap) { return false; } } CalcOverWindow = src->BuildCalcOverWindow(ctx, "core"); if (!CalcOverWindow || !CalcOverWindow->Init(ctx, src)) { return false; } } return true; } TNodePtr Build(TContext& ctx) override { auto input = Source->Build(ctx); if (!input) { return nullptr; } auto block(Y(Y("let", "core", input))); if (Source->HasMatchRecognize()) { if (auto matchRecognize = Source->BuildMatchRecognize(ctx, "core")) { //use unique name match_recognize to find this block easily in unit tests block = L(block, Y("let", "match_recognize", matchRecognize)); //then bind to the conventional name block = L(block, Y("let", "core", "match_recognize")); } else { return nullptr; } } bool ordered = ctx.UseUnordered(*this); if (PreFlattenMap) { block = L(block, Y("let", "core", Y(ordered ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), PreFlattenMap)))); } if (Flatten) { block = L(block, Y("let", "core", Y(ordered ? 
"OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), Flatten, "res")))); } if (PreaggregatedMap) { block = L(block, Y("let", "core", PreaggregatedMap)); if (Source->IsCompositeSource() && !Columns.QualifiedAll) { block = L(block, Y("let", "preaggregated", "core")); } } else if (Source->IsCompositeSource() && !Columns.QualifiedAll) { block = L(block, Y("let", "origcore", "core")); } auto filter = Source->BuildFilter(ctx, "core"); if (filter) { block = L(block, Y("let", "core", filter)); } if (Aggregate) { block = L(block, Y("let", "core", Aggregate)); ordered = false; } const bool haveCompositeTerms = Source->IsCompositeSource() && !Columns.All && !Columns.QualifiedAll && !Columns.List.empty(); if (haveCompositeTerms) { // column order does not matter here - it will be set in projection YQL_ENSURE(Aggregate); block = L(block, Y("let", "core", Y("Map", "core", BuildLambda(Pos, Y("row"), CompositeTerms, "row")))); } if (auto grouping = Source->BuildGroupingColumns("core")) { block = L(block, Y("let", "core", grouping)); } if (!Source->GetCompositeSource()) { BuildProjectWindowDistinct(block, ctx, ordered); } return Y("block", Q(L(block, Y("return", "core")))); } void BuildProjectWindowDistinct(TNodePtr& block, TContext& ctx, bool ordered) override { if (PrewindowMap) { block = L(block, Y("let", "core", PrewindowMap)); } if (CalcOverWindow) { block = L(block, Y("let", "core", CalcOverWindow)); } block = L(block, Y("let", "core", Y("PersistableRepr", BuildSqlProject(ctx, ordered)))); if (Distinct) { block = L(block, Y("let", "core", Y("PersistableRepr", Y("SqlAggregateAll", Y("RemoveSystemMembers", "core"))))); } } TNodePtr BuildSort(TContext& ctx, const TString& label) override { Y_UNUSED(ctx); if (OrderBy.empty() || DisableSort_) { return nullptr; } auto sorted = BuildSortSpec(OrderBy, label, false, AssumeSorted); if (ExtraSortColumns.empty()) { return Y("let", label, sorted); } auto body = Y(); for (const auto& [column, _] : ExtraSortColumns) { body = 
L(body, Y("let", "row", Y("RemoveMember", "row", Q(column)))); } body = L(body, Y("let", "res", "row")); return Y("let", label, Y("OrderedMap", sorted, BuildLambda(Pos, Y("row"), body, "res"))); } TNodePtr BuildCleanupColumns(TContext& ctx, const TString& label) override { TNodePtr cleanup; if (ctx.EnableSystemColumns && ctx.Settings.Mode != NSQLTranslation::ESqlMode::LIMITED_VIEW) { if (Columns.All) { cleanup = Y("let", label, Y("RemoveSystemMembers", label)); } else if (!Columns.List.empty()) { const bool isJoin = Source->GetJoin(); if (!isJoin && Columns.QualifiedAll) { if (ctx.SimpleColumns) { cleanup = Y("let", label, Y("RemoveSystemMembers", label)); } else { TNodePtr members; for (auto& term: Terms) { if (term->IsAsterisk()) { auto sourceName = term->GetSourceName(); YQL_ENSURE(*sourceName && !sourceName->empty()); auto prefix = *sourceName + "._yql_"; members = members ? L(members, Q(prefix)) : Y(Q(prefix)); } } if (members) { cleanup = Y("let", label, Y("RemovePrefixMembers", label, Q(members))); } } } } } return cleanup; } bool IsSelect() const override { return true; } bool HasSelectResult() const override { return !Settings.Discard; } bool IsStream() const override { return Source->IsStream(); } EOrderKind GetOrderKind() const override { if (OrderBy.empty()) { return EOrderKind::None; } return AssumeSorted ? EOrderKind::Assume : EOrderKind::Sort; } TWriteSettings GetWriteSettings() const override { return Settings; } TMaybe AddColumn(TContext& ctx, TColumnNode& column) override { const bool aggregated = Source->HasAggregations() || Distinct; if (OrderByInit && (Source->GetJoin() || !aggregated)) { // ORDER BY will try to find column not only in projection items, but also in Source. 
// ```SELECT a, b FROM T ORDER BY c``` should work if c is present in T const bool reliable = column.IsReliable(); column.SetAsNotReliable(); auto maybeExist = IRealSource::AddColumn(ctx, column); if (reliable && !Source->GetJoin()) { column.ResetAsReliable(); } if (!maybeExist || !maybeExist.GetRef()) { maybeExist = Source->AddColumn(ctx, column); } if (!maybeExist.Defined()) { return maybeExist; } if (!DisableSort_ && !aggregated && column.GetColumnName() && IsMissingInProjection(ctx, column)) { ExtraSortColumns[FullColumnName(column)] = &column; } return maybeExist; } return IRealSource::AddColumn(ctx, column); } bool IsMissingInProjection(TContext& ctx, const TColumnNode& column) const { TString columnName = FullColumnName(column); if (Columns.Real.contains(columnName) || Columns.Artificial.contains(columnName)) { return false; } if (!ctx.SimpleColumns && Columns.QualifiedAll && !columnName.Contains('.')) { return false; } if (!Columns.IsColumnPossible(ctx, columnName)) { return true; } for (auto without: Without) { auto name = *without->GetColumnName(); if (Source && Source->GetJoin()) { name = DotJoin(*without->GetSourceName(), name); } if (name == columnName) { return true; } } return false; } TNodePtr PrepareWithout(const TNodePtr& base) { auto terms = base; if (Without) { for (auto without: Without) { auto name = *without->GetColumnName(); if (Source && Source->GetJoin()) { name = DotJoin(*without->GetSourceName(), name); } terms = L(terms, Y("let", "row", Y("RemoveMember", "row", Q(name)))); } } if (Source) { for (auto column : Source->GetTmpWindowColumns()) { terms = L(terms, Y("let", "row", Y("RemoveMember", "row", Q(column)))); } } return terms; } TNodePtr DoClone() const final { return new TSelectCore(Pos, Source->CloneSource(), CloneContainer(GroupByExpr), CloneContainer(GroupBy), CompactGroupBy, GroupBySuffix, AssumeSorted, CloneContainer(OrderBy), SafeClone(Having), CloneContainer(WinSpecs), SafeClone(LegacyHoppingWindowSpec), 
CloneContainer(Terms), Distinct, Without, SelectStream, Settings, TColumnsSets(UniqueSets), TColumnsSets(DistinctSets)); } private: bool InitSelect(TContext& ctx, ISource* src, bool isJoin, bool& hasError) { for (auto& [name, winSpec] : WinSpecs) { for (size_t i = 0; i < winSpec->Partitions.size(); ++i) { auto partitionNode = winSpec->Partitions[i]; if (auto sessionWindow = dynamic_cast(partitionNode.Get())) { if (winSpec->Session) { ctx.Error(partitionNode->GetPos()) << "Duplicate session window specification:"; ctx.Error(winSpec->Session->GetPos()) << "Previous session window is declared here"; hasError = true; continue; } sessionWindow->MarkValid(); winSpec->Session = partitionNode; } if (!partitionNode->Init(ctx, src)) { hasError = true; continue; } if (!partitionNode->GetLabel() && !partitionNode->GetColumnName()) { TString label = TStringBuilder() << "group_" << name << "_" << i; partitionNode->SetLabel(label); src->AddTmpWindowColumn(label); } } if (!src->AddExpressions(ctx, winSpec->Partitions, EExprSeat::WindowPartitionBy)) { hasError = true; } } if (LegacyHoppingWindowSpec) { if (!LegacyHoppingWindowSpec->TimeExtractor->Init(ctx, src)) { hasError = true; } src->SetLegacyHoppingWindowSpec(LegacyHoppingWindowSpec); } for (auto& term: Terms) { if (!term->Init(ctx, src)) { hasError = true; continue; } auto column = term->GetColumnName(); TString label(term->GetLabel()); bool hasName = true; if (label.empty()) { auto source = term->GetSourceName(); if (term->IsAsterisk() && !source->empty()) { Columns.QualifiedAll = true; label = DotJoin(*source, "*"); } else if (column) { label = isJoin && source && *source ? 
DotJoin(*source, *column) : *column; } else { label = Columns.AddUnnamed(); hasName = false; if (ctx.WarnUnnamedColumns) { ctx.Warning(term->GetPos(), TIssuesIds::YQL_UNNAMED_COLUMN) << "Autogenerated column name " << label << " will be used for expression"; } } } if (hasName && !Columns.Add(&label, false, false, true)) { ctx.Error(Pos) << "Duplicate column: " << label; hasError = true; } } CompositeTerms = Y(); if (!hasError && Source->IsCompositeSource() && !Columns.All && !Columns.QualifiedAll && !Columns.List.empty()) { auto compositeSrcPtr = static_cast(Source->GetCompositeSource()); if (compositeSrcPtr) { const auto& groupings = compositeSrcPtr->GetGroupingCols(); for (const auto& column: groupings) { if (Source->IsGroupByColumn(column)) { continue; } const TString tableName = (GroupByExpr || DistinctAggrExpr) ? "preaggregated" : "origcore"; CompositeTerms = L(CompositeTerms, Y("let", "row", Y("AddMember", "row", BuildQuotedAtom(Pos, column), Y("Nothing", Y("MatchType", Y("StructMemberType", Y("ListItemType", Y("TypeOf", tableName)), Q(column)), Q("Optional"), Y("lambda", Q(Y("item")), "item"), Y("lambda", Q(Y("item")), Y("OptionalType", "item"))))))); } } } for (auto iter: WinSpecs) { auto winSpec = *iter.second; for (auto orderSpec: winSpec.OrderBy) { if (!orderSpec->OrderExpr->Init(ctx, src)) { hasError = true; } } } if (Columns.All || Columns.QualifiedAll) { Source->AllColumns(); } for (const auto& without: Without) { auto namePtr = without->GetColumnName(); auto sourcePtr = without->GetSourceName(); YQL_ENSURE(namePtr && *namePtr); if (isJoin && !(sourcePtr && *sourcePtr)) { ctx.Error(without->GetPos()) << "Expected correlation name for WITHOUT in JOIN"; hasError = true; continue; } } if (Having && !Having->Init(ctx, src)) { hasError = true; } if (!src->IsCompositeSource() && !Columns.All && src->HasAggregations()) { WarnIfAliasFromSelectIsUsedInGroupBy(ctx, Terms, GroupBy, GroupByExpr); /// verify select aggregation compatibility TVector exprs(Terms); 
if (Having) { exprs.push_back(Having); } for (const auto& iter: WinSpecs) { for (const auto& sortSpec: iter.second->OrderBy) { exprs.push_back(sortSpec->OrderExpr); } } if (!ValidateAllNodesForAggregation(ctx, exprs)) { hasError = true; } } const auto label = GetLabel(); for (const auto& sortSpec: OrderBy) { auto& expr = sortSpec->OrderExpr; SetLabel(Source->GetLabel()); OrderByInit = true; if (!expr->Init(ctx, this)) { hasError = true; continue; } OrderByInit = false; if (!IsComparableExpression(ctx, expr, AssumeSorted, AssumeSorted ? "ASSUME ORDER BY" : "ORDER BY")) { hasError = true; continue; } } SetLabel(label); return !hasError; } TNodePtr PrepareJoinCoalesce(TContext& ctx, const TNodePtr& base, bool multipleQualifiedAll, const TVector& coalesceLabels) { const bool isJoin = Source->GetJoin(); const bool needCoalesce = isJoin && ctx.SimpleColumns && (Columns.All || multipleQualifiedAll || ctx.CoalesceJoinKeysOnQualifiedAll); if (!needCoalesce) { return base; } auto terms = base; const auto& sameKeyMap = Source->GetJoin()->GetSameKeysMap(); if (sameKeyMap) { terms = L(terms, Y("let", "flatSameKeys", "row")); for (const auto& [key, sources]: sameKeyMap) { auto coalesceKeys = Y(); for (const auto& label : coalesceLabels) { if (sources.contains(label)) { coalesceKeys = L(coalesceKeys, Q(DotJoin(label, key))); } } terms = L(terms, Y("let", "flatSameKeys", Y("CoalesceMembers", "flatSameKeys", Q(coalesceKeys)))); } terms = L(terms, Y("let", "row", "flatSameKeys")); } return terms; } TNodePtr BuildSqlProject(TContext& ctx, bool ordered) { auto sqlProjectArgs = Y(); const bool isJoin = Source->GetJoin(); if (Columns.All) { YQL_ENSURE(Columns.List.empty()); auto terms = PrepareWithout(Y()); auto options = Y(); if (isJoin && ctx.SimpleColumns) { terms = PrepareJoinCoalesce(ctx, terms, false, Source->GetJoin()->GetJoinLabels()); auto members = Y(); for (auto& source : Source->GetJoin()->GetJoinLabels()) { YQL_ENSURE(!source.empty()); members = L(members, 
BuildQuotedAtom(Pos, source + ".")); } if (GroupByExpr.empty() || ctx.BogousStarInGroupByOverJoin) { terms = L(terms, Y("let", "res", Y("DivePrefixMembers", "row", Q(members)))); } else { auto groupExprStruct = Y("AsStruct"); for (auto node : GroupByExpr) { auto label = node->GetLabel(); YQL_ENSURE(label); if (Source->IsGroupByColumn(label)) { auto name = BuildQuotedAtom(Pos, label); groupExprStruct = L(groupExprStruct, Q(Y(name, Y("Member", "row", name)))); } } auto groupColumnsStruct = Y("DivePrefixMembers", "row", Q(members)); terms = L(terms, Y("let", "res", Y("FlattenMembers", Q(Y(BuildQuotedAtom(Pos, ""), groupExprStruct)), Q(Y(BuildQuotedAtom(Pos, ""), groupColumnsStruct))))); } options = L(options, Q(Y(Q("divePrefix"), Q(members)))); } else { terms = L(terms, Y("let", "res", "row")); } sqlProjectArgs = L(sqlProjectArgs, Y("SqlProjectStarItem", "projectCoreType", BuildQuotedAtom(Pos, ""), BuildLambda(Pos, Y("row"), terms, "res"), Q(options))); } else { YQL_ENSURE(!Columns.List.empty()); YQL_ENSURE(Columns.List.size() == Terms.size()); TVector coalesceLabels; bool multipleQualifiedAll = false; if (isJoin && ctx.SimpleColumns) { THashSet starTerms; for (auto& term: Terms) { if (term->IsAsterisk()) { auto sourceName = term->GetSourceName(); YQL_ENSURE(*sourceName && !sourceName->empty()); YQL_ENSURE(Columns.QualifiedAll); starTerms.insert(*sourceName); } } TVector matched; TVector unmatched; for (auto& label : Source->GetJoin()->GetJoinLabels()) { if (starTerms.contains(label)) { matched.push_back(label); } else { unmatched.push_back(label); } } coalesceLabels.insert(coalesceLabels.end(), matched.begin(), matched.end()); coalesceLabels.insert(coalesceLabels.end(), unmatched.begin(), unmatched.end()); multipleQualifiedAll = starTerms.size() > 1; } auto column = Columns.List.begin(); auto isNamedColumn = Columns.NamedColumns.begin(); for (auto& term: Terms) { auto sourceName = term->GetSourceName(); if (!term->IsAsterisk()) { auto body = Y(); body = L(body, 
Y("let", "res", term)); TPosition lambdaPos = Pos; TPosition aliasPos = Pos; if (term->IsImplicitLabel() && ctx.WarnOnAnsiAliasShadowing) { // TODO: recanonize for positions below lambdaPos = term->GetPos(); aliasPos = term->GetLabelPos() ? *term->GetLabelPos() : lambdaPos; } auto projectItem = Y("SqlProjectItem", "projectCoreType", BuildQuotedAtom(aliasPos, *isNamedColumn ? *column : ""), BuildLambda(lambdaPos, Y("row"), body, "res")); if (term->IsImplicitLabel() && ctx.WarnOnAnsiAliasShadowing) { projectItem = L(projectItem, Q(Y(Q(Y(Q("warnShadow")))))); } if (!*isNamedColumn) { projectItem = L(projectItem, Q(Y(Q(Y(Q("autoName")))))); } sqlProjectArgs = L(sqlProjectArgs, projectItem); } else { auto terms = PrepareWithout(Y()); auto options = Y(); if (ctx.SimpleColumns && !isJoin) { terms = L(terms, Y("let", "res", "row")); } else { terms = PrepareJoinCoalesce(ctx, terms, multipleQualifiedAll, coalesceLabels); auto members = isJoin ? Y() : Y("FlattenMembers"); if (isJoin) { members = L(members, BuildQuotedAtom(Pos, *sourceName + ".")); if (ctx.SimpleColumns) { options = L(options, Q(Y(Q("divePrefix"), Q(members)))); } members = Y(ctx.SimpleColumns ? "DivePrefixMembers" : "SelectMembers", "row", Q(members)); } else { auto prefix = BuildQuotedAtom(Pos, ctx.SimpleColumns ? 
"" : *sourceName + "."); members = L(members, Q(Y(prefix, "row"))); if (!ctx.SimpleColumns) { options = L(options, Q(Y(Q("addPrefix"), prefix))); } } terms = L(terms, Y("let", "res", members)); } sqlProjectArgs = L(sqlProjectArgs, Y("SqlProjectStarItem", "projectCoreType", BuildQuotedAtom(Pos, *sourceName), BuildLambda(Pos, Y("row"), terms, "res"), Q(options))); } ++column; ++isNamedColumn; } } for (const auto& [columnName, column]: ExtraSortColumns) { auto body = Y(); body = L(body, Y("let", "res", column)); TPosition pos = column->GetPos(); auto projectItem = Y("SqlProjectItem", "projectCoreType", BuildQuotedAtom(pos, columnName), BuildLambda(pos, Y("row"), body, "res")); sqlProjectArgs = L(sqlProjectArgs, projectItem); } auto block(Y(Y("let", "projectCoreType", Y("TypeOf", "core")))); block = L(block, Y("let", "core", Y(ordered ? "OrderedSqlProject" : "SqlProject", "core", Q(sqlProjectArgs)))); if (!(UniqueSets.empty() && DistinctSets.empty())) { block = L(block, Y("let", "core", Y("RemoveSystemMembers", "core"))); const auto MakeUniqueHint = [this](INode::TPtr& block, const TColumnsSets& sets, bool distinct) { if (!sets.empty()) { auto assume = Y(distinct ? 
"AssumeDistinctHint" : "AssumeUniqueHint", "core"); if (!sets.front().empty()) { for (const auto& columns : sets) { auto set = Y(); for (const auto& column : columns) { set = L(set, Q(column)); } assume = L(assume, Q(set)); } } block = L(block, Y("let", "core", assume)); } }; MakeUniqueHint(block, DistinctSets, true); MakeUniqueHint(block, UniqueSets, false); } return Y("block", Q(L(block, Y("return", "core")))); } private: TSourcePtr Source; TVector GroupByExpr; TVector DistinctAggrExpr; TVector GroupBy; bool AssumeSorted = false; bool CompactGroupBy = false; TString GroupBySuffix; TVector OrderBy; TNodePtr Having; TWinSpecs WinSpecs; TNodePtr Flatten; TNodePtr PreFlattenMap; TNodePtr PreaggregatedMap; TNodePtr PrewindowMap; TNodePtr Aggregate; TNodePtr CalcOverWindow; TNodePtr CompositeTerms; TVector Terms; TVector Without; const bool Distinct; bool OrderByInit = false; TLegacyHoppingWindowSpecPtr LegacyHoppingWindowSpec; const bool SelectStream; const TWriteSettings Settings; const TColumnsSets UniqueSets, DistinctSets; TMap ExtraSortColumns; }; class TProcessSource: public IRealSource { public: TProcessSource( TPosition pos, TSourcePtr source, TNodePtr with, bool withExtFunction, TVector&& terms, bool listCall, bool processStream, const TWriteSettings& settings, const TVector& assumeOrderBy ) : IRealSource(pos) , Source(std::move(source)) , With(with) , WithExtFunction(withExtFunction) , Terms(std::move(terms)) , ListCall(listCall) , ProcessStream(processStream) , Settings(settings) , AssumeOrderBy(assumeOrderBy) { } void GetInputTables(TTableList& tableList) const override { Source->GetInputTables(tableList); ISource::GetInputTables(tableList); } bool DoInit(TContext& ctx, ISource* initSrc) override { if (AsInner) { Source->UseAsInner(); } if (!Source->Init(ctx, initSrc)) { return false; } if (ProcessStream && !Source->IsStream()) { ctx.Error(Pos) << "PROCESS STREAM is unsupported for non-streaming sources"; return false; } auto src = Source.Get(); if (!With) 
{ src->AllColumns(); Columns.SetAll(); src->FinishColumns(); return true; } /// grouped expressions are available in filters if (!Source->InitFilters(ctx)) { return false; } TSourcePtr fakeSource = nullptr; if (ListCall && !WithExtFunction) { fakeSource = BuildFakeSource(src->GetPos()); src->AllColumns(); } auto processSource = fakeSource != nullptr ? fakeSource.Get() : src; Y_DEBUG_ABORT_UNLESS(processSource != nullptr); if (!With->Init(ctx, processSource)) { return false; } if (With->GetLabel().empty()) { Columns.SetAll(); } else { if (ListCall) { ctx.Error(With->GetPos()) << "Label is not allowed to use with TableRows()"; return false; } Columns.Add(&With->GetLabel(), false); } bool hasError = false; TNodePtr produce; if (WithExtFunction) { produce = Y(); } else { TString processCall = (ListCall ? "SqlProcess" : "Apply"); produce = Y(processCall, With); } TMaybe listPosIndex; ui32 termIndex = 0; for (auto& term: Terms) { if (!term->GetLabel().empty()) { ctx.Error(term->GetPos()) << "Labels are not allowed for PROCESS terms"; hasError = true; continue; } if (!term->Init(ctx, processSource)) { hasError = true; continue; } if (ListCall) { if (auto atom = dynamic_cast(term.Get())) { listPosIndex = termIndex; } } ++termIndex; produce = L(produce, term); } if (hasError) { return false; } if (ListCall && !WithExtFunction) { YQL_ENSURE(listPosIndex.Defined()); produce = L(produce, Q(ToString(*listPosIndex))); } if (!produce->Init(ctx, src)) { hasError = true; } if (!(WithExtFunction && Terms.empty())) { TVector(1, produce).swap(Terms); } src->FinishColumns(); const auto label = GetLabel(); for (const auto& sortSpec: AssumeOrderBy) { auto& expr = sortSpec->OrderExpr; SetLabel(Source->GetLabel()); if (!expr->Init(ctx, this)) { hasError = true; continue; } if (!IsComparableExpression(ctx, expr, true, "ASSUME ORDER BY")) { hasError = true; continue; } } SetLabel(label); return !hasError; } TNodePtr Build(TContext& ctx) override { auto input = Source->Build(ctx); if (!input) 
{
            return nullptr;
        }
        // NOTE(review): the statements up to the end of this method are the tail of a
        // TProcessSource member function whose beginning lies above this chunk
        // (presumably Build) — confirm against the full file.
        // NOTE(review): in this copy of the file template argument lists seem to be missing
        // (e.g. `TVector`, `TMaybe`, `TSet`, `TIntrusivePtr`, `MakeIntrusive`,
        // `std::numeric_limits::max()`); the tokens below are kept exactly as found.

        // Without a `With` node the input is returned as is; system members are removed
        // only when ctx.EnableSystemColumns is set.
        if (!With) {
            auto res = input;
            if (ctx.EnableSystemColumns) {
                res = Y("RemoveSystemMembers", res);
            }
            return res;
        }

        // The input is bound to "inputRowsList" for list calls and to "core" otherwise.
        TString inputLabel = ListCall ? "inputRowsList" : "core";
        auto block(Y(Y("let", inputLabel, input)));

        // An optional filter produced by the source rebinds the same label.
        auto filter = Source->BuildFilter(ctx, inputLabel);
        if (filter) {
            block = L(block, Y("let", inputLabel, filter));
        }

        if (WithExtFunction) {
            // External function: strip system members, optionally map rows through Terms[0],
            // then apply `With` (bound to "transform") to the prepared input.
            auto preTransform = Y("RemoveSystemMembers", inputLabel);
            if (Terms.size() > 0) {
                preTransform = Y("Map", preTransform, BuildLambda(Pos, Y("row"), Q(Terms[0])));
            }
            block = L(block, Y("let", inputLabel, preTransform));
            block = L(block, Y("let", "transform", With));
            block = L(block, Y("let", "core", Y("Apply", "transform", inputLabel)));
        } else if (ListCall) {
            // List call: the first term itself becomes "core".
            block = L(block, Y("let", "core", Terms[0]));
        } else {
            // Per-row call: flat-map "core" through the lambda built from BuildColumnsTerms.
            // NOTE(review): "OrderedFlatMap" is chosen when ctx.UseUnordered(*this) is true,
            // which reads inverted by name — confirm this is intended.
            auto terms = BuildColumnsTerms(ctx);
            block = L(block, Y("let", "core", Y(ctx.UseUnordered(*this) ? "OrderedFlatMap" : "FlatMap", "core", BuildLambda(Pos, Y("row"), terms, "res"))));
        }
        block = L(block, Y("let", "core", Y("AutoDemux", Y("PersistableRepr", "core"))));
        return Y("block", Q(L(block, Y("return", "core"))));
    }

    /// Builds a `let` that rebinds `label` to the sort spec made from AssumeOrderBy.
    /// Returns nullptr when AssumeOrderBy is empty.
    TNodePtr BuildSort(TContext& ctx, const TString& label) override {
        Y_UNUSED(ctx);
        if (AssumeOrderBy.empty()) {
            return nullptr;
        }
        return Y("let", label, BuildSortSpec(AssumeOrderBy, label, false, true));
    }

    /// Passthrough when there is no `With` node; otherwise Assume if AssumeOrderBy is
    /// non-empty and None if it is empty.
    EOrderKind GetOrderKind() const override {
        if (!With) {
            return EOrderKind::Passthrough;
        }
        return AssumeOrderBy.empty() ? EOrderKind::None : EOrderKind::Assume;
    }

    /// This source never reports itself as a select.
    bool IsSelect() const override {
        return false;
    }

    /// A result is produced unless the write settings request discard.
    bool HasSelectResult() const override {
        return !Settings.Discard;
    }

    /// Stream-ness is taken from the wrapped source.
    bool IsStream() const override {
        return Source->IsStream();
    }

    /// Returns the write settings stored at construction.
    TWriteSettings GetWriteSettings() const override {
        return Settings;
    }

    /// Creates a new TProcessSource from clones of the source, `With`, terms and
    /// AssumeOrderBy; flags and settings are copied.
    TNodePtr DoClone() const final {
        return new TProcessSource(Pos, Source->CloneSource(), SafeClone(With), WithExtFunction,
            CloneContainer(Terms), ListCall, ProcessStream, Settings, CloneContainer(AssumeOrderBy));
    }

private:
    /// Builds the lambda body that binds "res" for the per-row call.
    /// With Columns.All: "res" is ToSequence of the single term.
    /// Otherwise: "res" is a one-member AsStructUnordered (first column name -> term),
    /// then wrapped in Just.
    /// Debug builds assert there is exactly one term.
    TNodePtr BuildColumnsTerms(TContext& ctx) {
        Y_UNUSED(ctx);
        TNodePtr terms;
        Y_DEBUG_ABORT_UNLESS(Terms.size() == 1);
        if (Columns.All) {
            terms = Y(Y("let", "res", Y("ToSequence", Terms.front())));
        } else {
            Y_DEBUG_ABORT_UNLESS(Columns.List.size() == Terms.size());
            terms = L(Y(), Y("let", "res",
                L(Y("AsStructUnordered"), Q(Y(BuildQuotedAtom(Pos, Columns.List.front()), Terms.front())))));
            terms = L(terms, Y("let", "res", Y("Just", "res")));
        }
        return terms;
    }

private:
    TSourcePtr Source;              // wrapped input source
    TNodePtr With;                  // node applied to the input; may be null (see GetOrderKind)
    const bool WithExtFunction;     // selects the "Apply transform" branch
    TVector Terms;                  // call terms; Terms[0] / Terms.front() is what the code reads
    const bool ListCall;            // selects the "inputRowsList" branch
    const bool ProcessStream;       // only stored and passed on to clones in the visible code
    const TWriteSettings Settings;
    TVector AssumeOrderBy;          // sort specification used by BuildSort / GetOrderKind
};

/// Factory: creates a TProcessSource from the given arguments (source and terms are moved in).
TSourcePtr BuildProcess(
    TPosition pos,
    TSourcePtr source,
    TNodePtr with,
    bool withExtFunction,
    TVector&& terms,
    bool listCall,
    bool processStream,
    const TWriteSettings& settings,
    const TVector& assumeOrderBy
) {
    return new TProcessSource(pos, std::move(source), with, withExtFunction, std::move(terms), listCall, processStream, settings, assumeOrderBy);
}

/// Proxy source placed between a select core and its real source for one grouping set.
/// It either owns the wrapped source (Holder) or points at a TCompositeSelect, and it
/// collects the "GroupingHint<N>" bit masks requested through AddGrouping.
class TNestedProxySource: public IProxySource {
public:
    /// Owning form: proxies `source` and keeps it alive in Holder.
    TNestedProxySource(TPosition pos, const TVector& groupBy, TSourcePtr source)
        : IProxySource(pos, source.Get())
        , CompositeSelect(nullptr)
        , Holder(std::move(source))
        , GroupBy(groupBy)
    {}

    /// Composite form: proxies compositeSelect->RealSource() and keeps a raw pointer
    /// to the composite select (not owned here).
    TNestedProxySource(TCompositeSelect* compositeSelect, const TVector& groupBy)
        : IProxySource(compositeSelect->GetPos(), compositeSelect->RealSource())
        , CompositeSelect(compositeSelect)
        , GroupBy(groupBy)
    {}

    /// Initialization is delegated to the proxied source.
    bool DoInit(TContext& ctx, ISource* src) override {
        return Source->Init(ctx, src);
    }

    /// In composite form returns the atom "composite"; otherwise builds the proxied source.
    TNodePtr Build(TContext& ctx) override {
        return CompositeSelect ? BuildAtom(Pos, "composite", TNodeFlags::Default) : Source->Build(ctx);
    }

    /// In composite form filters are not initialized here (returns true).
    bool InitFilters(TContext& ctx) override {
        return CompositeSelect ? true : Source->InitFilters(ctx);
    }

    /// In composite form no filter is built here (returns nullptr).
    TNodePtr BuildFilter(TContext& ctx, const TString& label) override {
        return CompositeSelect ? nullptr : Source->BuildFilter(ctx, label);
    }

    IJoin* GetJoin() override {
        return Source->GetJoin();
    }

    bool IsCompositeSource() const override {
        return true;
    }

    /// Returns the composite select pointer; nullptr in the owning form.
    ISource* GetCompositeSource() override {
        return CompositeSelect;
    }

    /// Registers one grouping request and writes the name of its hint column
    /// ("GroupingHint" + current number of hints) into `hintColumn`.
    /// The hint is a bit mask built from `columns` in order: the mask is shifted left
    /// for each column and the low bit is set when the column is NOT among the
    /// group-by columns of this proxy. Always returns true.
    bool AddGrouping(TContext& ctx, const TVector& columns, TString& hintColumn) override {
        Y_UNUSED(ctx);
        hintColumn = TStringBuilder() << "GroupingHint" << Hints.size();
        ui64 hint = 0;
        // The set of group-by column names is filled on first use.
        if (GroupByColumns.empty()) {
            const bool isJoin = GetJoin();
            for (const auto& groupByNode: GroupBy) {
                auto namePtr = groupByNode->GetColumnName();
                YQL_ENSURE(namePtr);
                TString column = *namePtr;
                // When the source has a join, a non-empty source name is prepended via DotJoin.
                if (isJoin) {
                    auto sourceNamePtr = groupByNode->GetSourceName();
                    if (sourceNamePtr && !sourceNamePtr->empty()) {
                        column = DotJoin(*sourceNamePtr, column);
                    }
                }
                GroupByColumns.insert(column);
            }
        }
        for (const auto& column: columns) {
            hint <<= 1;
            if (!GroupByColumns.contains(column)) {
                hint += 1;
            }
        }
        Hints.push_back(hint);
        return true;
    }

    /// Number of hints registered so far.
    size_t GetGroupingColumnsCount() const override {
        return Hints.size();
    }

    /// Builds a Map over `label` that adds one Uint64 member "GroupingHint<i>" per
    /// registered hint to every row. Returns nullptr when no hint was registered.
    TNodePtr BuildGroupingColumns(const TString& label) override {
        if (Hints.empty()) {
            return nullptr;
        }
        auto body = Y();
        for (size_t i = 0; i < Hints.size(); ++i) {
            TString hintColumn = TStringBuilder() << "GroupingHint" << i;
            TString hintValue = ToString(Hints[i]);
            body = L(body, Y("let", "row", Y("AddMember", "row", Q(hintColumn), Y("Uint64", Q(hintValue)))));
        }
        return Y("Map", label, BuildLambda(Pos, Y("row"), body, "row"));
    }

    void FinishColumns() override {
        Source->FinishColumns();
    }

    /// Columns whose name satisfies IsExprAlias are accepted here (returns true);
    /// every other column is forwarded to the proxied source.
    TMaybe AddColumn(TContext& ctx, TColumnNode& column) override {
        if (const TString* columnName = column.GetColumnName()) {
            // The repeated `columnName &&` test is redundant: the enclosing `if` already
            // established that the pointer is non-null.
            if (columnName && IsExprAlias(*columnName)) {
                return true;
            }
        }
        return Source->AddColumn(ctx, column);
    }

    /// Cloning is only allowed while no hints are registered. The owning form clones the
    /// held source; the composite form reuses the same CompositeSelect pointer.
    TPtr DoClone() const final {
        YQL_ENSURE(Hints.empty());
        return Holder.Get() ? new TNestedProxySource(Pos, CloneContainer(GroupBy), Holder->CloneSource()) :
            new TNestedProxySource(CompositeSelect, CloneContainer(GroupBy));
    }

private:
    TCompositeSelect* CompositeSelect;  // non-owning; nullptr in the owning form
    TSourcePtr Holder;                  // keeps the wrapped source alive in the owning form
    TVector GroupBy;                    // group-by nodes of this grouping set
    mutable TSet GroupByColumns;        // column names derived from GroupBy, filled on first AddGrouping
    mutable TVector Hints;              // one bit mask per AddGrouping call
};

namespace {

/// Builds the select core for a query, choosing one of three shapes based on `groupBy`:
///  1. `groupBy` is empty or its first item has no content list: a plain TSelectCore;
///  2. `groupBy` has exactly one item (with a content list): the source is wrapped in a
///     TNestedProxySource and the function recurses with the content list as the group-by;
///  3. otherwise: a TCompositeSelect with one TSelectCore per item of `groupBy`.
/// Returns nullptr (after reporting an error at `pos`) when the total number of grouping
/// columns in case 3 exceeds ctx.PragmaGroupByLimit.
TSourcePtr DoBuildSelectCore(
    TContext& ctx,
    TPosition pos,
    TSourcePtr originalSource,
    TSourcePtr source,
    const TVector& groupByExpr,
    const TVector& groupBy,
    bool compactGroupBy,
    const TString& groupBySuffix,
    bool assumeSorted,
    const TVector& orderBy,
    TNodePtr having,
    TWinSpecs&& winSpecs,
    TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
    TVector&& terms,
    bool distinct,
    TVector&& without,
    bool selectStream,
    const TWriteSettings& settings,
    TColumnsSets&& uniqueSets,
    TColumnsSets&& distinctSets
) {
    // Case 1: no grouping sets involved.
    if (groupBy.empty() || !groupBy.front()->ContentListPtr()) {
        return new TSelectCore(pos, std::move(source), groupByExpr, groupBy, compactGroupBy, groupBySuffix, assumeSorted,
            orderBy, having, winSpecs, legacyHoppingWindowSpec, terms, distinct, without, selectStream, settings, std::move(uniqueSets), std::move(distinctSets));
    }
    // Case 2: a single grouping set.
    if (groupBy.size() == 1) {
        /// There is actually little point in using the grouping function in this case (the result is always 0).
        auto contentPtr = groupBy.front()->ContentListPtr();
        source = new TNestedProxySource(pos, *contentPtr, source);
        return DoBuildSelectCore(ctx, pos, originalSource, source, groupByExpr, *contentPtr, compactGroupBy, groupBySuffix,
            assumeSorted, orderBy, having, std::move(winSpecs),
            legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings, std::move(uniqueSets), std::move(distinctSets));
    }
    /// \todo Some smart merge logic: generalize the common part of the groupings (expr, flatten, etc.)?
    // Case 3: several grouping sets, one subselect each.
    TIntrusivePtr compositeSelect = new TCompositeSelect(pos, std::move(source), originalSource->CloneSource(), settings);
    size_t totalGroups = 0;
    TVector subselects;
    TVector groupingCols;
    for (auto& grouping: groupBy) {
        auto contentPtr = grouping->ContentListPtr();
        // An item without a content list is treated as a one-element list holding itself.
        TVector cache(1, nullptr);
        if (!contentPtr) {
            cache[0] = grouping;
            contentPtr = &cache;
        }
        groupingCols.insert(groupingCols.end(), contentPtr->cbegin(), contentPtr->cend());
        TSourcePtr proxySource = new TNestedProxySource(compositeSelect.Get(), CloneContainer(*contentPtr));
        if (!subselects.empty()) {
            /// Clone the terms so that every subselect after the first gets its own copy.
            TVector termsCopy;
            for (const auto& term: terms) {
                termsCopy.emplace_back(term->Clone());
            }
            std::swap(terms, termsCopy);
        }
        totalGroups += contentPtr->size();
        TSelectCore* selectCore = new TSelectCore(pos, std::move(proxySource), CloneContainer(groupByExpr),
            CloneContainer(*contentPtr), compactGroupBy, groupBySuffix, assumeSorted, orderBy, SafeClone(having), CloneContainer(winSpecs),
            legacyHoppingWindowSpec, terms, distinct, without, selectStream, settings, TColumnsSets(uniqueSets), TColumnsSets(distinctSets));
        subselects.emplace_back(selectCore);
    }
    // The limit is checked only after all subselects have been created.
    if (totalGroups > ctx.PragmaGroupByLimit) {
        ctx.Error(pos) << "Unable to GROUP BY more than " << ctx.PragmaGroupByLimit << " groups, you try use " << totalGroups << " groups";
        return nullptr;
    }
    compositeSelect->SetSubselects(std::move(subselects), std::move(groupingCols), CloneContainer(groupByExpr));
    return compositeSelect;
}

} // namespace

/// Public entry point: forwards to DoBuildSelectCore, passing `source` both as the
/// original source and as the working source.
TSourcePtr BuildSelectCore(
    TContext& ctx,
    TPosition pos,
    TSourcePtr source,
    const TVector& groupByExpr,
    const TVector& groupBy,
    bool compactGroupBy,
    const TString& groupBySuffix,
    bool assumeSorted,
    const TVector& orderBy,
    TNodePtr having,
    TWinSpecs&& winSpecs,
    TLegacyHoppingWindowSpecPtr legacyHoppingWindowSpec,
    TVector&& terms,
    bool distinct,
    TVector&& without,
    bool selectStream,
    const TWriteSettings& settings,
    TColumnsSets&& uniqueSets,
    TColumnsSets&& distinctSets
)
{
    return DoBuildSelectCore(ctx, pos, source, source, groupByExpr, groupBy, compactGroupBy, groupBySuffix, assumeSorted, orderBy,
        having, std::move(winSpecs), legacyHoppingWindowSpec, std::move(terms), distinct, std::move(without), selectStream, settings, std::move(uniqueSets), std::move(distinctSets));
}

/// Source that combines several sub-sources with one of the Union* callables.
class TUnion: public IRealSource {
public:
    TUnion(TPosition pos, TVector&& sources, bool quantifierAll, const TWriteSettings& settings)
        : IRealSource(pos)
        , Sources(std::move(sources))
        , QuantifierAll(quantifierAll)
        , Settings(settings)
    {
    }

    /// Explicitly uses the IRealSource implementation.
    const TColumns* GetColumns() const override {
        return IRealSource::GetColumns();
    }

    /// Collects input tables of every sub-source, then those reported by ISource itself.
    void GetInputTables(TTableList& tableList) const override {
        for (auto& x : Sources) {
            x->GetInputTables(tableList);
        }

        ISource::GetInputTables(tableList);
    }

    /// Marks every sub-source as inner and initializes it; fails on the first failure.
    /// Columns are merged from every sub-source, or only from the first one when
    /// ctx.PositionalUnionAll is set.
    bool DoInit(TContext& ctx, ISource* src) override {
        bool first = true;
        for (auto& s: Sources) {
            s->UseAsInner();
            if (!s->Init(ctx, src)) {
                return false;
            }
            if (!ctx.PositionalUnionAll || first) {
                auto c = s->GetColumns();
                // Only checked in debug builds; `c` is dereferenced unconditionally below.
                Y_DEBUG_ABORT_UNLESS(c);
                Columns.Merge(*c);
                first = false;
            }
        }
        return true;
    }

    /// Picks the callable from QuantifierAll, ctx.EmitUnionMerge and ctx.PositionalUnionAll
    /// and appends every built sub-source to it. Returns nullptr if any sub-source
    /// fails to build.
    TNodePtr Build(TContext& ctx) override {
        TPtr res;
        if (QuantifierAll) {
            if (ctx.EmitUnionMerge) {
                res = ctx.PositionalUnionAll ? Y("UnionMergePositional") : Y("UnionMerge");
            } else {
                res = ctx.PositionalUnionAll ? Y("UnionAllPositional") : Y("UnionAll");
            }
        } else {
            res = ctx.PositionalUnionAll ? Y("UnionPositional") : Y("Union");
        }

        for (auto& s: Sources) {
            auto input = s->Build(ctx);
            if (!input) {
                return nullptr;
            }
            res->Add(input);
        }
        return res;
    }

    /// True only when every sub-source is a stream (also true for an empty list).
    bool IsStream() const override {
        for (auto& s: Sources) {
            if (!s->IsStream()) {
                return false;
            }
        }
        return true;
    }

    /// Creates a copy from cloned sub-sources with the same quantifier and settings.
    TNodePtr DoClone() const final {
        return MakeIntrusive(Pos, CloneContainer(Sources), QuantifierAll, Settings);
    }

    bool IsSelect() const override {
        return true;
    }

    /// A result is produced unless the write settings request discard.
    bool HasSelectResult() const override {
        return !Settings.Discard;
    }

    TWriteSettings GetWriteSettings() const override {
        return Settings;
    }

private:
    TVector Sources;                // sub-sources in the order they are added to the callable
    bool QuantifierAll;             // selects the *All / *Merge callables in Build
    const TWriteSettings Settings;
};

/// Factory: creates a TUnion over `sources` (moved in).
TSourcePtr BuildUnion(
    TPosition pos,
    TVector&& sources,
    bool quantifierAll,
    const TWriteSettings& settings
) {
    return new TUnion(pos, std::move(sources), quantifierAll, settings);
}

/// Proxy that forwards to the original source and supplies a fixed window name
/// to the "over window" calls.
class TOverWindowSource: public IProxySource {
public:
    TOverWindowSource(TPosition pos, const TString& windowName, ISource* origSource)
        : IProxySource(pos, origSource)
        , WindowName(windowName)
    {
        Source->SetLabel(origSource->GetLabel());
    }

    TString MakeLocalName(const TString& name) override {
        return Source->MakeLocalName(name);
    }

    void AddTmpWindowColumn(const TString& column) override {
        return Source->AddTmpWindowColumn(column);
    }

    /// Aggregations flagged as over-window (plain or distinct) are registered under
    /// WindowName; all others go through the ordinary AddAggregation of the source.
    bool AddAggregation(TContext& ctx, TAggregationPtr aggr) override {
        if (aggr->IsOverWindow() || aggr->IsOverWindowDistinct()) {
            return Source->AddAggregationOverWindow(ctx, WindowName, aggr);
        }
        return Source->AddAggregation(ctx, aggr);
    }

    bool AddFuncOverWindow(TContext& ctx, TNodePtr expr) override {
        return Source->AddFuncOverWindow(ctx, WindowName, expr);
    }

    bool IsOverWindowSource() const override {
        return true;
    }

    TMaybe AddColumn(TContext& ctx, TColumnNode& column) override {
        return Source->AddColumn(ctx, column);
    }

    /// Not meant to be built: aborts with "Unexpected call".
    TNodePtr Build(TContext& ctx) override {
        Y_UNUSED(ctx);
        Y_ABORT("Unexpected call");
    }

    const TString* GetWindowName() const override {
        return &WindowName;
    }

    TWindowSpecificationPtr FindWindowSpecification(TContext& ctx, const TString& windowName) const override {
        return Source->FindWindowSpecification(ctx, windowName);
    }

    TNodePtr GetSessionWindowSpec() const override {
        return Source->GetSessionWindowSpec();
    }

    /// Returns an empty pointer; how callers treat that is not visible in this chunk.
    TNodePtr DoClone() const final {
        return {};
    }

private:
    const TString WindowName;
};

/// Factory: creates a TOverWindowSource for `windowName` over `origSource`.
TSourcePtr BuildOverWindowSource(TPosition pos, const TString& windowName, ISource* origSource) {
    return new TOverWindowSource(pos, windowName, origSource);
}

/// AST list node of the form (let select (Take <input> <limit>)), where <input> is the
/// atom "select", optionally wrapped in Skip.
class TSkipTakeNode final: public TAstListNode {
public:
    TSkipTakeNode(TPosition pos, const TNodePtr& skip, const TNodePtr& take)
        : TAstListNode(pos), IsSkipProvided_(!!skip)
    {
        TNodePtr select(AstNode("select"));
        // Skip is emitted only when a skip node was given; a null value is coalesced to 0.
        if (skip) {
            select = Y("Skip", select, Y("Coalesce", skip, Y("Uint64", Q("0"))));
        }
        // Take is always emitted; the `take` node is coalesced with the numeric maximum.
        // NOTE(review): the template argument of std::numeric_limits is missing in this
        // copy; the "Uint64" literal suggests an unsigned 64-bit type — confirm.
        static const TString uiMax = ::ToString(std::numeric_limits::max());
        Add("let", "select", Y("Take", select, Y("Coalesce", take, Y("Uint64", Q(uiMax)))));
    }

    /// Returns an empty pointer; how callers treat that is not visible in this chunk.
    TPtr DoClone() const final {
        return {};
    }

    /// True when a skip node was passed to the constructor.
    bool HasSkip() const {
        return IsSkipProvided_;
    }

private:
    const bool IsSkipProvided_;
};

/// Factory: creates a TSkipTakeNode.
TNodePtr BuildSkipTake(TPosition pos, const TNodePtr& skip, const TNodePtr& take) {
    return new TSkipTakeNode(pos, skip, take);
}

/// Wraps a source into a "select" block: sort, skip/take, sampling and column cleanup
/// are appended around the built source.
class TSelect: public IProxySource {
public:
    TSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake)
        : IProxySource(pos, source.Get())
        , Source(std::move(source))
        , SkipTake(skipTake)
    {}

    /// Propagates label and inner-use flag to the source, drops an ORDER BY that would be
    /// ignored (with a warning), initializes the source and then the skip/take node.
    /// Warns when an offset is used and the source's order kind is not Sort.
    bool DoInit(TContext& ctx, ISource* src) override {
        Source->SetLabel(Label);
        if (AsInner) {
            Source->UseAsInner();
        }

        if (IgnoreSort()) {
            Source->DisableSort();
            ctx.Warning(Source->GetPos(), TIssuesIds::YQL_ORDER_BY_WITHOUT_LIMIT_IN_SUBQUERY) << "ORDER BY without LIMIT in subquery will be ignored";
        }

        if (!Source->Init(ctx, src)) {
            return false;
        }
        src = Source.Get();
        if (SkipTake) {
            // The skip/take node is initialized against a fake source created at its position.
            FakeSource = BuildFakeSource(SkipTake->GetPos());
            if (!SkipTake->Init(ctx, FakeSource.Get())) {
                return false;
            }
            if (SkipTake->HasSkip() && EOrderKind::Sort != Source->GetOrderKind()) {
                ctx.Warning(Source->GetPos(), TIssuesIds::YQL_OFFSET_WITHOUT_SORT) << "LIMIT with OFFSET without ORDER BY may provide different results from run to run";
            }
        }

        return true;
    }

    /// Builds the block: binds the built source to "select", then appends (when present)
    /// the sort node, the skip/take node, the sampling flat-map and the cleanup node,
    /// and finally returns "select". Returns nullptr if the source or the sampling
    /// lambda cannot be built.
    TNodePtr Build(TContext& ctx) override {
        auto input = Source->Build(ctx);
        if (!input) {
            return nullptr;
        }
        const auto label = "select";
        auto block(Y(Y("let", label, input)));

        auto sortNode = Source->BuildSort(ctx, label);
        if (sortNode && !IgnoreSort()) {
            block = L(block, sortNode);
        }

        if (SkipTake) {
            block = L(block, SkipTake);
        }

        TNodePtr sample;
        if (!BuildSamplingLambda(sample)) {
            return nullptr;
        } else if (sample) {
            block = L(block, Y("let", "select", Y("OrderedFlatMap", "select", sample)));
        }

        if (auto removeNode = Source->BuildCleanupColumns(ctx, label)) {
            block = L(block, removeNode);
        }

        block = L(block, Y("return", label));
        return Y("block", Q(block));
    }

    /// Rejects the System sampling mode and any sampling seed with an error at `pos`;
    /// otherwise forwards the rate to SetSamplingRate.
    bool SetSamplingOptions(
        TContext& ctx,
        TPosition pos,
        ESampleClause sampleClause,
        ESampleMode mode,
        TNodePtr samplingRate,
        TNodePtr samplingSeed) override {
        if (mode == ESampleMode::System) {
            ctx.Error(pos) << "only Bernoulli sampling mode is supported for subqueries";
            return false;
        }
        if (samplingSeed) {
            ctx.Error(pos) << "'Repeatable' keyword is not supported for subqueries";
            return false;
        }
        return SetSamplingRate(ctx, sampleClause, samplingRate);
    }

    bool IsSelect() const override {
        return Source->IsSelect();
    }

    bool HasSelectResult() const override {
        return Source->HasSelectResult();
    }

    /// Creates a copy from a cloned source and SafeClone of the skip/take node.
    TPtr DoClone() const final {
        return MakeIntrusive(Pos, Source->CloneSource(), SafeClone(SkipTake));
    }
protected:
    /// The sort is ignored for an inner select that has no skip/take while its source
    /// reports EOrderKind::Sort.
    bool IgnoreSort() const {
        return AsInner && !SkipTake && EOrderKind::Sort == Source->GetOrderKind();
    }

    TSourcePtr Source;      // wrapped source (owning)
    TNodePtr SkipTake;      // optional skip/take node
    TSourcePtr FakeSource;  // created in DoInit for initializing SkipTake
};

/// Factory: creates a TSelect over `source` with an optional skip/take node.
TSourcePtr BuildSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake) {
    return new TSelect(pos, std::move(source), skipTake);
}

/// AST node that wraps a source into a block which either writes the result
/// (returning "world") or returns the output itself.
class TSelectResultNode final: public TAstListNode {
public:
    TSelectResultNode(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery,
        TScopedStatePtr scoped)
        : TAstListNode(pos)
        , Source(std::move(source))
        , WriteResult(writeResult)
        , InSubquery(inSubquery)
        , Scoped(scoped)
    {
        YQL_ENSURE(Source, "Invalid source node");
        FakeSource = BuildFakeSource(pos);
    }

    bool IsSelect() const override {
        return true;
    }

    bool HasSelectResult() const override {
        return Source->HasSelectResult();
    }

    /// Initializes the source and its input tables, assembles the write settings list
    /// and adds the resulting "block" to this node. Returns false on any failure.
    bool DoInit(TContext& ctx, ISource* src) override {
        if (!Source->Init(ctx, src)) {
            return false;
        }

        // From here on `src` refers to the wrapped source.
        src = Source.Get();
        TTableList tableList;
        Source->GetInputTables(tableList);

        TNodePtr node(BuildInputTables(Pos, tableList, InSubquery, Scoped));
        if (!node->Init(ctx, src)) {
            return false;
        }

        auto writeSettings = src->GetWriteSettings();
        // "ref" follows ctx.PragmaRefSelect, "autoref" is the default;
        // ctx.PragmaSampleSelect switches both off.
        bool asRef = ctx.PragmaRefSelect;
        bool asAutoRef = true;
        if (ctx.PragmaSampleSelect) {
            asRef = false;
            asAutoRef = false;
        }

        auto settings = Y(Q(Y(Q("type"))));
        if (writeSettings.Discard) {
            settings = L(settings, Q(Y(Q("discard"))));
        }

        if (!writeSettings.Label.Empty()) {
            auto labelNode = writeSettings.Label.Build();
            // A label without a literal value is wrapped in EvaluateAtom.
            if (!writeSettings.Label.GetLiteral()) {
                labelNode = Y("EvaluateAtom", labelNode);
            }

            if (!labelNode->Init(ctx, FakeSource.Get())) {
                return false;
            }

            settings = L(settings, Q(Y(Q("label"), labelNode)));
        }

        if (asRef) {
            settings = L(settings, Q(Y(Q("ref"))));
        } else if (asAutoRef) {
            settings = L(settings, Q(Y(Q("autoref"))));
        }

        // An explicit "columns" setting is emitted unless the source selects all columns
        // (or qualified-all with ctx.SimpleColumns).
        auto columns = Source->GetColumns();
        if (columns && !columns->All && !(columns->QualifiedAll && ctx.SimpleColumns)) {
            auto list = Y();
            YQL_ENSURE(columns->List.size() == columns->NamedColumns.size());
            for (size_t i = 0; i < columns->List.size(); ++i) {
                auto& c = columns->List[i];
                if (c.EndsWith('*')) {
                    // Name ending with '*': emitted as a prefix entry without the trailing '*'.
                    list = L(list, Q(Y(Q("prefix"), BuildQuotedAtom(Pos, c.substr(0, c.size() - 1)))));
                } else if (columns->NamedColumns[i]) {
                    list = L(list, BuildQuotedAtom(Pos, c));
                } else {
                    list = L(list, Q(Y(Q("auto"))));
                }
            }
            settings = L(settings, Q(Y(Q("columns"), Q(list))));
        }

        if (ctx.ResultRowsLimit > 0) {
            settings = L(settings, Q(Y(Q("take"), Q(ToString(ctx.ResultRowsLimit)))));
        }

        auto output = Source->Build(ctx);
        if (!output) {
            return false;
        }
        node = L(node, Y("let", "output", output));
        if (WriteResult || writeSettings.Discard) {
            // The output is marked Unordered only when the source has no order and the
            // context allows unordered processing for it.
            if (EOrderKind::None == Source->GetOrderKind() && ctx.UseUnordered(*Source)) {
                node = L(node, Y("let", "output", Y("Unordered", "output")));
                if (ctx.UnorderedResult) {
                    settings = L(settings, Q(Y(Q("unordered"))));
                }
            }
            auto writeResult(BuildWriteResult(Pos, "output", settings));
            if (!writeResult->Init(ctx, src)) {
                return false;
            }
            node = L(node, Y("let", "world", writeResult));
            node = L(node, Y("return", "world"));
        } else {
            node = L(node, Y("return", "output"));
        }

        Add("block", Q(node));
        return true;
    }

    /// Returns an empty pointer; how callers treat that is not visible in this chunk.
    TPtr DoClone() const final {
        return {};
    }
protected:
    TSourcePtr Source;          // source whose result is written or returned
    const bool WriteResult;     // when true (or on discard) the block writes and returns "world"
    const bool InSubquery;      // passed to BuildInputTables
    TScopedStatePtr Scoped;     // passed to BuildInputTables
    TSourcePtr FakeSource;      // used to initialize the label node
};

/// Factory: creates a TSelectResultNode.
TNodePtr BuildSelectResult(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery,
    TScopedStatePtr scoped) {
    return new TSelectResultNode(pos, std::move(source), writeResult, inSubquery, scoped);
}

} // namespace NSQLTranslationV1