|
@@ -3,6 +3,7 @@
|
|
|
#include "compaction/column_cursor.h"
|
|
|
#include "compaction/merge_context.h"
|
|
|
#include "compaction/merged_column.h"
|
|
|
+#include "counters/general.h"
|
|
|
|
|
|
#include <ydb/core/tx/columnshard/columnshard_impl.h>
|
|
|
#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h>
|
|
@@ -18,11 +19,27 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
|
|
|
std::vector<TPortionInfoWithBlobs> portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs);
|
|
|
Blobs.clear();
|
|
|
std::optional<TSnapshot> maxSnapshot;
|
|
|
+ i64 portionsSize = 0;
|
|
|
+ i64 portionsCount = 0;
|
|
|
+ i64 insertedPortionsSize = 0;
|
|
|
+ i64 compactedPortionsSize = 0;
|
|
|
+ i64 otherPortionsSize = 0;
|
|
|
for (auto&& i : SwitchedPortions) {
|
|
|
if (!maxSnapshot || *maxSnapshot < i.GetMinSnapshot()) {
|
|
|
maxSnapshot = i.GetMinSnapshot();
|
|
|
}
|
|
|
+ if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::INSERTED) {
|
|
|
+ insertedPortionsSize += i.GetBlobBytes();
|
|
|
+ } else if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED) {
|
|
|
+ compactedPortionsSize += i.GetBlobBytes();
|
|
|
+ } else {
|
|
|
+ otherPortionsSize += i.GetBlobBytes();
|
|
|
+ }
|
|
|
+ portionsSize += i.GetBlobBytes();
|
|
|
+ ++portionsCount;
|
|
|
}
|
|
|
+ NChanges::TGeneralCompactionCounters::OnPortionsKind(insertedPortionsSize, compactedPortionsSize, otherPortionsSize);
|
|
|
+ NChanges::TGeneralCompactionCounters::OnRepackPortions(portionsCount, portionsSize);
|
|
|
Y_ABORT_UNLESS(maxSnapshot);
|
|
|
|
|
|
static const TString portionIdFieldName = "$$__portion_id";
|
|
@@ -208,8 +225,8 @@ NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounter
|
|
|
return isSuccess ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL;
|
|
|
}
|
|
|
|
|
|
-void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position, const bool include) {
|
|
|
- AFL_VERIFY(CheckPoints.emplace(position, include).second);
|
|
|
+void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position, const bool include, const bool validationDuplications) {
|
|
|
+ AFL_VERIFY(CheckPoints.emplace(position, include).second || !validationDuplications);
|
|
|
}
|
|
|
|
|
|
}
|