Browse Source

Add deduplication options checks (#2254)

FloatingCrowbar 1 year ago
parent
commit
2d5f041a76

+ 8 - 12
ydb/core/persqueue/writer/writer.cpp

@@ -33,7 +33,7 @@ namespace NKikimr::NPQ {
 #define INFO(message)  LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
 #define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
 
-static const ui64 WRITE_BLOCK_SIZE = 4_KB;    
+static const ui64 WRITE_BLOCK_SIZE = 4_KB;
 
 TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const {
     auto out = TStringBuilder() << "Success {"
@@ -106,7 +106,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
     using EErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrorCode;
 
     static constexpr size_t MAX_QUOTA_INFLIGHT = 3;
-    
+
     static void FillHeader(NKikimrClient::TPersQueuePartitionRequest& request,
             ui32 partitionId, const TActorId& pipeClient)
     {
@@ -292,11 +292,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
 
         auto& request = *ev->Record.MutablePartitionRequest();
         auto& cmd = *request.MutableCmdGetOwnership();
-        if (Opts.UseDeduplication) {
-            cmd.SetOwner(SourceId);
-        } else {
-            cmd.SetOwner(CreateGuidAsString());
-        }
+        cmd.SetOwner(SourceId);
         cmd.SetForce(true);
 
         SetWriteId(request);
@@ -742,16 +738,16 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
                 ReceivedQuota.insert(ReceivedQuota.end(), PendingQuota.begin(), PendingQuota.end());
                 PendingQuota.clear();
 
-                ProcessQuotaAndWrite();                
+                ProcessQuotaAndWrite();
 
                 break;
 
             case EWakeupTag::RlNoResource:
-                // Re-requesting the quota. We do this until we get a quota. 
+                // Re-requesting the quota. We do this until we get a quota.
                 // We do not request a quota with a long waiting time because the writer may already be a destroyer, and the quota will still be waiting to be received.
                 RequestDataQuota(PendingQuotaAmount, ctx);
                 break;
-            
+
             default:
                 Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
         }
@@ -772,7 +768,7 @@ public:
         , TabletId(tabletId)
         , PartitionId(partitionId)
         , ExpectedGeneration(opts.ExpectedGeneration)
-        , SourceId(opts.SourceId)
+        , SourceId(opts.UseDeduplication ? opts.SourceId : CreateGuidAsString())
         , Opts(opts)
     {
         if (Opts.MeteringMode) {
@@ -858,7 +854,7 @@ private:
 IActor* CreatePartitionWriter(const TActorId& client,
                              // const NKikimrSchemeOp::TPersQueueGroupDescription& config,
                               ui64 tabletId,
-                              ui32 partitionId, 
+                              ui32 partitionId,
                               const TPartitionWriterOpts& opts) {
     return new TPartitionWriter(client, tabletId, partitionId, opts);
 }

+ 0 - 1
ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h

@@ -317,7 +317,6 @@ private:
     };
 
     THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action
-
 public:
     TWriteSessionImpl(const TWriteSessionSettings& settings,
             std::shared_ptr<TTopicClient::TImpl> client,

+ 23 - 3
ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp

@@ -697,9 +697,29 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
         UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count);
 
     }
+} // Y_UNIT_TEST_SUITE(BasicUsage)
 
+Y_UNIT_TEST_SUITE(TSettingsValidation) {
+    Y_UNIT_TEST(TWriteSessionProducerSettings) {
+        TTopicSdkTestSetup setup(TEST_CASE_NAME);
+        TTopicClient client = setup.MakeClient();
 
+        {
+            auto writeSettings = TWriteSessionSettings()
+                        .Path(TEST_TOPIC)
+                        .ProducerId("something")
+                        .DeduplicationEnabled(false);
+            try {
+                auto writeSession = client.CreateWriteSession(writeSettings);
+                auto event = writeSession->GetEvent(true);
+                UNIT_ASSERT(event.Defined());
+                auto* closedEvent = std::get_if<TSessionClosedEvent>(&event.GetRef());
+                UNIT_ASSERT(closedEvent);
+            } catch (NYdb::TContractViolation&) {
+                //pass
+            }
+        }
+    }
+} // Y_UNIT_TEST_SUITE(TSettingsValidation)
 
-}
-
-}
+} // namespace

+ 15 - 9
ydb/services/persqueue_v1/actors/write_session_actor.ipp

@@ -389,11 +389,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
         //      1.2. non-empty partition_id (explicit partitioning)
         //      1.3. non-empty partition_with_generation (explicit partitioning && direct write to partition host)
         //    2. Empty producer id (no deduplication, partition is selected using round-robin).
-        bool isScenarioSupported = 
+        bool isScenarioSupported =
             !InitRequest.producer_id().empty() && (
-                InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() || 
+                InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
                 InitRequest.has_partition_id() ||
-                InitRequest.has_partition_with_generation()) ||
+                InitRequest.has_partition_with_generation())
+            ||
             InitRequest.producer_id().empty();
 
         if (!isScenarioSupported) {
@@ -424,7 +425,6 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
             return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id();
         }
     }();
-
     LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest.ShortDebugString() << " from " << PeerName);
     if (!UseDeduplication) {
         LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << ". Disable deduplication for empty producer id");
@@ -467,8 +467,9 @@ template<bool UseMigrationProtocol>
 void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) {
     Y_UNUSED(ctx);
 
-    if (SourceId.empty()) {
-        Y_ABORT_UNLESS(!UseDeduplication);
+    if (SourceId.empty() && UseDeduplication) {
+        CloseSession("Internal server error: got empty SourceId with enabled deduplication", PersQueue::ErrorCode::ERROR, ctx);
+        return;
     }
 
     InitMeta = GetInitialDataChunk(InitRequest, FullConverter->GetClientsideName(), PeerName); // ToDo[migration] - check?
@@ -835,9 +836,14 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
     OwnerCookie = result.GetResult().OwnerCookie;
 
     const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
-    if (!UseDeduplication) {
-        Y_ABORT_UNLESS(maxSeqNo == 0);
-    }
+
+    // ToDo: uncomment after fixing KIKIMR-21124
+    // if (!UseDeduplication) {
+    //     if (maxSeqNo != 0) {
+    //         return CloseSession("Internal server error: have maxSeqNo != with deduplication disabled",
+    //                             PersQueue::ErrorCode::ERROR, ctx);
+    //     }
+    // }
 
     OwnerCookie = result.GetResult().OwnerCookie;
     MakeAndSentInitResponse(maxSeqNo, ctx);

+ 49 - 0
ydb/services/persqueue_v1/persqueue_ut.cpp

@@ -6699,6 +6699,55 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
         }
     }
 
+    Y_UNIT_TEST(DisableWrongSettings) {
+        NPersQueue::TTestServer server;
+        server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::BLACKBOX_VALIDATOR });
+        server.EnableLogs({NKikimrServices::PERSQUEUE}, NActors::NLog::EPriority::PRI_INFO);
+        TString topicFullName = "rt3.dc1--acc--topic1";
+        auto driver = SetupTestAndGetDriver(server, topicFullName, 3);
+
+        std::shared_ptr<grpc::Channel> Channel_;
+        std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStubP_;
+        {
+            Channel_ = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials());
+            TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
+        }
+
+        {
+            grpc::ClientContext rcontext1;
+            auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
+            UNIT_ASSERT(writeStream1);
+            Ydb::Topic::StreamWriteMessage::FromClient req;
+            Ydb::Topic::StreamWriteMessage::FromServer resp;
+
+            req.mutable_init_request()->set_path("acc/topic1");
+            req.mutable_init_request()->set_message_group_id("some-group");
+            if (!writeStream1->Write(req)) {
+                ythrow yexception() << "write fail";
+            }
+            UNIT_ASSERT(writeStream1->Read(&resp));
+            Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
+            UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
+        }
+        {
+            grpc::ClientContext rcontext1;
+            auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
+            UNIT_ASSERT(writeStream1);
+            Ydb::Topic::StreamWriteMessage::FromClient req;
+            Ydb::Topic::StreamWriteMessage::FromServer resp;
+
+            req.mutable_init_request()->set_path("acc/topic1");
+            req.mutable_init_request()->set_message_group_id("some-group");
+            req.mutable_init_request()->set_producer_id("producer");
+            if (!writeStream1->Write(req)) {
+                ythrow yexception() << "write fail";
+            }
+            UNIT_ASSERT(writeStream1->Read(&resp));
+            Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
+            UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
+        }
+    }
+
     Y_UNIT_TEST(DisableDeduplication) {
         NPersQueue::TTestServer server;
         TString topicFullName = "rt3.dc1--topic1";