Handle RaiseError calls in OffsetFetchActor

sergeyveselov 1 year ago
parent commit b8c6f928ac
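
Previously TTopicOffsetActor::RaiseError was a no-op and a missing PQGroupInfo in HandleCacheNavigateResponse tripped Y_ABORT_UNLESS, so a failed offset lookup either never answered the requesting TKafkaOffsetFetchActor or aborted the proxy. With this change both paths reply: RaiseError sends a TEvCommitedOffsetsResponse with the new ERROR status, the missing-topic path sends UNKNOWN_TOPIC, and the success path sends OK. TKafkaOffsetFetchActor collects errored and unknown topics and maps them to per-partition UNKNOWN_SERVER_ERROR and UNKNOWN_TOPIC_OR_PARTITION codes in the OffsetFetch response. The protocol tests add coverage for a nonexistent topic, a nonexistent consumer, and an OffsetFetch request spanning two consumer groups.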

+ 62 - 30
ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp

@@ -41,8 +41,8 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
             std::shared_ptr<TSet<ui32>> partitions)
         : TBase(request, requester)
         , TDescribeTopicActorImpl(ConsumerOffsetSettings(consumers, partitions))
-        , Requester_(requester)
-        , TopicName_(request.Topic)
+        , Requester(requester)
+        , TopicName(request.Topic)
         {
             Y_UNUSED(requester);
         };
@@ -63,7 +63,6 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
         }
     }
 
-    // Noop
     void RaiseError(const TString& error,
             const Ydb::PersQueue::ErrorCode::ErrorCode errorCode,
             const Ydb::StatusIds::StatusCode status,
@@ -72,6 +71,12 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
         Y_UNUSED(errorCode);
         Y_UNUSED(status);
         Y_UNUSED(ctx);
+
+        THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse());
+        response->TopicName = TopicName;
+        response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::ERROR;
+        Send(Requester, response.Release());
+        Die(ctx);
     }
 
     void ApplyResponse(TTabletInfo& tabletInfo,
@@ -86,7 +91,7 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
                     consumerToOffset[consumerResult.GetConsumer()] = consumerResult.GetCommitedOffset();
                 }
             }
-            (*PartitionIdToOffsets_)[partResult.GetPartition()] = consumerToOffset;
+            (*PartitionIdToOffsets)[partResult.GetPartition()] = consumerToOffset;
         }
     };
 
@@ -99,17 +104,24 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
         Y_UNUSED(ev);
     };
 
-    // Noop
+    // Should never be called
     bool ApplyResponse(NKikimr::TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev,
                        const TActorContext& ctx) override {
         Y_UNUSED(ctx);
         Y_UNUSED(ev);
-        return true;
+        Y_ABORT();
     };
 
     void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override {
         const auto& response = ev->Get()->Request.Get()->ResultSet.front();
-        Y_ABORT_UNLESS(response.PQGroupInfo);
+        if (!response.PQGroupInfo) {
+            THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse());
+            response->TopicName = TopicName;
+            response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::UNKNOWN_TOPIC;
+            Send(Requester, response.Release());
+            TActorBootstrapped::PassAway();
+            return;
+        }
 
         const auto& pqDescr = response.PQGroupInfo->Description;
         ProcessTablets(pqDescr, ActorContext());
@@ -117,17 +129,18 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<
 
     void Reply(const TActorContext& ctx) override {
         THolder<TEvKafka::TEvCommitedOffsetsResponse> response(new TEvKafka::TEvCommitedOffsetsResponse());
-        response->TopicName = TopicName_;
-        response->PartitionIdToOffsets = PartitionIdToOffsets_;
-        Send(Requester_, response.Release());
+        response->TopicName = TopicName;
+        response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::OK;
+        response->PartitionIdToOffsets = PartitionIdToOffsets;
+        Send(Requester, response.Release());
         Die(ctx);
     };
 
     private:
-        TActorId Requester_;
-        TString TopicName_;
-        std::unordered_map<ui32, ui32> PartitionIdToOffset_ {};
-        std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets_ = std::make_shared<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>();
+        TActorId Requester;
+        TString TopicName;
+        std::unordered_map<ui32, ui32> PartitionIdToOffset {};
+        std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets = std::make_shared<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>();
 };
 
 NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetFetchRequestData>& message) {
@@ -143,7 +156,27 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
             TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics topic;
             TString topicName = requestTopic.Name.value();
             topic.Name = topicName;
-            auto partitionsToOffsets = TopicToOffsets_[topicName];
+            if (UnknownTopics.contains(topicName)) {
+                for (auto requestPartition: requestTopic.PartitionIndexes) {
+                    TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition;
+                    partition.PartitionIndex = requestPartition;
+                    partition.ErrorCode = UNKNOWN_TOPIC_OR_PARTITION;
+                    topic.Partitions.push_back(partition);
+                }
+                group.Topics.push_back(topic);
+                continue;
+            }
+            if (ErroredTopics.contains(topicName)) {
+                for (auto requestPartition: requestTopic.PartitionIndexes) {
+                    TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition;
+                    partition.PartitionIndex = requestPartition;
+                    partition.ErrorCode = UNKNOWN_SERVER_ERROR;
+                    topic.Partitions.push_back(partition);
+                }
+                group.Topics.push_back(topic);
+                continue;
+            }
+            auto partitionsToOffsets = TopicToOffsets[topicName];
             for (auto requestPartition: requestTopic.PartitionIndexes) {
                 TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition;
                 partition.PartitionIndex = requestPartition;
@@ -169,12 +202,12 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
                 partition.PartitionIndex = sourcePartition.PartitionIndex;
                 partition.ErrorCode = sourcePartition.ErrorCode;
             }
+            response->Topics.push_back(topic);
         }
     }
     return response;
 }
 
-
 void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
    // If API level <= 7, Groups would be empty. In this case we convert the message to level 8 and process it uniformly later
     if (Message->Groups.empty()) {
@@ -196,7 +229,7 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
         }
     }
 
-    for (const auto& topicToEntities : TopicToEntities_) {
+    for (const auto& topicToEntities : TopicToEntities) {
         NKikimr::NGRpcProxy::V1::TGetPartitionsLocationRequest locationRequest{};
         locationRequest.Topic = topicToEntities.first;
         locationRequest.Token = Context->UserToken->GetSerializedToken();
@@ -207,7 +240,7 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
             SelfId(),
             topicToEntities.second.Partitions
         ));
-        InflyTopics_++;
+        InflyTopics++;
     }
     Become(&TKafkaOffsetFetchActor::StateWork);
 }
@@ -215,27 +248,26 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
 
 void TKafkaOffsetFetchActor::ExtractPartitions(const TString& group, const NKafka::TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics& topic) {
     TString topicName = topic.Name.value();
-    if (!TopicToEntities_.contains(topicName)) {
+    if (!TopicToEntities.contains(topicName)) {
         TopicEntities newEntities;
-        TopicToEntities_[topicName] = newEntities;
+        TopicToEntities[topicName] = newEntities;
     }
-    TopicEntities& entities = TopicToEntities_[topicName];
+    TopicEntities& entities = TopicToEntities[topicName];
     entities.Consumers->insert(group);
     for (auto partition: topic.PartitionIndexes) {
         entities.Partitions->insert(partition);
     }
 };
 
-void TKafkaOffsetFetchActor::StateWork(TAutoPtr<IEventHandle>& ev) {
-    switch (ev->GetTypeRewrite()) {
-        HFunc(TEvKafka::TEvCommitedOffsetsResponse, Handle);
-    }
-}
-
 void TKafkaOffsetFetchActor::Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr& ev, const TActorContext& ctx) {
-    InflyTopics_--;
-    TopicToOffsets_[ev->Get()->TopicName] = ev->Get()->PartitionIdToOffsets;
-    if (InflyTopics_ == 0) {
+    InflyTopics--;
+    TopicToOffsets[ev->Get()->TopicName] = ev->Get()->PartitionIdToOffsets;
+    if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::ERROR) {
+        ErroredTopics.insert(ev->Get()->TopicName);
+    } else if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::UNKNOWN_TOPIC) {
+        UnknownTopics.insert(ev->Get()->TopicName);
+    }
+    if (InflyTopics == 0) {
         auto response = GetOffsetFetchResponse();
         Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
         Die(ctx);
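
For orientation, the per-topic branching that GetOffsetFetchResponse performs after this change can be condensed as below. This is an illustrative sketch only; ClassifyTopic is a hypothetical helper, not a function introduced by the commit, and the real code inlines these checks in the loop shown above.

    // Sketch: how each requested topic is classified after this commit.
    // UnknownTopics / ErroredTopics are the new sets filled in Handle();
    // the returned code is written into every requested partition.
    EKafkaErrors ClassifyTopic(const TString& topicName) const {
        if (UnknownTopics.contains(topicName)) {
            return UNKNOWN_TOPIC_OR_PARTITION; // scheme cache had no PQGroupInfo for the topic
        }
        if (ErroredTopics.contains(topicName)) {
            return UNKNOWN_SERVER_ERROR;       // TTopicOffsetActor reported a failure via RaiseError
        }
        return NONE_ERROR;                     // committed offsets come from TopicToOffsets
    }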

+ 13 - 4
ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h

@@ -19,18 +19,27 @@ public:
     }
 
     void Bootstrap(const NActors::TActorContext& ctx);
-    void StateWork(TAutoPtr<IEventHandle>& ev);
+
+    STATEFN(StateWork) {
+        switch (ev->GetTypeRewrite()) {
+            HFunc(TEvKafka::TEvCommitedOffsetsResponse, Handle);
+        }
+    }
+
     void Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr& ev, const TActorContext& ctx);
     void ExtractPartitions(const TString& group, const NKafka::TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics& topic);
     TOffsetFetchResponseData::TPtr GetOffsetFetchResponse();
+    void ReplyError(const TActorContext& ctx);
 
 private:
     const TContext::TPtr Context;
     const ui64 CorrelationId;
     const TMessagePtr<TOffsetFetchRequestData> Message;
-    std::unordered_map<TString, TopicEntities> TopicToEntities_;
-    std::unordered_map<TString, std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>> TopicToOffsets_;
-    ui32 InflyTopics_ = 0;
+    std::unordered_map<TString, TopicEntities> TopicToEntities;
+    std::unordered_map<TString, std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>>> TopicToOffsets;
+    std::set<TString> UnknownTopics;
+    std::set<TString> ErroredTopics;
+    ui32 InflyTopics = 0;
 
 };
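
Note that StateWork moves from an out-of-line method in the .cpp file to an inline STATEFN in the header, so the dispatch table now sits next to the handler declarations. Judging by the signature removed above, the macro expands to roughly the sketch below; only the definition's location changes. ReplyError is declared here, and its definition does not appear in the hunks of this commit.

    // Rough expansion of STATEFN(StateWork), per the signature removed from the .cpp file:
    // void StateWork(TAutoPtr<NActors::IEventHandle>& ev);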
 

+ 7 - 0
ydb/core/kafka_proxy/kafka_events.h

@@ -210,10 +210,17 @@ struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResp
 struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse> 
                            , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
 {
+    enum EStatus {
+        OK,
+        ERROR,
+        UNKNOWN_TOPIC,
+    };
+
     TEvCommitedOffsetsResponse()
     {}
 
     TString TopicName;
+    EStatus Status;
     std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets;
 };
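
Two points about the new field: Status has no in-class initializer, so every send path added in this commit assigns it explicitly before the event is sent, and the receiving actor branches on it to fill the error sets. A condensed view of both sides, assembled from the hunks above rather than new code:

    // Sender side (TTopicOffsetActor), e.g. the RaiseError path:
    response->TopicName = TopicName;
    response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::ERROR; // or OK / UNKNOWN_TOPIC
    Send(Requester, response.Release());

    // Receiver side (TKafkaOffsetFetchActor::Handle):
    if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::ERROR) {
        ErroredTopics.insert(ev->Get()->TopicName);
    } else if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::UNKNOWN_TOPIC) {
        UnknownTopics.insert(ev->Get()->TopicName);
    }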
 

+ 88 - 2
ydb/core/kafka_proxy/ut/ut_protocol.cpp

@@ -1208,6 +1208,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
         ui64 minActivePartitions = 10;
 
         TString consumerName = "consumer-0";
+        TString consumer1Name = "consumer-1";
 
         TString key = "record-key";
         TString value = "record-value";
@@ -1221,6 +1222,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
                     .CreateTopic(topicName,
                                  NYdb::NTopic::TCreateTopicSettings()
                                     .BeginAddConsumer(consumerName).EndAddConsumer()
+                                    .BeginAddConsumer(consumer1Name).EndAddConsumer()
                                     .PartitioningSettings(minActivePartitions, 100))
                     .ExtractValueSync();
             UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
@@ -1270,6 +1272,36 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
                                      static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
         }
 
+        {
+            // Commit offset for consumer-0
+            auto settings = NTopic::TReadSessionSettings()
+                .AppendTopics(NTopic::TTopicReadSettings(topicName))
+                .ConsumerName("consumer-0");
+            auto topicReader = pqClient.CreateReadSession(settings);
+
+            auto m = Read(topicReader);
+            UNIT_ASSERT_EQUAL(m.size(), 1);
+
+            UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+            auto& m0 = m[0].GetMessages()[0];
+            m0.Commit();
+        }
+
+        {
+            // Commit offset for consumer-1
+            auto settings = NTopic::TReadSessionSettings()
+                .AppendTopics(NTopic::TTopicReadSettings(topicName))
+                .ConsumerName("consumer-1");
+            auto topicReader = pqClient.CreateReadSession(settings);
+
+            auto m = Read(topicReader);
+            UNIT_ASSERT_EQUAL(m.size(), 1);
+
+            UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+            auto& m0 = m[0].GetMessages()[0];
+            m0.Commit();
+        }
+
         {
            // Check committed offset after produce
             std::map<TString, std::vector<i32>> topicsToPartions;
@@ -1281,10 +1313,64 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
             UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4);
             auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; });
             UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end());
-            // This check faled one time under asan, commented until I figure out the exact reason.
-            // UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1);
+            UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1);
         }
 
+        {
+            // Check with nonexistent topic
+            std::map<TString, std::vector<i32>> topicsToPartions;
+            topicsToPartions["nonexTopic"] = std::vector<i32>{0, 1};
+            auto msg = client.OffsetFetch(consumerName, topicsToPartions);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics[0].Partitions.size(), 2);
+            for (const auto& partition : msg->Groups[0].Topics[0].Partitions) {
+                UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, UNKNOWN_TOPIC_OR_PARTITION);
+            }
+        }
+
+        {
+            // Check with nonexistent consumer 
+            std::map<TString, std::vector<i32>> topicsToPartions;
+            topicsToPartions[topicName] = std::vector<i32>{0, 1};
+            auto msg = client.OffsetFetch("nonexConsumer", topicsToPartions);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics[0].Partitions.size(), 2);
+            for (const auto& partition : msg->Groups[0].Topics[0].Partitions) {
+                UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, RESOURCE_NOT_FOUND);
+            }
+        }
+
+        {
+            // Check with 2 consumers
+            TOffsetFetchRequestData request;
+
+            TOffsetFetchRequestData::TOffsetFetchRequestGroup::TOffsetFetchRequestTopics topic;
+            topic.Name = topicName;
+            auto partitionIndexes = std::vector<int>{0};
+            topic.PartitionIndexes = partitionIndexes;
+
+            TOffsetFetchRequestData::TOffsetFetchRequestGroup group0;
+            group0.GroupId = consumerName;
+            group0.Topics.push_back(topic);
+            request.Groups.push_back(group0);
+
+            TOffsetFetchRequestData::TOffsetFetchRequestGroup group1;
+            group1.GroupId = consumer1Name;
+            group1.Topics.push_back(topic);
+            request.Groups.push_back(group1);
+
+            auto msg = client.OffsetFetch(request);
+
+            UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 2);
+            for (const auto& group: msg->Groups) {
+                UNIT_ASSERT_VALUES_EQUAL(group.Topics.size(), 1);
+                UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions.size(), 1);
+                UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].CommittedOffset, 1);
+                UNIT_ASSERT_VALUES_EQUAL(group.Topics[0].Partitions[0].ErrorCode, NONE_ERROR);
+            }
+        }
     } // Y_UNIT_TEST(OffsetFetchScenario)
 
     Y_UNIT_TEST(LoginWithApiKey) {