|
@@ -709,74 +709,48 @@ private:
|
|
|
protected:
|
|
|
virtual void DoOnDataReady() override {
|
|
|
TxEvent->IndexChanges->Blobs = std::move(ExtractBlobsData());
|
|
|
+ const bool isInsert = !!dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges);
|
|
|
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TChangesTask>(std::move(TxEvent), Counters, TabletId, ParentActorId);
|
|
|
- NConveyor::TCompServiceOperator::SendTaskToExecute(task);
|
|
|
+ if (isInsert) {
|
|
|
+ NConveyor::TInsertServiceOperator::SendTaskToExecute(task);
|
|
|
+ } else {
|
|
|
+ NConveyor::TCompServiceOperator::SendTaskToExecute(task);
|
|
|
+ }
|
|
|
}
|
|
|
- virtual bool DoOnError(const TBlobRange& /*range*/) override {
|
|
|
+ virtual bool DoOnError(const TBlobRange& range) override {
|
|
|
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("blob_id", range);
|
|
|
+ AFL_VERIFY(false)("blob_id", range);
|
|
|
TxEvent->SetPutStatus(NKikimrProto::ERROR);
|
|
|
TActorContext::AsActorContext().Send(ParentActorId, std::move(TxEvent));
|
|
|
return false;
|
|
|
}
|
|
|
public:
|
|
|
TChangesReadTask(std::unique_ptr<TEvPrivate::TEvWriteIndex>&& event, const TActorId parentActorId, const ui64 tabletId, const TIndexationCounters& counters)
|
|
|
- : TBase(event->IndexChanges->GetReadingActions())
|
|
|
+ : TBase(event->IndexChanges->GetReadingActions(), event->IndexChanges->GetTaskIdentifier())
|
|
|
, ParentActorId(parentActorId)
|
|
|
, TabletId(tabletId)
|
|
|
, TxEvent(std::move(event))
|
|
|
, Counters(counters)
|
|
|
{
|
|
|
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start_changes")("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier());
|
|
|
}
|
|
|
};
|
|
|
|
|
|
-void TColumnShard::SetupIndexation() {
|
|
|
- if (BackgroundController.IsIndexingActive()) {
|
|
|
- return;
|
|
|
- }
|
|
|
- CSCounters.OnSetupIndexation();
|
|
|
- ui32 blobs = 0;
|
|
|
- ui32 ignored = 0;
|
|
|
- ui64 size = 0;
|
|
|
- ui64 bytesToIndex = 0;
|
|
|
- std::vector<const NOlap::TInsertedData*> dataToIndex;
|
|
|
- dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT);
|
|
|
- for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) {
|
|
|
- for (auto* pathInfo : it->second) {
|
|
|
- for (auto& data : pathInfo->GetCommitted()) {
|
|
|
- ui32 dataSize = data.BlobSize();
|
|
|
- Y_VERIFY(dataSize);
|
|
|
-
|
|
|
- size += dataSize;
|
|
|
- ++blobs;
|
|
|
- bytesToIndex += dataSize;
|
|
|
- dataToIndex.push_back(&data);
|
|
|
- if (bytesToIndex >= (ui64)Limits.MaxInsertBytes) {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (bytesToIndex >= (ui64)Limits.MaxInsertBytes) {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (bytesToIndex >= (ui64)Limits.MaxInsertBytes) {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (bytesToIndex < (ui64)Limits.MinInsertBytes && blobs < TLimits::MIN_SMALL_BLOBS_TO_INSERT) {
|
|
|
- LOG_S_DEBUG("Few data for indexation (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored "
|
|
|
- << ignored << ") at tablet " << TabletID());
|
|
|
+void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dataToIndex, const i64 bytesToIndex) {
|
|
|
+ if (bytesToIndex < Limits.MinInsertBytes && dataToIndex.size() < TLimits::MIN_SMALL_BLOBS_TO_INSERT) {
|
|
|
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size());
|
|
|
|
|
|
if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) {
|
|
|
++SkippedIndexations;
|
|
|
return;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
CSCounters.IndexationInput(bytesToIndex);
|
|
|
SkippedIndexations = 0;
|
|
|
|
|
|
- LOG_S_DEBUG("Prepare indexing " << bytesToIndex << " bytes in " << dataToIndex.size() << " batches of committed "
|
|
|
- << size << " bytes in " << blobs << " blobs ignored " << ignored
|
|
|
- << " at tablet " << TabletID());
|
|
|
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes)
|
|
|
+ ("has_more", bytesToIndex >= Limits.MaxInsertBytes);
|
|
|
|
|
|
std::vector<NOlap::TInsertedData> data;
|
|
|
data.reserve(dataToIndex.size());
|
|
@@ -799,6 +773,32 @@ void TColumnShard::SetupIndexation() {
|
|
|
ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters)));
|
|
|
}
|
|
|
|
|
|
+void TColumnShard::SetupIndexation() {
|
|
|
+ if (BackgroundController.IsIndexingActive()) {
|
|
|
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "in_progress");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ CSCounters.OnSetupIndexation();
|
|
|
+ i64 bytesToIndex = 0;
|
|
|
+ std::vector<const NOlap::TInsertedData*> dataToIndex;
|
|
|
+ dataToIndex.reserve(TLimits::MIN_SMALL_BLOBS_TO_INSERT);
|
|
|
+ for (auto it = InsertTable->GetPathPriorities().rbegin(); it != InsertTable->GetPathPriorities().rend(); ++it) {
|
|
|
+ for (auto* pathInfo : it->second) {
|
|
|
+ for (auto& data : pathInfo->GetCommitted()) {
|
|
|
+ Y_VERIFY(data.BlobSize());
|
|
|
+ bytesToIndex += data.BlobSize();
|
|
|
+ dataToIndex.push_back(&data);
|
|
|
+ if (bytesToIndex >= Limits.MaxInsertBytes) {
|
|
|
+ StartIndexTask(std::move(dataToIndex), bytesToIndex);
|
|
|
+ dataToIndex.clear();
|
|
|
+ bytesToIndex = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ StartIndexTask(std::move(dataToIndex), bytesToIndex);
|
|
|
+}
|
|
|
+
|
|
|
void TColumnShard::SetupCompaction() {
|
|
|
CSCounters.OnSetupCompaction();
|
|
|
|
|
@@ -807,9 +807,7 @@ void TColumnShard::SetupCompaction() {
|
|
|
auto limits = CompactionLimits.Get();
|
|
|
auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(limits, BackgroundController.GetConflictCompactionPortions());
|
|
|
if (!indexChanges) {
|
|
|
- if (!BackgroundController.GetCompactionsCount()) {
|
|
|
- LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID());
|
|
|
- }
|
|
|
+ LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID());
|
|
|
break;
|
|
|
}
|
|
|
|