Browse Source

fix for several continuation tokens

fix for double continuation token
alexnick 2 years ago
parent
commit
6321e25841

+ 4 - 3
ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp

@@ -741,8 +741,10 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
             SessionStartedTs = TInstant::Now();
             OnErrorResolved();
 
-            //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
-            result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+            if (!FirstTokenSent) {
+                result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+                FirstTokenSent = true;
+            }
             // Kickstart send after session reestablishment
             SendImpl();
             break;
@@ -804,7 +806,6 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
     if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) {
         auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size());
         result = memoryUsage.NowOk && !memoryUsage.WasOk;
-            //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
         const auto& front = SentPackedMessage.front();
         if (front.Compressed) {
             compressedSize = front.Data.size();

+ 1 - 0
ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h

@@ -412,6 +412,7 @@ private:
     IExecutor::TPtr Executor;
     IExecutor::TPtr CompressionExecutor;
     size_t MemoryUsage = 0; //!< Estimated amount of memory used
+    bool FirstTokenSent = false;
 
     TMessageBatch CurrentBatch;
 

+ 4 - 3
ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp

@@ -595,8 +595,10 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
             SessionStartedTs = TInstant::Now();
             OnErrorResolved();
 
-            //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
-            result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+            if (!FirstTokenSent) {
+                result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+                FirstTokenSent = true;
+            }
             // Kickstart send after session reestablishment
             SendImpl();
             break;
@@ -673,7 +675,6 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
     if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) {
         auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size());
         result = memoryUsage.NowOk && !memoryUsage.WasOk;
-            //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
         const auto& front = SentPackedMessage.front();
         if (front.Compressed) {
             compressedSize = front.Data.size();

+ 1 - 0
ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h

@@ -401,6 +401,7 @@ private:
     IExecutor::TPtr Executor;
     IExecutor::TPtr CompressionExecutor;
     size_t MemoryUsage = 0; //!< Estimated amount of memory used
+    bool FirstTokenSent = false;
 
     TMessageBatch CurrentBatch;