|
@@ -26,11 +26,11 @@
|
|
|
namespace NKikimr {
|
|
|
namespace NSchemeBoard {
|
|
|
|
|
|
-#define SBR_LOG_T(stream) SB_LOG_T(SCHEME_BOARD_REPLICA, stream)
|
|
|
-#define SBR_LOG_D(stream) SB_LOG_D(SCHEME_BOARD_REPLICA, stream)
|
|
|
-#define SBR_LOG_I(stream) SB_LOG_I(SCHEME_BOARD_REPLICA, stream)
|
|
|
-#define SBR_LOG_N(stream) SB_LOG_N(SCHEME_BOARD_REPLICA, stream)
|
|
|
-#define SBR_LOG_E(stream) SB_LOG_E(SCHEME_BOARD_REPLICA, stream)
|
|
|
+#define SBR_LOG_T(stream) SB_LOG_T(SCHEME_BOARD_REPLICA, "" << SelfId() << " " << stream)
|
|
|
+#define SBR_LOG_D(stream) SB_LOG_D(SCHEME_BOARD_REPLICA, "" << SelfId() << " " << stream)
|
|
|
+#define SBR_LOG_I(stream) SB_LOG_I(SCHEME_BOARD_REPLICA, "" << SelfId() << " " << stream)
|
|
|
+#define SBR_LOG_N(stream) SB_LOG_N(SCHEME_BOARD_REPLICA, "" << SelfId() << " " << stream)
|
|
|
+#define SBR_LOG_E(stream) SB_LOG_E(SCHEME_BOARD_REPLICA, "" << SelfId() << " " << stream)
|
|
|
|
|
|
class TReplica: public TMonitorableActor<TReplica> {
|
|
|
using TDescribeSchemeResult = NKikimrScheme::TEvDescribeSchemeResult;
|
|
@@ -53,6 +53,12 @@ class TReplica: public TMonitorableActor<TReplica> {
|
|
|
: Owner(owner)
|
|
|
{
|
|
|
}
|
|
|
+
|
|
|
+ TString ToString() const override {
|
|
|
+ return TStringBuilder() << ToStringHeader() << " {"
|
|
|
+ << " Owner: " << Owner
|
|
|
+ << " }";
|
|
|
+ }
|
|
|
};
|
|
|
};
|
|
|
|
|
@@ -226,6 +232,10 @@ public:
|
|
|
other.TrackMemory();
|
|
|
}
|
|
|
|
|
|
+ auto SelfId() const {
|
|
|
+ return Owner->SelfId();
|
|
|
+ }
|
|
|
+
|
|
|
public:
|
|
|
explicit TDescription(TReplica* owner, const TString& path)
|
|
|
: Owner(owner)
|
|
@@ -288,8 +298,7 @@ public:
|
|
|
TDescription(const TDescription& other) = delete;
|
|
|
TDescription& operator=(const TDescription& other) = delete;
|
|
|
|
|
|
- ~TDescription()
|
|
|
- {
|
|
|
+ ~TDescription() {
|
|
|
UntrackMemory();
|
|
|
}
|
|
|
|
|
@@ -321,13 +330,8 @@ public:
|
|
|
<< ", other# " << other.ToString());
|
|
|
|
|
|
SBR_LOG_T("Merge descriptions"
|
|
|
- << ": self# " << Owner->SelfId()
|
|
|
- << ", left path# " << Path
|
|
|
- << ", left pathId# " << PathId
|
|
|
- << ", left version# " << GetVersion()
|
|
|
- << ", rigth path# " << other.Path
|
|
|
- << ", rigth pathId# " << other.PathId
|
|
|
- << ", rigth version# " << other.GetVersion());
|
|
|
+ << ": self# " << ToLogString()
|
|
|
+ << ", other# " << other.ToLogString());
|
|
|
|
|
|
UntrackMemory();
|
|
|
other.UntrackMemory();
|
|
@@ -374,6 +378,15 @@ public:
|
|
|
<< " }";
|
|
|
}
|
|
|
|
|
|
+ TString ToLogString() const {
|
|
|
+ return TStringBuilder() << "{"
|
|
|
+ << " Path# " << Path
|
|
|
+ << " PathId# " << PathId
|
|
|
+ << " Version# " << GetVersion()
|
|
|
+ << " ExplicitlyDeleted# " << (ExplicitlyDeleted ? "true" : "false")
|
|
|
+ << " }";
|
|
|
+ }
|
|
|
+
|
|
|
const TString& GetPath() const {
|
|
|
return Path;
|
|
|
}
|
|
@@ -534,26 +547,24 @@ private:
|
|
|
template <typename TPath>
|
|
|
TDescription& UpsertDescription(const TPath& path) {
|
|
|
SBR_LOG_I("Upsert description"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path);
|
|
|
+ << ": path# " << path);
|
|
|
|
|
|
return Descriptions.Upsert(path, TDescription(this, path));
|
|
|
}
|
|
|
|
|
|
TDescription& UpsertDescription(const TString& path, const TPathId& pathId) {
|
|
|
SBR_LOG_I("Upsert description"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
+ << ": path# " << path
|
|
|
<< ", pathId# " << pathId);
|
|
|
|
|
|
return Descriptions.Upsert(path, pathId, TDescription(this, path, pathId));
|
|
|
}
|
|
|
|
|
|
template <typename TPath>
|
|
|
- TDescription& UpsertDescription(const TPath path, TDescription&& description) {
|
|
|
+ TDescription& UpsertDescription(const TPath& path, TDescription&& description) {
|
|
|
SBR_LOG_I("Upsert description"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path);
|
|
|
+ << ": path# " << path
|
|
|
+ << ", desc# " << description.ToLogString());
|
|
|
|
|
|
return Descriptions.Upsert(path, std::move(description));
|
|
|
}
|
|
@@ -564,8 +575,7 @@ private:
|
|
|
TDescribeSchemeResult&& describeSchemeResult
|
|
|
) {
|
|
|
SBR_LOG_I("Upsert description"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
+ << ": path# " << path
|
|
|
<< ", pathId# " << pathId);
|
|
|
|
|
|
return Descriptions.Upsert(path, pathId, TDescription(this, path, pathId, std::move(describeSchemeResult)));
|
|
@@ -590,8 +600,7 @@ private:
|
|
|
auto path = desc->GetPath();
|
|
|
|
|
|
SBR_LOG_I("Delete description"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
+ << ": path# " << path
|
|
|
<< ", pathId# " << pathId);
|
|
|
|
|
|
if (TDescription* descByPath = Descriptions.FindPtr(path)) {
|
|
@@ -601,10 +610,9 @@ private:
|
|
|
auto curDomainId = descByPath->GetDomainId();
|
|
|
auto domainId = desc->GetDomainId();
|
|
|
|
|
|
- if (curDomainId == pathId) { //Deletion from GSS
|
|
|
+ if (curDomainId == pathId) { // Deletion from GSS
|
|
|
SBR_LOG_N("Delete description by GSS"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
+ << ": path# " << path
|
|
|
<< ", pathId# " << pathId
|
|
|
<< ", domainId# " << domainId
|
|
|
<< ", curPathId# " << curPathId
|
|
@@ -660,9 +668,8 @@ private:
|
|
|
TDescription* desc = Descriptions.FindPtr(path);
|
|
|
Y_VERIFY(desc);
|
|
|
|
|
|
- SBR_LOG_N("Subscribe"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", subscriber# " << subscriber
|
|
|
+ SBR_LOG_I("Subscribe"
|
|
|
+ << ": subscriber# " << subscriber
|
|
|
<< ", path# " << path
|
|
|
<< ", domainOwnerId# " << domainOwnerId
|
|
|
<< ", capabilities# " << capabilities.ShortDebugString());
|
|
@@ -679,9 +686,8 @@ private:
|
|
|
TDescription* desc = Descriptions.FindPtr(path);
|
|
|
Y_VERIFY(desc);
|
|
|
|
|
|
- SBR_LOG_N("Unsubscribe"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", subscriber# " << subscriber
|
|
|
+ SBR_LOG_I("Unsubscribe"
|
|
|
+ << ": subscriber# " << subscriber
|
|
|
<< ", path# " << path);
|
|
|
|
|
|
desc->Unsubscribe(subscriber);
|
|
@@ -770,22 +776,17 @@ private:
|
|
|
}
|
|
|
|
|
|
void Handle(TSchemeBoardEvents::TEvHandshakeRequest::TPtr& ev) {
|
|
|
- const auto& record = ev->Get()->Record;
|
|
|
+ SBR_LOG_D("Handle " << ev->Get()->ToString()
|
|
|
+ << ": sender# " << ev->Sender);
|
|
|
|
|
|
+ const auto& record = ev->Get()->Record;
|
|
|
const ui64 owner = record.GetOwner();
|
|
|
const ui64 generation = record.GetGeneration();
|
|
|
|
|
|
- SBR_LOG_D("Handle TSchemeBoardEvents::TEvHandshakeRequest"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
- << ", owner# " << owner
|
|
|
- << ", generation# " << generation);
|
|
|
-
|
|
|
TPopulatorInfo& info = Populators[owner];
|
|
|
if (generation < info.PendingGeneration) {
|
|
|
SBR_LOG_E("Reject handshake from stale populator"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
+ << ": sender# " << ev->Sender
|
|
|
<< ", owner# " << owner
|
|
|
<< ", generation# " << generation
|
|
|
<< ", pending generation# " << info.PendingGeneration);
|
|
@@ -793,8 +794,7 @@ private:
|
|
|
}
|
|
|
|
|
|
SBR_LOG_N("Successful handshake"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", owner# " << owner
|
|
|
+ << ": owner# " << owner
|
|
|
<< ", generation# " << generation);
|
|
|
|
|
|
info.PendingGeneration = generation;
|
|
@@ -804,32 +804,25 @@ private:
|
|
|
}
|
|
|
|
|
|
void Handle(TSchemeBoardEvents::TEvUpdate::TPtr& ev) {
|
|
|
- auto& record = *ev->Get()->MutableRecord();
|
|
|
+ SBR_LOG_D("Handle " << ev->Get()->ToString()
|
|
|
+ << ": sender# " << ev->Sender
|
|
|
+ << ", cookie# " << ev->Cookie);
|
|
|
|
|
|
+ auto& record = *ev->Get()->MutableRecord();
|
|
|
const ui64 owner = record.GetOwner();
|
|
|
const ui64 generation = record.GetGeneration();
|
|
|
|
|
|
- SBR_LOG_D("Handle TSchemeBoardEvents::TEvUpdate"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
- << ", cookie# " << ev->Cookie
|
|
|
- << ", owner# " << owner
|
|
|
- << ", generation# " << generation);
|
|
|
- SBR_LOG_T("Message:\n" << ev->Get()->ToString().substr(0, 10000));
|
|
|
-
|
|
|
const auto populatorIt = Populators.find(owner);
|
|
|
if (populatorIt == Populators.end()) {
|
|
|
SBR_LOG_E("Reject update from unknown populator"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
+ << ": sender# " << ev->Sender
|
|
|
<< ", owner# " << owner
|
|
|
<< ", generation# " << generation);
|
|
|
return;
|
|
|
}
|
|
|
if (generation != populatorIt->second.PendingGeneration) {
|
|
|
SBR_LOG_E("Reject update from stale populator"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
+ << ": sender# " << ev->Sender
|
|
|
<< ", owner# " << owner
|
|
|
<< ", generation# " << generation
|
|
|
<< ", pending generation# " << populatorIt->second.PendingGeneration);
|
|
@@ -850,8 +843,7 @@ private:
|
|
|
const TPathId pathId = ev->Get()->GetPathId();
|
|
|
|
|
|
SBR_LOG_N("Update description"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
+ << ": path# " << path
|
|
|
<< ", pathId# " << pathId
|
|
|
<< ", deletion# " << (record.GetIsDeletion() ? "true" : "false"));
|
|
|
|
|
@@ -863,8 +855,7 @@ private:
|
|
|
if (TDescription* desc = Descriptions.FindPtr(pathId)) {
|
|
|
if (desc->IsExplicitlyDeleted()) {
|
|
|
SBR_LOG_N("Path was explicitly deleted, ignoring"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
+ << ": path# " << path
|
|
|
<< ", pathId# " << pathId);
|
|
|
|
|
|
return AckUpdate(ev);
|
|
@@ -899,97 +890,64 @@ private:
|
|
|
return AckUpdate(ev);
|
|
|
}
|
|
|
|
|
|
- Y_VERIFY_S(desc->IsFilled(), "desc :"
|
|
|
- << ": path# " << desc->GetPath()
|
|
|
- << ", pathId# " << desc->GetPathId()
|
|
|
- << ", domainId# " << desc->GetDomainId()
|
|
|
- << ", version# " << desc->GetVersion());
|
|
|
+ Y_VERIFY_S(desc->IsFilled(), "Description is not filled"
|
|
|
+ << ": desc# " << desc->ToLogString());
|
|
|
|
|
|
auto curDomainId = desc->GetDomainId();
|
|
|
auto domainId = GetDomainId(record.GetDescribeSchemeResult());
|
|
|
|
|
|
- if (curPathId == domainId) { //Update from TSS, GSS->TSS
|
|
|
+ auto log = [&](const TString& message) {
|
|
|
+ SBR_LOG_N("" << message
|
|
|
+ << ": path# " << path
|
|
|
+ << ", pathId# " << pathId
|
|
|
+ << ", domainId# " << domainId
|
|
|
+ << ", curPathId# " << curPathId
|
|
|
+ << ", curDomainId# " << curDomainId);
|
|
|
+ };
|
|
|
|
|
|
+ if (curPathId == domainId) { // Update from TSS, GSS->TSS
|
|
|
// it is only because we need to manage undo of upgrade subdomain, finally remove it
|
|
|
auto abandonedSchemeShards = desc->GetAbandonedSchemeShardIds();
|
|
|
- if (abandonedSchemeShards.contains(pathId.OwnerId)) { //TSS is ignored, present GSS reverted it
|
|
|
- SBR_LOG_N("Replace GSS by TSS description is rejected, GSS implicitly knows that TSS has been reverted"
|
|
|
- ", but still inject description only by pathId for safe"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
- << ", pathId# " << pathId
|
|
|
- << ", domainId# " << domainId
|
|
|
- << ", curPathId# " << curPathId
|
|
|
- << ", curDomainId# " << curDomainId);
|
|
|
+ if (abandonedSchemeShards.contains(pathId.OwnerId)) { // TSS is ignored, present GSS reverted it
|
|
|
+ log("Replace GSS by TSS description is rejected, GSS implicitly knows that TSS has been reverted"
|
|
|
+ ", but still inject description only by pathId for safe");
|
|
|
UpsertDescription(pathId, TDescription(this, path, pathId, std::move(*record.MutableDescribeSchemeResult())));
|
|
|
return AckUpdate(ev);
|
|
|
}
|
|
|
|
|
|
- SBR_LOG_N("Replace GSS by TSS description"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
- << ", pathId# " << pathId
|
|
|
- << ", domainId# " << domainId
|
|
|
- << ", curPathId# " << curPathId
|
|
|
- << ", curDomainId# " << curDomainId);
|
|
|
- //unlick GSS desc by path
|
|
|
+ log("Replace GSS by TSS description");
|
|
|
+ // unlick GSS desc by path
|
|
|
Descriptions.DeleteIndex(path);
|
|
|
RelinkSubscribers(desc, path);
|
|
|
UpsertDescription(path, pathId, std::move(*record.MutableDescribeSchemeResult()));
|
|
|
return AckUpdate(ev);
|
|
|
}
|
|
|
|
|
|
- if (curDomainId == pathId) { //Update from GSS, TSS->GSS
|
|
|
-
|
|
|
+ if (curDomainId == pathId) { // Update from GSS, TSS->GSS
|
|
|
// it is only because we need to manage undo of upgrade subdomain, finally remove it
|
|
|
auto abandonedSchemeShards = GetAbandonedSchemeShardIds(record.GetDescribeSchemeResult());
|
|
|
- if (abandonedSchemeShards.contains(curPathId.OwnerId)) { //GSS reverts TSS
|
|
|
- SBR_LOG_N("Replace TSS by GSS description, TSS was implicitly reverted by GSS"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
- << ", pathId# " << pathId
|
|
|
- << ", domainId# " << domainId
|
|
|
- << ", curPathId# " << curPathId
|
|
|
- << ", curDomainId# " << curDomainId);
|
|
|
- //unlick TSS desc by path
|
|
|
+ if (abandonedSchemeShards.contains(curPathId.OwnerId)) { // GSS reverts TSS
|
|
|
+ log("Replace TSS by GSS description, TSS was implicitly reverted by GSS");
|
|
|
+ // unlick TSS desc by path
|
|
|
Descriptions.DeleteIndex(path);
|
|
|
RelinkSubscribers(desc, path);
|
|
|
UpsertDescription(path, pathId, std::move(*record.MutableDescribeSchemeResult()));
|
|
|
return AckUpdate(ev);
|
|
|
}
|
|
|
|
|
|
- SBR_LOG_N("Inject description only by pathId, it is update from GSS"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
- << ", pathId# " << pathId
|
|
|
- << ", domainId# " << domainId
|
|
|
- << ", curPathId# " << curPathId
|
|
|
- << ", curDomainId# " << curDomainId);
|
|
|
+ log("Inject description only by pathId, it is update from GSS");
|
|
|
UpsertDescription(pathId, TDescription(this, path, pathId, std::move(*record.MutableDescribeSchemeResult())));
|
|
|
return AckUpdate(ev);
|
|
|
}
|
|
|
|
|
|
if (curDomainId == domainId) {
|
|
|
if (curPathId > pathId) {
|
|
|
- SBR_LOG_N("Totally ignore description, path with obsolete pathId"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
- << ", pathId# " << pathId
|
|
|
- << ", domainId# " << domainId
|
|
|
- << ", curPathId# " << curPathId
|
|
|
- << ", curDomainId# " << curDomainId);
|
|
|
+ log("Totally ignore description, path with obsolete pathId");
|
|
|
return AckUpdate(ev);
|
|
|
}
|
|
|
|
|
|
if (curPathId < pathId) {
|
|
|
- SBR_LOG_N("Update description by newest path form tenant schemeshard"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
- << ", pathId# " << pathId
|
|
|
- << ", domainId# " << domainId
|
|
|
- << ", curPathId# " << curPathId
|
|
|
- << ", curDomainId# " << curDomainId);
|
|
|
-
|
|
|
+ log("Update description by newest path form tenant schemeshard");
|
|
|
SoftDeleteDescription(desc->GetPathId());
|
|
|
Descriptions.DeleteIndex(path);
|
|
|
RelinkSubscribers(desc, path);
|
|
@@ -998,31 +956,18 @@ private:
|
|
|
UpsertDescription(path, pathId, std::move(*record.MutableDescribeSchemeResult()));
|
|
|
return AckUpdate(ev);
|
|
|
} else if (curDomainId < domainId) {
|
|
|
- SBR_LOG_N("Update description by newest path with newer domainId"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
- << ", pathId# " << pathId
|
|
|
- << ", domainId# " << domainId
|
|
|
- << ", curPathId# " << curPathId
|
|
|
- << ", curDomainId# " << curDomainId);
|
|
|
+ log("Update description by newest path with newer domainId");
|
|
|
Descriptions.DeleteIndex(path);
|
|
|
RelinkSubscribers(desc, path);
|
|
|
UpsertDescription(path, pathId, std::move(*record.MutableDescribeSchemeResult()));
|
|
|
return AckUpdate(ev);
|
|
|
} else {
|
|
|
- SBR_LOG_N("Totally ignore description, path with obsolete domainId"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
- << ", pathId# " << pathId
|
|
|
- << ", domainId# " << domainId
|
|
|
- << ", curPathId# " << curPathId
|
|
|
- << ", curDomainId# " << curDomainId);
|
|
|
+ log("Totally ignore description, path with obsolete domainId");
|
|
|
return AckUpdate(ev);
|
|
|
}
|
|
|
|
|
|
Y_FAIL_S("Can't insert old description, no relation between obj"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", path# " << path
|
|
|
+ << ": path# " << path
|
|
|
<< ", pathId# " << pathId
|
|
|
<< ", domainId# " << domainId
|
|
|
<< ", curPathId# " << curPathId
|
|
@@ -1030,22 +975,17 @@ private:
|
|
|
}
|
|
|
|
|
|
void Handle(TSchemeBoardEvents::TEvCommitRequest::TPtr& ev) {
|
|
|
- const auto& record = ev->Get()->Record;
|
|
|
+ SBR_LOG_D("Handle " << ev->Get()->ToString()
|
|
|
+ << ": sender# " << ev->Sender);
|
|
|
|
|
|
+ const auto& record = ev->Get()->Record;
|
|
|
const ui64 owner = record.GetOwner();
|
|
|
const ui64 generation = record.GetGeneration();
|
|
|
|
|
|
- SBR_LOG_D("Handle TSchemeBoardEvents::TEvCommitRequest"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
- << ", owner# " << owner
|
|
|
- << ", generation# " << generation);
|
|
|
-
|
|
|
auto it = Populators.find(owner);
|
|
|
if (it == Populators.end()) {
|
|
|
SBR_LOG_E("Reject commit from unknown populator"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
+ << ": sender# " << ev->Sender
|
|
|
<< ", owner# " << owner
|
|
|
<< ", generation# " << generation);
|
|
|
return;
|
|
@@ -1054,8 +994,7 @@ private:
|
|
|
TPopulatorInfo& info = it->second;
|
|
|
if (generation != info.PendingGeneration) {
|
|
|
SBR_LOG_E("Reject commit from stale populator"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
+ << ": sender# " << ev->Sender
|
|
|
<< ", owner# " << owner
|
|
|
<< ", generation# " << generation
|
|
|
<< ", pending generation# " << info.PendingGeneration);
|
|
@@ -1063,8 +1002,7 @@ private:
|
|
|
}
|
|
|
|
|
|
SBR_LOG_N("Commit generation"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", owner# " << owner
|
|
|
+ << ": owner# " << owner
|
|
|
<< ", generation# " << generation);
|
|
|
|
|
|
info.Generation = info.PendingGeneration;
|
|
@@ -1076,25 +1014,19 @@ private:
|
|
|
}
|
|
|
|
|
|
void Handle(TEvPrivate::TEvSendStrongNotifications::TPtr& ev) {
|
|
|
- const auto limit = ev->Get()->BatchSize;
|
|
|
- const auto owner = ev->Get()->Owner;
|
|
|
-
|
|
|
- SBR_LOG_D("Handle TEvPrivate::TEvSendStrongNotifications"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", owner# " << owner);
|
|
|
+ SBR_LOG_D("Handle " << ev->Get()->ToString());
|
|
|
|
|
|
+ const auto owner = ev->Get()->Owner;
|
|
|
if (!IsPopulatorCommited(owner)) {
|
|
|
SBR_LOG_N("Populator is not commited"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", owner# " << owner);
|
|
|
+ << ": owner# " << owner);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
auto itSubscribers = WaitStrongNotifications.find(owner);
|
|
|
if (itSubscribers == WaitStrongNotifications.end()) {
|
|
|
SBR_LOG_E("Invalid owner"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", owner# " << owner);
|
|
|
+ << ": owner# " << owner);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -1102,6 +1034,7 @@ private:
|
|
|
auto it = subscribers.begin();
|
|
|
ui32 count = 0;
|
|
|
|
|
|
+ const auto limit = ev->Get()->BatchSize;
|
|
|
while (count++ < limit && it != subscribers.end()) {
|
|
|
const TActorId subscriber = *it;
|
|
|
it = subscribers.erase(it);
|
|
@@ -1141,15 +1074,12 @@ private:
|
|
|
|
|
|
void Handle(TSchemeBoardEvents::TEvSubscribe::TPtr& ev) {
|
|
|
const auto& record = ev->Get()->Record;
|
|
|
-
|
|
|
- SBR_LOG_D("Handle TSchemeBoardEvents::TEvSubscribe"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
- << ", record# " << record.ShortDebugString());
|
|
|
-
|
|
|
const ui64 domainOwnerId = record.GetDomainOwnerId();
|
|
|
const auto& capabilities = record.GetCapabilities();
|
|
|
|
|
|
+ SBR_LOG_D("Handle " << ev->Get()->ToString()
|
|
|
+ << ": sender# " << ev->Sender);
|
|
|
+
|
|
|
if (record.HasPath()) {
|
|
|
SubscribeBy(ev->Sender, record.GetPath(), domainOwnerId, capabilities);
|
|
|
} else {
|
|
@@ -1161,10 +1091,8 @@ private:
|
|
|
void Handle(TSchemeBoardEvents::TEvUnsubscribe::TPtr& ev) {
|
|
|
const auto& record = ev->Get()->Record;
|
|
|
|
|
|
- SBR_LOG_D("Handle TSchemeBoardEvents::TEvUnsubscribe"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
- << ", record# " << record.ShortDebugString());
|
|
|
+ SBR_LOG_D("Handle " << ev->Get()->ToString()
|
|
|
+ << ": sender# " << ev->Sender);
|
|
|
|
|
|
if (record.HasPath()) {
|
|
|
UnsubscribeBy(ev->Sender, record.GetPath());
|
|
@@ -1177,10 +1105,8 @@ private:
|
|
|
void Handle(TSchemeBoardEvents::TEvNotifyAck::TPtr& ev) {
|
|
|
const auto& record = ev->Get()->Record;
|
|
|
|
|
|
- SBR_LOG_D("Handle TSchemeBoardEvents::TEvNotifyAck"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
- << ", record# " << record.ShortDebugString());
|
|
|
+ SBR_LOG_D("Handle " << ev->Get()->ToString()
|
|
|
+ << ": sender# " << ev->Sender);
|
|
|
|
|
|
auto it = Subscribers.find(ev->Sender);
|
|
|
if (it == Subscribers.end()) {
|
|
@@ -1217,11 +1143,9 @@ private:
|
|
|
void Handle(TSchemeBoardEvents::TEvSyncVersionRequest::TPtr& ev) {
|
|
|
const auto& record = ev->Get()->Record;
|
|
|
|
|
|
- SBR_LOG_D("Handle TSchemeBoardEvents::TEvSyncVersionRequest"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", sender# " << ev->Sender
|
|
|
- << ", cookie# " << ev->Cookie
|
|
|
- << ", record# " << record.ShortDebugString());
|
|
|
+ SBR_LOG_D("Handle " << ev->Get()->ToString()
|
|
|
+ << ": sender# " << ev->Sender
|
|
|
+ << ", cookie# " << ev->Cookie);
|
|
|
|
|
|
auto it = Subscribers.find(ev->Sender);
|
|
|
if (it == Subscribers.end()) {
|
|
@@ -1330,9 +1254,8 @@ private:
|
|
|
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
|
|
|
const ui32 nodeId = ev->Get()->NodeId;
|
|
|
|
|
|
- SBR_LOG_D("Handle TEvInterconnect::TEvNodeDisconnected"
|
|
|
- << ": self# " << SelfId()
|
|
|
- << ", nodeId# " << nodeId);
|
|
|
+ SBR_LOG_D("Handle " << ev->Get()->ToString()
|
|
|
+ << ": nodeId# " << nodeId);
|
|
|
|
|
|
auto it = Subscribers.lower_bound(TActorId(nodeId, 0, 0, 0));
|
|
|
while (it != Subscribers.end() && it->first.NodeId() == nodeId) {
|