123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468 |
- #include "source.h"
- #include "context.h"
- #include <yql/essentials/utils/yql_panic.h>
- using namespace NYql;
- namespace NSQLTranslationV1 {
- static const TMap<ESQLWriteColumnMode, EWriteColumnMode> sqlIntoMode2WriteColumn = {
- {ESQLWriteColumnMode::InsertInto, EWriteColumnMode::Insert},
- {ESQLWriteColumnMode::InsertOrAbortInto, EWriteColumnMode::InsertOrAbort},
- {ESQLWriteColumnMode::InsertOrIgnoreInto, EWriteColumnMode::InsertOrIgnore},
- {ESQLWriteColumnMode::InsertOrRevertInto, EWriteColumnMode::InsertOrRevert},
- {ESQLWriteColumnMode::UpsertInto, EWriteColumnMode::Upsert},
- {ESQLWriteColumnMode::ReplaceInto, EWriteColumnMode::Replace},
- {ESQLWriteColumnMode::InsertIntoWithTruncate, EWriteColumnMode::Renew},
- {ESQLWriteColumnMode::Update, EWriteColumnMode::Update},
- {ESQLWriteColumnMode::Delete, EWriteColumnMode::Delete},
- };
- class TModifySourceBase: public ISource {
- public:
- TModifySourceBase(TPosition pos, const TVector<TString>& columnsHint)
- : ISource(pos)
- , ColumnsHint(columnsHint)
- {
- }
- bool AddFilter(TContext& ctx, TNodePtr filter) override {
- Y_UNUSED(filter);
- ctx.Error(Pos) << "Source does not allow filtering";
- return false;
- }
- bool AddGroupKey(TContext& ctx, const TString& column) override {
- Y_UNUSED(column);
- ctx.Error(Pos) << "Source does not allow grouping";
- return false;
- }
- bool AddAggregation(TContext& ctx, TAggregationPtr aggr) override {
- YQL_ENSURE(aggr);
- ctx.Error(aggr->GetPos()) << "Source does not allow aggregation";
- return false;
- }
- TNodePtr BuildFilter(TContext& ctx, const TString& label) override {
- Y_UNUSED(ctx);
- Y_UNUSED(label);
- return nullptr;
- }
- std::pair<TNodePtr, bool> BuildAggregation(const TString& label, TContext& ctx) override {
- Y_UNUSED(label);
- Y_UNUSED(ctx);
- return { nullptr, true };
- }
- protected:
- TVector<TString> ColumnsHint;
- TString OperationHumanName;
- };
- class TUpdateByValues: public TModifySourceBase {
- public:
- TUpdateByValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values)
- : TModifySourceBase(pos, columnsHint)
- , OperationHumanName(operationHumanName)
- , Values(values)
- {}
- bool DoInit(TContext& ctx, ISource* src) override {
- if (ColumnsHint.size() != Values.size()) {
- ctx.Error(Pos) << "VALUES have " << Values.size() << " columns, " << OperationHumanName << " expects: " << ColumnsHint.size();
- return false;
- }
- for (auto& value: Values) {
- if (!value->Init(ctx, src)) {
- return false;
- }
- }
- return true;
- }
- TNodePtr Build(TContext& ctx) override {
- Y_UNUSED(ctx);
- YQL_ENSURE(Values.size() == ColumnsHint.size());
- auto structObj = Y("AsStruct");
- for (size_t i = 0; i < Values.size(); ++i) {
- TString column = ColumnsHint[i];
- TNodePtr value = Values[i];
- structObj = L(structObj, Q(Y(Q(column), value)));
- }
- auto updateRow = BuildLambda(Pos, Y("row"), structObj);
- return updateRow;
- }
- TNodePtr DoClone() const final {
- return new TUpdateByValues(Pos, OperationHumanName, ColumnsHint, CloneContainer(Values));
- }
- private:
- TString OperationHumanName;
- protected:
- TVector<TNodePtr> Values;
- };
- class TModifyByValues: public TModifySourceBase {
- public:
- TModifyByValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values)
- : TModifySourceBase(pos, columnsHint)
- , OperationHumanName(operationHumanName)
- , Values(values)
- {
- FakeSource = BuildFakeSource(pos);
- }
- bool DoInit(TContext& ctx, ISource* src) override {
- Y_UNUSED(src);
- bool hasError = false;
- for (const auto& row: Values) {
- if (ColumnsHint.empty()) {
- ctx.Error(Pos) << OperationHumanName << " ... VALUES requires specification of table columns";
- hasError = true;
- continue;
- }
- if (ColumnsHint.size() != row.size()) {
- ctx.Error(Pos) << "VALUES have " << row.size() << " columns, " << OperationHumanName << " expects: " << ColumnsHint.size();
- hasError = true;
- continue;
- }
- for (auto& value: row) {
- if (!value->Init(ctx, FakeSource.Get())) {
- hasError = true;
- continue;
- }
- }
- }
- return !hasError;
- }
- TNodePtr Build(TContext& ctx) override {
- Y_UNUSED(ctx);
- auto tuple = Y();
- for (const auto& row: Values) {
- auto rowValues = Y("AsStruct"); // ordered struct
- auto column = ColumnsHint.begin();
- for (auto value: row) {
- rowValues = L(rowValues, Q(Y(BuildQuotedAtom(Pos, *column), value)));
- ++column;
- }
- tuple = L(tuple, rowValues);
- }
- return Y("PersistableRepr", Q(tuple));
- }
- TNodePtr DoClone() const final {
- TVector<TVector<TNodePtr>> clonedValues;
- clonedValues.reserve(Values.size());
- for (auto cur: Values) {
- clonedValues.push_back(CloneContainer(cur));
- }
- return new TModifyByValues(Pos, OperationHumanName, ColumnsHint, clonedValues);
- }
- private:
- TString OperationHumanName;
- TVector<TVector<TNodePtr>> Values;
- TSourcePtr FakeSource;
- };
- class TModifyBySource: public TModifySourceBase {
- public:
- TModifyBySource(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, TSourcePtr source)
- : TModifySourceBase(pos, columnsHint)
- , OperationHumanName(operationHumanName)
- , Source(std::move(source))
- {}
- void GetInputTables(TTableList& tableList) const override {
- if (Source) {
- return Source->GetInputTables(tableList);
- }
- }
- bool DoInit(TContext& ctx, ISource* src) override {
- if (!Source->Init(ctx, src)) {
- return false;
- }
- const size_t numColumns = ColumnsHint.size();
- if (numColumns) {
- const auto sourceColumns = Source->GetColumns();
- if (!sourceColumns || sourceColumns->All || sourceColumns->QualifiedAll) {
- return true;
- }
- if (numColumns != sourceColumns->List.size()) {
- ctx.Error(Pos) << "SELECT have " << numColumns << " columns, " << OperationHumanName << " expects: " << ColumnsHint.size();
- return false;
- }
- TStringStream str;
- bool mismatchFound = false;
- for (size_t i = 0; i < numColumns; ++i) {
- bool hasName = sourceColumns->NamedColumns[i];
- if (hasName) {
- const auto& hintColumn = ColumnsHint[i];
- const auto& sourceColumn = sourceColumns->List[i];
- if (hintColumn != sourceColumn) {
- if (!mismatchFound) {
- str << "Column names in SELECT don't match column specification in parenthesis";
- mismatchFound = true;
- }
- str << ". \"" << hintColumn << "\" doesn't match \"" << sourceColumn << "\"";
- }
- }
- }
- if (mismatchFound) {
- ctx.Warning(Pos, TIssuesIds::YQL_SOURCE_SELECT_COLUMN_MISMATCH) << str.Str();
- }
- }
- return true;
- }
- TNodePtr Build(TContext& ctx) override {
- auto input = Source->Build(ctx);
- if (ColumnsHint.empty()) {
- return input;
- }
- auto columns = Y();
- for (auto column: ColumnsHint) {
- columns = L(columns, BuildQuotedAtom(Pos, column));
- }
- const auto sourceColumns = Source->GetColumns();
- if (!sourceColumns || sourceColumns->All || sourceColumns->QualifiedAll || sourceColumns->HasUnnamed) {
- // will try to resolve column mapping on type annotation stage
- return Y("OrderedSqlRename", input, Q(columns));
- }
- YQL_ENSURE(sourceColumns->List.size() == ColumnsHint.size());
- auto srcColumn = Source->GetColumns()->List.begin();
- auto structObj = Y("AsStruct"); // ordered struct
- for (auto column: ColumnsHint) {
- structObj = L(structObj, Q(Y(BuildQuotedAtom(Pos, column),
- Y("Member", "row", BuildQuotedAtom(Pos, *srcColumn))
- )));
- ++srcColumn;
- }
- return Y("AssumeColumnOrder", Y("OrderedMap", input, BuildLambda(Pos, Y("row"), structObj)), Q(columns));
- }
- TNodePtr DoClone() const final {
- return new TModifyBySource(Pos, OperationHumanName, ColumnsHint, Source->CloneSource());
- }
- EOrderKind GetOrderKind() const final {
- return Source->GetOrderKind();
- }
- private:
- TString OperationHumanName;
- TSourcePtr Source;
- };
- TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values) {
- return new TModifyByValues(pos, operationHumanName, columnsHint, values);
- }
- TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, TSourcePtr source) {
- return new TModifyBySource(pos, operationHumanName, columnsHint, std::move(source));
- }
- TSourcePtr BuildUpdateValues(TPosition pos, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values) {
- return new TUpdateByValues(pos, "UPDATE", columnsHint, values);
- }
- class TWriteColumnsNode: public TAstListNode {
- public:
- TWriteColumnsNode(TPosition pos, TScopedStatePtr scoped,
- const TTableRef& table, EWriteColumnMode mode, TSourcePtr values = nullptr, TNodePtr options = nullptr)
- : TAstListNode(pos)
- , Scoped(scoped)
- , Table(table)
- , Mode(mode)
- , Values(std::move(values))
- , Options(std::move(options))
- {
- FakeSource = BuildFakeSource(pos);
- }
- void ResetSource(TSourcePtr source) {
- TableSource = std::move(source);
- }
- void ResetUpdate(TSourcePtr update) {
- Update = std::move(update);
- }
- void ResetIsBatch(bool isBatch) {
- IsBatch = isBatch;
- }
- bool DoInit(TContext& ctx, ISource* src) override {
- TTableList tableList;
- TNodePtr values;
- auto options = Y();
- if (Options) {
- if (!Options->Init(ctx, src)) {
- return false;
- }
- options = L(Options);
- }
- ISource* underlyingSrc = src;
- if (TableSource) {
- if (!TableSource->Init(ctx, src) || !TableSource->InitFilters(ctx)) {
- return false;
- }
- options = L(options, Q(Y(Q("filter"), TableSource->BuildFilterLambda())));
- }
- bool unordered = false;
- if (Values) {
- if (!Values->Init(ctx, TableSource.Get())) {
- return false;
- }
- Values->GetInputTables(tableList);
- underlyingSrc = Values.Get();
- values = Values->Build(ctx);
- if (!values) {
- return false;
- }
- unordered = (EOrderKind::None == Values->GetOrderKind());
- }
- TNodePtr node(BuildInputTables(Pos, tableList, false, Scoped));
- if (!node->Init(ctx, underlyingSrc)) {
- return false;
- }
- if (Update) {
- if (!Update->Init(ctx, TableSource.Get()) || !Update->InitFilters(ctx)) {
- return false;
- }
- options = L(options, Q(Y(Q("update"), Update->Build(ctx))));
- }
- if (IsBatch) {
- options = L(options, Q(Y(Q("is_batch"), Q("true"))));
- }
- auto write = BuildWriteTable(Pos, "values", Table, Mode, std::move(options), Scoped);
- if (!write->Init(ctx, FakeSource.Get())) {
- return false;
- }
- if (values) {
- node = L(node, Y("let", "values", values));
- if (unordered && ctx.UseUnordered(Table)) {
- node = L(node, Y("let", "values", Y("Unordered", "values")));
- }
- } else {
- node = L(node, Y("let", "values", Y("Void")));
- }
- node = L(node, Y("let", "world", write));
- node = L(node, Y("return", "world"));
- Add("block", Q(node));
- return true;
- }
- TNodePtr DoClone() const final {
- return {};
- }
- protected:
- TScopedStatePtr Scoped;
- TTableRef Table;
- TSourcePtr TableSource;
- EWriteColumnMode Mode;
- TSourcePtr Values;
- TSourcePtr Update;
- TSourcePtr FakeSource;
- TNodePtr Options;
- bool IsBatch = false;
- };
- EWriteColumnMode ToWriteColumnsMode(ESQLWriteColumnMode sqlWriteColumnMode) {
- return sqlIntoMode2WriteColumn.at(sqlWriteColumnMode);
- }
- TNodePtr BuildWriteColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, EWriteColumnMode mode, TSourcePtr values, TNodePtr options) {
- YQL_ENSURE(values, "Invalid values node");
- return new TWriteColumnsNode(pos, scoped, table, mode, std::move(values), std::move(options));
- }
- TNodePtr BuildUpdateColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr values, TSourcePtr source, TNodePtr options) {
- YQL_ENSURE(values, "Invalid values node");
- TIntrusivePtr<TWriteColumnsNode> writeNode = new TWriteColumnsNode(pos, scoped, table, EWriteColumnMode::Update, nullptr, options);
- writeNode->ResetSource(std::move(source));
- writeNode->ResetUpdate(std::move(values));
- return writeNode;
- }
- TNodePtr BuildBatchUpdate(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr values, TSourcePtr source, TNodePtr options) {
- YQL_ENSURE(values, "Invalid values node");
- TIntrusivePtr<TWriteColumnsNode> writeNode = new TWriteColumnsNode(pos, scoped, table, EWriteColumnMode::Update, nullptr, options);
- writeNode->ResetSource(std::move(source));
- writeNode->ResetUpdate(std::move(values));
- writeNode->ResetIsBatch(true);
- return writeNode;
- }
- TNodePtr BuildDelete(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr source, TNodePtr options) {
- TIntrusivePtr<TWriteColumnsNode> writeNode = new TWriteColumnsNode(pos, scoped, table, EWriteColumnMode::Delete, nullptr, options);
- writeNode->ResetSource(std::move(source));
- return writeNode;
- }
- TNodePtr BuildBatchDelete(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr source, TNodePtr options) {
- TIntrusivePtr<TWriteColumnsNode> writeNode = new TWriteColumnsNode(pos, scoped, table, EWriteColumnMode::Delete, nullptr, options);
- writeNode->ResetSource(std::move(source));
- writeNode->ResetIsBatch(true);
- return writeNode;
- }
- class TEraseColumnsNode: public TAstListNode {
- public:
- TEraseColumnsNode(TPosition pos, const TVector<TString>& columns)
- : TAstListNode(pos)
- , Columns(columns)
- {
- }
- bool DoInit(TContext& ctx, ISource* src) override {
- Y_UNUSED(ctx);
- Y_UNUSED(src);
- TNodePtr columnList = Y();
- for (const auto& column: Columns) {
- columnList->Add(Q(column));
- }
- Add(Q(Y(Q("erase_columns"), Q(columnList))));
- return true;
- }
- TNodePtr DoClone() const final {
- return new TEraseColumnsNode(GetPos(), Columns);
- }
- private:
- TVector<TString> Columns;
- };
- TNodePtr BuildEraseColumns(TPosition pos, const TVector<TString>& columns) {
- return new TEraseColumnsNode(pos, columns);
- }
- } // namespace NSQLTranslationV1
|