Browse Source

add limit_rows and truncated to viewer/query (#7715)

Alexey Efimov 7 months ago
parent
commit
e0e98b1d85
1 changed files with 31 additions and 4 deletions
  1. 31 4
      ydb/core/viewer/viewer_query.h

+ 31 - 4
ydb/core/viewer/viewer_query.h

@@ -31,6 +31,8 @@ class TJsonQuery : public TViewerPipeClient {
     TString TransactionMode;
     bool Direct = false;
     bool IsBase64Encode = true;
+    int LimitRows = 10000;
+    int TotalRows = 0;
 
     enum ESchemaType {
         Classic,
@@ -89,6 +91,9 @@ public:
         if (params.Has("base64")) {
             IsBase64Encode = FromStringWithDefault<bool>(params.Get("base64"), true);
         }
+        if (params.Has("limit_rows")) {
+            LimitRows = std::clamp<int>(FromStringWithDefault<int>(params.Get("limit_rows"), 10000), 1, 100000);
+        }
         Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct);
     }
 
@@ -124,6 +129,9 @@ public:
             if (requestData.Has("base64")) {
                 IsBase64Encode = requestData["base64"].GetBooleanRobust();
             }
+            if (requestData.Has("limit_rows")) {
+                LimitRows = std::clamp<int>(requestData["limit_rows"].GetIntegerRobust(), 1, 100000);
+            }
         }
         return success;
     }
@@ -307,7 +315,13 @@ public:
             request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN);
             request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT);
         }
-        if (Stats == "profile") {
+        if (Stats == "none") {
+            request.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_NONE);
+            request.SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE);
+        } else if (Stats == "basic") {
+            request.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_BASIC);
+            request.SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC);
+        } else if (Stats == "profile") {
             request.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_PROFILE);
             request.SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE);
         } else if (Stats == "full") {
@@ -479,13 +493,23 @@ private:
     }
 
     void HandleReply(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
-        const NKikimrKqp::TEvExecuterStreamData& data(ev->Get()->Record);
+        NKikimrKqp::TEvExecuterStreamData& data(ev->Get()->Record);
 
-        ResultSets.emplace_back();
-        ResultSets.back() = std::move(data.GetResultSet());
+        if (TotalRows < LimitRows) {
+            int rowsAvailable = LimitRows - TotalRows;
+            if (data.GetResultSet().rows_size() > rowsAvailable) {
+                data.MutableResultSet()->mutable_rows()->Truncate(rowsAvailable);
+                data.MutableResultSet()->set_truncated(true);
+            }
+            TotalRows += data.GetResultSet().rows_size();
+            ResultSets.emplace_back() = std::move(*data.MutableResultSet());
+        }
 
         THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
         ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+        if (TotalRows >= LimitRows) {
+            ack->Record.SetEnough(true);
+        }
         Send(ev->Sender, ack.Release());
     }
 
@@ -618,6 +642,9 @@ private:
                             jsonColumn = ColumnValueToJsonValue(rsParser.ColumnParser(columnNum));
                         }
                     }
+                    if (resultSet.Truncated()) {
+                        jsonResult["truncated"] = true;
+                    }
                 }
             }