Browse Source

Use a separate lock for Processor->Write calls (#7682)

qyryq 7 months ago
parent
commit
4bce249ad0

+ 21 - 8
ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp

@@ -958,7 +958,8 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
                 FirstTokenSent = true;
             }
             // Kickstart send after session reestablishment
-            SendImpl();
+            FormGrpcMessagesImpl();
+            SendGrpcMessages();
             break;
         }
         case TServerMessage::kWriteResponse: {
@@ -1140,13 +1141,15 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
 
 void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) {
     TMemoryUsageChange memoryUsage;
-    if (!isSyncCompression) {
+    if (isSyncCompression) {
+        // The Lock is already held somewhere up the stack.
+        memoryUsage = OnCompressedImpl(std::move(block));
+    } else {
         with_lock(Lock) {
             memoryUsage = OnCompressedImpl(std::move(block));
         }
-    } else {
-        memoryUsage = OnCompressedImpl(std::move(block));
     }
+    SendGrpcMessages();
     if (memoryUsage.NowOk && !memoryUsage.WasOk) {
         EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
     }
@@ -1162,7 +1165,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
     (*Counters->BytesInflightCompressed) += block.Data.size();
 
     PackedMessagesToSend.emplace(std::move(block));
-    SendImpl();
+    FormGrpcMessagesImpl();
     return memoryUsage;
 }
 
@@ -1279,7 +1282,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
     }
     CurrentBatch.Reset();
     if (skipCompression) {
-        SendImpl();
+        FormGrpcMessagesImpl();
     }
     return size;
 }
@@ -1343,7 +1346,16 @@ bool TWriteSessionImpl::TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRe
     return GetTransactionId(*writeRequest) != GetTransactionId(OriginalMessagesToSend.front().Tx);
 }
 
-void TWriteSessionImpl::SendImpl() {
+void TWriteSessionImpl::SendGrpcMessages() {
+    with_lock(ProcessorLock) {
+        TClientMessage message;
+        while (GrpcMessagesToSend.Dequeue(&message)) {
+            Processor->Write(std::move(message));
+        }
+    }
+}
+
+void TWriteSessionImpl::FormGrpcMessagesImpl() {
     Y_ABORT_UNLESS(Lock.IsLocked());
 
     // External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB.
@@ -1413,7 +1425,7 @@ void TWriteSessionImpl::SendImpl() {
                 << OriginalMessagesToSend.size() << " left), first sequence number is "
                 << writeRequest->messages(0).seq_no()
         );
-        Processor->Write(std::move(clientMessage));
+        GrpcMessagesToSend.Enqueue(std::move(clientMessage));
     }
 }
 
@@ -1475,6 +1487,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
             with_lock(self->Lock) {
                 self->HandleWakeUpImpl();
             }
+            self->SendGrpcMessages();
         }
     };
     if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) {

+ 6 - 1
ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h

@@ -5,6 +5,7 @@
 #include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h>
 
 #include <util/generic/buffer.h>
+#include <util/thread/lfqueue.h>
 
 
 namespace NYdb::NTopic {
@@ -385,7 +386,8 @@ private:
     ui64 GetNextIdImpl(const TMaybe<ui64>& seqNo);
     ui64 GetSeqNoImpl(ui64 id);
     ui64 GetIdImpl(ui64 seqNo);
-    void SendImpl();
+    void FormGrpcMessagesImpl();
+    void SendGrpcMessages();
     void AbortImpl();
     void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
     void CloseImpl(EStatus statusCode, const TString& message);
@@ -446,6 +448,9 @@ private:
     std::queue<TOriginalMessage> SentOriginalMessages;
     std::queue<TBlock> SentPackedMessage;
 
+    TLockFreeQueue<TClientMessage> GrpcMessagesToSend;
+    TAdaptiveLock ProcessorLock;
+
     const size_t MaxBlockSize = std::numeric_limits<size_t>::max();
     const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
     bool Connected = false;