|
@@ -2,6 +2,7 @@
|
|
|
#include "datashard_distributed_erase.h"
|
|
|
#include "datashard_impl.h"
|
|
|
#include "datashard_pipeline.h"
|
|
|
+#include "datashard_user_db.h"
|
|
|
#include "erase_rows_condition.h"
|
|
|
#include "execution_unit_ctors.h"
|
|
|
|
|
@@ -75,8 +76,8 @@ public:
|
|
|
{
|
|
|
}
|
|
|
|
|
|
- bool IsReadyToExecute(TOperation::TPtr) const override {
|
|
|
- return true;
|
|
|
+ bool IsReadyToExecute(TOperation::TPtr op) const override {
|
|
|
+ return !op->HasRuntimeConflicts();
|
|
|
}
|
|
|
|
|
|
EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext&) override {
|
|
@@ -92,6 +93,7 @@ public:
|
|
|
|
|
|
const auto& request = eraseTx->GetRequest();
|
|
|
const ui64 tableId = request.GetTableId();
|
|
|
+ const TTableId fullTableId(DataShard.GetPathOwnerId(), tableId);
|
|
|
|
|
|
Y_VERIFY(DataShard.GetUserTables().contains(tableId));
|
|
|
const TUserTable& tableInfo = *DataShard.GetUserTables().at(tableId);
|
|
@@ -102,6 +104,7 @@ public:
|
|
|
|
|
|
const auto tags = MakeTags(condition->Tags(), eraseTx->GetIndexColumnIds());
|
|
|
auto readVersion = DataShard.GetReadWriteVersions(tx).ReadVersion;
|
|
|
+ TDataShardUserDb userDb(DataShard, txc.DB, readVersion);
|
|
|
bool pageFault = false;
|
|
|
|
|
|
TDynBitMap confirmedRows;
|
|
@@ -136,7 +139,7 @@ public:
|
|
|
}
|
|
|
|
|
|
NTable::TRowState row;
|
|
|
- const auto ready = txc.DB.Select(tableInfo.LocalTid, key, tags, row, 0, readVersion);
|
|
|
+ const auto ready = userDb.SelectRow(fullTableId, key, tags, row);
|
|
|
|
|
|
if (pageFault) {
|
|
|
continue;
|
|
@@ -159,6 +162,17 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ if (!userDb.GetVolatileReadDependencies().empty()) {
|
|
|
+ for (ui64 txId : userDb.GetVolatileReadDependencies()) {
|
|
|
+ op->AddVolatileDependency(txId);
|
|
|
+ bool ok = DataShard.GetVolatileTxManager().AttachBlockedOperation(txId, op->GetTxId());
|
|
|
+ Y_VERIFY_S(ok, "Unexpected failure to attach " << *op << " to volatile tx " << txId);
|
|
|
+ }
|
|
|
+ Y_VERIFY(!txc.DB.HasChanges(),
|
|
|
+ "Unexpected database changes while building distributed erase outgoing readsets");
|
|
|
+ return EExecutionStatus::Continue;
|
|
|
+ }
|
|
|
+
|
|
|
if (pageFault) {
|
|
|
return EExecutionStatus::Restart;
|
|
|
}
|