Browse Source

PingTask CP message is proto based + LoopbackService support

ref:d6c05ce50c167504800a671cf539f335e1ace15f
Aleksandr Khoroshilov 2 years ago
parent
commit
2d1f7aba6b

+ 3 - 1
README.md

@@ -80,9 +80,11 @@ For development purposes we test that YDB could be built and run under latest ve
 We are glad to welcome new contributors!
 
 1. Please read [contributor's guide](CONTRIBUTING.md).
-2. We can accept your work to YDB after you have signed contributor's license agreement (aka CLA).
+2. We can accept your work to YDB after you have read contributor's license agreement (aka CLA).
 3. Please don't forget to add a note to your pull request, that you agree to the terms of the CLA.
 
+More information can be found in [CONTRIBUTING](CONTRIBUTING) file.
+
 ## Success Stories
 
 Take a look at YDB [web site](https://ydb.tech/) for the latest success stories and user scenarios.

+ 6 - 101
ydb/core/yq/libs/actors/task_ping.cpp

@@ -101,101 +101,8 @@ private:
     )
 
     std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> CreateControlPlaneEvent() {
-        auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(TenantName, CloudId, Scope, OperationId, OwnerId, Deadline);
-        const auto& req = Ev->Record;
-        ui64 issuesByteSize = 0;
-        ui64 transientIssuesByteSize = 0;
-        ui64 resultSetMetaByteSize = 0;
-        ui64 dqGraphBytesSize = 0;
-
-        //TODO use all fields
-        if (req.status() != YandexQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED) {
-            event->Status = req.status();
-        }
-        if (!req.issues().empty()) {
-            NYql::TIssues reqIssues;
-            for (const auto& issue : req.issues()) {
-                issuesByteSize += issue.ByteSize();
-            }
-            NYql::IssuesFromMessage(req.issues(), reqIssues);
-            Issues.AddIssues(reqIssues);
-            event->Issues = Issues;
-        }
-        if (!req.transient_issues().empty()) {
-            NYql::TIssues transientIssues;
-            for (const auto& issue : req.transient_issues()) {
-                transientIssuesByteSize += issue.ByteSize();
-            }
-            NYql::IssuesFromMessage(req.transient_issues(), transientIssues);
-            event->TransientIssues = transientIssues;
-        }
-        if (req.statistics()) {
-            event->Statistics = req.statistics();
-        }
-        if (req.ast()) {
-            event->Ast = req.ast();
-        }
-        if (req.result_id().value()) {
-            event->ResultId = req.result_id().value();
-        }
-        if (req.plan()) {
-            event->Plan = req.plan();
-        }
-        if (!req.result_set_meta().empty()) {
-            for (const auto& rsMeta : req.result_set_meta()) {
-                resultSetMetaByteSize += rsMeta.ByteSize();
-            }
-            event->ResultSetMetas = {req.result_set_meta().begin(), req.result_set_meta().end()};
-        }
-        if (req.has_started_at()) {
-            event->StartedAt = TInstant::FromValue(google::protobuf::util::TimeUtil::TimestampToMicroseconds(req.started_at()));
-        }
-        if (req.has_finished_at()) {
-            event->FinishedAt = TInstant::FromValue(google::protobuf::util::TimeUtil::TimestampToMicroseconds(req.finished_at()));
-        }
-        event->ResignQuery = req.resign_query();
-        event->StatusCode = req.status_code();
-
-        event->CreatedTopicConsumers.reserve(req.created_topic_consumers_size());
-        for (const auto& topicConsumerProto : req.created_topic_consumers()) {
-            auto& topicConsumer = event->CreatedTopicConsumers.emplace_back();
-            topicConsumer.DatabaseId = topicConsumerProto.database_id();
-            topicConsumer.Database = topicConsumerProto.database();
-            topicConsumer.TopicPath = topicConsumerProto.topic_path();
-            topicConsumer.ConsumerName = topicConsumerProto.consumer_name();
-            topicConsumer.ClusterEndpoint = topicConsumerProto.cluster_endpoint();
-            topicConsumer.UseSsl = topicConsumerProto.use_ssl();
-            topicConsumer.TokenName = topicConsumerProto.token_name();
-            topicConsumer.AddBearerToToken = topicConsumerProto.add_bearer_to_token();
-        }
-
-        event->DqGraphs.reserve(req.dq_graph_size());
-        for (const auto& g : req.dq_graph()) {
-            dqGraphBytesSize += g.size();
-            event->DqGraphs.emplace_back(g);
-        }
-
-        if (req.state_load_mode()) {
-            event->StateLoadMode = req.state_load_mode();
-        }
-
-        if (req.has_disposition()) {
-            event->StreamingDisposition = req.disposition();
-        }
-
-
-        LOG_D("Statistics length: " << req.statistics().size() << ", "
-           << "Ast length: " << req.ast().size() << " bytes, "
-           << "Plan length: " << req.plan().size() << " bytes, "
-           << "Result set meta size: " << resultSetMetaByteSize << " bytes, "
-           << "Topic consumers size: " << event->CreatedTopicConsumers.size() * sizeof(TEvControlPlaneStorage::TTopicConsumer) << " bytes, "
-           << "Dq graphs size: " << dqGraphBytesSize << " bytes, "
-           << "Issues size: " << issuesByteSize << " bytes, "
-           << "Transient issues size: " << transientIssuesByteSize << " bytes");
-
-        event->DqGraphIndex = req.dq_graph_index();
-
-        return std::move(event);
+        auto request = Ev->Record;
+        return std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
     }
 
     void HandleResponse(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev, const TActorContext& ctx) {
@@ -206,12 +113,10 @@ private:
             Fail("ControlPlane PingTaskError", Ydb::StatusIds::GENERIC_ERROR);
             return;
         }
-        auto res = MakeHolder<TEvents::TEvPingTaskResponse>();
-        res->Status = Ydb::StatusIds::SUCCESS;
-        Yq::Private::PingTaskResult result;
-        result.set_action(ev->Get()->Action);
-        res->Record.ConstructInPlace(result);
-        ctx.Send(Sender, res.Release());
+        auto response = MakeHolder<TEvents::TEvPingTaskResponse>();
+        response->Status = Ydb::StatusIds::SUCCESS;
+        response->Record.ConstructInPlace(ev->Get()->Record);
+        ctx.Send(Sender, response.Release());
         Die(ctx);
     }
 

+ 19 - 89
ydb/core/yq/libs/control_plane_storage/events/events.h

@@ -446,112 +446,42 @@ struct TEvControlPlaneStorage {
     };
 
     struct TEvPingTaskRequest : NActors::TEventLocal<TEvPingTaskRequest, EvPingTaskRequest> {
-        explicit TEvPingTaskRequest(const TString& tenantName, const TString& cloudId, const TString& scope, const TString& queryId, const TString& owner, const TInstant& deadline, const TString& resultId = "")
-            : TenantName(tenantName)
-            , CloudId(cloudId)
-            , Scope(scope)
-            , QueryId(queryId)
-            , Owner(owner)
-            , Deadline(deadline)
-            , ResultId(resultId)
-        {
-        }
 
-        size_t GetByteSize() const {
-            return sizeof(*this)
-                    + TenantName.Size()
-                    + CloudId.Size()
-                    + Scope.Size()
-                    + QueryId.Size()
-                    + Owner.Size()
-                    + ResultId.Size()
-                    + Status.Empty() ? 0 : sizeof(*Status)
-                    + GetIssuesByteSize(Issues)
-                    + GetIssuesByteSize(TransientIssues)
-                    + Statistics.Empty() ? 0 : Statistics->Size()
-                    + ResultSetMetasByteSizeLong()
-                    + Ast.Empty() ? 0 : Ast->Size()
-                    + Plan.Empty() ? 0 : Plan->Size()
-                    + StartedAt.Empty() ? 0 : sizeof(*StartedAt)
-                    + FinishedAt.Empty() ? 0 : sizeof(*FinishedAt)
-                    + CreatedTopicConsumersByteSizeLong()
-                    + DqGraphByteSizeLong()
-                    + StreamingDisposition.Empty() ? 0 : StreamingDisposition->ByteSizeLong();
-        }
-
-        size_t ResultSetMetasByteSizeLong() const {
-            if (ResultSetMetas.Empty()) {
-                return 0;
-            }
-            size_t size = 0;
-            for (const auto& resultSet: *ResultSetMetas) {
-                size += resultSet.ByteSizeLong();
-            }
-            size += ResultSetMetas->size() * sizeof(YandexQuery::ResultSetMeta);
-            return size;
-        }
+        TEvPingTaskRequest() = default;
 
-        size_t CreatedTopicConsumersByteSizeLong() const {
-            size_t size = 0;
-            for (const auto& topic: CreatedTopicConsumers) {
-                size += topic.GetByteSize();
-            }
-            size += CreatedTopicConsumers.size() * sizeof(YandexQuery::ResultSetMeta);
-            return size;
-        }
+        explicit TEvPingTaskRequest(
+            Yq::Private::PingTaskRequest&& request)
+            : Request(std::move(request))
+        {}
 
-        size_t DqGraphByteSizeLong() const {
-            size_t size = 0;
-            for (const auto& graph: DqGraphs) {
-                size += graph.Size();
-            }
-            size += DqGraphs.size() * sizeof(TString);
-            return size;
+        size_t GetByteSize() const {
+            return sizeof(*this)
+                    + Request.ByteSizeLong();
         }
 
-        const TString TenantName;
-        const TString CloudId;
-        const TString Scope;
-        const TString QueryId;
-        const TString Owner;
-        const TInstant Deadline;
-        TString ResultId;
-        TMaybe<YandexQuery::QueryMeta::ComputeStatus> Status;
-        TMaybe<NYql::TIssues> Issues;
-        TMaybe<NYql::TIssues> TransientIssues;
-        TMaybe<TString> Statistics;
-        TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas;
-        TMaybe<TString> Ast;
-        TMaybe<TString> Plan;
-        TMaybe<TInstant> StartedAt;
-        TMaybe<TInstant> FinishedAt;
-        bool ResignQuery = false;
-        ui64 StatusCode = 0;
-        TVector<TTopicConsumer> CreatedTopicConsumers;
-        TVector<TString> DqGraphs;
-        i32 DqGraphIndex = 0;
-        YandexQuery::StateLoadMode StateLoadMode = YandexQuery::STATE_LOAD_MODE_UNSPECIFIED;
-        TMaybe<YandexQuery::StreamingDisposition> StreamingDisposition;
+        Yq::Private::PingTaskRequest Request;
     };
 
     struct TEvPingTaskResponse : NActors::TEventLocal<TEvPingTaskResponse, EvPingTaskResponse> {
-        explicit TEvPingTaskResponse(const YandexQuery::QueryAction& action)
-            : Action(action)
-        {
-        }
 
-        explicit TEvPingTaskResponse(const NYql::TIssues& issues)
+        explicit TEvPingTaskResponse(
+            const Yq::Private::PingTaskResult& record)
+            : Record(record)
+        {}
+
+        explicit TEvPingTaskResponse(
+            const NYql::TIssues& issues)
             : Issues(issues)
-        {
-        }
+        {}
 
         size_t GetByteSize() const {
             return sizeof(*this)
+                    + Record.ByteSizeLong()
                     + GetIssuesByteSize(Issues)
                     + GetDebugInfoByteSize(DebugInfo);
         }
 
-        YandexQuery::QueryAction Action = YandexQuery::QUERY_ACTION_UNSPECIFIED;
+        Yq::Private::PingTaskResult Record;
         NYql::TIssues Issues;
         TDebugInfoPtr DebugInfo;
     };

+ 1 - 1
ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp

@@ -307,7 +307,7 @@ private:
     {
         SendEmptyResponse<
             TEvControlPlaneStorage::TEvPingTaskRequest::TPtr,
-            YandexQuery::QueryAction,
+            Yq::Private::PingTaskResult,
             TEvControlPlaneStorage::TEvPingTaskResponse>(ev, "PingTaskRequest");
     }
 

+ 100 - 104
ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp

@@ -18,13 +18,13 @@ bool IsFinishedStatus(YandexQuery::QueryMeta::ComputeStatus status) {
 } // namespace
 
 std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructHardPingTask(
-    const TEvControlPlaneStorage::TEvPingTaskRequest* request, std::shared_ptr<YandexQuery::QueryAction> response,
+    const Yq::Private::PingTaskRequest& request, std::shared_ptr<Yq::Private::PingTaskResult> response,
     const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl, const THashMap<ui64, TRetryPolicyItem>& retryPolicies) {
 
     TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "HardPingTask(read)");
-    readQueryBuilder.AddString("tenant", request->TenantName);
-    readQueryBuilder.AddString("scope", request->Scope);
-    readQueryBuilder.AddString("query_id", request->QueryId);
+    readQueryBuilder.AddString("tenant", request.tenant());
+    readQueryBuilder.AddString("scope", request.scope());
+    readQueryBuilder.AddString("query_id", request.query_id().value());
     readQueryBuilder.AddText(
         "$last_job_id = SELECT `" LAST_JOB_ID_COLUMN_NAME "` FROM `" QUERIES_TABLE_NAME "`\n"
         "   WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
@@ -53,23 +53,23 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
         {
             TResultSetParser parser(resultSets[0]);
             if (!parser.TryNextRow()) {
-                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"";
+                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"";
             }
             if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) {
-                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." QUERY_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\"";
+                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." QUERY_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\"";
             }
             if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
-                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\"";
+                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\"";
             }
         }
 
         {
             TResultSetParser parser(resultSets[1]);
             if (!parser.TryNextRow()) {
-                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " JOBS_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"";
+                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " JOBS_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"";
             }
             if (!job.ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) {
-                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " JOBS_TABLE_NAME "." JOB_COLUMN_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"";
+                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " JOBS_TABLE_NAME "." JOB_COLUMN_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"";
             }
             jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString();
         }
@@ -78,11 +78,11 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
         {
             TResultSetParser parser(resultSets[2]);
             if (!parser.TryNextRow()) {
-                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request->TenantName << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ;
+                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request.tenant() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ;
             }
             owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString();
-            if (owner != request->Owner) {
-                ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request->QueryId << "\" MISMATCHED: \"" << request->Owner << "\" (received) != \"" << owner << "\" (selected)";
+            if (owner != request.owner_id()) {
+                ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request.query_id().value() << "\" MISMATCHED: \"" << request.owner_id() << "\" (received) != \"" << owner << "\" (selected)";
             }
             retryLimiter.Assign(
                 parser.ColumnParser(RETRY_COUNTER_COLUMN_NAME).GetOptionalUint64().GetOrElse(0),
@@ -91,16 +91,28 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
             );
         }
 
-        TMaybe<YandexQuery::QueryMeta::ComputeStatus> queryStatus = request->Status;
-        TMaybe<NYql::TIssues> issues = request->Issues;
-        TMaybe<NYql::TIssues> transientIssues = request->TransientIssues;
-
+        TMaybe<YandexQuery::QueryMeta::ComputeStatus> queryStatus;
+        if (request.status() != YandexQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED) {
+            queryStatus = request.status();
+        }
+        TMaybe<NYql::TIssues> issues;
+        if (request.issues().size() > 0) {
+            NYql::TIssues requestIssues;
+            NYql::IssuesFromMessage(request.issues(), requestIssues);
+            issues = requestIssues;
+        }
+        TMaybe<NYql::TIssues> transientIssues;
+        if (request.transient_issues().size() > 0) {
+            NYql::TIssues requestTransientIssues;
+            NYql::IssuesFromMessage(request.transient_issues(), requestTransientIssues);
+            transientIssues = requestTransientIssues;
+        }
         // running query us locked for lease period
         TDuration backoff = taskLeaseTtl;
 
-        if (request->ResignQuery) {
+        if (request.resign_query()) {
             TRetryPolicyItem policy(0, TDuration::Seconds(1), TDuration::Zero());
-            auto it = retryPolicies.find(request->StatusCode);
+            auto it = retryPolicies.find(request.status_code());
             auto policyFound = it != retryPolicies.end();
             if (policyFound) {
                 policy = it->second;
@@ -125,7 +137,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
                     }
                 }
             }
-            CPS_LOG_AS_D(*actorSystem, "PingTaskRequest (resign): " << (!policyFound ? " DEFAULT POLICY" : "") << (owner ? " FAILURE" : "") << request->StatusCode << " " << retryLimiter.RetryCount << " " << retryLimiter.RetryCounterUpdatedAt << " " << backoff);
+            CPS_LOG_AS_D(*actorSystem, "PingTaskRequest (resign): " << (!policyFound ? " DEFAULT POLICY" : "") << (owner ? " FAILURE" : "") << NYql::NDqProto::StatusIds_StatusCode_Name(request.status_code()) << " " << retryLimiter.RetryCount << " " << retryLimiter.RetryCounterUpdatedAt << " " << backoff);
         }
 
         if (queryStatus) {
@@ -150,42 +162,38 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
             NYql::IssuesToMessage(newIssues, query.mutable_transient_issue());
         }
 
-        if (request->Statistics) {
-            *query.mutable_statistics()->mutable_json() = *request->Statistics;
-            *job.mutable_statistics()->mutable_json() = *request->Statistics;
+        if (request.statistics()) {
+            *query.mutable_statistics()->mutable_json() = request.statistics();
+            *job.mutable_statistics()->mutable_json() = request.statistics();
         }
 
-        if (request->ResultSetMetas) {
+        if (!request.result_set_meta().empty()) {
             // we will overwrite result_set_meta's COMPLETELY
-            query.clear_result_set_meta();
-            job.clear_result_set_meta();
-            for (const auto& resultSetMeta : *request->ResultSetMetas) {
-                *query.add_result_set_meta() = resultSetMeta;
-                *job.add_result_set_meta() = resultSetMeta;
-            }
+            *query.mutable_result_set_meta() = request.result_set_meta();
+            *job.mutable_result_set_meta() = request.result_set_meta();
         }
 
-        if (request->Ast) {
-            query.mutable_ast()->set_data(*request->Ast);
-            job.mutable_ast()->set_data(*request->Ast);
+        if (request.ast()) {
+            query.mutable_ast()->set_data(request.ast());
+            job.mutable_ast()->set_data(request.ast());
         }
 
-        if (request->Plan) {
-            query.mutable_plan()->set_json(*request->Plan);
-            job.mutable_plan()->set_json(*request->Plan);
+        if (request.plan()) {
+            query.mutable_plan()->set_json(request.plan());
+            job.mutable_plan()->set_json(request.plan());
         }
 
-        if (request->StartedAt) {
-            *query.mutable_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->StartedAt);
-            *job.mutable_query_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->StartedAt);
+        if (request.has_started_at()) {
+            *query.mutable_meta()->mutable_started_at() = request.started_at();
+            *job.mutable_query_meta()->mutable_started_at() = request.started_at();
         }
 
-        if (request->FinishedAt) {
-            *query.mutable_meta()->mutable_finished_at() = NProtoInterop::CastToProto(*request->FinishedAt);
-            *job.mutable_query_meta()->mutable_finished_at() = NProtoInterop::CastToProto(*request->FinishedAt);
+        if (request.has_finished_at()) {
+            *query.mutable_meta()->mutable_finished_at() = request.finished_at();
+            *job.mutable_query_meta()->mutable_finished_at() = request.finished_at();
             if (!query.meta().has_started_at()) {
-                *query.mutable_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->FinishedAt);
-                *job.mutable_query_meta()->mutable_started_at() = NProtoInterop::CastToProto(*request->FinishedAt);
+                *query.mutable_meta()->mutable_started_at() = request.finished_at();
+                *job.mutable_query_meta()->mutable_started_at() = request.finished_at();
             }
         }
 
@@ -197,43 +205,33 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
         }
 
         if (query.meta().status() == YandexQuery::QueryMeta::COMPLETED) {
-            *query.mutable_meta()->mutable_result_expire_at() = NProtoInterop::CastToProto(request->Deadline);
+            *query.mutable_meta()->mutable_result_expire_at() = request.deadline();
         }
 
-        if (request->StateLoadMode) {
-            internal.set_state_load_mode(request->StateLoadMode);
-            if (request->StateLoadMode == YandexQuery::FROM_LAST_CHECKPOINT) { // Saved checkpoint
+        if (request.state_load_mode()) {
+            internal.set_state_load_mode(request.state_load_mode());
+            if (request.state_load_mode() == YandexQuery::FROM_LAST_CHECKPOINT) { // Saved checkpoint
                 query.mutable_meta()->set_has_saved_checkpoints(true);
             }
         }
 
-        if (request->StreamingDisposition) {
-            internal.mutable_disposition()->CopyFrom(*request->StreamingDisposition);
+        if (request.has_disposition()) {
+            *internal.mutable_disposition() = request.disposition();
         }
 
-        if (request->Status && IsFinishedStatus(*request->Status)) {
+        if (request.status() && IsFinishedStatus(request.status())) {
             internal.clear_created_topic_consumers();
             internal.clear_dq_graph();
             internal.clear_dq_graph_index();
         }
 
-        if (!request->CreatedTopicConsumers.empty()) {
+        if (!request.created_topic_consumers().empty()) {
             std::set<Yq::Private::TopicConsumer, TTopicConsumerLess> mergedConsumers;
             for (auto&& c : *internal.mutable_created_topic_consumers()) {
                 mergedConsumers.emplace(std::move(c));
             }
-
-            for (const auto& c : request->CreatedTopicConsumers) {
-                Yq::Private::TopicConsumer proto;
-                proto.set_database_id(c.DatabaseId);
-                proto.set_database(c.Database);
-                proto.set_topic_path(c.TopicPath);
-                proto.set_consumer_name(c.ConsumerName);
-                proto.set_cluster_endpoint(c.ClusterEndpoint);
-                proto.set_use_ssl(c.UseSsl);
-                proto.set_token_name(c.TokenName);
-                proto.set_add_bearer_to_token(c.AddBearerToToken);
-                mergedConsumers.emplace(std::move(proto));
+            for (const auto& c : request.created_topic_consumers()) {
+                mergedConsumers.emplace(c);
             }
             internal.clear_created_topic_consumers();
             for (auto&& c : mergedConsumers) {
@@ -241,27 +239,24 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
             }
         }
 
-        if (!request->DqGraphs.empty()) {
-            internal.clear_dq_graph();
-            for (const auto& g : request->DqGraphs) {
-                internal.add_dq_graph(g);
-            }
+        if (!request.dq_graph().empty()) {
+            *internal.mutable_dq_graph() = request.dq_graph();
         }
 
-        if (request->DqGraphIndex) {
-            internal.set_dq_graph_index(request->DqGraphIndex);
+        if (request.dq_graph_index()) {
+            internal.set_dq_graph_index(request.dq_graph_index());
         }
 
         TSqlQueryBuilder writeQueryBuilder(tablePathPrefix, "HardPingTask(write)");
-        writeQueryBuilder.AddString("tenant", request->TenantName);
-        writeQueryBuilder.AddString("scope", request->Scope);
+        writeQueryBuilder.AddString("tenant", request.tenant());
+        writeQueryBuilder.AddString("scope", request.scope());
         writeQueryBuilder.AddString("job_id", jobId);
         writeQueryBuilder.AddString("job", job.SerializeAsString());
         writeQueryBuilder.AddString("query", query.SerializeAsString());
         writeQueryBuilder.AddInt64("status", query.meta().status());
         writeQueryBuilder.AddString("internal", internal.SerializeAsString());
-        writeQueryBuilder.AddString("result_id", request->ResultId);
-        writeQueryBuilder.AddString("query_id", request->QueryId);
+        writeQueryBuilder.AddString("result_id", request.result_id().value());
+        writeQueryBuilder.AddString("query_id", request.query_id().value());
 
         if (IsTerminalStatus(query.meta().status())) {
             // delete pending
@@ -301,7 +296,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
 
         TString updateResultSetsExpire;
         if (query.meta().status() == YandexQuery::QueryMeta::COMPLETED) {
-            writeQueryBuilder.AddTimestamp("result_sets_expire_at", request->Deadline);
+            writeQueryBuilder.AddTimestamp("result_sets_expire_at", NProtoInterop::CastFromProto(request.deadline()));
             updateResultSetsExpire = "`" RESULT_SETS_EXPIRE_AT_COLUMN_NAME "` = $result_sets_expire_at";
         } else {
             updateResultSetsExpire = "`" RESULT_SETS_EXPIRE_AT_COLUMN_NAME "` = NULL";
@@ -321,7 +316,8 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
             "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
         );
 
-        *response = internal.action();
+        response->set_action(internal.action());
+
         const auto writeQuery = writeQueryBuilder.Build();
         return std::make_pair(writeQuery.Sql, writeQuery.Params);
     };
@@ -330,12 +326,12 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
 }
 
 std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructSoftPingTask(
-    const TEvControlPlaneStorage::TEvPingTaskRequest* request, std::shared_ptr<YandexQuery::QueryAction> response,
+    const Yq::Private::PingTaskRequest& request, std::shared_ptr<Yq::Private::PingTaskResult> response,
     const TString& tablePathPrefix, const TDuration& taskLeaseTtl) {
     TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "SoftPingTask(read)");
-    readQueryBuilder.AddString("tenant", request->TenantName);
-    readQueryBuilder.AddString("scope", request->Scope);
-    readQueryBuilder.AddString("query_id", request->QueryId);
+    readQueryBuilder.AddString("tenant", request.tenant());
+    readQueryBuilder.AddString("scope", request.scope());
+    readQueryBuilder.AddString("query_id", request.query_id().value());
     readQueryBuilder.AddText(
         "SELECT `" INTERNAL_COLUMN_NAME "`\n"
         "FROM `" QUERIES_TABLE_NAME "` WHERE `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" SCOPE_COLUMN_NAME "` = $scope;\n"
@@ -354,32 +350,32 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
         {
             TResultSetParser parser(resultSets[0]);
             if (!parser.TryNextRow()) {
-                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ;
+                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ;
             }
             if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
-                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\"";
+                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\"";
             }
         }
 
         {
             TResultSetParser parser(resultSets[1]);
             if (!parser.TryNextRow()) {
-                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request->TenantName << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ;
+                ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request.tenant() << "\" and " SCOPE_COLUMN_NAME " = \"" << request.scope() << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request.query_id().value() << "\"" ;
             }
             owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString();
-            if (owner != request->Owner) {
-                ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request->QueryId << "\" MISMATCHED: \"" << request->Owner << "\" (received) != \"" << owner << "\" (selected)";
+            if (owner != request.owner_id()) {
+                ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request.query_id().value() << "\" MISMATCHED: \"" << request.owner_id() << "\" (received) != \"" << owner << "\" (selected)";
             }
         }
 
-        *response = internal.action();
+        response->set_action(internal.action());
 
         TSqlQueryBuilder writeQueryBuilder(tablePathPrefix, "SoftPingTask(write)");
         writeQueryBuilder.AddTimestamp("now", TInstant::Now());
         writeQueryBuilder.AddTimestamp("ttl", TInstant::Now() + taskLeaseTtl);
-        writeQueryBuilder.AddString("tenant", request->TenantName);
-        writeQueryBuilder.AddString("scope", request->Scope);
-        writeQueryBuilder.AddString("query_id", request->QueryId);
+        writeQueryBuilder.AddString("tenant", request.tenant());
+        writeQueryBuilder.AddString("scope", request.scope());
+        writeQueryBuilder.AddString("query_id", request.query_id().value());
 
         writeQueryBuilder.AddText(
             "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl\n"
@@ -396,19 +392,19 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
 void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskRequest::TPtr& ev)
 {
     TInstant startTime = TInstant::Now();
-    TEvControlPlaneStorage::TEvPingTaskRequest* request = ev->Get();
-    const TString cloudId = request->CloudId;
-    const TString scope = request->Scope;
-    TRequestCountersPtr requestCounters = Counters.GetScopeCounters(cloudId, scope, RTS_PING_TASK);
+    Yq::Private::PingTaskRequest& request = ev->Get()->Request;
+    const TString cloudId = "";
+    const TString scope = request.scope();
+    TRequestCountersPtr requestCounters = Counters.GetScopeCounters("" /*CloudId*/, scope, RTS_PING_TASK);
     requestCounters->InFly->Inc();
-    requestCounters->RequestBytes->Add(request->GetByteSize());
-    const TString queryId = request->QueryId;
-    const TString owner = request->Owner;
-    const TInstant deadline = request->Deadline;
+    requestCounters->RequestBytes->Add(ev->Get()->GetByteSize());
+    const TString queryId = request.query_id().value();
+    const TString owner = request.owner_id();
+    const TInstant deadline = NProtoInterop::CastFromProto(request.deadline());
 
-    CPS_LOG_T("PingTaskRequest: " << request->TenantName << " " << scope << " " << queryId
+    CPS_LOG_T("PingTaskRequest: " << request.tenant() << " " << scope << " " << queryId
         << " " << owner << " " << deadline << " "
-        << (request->Status ? YandexQuery::QueryMeta_ComputeStatus_Name(*request->Status) : "no status"));
+        << (request.status() ? YandexQuery::QueryMeta_ComputeStatus_Name(request.status()) : "no status"));
 
     NYql::TIssues issues = ValidatePingTask(scope, queryId, owner, deadline, Config.ResultSetsTtl);
     if (issues) {
@@ -419,10 +415,10 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
         return;
     }
 
-    std::shared_ptr<YandexQuery::QueryAction> response = std::make_shared<YandexQuery::QueryAction>();
+    std::shared_ptr<Yq::Private::PingTaskResult> response = std::make_shared<Yq::Private::PingTaskResult>();
 
-    if (request->Status)
-        Counters.GetFinalStatusCounters(cloudId, scope)->IncByStatus(*request->Status);
+    if (request.status())
+        Counters.GetFinalStatusCounters(cloudId, scope)->IncByStatus(request.status());
     auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ?
         ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config.AutomaticQueriesTtl, Config.TaskLeaseTtl, Config.RetryPolicies) :
         ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config.TaskLeaseTtl);
@@ -432,8 +428,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
 
     auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};
     auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery, readParams, prepareParams, requestCounters, debugInfo);
-    auto prepare = [response] { return std::make_tuple(*response); };
-    auto success = SendResponseTuple<TEvControlPlaneStorage::TEvPingTaskResponse, std::tuple<YandexQuery::QueryAction>>(
+    auto prepare = [response] { return *response; };
+    auto success = SendResponse<TEvControlPlaneStorage::TEvPingTaskResponse, Yq::Private::PingTaskResult>(
         "PingTaskRequest",
         NActors::TActivationContext::ActorSystem(),
         result,

+ 56 - 16
ydb/core/yq/libs/control_plane_storage/message_builders.h

@@ -3,6 +3,7 @@
 #include <util/datetime/base.h>
 
 #include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
 #include <ydb/public/api/protos/yq.pb.h>
 
 #include <ydb/core/yq/libs/control_plane_storage/events/events.h>
@@ -1171,22 +1172,61 @@ public:
 
     std::unique_ptr<TEvControlPlaneStorage::TEvPingTaskRequest> Build()
     {
-        auto request = std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(TenantName, CloudId, Scope, QueryId, Owner, Deadline, ResultId);
-        request->Status = Status;
-        request->Issues = Issues;
-        request->TransientIssues = TransientIssues;
-        request->Statistics = Statistics;
-        request->ResultSetMetas = ResultSetMetas;
-        request->Ast = Ast;
-        request->Plan = Plan;
-        request->StartedAt = StartedAt;
-        request->FinishedAt = FinishedAt;
-        request->ResignQuery = ResignQuery;
-        request->StatusCode = StatusCode;
-        request->CreatedTopicConsumers = CreatedTopicConsumers;
-        request->DqGraphs = DqGraphs;
-        request->DqGraphIndex = DqGraphIndex;
-        return request;
+        Yq::Private::PingTaskRequest request;
+        request.set_owner_id(Owner);
+        request.mutable_query_id()->set_value(QueryId);
+        request.mutable_result_id()->set_value(ResultId);
+        if (Status) {
+            request.set_status(*Status);
+        }
+        request.set_status_code(StatusCode);
+        if (Issues) {
+            NYql::IssuesToMessage(*Issues, request.mutable_issues());
+        }
+        if (TransientIssues) {
+            NYql::IssuesToMessage(*TransientIssues, request.mutable_transient_issues());
+        }
+        if (Statistics) {
+            request.set_statistics(*Statistics);
+        }
+        if (ResultSetMetas) {
+            for (const auto& meta : *ResultSetMetas) {
+                *request.add_result_set_meta() = meta;
+            }
+        }
+        for (const auto& dqGraph : DqGraphs) {
+            request.add_dq_graph(dqGraph);
+        }
+        request.set_dq_graph_index(DqGraphIndex);
+        if (Ast) {
+            request.set_ast(*Ast);
+        }
+        if (Plan) {
+            request.set_plan(*Plan);
+        }
+        request.set_resign_query(ResignQuery);
+        for (const auto& consumer : CreatedTopicConsumers) {
+            auto& cons = *request.add_created_topic_consumers();
+            cons.set_database_id(consumer.DatabaseId);
+            cons.set_database(consumer.Database);
+            cons.set_topic_path(consumer.TopicPath);
+            cons.set_consumer_name(consumer.ConsumerName);
+            cons.set_cluster_endpoint(consumer.ClusterEndpoint);
+            cons.set_use_ssl(consumer.UseSsl);
+            cons.set_token_name(consumer.TokenName);
+            cons.set_add_bearer_to_token(consumer.AddBearerToToken);
+        }
+        request.set_tenant(TenantName);
+        request.set_scope(Scope);
+        *request.mutable_deadline() = NProtoInterop::CastToProto(Deadline);
+        if (StartedAt) {
+            *request.mutable_started_at() = NProtoInterop::CastToProto(*StartedAt);
+        }
+        if (FinishedAt) {
+            *request.mutable_finished_at() = NProtoInterop::CastToProto(*FinishedAt);
+        }
+
+        return std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
     }
 };
 

+ 17 - 19
ydb/core/yq/libs/control_plane_storage/util.cpp

@@ -132,25 +132,23 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane
     return config;
 }
 
-bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskRequest* request) {
-    if (!request) {
-        return false;
-    }
-    return request->Status ||
-        request->Issues ||
-        request->TransientIssues ||
-        request->Statistics ||
-        request->ResultSetMetas ||
-        request->Ast ||
-        request->Plan ||
-        request->StartedAt ||
-        request->FinishedAt ||
-        request->ResignQuery ||
-        !request->CreatedTopicConsumers.empty() ||
-        !request->DqGraphs.empty() ||
-        request->DqGraphIndex ||
-        request->StateLoadMode ||
-        request->StreamingDisposition;
+bool DoesPingTaskUpdateQueriesTable(const Yq::Private::PingTaskRequest& request) {
+    return request.status() != YandexQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED
+        || !request.issues().empty()
+        || !request.transient_issues().empty()
+        || !request.statistics()
+        || !request.result_set_meta().empty()
+        || request.ast()
+        || request.plan()
+        || request.has_started_at()
+        || request.has_finished_at()
+        || request.resign_query()
+        || !request.created_topic_consumers().empty()
+        || !request.dq_graph().empty()
+        || request.dq_graph_index()
+        || request.state_load_mode()
+        || request.has_disposition()
+    ;
 }
 
 NYdb::TValue PackItemsToList(const TVector<NYdb::TValue>& items) {

+ 1 - 1
ydb/core/yq/libs/control_plane_storage/util.h

@@ -38,7 +38,7 @@ TDuration GetDuration(const TString& value, const TDuration& defaultValue);
 
 NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlaneStorageConfig config);
 
-bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskRequest* request);
+bool DoesPingTaskUpdateQueriesTable(const Yq::Private::PingTaskRequest& request);
 
 NYdb::TValue PackItemsToList(const TVector<NYdb::TValue>& items);
 

+ 16 - 4
ydb/core/yq/libs/private_client/loopback_service.cpp

@@ -43,8 +43,9 @@ private:
         hFunc(TEvInternalService::TEvWriteResultRequest, Handle)
 
         hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Handle)
-        hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, Handle)
         hFunc(NYq::TEvControlPlaneStorage::TEvGetTaskResponse, Handle)
+        hFunc(NYq::TEvControlPlaneStorage::TEvPingTaskResponse, Handle)
+        hFunc(NYq::TEvControlPlaneStorage::TEvWriteResultDataResponse, Handle)
     );
 
     void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) {
@@ -87,13 +88,24 @@ private:
         }
     }
 
-    void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& /*ev*/) {
-    /*
+    void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& ev) {
         Cookie++;
         Senders[Cookie] = ev->Sender;
         auto request = ev->Get()->Request;
         Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvPingTaskRequest(std::move(request)), 0, Cookie);
-    */
+    }
+
+    void Handle(NYq::TEvControlPlaneStorage::TEvPingTaskResponse::TPtr& ev) {
+        auto it = Senders.find(ev->Cookie);
+        if (it != Senders.end()) {
+            if (ev->Get()->Issues.Size() == 0) {
+                Send(it->second, new TEvInternalService::TEvPingTaskResponse(ev->Get()->Record));
+            } else {
+                auto issues = ev->Get()->Issues;
+                Send(it->second, new TEvInternalService::TEvPingTaskResponse(NYdb::EStatus::INTERNAL_ERROR, std::move(issues)));
+            }
+            Senders.erase(it);
+        }
     }
 
     void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& ev) {