Browse Source

Check queue overflow for direct ops KIKIMR-19080

ilnaz 1 year ago
parent
commit
2d1202dcd2

+ 24 - 25
ydb/core/tx/datashard/datashard__op_rows.cpp

@@ -128,6 +128,10 @@ static void ReadOnly(NKikimrTxDataShard::TEvUploadRowsResponse& response) {
     response.SetStatus(NKikimrTxDataShard::TError::READONLY);
 }
 
+static void Overloaded(NKikimrTxDataShard::TEvUploadRowsResponse& response) {
+    response.SetStatus(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED);
+}
+
 static void OutOfSpace(NKikimrTxDataShard::TEvEraseRowsResponse& response) {
     // NOTE: this function is never called, because erase is allowed when out of space
     response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::WRONG_SHARD_STATE);
@@ -146,6 +150,10 @@ static void ExecError(NKikimrTxDataShard::TEvEraseRowsResponse& response) {
     response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::EXEC_ERROR);
 }
 
+static void Overloaded(NKikimrTxDataShard::TEvEraseRowsResponse& response) {
+    response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::SHARD_OVERLOADED);
+}
+
 template <typename TEvResponse>
 using TSetStatusFunc = void(*)(typename TEvResponse::ProtoRecordType&);
 
@@ -168,41 +176,32 @@ template <typename TEvResponse, typename TEvRequest>
 static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& ctx, const TString& txDesc, bool isWrite) {
     NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus;
     TString rejectReason;
-    bool reject = self->CheckDataTxReject(txDesc, ctx, rejectStatus, rejectReason);
-    bool outOfSpace = false;
-    bool isDiskSpaceExhausted = false;
+    if (self->CheckDataTxReject(txDesc, ctx, rejectStatus, rejectReason)) {
+        Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &WrongShardState, ctx);
+        return true;
+    }
 
+    if (self->CheckChangesQueueOverflow()) {
+        rejectReason = TStringBuilder() << "Change queue overflow at tablet " << self->TabletID();
+        Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &Overloaded, ctx);
+        return true;
+    }
 
-    if (!reject && isWrite) {
+    if (isWrite) {
         if (self->IsAnyChannelYellowStop()) {
-            reject = true;
-            outOfSpace = true;
-            rejectReason = TStringBuilder() << "Cannot perform writes: out of disk space at tablet " << self->TabletID();
             self->IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
+            rejectReason = TStringBuilder() << "Cannot perform writes: out of disk space at tablet " << self->TabletID();
+            Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &OutOfSpace, ctx);
+            return true;
         } else if (self->IsSubDomainOutOfSpace()) {
-            reject = true;
-            outOfSpace = true;
-            rejectReason = "Cannot perform writes: database is out of disk space";
-            isDiskSpaceExhausted = true;
             self->IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
-        }
-    }
-
-    if (!reject) {
-        return false;
-    }
-
-    if (outOfSpace) {
-        if (isDiskSpaceExhausted) {
+            rejectReason = "Cannot perform writes: database is out of disk space";
             Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &DiskSpaceExhausted, ctx);
-        } else {
-            Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &OutOfSpace, ctx);
+            return true;
         }
-    } else {
-        Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &WrongShardState, ctx);
     }
 
-    return true;
+    return false;
 }
 
 void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TActorContext& ctx) {

+ 39 - 0
ydb/core/tx/datashard/datashard_ut_upload_rows.cpp

@@ -744,6 +744,45 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
         DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::GENERIC_ERROR);
     }
 
+    Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflow) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetChangesQueueItemsLimit(1);
+
+        TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+        runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+        CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()
+            .Columns({
+                {"key", "Uint32", true, false},
+                {"value", "Uint32", false, false},
+            })
+            .Indexes({
+                TShardedTableOptions::TIndex{
+                    "by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync
+                }
+            })
+        );
+
+        runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+            if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) {
+                return TTestActorRuntime::EEventAction::DROP;
+            }
+
+            return TTestActorRuntime::EEventAction::PROCESS;
+        });
+
+        DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::SUCCESS);
+        DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::GENERIC_ERROR);
+    }
+
 }
 
 } // namespace NKikimr