sql_into_tables.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. #include "sql_into_tables.h"
  2. #include "sql_values.h"
  3. #include <util/string/join.h>
  4. using namespace NYql;
  5. namespace NSQLTranslationV1 {
  6. using NALPDefault::SQLv1LexerTokens;
  7. using namespace NSQLv1Generated;
  8. TNodePtr TSqlIntoTable::Build(const TRule_into_table_stmt& node) {
  9. static const TMap<TString, ESQLWriteColumnMode> str2Mode = {
  10. {"InsertInto", ESQLWriteColumnMode::InsertInto},
  11. {"InsertOrAbortInto", ESQLWriteColumnMode::InsertOrAbortInto},
  12. {"InsertOrIgnoreInto", ESQLWriteColumnMode::InsertOrIgnoreInto},
  13. {"InsertOrRevertInto", ESQLWriteColumnMode::InsertOrRevertInto},
  14. {"UpsertInto", ESQLWriteColumnMode::UpsertInto},
  15. {"ReplaceInto", ESQLWriteColumnMode::ReplaceInto},
  16. {"InsertIntoWithTruncate", ESQLWriteColumnMode::InsertIntoWithTruncate}
  17. };
  18. auto& modeBlock = node.GetBlock1();
  19. TVector<TToken> modeTokens;
  20. switch (modeBlock.Alt_case()) {
  21. case TRule_into_table_stmt_TBlock1::AltCase::kAlt1:
  22. modeTokens = {modeBlock.GetAlt1().GetToken1()};
  23. break;
  24. case TRule_into_table_stmt_TBlock1::AltCase::kAlt2:
  25. modeTokens = {
  26. modeBlock.GetAlt2().GetToken1(),
  27. modeBlock.GetAlt2().GetToken2(),
  28. modeBlock.GetAlt2().GetToken3()
  29. };
  30. break;
  31. case TRule_into_table_stmt_TBlock1::AltCase::kAlt3:
  32. modeTokens = {
  33. modeBlock.GetAlt3().GetToken1(),
  34. modeBlock.GetAlt3().GetToken2(),
  35. modeBlock.GetAlt3().GetToken3()
  36. };
  37. break;
  38. case TRule_into_table_stmt_TBlock1::AltCase::kAlt4:
  39. modeTokens = {
  40. modeBlock.GetAlt4().GetToken1(),
  41. modeBlock.GetAlt4().GetToken2(),
  42. modeBlock.GetAlt4().GetToken3()
  43. };
  44. break;
  45. case TRule_into_table_stmt_TBlock1::AltCase::kAlt5:
  46. modeTokens = {modeBlock.GetAlt5().GetToken1()};
  47. break;
  48. case TRule_into_table_stmt_TBlock1::AltCase::kAlt6:
  49. modeTokens = {modeBlock.GetAlt6().GetToken1()};
  50. break;
  51. case TRule_into_table_stmt_TBlock1::AltCase::ALT_NOT_SET:
  52. Y_ABORT("You should change implementation according to grammar changes");
  53. }
  54. TVector<TString> modeStrings;
  55. modeStrings.reserve(modeTokens.size());
  56. TVector<TString> userModeStrings;
  57. userModeStrings.reserve(modeTokens.size());
  58. for (auto& token : modeTokens) {
  59. auto tokenStr = Token(token);
  60. auto modeStr = tokenStr;
  61. modeStr.to_lower();
  62. modeStr.to_upper(0, 1);
  63. modeStrings.push_back(modeStr);
  64. auto userModeStr = tokenStr;
  65. userModeStr.to_upper();
  66. userModeStrings.push_back(userModeStr);
  67. }
  68. modeStrings.push_back("Into");
  69. userModeStrings.push_back("INTO");
  70. SqlIntoModeStr = JoinRange("", modeStrings.begin(), modeStrings.end());
  71. SqlIntoUserModeStr = JoinRange(" ", userModeStrings.begin(), userModeStrings.end());
  72. const auto& intoTableRef = node.GetRule_into_simple_table_ref3();
  73. const auto& tableRef = intoTableRef.GetRule_simple_table_ref1();
  74. const auto& tableRefCore = tableRef.GetRule_simple_table_ref_core1();
  75. auto service = Ctx.Scoped->CurrService;
  76. auto cluster = Ctx.Scoped->CurrCluster;
  77. std::pair<bool, TDeferredAtom> nameOrAt;
  78. bool isBinding = false;
  79. switch (tableRefCore.Alt_case()) {
  80. case TRule_simple_table_ref_core::AltCase::kAltSimpleTableRefCore1: {
  81. if (tableRefCore.GetAlt_simple_table_ref_core1().GetRule_object_ref1().HasBlock1()) {
  82. const auto& clusterExpr = tableRefCore.GetAlt_simple_table_ref_core1().GetRule_object_ref1().GetBlock1().GetRule_cluster_expr1();
  83. bool hasAt = tableRefCore.GetAlt_simple_table_ref_core1().GetRule_object_ref1().GetRule_id_or_at2().HasBlock1();
  84. bool result = !hasAt ?
  85. ClusterExprOrBinding(clusterExpr, service, cluster, isBinding) : ClusterExpr(clusterExpr, false, service, cluster);
  86. if (!result) {
  87. return nullptr;
  88. }
  89. }
  90. if (!isBinding && cluster.Empty()) {
  91. Ctx.Error() << "No cluster name given and no default cluster is selected";
  92. return nullptr;
  93. }
  94. auto id = Id(tableRefCore.GetAlt_simple_table_ref_core1().GetRule_object_ref1().GetRule_id_or_at2(), *this);
  95. nameOrAt = std::make_pair(id.first, TDeferredAtom(Ctx.Pos(), id.second));
  96. break;
  97. }
  98. case TRule_simple_table_ref_core::AltCase::kAltSimpleTableRefCore2: {
  99. auto at = tableRefCore.GetAlt_simple_table_ref_core2().HasBlock1();
  100. TString name;
  101. if (!NamedNodeImpl(tableRefCore.GetAlt_simple_table_ref_core2().GetRule_bind_parameter2(), name, *this)) {
  102. return nullptr;
  103. }
  104. auto named = GetNamedNode(name);
  105. if (!named) {
  106. return nullptr;
  107. }
  108. named->SetRefPos(Ctx.Pos());
  109. if (cluster.Empty()) {
  110. Ctx.Error() << "No cluster name given and no default cluster is selected";
  111. return nullptr;
  112. }
  113. TDeferredAtom table;
  114. MakeTableFromExpression(Ctx.Pos(), Ctx, named, table);
  115. nameOrAt = std::make_pair(at, table);
  116. break;
  117. }
  118. case TRule_simple_table_ref_core::AltCase::ALT_NOT_SET:
  119. Y_ABORT("You should change implementation according to grammar changes");
  120. }
  121. bool withTruncate = false;
  122. TTableHints tableHints;
  123. if (tableRef.HasBlock2()) {
  124. auto hints = TableHintsImpl(tableRef.GetBlock2().GetRule_table_hints1(), service);
  125. if (!hints) {
  126. Ctx.Error() << "Failed to parse table hints";
  127. return nullptr;
  128. }
  129. for (const auto& hint : *hints) {
  130. if (to_upper(hint.first) == "TRUNCATE") {
  131. withTruncate = true;
  132. }
  133. }
  134. std::erase_if(*hints, [](const auto &hint) { return to_upper(hint.first) == "TRUNCATE"; });
  135. tableHints = std::move(*hints);
  136. }
  137. TVector<TString> eraseColumns;
  138. if (intoTableRef.HasBlock2()) {
  139. if (service != StatProviderName) {
  140. Ctx.Error() << "ERASE BY is unsupported for " << service;
  141. return nullptr;
  142. }
  143. PureColumnListStr(
  144. intoTableRef.GetBlock2().GetRule_pure_column_list3(), *this, eraseColumns
  145. );
  146. }
  147. if (withTruncate) {
  148. if (SqlIntoModeStr != "InsertInto") {
  149. Error() << "Unable " << SqlIntoUserModeStr << " with truncate mode";
  150. return nullptr;
  151. }
  152. SqlIntoModeStr += "WithTruncate";
  153. SqlIntoUserModeStr += " ... WITH TRUNCATE";
  154. }
  155. const auto iterMode = str2Mode.find(SqlIntoModeStr);
  156. YQL_ENSURE(iterMode != str2Mode.end(), "Invalid sql write mode string: " << SqlIntoModeStr);
  157. const auto SqlIntoMode = iterMode->second;
  158. TPosition pos(Ctx.Pos());
  159. TTableRef table(Ctx.MakeName("table"), service, cluster, nullptr);
  160. if (isBinding) {
  161. const TString* binding = nameOrAt.second.GetLiteral();
  162. YQL_ENSURE(binding);
  163. YQL_ENSURE(!nameOrAt.first);
  164. if (!ApplyTableBinding(*binding, table, tableHints)) {
  165. return nullptr;
  166. }
  167. } else {
  168. table.Keys = BuildTableKey(pos, service, cluster, nameOrAt.second, {nameOrAt.first ? "@" : ""});
  169. }
  170. Ctx.IncrementMonCounter("sql_insert_clusters", table.Cluster.GetLiteral() ? *table.Cluster.GetLiteral() : "unknown");
  171. auto values = TSqlIntoValues(Ctx, Mode).Build(node.GetRule_into_values_source4(), SqlIntoUserModeStr);
  172. if (!values) {
  173. return nullptr;
  174. }
  175. if (!ValidateServiceName(node, table, SqlIntoMode, GetPos(modeTokens[0]))) {
  176. return nullptr;
  177. }
  178. Ctx.IncrementMonCounter("sql_features", SqlIntoModeStr);
  179. auto options = BuildIntoTableOptions(pos, eraseColumns, tableHints);
  180. if (node.HasBlock5()) {
  181. options = options->L(options, ReturningList(node.GetBlock5().GetRule_returning_columns_list1()));
  182. }
  183. return BuildWriteColumns(pos, Ctx.Scoped, table,
  184. ToWriteColumnsMode(SqlIntoMode), std::move(values),
  185. options);
  186. }
  187. bool TSqlIntoTable::ValidateServiceName(const TRule_into_table_stmt& node, const TTableRef& table,
  188. ESQLWriteColumnMode mode, const TPosition& pos) {
  189. Y_UNUSED(node);
  190. auto serviceName = table.Service;
  191. const bool isMapReduce = serviceName == YtProviderName;
  192. const bool isKikimr = serviceName == KikimrProviderName || serviceName == YdbProviderName;
  193. const bool isRtmr = serviceName == RtmrProviderName;
  194. const bool isStat = serviceName == StatProviderName;
  195. if (!isKikimr) {
  196. if (mode == ESQLWriteColumnMode::InsertOrAbortInto ||
  197. mode == ESQLWriteColumnMode::InsertOrIgnoreInto ||
  198. mode == ESQLWriteColumnMode::InsertOrRevertInto ||
  199. mode == ESQLWriteColumnMode::UpsertInto && !isStat)
  200. {
  201. Ctx.Error(pos) << SqlIntoUserModeStr << " is not supported for " << serviceName << " tables";
  202. Ctx.IncrementMonCounter("sql_errors", TStringBuilder() << SqlIntoUserModeStr << "UnsupportedFor" << serviceName);
  203. return false;
  204. }
  205. }
  206. if (isMapReduce) {
  207. if (mode == ESQLWriteColumnMode::ReplaceInto) {
  208. Ctx.Error(pos) << "Meaning of REPLACE INTO has been changed, now you should use INSERT INTO <table> WITH TRUNCATE ... for " << serviceName;
  209. Ctx.IncrementMonCounter("sql_errors", "ReplaceIntoConflictUsage");
  210. return false;
  211. }
  212. } else if (isKikimr) {
  213. if (mode == ESQLWriteColumnMode::InsertIntoWithTruncate) {
  214. Ctx.Error(pos) << "INSERT INTO WITH TRUNCATE is not supported for " << serviceName << " tables";
  215. Ctx.IncrementMonCounter("sql_errors", TStringBuilder() << SqlIntoUserModeStr << "UnsupportedFor" << serviceName);
  216. return false;
  217. }
  218. } else if (isRtmr) {
  219. if (mode != ESQLWriteColumnMode::InsertInto) {
  220. Ctx.Error(pos) << SqlIntoUserModeStr << " is unsupported for " << serviceName;
  221. Ctx.IncrementMonCounter("sql_errors", TStringBuilder() << SqlIntoUserModeStr << "UnsupportedFor" << serviceName);
  222. return false;
  223. }
  224. } else if (isStat) {
  225. if (mode != ESQLWriteColumnMode::UpsertInto) {
  226. Ctx.Error(pos) << SqlIntoUserModeStr << " is unsupported for " << serviceName;
  227. Ctx.IncrementMonCounter("sql_errors", TStringBuilder() << SqlIntoUserModeStr << "UnsupportedFor" << serviceName);
  228. return false;
  229. }
  230. }
  231. return true;
  232. }
  233. } // namespace NSQLTranslationV1