|
@@ -1,29 +1,43 @@
|
|
|
#pragma once
|
|
|
#include "common.h"
|
|
|
+#include <ydb/core/tx/columnshard/blobs_action/counters/read.h>
|
|
|
#include <ydb/core/tx/columnshard/blob.h>
|
|
|
#include <ydb/core/protos/base.pb.h>
|
|
|
+#include <ydb/library/conclusion/status.h>
|
|
|
#include <util/generic/hash_set.h>
|
|
|
|
|
|
namespace NKikimr::NOlap {
|
|
|
|
|
|
class IBlobsReadingAction: public ICommonBlobsAction {
|
|
|
+public:
|
|
|
+ using TErrorStatus = TConclusionSpecialStatus<NKikimrProto::EReplyStatus, NKikimrProto::EReplyStatus::OK, NKikimrProto::EReplyStatus::ERROR>;
|
|
|
private:
|
|
|
using TBase = ICommonBlobsAction;
|
|
|
+
|
|
|
THashMap<TUnifiedBlobId, THashSet<TBlobRange>> RangesForRead;
|
|
|
- THashSet<TBlobRange> WaitingRanges;
|
|
|
+ THashMap<TBlobRange, TString> RangesForResult;
|
|
|
+ THashMap<TBlobRange, TMonotonic> WaitingRanges;
|
|
|
THashMap<TBlobRange, TString> Replies;
|
|
|
- THashMap<TBlobRange, NKikimrProto::EReplyStatus> Fails;
|
|
|
+ THashMap<TBlobRange, TErrorStatus> Fails;
|
|
|
+ std::shared_ptr<NBlobOperations::TReadCounters> Counters;
|
|
|
bool Started = false;
|
|
|
protected:
|
|
|
virtual void DoStartReading(const THashMap<TUnifiedBlobId, THashSet<TBlobRange>>& range) = 0;
|
|
|
void StartReading(THashMap<TUnifiedBlobId, THashSet<TBlobRange>>&& ranges);
|
|
|
public:
|
|
|
+
|
|
|
+ void SetCounters(std::shared_ptr<NBlobOperations::TReadCounters> counters) {
|
|
|
+ Counters = counters;
|
|
|
+ }
|
|
|
+
|
|
|
IBlobsReadingAction(const TString& storageId)
|
|
|
: TBase(storageId)
|
|
|
{
|
|
|
|
|
|
}
|
|
|
|
|
|
+ void ExtractBlobsDataTo(THashMap<TBlobRange, TString>& result);
|
|
|
+
|
|
|
ui64 GetExpectedBlobsSize() const {
|
|
|
ui64 result = 0;
|
|
|
for (auto&& i : RangesForRead) {
|
|
@@ -31,6 +45,9 @@ public:
|
|
|
result += b.Size;
|
|
|
}
|
|
|
}
|
|
|
+ for (auto&& i : RangesForResult) {
|
|
|
+ result += i.first.Size;
|
|
|
+ }
|
|
|
return result;
|
|
|
}
|
|
|
|
|
@@ -39,7 +56,7 @@ public:
|
|
|
for (auto&& i : RangesForRead) {
|
|
|
result += i.second.size();
|
|
|
}
|
|
|
- return result;
|
|
|
+ return result + RangesForResult.size();
|
|
|
}
|
|
|
|
|
|
void FillExpectedRanges(THashSet<TBlobRange>& ranges) const {
|
|
@@ -48,52 +65,20 @@ public:
|
|
|
Y_VERIFY(ranges.emplace(b).second);
|
|
|
}
|
|
|
}
|
|
|
+ for (auto&& i : RangesForResult) {
|
|
|
+ Y_VERIFY(ranges.emplace(i.first).second);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
const THashMap<TUnifiedBlobId, THashSet<TBlobRange>>& GetRangesForRead() const {
|
|
|
return RangesForRead;
|
|
|
}
|
|
|
|
|
|
- void AddRange(const TBlobRange& range) {
|
|
|
- Y_VERIFY(!Started);
|
|
|
- Y_VERIFY(RangesForRead[range.BlobId].emplace(range).second);
|
|
|
- }
|
|
|
+ void AddRange(const TBlobRange& range, const TString& result = Default<TString>());
|
|
|
|
|
|
- void Start(const THashSet<TBlobRange>& rangesInProgress) {
|
|
|
- Y_VERIFY(!Started);
|
|
|
- Y_VERIFY(RangesForRead.size());
|
|
|
- for (auto&& i : RangesForRead) {
|
|
|
- for (auto&& r : i.second) {
|
|
|
- WaitingRanges.emplace(r);
|
|
|
- }
|
|
|
- }
|
|
|
- THashMap<TUnifiedBlobId, THashSet<TBlobRange>> rangesFiltered;
|
|
|
- if (rangesInProgress.empty()) {
|
|
|
- rangesFiltered = RangesForRead;
|
|
|
- } else {
|
|
|
- for (auto&& i : RangesForRead) {
|
|
|
- for (auto&& r : i.second) {
|
|
|
- if (!rangesInProgress.contains(r)) {
|
|
|
- rangesFiltered[r.BlobId].emplace(r);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (rangesFiltered.size()) {
|
|
|
- StartReading(std::move(rangesFiltered));
|
|
|
- }
|
|
|
- Started = true;
|
|
|
- }
|
|
|
-
|
|
|
- void OnReadResult(const TBlobRange& range, const TString& data) {
|
|
|
- Y_VERIFY(WaitingRanges.erase(range));
|
|
|
- Replies.emplace(range, data);
|
|
|
- }
|
|
|
-
|
|
|
- void OnReadError(const TBlobRange& range, const NKikimrProto::EReplyStatus replyStatus) {
|
|
|
- Y_VERIFY(WaitingRanges.erase(range));
|
|
|
- Fails.emplace(range, replyStatus);
|
|
|
- }
|
|
|
+ void Start(const THashSet<TBlobRange>& rangesInProgress);
|
|
|
+ void OnReadResult(const TBlobRange& range, const TString& data);
|
|
|
+ void OnReadError(const TBlobRange& range, const TErrorStatus& replyStatus);
|
|
|
|
|
|
bool HasFails() const {
|
|
|
return Fails.size();
|