insert.cpp 14 KB


  1. #include "node.h"
  2. #include "context.h"
  3. #include <yql/essentials/utils/yql_panic.h>
  4. using namespace NYql;
  5. namespace NSQLTranslationV0 {
  6. static const TMap<ESQLWriteColumnMode, EWriteColumnMode> sqlIntoMode2WriteColumn = {
  7. {ESQLWriteColumnMode::InsertInto, EWriteColumnMode::Insert},
  8. {ESQLWriteColumnMode::InsertOrAbortInto, EWriteColumnMode::InsertOrAbort},
  9. {ESQLWriteColumnMode::InsertOrIgnoreInto, EWriteColumnMode::InsertOrIgnore},
  10. {ESQLWriteColumnMode::InsertOrRevertInto, EWriteColumnMode::InsertOrRevert},
  11. {ESQLWriteColumnMode::UpsertInto, EWriteColumnMode::Upsert},
  12. {ESQLWriteColumnMode::ReplaceInto, EWriteColumnMode::Replace},
  13. {ESQLWriteColumnMode::InsertIntoWithTruncate, EWriteColumnMode::Renew},
  14. {ESQLWriteColumnMode::Update, EWriteColumnMode::Update},
  15. {ESQLWriteColumnMode::Delete, EWriteColumnMode::Delete},
  16. };
  17. class TModifySourceBase: public ISource {
  18. public:
  19. TModifySourceBase(TPosition pos, const TVector<TString>& columnsHint)
  20. : ISource(pos)
  21. , ColumnsHint(columnsHint)
  22. {
  23. }
  24. bool AddFilter(TContext& ctx, TNodePtr filter) override {
  25. Y_UNUSED(filter);
  26. ctx.Error(Pos) << "Source does not allow filtering";
  27. return false;
  28. }
  29. bool AddGroupKey(TContext& ctx, const TString& column) override {
  30. Y_UNUSED(column);
  31. ctx.Error(Pos) << "Source does not allow grouping";
  32. return false;
  33. }
  34. bool AddAggregation(TContext& ctx, TAggregationPtr aggr) override {
  35. Y_UNUSED(aggr);
  36. ctx.Error(Pos) << "Source does not allow aggregation";
  37. return false;
  38. }
  39. TNodePtr BuildFilter(TContext& ctx, const TString& label, const TNodePtr& groundNode) override {
  40. Y_UNUSED(ctx);
  41. Y_UNUSED(label);
  42. Y_UNUSED(groundNode);
  43. return nullptr;
  44. }
  45. TNodePtr BuildAggregation(const TString& label) override {
  46. Y_UNUSED(label);
  47. return nullptr;
  48. }
  49. protected:
  50. TVector<TString> ColumnsHint;
  51. TString OperationHumanName;
  52. };
  53. class TUpdateByValues: public TModifySourceBase {
  54. public:
  55. TUpdateByValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values)
  56. : TModifySourceBase(pos, columnsHint)
  57. , OperationHumanName(operationHumanName)
  58. , Values(values)
  59. {}
  60. bool DoInit(TContext& ctx, ISource* src) override {
  61. if (ColumnsHint.size() != Values.size()) {
  62. ctx.Error(Pos) << "VALUES have " << Values.size() << " columns, " << OperationHumanName << " expects: " << ColumnsHint.size();
  63. return false;
  64. }
  65. for (auto& value: Values) {
  66. if (!value->Init(ctx, src)) {
  67. return false;
  68. }
  69. }
  70. return true;
  71. }
  72. TNodePtr Build(TContext& ctx) override {
  73. Y_UNUSED(ctx);
  74. YQL_ENSURE(Values.size() == ColumnsHint.size());
  75. auto structObj = Y("AsStruct");
  76. for (size_t i = 0; i < Values.size(); ++i) {
  77. TString column = ColumnsHint[i];
  78. TNodePtr value = Values[i];
  79. structObj = L(structObj, Q(Y(Q(column), value)));
  80. }
  81. auto updateRow = BuildLambda(Pos, Y("row"), structObj);
  82. return updateRow;
  83. }
  84. TNodePtr DoClone() const final {
  85. return new TUpdateByValues(Pos, OperationHumanName, ColumnsHint, CloneContainer(Values));
  86. }
  87. private:
  88. TString OperationHumanName;
  89. protected:
  90. TVector<TNodePtr> Values;
  91. };
  92. class TModifyByValues: public TModifySourceBase {
  93. public:
  94. TModifyByValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values)
  95. : TModifySourceBase(pos, columnsHint)
  96. , OperationHumanName(operationHumanName)
  97. , Values(values)
  98. {
  99. FakeSource = BuildFakeSource(pos);
  100. }
  101. bool DoInit(TContext& ctx, ISource* src) override {
  102. Y_UNUSED(src);
  103. bool hasError = false;
  104. for (const auto& row: Values) {
  105. if (ColumnsHint.empty()) {
  106. ctx.Error(Pos) << OperationHumanName << " ... VALUES requires specification of table columns";
  107. hasError = true;
  108. continue;
  109. }
  110. if (ColumnsHint.size() != row.size()) {
  111. ctx.Error(Pos) << "VALUES have " << row.size() << " columns, " << OperationHumanName << " expects: " << ColumnsHint.size();
  112. hasError = true;
  113. continue;
  114. }
  115. for (auto& value: row) {
  116. if (!value->Init(ctx, FakeSource.Get())) {
  117. hasError = true;
  118. continue;
  119. }
  120. }
  121. }
  122. return !hasError;
  123. }
  124. TNodePtr Build(TContext& ctx) override {
  125. Y_UNUSED(ctx);
  126. auto tuple = Y();
  127. for (const auto& row: Values) {
  128. auto rowValues = Y("AsStruct");
  129. auto column = ColumnsHint.begin();
  130. for (auto value: row) {
  131. rowValues = L(rowValues, Q(Y(BuildQuotedAtom(Pos, *column), value)));
  132. ++column;
  133. }
  134. tuple = L(tuple, rowValues);
  135. }
  136. return Y("EnsurePersistable", Q(tuple));
  137. }
  138. TNodePtr DoClone() const final {
  139. TVector<TVector<TNodePtr>> clonedValues;
  140. clonedValues.reserve(Values.size());
  141. for (auto cur: Values) {
  142. clonedValues.push_back(CloneContainer(cur));
  143. }
  144. return new TModifyByValues(Pos, OperationHumanName, ColumnsHint, clonedValues);
  145. }
  146. private:
  147. TString OperationHumanName;
  148. TVector<TVector<TNodePtr>> Values;
  149. TSourcePtr FakeSource;
  150. };
  151. class TModifyBySource: public TModifySourceBase {
  152. public:
  153. TModifyBySource(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, TSourcePtr source)
  154. : TModifySourceBase(pos, columnsHint)
  155. , OperationHumanName(operationHumanName)
  156. , Source(std::move(source))
  157. {}
  158. void GetInputTables(TTableList& tableList) const override {
  159. if (Source) {
  160. return Source->GetInputTables(tableList);
  161. }
  162. }
  163. bool DoInit(TContext& ctx, ISource* src) override {
  164. if (!Source->Init(ctx, src)) {
  165. return false;
  166. }
  167. const auto& sourceColumns = Source->GetColumns();
  168. const auto numColumns = !ColumnsHint.empty() && sourceColumns ? sourceColumns->List.size() : 0;
  169. if (ColumnsHint.size() != numColumns) {
  170. ctx.Error(Pos) << "SELECT has " << numColumns << " columns, " << OperationHumanName << " expects: " << ColumnsHint.size();
  171. return false;
  172. }
  173. if (numColumns) {
  174. TStringStream str;
  175. bool mismatchFound = false;
  176. for (size_t i = 0; i < numColumns; ++i) {
  177. bool hasName = sourceColumns->NamedColumns[i];
  178. if (hasName) {
  179. const auto& hintColumn = ColumnsHint[i];
  180. const auto& sourceColumn = sourceColumns->List[i];
  181. if (hintColumn != sourceColumn) {
  182. if (!mismatchFound) {
  183. str << "Column names in SELECT don't match column specification in parenthesis";
  184. mismatchFound = true;
  185. }
  186. str << ". \"" << hintColumn << "\" doesn't match \"" << sourceColumn << "\"";
  187. }
  188. }
  189. }
  190. if (mismatchFound) {
  191. ctx.Warning(Pos, TIssuesIds::YQL_SOURCE_SELECT_COLUMN_MISMATCH) << str.Str();
  192. }
  193. }
  194. return true;
  195. }
  196. TNodePtr Build(TContext& ctx) override {
  197. auto input = Source->Build(ctx);
  198. if (ColumnsHint.empty() || !Source->GetColumns()) {
  199. return input;
  200. }
  201. auto srcColumn = Source->GetColumns()->List.begin();
  202. auto structObj = Y("AsStruct");
  203. for (auto column: ColumnsHint) {
  204. structObj = L(structObj, Q(Y(BuildQuotedAtom(Pos, column),
  205. Y("Member", "row", BuildQuotedAtom(Pos, *srcColumn))
  206. )));
  207. ++srcColumn;
  208. }
  209. return Y("OrderedMap", input, BuildLambda(Pos, Y("row"), structObj));
  210. }
  211. TNodePtr DoClone() const final {
  212. return new TModifyBySource(Pos, OperationHumanName, ColumnsHint, Source->CloneSource());
  213. }
  214. bool IsOrdered() const final {
  215. return Source->IsOrdered();
  216. }
  217. private:
  218. TString OperationHumanName;
  219. TSourcePtr Source;
  220. };
  221. TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values) {
  222. return new TModifyByValues(pos, operationHumanName, columnsHint, values);
  223. }
  224. TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values) {
  225. return new TModifyByValues(pos, operationHumanName, columnsHint, {values});
  226. }
  227. TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, TSourcePtr source) {
  228. return new TModifyBySource(pos, operationHumanName, columnsHint, std::move(source));
  229. }
  230. TSourcePtr BuildUpdateValues(TPosition pos, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values) {
  231. return new TUpdateByValues(pos, "UPDATE", columnsHint, values);
  232. }
  233. class TWriteColumnsNode: public TAstListNode {
  234. public:
  235. TWriteColumnsNode(TPosition pos, const TTableRef& table, EWriteColumnMode mode, TSourcePtr values = nullptr, TNodePtr options = nullptr)
  236. : TAstListNode(pos)
  237. , Table(table)
  238. , Mode(mode)
  239. , Values(std::move(values))
  240. , Options(std::move(options))
  241. {
  242. FakeSource = BuildFakeSource(pos);
  243. }
  244. void ResetSource(TSourcePtr source) {
  245. TableSource = std::move(source);
  246. }
  247. void ResetUpdate(TSourcePtr update) {
  248. Update = std::move(update);
  249. }
  250. bool DoInit(TContext& ctx, ISource* src) override {
  251. if (!Table.Check(ctx)) {
  252. return false;
  253. }
  254. TTableList tableList;
  255. TNodePtr values;
  256. auto options = Y();
  257. if (Options) {
  258. if (!Options->Init(ctx, src)) {
  259. return false;
  260. }
  261. options = L(Options);
  262. }
  263. ISource* underlyingSrc = src;
  264. if (TableSource) {
  265. ctx.PushBlockShortcuts();
  266. if (!TableSource->Init(ctx, src) || !TableSource->InitFilters(ctx)) {
  267. return false;
  268. }
  269. options = L(options, Q(Y(Q("filter"), TableSource->BuildFilterLambda(ctx.GroundBlockShortcuts(Pos)))));
  270. }
  271. bool unordered = false;
  272. ctx.PushBlockShortcuts();
  273. if (Values) {
  274. if (!Values->Init(ctx, TableSource.Get())) {
  275. return false;
  276. }
  277. Values->GetInputTables(tableList);
  278. underlyingSrc = Values.Get();
  279. values = Values->Build(ctx);
  280. if (!values) {
  281. return false;
  282. }
  283. unordered = !Values->IsOrdered();
  284. }
  285. TNodePtr node(BuildInputTables(Pos, tableList, false));
  286. if (!node->Init(ctx, underlyingSrc)) {
  287. return false;
  288. }
  289. if (Update) {
  290. if (!Update->Init(ctx, TableSource.Get()) || !Update->InitFilters(ctx)) {
  291. return false;
  292. }
  293. options = L(options, Q(Y(Q("update"), Update->Build(ctx))));
  294. }
  295. auto write = BuildWriteTable(Pos, "values", Table, Mode, std::move(options));
  296. if (!write->Init(ctx, FakeSource.Get())) {
  297. return false;
  298. }
  299. node = ctx.GroundBlockShortcuts(Pos, node);
  300. if (values) {
  301. node = L(node, Y("let", "values", values));
  302. if (unordered && ctx.UseUnordered(Table)) {
  303. node = L(node, Y("let", "values", Y("Unordered", "values")));
  304. }
  305. } else {
  306. node = L(node, Y("let", "values", Y("Void")));
  307. }
  308. node = L(node, Y("let", "world", write));
  309. node = L(node, Y("return", "world"));
  310. Add("block", Q(node));
  311. return true;
  312. }
  313. TNodePtr DoClone() const final {
  314. return {};
  315. }
  316. protected:
  317. TTableRef Table;
  318. TSourcePtr TableSource;
  319. EWriteColumnMode Mode;
  320. TSourcePtr Values;
  321. TSourcePtr Update;
  322. TSourcePtr FakeSource;
  323. TNodePtr Options;
  324. };
  325. EWriteColumnMode ToWriteColumnsMode(ESQLWriteColumnMode sqlWriteColumnMode) {
  326. return sqlIntoMode2WriteColumn.at(sqlWriteColumnMode);
  327. }
  328. TNodePtr BuildWriteColumns(TPosition pos, const TTableRef& table, EWriteColumnMode mode, TSourcePtr values, TNodePtr options) {
  329. YQL_ENSURE(values, "Invalid values node");
  330. return new TWriteColumnsNode(pos, table, mode, std::move(values), std::move(options));
  331. }
  332. TNodePtr BuildUpdateColumns(TPosition pos, const TTableRef& table, TSourcePtr values, TSourcePtr source) {
  333. YQL_ENSURE(values, "Invalid values node");
  334. TIntrusivePtr<TWriteColumnsNode> writeNode = new TWriteColumnsNode(pos, table, EWriteColumnMode::Update);
  335. writeNode->ResetSource(std::move(source));
  336. writeNode->ResetUpdate(std::move(values));
  337. return writeNode;
  338. }
  339. TNodePtr BuildDelete(TPosition pos, const TTableRef& table, TSourcePtr source) {
  340. TIntrusivePtr<TWriteColumnsNode> writeNode = new TWriteColumnsNode(pos, table, EWriteColumnMode::Delete);
  341. writeNode->ResetSource(std::move(source));
  342. return writeNode;
  343. }
  344. class TEraseColumnsNode: public TAstListNode {
  345. public:
  346. TEraseColumnsNode(TPosition pos, const TVector<TString>& columns)
  347. : TAstListNode(pos)
  348. , Columns(columns)
  349. {
  350. }
  351. bool DoInit(TContext& ctx, ISource* src) override {
  352. Y_UNUSED(ctx);
  353. Y_UNUSED(src);
  354. TNodePtr columnList = Y();
  355. for (const auto& column: Columns) {
  356. columnList->Add(Q(column));
  357. }
  358. Add(Q(Y(Q("erase_columns"), Q(columnList))));
  359. return true;
  360. }
  361. TNodePtr DoClone() const final {
  362. return {};
  363. }
  364. private:
  365. TVector<TString> Columns;
  366. };
  367. TNodePtr BuildEraseColumns(TPosition pos, const TVector<TString>& columns) {
  368. return new TEraseColumnsNode(pos, columns);
  369. }
  370. } // namespace NSQLTranslationV0