sql_into_tables.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. #include "sql_into_tables.h"
  2. #include "sql_values.h"
  3. #include <util/string/join.h>
  4. using namespace NYql;
  5. namespace NSQLTranslationV1 {
  6. using namespace NSQLv1Generated;
  7. TNodePtr TSqlIntoTable::Build(const TRule_into_table_stmt& node) {
  8. static const TMap<TString, ESQLWriteColumnMode> str2Mode = {
  9. {"InsertInto", ESQLWriteColumnMode::InsertInto},
  10. {"InsertOrAbortInto", ESQLWriteColumnMode::InsertOrAbortInto},
  11. {"InsertOrIgnoreInto", ESQLWriteColumnMode::InsertOrIgnoreInto},
  12. {"InsertOrRevertInto", ESQLWriteColumnMode::InsertOrRevertInto},
  13. {"UpsertInto", ESQLWriteColumnMode::UpsertInto},
  14. {"ReplaceInto", ESQLWriteColumnMode::ReplaceInto},
  15. {"InsertIntoWithTruncate", ESQLWriteColumnMode::InsertIntoWithTruncate}
  16. };
  17. auto& modeBlock = node.GetBlock1();
  18. TVector<TToken> modeTokens;
  19. switch (modeBlock.Alt_case()) {
  20. case TRule_into_table_stmt_TBlock1::AltCase::kAlt1:
  21. modeTokens = {modeBlock.GetAlt1().GetToken1()};
  22. break;
  23. case TRule_into_table_stmt_TBlock1::AltCase::kAlt2:
  24. modeTokens = {
  25. modeBlock.GetAlt2().GetToken1(),
  26. modeBlock.GetAlt2().GetToken2(),
  27. modeBlock.GetAlt2().GetToken3()
  28. };
  29. break;
  30. case TRule_into_table_stmt_TBlock1::AltCase::kAlt3:
  31. modeTokens = {
  32. modeBlock.GetAlt3().GetToken1(),
  33. modeBlock.GetAlt3().GetToken2(),
  34. modeBlock.GetAlt3().GetToken3()
  35. };
  36. break;
  37. case TRule_into_table_stmt_TBlock1::AltCase::kAlt4:
  38. modeTokens = {
  39. modeBlock.GetAlt4().GetToken1(),
  40. modeBlock.GetAlt4().GetToken2(),
  41. modeBlock.GetAlt4().GetToken3()
  42. };
  43. break;
  44. case TRule_into_table_stmt_TBlock1::AltCase::kAlt5:
  45. modeTokens = {modeBlock.GetAlt5().GetToken1()};
  46. break;
  47. case TRule_into_table_stmt_TBlock1::AltCase::kAlt6:
  48. modeTokens = {modeBlock.GetAlt6().GetToken1()};
  49. break;
  50. case TRule_into_table_stmt_TBlock1::AltCase::ALT_NOT_SET:
  51. Y_ABORT("You should change implementation according to grammar changes");
  52. }
  53. TVector<TString> modeStrings;
  54. modeStrings.reserve(modeTokens.size());
  55. TVector<TString> userModeStrings;
  56. userModeStrings.reserve(modeTokens.size());
  57. for (auto& token : modeTokens) {
  58. auto tokenStr = Token(token);
  59. auto modeStr = tokenStr;
  60. modeStr.to_lower();
  61. modeStr.to_upper(0, 1);
  62. modeStrings.push_back(modeStr);
  63. auto userModeStr = tokenStr;
  64. userModeStr.to_upper();
  65. userModeStrings.push_back(userModeStr);
  66. }
  67. modeStrings.push_back("Into");
  68. userModeStrings.push_back("INTO");
  69. SqlIntoModeStr = JoinRange("", modeStrings.begin(), modeStrings.end());
  70. SqlIntoUserModeStr = JoinRange(" ", userModeStrings.begin(), userModeStrings.end());
  71. const auto& intoTableRef = node.GetRule_into_simple_table_ref3();
  72. const auto& tableRef = intoTableRef.GetRule_simple_table_ref1();
  73. const auto& tableRefCore = tableRef.GetRule_simple_table_ref_core1();
  74. auto service = Ctx.Scoped->CurrService;
  75. auto cluster = Ctx.Scoped->CurrCluster;
  76. std::pair<bool, TDeferredAtom> nameOrAt;
  77. bool isBinding = false;
  78. switch (tableRefCore.Alt_case()) {
  79. case TRule_simple_table_ref_core::AltCase::kAltSimpleTableRefCore1: {
  80. if (tableRefCore.GetAlt_simple_table_ref_core1().GetRule_object_ref1().HasBlock1()) {
  81. const auto& clusterExpr = tableRefCore.GetAlt_simple_table_ref_core1().GetRule_object_ref1().GetBlock1().GetRule_cluster_expr1();
  82. bool hasAt = tableRefCore.GetAlt_simple_table_ref_core1().GetRule_object_ref1().GetRule_id_or_at2().HasBlock1();
  83. bool result = !hasAt ?
  84. ClusterExprOrBinding(clusterExpr, service, cluster, isBinding) : ClusterExpr(clusterExpr, false, service, cluster);
  85. if (!result) {
  86. return nullptr;
  87. }
  88. }
  89. if (!isBinding && cluster.Empty()) {
  90. Ctx.Error() << "No cluster name given and no default cluster is selected";
  91. return nullptr;
  92. }
  93. auto id = Id(tableRefCore.GetAlt_simple_table_ref_core1().GetRule_object_ref1().GetRule_id_or_at2(), *this);
  94. nameOrAt = std::make_pair(id.first, TDeferredAtom(Ctx.Pos(), id.second));
  95. break;
  96. }
  97. case TRule_simple_table_ref_core::AltCase::kAltSimpleTableRefCore2: {
  98. auto at = tableRefCore.GetAlt_simple_table_ref_core2().HasBlock1();
  99. TString name;
  100. if (!NamedNodeImpl(tableRefCore.GetAlt_simple_table_ref_core2().GetRule_bind_parameter2(), name, *this)) {
  101. return nullptr;
  102. }
  103. auto named = GetNamedNode(name);
  104. if (!named) {
  105. return nullptr;
  106. }
  107. named->SetRefPos(Ctx.Pos());
  108. if (cluster.Empty()) {
  109. Ctx.Error() << "No cluster name given and no default cluster is selected";
  110. return nullptr;
  111. }
  112. TDeferredAtom table;
  113. MakeTableFromExpression(Ctx.Pos(), Ctx, named, table);
  114. nameOrAt = std::make_pair(at, table);
  115. break;
  116. }
  117. case TRule_simple_table_ref_core::AltCase::ALT_NOT_SET:
  118. Y_ABORT("You should change implementation according to grammar changes");
  119. }
  120. bool withTruncate = false;
  121. TTableHints tableHints;
  122. if (tableRef.HasBlock2()) {
  123. auto hints = TableHintsImpl(tableRef.GetBlock2().GetRule_table_hints1(), service);
  124. if (!hints) {
  125. Ctx.Error() << "Failed to parse table hints";
  126. return nullptr;
  127. }
  128. for (const auto& hint : *hints) {
  129. if (to_upper(hint.first) == "TRUNCATE") {
  130. withTruncate = true;
  131. }
  132. }
  133. std::erase_if(*hints, [](const auto &hint) { return to_upper(hint.first) == "TRUNCATE"; });
  134. tableHints = std::move(*hints);
  135. }
  136. TVector<TString> eraseColumns;
  137. if (intoTableRef.HasBlock2()) {
  138. if (service != StatProviderName) {
  139. Ctx.Error() << "ERASE BY is unsupported for " << service;
  140. return nullptr;
  141. }
  142. PureColumnListStr(
  143. intoTableRef.GetBlock2().GetRule_pure_column_list3(), *this, eraseColumns
  144. );
  145. }
  146. if (withTruncate) {
  147. if (SqlIntoModeStr != "InsertInto") {
  148. Error() << "Unable " << SqlIntoUserModeStr << " with truncate mode";
  149. return nullptr;
  150. }
  151. SqlIntoModeStr += "WithTruncate";
  152. SqlIntoUserModeStr += " ... WITH TRUNCATE";
  153. }
  154. const auto iterMode = str2Mode.find(SqlIntoModeStr);
  155. YQL_ENSURE(iterMode != str2Mode.end(), "Invalid sql write mode string: " << SqlIntoModeStr);
  156. const auto SqlIntoMode = iterMode->second;
  157. TPosition pos(Ctx.Pos());
  158. TTableRef table(Ctx.MakeName("table"), service, cluster, nullptr);
  159. if (isBinding) {
  160. const TString* binding = nameOrAt.second.GetLiteral();
  161. YQL_ENSURE(binding);
  162. YQL_ENSURE(!nameOrAt.first);
  163. if (!ApplyTableBinding(*binding, table, tableHints)) {
  164. return nullptr;
  165. }
  166. } else {
  167. table.Keys = BuildTableKey(pos, service, cluster, nameOrAt.second, {nameOrAt.first ? "@" : ""});
  168. }
  169. Ctx.IncrementMonCounter("sql_insert_clusters", table.Cluster.GetLiteral() ? *table.Cluster.GetLiteral() : "unknown");
  170. auto values = TSqlIntoValues(Ctx, Mode).Build(node.GetRule_into_values_source4(), SqlIntoUserModeStr);
  171. if (!values) {
  172. return nullptr;
  173. }
  174. if (!ValidateServiceName(node, table, SqlIntoMode, GetPos(modeTokens[0]))) {
  175. return nullptr;
  176. }
  177. Ctx.IncrementMonCounter("sql_features", SqlIntoModeStr);
  178. auto options = BuildIntoTableOptions(pos, eraseColumns, tableHints);
  179. if (node.HasBlock5()) {
  180. options = options->L(options, ReturningList(node.GetBlock5().GetRule_returning_columns_list1()));
  181. }
  182. return BuildWriteColumns(pos, Ctx.Scoped, table,
  183. ToWriteColumnsMode(SqlIntoMode), std::move(values),
  184. options);
  185. }
  186. bool TSqlIntoTable::ValidateServiceName(const TRule_into_table_stmt& node, const TTableRef& table,
  187. ESQLWriteColumnMode mode, const TPosition& pos) {
  188. Y_UNUSED(node);
  189. auto serviceName = table.Service;
  190. const bool isMapReduce = serviceName == YtProviderName;
  191. const bool isKikimr = serviceName == KikimrProviderName || serviceName == YdbProviderName;
  192. const bool isRtmr = serviceName == RtmrProviderName;
  193. const bool isStat = serviceName == StatProviderName;
  194. if (!isKikimr) {
  195. if (mode == ESQLWriteColumnMode::InsertOrAbortInto ||
  196. mode == ESQLWriteColumnMode::InsertOrIgnoreInto ||
  197. mode == ESQLWriteColumnMode::InsertOrRevertInto ||
  198. mode == ESQLWriteColumnMode::UpsertInto && !isStat)
  199. {
  200. Ctx.Error(pos) << SqlIntoUserModeStr << " is not supported for " << serviceName << " tables";
  201. Ctx.IncrementMonCounter("sql_errors", TStringBuilder() << SqlIntoUserModeStr << "UnsupportedFor" << serviceName);
  202. return false;
  203. }
  204. }
  205. if (isMapReduce) {
  206. if (mode == ESQLWriteColumnMode::ReplaceInto) {
  207. Ctx.Error(pos) << "Meaning of REPLACE INTO has been changed, now you should use INSERT INTO <table> WITH TRUNCATE ... for " << serviceName;
  208. Ctx.IncrementMonCounter("sql_errors", "ReplaceIntoConflictUsage");
  209. return false;
  210. }
  211. } else if (isKikimr) {
  212. if (mode == ESQLWriteColumnMode::InsertIntoWithTruncate) {
  213. Ctx.Error(pos) << "INSERT INTO WITH TRUNCATE is not supported for " << serviceName << " tables";
  214. Ctx.IncrementMonCounter("sql_errors", TStringBuilder() << SqlIntoUserModeStr << "UnsupportedFor" << serviceName);
  215. return false;
  216. }
  217. } else if (isRtmr) {
  218. if (mode != ESQLWriteColumnMode::InsertInto) {
  219. Ctx.Error(pos) << SqlIntoUserModeStr << " is unsupported for " << serviceName;
  220. Ctx.IncrementMonCounter("sql_errors", TStringBuilder() << SqlIntoUserModeStr << "UnsupportedFor" << serviceName);
  221. return false;
  222. }
  223. } else if (isStat) {
  224. if (mode != ESQLWriteColumnMode::UpsertInto) {
  225. Ctx.Error(pos) << SqlIntoUserModeStr << " is unsupported for " << serviceName;
  226. Ctx.IncrementMonCounter("sql_errors", TStringBuilder() << SqlIntoUserModeStr << "UnsupportedFor" << serviceName);
  227. return false;
  228. }
  229. }
  230. return true;
  231. }
  232. } // namespace NSQLTranslationV1