// yql_pg_datasource.cpp
  1. #include "yql_pg_provider_impl.h"
  2. #include <yql/essentials/providers/common/provider/yql_data_provider_impl.h>
  3. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  4. #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
  5. #include <yql/essentials/providers/pg/expr_nodes/yql_pg_expr_nodes.h>
  6. #include <yql/essentials/core/yql_expr_type_annotation.h>
  7. #include <yql/essentials/utils/log/log.h>
  8. namespace NYql {
  9. using namespace NNodes;
  10. class TPgDataSourceImpl : public TDataProviderBase {
  11. public:
  12. TPgDataSourceImpl(TPgState::TPtr state)
  13. : State_(state)
  14. , TypeAnnotationTransformer_(CreatePgDataSourceTypeAnnotationTransformer(state))
  15. , DqIntegration_(CreatePgDqIntegration(State_))
  16. {}
  17. TStringBuf GetName() const override {
  18. return PgProviderName;
  19. }
  20. IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override {
  21. Y_UNUSED(instantOnly);
  22. return *TypeAnnotationTransformer_;
  23. }
  24. bool CanParse(const TExprNode& node) override {
  25. if (node.IsCallable(TCoRead::CallableName())) {
  26. return TPgDataSource::Match(node.Child(1));
  27. }
  28. return TypeAnnotationTransformer_->CanParse(node);
  29. }
  30. bool GetExecWorld(const TExprNode::TPtr& node, TExprNode::TPtr& root) override {
  31. auto read = TMaybeNode<TPgReadTable>(node);
  32. if (!read) {
  33. return false;
  34. }
  35. root = read.Cast().World().Ptr();
  36. return true;
  37. }
  38. TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) override {
  39. YQL_CLOG(INFO, ProviderPg) << "RewriteIO";
  40. if (auto left = TMaybeNode<TCoLeft>(node)) {
  41. return left.Input().Maybe<TPgRead>().World().Cast().Ptr();
  42. }
  43. auto read = TCoRight(node).Input().Cast<TPgRead>();
  44. auto keyNode = read.FreeArgs().Get(2).Ptr();
  45. if (keyNode->IsCallable("MrTableConcat")) {
  46. if (keyNode->ChildrenSize() != 1) {
  47. ctx.AddError(TIssue(ctx.GetPosition(keyNode->Pos()), TStringBuilder() << "Expected single table name"));
  48. return nullptr;
  49. }
  50. keyNode = keyNode->HeadPtr();
  51. }
  52. const auto maybeKey = TExprBase(keyNode).Maybe<TCoKey>();
  53. if (!maybeKey) {
  54. ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected Key"));
  55. return nullptr;
  56. }
  57. const auto& keyArg = maybeKey.Cast().Ref().Head();
  58. if (!keyArg.IsList() || keyArg.ChildrenSize() != 2U
  59. || !keyArg.Head().IsAtom("table") || !keyArg.Tail().IsCallable(TCoString::CallableName())) {
  60. ctx.AddError(TIssue(ctx.GetPosition(keyArg.Pos()), TStringBuilder() << "Expected single table name"));
  61. return nullptr;
  62. }
  63. const auto tableName = TString(keyArg.Tail().Head().Content());
  64. auto childrenList = read.Ref().ChildrenList();
  65. childrenList[2] = ctx.NewAtom(childrenList[2]->Pos(), tableName);
  66. auto newRead = ctx.NewCallable(read.Ref().Pos(), TPgReadTable::CallableName(), std::move(childrenList));
  67. return Build<TCoRight>(ctx, read.Pos())
  68. .Input(newRead)
  69. .Done().Ptr();
  70. }
  71. bool ValidateParameters(TExprNode& node, TExprContext& ctx, TMaybe<TString>& cluster) override {
  72. if (node.IsCallable(TCoDataSource::CallableName())) {
  73. if (!EnsureArgsCount(node, 2, ctx)) {
  74. return false;
  75. }
  76. if (node.Child(0)->Content() == PgProviderName) {
  77. if (!EnsureAtom(*node.Child(1), ctx)) {
  78. return false;
  79. }
  80. if (node.Child(1)->Content().empty()) {
  81. ctx.AddError(TIssue(ctx.GetPosition(node.Child(1)->Pos()), "Empty cluster name"));
  82. return false;
  83. }
  84. cluster = TString(node.Child(1)->Content());
  85. return true;
  86. }
  87. }
  88. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Invalid Pg DataSource parameters"));
  89. return false;
  90. }
  91. IDqIntegration* GetDqIntegration() override {
  92. return DqIntegration_.Get();
  93. }
  94. private:
  95. TPgState::TPtr State_;
  96. const THolder<TVisitorTransformerBase> TypeAnnotationTransformer_;
  97. const THolder<IDqIntegration> DqIntegration_;
  98. };
  99. TIntrusivePtr<IDataProvider> CreatePgDataSource(TPgState::TPtr state) {
  100. return MakeIntrusive<TPgDataSourceImpl>(state);
  101. }
  102. }