insert.cpp 16 KB


  1. #include "source.h"
  2. #include "context.h"
  3. #include <yql/essentials/utils/yql_panic.h>
  4. using namespace NYql;
  5. namespace NSQLTranslationV1 {
  6. static const TMap<ESQLWriteColumnMode, EWriteColumnMode> sqlIntoMode2WriteColumn = {
  7. {ESQLWriteColumnMode::InsertInto, EWriteColumnMode::Insert},
  8. {ESQLWriteColumnMode::InsertOrAbortInto, EWriteColumnMode::InsertOrAbort},
  9. {ESQLWriteColumnMode::InsertOrIgnoreInto, EWriteColumnMode::InsertOrIgnore},
  10. {ESQLWriteColumnMode::InsertOrRevertInto, EWriteColumnMode::InsertOrRevert},
  11. {ESQLWriteColumnMode::UpsertInto, EWriteColumnMode::Upsert},
  12. {ESQLWriteColumnMode::ReplaceInto, EWriteColumnMode::Replace},
  13. {ESQLWriteColumnMode::InsertIntoWithTruncate, EWriteColumnMode::Renew},
  14. {ESQLWriteColumnMode::Update, EWriteColumnMode::Update},
  15. {ESQLWriteColumnMode::Delete, EWriteColumnMode::Delete},
  16. };
  17. class TModifySourceBase: public ISource {
  18. public:
  19. TModifySourceBase(TPosition pos, const TVector<TString>& columnsHint)
  20. : ISource(pos)
  21. , ColumnsHint(columnsHint)
  22. {
  23. }
  24. bool AddFilter(TContext& ctx, TNodePtr filter) override {
  25. Y_UNUSED(filter);
  26. ctx.Error(Pos) << "Source does not allow filtering";
  27. return false;
  28. }
  29. bool AddGroupKey(TContext& ctx, const TString& column) override {
  30. Y_UNUSED(column);
  31. ctx.Error(Pos) << "Source does not allow grouping";
  32. return false;
  33. }
  34. bool AddAggregation(TContext& ctx, TAggregationPtr aggr) override {
  35. YQL_ENSURE(aggr);
  36. ctx.Error(aggr->GetPos()) << "Source does not allow aggregation";
  37. return false;
  38. }
  39. TNodePtr BuildFilter(TContext& ctx, const TString& label) override {
  40. Y_UNUSED(ctx);
  41. Y_UNUSED(label);
  42. return nullptr;
  43. }
  44. std::pair<TNodePtr, bool> BuildAggregation(const TString& label, TContext& ctx) override {
  45. Y_UNUSED(label);
  46. Y_UNUSED(ctx);
  47. return { nullptr, true };
  48. }
  49. protected:
  50. TVector<TString> ColumnsHint;
  51. TString OperationHumanName;
  52. };
  53. class TUpdateByValues: public TModifySourceBase {
  54. public:
  55. TUpdateByValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values)
  56. : TModifySourceBase(pos, columnsHint)
  57. , OperationHumanName(operationHumanName)
  58. , Values(values)
  59. {}
  60. bool DoInit(TContext& ctx, ISource* src) override {
  61. if (ColumnsHint.size() != Values.size()) {
  62. ctx.Error(Pos) << "VALUES have " << Values.size() << " columns, " << OperationHumanName << " expects: " << ColumnsHint.size();
  63. return false;
  64. }
  65. for (auto& value: Values) {
  66. if (!value->Init(ctx, src)) {
  67. return false;
  68. }
  69. }
  70. return true;
  71. }
  72. TNodePtr Build(TContext& ctx) override {
  73. Y_UNUSED(ctx);
  74. YQL_ENSURE(Values.size() == ColumnsHint.size());
  75. auto structObj = Y("AsStruct");
  76. for (size_t i = 0; i < Values.size(); ++i) {
  77. TString column = ColumnsHint[i];
  78. TNodePtr value = Values[i];
  79. structObj = L(structObj, Q(Y(Q(column), value)));
  80. }
  81. auto updateRow = BuildLambda(Pos, Y("row"), structObj);
  82. return updateRow;
  83. }
  84. TNodePtr DoClone() const final {
  85. return new TUpdateByValues(Pos, OperationHumanName, ColumnsHint, CloneContainer(Values));
  86. }
  87. private:
  88. TString OperationHumanName;
  89. protected:
  90. TVector<TNodePtr> Values;
  91. };
  92. class TModifyByValues: public TModifySourceBase {
  93. public:
  94. TModifyByValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values)
  95. : TModifySourceBase(pos, columnsHint)
  96. , OperationHumanName(operationHumanName)
  97. , Values(values)
  98. {
  99. FakeSource = BuildFakeSource(pos);
  100. }
  101. bool DoInit(TContext& ctx, ISource* src) override {
  102. Y_UNUSED(src);
  103. bool hasError = false;
  104. for (const auto& row: Values) {
  105. if (ColumnsHint.empty()) {
  106. ctx.Error(Pos) << OperationHumanName << " ... VALUES requires specification of table columns";
  107. hasError = true;
  108. continue;
  109. }
  110. if (ColumnsHint.size() != row.size()) {
  111. ctx.Error(Pos) << "VALUES have " << row.size() << " columns, " << OperationHumanName << " expects: " << ColumnsHint.size();
  112. hasError = true;
  113. continue;
  114. }
  115. for (auto& value: row) {
  116. if (!value->Init(ctx, FakeSource.Get())) {
  117. hasError = true;
  118. continue;
  119. }
  120. }
  121. }
  122. return !hasError;
  123. }
  124. TNodePtr Build(TContext& ctx) override {
  125. Y_UNUSED(ctx);
  126. auto tuple = Y();
  127. for (const auto& row: Values) {
  128. auto rowValues = Y("AsStruct"); // ordered struct
  129. auto column = ColumnsHint.begin();
  130. for (auto value: row) {
  131. rowValues = L(rowValues, Q(Y(BuildQuotedAtom(Pos, *column), value)));
  132. ++column;
  133. }
  134. tuple = L(tuple, rowValues);
  135. }
  136. return Y("PersistableRepr", Q(tuple));
  137. }
  138. TNodePtr DoClone() const final {
  139. TVector<TVector<TNodePtr>> clonedValues;
  140. clonedValues.reserve(Values.size());
  141. for (auto cur: Values) {
  142. clonedValues.push_back(CloneContainer(cur));
  143. }
  144. return new TModifyByValues(Pos, OperationHumanName, ColumnsHint, clonedValues);
  145. }
  146. private:
  147. TString OperationHumanName;
  148. TVector<TVector<TNodePtr>> Values;
  149. TSourcePtr FakeSource;
  150. };
  151. class TModifyBySource: public TModifySourceBase {
  152. public:
  153. TModifyBySource(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, TSourcePtr source)
  154. : TModifySourceBase(pos, columnsHint)
  155. , OperationHumanName(operationHumanName)
  156. , Source(std::move(source))
  157. {}
  158. void GetInputTables(TTableList& tableList) const override {
  159. if (Source) {
  160. return Source->GetInputTables(tableList);
  161. }
  162. }
  163. bool DoInit(TContext& ctx, ISource* src) override {
  164. if (!Source->Init(ctx, src)) {
  165. return false;
  166. }
  167. const size_t numColumns = ColumnsHint.size();
  168. if (numColumns) {
  169. const auto sourceColumns = Source->GetColumns();
  170. if (!sourceColumns || sourceColumns->All || sourceColumns->QualifiedAll) {
  171. return true;
  172. }
  173. if (numColumns != sourceColumns->List.size()) {
  174. ctx.Error(Pos) << "SELECT have " << numColumns << " columns, " << OperationHumanName << " expects: " << ColumnsHint.size();
  175. return false;
  176. }
  177. TStringStream str;
  178. bool mismatchFound = false;
  179. for (size_t i = 0; i < numColumns; ++i) {
  180. bool hasName = sourceColumns->NamedColumns[i];
  181. if (hasName) {
  182. const auto& hintColumn = ColumnsHint[i];
  183. const auto& sourceColumn = sourceColumns->List[i];
  184. if (hintColumn != sourceColumn) {
  185. if (!mismatchFound) {
  186. str << "Column names in SELECT don't match column specification in parenthesis";
  187. mismatchFound = true;
  188. }
  189. str << ". \"" << hintColumn << "\" doesn't match \"" << sourceColumn << "\"";
  190. }
  191. }
  192. }
  193. if (mismatchFound) {
  194. ctx.Warning(Pos, TIssuesIds::YQL_SOURCE_SELECT_COLUMN_MISMATCH) << str.Str();
  195. }
  196. }
  197. return true;
  198. }
  199. TNodePtr Build(TContext& ctx) override {
  200. auto input = Source->Build(ctx);
  201. if (ColumnsHint.empty()) {
  202. return input;
  203. }
  204. auto columns = Y();
  205. for (auto column: ColumnsHint) {
  206. columns = L(columns, BuildQuotedAtom(Pos, column));
  207. }
  208. const auto sourceColumns = Source->GetColumns();
  209. if (!sourceColumns || sourceColumns->All || sourceColumns->QualifiedAll || sourceColumns->HasUnnamed) {
  210. // will try to resolve column mapping on type annotation stage
  211. return Y("OrderedSqlRename", input, Q(columns));
  212. }
  213. YQL_ENSURE(sourceColumns->List.size() == ColumnsHint.size());
  214. auto srcColumn = Source->GetColumns()->List.begin();
  215. auto structObj = Y("AsStruct"); // ordered struct
  216. for (auto column: ColumnsHint) {
  217. structObj = L(structObj, Q(Y(BuildQuotedAtom(Pos, column),
  218. Y("Member", "row", BuildQuotedAtom(Pos, *srcColumn))
  219. )));
  220. ++srcColumn;
  221. }
  222. return Y("AssumeColumnOrder", Y("OrderedMap", input, BuildLambda(Pos, Y("row"), structObj)), Q(columns));
  223. }
  224. TNodePtr DoClone() const final {
  225. return new TModifyBySource(Pos, OperationHumanName, ColumnsHint, Source->CloneSource());
  226. }
  227. EOrderKind GetOrderKind() const final {
  228. return Source->GetOrderKind();
  229. }
  230. private:
  231. TString OperationHumanName;
  232. TSourcePtr Source;
  233. };
  234. TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values) {
  235. return new TModifyByValues(pos, operationHumanName, columnsHint, values);
  236. }
  237. TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, TSourcePtr source) {
  238. return new TModifyBySource(pos, operationHumanName, columnsHint, std::move(source));
  239. }
  240. TSourcePtr BuildUpdateValues(TPosition pos, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values) {
  241. return new TUpdateByValues(pos, "UPDATE", columnsHint, values);
  242. }
  243. class TWriteColumnsNode: public TAstListNode {
  244. public:
  245. TWriteColumnsNode(TPosition pos, TScopedStatePtr scoped,
  246. const TTableRef& table, EWriteColumnMode mode, TSourcePtr values = nullptr, TNodePtr options = nullptr)
  247. : TAstListNode(pos)
  248. , Scoped(scoped)
  249. , Table(table)
  250. , Mode(mode)
  251. , Values(std::move(values))
  252. , Options(std::move(options))
  253. {
  254. FakeSource = BuildFakeSource(pos);
  255. }
  256. void ResetSource(TSourcePtr source) {
  257. TableSource = std::move(source);
  258. }
  259. void ResetUpdate(TSourcePtr update) {
  260. Update = std::move(update);
  261. }
  262. void ResetIsBatch(bool isBatch) {
  263. IsBatch = isBatch;
  264. }
  265. bool DoInit(TContext& ctx, ISource* src) override {
  266. TTableList tableList;
  267. TNodePtr values;
  268. auto options = Y();
  269. if (Options) {
  270. if (!Options->Init(ctx, src)) {
  271. return false;
  272. }
  273. options = L(Options);
  274. }
  275. ISource* underlyingSrc = src;
  276. if (TableSource) {
  277. if (!TableSource->Init(ctx, src) || !TableSource->InitFilters(ctx)) {
  278. return false;
  279. }
  280. options = L(options, Q(Y(Q("filter"), TableSource->BuildFilterLambda())));
  281. }
  282. bool unordered = false;
  283. if (Values) {
  284. if (!Values->Init(ctx, TableSource.Get())) {
  285. return false;
  286. }
  287. Values->GetInputTables(tableList);
  288. underlyingSrc = Values.Get();
  289. values = Values->Build(ctx);
  290. if (!values) {
  291. return false;
  292. }
  293. unordered = (EOrderKind::None == Values->GetOrderKind());
  294. }
  295. TNodePtr node(BuildInputTables(Pos, tableList, false, Scoped));
  296. if (!node->Init(ctx, underlyingSrc)) {
  297. return false;
  298. }
  299. if (Update) {
  300. if (!Update->Init(ctx, TableSource.Get()) || !Update->InitFilters(ctx)) {
  301. return false;
  302. }
  303. options = L(options, Q(Y(Q("update"), Update->Build(ctx))));
  304. }
  305. if (IsBatch) {
  306. options = L(options, Q(Y(Q("is_batch"), Q("true"))));
  307. }
  308. auto write = BuildWriteTable(Pos, "values", Table, Mode, std::move(options), Scoped);
  309. if (!write->Init(ctx, FakeSource.Get())) {
  310. return false;
  311. }
  312. if (values) {
  313. node = L(node, Y("let", "values", values));
  314. if (unordered && ctx.UseUnordered(Table)) {
  315. node = L(node, Y("let", "values", Y("Unordered", "values")));
  316. }
  317. } else {
  318. node = L(node, Y("let", "values", Y("Void")));
  319. }
  320. node = L(node, Y("let", "world", write));
  321. node = L(node, Y("return", "world"));
  322. Add("block", Q(node));
  323. return true;
  324. }
  325. TNodePtr DoClone() const final {
  326. return {};
  327. }
  328. protected:
  329. TScopedStatePtr Scoped;
  330. TTableRef Table;
  331. TSourcePtr TableSource;
  332. EWriteColumnMode Mode;
  333. TSourcePtr Values;
  334. TSourcePtr Update;
  335. TSourcePtr FakeSource;
  336. TNodePtr Options;
  337. bool IsBatch = false;
  338. };
  339. EWriteColumnMode ToWriteColumnsMode(ESQLWriteColumnMode sqlWriteColumnMode) {
  340. return sqlIntoMode2WriteColumn.at(sqlWriteColumnMode);
  341. }
  342. TNodePtr BuildWriteColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, EWriteColumnMode mode, TSourcePtr values, TNodePtr options) {
  343. YQL_ENSURE(values, "Invalid values node");
  344. return new TWriteColumnsNode(pos, scoped, table, mode, std::move(values), std::move(options));
  345. }
  346. TNodePtr BuildUpdateColumns(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr values, TSourcePtr source, TNodePtr options) {
  347. YQL_ENSURE(values, "Invalid values node");
  348. TIntrusivePtr<TWriteColumnsNode> writeNode = new TWriteColumnsNode(pos, scoped, table, EWriteColumnMode::Update, nullptr, options);
  349. writeNode->ResetSource(std::move(source));
  350. writeNode->ResetUpdate(std::move(values));
  351. return writeNode;
  352. }
  353. TNodePtr BuildBatchUpdate(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr values, TSourcePtr source, TNodePtr options) {
  354. YQL_ENSURE(values, "Invalid values node");
  355. TIntrusivePtr<TWriteColumnsNode> writeNode = new TWriteColumnsNode(pos, scoped, table, EWriteColumnMode::Update, nullptr, options);
  356. writeNode->ResetSource(std::move(source));
  357. writeNode->ResetUpdate(std::move(values));
  358. writeNode->ResetIsBatch(true);
  359. return writeNode;
  360. }
  361. TNodePtr BuildDelete(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr source, TNodePtr options) {
  362. TIntrusivePtr<TWriteColumnsNode> writeNode = new TWriteColumnsNode(pos, scoped, table, EWriteColumnMode::Delete, nullptr, options);
  363. writeNode->ResetSource(std::move(source));
  364. return writeNode;
  365. }
  366. TNodePtr BuildBatchDelete(TPosition pos, TScopedStatePtr scoped, const TTableRef& table, TSourcePtr source, TNodePtr options) {
  367. TIntrusivePtr<TWriteColumnsNode> writeNode = new TWriteColumnsNode(pos, scoped, table, EWriteColumnMode::Delete, nullptr, options);
  368. writeNode->ResetSource(std::move(source));
  369. writeNode->ResetIsBatch(true);
  370. return writeNode;
  371. }
  372. class TEraseColumnsNode: public TAstListNode {
  373. public:
  374. TEraseColumnsNode(TPosition pos, const TVector<TString>& columns)
  375. : TAstListNode(pos)
  376. , Columns(columns)
  377. {
  378. }
  379. bool DoInit(TContext& ctx, ISource* src) override {
  380. Y_UNUSED(ctx);
  381. Y_UNUSED(src);
  382. TNodePtr columnList = Y();
  383. for (const auto& column: Columns) {
  384. columnList->Add(Q(column));
  385. }
  386. Add(Q(Y(Q("erase_columns"), Q(columnList))));
  387. return true;
  388. }
  389. TNodePtr DoClone() const final {
  390. return new TEraseColumnsNode(GetPos(), Columns);
  391. }
  392. private:
  393. TVector<TString> Columns;
  394. };
  395. TNodePtr BuildEraseColumns(TPosition pos, const TVector<TString>& columns) {
  396. return new TEraseColumnsNode(pos, columns);
  397. }
  398. } // namespace NSQLTranslationV1