Browse Source

DataShard EvWrite Immediate

azevaykin 1 year ago
parent
commit
fc639245d6

+ 6 - 0
.mapping.json

@@ -6736,6 +6736,12 @@
   "ydb/core/tx/datashard/ut_volatile/CMakeLists.linux-x86_64.txt":"",
   "ydb/core/tx/datashard/ut_volatile/CMakeLists.txt":"",
   "ydb/core/tx/datashard/ut_volatile/CMakeLists.windows-x86_64.txt":"",
+  "ydb/core/tx/datashard/ut_write/CMakeLists.darwin-arm64.txt":"",
+  "ydb/core/tx/datashard/ut_write/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/core/tx/datashard/ut_write/CMakeLists.linux-aarch64.txt":"",
+  "ydb/core/tx/datashard/ut_write/CMakeLists.linux-x86_64.txt":"",
+  "ydb/core/tx/datashard/ut_write/CMakeLists.txt":"",
+  "ydb/core/tx/datashard/ut_write/CMakeLists.windows-x86_64.txt":"",
   "ydb/core/tx/long_tx_service/CMakeLists.darwin-arm64.txt":"",
   "ydb/core/tx/long_tx_service/CMakeLists.darwin-x86_64.txt":"",
   "ydb/core/tx/long_tx_service/CMakeLists.linux-aarch64.txt":"",

+ 1 - 0
ydb/core/protos/counters_datashard.proto

@@ -466,4 +466,5 @@ enum ETxTypes {
     TXTYPE_CDC_STREAM_EMIT_HEARTBEATS = 79                [(TxTypeOpts) = {Name: "TTxCdcStreamEmitHeartbeats"}];
     TXTYPE_CLEANUP_VOLATILE = 80                          [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
     TXTYPE_PLAN_PREDICTED_TXS = 81                        [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
+    TXTYPE_WRITE = 82                                     [(TxTypeOpts) = {Name: "TxWrite"}];
 }

+ 5 - 0
ydb/core/tx/datashard/CMakeLists.darwin-arm64.txt

@@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot)
 add_subdirectory(ut_stats)
 add_subdirectory(ut_upload_rows)
 add_subdirectory(ut_volatile)
+add_subdirectory(ut_write)
 get_built_tool_path(
   TOOL_enum_parser_bin
   TOOL_enum_parser_dependency
@@ -180,6 +181,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -216,6 +218,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -257,6 +260,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -313,6 +317,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp

+ 5 - 0
ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt

@@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot)
 add_subdirectory(ut_stats)
 add_subdirectory(ut_upload_rows)
 add_subdirectory(ut_volatile)
+add_subdirectory(ut_write)
 get_built_tool_path(
   TOOL_enum_parser_bin
   TOOL_enum_parser_dependency
@@ -180,6 +181,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -216,6 +218,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -257,6 +260,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -313,6 +317,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp

+ 5 - 0
ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt

@@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot)
 add_subdirectory(ut_stats)
 add_subdirectory(ut_upload_rows)
 add_subdirectory(ut_volatile)
+add_subdirectory(ut_write)
 get_built_tool_path(
   TOOL_enum_parser_bin
   TOOL_enum_parser_dependency
@@ -181,6 +182,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -217,6 +219,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -258,6 +261,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -314,6 +318,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp

+ 5 - 0
ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt

@@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot)
 add_subdirectory(ut_stats)
 add_subdirectory(ut_upload_rows)
 add_subdirectory(ut_volatile)
+add_subdirectory(ut_write)
 get_built_tool_path(
   TOOL_enum_parser_bin
   TOOL_enum_parser_dependency
@@ -181,6 +182,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -217,6 +219,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -258,6 +261,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -314,6 +318,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp

+ 5 - 0
ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt

@@ -36,6 +36,7 @@ add_subdirectory(ut_snapshot)
 add_subdirectory(ut_stats)
 add_subdirectory(ut_upload_rows)
 add_subdirectory(ut_volatile)
+add_subdirectory(ut_write)
 get_built_tool_path(
   TOOL_enum_parser_bin
   TOOL_enum_parser_dependency
@@ -181,6 +182,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_read_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_scheme_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_write_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -217,6 +219,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__stats.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_table_path.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__store_scan_state.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard__write.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_receiving.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sender_activation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -258,6 +261,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_client.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_repl_offsets_server.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_write_operation.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/direct_tx_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -314,6 +318,7 @@ target_sources(core-tx-datashard PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/volatile_tx.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_plan_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/wait_for_stream_clearance_unit.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/write_unit.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/upload_stats.cpp
 )
 generate_enum_serilization(core-tx-datashard

+ 337 - 0
ydb/core/tx/datashard/check_write_unit.cpp

@@ -0,0 +1,337 @@
+#include "datashard_impl.h"
+#include "datashard_pipeline.h"
+#include "execution_unit_ctors.h"
+
+#include <ydb/core/tablet/tablet_exception.h>
+
+namespace NKikimr {
+namespace NDataShard {
+
+class TCheckWriteUnit: public TExecutionUnit {
+public:
+    TCheckWriteUnit(TDataShard &dataShard, TPipeline &pipeline);
+    ~TCheckWriteUnit() override;
+
+    bool IsReadyToExecute(TOperation::TPtr op) const override;
+    EExecutionStatus Execute(TOperation::TPtr op,
+                             TTransactionContext &txc,
+                             const TActorContext &ctx) override;
+    void Complete(TOperation::TPtr op,
+                  const TActorContext &ctx) override;
+
+private:
+};
+
+TCheckWriteUnit::TCheckWriteUnit(TDataShard &dataShard,
+                                   TPipeline &pipeline)
+    : TExecutionUnit(EExecutionUnitKind::CheckDataTx, false, dataShard, pipeline)
+{
+}
+
+TCheckWriteUnit::~TCheckWriteUnit()
+{
+}
+
+bool TCheckWriteUnit::IsReadyToExecute(TOperation::TPtr) const
+{
+    return true;
+}
+
+EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
+                                           TTransactionContext &,
+                                           const TActorContext &ctx)
+{
+    Y_ABORT_UNLESS(op->IsDataTx() || op->IsReadTable());
+    Y_ABORT_UNLESS(!op->IsAborted());
+
+    if (CheckRejectDataTx(op, ctx)) {
+        op->Abort(EExecutionUnitKind::FinishPropose);
+
+        return EExecutionStatus::Executed;
+    }
+
+    //TODO: remove this return
+    return EExecutionStatus::Executed;
+
+    TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
+    Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
+    auto dataTx = tx->GetDataTx();
+    Y_ABORT_UNLESS(dataTx);
+    Y_ABORT_UNLESS(dataTx->Ready() || dataTx->RequirePrepare());
+
+    if (dataTx->Ready()) {
+        DataShard.IncCounter(COUNTER_MINIKQL_PROGRAM_SIZE, dataTx->ProgramSize());
+    } else {
+        Y_ABORT_UNLESS(dataTx->RequirePrepare());
+        LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+                    "Require prepare Tx " << op->GetTxId() <<  " at " << DataShard.TabletID()
+                    << ": " << dataTx->GetErrors());
+    }
+
+    // Check if we are out of space and tx wants to update user
+    // or system table.
+    if (DataShard.IsAnyChannelYellowStop()
+        && (dataTx->HasWrites() || !op->IsImmediate())) {
+        TString err = TStringBuilder()
+            << "Cannot perform transaction: out of disk space at tablet "
+            << DataShard.TabletID() << " txId " << op->GetTxId();
+
+        DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
+
+        BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
+        op->Abort(EExecutionUnitKind::FinishPropose);
+
+        LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
+
+        return EExecutionStatus::Executed;
+    }
+
+    if (tx->IsMvccSnapshotRead()) {
+        auto snapshot = tx->GetMvccSnapshot();
+        if (DataShard.IsFollower()) {
+            TString err = TStringBuilder()
+                << "Operation " << *op << " cannot read from snapshot " << snapshot
+                << " using data tx on a follower " << DataShard.TabletID();
+
+            BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+                ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
+            op->Abort(EExecutionUnitKind::FinishPropose);
+
+            LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+            return EExecutionStatus::Executed;
+        } else if (!DataShard.IsMvccEnabled()) {
+            TString err = TStringBuilder()
+                << "Operation " << *op << " reads from snapshot " << snapshot
+                << " with MVCC feature disabled at " << DataShard.TabletID();
+
+            BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+                ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
+            op->Abort(EExecutionUnitKind::FinishPropose);
+
+            LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+            return EExecutionStatus::Executed;
+        } else if (snapshot < DataShard.GetSnapshotManager().GetLowWatermark()) {
+            TString err = TStringBuilder()
+                << "Operation " << *op << " reads from stale snapshot " << snapshot
+                << " at " << DataShard.TabletID();
+
+            BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+                ->AddError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, err);
+            op->Abort(EExecutionUnitKind::FinishPropose);
+
+            LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+            ;
+        }
+    }
+
+    TEngineBay::TSizes txReads;
+
+    if (op->IsDataTx()) {
+        bool hasTotalKeysSizeLimit = !!dataTx->PerShardKeysSizeLimitBytes();
+        txReads = dataTx->CalcReadSizes(hasTotalKeysSizeLimit);
+
+        if (txReads.ReadSize > DataShard.GetTxReadSizeLimit()) {
+            TString err = TStringBuilder()
+                << "Transaction read size " << txReads.ReadSize << " exceeds limit "
+                << DataShard.GetTxReadSizeLimit() << " at tablet " << DataShard.TabletID()
+                << " txId " << op->GetTxId();
+
+            BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+                ->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err);
+            op->Abort(EExecutionUnitKind::FinishPropose);
+
+            LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+            return EExecutionStatus::Executed;
+        }
+
+        if (hasTotalKeysSizeLimit
+            && txReads.TotalKeysSize > *dataTx->PerShardKeysSizeLimitBytes()) {
+            TString err = TStringBuilder()
+                << "Transaction total keys size " << txReads.TotalKeysSize
+                << " exceeds limit " << *dataTx->PerShardKeysSizeLimitBytes()
+                << " at tablet " << DataShard.TabletID() << " txId " << op->GetTxId();
+
+            BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+                ->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err);
+            op->Abort(EExecutionUnitKind::FinishPropose);
+
+            LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+            return EExecutionStatus::Executed;
+        }
+
+        for (const auto& key : dataTx->TxInfo().Keys) {
+            if (key.IsWrite && DataShard.IsUserTable(key.Key->TableId)) {
+                ui64 keySize = 0;
+                for (const auto& cell : key.Key->Range.From) {
+                    keySize += cell.Size();
+                }
+                if (keySize > NLimits::MaxWriteKeySize) {
+                    TString err = TStringBuilder()
+                        << "Operation " << *op << " writes key of " << keySize
+                        << " bytes which exceeds limit " << NLimits::MaxWriteKeySize
+                        << " bytes at " << DataShard.TabletID();
+
+                    BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
+                        ->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
+                    op->Abort(EExecutionUnitKind::FinishPropose);
+
+                    LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+                    return EExecutionStatus::Executed;
+                }
+                for (const auto& col : key.Key->Columns) {
+                    if (col.Operation == TKeyDesc::EColumnOperation::Set ||
+                        col.Operation == TKeyDesc::EColumnOperation::InplaceUpdate)
+                    {
+                        if (col.ImmediateUpdateSize > NLimits::MaxWriteValueSize) {
+                            TString err = TStringBuilder()
+                                << "Transaction write column value of " << col.ImmediateUpdateSize
+                                << " bytes is larger than the allowed threshold";
+
+                            BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR)->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
+                            op->Abort(EExecutionUnitKind::FinishPropose);
+
+                            LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+                            return EExecutionStatus::Executed;
+                        }
+                    }
+                }
+
+                if (DataShard.IsSubDomainOutOfSpace()) {
+                    switch (key.Key->RowOperation) {
+                        case TKeyDesc::ERowOperation::Read:
+                        case TKeyDesc::ERowOperation::Erase:
+                            // Read and erase are allowed even when we're out of disk space
+                            break;
+
+                        default: {
+                            // Updates are not allowed when database is out of space
+                            TString err = "Cannot perform writes: database is out of disk space";
+
+                            DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
+
+                            BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
+                            op->Abort(EExecutionUnitKind::FinishPropose);
+
+                            LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
+
+                            return EExecutionStatus::Executed;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    if (op->IsReadTable()) {
+        const auto& record = dataTx->GetReadTableTransaction();
+        const auto& userTables = DataShard.GetUserTables();
+
+        TMaybe<TString> schemaChangedError;
+        if (auto it = userTables.find(record.GetTableId().GetTableId()); it != userTables.end()) {
+            const auto& tableInfo = *it->second;
+            for (const auto& columnRecord : record.GetColumns()) {
+                if (auto* columnInfo = tableInfo.Columns.FindPtr(columnRecord.GetId())) {
+                    // TODO: column types don't change when bound by id, but we may want to check anyway
+                } else {
+                    schemaChangedError = TStringBuilder() << "ReadTable cannot find column "
+                        << columnRecord.GetName() << " (" << columnRecord.GetId() << ")";
+                    break;
+                }
+            }
+            // TODO: validate key ranges?
+        } else {
+            schemaChangedError = TStringBuilder() << "ReadTable cannot find table "
+                << record.GetTableId().GetOwnerId() << ":" << record.GetTableId().GetTableId();
+        }
+
+        if (schemaChangedError) {
+            BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR)
+                ->AddError(NKikimrTxDataShard::TError::SCHEME_CHANGED, *schemaChangedError);
+            op->Abort(EExecutionUnitKind::FinishPropose);
+            return EExecutionStatus::Executed;
+        }
+
+        if (record.HasSnapshotStep() && record.HasSnapshotTxId()) {
+            if (!op->IsImmediate()) {
+                BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
+                    NKikimrTxDataShard::TError::BAD_ARGUMENT,
+                    "ReadTable from snapshot must be an immediate transaction");
+                op->Abort(EExecutionUnitKind::FinishPropose);
+                return EExecutionStatus::Executed;
+            }
+
+            const TSnapshotKey key(
+                record.GetTableId().GetOwnerId(),
+                record.GetTableId().GetTableId(),
+                record.GetSnapshotStep(),
+                record.GetSnapshotTxId());
+
+            if (!DataShard.GetSnapshotManager().AcquireReference(key)) {
+                // TODO: try upgrading to mvcc snapshot when available
+                BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
+                    NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
+                    TStringBuilder()
+                        << "Shard " << DataShard.TabletID()
+                        << " has no snapshot " << key);
+                op->Abort(EExecutionUnitKind::FinishPropose);
+                return EExecutionStatus::Executed;
+            }
+
+            op->SetAcquiredSnapshotKey(key);
+            op->SetUsingSnapshotFlag();
+        }
+    }
+
+    if (!op->IsImmediate()) {
+        if (!Pipeline.AssignPlanInterval(op)) {
+            TString err = TStringBuilder()
+                << "Can't propose tx " << op->GetTxId() << " at blocked shard "
+                << DataShard.TabletID();
+            BuildResult(op)->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, err);
+            op->Abort(EExecutionUnitKind::FinishPropose);
+
+            LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err);
+
+            return EExecutionStatus::Executed;
+        }
+
+        auto &res = BuildResult(op);
+        res->SetPrepared(op->GetMinStep(), op->GetMaxStep(), op->GetReceivedAt());
+
+        if (op->IsDataTx()) {
+            res->Record.SetReadSize(txReads.ReadSize);
+            res->Record.SetReplySize(txReads.ReplySize);
+
+            for (const auto& rs : txReads.OutReadSetSize) {
+                auto entry = res->Record.AddOutgoingReadSetInfo();
+                entry->SetShardId(rs.first);
+                entry->SetSize(rs.second);
+            }
+        }
+
+        LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+                    "Prepared " << op->GetKind() << " transaction txId " << op->GetTxId()
+                    << " at tablet " << DataShard.TabletID());
+    }
+
+    return EExecutionStatus::Executed;
+}
+
+void TCheckWriteUnit::Complete(TOperation::TPtr, const TActorContext &)
+{
+}
+
+THolder<TExecutionUnit> CreateCheckWriteUnit(TDataShard &dataShard, TPipeline &pipeline)
+{
+    return THolder(new TCheckWriteUnit(dataShard, pipeline));
+}
+
+} // namespace NDataShard
+} // namespace NKikimr

+ 104 - 21
ydb/core/tx/datashard/datashard.cpp

@@ -2634,6 +2634,41 @@ bool TDataShard::CheckDataTxRejectAndReply(const TEvDataShard::TEvProposeTransac
     return false;
 }
 
+bool TDataShard::CheckDataTxRejectAndReply(const NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx)
+{
+    auto* msg = ev->Get();
+    TString txDescr = TStringBuilder() << "data TxId " << msg->GetTxId();
+
+    NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus;
+    ERejectReasons rejectReasons;
+    TString rejectDescription;
+    bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReasons, rejectDescription);
+
+    if (reject) {
+        LWTRACK(ProposeTransactionReject, msg->GetOrbit());
+        NKikimrDataEvents::TEvWriteResult::EStatus status;
+        switch (rejectStatus) {
+            case NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED:
+                status = NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED;
+                break;
+            case NKikimrTxDataShard::TEvProposeTransactionResult::ERROR:
+                status = NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR;
+                break;
+            default:
+                Y_FAIL_S("Unexpected rejectStatus " << rejectStatus);
+        }
+        auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), msg->GetTxId(), status, rejectDescription);
+
+        LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription);
+
+        ctx.Send(ev->Sender, result.release());
+        IncCounter(COUNTER_PREPARE_OVERLOADED);
+        IncCounter(COUNTER_PREPARE_COMPLETE);
+        return true;
+    }
+
+    return false;
+}
 void TDataShard::UpdateProposeQueueSize() const {
     SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs());
     SetCounter(COUNTER_READ_ITERATORS_WAITING, Pipeline.WaitingReadIterators());
@@ -2771,26 +2806,38 @@ void TDataShard::CheckDelayedProposeQueue(const TActorContext &ctx) {
 
 void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&ev, const TActorContext &ctx) {
     auto* msg = ev->Get();
-    bool mayRunImmediate = false;
 
-    if ((msg->GetFlags() & TTxFlags::Immediate) &&
-        !(msg->GetFlags() & TTxFlags::ForceOnline) &&
-        msg->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA)
-    {
-        // This transaction may run in immediate mode
-        mayRunImmediate = true;
-    }
+    // This transaction may run in immediate mode
+    bool mayRunImmediate = (msg->GetFlags() & TTxFlags::Immediate) && !(msg->GetFlags() & TTxFlags::ForceOnline) &&
+        msg->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA;
 
     if (mayRunImmediate) {
         // Enqueue immediate transactions so they don't starve existing operations
         LWTRACK(ProposeTransactionEnqueue, msg->Orbit);
-        ProposeQueue.Enqueue(std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx);
+        ProposeQueue.Enqueue(IEventHandle::Upcast<TEvDataShard::TEvProposeTransaction>(std::move(ev)), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx);
         UpdateProposeQueueSize();
     } else {
         // Prepare planned transactions as soon as possible
         Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
     }
 }
+void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TActorContext& ctx) {
+    auto* msg = ev->Get();
+    const auto& record = msg->Record;
+
+    // This transaction may run in immediate mode
+    bool mayRunImmediate = record.txmode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE;
+
+    if (mayRunImmediate) {
+        // Enqueue immediate transactions so they don't starve existing operations
+        LWTRACK(ProposeTransactionEnqueue, msg->GetOrbit());
+        ProposeQueue.Enqueue(IEventHandle::Upcast<NEvents::TDataEvents::TEvWrite>(std::move(ev)), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx);
+        UpdateProposeQueueSize();
+    } else {
+        // Prepare planned transactions as soon as possible
+        Execute(new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
+    }
+}
 
 void TDataShard::Handle(TEvTxProcessing::TEvPlanStep::TPtr &ev, const TActorContext &ctx) {
     ui64 srcMediatorId = ev->Get()->Record.GetMediatorID();
@@ -2849,18 +2896,47 @@ void TDataShard::Handle(TEvPrivate::TEvDelayedProposeTransaction::TPtr &ev, cons
 
         if (!item.Cancelled) {
             // N.B. we don't call ProposeQueue.Reset(), tx will Ack() on its first Execute()
-            Execute(new TTxProposeTransactionBase(this, std::move(item.Event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
-            return;
+
+            switch (item.Event->GetTypeRewrite()) {
+                case TEvDataShard::TEvProposeTransaction::EventType: {
+                    auto event = IEventHandle::Downcast<TEvDataShard::TEvProposeTransaction>(std::move(item.Event));
+                    Execute(new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
+                    return;
+                }
+                case NEvents::TDataEvents::TEvWrite::EventType: {
+                    auto event = IEventHandle::Downcast<NEvents::TDataEvents::TEvWrite>(std::move(item.Event));
+                    Execute(new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
+                    return;
+                }
+                default:
+                    Y_FAIL_S("Unexpected event type " << item.Event->GetTypeRewrite());
+            }
         }
 
         TActorId target = item.Event->Sender;
         ui64 cookie = item.Event->Cookie;
-        auto kind = item.Event->Get()->GetTxKind();
-        auto txId = item.Event->Get()->GetTxId();
-        auto result = new TEvDataShard::TEvProposeTransactionResult(
-                kind, TabletID(), txId,
-                NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED);
-        ctx.Send(target, result, 0, cookie);
+        switch (item.Event->GetTypeRewrite()) {
+            case TEvDataShard::TEvProposeTransaction::EventType: {
+                auto* msg = item.Event->Get<TEvDataShard::TEvProposeTransaction>();
+                auto kind = msg->GetTxKind();
+                auto txId = msg->GetTxId();
+                auto result = new TEvDataShard::TEvProposeTransactionResult(
+                    kind, TabletID(), txId,
+                    NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED);
+                ctx.Send(target, result, 0, cookie);
+                return;
+            }
+            case NEvents::TDataEvents::TEvWrite::EventType: {
+                auto* msg = item.Event->Get<NEvents::TDataEvents::TEvWrite>();
+                auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), msg->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, "Canceled");
+                ctx.Send(target, result.release(), 0, cookie);
+                return;
+            }
+            default:
+                Y_FAIL_S("Unexpected event type " << item.Event->GetTypeRewrite());
+        }
+
+        
     }
 
     // N.B. Ack directly since we didn't start any delayed transactions
@@ -3226,15 +3302,22 @@ void TDataShard::WaitPredictedPlanStep(ui64 step) {
     }
 }
 
-bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const {
-    auto* msg = ev->Get();
-
+bool TDataShard::CheckTxNeedWait() const {
     if (MvccSwitchState == TSwitchState::SWITCHING) {
         LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction needs to wait because of mvcc state switching");
         return true;
     }
 
-    auto &rec = ev->Get()->Record;
+    return false;
+}
+
+bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const {
+    if (CheckTxNeedWait()) {
+        return true;
+    }
+
+    auto* msg = ev->Get();
+    auto& rec = msg->Record;
     if (rec.HasMvccSnapshot()) {
         TRowVersion rowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId());
         TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads());

+ 1 - 0
ydb/core/tx/datashard/datashard.h

@@ -4,6 +4,7 @@
 #include "datashard_s3_upload.h"
 
 #include <ydb/core/tx/tx.h>
+#include <ydb/core/tx/data_events/events.h>
 #include <ydb/core/tx/message_seqno.h>
 #include <ydb/core/base/domain.h>
 #include <ydb/core/base/row_version.h>

Some files were not shown because too many files changed in this diff