yql_callable_transform.h 10 KB


  1. #pragma once
  2. #include "yql_graph_transformer.h"
  3. #include "yql_type_annotation.h"
  4. #include "yql_expr_type_annotation.h"
  5. #include <yql/essentials/core/sql_types/yql_callable_names.h>
  6. #include <yql/essentials/ast/yql_expr.h>
  7. #include <yql/essentials/public/issue/yql_issue_manager.h>
  8. #include <yql/essentials/public/issue/yql_issue.h>
  9. #include <yql/essentials/utils/yql_panic.h>
  10. #include <library/cpp/threading/future/future.h>
  11. #include <util/generic/ptr.h>
  12. #include <util/string/builder.h>
  13. #include <utility>
  14. namespace NYql {
  15. template <class TDerived>
  16. class TCallableTransformerBase : public TGraphTransformerBase {
  17. public:
  18. TCallableTransformerBase(TTypeAnnotationContext& types, bool instantOnly)
  19. : Types(types)
  20. , InstantOnly(instantOnly)
  21. {}
  22. IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  23. output = input;
  24. if (input->IsList()) {
  25. if (const auto maybeStatus = static_cast<TDerived*>(this)->ProcessList(input, output, ctx)) {
  26. return *maybeStatus;
  27. }
  28. }
  29. auto name = input->Content();
  30. TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
  31. return MakeIntrusive<TIssue>(ctx.GetPosition(input->Pos()),
  32. TStringBuilder() << "At function: " << NormalizeCallableName(name));
  33. });
  34. TStatus status = TStatus::Ok;
  35. if (auto maybeStatus = static_cast<TDerived*>(this)->ProcessCore(input, output, ctx)) {
  36. status = *maybeStatus;
  37. } else {
  38. if (name == CommitName) {
  39. auto datasink = ParseCommit(*input, ctx);
  40. if (!datasink) {
  41. status = TStatus::Error;
  42. } else {
  43. status = ProcessDataProviderAnnotation(*datasink, input, output, ctx);
  44. if (status == TStatus::Ok) {
  45. status = static_cast<TDerived*>(this)->ValidateProviderCommitResult(input, ctx);
  46. }
  47. }
  48. } else if (name == ReadName) {
  49. auto datasource = ParseRead(*input, ctx);
  50. if (!datasource) {
  51. status = TStatus::Error;
  52. } else {
  53. status = ProcessDataProviderAnnotation(*datasource, input, output, ctx);
  54. if (status == TStatus::Ok) {
  55. status = static_cast<TDerived*>(this)->ValidateProviderReadResult(input, ctx);
  56. }
  57. }
  58. } else if (name == WriteName) {
  59. auto datasink = ParseWrite(*input, ctx);
  60. if (!datasink) {
  61. status = TStatus::Error;
  62. } else {
  63. status = ProcessDataProviderAnnotation(*datasink, input, output, ctx);
  64. if (status == TStatus::Ok) {
  65. status = static_cast<TDerived*>(this)->ValidateProviderWriteResult(input, ctx);
  66. }
  67. }
  68. } else if (name == ConfigureName) {
  69. auto provider = ParseConfigure(*input, ctx);
  70. if (!provider) {
  71. status = TStatus::Error;
  72. } else {
  73. status = ProcessDataProviderAnnotation(*provider, input, output, ctx);
  74. if (status == TStatus::Ok) {
  75. status = static_cast<TDerived*>(this)->ValidateProviderConfigureResult(input, ctx);
  76. }
  77. }
  78. } else {
  79. bool foundFunc = false;
  80. for (auto& datasource : Types.DataSources) {
  81. if (!datasource->CanParse(*input)) {
  82. continue;
  83. }
  84. foundFunc = true;
  85. status = ProcessDataProviderAnnotation(*datasource, input, output, ctx);
  86. break;
  87. }
  88. if (!foundFunc) {
  89. for (auto& datasink : Types.DataSinks) {
  90. if (!datasink->CanParse(*input)) {
  91. continue;
  92. }
  93. foundFunc = true;
  94. status = ProcessDataProviderAnnotation(*datasink, input, output, ctx);
  95. break;
  96. }
  97. }
  98. if (!foundFunc) {
  99. return static_cast<TDerived*>(this)->ProcessUnknown(input, ctx);
  100. }
  101. }
  102. }
  103. return status;
  104. }
  105. NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
  106. const auto it = PendingNodes.find(&input);
  107. YQL_ENSURE(it != PendingNodes.cend());
  108. return static_cast<TDerived*>(this)->GetTransformer(*it->second.second).GetAsyncFuture(input);
  109. }
  110. TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  111. const auto it = PendingNodes.find(input.Get());
  112. YQL_ENSURE(it != PendingNodes.cend());
  113. const auto provider = it->second.second;
  114. IGraphTransformer& transformer = static_cast<TDerived*>(this)->GetTransformer(*provider);
  115. const auto status = transformer.ApplyAsyncChanges(it->second.first, output, ctx);
  116. PendingNodes.erase(it);
  117. return status;
  118. }
  119. void Rewind() override {
  120. PendingNodes.clear();
  121. }
  122. protected:
  123. IDataProvider* ParseCommit(const TExprNode& input, TExprContext& ctx) {
  124. if (!EnsureMinArgsCount(input, 2, ctx)) {
  125. return nullptr;
  126. }
  127. if (!EnsureMaxArgsCount(input, 3, ctx)) {
  128. return nullptr;
  129. }
  130. if (!EnsureWorldType(*input.Child(0), ctx)) {
  131. return nullptr;
  132. }
  133. if (!EnsureDataSink(*input.Child(1), ctx)) {
  134. return nullptr;
  135. }
  136. if (input.ChildrenSize() == 3) {
  137. for (auto& setting : input.Child(2)->Children()) {
  138. if (!EnsureTupleSize(*setting, 2, ctx)) {
  139. return nullptr;
  140. }
  141. auto nameNode = setting->Child(0);
  142. if (!EnsureAtom(*nameNode, ctx)) {
  143. return nullptr;
  144. }
  145. }
  146. }
  147. auto datasinkName = input.Child(1)->Child(0)->Content();
  148. auto datasink = Types.DataSinkMap.FindPtr(datasinkName);
  149. if (!datasink) {
  150. ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() << "Unsupported datasink: " << datasinkName));
  151. return nullptr;
  152. }
  153. return (*datasink).Get();
  154. }
  155. IDataProvider* ParseRead(const TExprNode& input, TExprContext& ctx) {
  156. if (!EnsureMinArgsCount(input, 2, ctx)) {
  157. return nullptr;
  158. }
  159. if (!EnsureWorldType(*input.Child(0), ctx)) {
  160. return nullptr;
  161. }
  162. if (!EnsureDataSource(*input.Child(1), ctx)) {
  163. return nullptr;
  164. }
  165. auto datasourceName = input.Child(1)->Child(0)->Content();
  166. auto datasource = Types.DataSourceMap.FindPtr(datasourceName);
  167. if (!datasource) {
  168. ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() << "Unsupported datasource: " << datasourceName));
  169. return nullptr;
  170. }
  171. return (*datasource).Get();
  172. }
  173. IDataProvider* ParseWrite(const TExprNode& input, TExprContext& ctx) {
  174. if (!EnsureMinArgsCount(input, 2, ctx)) {
  175. return nullptr;
  176. }
  177. if (!EnsureWorldType(*input.Child(0), ctx)) {
  178. return nullptr;
  179. }
  180. if (!EnsureDataSink(*input.Child(1), ctx)) {
  181. return nullptr;
  182. }
  183. auto datasinkName = input.Child(1)->Child(0)->Content();
  184. auto datasink = Types.DataSinkMap.FindPtr(datasinkName);
  185. if (!datasink) {
  186. ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() << "Unsupported datasink: " << datasinkName));
  187. return nullptr;
  188. }
  189. return (*datasink).Get();
  190. }
  191. IDataProvider* ParseConfigure(const TExprNode& input, TExprContext& ctx) {
  192. if (!EnsureMinArgsCount(input, 2, ctx)) {
  193. return nullptr;
  194. }
  195. if (!EnsureWorldType(*input.Child(0), ctx)) {
  196. return nullptr;
  197. }
  198. if (!EnsureDataProvider(*input.Child(1), ctx)) {
  199. return nullptr;
  200. }
  201. if (input.Child(1)->IsCallable("DataSource")) {
  202. auto datasourceName = input.Child(1)->Child(0)->Content();
  203. auto datasource = Types.DataSourceMap.FindPtr(datasourceName);
  204. if (!datasource) {
  205. ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() << "Unsupported datasource: " << datasourceName));
  206. return nullptr;
  207. }
  208. return (*datasource).Get();
  209. }
  210. if (input.Child(1)->IsCallable("DataSink")) {
  211. auto datasinkName = input.Child(1)->Child(0)->Content();
  212. auto datasink = Types.DataSinkMap.FindPtr(datasinkName);
  213. if (!datasink) {
  214. ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() << "Unsupported datasink: " << datasinkName));
  215. return nullptr;
  216. }
  217. return (*datasink).Get();
  218. }
  219. YQL_ENSURE(false, "Unexpected provider class");
  220. }
  221. IGraphTransformer::TStatus ProcessDataProviderAnnotation(IDataProvider& dataProvider,
  222. const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  223. auto status = static_cast<TDerived*>(this)->GetTransformer(dataProvider).Transform(input, output, ctx);
  224. if (status.Level == IGraphTransformer::TStatus::Async) {
  225. if (InstantOnly) {
  226. ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() <<
  227. "Async status is not allowed for instant transform, provider name: " << dataProvider.GetName()));
  228. return IGraphTransformer::TStatus::Error;
  229. }
  230. PendingNodes[input.Get()] = std::make_pair(input, &dataProvider);
  231. }
  232. return status;
  233. }
  234. protected:
  235. TTypeAnnotationContext& Types;
  236. const bool InstantOnly;
  237. TNodeMap<std::pair<TExprNode::TPtr, IDataProvider*>> PendingNodes;
  238. };
  239. } // NYql