yql_opt_rewrite_io.cpp 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. #include "yql_opt_rewrite_io.h"
  2. #include "yql_expr_optimize.h"
  3. namespace NYql {
  4. IGraphTransformer::TStatus RewriteIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, const TTypeAnnotationContext& types, TExprContext& ctx) {
  5. if (ctx.Step.IsDone(TExprStep::RewriteIO)) {
  6. return IGraphTransformer::TStatus::Ok;
  7. }
  8. TOptimizeExprSettings settings(nullptr);
  9. settings.VisitChanges = true;
  10. auto ret = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
  11. YQL_ENSURE(node->Type() == TExprNode::Callable);
  12. if (node->Content() == LeftName || node->Content() == RightName) {
  13. auto child = node->Child(0);
  14. if (child->IsCallable(ReadName)) {
  15. auto dataSourceName = child->Child(1)->Child(0)->Content();
  16. auto datasource = types.DataSourceMap.FindPtr(dataSourceName);
  17. YQL_ENSURE(datasource);
  18. return (*datasource)->RewriteIO(node, ctx);
  19. }
  20. } else if (node->IsCallable(WriteName)) {
  21. auto dataSinkName = node->Child(1)->Child(0)->Content();
  22. auto datasink = types.DataSinkMap.FindPtr(dataSinkName);
  23. YQL_ENSURE(datasink);
  24. return (*datasink)->RewriteIO(node, ctx);
  25. }
  26. return node;
  27. }, ctx, settings);
  28. if (ret.Level == IGraphTransformer::TStatus::Error) {
  29. return ret;
  30. }
  31. if (
  32. !ctx.Step.IsDone(TExprStep::DiscoveryIO) ||
  33. !ctx.Step.IsDone(TExprStep::ExpandApplyForLambdas) ||
  34. !ctx.Step.IsDone(TExprStep::ExprEval)
  35. ) {
  36. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
  37. }
  38. for (const auto& ds : types.DataSinks)
  39. ds->PostRewriteIO();
  40. for (const auto& ds : types.DataSources)
  41. ds->PostRewriteIO();
  42. ctx.Step.Done(TExprStep::RewriteIO);
  43. return IGraphTransformer::TStatus::Ok;
  44. }
  45. }