Browse Source

Accurately PassAway

tesseract 1 year ago
parent
commit
88c45a1571
2 changed files with 57 additions and 28 deletions
  1. 56 26
      ydb/core/kafka_proxy/kafka_connection.cpp
  2. 1 2
      ydb/core/raw_socket/sock_listener.cpp

+ 56 - 26
ydb/core/kafka_proxy/kafka_connection.cpp

@@ -195,7 +195,9 @@ protected:
 
     void HandleAccepting(TEvPollerRegisterResult::TPtr ev) {
         PollerToken = std::move(ev->Get()->PollerToken);
-        UpgradeToSecure();
+        if (!UpgradeToSecure()) {
+            return;
+        }
         OnAccept();
     }
 
@@ -256,7 +258,7 @@ protected:
         return TMessagePtr<T>(request->Buffer, request->Message);
     }
    
-    void ProcessRequest(const TActorContext& ctx) {
+    bool ProcessRequest(const TActorContext& ctx) {
         KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize
                                                << ", Size=" << Request->Size);
 
@@ -293,15 +295,16 @@ protected:
 
             case LIST_OFFSETS:
                 HandleMessage(&Request->Header, Cast<TListOffsetsRequestData>(Request));
-                return;
+                break;
 
             case FETCH:
                 HandleMessage(&Request->Header, Cast<TFetchRequestData>(Request));
-                return;
+                break;
 
             default:
                 KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey);
                 PassAway();
+                return false;
         }
 
         // Now message and buffer are held by message actor
@@ -309,6 +312,8 @@ protected:
         Request->Buffer.reset();
 
         Request.reset();
+
+        return true;
     }
 
     void Handle(TEvKafka::TEvResponse::TPtr response, const TActorContext& ctx) {
@@ -366,27 +371,33 @@ protected:
         request->Response = response;
         request->ResponseErrorCode = errorCode;
 
-        ProcessReplyQueue(ctx);
+        if (!ProcessReplyQueue(ctx)) {
+            return;
+        }
         RequestPoller();
     }
 
-    void ProcessReplyQueue(const TActorContext& ctx) {
+    bool ProcessReplyQueue(const TActorContext& ctx) {
         while(!PendingRequestsQueue.empty()) {
             auto& request = PendingRequestsQueue.front();
             if (request->Response.get() == nullptr) {
                 break;
             }
 
-            Reply(&request->Header, request->Response.get(), request->Method, request->StartTime, request->ResponseErrorCode, ctx);
+            if (!Reply(&request->Header, request->Response.get(), request->Method, request->StartTime, request->ResponseErrorCode, ctx)) {
+                return false;
+            }
 
             InflightSize -= request->ExpectedSize;
 
             PendingRequests.erase(request->Header.CorrelationId);
             PendingRequestsQueue.pop_front();
         }
+
+        return true;
     }
 
-    void Reply(const TRequestHeaderData* header, const TApiMessage* reply, const TString method, const TInstant requestStartTime, EKafkaErrors errorCode, const TActorContext& ctx) {
+    bool Reply(const TRequestHeaderData* header, const TApiMessage* reply, const TString method, const TInstant requestStartTime, EKafkaErrors errorCode, const TActorContext& ctx) {
         TKafkaVersion headerVersion = ResponseHeaderVersion(header->RequestApiKey, header->RequestApiVersion);
         TKafkaVersion version = header->RequestApiVersion;
 
@@ -410,37 +421,44 @@ protected:
                                                      << ", Version=" << version
                                                      << ", CorrelationId=" << header->CorrelationId
                                                      << ", Error=" <<  e.what());
-            return PassAway();
+            PassAway();
+            return false;
         }
+
+        return true;
     }
 
-    void UpgradeToSecure() {
+    bool UpgradeToSecure() {
         if (IsSslRequired && !IsSslActive) {
             int res = Socket->TryUpgradeToSecure();
             if (res < 0) {
                 KAFKA_LOG_ERROR("connection closed - error in UpgradeToSecure: " << strerror(-res));
-                return PassAway();
+                PassAway();
+                return false;
             }
             IsSslActive = true;
         }
+        return true;
     }
 
-    void DoRead(const TActorContext& ctx) {
+    bool DoRead(const TActorContext& ctx) {
         KAFKA_LOG_T("DoRead: Demand=" << Demand.Length << ", Step=" << static_cast<i32>(Step));
         for (;;) {
             while (Demand) {
                 ssize_t received = 0;
                 ssize_t res = SocketReceive(Demand.Buffer, Demand.GetLength());
                 if (-res == EAGAIN || -res == EWOULDBLOCK) {
-                    return;
+                    return true;
                 } else if (-res == EINTR) {
                     continue;
                 } else if (!res) {
                     KAFKA_LOG_I("connection closed");
-                    return PassAway();
+                    PassAway();
+                    return false;
                 } else if (res < 0) {
                     KAFKA_LOG_I("connection closed - error in recv: " << strerror(-res));
-                    return PassAway();
+                    PassAway();
+                    return false;
                 }
                 received = res;
 
@@ -460,16 +478,19 @@ protected:
                         NormalizeNumber(Request->ExpectedSize);
                         if (Request->ExpectedSize < 0) {
                             KAFKA_LOG_ERROR("Wrong message size. Size: " << Request->ExpectedSize);
-                            return PassAway();
+                            PassAway();
+                            return false;
                         }
                         if ((ui64)Request->ExpectedSize > Context->Config.GetMaxMessageSize()) {
                             KAFKA_LOG_ERROR("message is big. Size: " << Request->ExpectedSize << ". MaxSize: "
                                                                      << Context->Config.GetMaxMessageSize());
-                            return PassAway();
+                            PassAway();
+                            return false;
                         }
                         if (static_cast<size_t>(Request->ExpectedSize) < HeaderSize) {
                             KAFKA_LOG_ERROR("message is small. Size: " << Request->ExpectedSize);
-                            return PassAway();
+                            PassAway();
+                            return false;
                         }
 
                         Step = INFLIGTH_CHECK;
@@ -477,11 +498,11 @@ protected:
                     case INFLIGTH_CHECK:
                         if (!Context->Authenticated() && !PendingRequestsQueue.empty()) {
                             // Allow only one message to be processed at a time for non-authenticated users
-                            return;
+                            return true;
                         }
                         if (InflightSize + Request->ExpectedSize > Context->Config.GetMaxInflightSize()) {
                             // We limit the size of processed messages so as not to exceed the size of available memory
-                            return;
+                            return true;
                         }
                         InflightSize += Request->ExpectedSize;
                         Step = MESSAGE_READ;
@@ -506,11 +527,13 @@ protected:
 
                         if (PendingRequests.contains(Request->CorrelationId)) {
                             KAFKA_LOG_ERROR("CorrelationId " << Request->CorrelationId << " already processing");
-                            return PassAway();
+                            PassAway();
+                            return false;
                         }
                         if (!Context->Authenticated() && RequireAuthentication(static_cast<EApiKey>(Request->ApiKey))) {
                             KAFKA_LOG_ERROR("unauthenticated request: ApiKey=" << Request->ApiKey);
-                            return PassAway();
+                            PassAway();
+                            return false;
                         }
 
                         Step = MESSAGE_READ;
@@ -540,12 +563,15 @@ protected:
                                                                     << ", Version=" << Request->ApiVersion
                                                                     << ", CorrelationId=" << Request->CorrelationId
                                                                     << ", Error=" <<  e.what());
-                            return PassAway();
+                            PassAway();
+                            return false;
                         }
 
                         Step = SIZE_READ;
 
-                        ProcessRequest(ctx);
+                        if (!ProcessRequest(ctx)) {
+                            return false;
+                        }
 
                         break;
                 }
@@ -556,7 +582,9 @@ protected:
     void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) {
         if (event->Get()->Read) {
             if (!CloseConnection) {
-                DoRead(ctx);
+                if (!DoRead(ctx)) {
+                    return;
+                }
             }
 
             if (event->Get() == InactivityEvent) {
@@ -588,7 +616,9 @@ protected:
 
     void HandleConnected(TEvPollerRegisterResult::TPtr ev) {
         PollerToken = std::move(ev->Get()->PollerToken);
-        UpgradeToSecure();
+        if (!UpgradeToSecure()) {
+            return;
+        }
         PollerToken->Request(true, true);
     }
 

+ 1 - 2
ydb/core/raw_socket/sock_listener.cpp

@@ -95,8 +95,7 @@ public:
     }
 
     void Handle(TEvents::TEvUnsubscribe::TPtr ev) {
-        auto erased = Connections.erase(ev->Sender);
-        Y_VERIFY_DEBUG(erased);
+        Connections.erase(ev->Sender);
     }
 
     void Handle(NActors::TEvPollerRegisterResult::TPtr ev) {