|
@@ -184,7 +184,6 @@ private:
|
|
|
bool InclusiveTo;
|
|
|
ui64 RowsLimit = 100000;
|
|
|
ui64 BytesLimit = 1024*1024;
|
|
|
- ui64 Restarts = 0;
|
|
|
TRowVersion ReadVersion = TRowVersion::Max();
|
|
|
|
|
|
public:
|
|
@@ -200,18 +199,7 @@ public:
|
|
|
|
|
|
TTxType GetTxType() const override { return TXTYPE_READ_COLUMNS; }
|
|
|
|
|
|
- bool Precharge(NTable::TDatabase& db, ui32 localTid, const TVector<NTable::TTag>& valueColumns) {
|
|
|
- bool ready = db.Precharge(localTid,
|
|
|
- KeyFrom,
|
|
|
- KeyTo,
|
|
|
- valueColumns,
|
|
|
- 0,
|
|
|
- RowsLimit, BytesLimit,
|
|
|
- NTable::EDirection::Forward, ReadVersion);
|
|
|
- return ready;
|
|
|
- }
|
|
|
-
|
|
|
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
|
|
|
+ bool Execute(TTransactionContext&, const TActorContext& ctx) override {
|
|
|
// FIXME: we need to transform HEAD into some non-repeatable snapshot here
|
|
|
if (!ReadVersion.IsMax() && Self->GetVolatileTxManager().HasVolatileTxsAtSnapshot(ReadVersion)) {
|
|
|
Self->GetVolatileTxManager().AttachWaitingSnapshotEvent(
|
|
@@ -223,38 +211,13 @@ public:
|
|
|
|
|
|
Result = new TEvDataShard::TEvReadColumnsResponse(Self->TabletID());
|
|
|
|
|
|
- bool useScan = Self->ReadColumnsScanEnabled;
|
|
|
-
|
|
|
if (Self->IsFollower()) {
|
|
|
- NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK;
|
|
|
- TString errMessage;
|
|
|
-
|
|
|
- if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage))
|
|
|
- return false;
|
|
|
-
|
|
|
- if (status != NKikimrTxDataShard::TError::OK) {
|
|
|
- SetError(status, errMessage);
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- if (!ReadVersion.IsMax()) {
|
|
|
- NIceDb::TNiceDb db(txc.DB);
|
|
|
- TRowVersion lastCompleteTx;
|
|
|
- if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteStep, lastCompleteTx.Step))
|
|
|
- return false;
|
|
|
- if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteTx, lastCompleteTx.TxId))
|
|
|
- return false;
|
|
|
-
|
|
|
- if (ReadVersion > lastCompleteTx) {
|
|
|
- SetError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE,
|
|
|
- TStringBuilder() << "RO replica last version " << lastCompleteTx
|
|
|
- << " lags behind the requested snapshot " << ReadVersion
|
|
|
- << " shard " << Self->TabletID());
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
+ // Note: this request is no longer supported, and it has never been used with followers
|
|
|
+ NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::WRONG_SHARD_STATE;
|
|
|
+ TString errMessage = "followers are not supported";
|
|
|
|
|
|
- useScan = false;
|
|
|
+ SetError(status, errMessage);
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Read columns: " << Ev->Get()->Record);
|
|
@@ -330,26 +293,6 @@ public:
|
|
|
|
|
|
TSerializedCellVec toKeyCells;
|
|
|
|
|
|
- if (!useScan) {
|
|
|
- // Use histogram to limit the range for single request
|
|
|
- const auto& sizeHistogram = tableInfo.Stats.DataStats.DataSizeHistogram;
|
|
|
- auto histIt = LowerBound(sizeHistogram.begin(), sizeHistogram.end(), fromKeyCells,
|
|
|
- [&tableInfo] (const NTable::TBucket& bucket, const TSerializedCellVec& key) {
|
|
|
- TSerializedCellVec bk(bucket.EndKey);
|
|
|
- return CompareTypedCellVectors(
|
|
|
- bk.GetCells().data(), key.GetCells().data(),
|
|
|
- tableInfo.KeyColumnTypes.data(),
|
|
|
- bk.GetCells().size(), key.GetCells().size()) < 0;
|
|
|
- });
|
|
|
-
|
|
|
- if (histIt != sizeHistogram.end() && ++histIt != sizeHistogram.end()) {
|
|
|
- toKeyCells.Parse(histIt->EndKey);
|
|
|
- for (ui32 i = 0; i < toKeyCells.GetCells().size(); ++i) {
|
|
|
- KeyTo.push_back(TRawTypeValue(toKeyCells.GetCells()[i].AsRef(), tableInfo.KeyColumnTypes[i]));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
TVector<NTable::TTag> valueColumns;
|
|
|
TVector<NScheme::TTypeInfo> valueColumnTypes;
|
|
|
TVector<std::pair<TString, NScheme::TTypeInfo>> columns;
|
|
@@ -384,121 +327,35 @@ public:
|
|
|
|
|
|
tableInfo.Stats.AccessTime = TAppData::TimeProvider->Now();
|
|
|
|
|
|
- if (useScan) {
|
|
|
- if (snapshotKey) {
|
|
|
- if (!Self->GetSnapshotManager().AcquireReference(*snapshotKey)) {
|
|
|
- SetError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
|
|
|
- TStringBuilder() << "Table id " << tableId << " has no snapshot at " << ReadVersion
|
|
|
- << " shard " << Self->TabletID() << (Self->IsFollower() ? " RO replica" : ""));
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- auto* scan = new TReadColumnsScan(TKeyBoundary{fromKeyCells, InclusiveFrom},
|
|
|
- TKeyBoundary{toKeyCells, InclusiveTo},
|
|
|
- valueColumns, valueColumnTypes,
|
|
|
- std::move(blockBuilder), RowsLimit, BytesLimit,
|
|
|
- TKeyBoundary{tableInfo.Range.To, tableInfo.Range.ToInclusive},
|
|
|
- Ev->Sender, ctx.SelfID,
|
|
|
- snapshotKey,
|
|
|
- tableInfo.Path,
|
|
|
- Self->TabletID());
|
|
|
- auto opts = TScanOptions()
|
|
|
- .SetResourceBroker("scan", 10)
|
|
|
- .SetSnapshotRowVersion(ReadVersion)
|
|
|
- .SetActorPoolId(Self->ReadColumnsScanInUserPool ? AppData(ctx)->UserPoolId : AppData(ctx)->BatchPoolId)
|
|
|
- .SetReadAhead(512*1024, 1024*1024)
|
|
|
- .SetReadPrio(TScanOptions::EReadPrio::Low);
|
|
|
-
|
|
|
- ui64 cookie = -1; // Should be ignored
|
|
|
- Self->QueueScan(localTableId, scan, cookie, opts);
|
|
|
-
|
|
|
- Result.Destroy(); // Scan is now responsible for sending the result
|
|
|
-
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- // TODO: make sure KeyFrom and KeyTo properly reference non-inline cells data
|
|
|
-
|
|
|
- if (!Precharge(txc.DB, localTableId, valueColumns))
|
|
|
- return false;
|
|
|
-
|
|
|
- size_t rows = 0;
|
|
|
- size_t bytes = 0;
|
|
|
- bool shardFinished = false;
|
|
|
-
|
|
|
- {
|
|
|
- NTable::TKeyRange iterRange;
|
|
|
- iterRange.MinKey = KeyFrom;
|
|
|
- iterRange.MinInclusive = InclusiveFrom;
|
|
|
-
|
|
|
- auto iter = txc.DB.IterateRange(localTableId, iterRange, valueColumns, ReadVersion);
|
|
|
-
|
|
|
- TString lastKeySerialized;
|
|
|
- bool lastKeyInclusive = true;
|
|
|
- while (iter->Next(NTable::ENext::All) == NTable::EReady::Data) {
|
|
|
- TDbTupleRef rowKey = iter->GetKey();
|
|
|
- lastKeySerialized = TSerializedCellVec::Serialize(rowKey.Cells());
|
|
|
-
|
|
|
- // Compare current row with right boundary
|
|
|
- int cmp = -1;// CompareTypedCellVectors(tuple.Columns, KeyTo.data(), tuple.Types, KeyTo.size());
|
|
|
-
|
|
|
- if (cmp == 0 && KeyTo.size() < rowKey.ColumnCount) {
|
|
|
- cmp = -1;
|
|
|
- }
|
|
|
- if (InclusiveTo) {
|
|
|
- if (cmp > 0)
|
|
|
- break; // Stop iff greater(cmp > 0)
|
|
|
- } else {
|
|
|
- if (cmp >= 0)
|
|
|
- break; // Stop iff equal(cmp == 0) or greater(cmp > 0)
|
|
|
- }
|
|
|
-
|
|
|
- // Skip erased row
|
|
|
- if (iter->Row().GetRowState() == NTable::ERowOp::Erase) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- TDbTupleRef rowValues = iter->GetValues();
|
|
|
-
|
|
|
- blockBuilder->AddRow(rowKey, rowValues);
|
|
|
-
|
|
|
- rows++;
|
|
|
- bytes = blockBuilder->Bytes();
|
|
|
-
|
|
|
- if (rows >= RowsLimit || bytes >= BytesLimit)
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- // We don't want to do many restarts if pages weren't precharged
|
|
|
- // So we just return whatever we read so far and the client can request more rows
|
|
|
- if (iter->Last() == NTable::EReady::Page && rows < 1000 && bytes < 100000 && Restarts < 1) {
|
|
|
- ++Restarts;
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- if (iter->Last() == NTable::EReady::Gone) {
|
|
|
- shardFinished = true;
|
|
|
- lastKeySerialized = tableInfo.Range.To.GetBuffer();
|
|
|
- lastKeyInclusive = tableInfo.Range.ToInclusive;
|
|
|
+ if (snapshotKey) {
|
|
|
+ if (!Self->GetSnapshotManager().AcquireReference(*snapshotKey)) {
|
|
|
+ SetError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
|
|
|
+ TStringBuilder() << "Table id " << tableId << " has no snapshot at " << ReadVersion
|
|
|
+ << " shard " << Self->TabletID() << (Self->IsFollower() ? " RO replica" : ""));
|
|
|
+ return true;
|
|
|
}
|
|
|
-
|
|
|
- TString buffer = blockBuilder->Finish();
|
|
|
- buffer.resize(blockBuilder->Bytes());
|
|
|
-
|
|
|
- Result->Record.SetBlocks(buffer);
|
|
|
- Result->Record.SetLastKey(lastKeySerialized);
|
|
|
- Result->Record.SetLastKeyInclusive(lastKeyInclusive);
|
|
|
- Result->Record.SetEndOfShard(shardFinished);
|
|
|
}
|
|
|
|
|
|
- Self->IncCounter(COUNTER_READ_COLUMNS_ROWS, rows);
|
|
|
- Self->IncCounter(COUNTER_READ_COLUMNS_BYTES, bytes);
|
|
|
-
|
|
|
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
|
|
|
- << " Read columns result for table [" << tableInfo.Path << "]: "
|
|
|
- << rows << " rows, " << bytes << " bytes (event size "
|
|
|
- << Result->Record.GetBlocks().size() << ") shardFinished: " << shardFinished);
|
|
|
+ auto* scan = new TReadColumnsScan(TKeyBoundary{fromKeyCells, InclusiveFrom},
|
|
|
+ TKeyBoundary{toKeyCells, InclusiveTo},
|
|
|
+ valueColumns, valueColumnTypes,
|
|
|
+ std::move(blockBuilder), RowsLimit, BytesLimit,
|
|
|
+ TKeyBoundary{tableInfo.Range.To, tableInfo.Range.ToInclusive},
|
|
|
+ Ev->Sender, ctx.SelfID,
|
|
|
+ snapshotKey,
|
|
|
+ tableInfo.Path,
|
|
|
+ Self->TabletID());
|
|
|
+ auto opts = TScanOptions()
|
|
|
+ .SetResourceBroker("scan", 10)
|
|
|
+ .SetSnapshotRowVersion(ReadVersion)
|
|
|
+ .SetActorPoolId(AppData(ctx)->BatchPoolId)
|
|
|
+ .SetReadAhead(512*1024, 1024*1024)
|
|
|
+ .SetReadPrio(TScanOptions::EReadPrio::Low);
|
|
|
+
|
|
|
+ ui64 cookie = -1; // Should be ignored
|
|
|
+ Self->QueueScan(localTableId, scan, cookie, opts);
|
|
|
+
|
|
|
+ Result.Destroy(); // Scan is now responsible for sending the result
|
|
|
|
|
|
return true;
|
|
|
}
|