Browse Source

Implement batch limits in ReadTable KIKIMR-16827

va-kuznecov 1 year ago
parent
commit
f9c6b6f0bd

+ 12 - 4
ydb/core/grpc_services/rpc_read_table.cpp

@@ -145,6 +145,8 @@ public:
             InactiveServerTimerPending_ = true;
         }
 
+        MessageSizeLimit = cfg.GetMessageSizeLimit();
+
         SendProposeRequest(ctx);
 
         auto actorId = SelfId();
@@ -481,6 +483,9 @@ private:
             return ReplyFinishStream(StatusIds::BAD_REQUEST, message, ctx);
         }
 
+        MessageSizeLimit = std::min(MessageSizeLimit, req->batch_limit_bytes() ? req->batch_limit_bytes() : Max<ui64>());
+        MessageRowsLimit = req->batch_limit_rows();
+
         // Snapshots are always enabled and cannot be disabled
         switch (req->use_snapshot()) {
             case Ydb::FeatureFlag::STATUS_UNSPECIFIED:
@@ -543,8 +548,6 @@ private:
         LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API,
             SelfId() << " Finish grpc stream, status: " << (int)status);
 
-        auto &cfg = AppData(ctx)->StreamingConfig.GetOutputStreamConfig();
-
         // Answer all pending quota requests.
         while (!QuotaRequestQueue_.empty()) {
             auto request = QuotaRequestQueue_.front();
@@ -554,7 +557,8 @@ private:
             TAutoPtr<TEvTxProcessing::TEvStreamQuotaResponse> response
                 = new TEvTxProcessing::TEvStreamQuotaResponse;
             response->Record.SetTxId(rec.GetTxId());
-            response->Record.SetMessageSizeLimit(cfg.GetMessageSizeLimit());
+            response->Record.SetMessageSizeLimit(MessageSizeLimit);
+            response->Record.SetMessageRowsLimit(MessageRowsLimit);
             response->Record.SetReservedMessages(0);
 
             LOG_DEBUG_S(ctx, NKikimrServices::READ_TABLE_API,
@@ -632,7 +636,8 @@ private:
         TAutoPtr<TEvTxProcessing::TEvStreamQuotaResponse> response
             = new TEvTxProcessing::TEvStreamQuotaResponse;
         response->Record.SetTxId(rec.GetTxId());
-        response->Record.SetMessageSizeLimit(cfg.GetMessageSizeLimit());
+        response->Record.SetMessageSizeLimit(MessageSizeLimit);
+        response->Record.SetMessageRowsLimit(MessageRowsLimit);
         response->Record.SetReservedMessages(quotaSize);
 
         LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::READ_TABLE_API,
@@ -754,6 +759,9 @@ private:
     bool InactiveClientTimerPending_ = false;
     bool InactiveServerTimerPending_ = false;
 
+    ui64 MessageSizeLimit = 0;
+    ui64 MessageRowsLimit = 0;
+
     struct TBuffEntry
     {
         TString Buf;

+ 1 - 0
ydb/core/protos/tx.proto

@@ -399,6 +399,7 @@ message TEvStreamQuotaResponse {
     optional uint64 MessageSizeLimit = 2;
     optional uint32 ReservedMessages = 3;
     optional uint64 RowLimit = 4;
+    optional uint64 MessageRowsLimit = 5;
 }
 
 message TEvStreamQuotaRelease {

+ 5 - 0
ydb/core/tx/datashard/read_table_scan.cpp

@@ -311,6 +311,7 @@ public:
         , Driver(nullptr)
         , MessageQuota(0)
         , MessageSizeLimit(10 << 20)
+        , MessageRowsLimit(0)
         , TableInfo(tableInfo)
         , Tx(tx)
         , ScanRange(tx.GetRange())
@@ -408,6 +409,7 @@ private:
         auto &rec = ev->Get()->Record;
 
         MessageSizeLimit = rec.GetMessageSizeLimit();
+        MessageRowsLimit = rec.GetMessageRowsLimit();
         MessageQuota += rec.GetReservedMessages();
         RowLimit = rec.GetRowLimit();
 
@@ -433,6 +435,7 @@ private:
         response->Record.SetTxId(TxId);
         response->Record.SetMessageQuota(MessageQuota);
         response->Record.SetMessageSizeLimit(MessageSizeLimit);
+        //response->Record.SetMessageRowsLimit(MessageRowsLimit); for monitoring only
         response->Record.SetRowsLimit(RowLimit);
         response->Record.SetPendingAcks(PendingAcks);
         response->Record.SetResultSize(Writer->GetMessageSize());
@@ -532,6 +535,7 @@ private:
 
         // May collect more rows.
         if (Writer->GetMessageSize() < MessageSizeLimit
+            && (!MessageRowsLimit || MessageRowsLimit > rows)
             && (!RowLimit || RowLimit > rows)
             && !last)
             return EScan::Feed;
@@ -631,6 +635,7 @@ private:
     IDriver *Driver;
     ui64 MessageQuota;
     ui64 MessageSizeLimit;
+    ui64 MessageRowsLimit;
     TString Error;
     THolder<TRowsToResult> Writer;
     TUserTable::TCPtr TableInfo;

+ 2 - 0
ydb/core/tx/tx_proxy/read_table.h

@@ -27,6 +27,8 @@ namespace NTxProxy {
         EReadTableFormat DataFormat = EReadTableFormat::YdbResultSet;
         bool Ordered = false;
         bool RequireResultSet = false;
+        ui64 MaxBatchSizeBytes = Max<ui64>();
+        ui64 MaxBatchSizeRows = Max<ui64>();
     };
 
     IActor* CreateReadTableSnapshotWorker(const TReadTableSettings& settings);

+ 6 - 0
ydb/core/tx/tx_proxy/read_table_impl.cpp

@@ -280,6 +280,7 @@ private:
         ui64 Reserved = 0;
         ui64 Allocated = 0;
         ui64 MessageSize = 0;
+        ui64 MessageRows = 0;
     };
 
 public:
@@ -1697,6 +1698,7 @@ private:
 
         TxProxyMon->ReportStatusStreamData->Inc();
         ctx.Send(Settings.Owner, x.Release(), 0, Settings.Cookie);
+
         SentResultSet = true;
 
         if (state.QuotaReserved > 0) {
@@ -2019,9 +2021,11 @@ private:
 
         Quota.Allocated += record.GetReservedMessages();
         Quota.MessageSize = record.GetMessageSizeLimit();
+        Quota.MessageRows = record.GetMessageRowsLimit();
 
         TXLOG_D("Updated quotas, allocated = " << Quota.Allocated
                 << ", message size = " << Quota.MessageSize
+                << ", message rows = " << Quota.MessageRows
                 << ", available = " << (Quota.Allocated - Quota.Reserved));
 
         ProcessQuotaRequests(ctx);
@@ -2090,6 +2094,7 @@ private:
             response->Record.SetTxId(state.ReadTxId);
             response->Record.SetReservedMessages(available);
             response->Record.SetMessageSizeLimit(Quota.MessageSize);
+            response->Record.SetMessageRowsLimit(Quota.MessageRows);
             if (RemainingRows != Max<ui64>()) {
                 response->Record.SetRowLimit(RemainingRows);
             }
@@ -2121,6 +2126,7 @@ private:
             response->Record.SetTxId(state.ReadTxId);
             response->Record.SetReservedMessages(0);
             response->Record.SetMessageSizeLimit(Quota.MessageSize);
+            response->Record.SetMessageRowsLimit(Quota.MessageRows);
             ctx.Send(state.QuotaActor, response.Release());
         }
 

+ 3 - 0
ydb/public/api/protos/ydb_table.proto

@@ -1038,6 +1038,9 @@ message ReadTableRequest {
     uint64 row_limit = 6;
     // Use a server-side snapshot
     Ydb.FeatureFlag.Status use_snapshot = 7;
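+    // Per-batch limits for each streamed result part; 0 means no client-requested limit (the server's configured message size limit still applies)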
+    uint64 batch_limit_bytes = 8;
+    uint64 batch_limit_rows = 9;
 }
 
 // ReadTable doesn't use Operation, returns result directly

+ 8 - 0
ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp

@@ -775,6 +775,14 @@ NThreading::TFuture<std::pair<TPlainStatus, TTableClient::TImpl::TReadTableStrea
         }
     }
 
+    if (settings.BatchLimitBytes_) {
+        request.set_batch_limit_bytes(*settings.BatchLimitBytes_);
+    }
+
+    if (settings.BatchLimitRows_) {
+        request.set_batch_limit_rows(*settings.BatchLimitRows_);
+    }
+
     auto promise = NewPromise<std::pair<TPlainStatus, TReadTableStreamProcessorPtr>>();
 
     Connections_->StartReadStream<Ydb::Table::V1::TableService, Ydb::Table::ReadTableRequest, Ydb::Table::ReadTableResponse>(

+ 4 - 0
ydb/public/sdk/cpp/client/ydb_table/table.h

@@ -1546,6 +1546,10 @@ struct TReadTableSettings : public TRequestSettings<TReadTableSettings> {
     FLUENT_SETTING_OPTIONAL(ui64, RowLimit);
 
     FLUENT_SETTING_OPTIONAL(bool, UseSnapshot);
+
+    FLUENT_SETTING_OPTIONAL(ui64, BatchLimitBytes);
+
+    FLUENT_SETTING_OPTIONAL(ui64, BatchLimitRows);
 };
 
 //! Represents all session operations

+ 100 - 0
ydb/services/ydb/ydb_table_ut.cpp

@@ -1738,10 +1738,12 @@ R"___(<main>: Error: Transaction not found: , code: 2015
 
                 UNIT_ASSERT_VALUES_EQUAL(streamPart.IsSuccess(), true);
 
+                int readRows = 0;
                 auto rsParser = TResultSetParser(streamPart.ExtractPart());
                 while (rsParser.TryNextRow()) {
                     auto columns = rsParser.ColumnsCount();
                     const auto& expRow = expected[row++];
+                    ++readRows;
                     TString tmp = "[";
                     for (size_t c = 0; c < columns; c++) {
                         auto colYson = FormatValueYson(rsParser.GetValue(c));
@@ -1752,6 +1754,9 @@ R"___(<main>: Error: Transaction not found: , code: 2015
                     tmp += "]";
                     UNIT_ASSERT_VALUES_EQUAL(tmp, expRow);
                 }
+                if (rowLimit) {
+                    UNIT_ASSERT(readRows <= 1);
+                }
             }
         }
         UNIT_ASSERT_VALUES_EQUAL(row, rowLimit ? 5 : expected.size());
@@ -1765,6 +1770,101 @@ R"___(<main>: Error: Transaction not found: , code: 2015
         TestReadTableMultiShardWithDescribe(true);
     }
 
+
+    void TestReadTable(TSession& session, int rowsTotalCount, int batchLimitBytes, int batchLimitRows) { // Reads "Root/Test" with the given batch limits; checks per-batch row counts and the total row count
+        int row = 0; // rows seen across all batches
+        TReadTableSettings readTableSettings;
+        readTableSettings.Ordered(true);
+        readTableSettings.BatchLimitBytes(batchLimitBytes); // always set, even when 0 is passed
+        readTableSettings.BatchLimitRows(batchLimitRows); // always set, even when 0 is passed
+
+        auto it = session.ReadTable("Root/Test", readTableSettings).ExtractValueSync();
+
+        TStringStream out; // human-readable dump of every batch, printed to Cerr at the end
+        while (true) {
+            TReadTableResultPart streamPart = it.ReadNext().GetValueSync();
+
+            if (streamPart.EOS()) { // end of stream: no more batches
+                break;
+            }
+
+            UNIT_ASSERT_VALUES_EQUAL(streamPart.IsSuccess(), true);
+
+            auto rsParser = TResultSetParser(streamPart.ExtractPart());
+            i64 batchRows = 0; // rows in the current stream part only
+            out << "---- batch start ----" << Endl;
+            while (rsParser.TryNextRow()) {
+                auto columns = rsParser.ColumnsCount();
+                ++row;
+                ++batchRows;
+                TString tmp = "["; // row rendered as "[col0;col1;...]"
+                for (size_t c = 0; c < columns; c++) {
+                    auto colYson = FormatValueYson(rsParser.GetValue(c));
+                    tmp += colYson;
+                    if (c != columns - 1) // ';' between columns, none after the last one
+                        tmp += ";";
+                }
+                tmp += "]";
+                out << tmp << Endl;
+            }
+            out << "---- batch end ----" << Endl;
+            UNIT_ASSERT(!batchLimitRows || batchRows <= batchLimitRows); // zero batchLimitRows skips the check; batchLimitBytes is never asserted here
+        }
+        Cerr << out.Str();
+        UNIT_ASSERT_VALUES_EQUAL(row, rowsTotalCount); // every row must be returned regardless of batching
+    }
+
+    Y_UNIT_TEST(TestReadTableBatchLimits) { // Fills a 10-partition table with 100 rows and reads it back via TestReadTable under several batch limits
+        TKikimrWithGrpcAndRootSchema server;
+        ui16 grpc = server.GetPort();
+
+        server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NLog::PRI_TRACE);
+        server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::READ_TABLE_API, NLog::PRI_TRACE);
+        server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+        TString location = TStringBuilder() << "localhost:" << grpc;
+
+        auto connection = NYdb::TDriver(
+            TDriverConfig()
+                .SetEndpoint(location));
+        NYdb::NTable::TTableClient client(connection);
+        auto session = client.CreateSession().ExtractValueSync().GetSession();
+
+        auto tableBuilder = client.GetTableBuilder();
+        tableBuilder
+            .AddNullableColumn("Key", EPrimitiveType::Uint32)
+            .AddNullableColumn("Key2", EPrimitiveType::Uint32)
+            .AddNullableColumn("Value", EPrimitiveType::String);
+        tableBuilder.SetPrimaryKeyColumns(TVector<TString>{"Key", "Key2"}); // composite primary key (Key, Key2)
+
+        TCreateTableSettings createTableSettings =
+            TCreateTableSettings()
+                .PartitioningPolicy(TPartitioningPolicy().UniformPartitions(10));
+
+        auto result = session.CreateTable("Root/Test", tableBuilder.Build(), createTableSettings).ExtractValueSync();
+        UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+        int rowsTotalCount = 100;
+        TStringStream query; // a single UPSERT statement carrying all rows
+        query << R"_(  UPSERT INTO `Root/Test` (Key, Key2, Value) VALUES )_";
+        for (int i = 0; i < rowsTotalCount; ++i) { // one (i, 2 * i, "A") tuple per row; the last tuple ends with ';' instead of ','
+            query << Sprintf(R"_( (%d, %d, "A")%s)_", i, 2 * i, i + 1 < rowsTotalCount ? "," : ";");
+        }
+        result = session.ExecuteDataQuery(query.Str(),
+            TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
+
+        UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+        // all limits disabled (both limits passed as 0)
+        TestReadTable(session, rowsTotalCount, 0, 0);
+        for (int i = 1; i <= rowsTotalCount; i *= 2) { // row limits 1, 2, 4, ..., 64
+            // test BatchLimitRows alone (BatchLimitBytes passed as 0)
+            TestReadTable(session, rowsTotalCount, 0, i);
+        }
+        // BatchLimitBytes == 1 combined with BatchLimitRows == 1: each batch must hold at most one row (TestReadTable asserts only the row limit)
+        TestReadTable(session, rowsTotalCount, 1, 1);
+    }
+
     Y_UNIT_TEST(TestReadWrongTable) {
         TKikimrWithGrpcAndRootSchema server;
         server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NLog::PRI_TRACE);