yql_pg_datasink.cpp 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. #include "yql_pg_provider_impl.h"
  2. #include <yql/essentials/providers/common/provider/yql_data_provider_impl.h>
  3. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  4. #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
  5. #include <yql/essentials/core/yql_expr_type_annotation.h>
  6. namespace NYql {
  7. using namespace NNodes;
  8. class TPgDataSinkImpl : public TDataProviderBase {
  9. public:
  10. TPgDataSinkImpl(TPgState::TPtr state)
  11. : State_(state)
  12. , TypeAnnotationTransformer_(CreatePgDataSinkTypeAnnotationTransformer(state))
  13. , ExecutionTransformer_(CreatePgDataSinkExecTransformer(state))
  14. {}
  15. TStringBuf GetName() const override {
  16. return PgProviderName;
  17. }
  18. bool CanParse(const TExprNode& node) override {
  19. return TypeAnnotationTransformer_->CanParse(node);
  20. }
  21. bool CanExecute(const TExprNode& node) override {
  22. return ExecutionTransformer_->CanExec(node);
  23. }
  24. IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override {
  25. Y_UNUSED(instantOnly);
  26. return *TypeAnnotationTransformer_;
  27. }
  28. IGraphTransformer& GetCallableExecutionTransformer() override {
  29. return *ExecutionTransformer_;
  30. }
  31. bool ValidateParameters(TExprNode& node, TExprContext& ctx, TMaybe<TString>& cluster) override {
  32. if (node.IsCallable(TCoDataSink::CallableName())) {
  33. if (!EnsureArgsCount(node, 2, ctx)) {
  34. return false;
  35. }
  36. if (node.Child(0)->Content() == PgProviderName) {
  37. if (!EnsureAtom(*node.Child(1), ctx)) {
  38. return false;
  39. }
  40. if (node.Child(1)->Content().empty()) {
  41. ctx.AddError(TIssue(ctx.GetPosition(node.Child(1)->Pos()), "Empty cluster name"));
  42. return false;
  43. }
  44. cluster = TString(node.Child(1)->Content());
  45. return true;
  46. }
  47. }
  48. ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Invalid Pg DataSink parameters"));
  49. return false;
  50. }
  51. private:
  52. TPgState::TPtr State_;
  53. const THolder<TVisitorTransformerBase> TypeAnnotationTransformer_;
  54. const THolder<TExecTransformerBase> ExecutionTransformer_;
  55. };
  56. TIntrusivePtr<IDataProvider> CreatePgDataSink(TPgState::TPtr state) {
  57. return MakeIntrusive<TPgDataSinkImpl>(state);
  58. }
  59. }