Browse Source

EvWrite OverloadSubscribe (#2341)

azevaykin 1 year ago
parent
commit
c30bbc485e

+ 6 - 2
ydb/core/tx/datashard/check_write_unit.cpp

@@ -64,9 +64,11 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
 
         DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE);
 
-        writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
+        writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, err);
         op->Abort(EExecutionUnitKind::FinishProposeWrite);
 
+        DataShard.SetOverloadSubscribed(writeOp->GetWriteTx()->GetOverloadSubscribe(), writeOp->GetRecipient(), op->GetTarget(), ERejectReasons::YellowChannels, writeOp->GetWriteResult()->Record);
+
         LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
 
         return EExecutionStatus::Executed;
@@ -88,9 +90,11 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
 
                             DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE);
 
-                            writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
+                            writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, err);
                             op->Abort(EExecutionUnitKind::FinishProposeWrite);
 
+                            DataShard.SetOverloadSubscribed(writeOp->GetWriteTx()->GetOverloadSubscribe(), writeOp->GetRecipient(), op->GetTarget(), ERejectReasons::YellowChannels, writeOp->GetWriteResult()->Record);
+
                             LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
 
                             return EExecutionStatus::Executed;

+ 5 - 0
ydb/core/tx/datashard/datashard.cpp

@@ -2742,6 +2742,11 @@ bool TDataShard::CheckDataTxRejectAndReply(const NEvents::TDataEvents::TEvWrite:
 
         LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription);
 
+        if (status == NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED) {
+            std::optional<ui64> overloadSubscribe = ev->Get()->Record.HasOverloadSubscribe() ? ev->Get()->Record.GetOverloadSubscribe() : std::optional<ui64>{};
+            SetOverloadSubscribed(overloadSubscribe, ev->Recipient, ev->Sender, rejectReasons, result->Record);
+        }
+
         ctx.Send(ev->Sender, result.release());
         IncCounter(COUNTER_WRITE_OVERLOADED);
         IncCounter(COUNTER_WRITE_COMPLETE);

+ 2 - 14
ydb/core/tx/datashard/datashard__op_rows.cpp

@@ -172,20 +172,8 @@ static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc,
     response->Record.SetTabletID(self->TabletID());
     response->Record.SetErrorDescription(rejectDescription);
 
-    if (ev->Get()->Record.HasOverloadSubscribe() && self->HasPipeServer(ev->Recipient)) {
-        ui64 seqNo = ev->Get()->Record.GetOverloadSubscribe();
-        auto allowed = (
-            ERejectReasons::OverloadByProbability |
-            ERejectReasons::YellowChannels |
-            ERejectReasons::ChangesQueueOverflow);
-        if ((rejectReasons & allowed) != ERejectReasons::None &&
-            (rejectReasons - allowed) == ERejectReasons::None)
-        {
-            if (self->AddOverloadSubscriber(ev->Recipient, ev->Sender, seqNo, rejectReasons)) {
-                response->Record.SetOverloadSubscribed(seqNo);
-            }
-        }
-    }
+    std::optional<ui64> overloadSubscribe = ev->Get()->Record.HasOverloadSubscribe() ? ev->Get()->Record.GetOverloadSubscribe() : std::optional<ui64>{};
+    self->SetOverloadSubscribed(overloadSubscribe, ev->Recipient, ev->Sender, rejectReasons, response->Record);
 
     ctx.Send(ev->Sender, std::move(response));
 }

+ 3 - 0
ydb/core/tx/datashard/datashard__write.cpp

@@ -289,6 +289,9 @@ NKikimrDataEvents::TEvWriteResult::EStatus NEvWrite::TConvertor::ConvertErrCode(
             return NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST;
         case NKikimrTxDataShard::TError_EKind_SCHEME_CHANGED:
             return NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED;
+        case NKikimrTxDataShard::TError_EKind_OUT_OF_SPACE:
+        case NKikimrTxDataShard::TError_EKind_DISK_SPACE_EXHAUSTED:
+            return NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED;
         default:
             return NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR;
     }

+ 15 - 0
ydb/core/tx/datashard/datashard_impl.h

@@ -1711,6 +1711,21 @@ public:
     void NotifyOverloadSubscribers(ERejectReason reason);
     void NotifyAllOverloadSubscribers();
 
+    template <typename TResponseRecord>
+    void SetOverloadSubscribed(const std::optional<ui64>& overloadSubscribe, const TActorId& recipient, const TActorId& sender, const ERejectReasons rejectReasons, TResponseRecord& responseRecord) {
+        if (overloadSubscribe && HasPipeServer(recipient)) {
+            ui64 seqNo = overloadSubscribe.value();
+            auto allowed = (ERejectReasons::OverloadByProbability | ERejectReasons::YellowChannels | ERejectReasons::ChangesQueueOverflow);
+            if ((rejectReasons & allowed) != ERejectReasons::None &&
+                (rejectReasons - allowed) == ERejectReasons::None)
+            {
+                if (AddOverloadSubscriber(recipient, sender, seqNo, rejectReasons)) {
+                    responseRecord.SetOverloadSubscribed(seqNo);
+                }
+            }
+        }
+    }
+
     bool HasSharedBlobs() const;
     void CheckInitiateBorrowedPartsReturn(const TActorContext& ctx);
     void CheckStateChange(const TActorContext& ctx);

+ 13 - 2
ydb/core/tx/datashard/datashard_ut_write.cpp

@@ -322,7 +322,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
     }
 
-    Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflow) {
+    Y_UNIT_TEST(RejectOnChangeQueueOverflow) {
         TPortManager pm;
         TServerSettings serverSettings(pm.GetPort(2134));
         serverSettings.SetDomainName("Root").SetUseRealThreads(false).SetChangesQueueItemsLimit(1);
@@ -362,7 +362,18 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
             Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, ++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
         }
 
-    } // Y_UNIT_TEST
+        Cout << "========= Send immediate write + OverloadSubscribe, expecting overloaded =========\n";
+        {
+            ui64 secNo = 55;
+
+            auto request = MakeWriteRequest(++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, tableId, opts.Columns_, rowCount);
+            request->Record.SetOverloadSubscribe(secNo);
+           
+            auto writeResult = Write(runtime, sender, shard, std::move(request), NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOverloadSubscribed(), secNo);
+        }
+
+    }  // Y_UNIT_TEST
 
 } // Y_UNIT_TEST_SUITE
 } // namespace NKikimr

+ 3 - 0
ydb/core/tx/datashard/datashard_write_operation.cpp

@@ -55,6 +55,8 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, ui64 globalTxId, TInstant
         LockNodeId = record.GetLockNodeId();
     }
 
+    OverloadSubscribe = record.HasOverloadSubscribe() ? record.GetOverloadSubscribe() : std::optional<ui64>{};
+
     NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
 
     LOG_T("Parsing write transaction for " << globalTxId << " at " << TabletId << ", record: " << record.ShortDebugString());
@@ -307,6 +309,7 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, ui64 tabletId)
 TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self)
     : TWriteOperation(op, self->TabletID())
 {
+    Recipient = ev->Recipient;
     SetTarget(ev->Sender);
     SetCookie(ev->Cookie);
 

+ 2 - 0
ydb/core/tx/datashard/datashard_write_operation.h

@@ -120,6 +120,7 @@ private:
     YDB_READONLY_DEF(std::vector<ui32>, ColumnIds);
     YDB_READONLY_DEF(TSerializedCellMatrix, Matrix);
     YDB_READONLY_DEF(TInstant, ReceivedAt);
+    YDB_READONLY_DEF(std::optional<ui64>, OverloadSubscribe);
 
     YDB_READONLY_DEF(ui64, TxSize);
 
@@ -274,6 +275,7 @@ private:
 
     const ui64 TabletId;
 
+    YDB_READONLY_DEF(TActorId, Recipient);
     YDB_READONLY_DEF(ui64, ArtifactFlags);
     YDB_ACCESSOR_DEF(ui64, TxCacheUsage);
     YDB_ACCESSOR_DEF(ui64, ReleasedTxDataSize);

+ 1 - 1
ydb/core/tx/datashard/execute_write_unit.cpp

@@ -79,7 +79,7 @@ public:
         const TTableId fullTableId(DataShard.GetPathOwnerId(), tableId);
         const ui64 localTableId = DataShard.GetLocalTableId(fullTableId);
         if (localTableId == 0) {
-            writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId);
+            writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Unknown table id " << tableId);
             return;
         }
         const ui64 shadowTableId = DataShard.GetShadowTableId(fullTableId);

+ 2 - 0
ydb/core/tx/datashard/execution_unit.cpp

@@ -248,6 +248,8 @@ bool TExecutionUnit::CheckRejectDataTx(TOperation::TPtr op, const TActorContext&
 
         if (writeOp) {
             writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, err);
+
+            DataShard.SetOverloadSubscribed(writeOp->GetWriteTx()->GetOverloadSubscribe(), writeOp->GetRecipient(), op->GetTarget(), ERejectReasons::ChangesQueueOverflow, writeOp->GetWriteResult()->Record);
         } else {                
             BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED)
                     ->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, err);