|
@@ -31,18 +31,14 @@ static const ui32 MAX_INLINE_SIZE = 1000;
|
|
|
|
|
|
static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL;
|
|
|
|
|
|
-void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) {
|
|
|
+void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) {
|
|
|
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);
|
|
|
|
|
|
THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
|
|
|
NKikimrClient::TResponse& resp = *response->Response;
|
|
|
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
|
|
|
resp.SetErrorCode(NPersQueue::NErrorCode::OK);
|
|
|
- auto* r = resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult();
|
|
|
- r->SetOwnerCookie(cookie);
|
|
|
- r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
|
|
|
- r->SetSeqNo(seqNo);
|
|
|
-
|
|
|
+ resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult()->SetOwnerCookie(cookie);
|
|
|
ctx.Send(Tablet, response.Release());
|
|
|
}
|
|
|
|
|
@@ -150,12 +146,8 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
|
|
|
auto &owner = ev->Owner;
|
|
|
auto it = Owners.find(owner);
|
|
|
if (it == Owners.end()) {
|
|
|
- if (ev->RegisterIfNotExists) {
|
|
|
- Owners[owner];
|
|
|
- it = Owners.find(owner);
|
|
|
- } else {
|
|
|
- return ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, "SourceId isn't registered");
|
|
|
- }
|
|
|
+ Owners[owner];
|
|
|
+ it = Owners.find(owner);
|
|
|
}
|
|
|
if (it->second.NeedResetOwner || ev->Force) { //change owner
|
|
|
Y_ABORT_UNLESS(ReservedSize >= it->second.ReservedSize);
|
|
@@ -354,13 +346,10 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
|
|
|
if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion)
|
|
|
++offset;
|
|
|
} else if (response.IsOwnership()) {
|
|
|
- const auto& r = response.GetOwnership();
|
|
|
- const TString& ownerCookie = r.OwnerCookie;
|
|
|
+ const TString& ownerCookie = response.GetOwnership().OwnerCookie;
|
|
|
auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie));
|
|
|
if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) {
|
|
|
- auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(it->first));
|
|
|
- auto seqNo = sit == SourceIdStorage.GetInMemorySourceIds().end() ? 0 : sit->second.SeqNo;
|
|
|
- ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie, seqNo);
|
|
|
+ ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie);
|
|
|
} else {
|
|
|
ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already");
|
|
|
}
|