Browse Source

Improove tablet generation value in Topic protocol (#2375)

Nikolay Shestakov 1 year ago
parent
commit
0e0399a20d

+ 0 - 1
.github/config/muted_ya.txt

@@ -57,7 +57,6 @@ ydb/services/persqueue_v1/ut TPersQueueTest.CheckACLForGrpcWrite
 ydb/services/persqueue_v1/ut TPersQueueTest.DirectRead*
 ydb/services/persqueue_v1/ut TPersQueueTest.DirectRead*
 ydb/services/persqueue_v1/ut TPersQueueTest.SetupLockSession
 ydb/services/persqueue_v1/ut TPersQueueTest.SetupLockSession
 ydb/services/persqueue_v1/ut TPersQueueTest.TopicServiceCommitOffsetBadOffsets
 ydb/services/persqueue_v1/ut TPersQueueTest.TopicServiceCommitOffsetBadOffsets
-ydb/services/persqueue_v1/ut TPersQueueTest.UpdatePartitionLocation
 ydb/services/persqueue_v1/ut TPQCompatTest.BadTopics
 ydb/services/persqueue_v1/ut TPQCompatTest.BadTopics
 ydb/services/persqueue_v1/ut [3/10]*
 ydb/services/persqueue_v1/ut [3/10]*
 ydb/services/ydb/sdk_sessions_pool_ut YdbSdkSessionsPool.StressTestSync*
 ydb/services/ydb/sdk_sessions_pool_ut YdbSdkSessionsPool.StressTestSync*

+ 1 - 1
ydb/core/persqueue/partition.cpp

@@ -520,7 +520,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
 
 
     LOG_INFO_S(
     LOG_INFO_S(
             ctx, NKikimrServices::PERSQUEUE,
             ctx, NKikimrServices::PERSQUEUE,
-            "init complete for topic '" << TopicName() << "' partition " << Partition << " " << ctx.SelfID
+            "init complete for topic '" << TopicName() << "' partition " << Partition << " generation " << TabletGeneration << " " << ctx.SelfID
     );
     );
 
 
     TStringBuilder ss;
     TStringBuilder ss;

+ 3 - 4
ydb/services/persqueue_v1/actors/partition_actor.cpp

@@ -593,7 +593,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
 
 
 
 
         if (!StartReading) {
         if (!StartReading) {
-            ctx.Send(ParentId, new TEvPQProxy::TEvPartitionStatus(Partition, CommittedOffset, EndOffset, WriteTimestampEstimateMs, TabletGeneration, NodeId));
+            ctx.Send(ParentId, new TEvPQProxy::TEvPartitionStatus(Partition, CommittedOffset, EndOffset, WriteTimestampEstimateMs, NodeId, TabletGeneration));
         } else {
         } else {
             InitStartReading(ctx);
             InitStartReading(ctx);
         }
         }
@@ -802,15 +802,14 @@ void TPartitionActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const
     TEvTabletPipe::TEvClientConnected *msg = ev->Get();
     TEvTabletPipe::TEvClientConnected *msg = ev->Get();
 
 
     LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << Partition
     LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << Partition
-                            << " pipe restart attempt " << PipeGeneration << " pipe creation result: " << msg->Status);
+                            << " pipe restart attempt " << PipeGeneration << " pipe creation result: " << msg->Status
+                            << " TabletId: " << msg->TabletId << " Generation: " << msg->Generation);
 
 
     if (msg->Status != NKikimrProto::OK) {
     if (msg->Status != NKikimrProto::OK) {
         RestartPipe(ctx, TStringBuilder() << "pipe to tablet is dead " << msg->TabletId, NPersQueue::NErrorCode::TABLET_PIPE_DISCONNECTED);
         RestartPipe(ctx, TStringBuilder() << "pipe to tablet is dead " << msg->TabletId, NPersQueue::NErrorCode::TABLET_PIPE_DISCONNECTED);
         return;
         return;
     }
     }
 
 
-    auto prevGeneration = TabletGeneration;
-    Y_UNUSED(prevGeneration);
     TabletGeneration = msg->Generation;
     TabletGeneration = msg->Generation;
     NodeId = msg->ServerId.NodeId();
     NodeId = msg->ServerId.NodeId();
 
 

+ 4 - 4
ydb/services/persqueue_v1/persqueue_ut.cpp

@@ -716,7 +716,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
         TPersQueueV1TestServer server;
         TPersQueueV1TestServer server;
         SET_LOCALS;
         SET_LOCALS;
         MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService);
         MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService);
-        server.EnablePQLogs({ NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY});
+        server.EnablePQLogs({ NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY, NKikimrServices::PERSQUEUE});
         server.EnablePQLogs({ NKikimrServices::KQP_PROXY }, NLog::EPriority::PRI_EMERG);
         server.EnablePQLogs({ NKikimrServices::KQP_PROXY }, NLog::EPriority::PRI_EMERG);
         server.EnablePQLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD }, NLog::EPriority::PRI_ERROR);
         server.EnablePQLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD }, NLog::EPriority::PRI_ERROR);
 
 
@@ -742,8 +742,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
         }
         }
 
 
         // await and confirm CreatePartitionStreamRequest from server
         // await and confirm CreatePartitionStreamRequest from server
-        i64 assignId = 0;
-        i64 generation = 0;
+        i64 assignId;
+        i64 generation;
         {
         {
             Ydb::Topic::StreamReadMessage::FromServer resp;
             Ydb::Topic::StreamReadMessage::FromServer resp;
 
 
@@ -756,8 +756,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
             UNIT_ASSERT_VALUES_EQUAL(resp.start_partition_session_request().partition_session().path(), "acc/topic1");
             UNIT_ASSERT_VALUES_EQUAL(resp.start_partition_session_request().partition_session().path(), "acc/topic1");
             UNIT_ASSERT(resp.start_partition_session_request().partition_session().partition_id() == 0);
             UNIT_ASSERT(resp.start_partition_session_request().partition_session().partition_id() == 0);
             UNIT_ASSERT(resp.start_partition_session_request().partition_location().generation() > 0);
             UNIT_ASSERT(resp.start_partition_session_request().partition_location().generation() > 0);
-            generation = resp.start_partition_session_request().partition_location().generation();
             assignId = resp.start_partition_session_request().partition_session().partition_session_id();
             assignId = resp.start_partition_session_request().partition_session().partition_session_id();
+            generation = resp.start_partition_session_request().partition_location().generation();
         }
         }
 
 
         server.Server->AnnoyingClient->RestartPartitionTablets(server.Server->CleverServer->GetRuntime(), "rt3.dc1--acc--topic1");
         server.Server->AnnoyingClient->RestartPartitionTablets(server.Server->CleverServer->GetRuntime(), "rt3.dc1--acc--topic1");