Browse Source

Intermediate changes

robot-piglet 1 year ago
parent
commit
e69ee2703d

+ 0 - 1
yt/yt/client/driver/command.cpp

@@ -16,7 +16,6 @@ using namespace NConcurrency;
 
 ////////////////////////////////////////////////////////////////////////////////
 
-
 void ProduceOutput(
     ICommandContextPtr context,
     const std::function<void(IYsonConsumer*)>& producer)

+ 9 - 3
yt/yt/client/driver/driver.cpp

@@ -362,9 +362,15 @@ public:
         REGISTER_ALL(TGetBundleConfigCommand,              "get_bundle_config",               Null,       Structured, false,  false);
         REGISTER_ALL(TSetBundleConfigCommand,              "set_bundle_config",               Structured, Null,       false,  false);
 
-        REGISTER_ALL(TStartPipelineCommand,                "start_pipeline",                  Null,       Structured, false,  false);
-        REGISTER_ALL(TStopPipelineCommand,                 "stop_pipeline",                   Null,       Structured, false,  false);
-        REGISTER_ALL(TPausePipelineCommand,                "pause_pipeline",                  Null,       Structured, false,  false);
+        REGISTER    (TGetPipelineSpecCommand,              "get_pipeline_spec",               Null,       Structured, true,   false, ApiVersion4);
+        REGISTER    (TSetPipelineSpecCommand,              "set_pipeline_spec",               Structured, Null,       true,   false, ApiVersion4);
+        REGISTER    (TRemovePipelineDynamicSpecCommand,    "remove_pipeline_spec",            Null,       Null,       true,   false, ApiVersion4);
+        REGISTER    (TGetPipelineDynamicSpecCommand,       "get_pipeline_dynamic_spec",       Null,       Structured, true,   false, ApiVersion4);
+        REGISTER    (TSetPipelineDynamicSpecCommand,       "set_pipeline_dynamic_spec",       Structured, Null,       true,   false, ApiVersion4);
+        REGISTER    (TRemovePipelineDynamicSpecCommand,    "remove_pipeline_dynamic_spec",    Null,       Null,       true,   false, ApiVersion4);
+        REGISTER    (TStartPipelineCommand,                "start_pipeline",                  Null,       Structured, false,  false, ApiVersion4);
+        REGISTER    (TStopPipelineCommand,                 "stop_pipeline",                   Null,       Structured, false,  false, ApiVersion4);
+        REGISTER    (TPausePipelineCommand,                "pause_pipeline",                  Null,       Structured, false,  false, ApiVersion4);
 
         if (Config_->EnableInternalCommands) {
             REGISTER_ALL(TReadHunksCommand,                "read_hunks",                      Null,       Structured, false,  true );

+ 274 - 12
yt/yt/client/driver/flow_commands.cpp

@@ -1,36 +1,286 @@
 #include "flow_commands.h"
 
+#include <yt/yt/core/ytree/fluent.h>
+#include <yt/yt/core/ytree/ypath_client.h>
+
 namespace NYT::NDriver {
 
 using namespace NConcurrency;
+using namespace NYTree;
+using namespace NYPath;
+using namespace NYson;
+using namespace NApi;
 
 ////////////////////////////////////////////////////////////////////////////////
 
-void TStartPipelineCommand::Register(TRegistrar registrar)
+namespace {
+
+void ExecuteGetPipelineSpecCommand(
+    const ICommandContextPtr& context,
+    const TYPath& specPath,
+    const auto& specGetterOptions,
+    auto specGetter)
 {
-    registrar.Parameter("pipeline_path", &TThis::PipelinePath);
+    auto client = context->GetClient();
+    auto result = WaitFor(specGetter(client, specGetterOptions))
+        .ValueOrThrow();
+
+    auto spec = SyncYPathGet(ConvertToNode(result.Spec), specPath);
+
+    ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) {
+        BuildYsonFluently(consumer)
+            .BeginMap()
+                .Item("spec").Value(spec)
+                .Item("version").Value(result.Version)
+            .EndMap();
+        });
 }
 
-void TStartPipelineCommand::DoExecute(ICommandContextPtr context)
+void ExecuteSetPipelineSpecCommand(
+    const ICommandContextPtr& context,
+    const TYPath& specPath,
+    const auto& specGetterOptions,
+    auto specGetter,
+    const auto& specSetterOptions,
+    auto specSetter)
 {
     auto client = context->GetClient();
-    WaitFor(client->StartPipeline(PipelinePath))
-        .ThrowOnError();
+    auto spec = context->ConsumeInputValue();
 
-    ProduceEmptyOutput(context);
+    auto setResult = [&] {
+        if (specPath.empty()) {
+            return WaitFor(specSetter(client, spec, specSetterOptions))
+                .ValueOrThrow();
+        } else {
+            auto getResult = WaitFor(specGetter(client, specGetterOptions))
+                .ValueOrThrow();
+
+            if (specSetterOptions.ExpectedVersion && getResult.Version != *specSetterOptions.ExpectedVersion) {
+                THROW_ERROR_EXCEPTION(
+                    NFlow::EErrorCode::SpecVersionMismatch,
+                    "Spec version mismatch: expected %v, got %v",
+                    *specSetterOptions.ExpectedVersion,
+                    getResult.Version);
+            }
+
+            auto fullSpec = ConvertToNode(getResult.Spec);
+            SyncYPathSet(fullSpec, specPath, spec);
+
+            auto adjustedOptions = specSetterOptions;
+            adjustedOptions.ExpectedVersion = getResult.Version;
+            return WaitFor(specSetter(client, ConvertToYsonString(fullSpec), adjustedOptions))
+                .ValueOrThrow();
+        }
+    }();
+
+    ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) {
+        BuildYsonFluently(consumer)
+            .BeginMap()
+                .Item("version").Value(setResult.Version)
+            .EndMap();
+        });
+}
+
+void ExecuteRemovePipelineSpecCommand(
+    const ICommandContextPtr& context,
+    const TYPath& specPath,
+    const auto& specGetterOptions,
+    auto specGetter,
+    const auto& specSetterOptions,
+    auto specSetter)
+{
+    auto client = context->GetClient();
+    auto spec = context->ConsumeInputValue();
+
+    auto getResult = WaitFor(specGetter(client, specGetterOptions))
+        .ValueOrThrow();
+
+    if (specSetterOptions.ExpectedVersion && getResult.Version != *specSetterOptions.ExpectedVersion) {
+        THROW_ERROR_EXCEPTION(
+            NFlow::EErrorCode::SpecVersionMismatch,
+            "Spec version mismatch: expected %v, got %v",
+            *specSetterOptions.ExpectedVersion,
+            getResult.Version);
+    }
+
+    auto fullSpec = ConvertToNode(getResult.Spec);
+    SyncYPathRemove(fullSpec, specPath);
+
+    auto adjustedOptions = specSetterOptions;
+    adjustedOptions.ExpectedVersion = getResult.Version;
+    auto setResult = WaitFor(specSetter(client, ConvertToYsonString(fullSpec), adjustedOptions))
+        .ValueOrThrow();
+
+    ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) {
+        BuildYsonFluently(consumer)
+            .BeginMap()
+                .Item("version").Value(setResult.Version)
+            .EndMap();
+        });
 }
 
+} // namespace
+
 ////////////////////////////////////////////////////////////////////////////////
 
-void TStopPipelineCommand::Register(TRegistrar registrar)
+void TPipelineCommandBase::Register(TRegistrar registrar)
 {
     registrar.Parameter("pipeline_path", &TThis::PipelinePath);
 }
 
-void TStopPipelineCommand::DoExecute(ICommandContextPtr context)
+////////////////////////////////////////////////////////////////////////////////
+
+void TGetPipelineSpecCommand::Register(TRegistrar registrar)
+{
+    registrar.Parameter("spec_path", &TThis::SpecPath)
+        .Default();
+}
+
+void TGetPipelineSpecCommand::DoExecute(ICommandContextPtr context)
+{
+    ExecuteGetPipelineSpecCommand(
+        context,
+        SpecPath,
+        Options,
+        [&] (const auto& client, const auto& options) { return client->GetPipelineSpec(PipelinePath, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TSetPipelineSpecCommand::Register(TRegistrar registrar)
+{
+    registrar.Parameter("spec_path", &TThis::SpecPath)
+        .Default();
+    registrar.ParameterWithUniversalAccessor<bool>(
+        "force",
+        [] (TThis* command) -> auto& {
+            return command->Options.Force;
+        })
+        .Optional(/*init*/ false);
+    registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>(
+        "expected_version",
+        [] (TThis* command) -> auto& {
+            return command->Options.ExpectedVersion;
+        })
+        .Optional(/*init*/ false);
+}
+
+void TSetPipelineSpecCommand::DoExecute(ICommandContextPtr context)
+{
+    ExecuteSetPipelineSpecCommand(
+        context,
+        SpecPath,
+        TGetPipelineSpecOptions(),
+        [&] (const auto& client, const auto& options) { return client->GetPipelineSpec(PipelinePath, options); },
+        Options,
+        [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineSpec(PipelinePath, spec, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TRemovePipelineSpecCommand::Register(TRegistrar registrar)
+{
+    registrar.Parameter("spec_path", &TThis::SpecPath);
+    registrar.ParameterWithUniversalAccessor<bool>(
+        "force",
+        [] (TThis* command) -> auto& {
+            return command->Options.Force;
+        })
+        .Optional(/*init*/ false);
+    registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>(
+        "expected_version",
+        [] (TThis* command) -> auto& {
+            return command->Options.ExpectedVersion;
+        })
+        .Optional(/*init*/ false);
+}
+
+void TRemovePipelineSpecCommand::DoExecute(ICommandContextPtr context)
+{
+    ExecuteRemovePipelineSpecCommand(
+        context,
+        SpecPath,
+        TGetPipelineSpecOptions(),
+        [&] (const auto& client, const auto& options) { return client->GetPipelineSpec(PipelinePath, options); },
+        Options,
+        [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineSpec(PipelinePath, spec, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TGetPipelineDynamicSpecCommand::Register(TRegistrar registrar)
+{
+    registrar.Parameter("spec_path", &TThis::SpecPath)
+        .Default();
+}
+
+void TGetPipelineDynamicSpecCommand::DoExecute(ICommandContextPtr context)
+{
+    ExecuteGetPipelineSpecCommand(
+        context,
+        SpecPath,
+        Options,
+        [&] (const auto& client, const auto& options) { return client->GetPipelineDynamicSpec(PipelinePath, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TSetPipelineDynamicSpecCommand::Register(TRegistrar registrar)
+{
+    registrar.Parameter("spec_path", &TThis::SpecPath)
+        .Default();
+    registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>(
+        "expected_version",
+        [] (TThis* command) -> auto& {
+            return command->Options.ExpectedVersion;
+        })
+        .Optional(/*init*/ false);
+}
+
+void TSetPipelineDynamicSpecCommand::DoExecute(ICommandContextPtr context)
+{
+    ExecuteSetPipelineSpecCommand(
+        context,
+        SpecPath,
+        TGetPipelineDynamicSpecOptions(),
+        [&] (const auto& client, const auto& options) { return client->GetPipelineDynamicSpec(PipelinePath, options); },
+        Options,
+        [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineDynamicSpec(PipelinePath, spec, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TRemovePipelineDynamicSpecCommand::Register(TRegistrar registrar)
+{
+    registrar.Parameter("spec_path", &TThis::SpecPath);
+    registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>(
+        "expected_version",
+        [] (TThis* command) -> auto& {
+            return command->Options.ExpectedVersion;
+        })
+        .Optional(/*init*/ false);
+}
+
+void TRemovePipelineDynamicSpecCommand::DoExecute(ICommandContextPtr context)
+{
+    ExecuteRemovePipelineSpecCommand(
+        context,
+        SpecPath,
+        TGetPipelineDynamicSpecOptions(),
+        [&] (const auto& client, const auto& options) { return client->GetPipelineDynamicSpec(PipelinePath, options); },
+        Options,
+        [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineDynamicSpec(PipelinePath, spec, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TStartPipelineCommand::Register(TRegistrar /*registrar*/)
+{ }
+
+void TStartPipelineCommand::DoExecute(ICommandContextPtr context)
 {
     auto client = context->GetClient();
-    WaitFor(client->StopPipeline(PipelinePath))
+    WaitFor(client->StartPipeline(PipelinePath, Options))
         .ThrowOnError();
 
     ProduceEmptyOutput(context);
@@ -38,15 +288,27 @@ void TStopPipelineCommand::DoExecute(ICommandContextPtr context)
 
 ////////////////////////////////////////////////////////////////////////////////
 
-void TPausePipelineCommand::Register(TRegistrar registrar)
+void TStopPipelineCommand::Register(TRegistrar /*registrar*/)
+{ }
+
+void TStopPipelineCommand::DoExecute(ICommandContextPtr context)
 {
-    registrar.Parameter("pipeline_path", &TThis::PipelinePath);
+    auto client = context->GetClient();
+    WaitFor(client->StopPipeline(PipelinePath, Options))
+        .ThrowOnError();
+
+    ProduceEmptyOutput(context);
 }
 
+////////////////////////////////////////////////////////////////////////////////
+
+void TPausePipelineCommand::Register(TRegistrar /*registrar*/)
+{ }
+
 void TPausePipelineCommand::DoExecute(ICommandContextPtr context)
 {
     auto client = context->GetClient();
-    WaitFor(client->PausePipeline(PipelinePath))
+    WaitFor(client->PausePipeline(PipelinePath, Options))
         .ThrowOnError();
 
     ProduceEmptyOutput(context);

+ 121 - 9
yt/yt/client/driver/flow_commands.h

@@ -8,8 +8,125 @@ namespace NYT::NDriver {
 
 ////////////////////////////////////////////////////////////////////////////////
 
+class TPipelineCommandBase
+    : public virtual NYTree::TYsonStructLite
+{
+public:
+    REGISTER_YSON_STRUCT_LITE(TPipelineCommandBase);
+
+    static void Register(TRegistrar registrar);
+
+protected:
+    NYPath::TYPath PipelinePath;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TGetPipelineSpecCommand
+    : public TTypedCommand<NApi::TGetPipelineSpecOptions>
+    , public TPipelineCommandBase
+{
+public:
+    REGISTER_YSON_STRUCT_LITE(TGetPipelineSpecCommand);
+
+    static void Register(TRegistrar registrar);
+
+private:
+    NYPath::TYPath SpecPath;
+
+    void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSetPipelineSpecCommand
+    : public TTypedCommand<NApi::TSetPipelineSpecOptions>
+    , public TPipelineCommandBase
+{
+public:
+    REGISTER_YSON_STRUCT_LITE(TSetPipelineSpecCommand);
+
+    static void Register(TRegistrar registrar);
+
+private:
+    NYPath::TYPath SpecPath;
+
+    void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRemovePipelineSpecCommand
+    : public TTypedCommand<NApi::TSetPipelineSpecOptions>
+    , public TPipelineCommandBase
+{
+public:
+    REGISTER_YSON_STRUCT_LITE(TRemovePipelineSpecCommand);
+
+    static void Register(TRegistrar registrar);
+
+private:
+    NYPath::TYPath SpecPath;
+
+    void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TGetPipelineDynamicSpecCommand
+    : public TTypedCommand<NApi::TGetPipelineDynamicSpecOptions>
+    , public TPipelineCommandBase
+{
+public:
+    REGISTER_YSON_STRUCT_LITE(TGetPipelineDynamicSpecCommand);
+
+    static void Register(TRegistrar registrar);
+
+private:
+    NYPath::TYPath SpecPath;
+
+    void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSetPipelineDynamicSpecCommand
+    : public TTypedCommand<NApi::TSetPipelineDynamicSpecOptions>
+    , public TPipelineCommandBase
+{
+public:
+    REGISTER_YSON_STRUCT_LITE(TSetPipelineDynamicSpecCommand);
+
+    static void Register(TRegistrar registrar);
+
+private:
+    NYPath::TYPath SpecPath;
+
+    void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRemovePipelineDynamicSpecCommand
+    : public TTypedCommand<NApi::TSetPipelineDynamicSpecOptions>
+    , public TPipelineCommandBase
+{
+public:
+    REGISTER_YSON_STRUCT_LITE(TRemovePipelineDynamicSpecCommand);
+
+    static void Register(TRegistrar registrar);
+
+private:
+    NYPath::TYPath SpecPath;
+
+    void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
 class TStartPipelineCommand
     : public TTypedCommand<NApi::TStartPipelineOptions>
+    , public TPipelineCommandBase
 {
 public:
     REGISTER_YSON_STRUCT_LITE(TStartPipelineCommand);
@@ -17,8 +134,6 @@ public:
     static void Register(TRegistrar registrar);
 
 private:
-    NYPath::TYPath PipelinePath;
-
     void DoExecute(ICommandContextPtr context) override;
 };
 
@@ -26,6 +141,7 @@ private:
 
 class TStopPipelineCommand
     : public TTypedCommand<NApi::TStopPipelineOptions>
+    , public TPipelineCommandBase
 {
 public:
     REGISTER_YSON_STRUCT_LITE(TStopPipelineCommand);
@@ -33,15 +149,14 @@ public:
     static void Register(TRegistrar registrar);
 
 private:
-    NYPath::TYPath PipelinePath;
-
     void DoExecute(ICommandContextPtr context) override;
 };
 
 ////////////////////////////////////////////////////////////////////////////////
 
 class TPausePipelineCommand
-    : public TTypedCommand<NApi::TStopPipelineOptions>
+    : public TTypedCommand<NApi::TPausePipelineOptions>
+    , public TPipelineCommandBase
 {
 public:
     REGISTER_YSON_STRUCT_LITE(TPausePipelineCommand);
@@ -49,8 +164,6 @@ public:
     static void Register(TRegistrar registrar);
 
 private:
-    NYPath::TYPath PipelinePath;
-
     void DoExecute(ICommandContextPtr context) override;
 };
 
@@ -58,6 +171,7 @@ private:
 
 class TGetPipelineStatusCommand
     : public TTypedCommand<NApi::TGetPipelineStatusOptions>
+    , public TPipelineCommandBase
 {
 public:
     REGISTER_YSON_STRUCT_LITE(TGetPipelineStatusCommand);
@@ -65,8 +179,6 @@ public:
     static void Register(TRegistrar registrar);
 
 private:
-    NYPath::TYPath PipelinePath;
-
     void DoExecute(ICommandContextPtr context) override;
 };
 

+ 4 - 0
yt/yt/flow/lib/client/public.h

@@ -27,6 +27,10 @@ DEFINE_ENUM(EPipelineState,
     ((Completed)      (6))
 );
 
+YT_DEFINE_ERROR_ENUM(
+    ((SpecVersionMismatch)    (3300))
+);
+
 YT_DEFINE_STRONG_TYPEDEF(TVersion, i64);
 
 ////////////////////////////////////////////////////////////////////////////////