
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 2 of 2.

Daniil Cherednik, 3 years ago (commit 4b11037e5a)

library/cpp/actors/core/actor.cpp (+3 -3)

@@ -61,17 +61,17 @@ namespace NActors {
         return TActivationContext::Schedule(delta, new IEventHandle(*this, {}, ev), cookie);
     }
 
-    TActorId TActivationContext::RegisterWithSameMailbox(IActor* actor, TActorId parentId) { 
+    TActorId TActivationContext::RegisterWithSameMailbox(IActor* actor, TActorId parentId) {
         Y_VERIFY_DEBUG(parentId);
         auto& ctx = *TlsActivationContext;
         return ctx.ExecutorThread.RegisterActor(actor, &ctx.Mailbox, parentId.Hint(), parentId);
     }
 
-    TActorId TActorContext::RegisterWithSameMailbox(IActor* actor) const { 
+    TActorId TActorContext::RegisterWithSameMailbox(IActor* actor) const {
         return ExecutorThread.RegisterActor(actor, &Mailbox, SelfID.Hint(), SelfID);
     }
 
-    TActorId IActor::RegisterWithSameMailbox(IActor* actor) const noexcept { 
+    TActorId IActor::RegisterWithSameMailbox(IActor* actor) const noexcept {
         return TlsActivationContext->ExecutorThread.RegisterActor(actor, &TlsActivationContext->Mailbox, SelfActorId.Hint(), SelfActorId);
     }
 

library/cpp/actors/core/actor.h (+20 -20)

@@ -71,12 +71,12 @@ namespace NActors {
         // register new actor in ActorSystem on new fresh mailbox.
         static TActorId Register(IActor* actor, TActorId parentId = TActorId(), TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>());
 
-        // Register new actor in ActorSystem on same _mailbox_ as current actor. 
-        // There is one thread per mailbox to execute actor, which mean 
-        // no _cpu core scalability_ for such actors. 
-        // This method of registration can be usefull if multiple actors share 
-        // some memory. 
-        static TActorId RegisterWithSameMailbox(IActor* actor, TActorId parentId); 
+        // Register new actor in ActorSystem on same _mailbox_ as current actor.
+        // There is one thread per mailbox to execute actor, which mean
+        // no _cpu core scalability_ for such actors.
+        // This method of registration can be usefull if multiple actors share
+        // some memory.
+        static TActorId RegisterWithSameMailbox(IActor* actor, TActorId parentId);
 
         static const TActorContext& AsActorContext();
         static TActorContext ActorContextFor(TActorId id);
@@ -141,12 +141,12 @@ namespace NActors {
         // register new actor in ActorSystem on new fresh mailbox.
         TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const;
 
-        // Register new actor in ActorSystem on same _mailbox_ as current actor. 
-        // There is one thread per mailbox to execute actor, which mean 
-        // no _cpu core scalability_ for such actors. 
-        // This method of registration can be usefull if multiple actors share 
-        // some memory. 
-        TActorId RegisterWithSameMailbox(IActor* actor) const; 
+        // Register new actor in ActorSystem on same _mailbox_ as current actor.
+        // There is one thread per mailbox to execute actor, which mean
+        // no _cpu core scalability_ for such actors.
+        // This method of registration can be usefull if multiple actors share
+        // some memory.
+        TActorId RegisterWithSameMailbox(IActor* actor) const;
 
         std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;
     };
@@ -204,7 +204,7 @@ namespace NActors {
         virtual void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const noexcept = 0;
 
         virtual TActorId Register(IActor*, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept = 0;
-        virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0; 
+        virtual TActorId RegisterWithSameMailbox(IActor*) const noexcept = 0;
     };
 
     class TDecorator;
@@ -368,12 +368,12 @@ namespace NActors {
         // register new actor in ActorSystem on new fresh mailbox.
         TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>()) const noexcept final;
 
-        // Register new actor in ActorSystem on same _mailbox_ as current actor. 
-        // There is one thread per mailbox to execute actor, which mean 
-        // no _cpu core scalability_ for such actors. 
-        // This method of registration can be usefull if multiple actors share 
-        // some memory. 
-        TActorId RegisterWithSameMailbox(IActor* actor) const noexcept final; 
+        // Register new actor in ActorSystem on same _mailbox_ as current actor.
+        // There is one thread per mailbox to execute actor, which mean
+        // no _cpu core scalability_ for such actors.
+        // This method of registration can be usefull if multiple actors share
+        // some memory.
+        TActorId RegisterWithSameMailbox(IActor* actor) const noexcept final;
 
         std::pair<ui32, ui32> CountMailboxEvents(ui32 maxTraverse = Max<ui32>()) const;
 
@@ -478,7 +478,7 @@ namespace NActors {
         virtual bool DoBeforeReceiving(TAutoPtr<IEventHandle>& /*ev*/, const TActorContext& /*ctx*/) {
             return true;
         }
- 
+
         virtual void DoAfterReceiving(const TActorContext& /*ctx*/)
         {
         }

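The doc comment above states the trade-off: actors registered on one mailbox are executed by at most one thread at a time, which removes data races on shared state at the cost of per-core parallelism. A minimal usage sketch, not part of this diff (TOwnerActor, TWorker, SharedBuffer and WorkerId are hypothetical names; the usual NActors actor boilerplate is elided):

void TOwnerActor::SpawnWorker() {
    // Same mailbox: this actor and the worker never run concurrently, so
    // SharedBuffer needs no locking; the cost is that the two actors cannot
    // run on different cores in parallel.
    WorkerId = RegisterWithSameMailbox(new TWorker(&SharedBuffer));

    // Contrast: a fresh mailbox gives potential parallelism but no implicit
    // serialization with this actor:
    // WorkerId = Register(new TWorker(&SharedBuffer), TMailboxType::HTSwap);
}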
library/cpp/actors/core/actor_coroutine.h (+1 -1)

@@ -134,7 +134,7 @@ namespace NActors {
             return GetActorContext().Register(actor, mailboxType, poolId);
         }
 
-        TActorId RegisterWithSameMailbox(IActor* actor) { 
+        TActorId RegisterWithSameMailbox(IActor* actor) {
             return GetActorContext().RegisterWithSameMailbox(actor);
         }
 

library/cpp/actors/dnsresolver/dnsresolver_ondemand.cpp (+2 -2)

@@ -32,9 +32,9 @@ namespace NDnsResolver {
         TActorId GetUpstream() {
             if (Y_UNLIKELY(!CachingResolverId)) {
                 if (Y_LIKELY(!SimpleResolverId)) {
-                    SimpleResolverId = RegisterWithSameMailbox(CreateSimpleDnsResolver(Options)); 
+                    SimpleResolverId = RegisterWithSameMailbox(CreateSimpleDnsResolver(Options));
                 }
-                CachingResolverId = RegisterWithSameMailbox(CreateCachingDnsResolver(SimpleResolverId, Options)); 
+                CachingResolverId = RegisterWithSameMailbox(CreateCachingDnsResolver(SimpleResolverId, Options));
             }
             return CachingResolverId;
         }

library/cpp/actors/helpers/flow_controlled_queue.cpp (+1 -1)

@@ -94,7 +94,7 @@ class TFlowControlledRequestQueue : public IActor {
 
     void RegisterReqActor(THolder<IEventHandle> ev) {
         TFlowControlledRequestActor *reqActor = new TFlowControlledRequestActor(ActivityType, this, ev->Sender, ev->Cookie, ev->Flags);
-        const TActorId reqActorId = RegisterWithSameMailbox(reqActor); 
+        const TActorId reqActorId = RegisterWithSameMailbox(reqActor);
         RegisteredRequests.emplace_back(reqActor);
 
         if (!Subscribed && (Target.NodeId() != SelfId().NodeId())) {

library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp (+1 -1)

@@ -364,7 +364,7 @@ namespace NActors {
             }
 
             // Create new session actor.
-            SessionID = RegisterWithSameMailbox(Session = new TInterconnectSessionTCP(this, msg->Params)); 
+            SessionID = RegisterWithSameMailbox(Session = new TInterconnectSessionTCP(this, msg->Params));
             IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Init);
             SessionVirtualId = msg->Self;
             RemoteSessionVirtualId = msg->Peer;

library/cpp/actors/interconnect/interconnect_tcp_session.cpp (+1 -1)

@@ -245,7 +245,7 @@ namespace NActors {
         auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common,
             Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
         ReceiveContext->UnlockLastProcessedPacketSerial();
-        ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); 
+        ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
 
         // register our socket in poller actor
         LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor");

library/cpp/actors/interconnect/mock/ic_mock.cpp (+1 -1)

@@ -227,7 +227,7 @@ namespace NActors {
                 CheckSession();
                 if (!Session) {
                     Session = new TSessionMockActor(this, State.GetValidSessionId());
-                    RegisterWithSameMailbox(Session); 
+                    RegisterWithSameMailbox(Session);
                 }
                 return Session;
             }

library/cpp/actors/testlib/test_runtime.cpp (+1 -1)

@@ -1791,7 +1791,7 @@ namespace NActors {
 
         void Bootstrap(const TActorContext& ctx) {
             Become(&TStrandingActorDecorator::StateFunc);
-            ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this)); 
+            ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this));
             DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint()));
             for (const auto& actor : AdditionalActors) {
                 DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint()));

library/cpp/grpc/client/grpc_client_low.cpp (+207 -207)

@@ -1,164 +1,164 @@
-#include "grpc_client_low.h" 
-#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h> 
-#include <contrib/libs/grpc/include/grpc/support/log.h> 
- 
-#include <library/cpp/containers/stack_vector/stack_vec.h> 
- 
-#include <util/string/printf.h> 
+#include "grpc_client_low.h"
+#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
+#include <contrib/libs/grpc/include/grpc/support/log.h>
+
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+
+#include <util/string/printf.h>
 #include <util/system/thread.h>
 #include <util/random/random.h>
- 
-#if !defined(_WIN32) && !defined(_WIN64) 
-#include <sys/types.h> 
-#include <sys/socket.h> 
-#include <netinet/in.h> 
-#include <netinet/tcp.h> 
-#endif 
- 
+
+#if !defined(_WIN32) && !defined(_WIN64)
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#endif
+
 namespace NGrpc {
- 
-void EnableGRpcTracing() { 
-    grpc_tracer_set_enabled("tcp", true); 
-    grpc_tracer_set_enabled("client_channel", true); 
-    grpc_tracer_set_enabled("channel", true); 
-    grpc_tracer_set_enabled("api", true); 
-    grpc_tracer_set_enabled("connectivity_state", true); 
-    grpc_tracer_set_enabled("handshaker", true); 
-    grpc_tracer_set_enabled("http", true); 
-    grpc_tracer_set_enabled("http2_stream_state", true); 
-    grpc_tracer_set_enabled("op_failure", true); 
-    grpc_tracer_set_enabled("timer", true); 
-    gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); 
-} 
- 
-class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator { 
-public: 
-    TGRpcKeepAliveSocketMutator(int idle, int count, int interval) 
-        : Idle_(idle) 
-        , Count_(count) 
-        , Interval_(interval) 
-    { 
-        grpc_socket_mutator_init(this, &VTable); 
-    } 
-private: 
-    static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) { 
-        return static_cast<TGRpcKeepAliveSocketMutator*>(mutator); 
-    } 
- 
-    template<typename TVal> 
-    bool SetOption(int fd, int level, int optname, const TVal& value) { 
-        return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0; 
-    } 
-    bool SetOption(int fd) { 
-        if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) { 
-            Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl; 
-            return false; 
-        } 
-#ifdef _linux_ 
-        if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) { 
-            Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl; 
-            return false; 
-        } 
-        if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) { 
-            Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl; 
-            return false; 
-        } 
-        if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) { 
-            Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl; 
-            return false; 
-        } 
-#endif 
-        return true; 
-    } 
-    static bool Mutate(int fd, grpc_socket_mutator* mutator) { 
-        auto self = Cast(mutator); 
-        return self->SetOption(fd); 
-    } 
-    static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) { 
-        const auto* selfA = Cast(a); 
-        const auto* selfB = Cast(b); 
-        auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_); 
-        auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_); 
-        return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0; 
-    } 
-    static void Destroy(grpc_socket_mutator* mutator) { 
-        delete Cast(mutator); 
-    } 
- 
-    static grpc_socket_mutator_vtable VTable; 
-    const int Idle_; 
-    const int Count_; 
-    const int Interval_; 
-}; 
- 
-grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable = 
-    { 
-        &TGRpcKeepAliveSocketMutator::Mutate, 
-        &TGRpcKeepAliveSocketMutator::Compare, 
-        &TGRpcKeepAliveSocketMutator::Destroy 
-    }; 
- 
+
+void EnableGRpcTracing() {
+    grpc_tracer_set_enabled("tcp", true);
+    grpc_tracer_set_enabled("client_channel", true);
+    grpc_tracer_set_enabled("channel", true);
+    grpc_tracer_set_enabled("api", true);
+    grpc_tracer_set_enabled("connectivity_state", true);
+    grpc_tracer_set_enabled("handshaker", true);
+    grpc_tracer_set_enabled("http", true);
+    grpc_tracer_set_enabled("http2_stream_state", true);
+    grpc_tracer_set_enabled("op_failure", true);
+    grpc_tracer_set_enabled("timer", true);
+    gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
+}
+
+class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
+public:
+    TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
+        : Idle_(idle)
+        , Count_(count)
+        , Interval_(interval)
+    {
+        grpc_socket_mutator_init(this, &VTable);
+    }
+private:
+    static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
+        return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
+    }
+
+    template<typename TVal>
+    bool SetOption(int fd, int level, int optname, const TVal& value) {
+        return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
+    }
+    bool SetOption(int fd) {
+        if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
+            Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
+            return false;
+        }
+#ifdef _linux_
+        if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
+            Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
+            return false;
+        }
+        if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
+            Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
+            return false;
+        }
+        if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
+            Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
+            return false;
+        }
+#endif
+        return true;
+    }
+    static bool Mutate(int fd, grpc_socket_mutator* mutator) {
+        auto self = Cast(mutator);
+        return self->SetOption(fd);
+    }
+    static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
+        const auto* selfA = Cast(a);
+        const auto* selfB = Cast(b);
+        auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
+        auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
+        return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
+    }
+    static void Destroy(grpc_socket_mutator* mutator) {
+        delete Cast(mutator);
+    }
+
+    static grpc_socket_mutator_vtable VTable;
+    const int Idle_;
+    const int Count_;
+    const int Interval_;
+};
+
+grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
+    {
+        &TGRpcKeepAliveSocketMutator::Mutate,
+        &TGRpcKeepAliveSocketMutator::Compare,
+        &TGRpcKeepAliveSocketMutator::Destroy
+    };
+
 TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
-    : TcpKeepAliveSettings_(tcpKeepAliveSettings) 
+    : TcpKeepAliveSettings_(tcpKeepAliveSettings)
     , ExpireTime_(expireTime)
     , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
-{} 
- 
-void TChannelPool::GetStubsHolderLocked( 
-    const TString& channelId, 
-    const TGRpcClientConfig& config, 
-    std::function<void(TStubsHolder&)> cb) 
-{ 
-    { 
-        std::shared_lock readGuard(RWMutex_); 
-        const auto it = Pool_.find(channelId); 
-        if (it != Pool_.end()) { 
+{}
+
+void TChannelPool::GetStubsHolderLocked(
+    const TString& channelId,
+    const TGRpcClientConfig& config,
+    std::function<void(TStubsHolder&)> cb)
+{
+    {
+        std::shared_lock readGuard(RWMutex_);
+        const auto it = Pool_.find(channelId);
+        if (it != Pool_.end()) {
             if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
-                return cb(it->second); 
-            } 
-        } 
-    } 
-    { 
-        std::unique_lock writeGuard(RWMutex_); 
-        { 
-            auto it = Pool_.find(channelId); 
-            if (it != Pool_.end()) { 
-                if (!it->second.IsChannelBroken()) { 
+                return cb(it->second);
+            }
+        }
+    }
+    {
+        std::unique_lock writeGuard(RWMutex_);
+        {
+            auto it = Pool_.find(channelId);
+            if (it != Pool_.end()) {
+                if (!it->second.IsChannelBroken()) {
                     EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
                     auto now = Now();
                     LastUsedQueue_.emplace(now, channelId);
                     it->second.SetLastUseTime(now);
-                    return cb(it->second); 
-                } else { 
-                    // This channel can't be used. Remove from pool to create new one 
+                    return cb(it->second);
+                } else {
+                    // This channel can't be used. Remove from pool to create new one
                     EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
-                    Pool_.erase(it); 
-                } 
-            } 
-        } 
-        TGRpcKeepAliveSocketMutator* mutator = nullptr; 
-        // will be destroyed inside grpc 
-        if (TcpKeepAliveSettings_.Enabled) { 
-            mutator = new TGRpcKeepAliveSocketMutator( 
-                TcpKeepAliveSettings_.Idle, 
-                TcpKeepAliveSettings_.Count, 
-                TcpKeepAliveSettings_.Interval 
-            ); 
-        } 
-        cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second); 
+                    Pool_.erase(it);
+                }
+            }
+        }
+        TGRpcKeepAliveSocketMutator* mutator = nullptr;
+        // will be destroyed inside grpc
+        if (TcpKeepAliveSettings_.Enabled) {
+            mutator = new TGRpcKeepAliveSocketMutator(
+                TcpKeepAliveSettings_.Idle,
+                TcpKeepAliveSettings_.Count,
+                TcpKeepAliveSettings_.Interval
+            );
+        }
+        cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
         LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
-    } 
-} 
- 
-void TChannelPool::DeleteChannel(const TString& channelId) { 
-    std::unique_lock writeLock(RWMutex_); 
+    }
+}
+
+void TChannelPool::DeleteChannel(const TString& channelId) {
+    std::unique_lock writeLock(RWMutex_);
     auto poolIt = Pool_.find(channelId);
     if (poolIt != Pool_.end()) {
         EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
         Pool_.erase(poolIt);
     }
-} 
- 
+}
+
 void TChannelPool::DeleteExpiredStubsHolders() {
     std::unique_lock writeLock(RWMutex_);
     auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
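
GetStubsHolderLocked above is a classic double-checked, read-mostly lookup: an optimistic find under std::shared_lock, then a retry under std::unique_lock that re-checks the map, since another writer may have inserted the channel between the two lock scopes. The same pattern in isolation, as a self-contained sketch (TCache and its contents are illustrative, not this library's code):

#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>

class TCache {
public:
    std::string GetOrCreate(const std::string& key) {
        {
            std::shared_lock readGuard(Mutex_); // many readers in parallel
            if (auto it = Map_.find(key); it != Map_.end()) {
                return it->second;              // fast path: no exclusive lock
            }
        }
        std::unique_lock writeGuard(Mutex_);    // exclusive writer
        // Re-check: another thread may have inserted while no lock was held.
        if (auto it = Map_.find(key); it != Map_.end()) {
            return it->second;
        }
        return Map_.emplace(key, "value-for-" + key).first->second;
    }

private:
    std::shared_mutex Mutex_;
    std::unordered_map<std::string, std::string> Map_;
};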
@@ -392,7 +392,7 @@ private:
 
     // Some children are stored inline, others are in a set
     std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
-    std::unordered_set<TContextImpl*> Children; 
+    std::unordered_set<TContextImpl*> Children;
 
     // Single callback is stored without extra allocations
     TStackVec<TCallback, 1> Callbacks;
@@ -404,10 +404,10 @@ private:
 TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
     : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
 {
-    Init(numWorkerThread); 
-} 
- 
-void TGRpcClientLow::Init(size_t numWorkerThread) { 
+    Init(numWorkerThread);
+}
+
+void TGRpcClientLow::Init(size_t numWorkerThread) {
     SetCqState(WORKING);
     if (UseCompletionQueuePerThread_) {
         for (size_t i = 0; i < numWorkerThread; i++) {
@@ -425,9 +425,9 @@ void TGRpcClientLow::Init(size_t numWorkerThread) {
                 PullEvents(cq);
             }).Release());
         }
-    } 
-} 
- 
+    }
+}
+
 void TGRpcClientLow::AddWorkerThreadForTest() {
     if (UseCompletionQueuePerThread_) {
         CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
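
Each worker thread created in Init blocks in PullEvents, draining its grpc::CompletionQueue. The loop body is the standard gRPC async drain pattern; a sketch follows, where IQueueClientEvent with an Execute(bool) method is an assumption about this library's tag type:

void PullEventsSketch(grpc::CompletionQueue* cq) {
    void* tag = nullptr;
    bool ok = false;
    // Next() blocks until an event completes; it returns false only after
    // Shutdown() has been called on the queue and it has fully drained.
    while (cq->Next(&tag, &ok)) {
        static_cast<IQueueClientEvent*>(tag)->Execute(ok); // assumed interface
    }
}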
@@ -453,17 +453,17 @@ void TGRpcClientLow::Stop(bool wait) {
 
     if (wait) {
         WaitInternal();
-    } 
-} 
- 
+    }
+}
+
 void TGRpcClientLow::StopInternal(bool silent) {
     bool shutdown;
 
     TVector<TContextImpl::TContextPtr> cancelQueue;
 
-    { 
-        std::unique_lock<std::mutex> guard(Mtx_); 
- 
+    {
+        std::unique_lock<std::mutex> guard(Mtx_);
+
         auto allowStateChange = [&]() {
             switch (GetCqState()) {
                 case WORKING:
@@ -484,7 +484,7 @@ void TGRpcClientLow::StopInternal(bool silent) {
 
         SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
 
-        if (!silent && !Contexts_.empty()) { 
+        if (!silent && !Contexts_.empty()) {
             cancelQueue.reserve(Contexts_.size());
             for (auto* ptr : Contexts_) {
                 // N.B. some contexts may be stuck in destructors
@@ -494,7 +494,7 @@ void TGRpcClientLow::StopInternal(bool silent) {
             }
         }
 
-        shutdown = Contexts_.empty(); 
+        shutdown = Contexts_.empty();
     }
 
     for (auto& context : cancelQueue) {
@@ -506,62 +506,62 @@ void TGRpcClientLow::StopInternal(bool silent) {
         for (auto& cq : CQS_) {
             cq->Shutdown();
         }
-    } 
+    }
 }
 
 void TGRpcClientLow::WaitInternal() {
-    std::unique_lock<std::mutex> guard(JoinMutex_); 
- 
-    for (auto& ti : WorkerThreads_) { 
-        ti->Join(); 
-    } 
-} 
- 
+    std::unique_lock<std::mutex> guard(JoinMutex_);
+
+    for (auto& ti : WorkerThreads_) {
+        ti->Join();
+    }
+}
+
 void TGRpcClientLow::WaitIdle() {
-    std::unique_lock<std::mutex> guard(Mtx_); 
- 
-    while (!Contexts_.empty()) { 
-        ContextsEmpty_.wait(guard); 
-    } 
-} 
- 
+    std::unique_lock<std::mutex> guard(Mtx_);
+
+    while (!Contexts_.empty()) {
+        ContextsEmpty_.wait(guard);
+    }
+}
+
 std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
-    std::unique_lock<std::mutex> guard(Mtx_); 
- 
-    auto allowCreateContext = [&]() { 
-        switch (GetCqState()) { 
-            case WORKING: 
-                return true; 
-            case STOP_SILENT: 
-            case STOP_EXPLICIT: 
-                return false; 
+    std::unique_lock<std::mutex> guard(Mtx_);
+
+    auto allowCreateContext = [&]() {
+        switch (GetCqState()) {
+            case WORKING:
+                return true;
+            case STOP_SILENT:
+            case STOP_EXPLICIT:
+                return false;
         }
 
-        Y_UNREACHABLE(); 
-    }; 
- 
-    if (!allowCreateContext()) { 
-        // New context creation is forbidden 
-        return nullptr; 
-    }
- 
-    auto context = std::make_shared<TContextImpl>(); 
-    Contexts_.insert(context.get()); 
-    context->Owner = this; 
-    if (UseCompletionQueuePerThread_) { 
-        context->CQ = CQS_[RandomNumber(CQS_.size())].get(); 
-    } else { 
-        context->CQ = CQS_[0].get(); 
-    } 
-    return context; 
+        Y_UNREACHABLE();
+    };
+
+    if (!allowCreateContext()) {
+        // New context creation is forbidden
+        return nullptr;
+    }
+
+    auto context = std::make_shared<TContextImpl>();
+    Contexts_.insert(context.get());
+    context->Owner = this;
+    if (UseCompletionQueuePerThread_) {
+        context->CQ = CQS_[RandomNumber(CQS_.size())].get();
+    } else {
+        context->CQ = CQS_[0].get();
+    }
+    return context;
 }
 
 void TGRpcClientLow::ForgetContext(TContextImpl* context) {
     bool shutdown = false;
 
-    { 
-        std::unique_lock<std::mutex> guard(Mtx_); 
- 
+    {
+        std::unique_lock<std::mutex> guard(Mtx_);
+
         if (!Contexts_.erase(context)) {
             Y_FAIL("Unexpected ForgetContext(%p)", context);
         }
@@ -571,7 +571,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
                 shutdown = true;
             }
 
-            ContextsEmpty_.notify_all(); 
+            ContextsEmpty_.notify_all();
         }
     }
 
@@ -580,7 +580,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
         for (auto& cq : CQS_) {
             cq->Shutdown();
         }
-    } 
-} 
- 
+    }
+}
+
 } // namespace NGRpc

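A closing note on TGRpcKeepAliveSocketMutator, defined at the top of this file: gRPC applies a registered socket mutator to every fd it opens for a channel, which is how the keepalive options reach each new connection. A hedged sketch of wiring it up through grpc++ channel arguments (endpoint and keepalive numbers are placeholders; grpc::ChannelArguments::SetSocketMutator takes ownership of the mutator, which is later freed through its Destroy callback):

#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>

std::shared_ptr<grpc::Channel> MakeKeepAliveChannel() {
    grpc::ChannelArguments args;
    // grpc takes ownership of the mutator and applies it to each socket.
    args.SetSocketMutator(new TGRpcKeepAliveSocketMutator(
        /*idle=*/30, /*count=*/5, /*interval=*/10));
    return grpc::CreateCustomChannel(
        "localhost:2135", grpc::InsecureChannelCredentials(), args);
}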
Some files were not shown because too many files changed in this diff