123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586 |
- #include "grpc_client_low.h"
- #include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
- #include <contrib/libs/grpc/include/grpc/support/log.h>
- #include <library/cpp/containers/stack_vector/stack_vec.h>
- #include <util/string/printf.h>
- #include <util/system/thread.h>
- #include <util/random/random.h>
- #if !defined(_WIN32) && !defined(_WIN64)
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
- #endif
- namespace NGrpc {
- void EnableGRpcTracing() {
- grpc_tracer_set_enabled("tcp", true);
- grpc_tracer_set_enabled("client_channel", true);
- grpc_tracer_set_enabled("channel", true);
- grpc_tracer_set_enabled("api", true);
- grpc_tracer_set_enabled("connectivity_state", true);
- grpc_tracer_set_enabled("handshaker", true);
- grpc_tracer_set_enabled("http", true);
- grpc_tracer_set_enabled("http2_stream_state", true);
- grpc_tracer_set_enabled("op_failure", true);
- grpc_tracer_set_enabled("timer", true);
- gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
- }
- class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
- public:
- TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
- : Idle_(idle)
- , Count_(count)
- , Interval_(interval)
- {
- grpc_socket_mutator_init(this, &VTable);
- }
- private:
- static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
- return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
- }
- template<typename TVal>
- bool SetOption(int fd, int level, int optname, const TVal& value) {
- return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
- }
- bool SetOption(int fd) {
- if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
- Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
- return false;
- }
- #ifdef _linux_
- if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
- return false;
- }
- if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
- return false;
- }
- if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
- return false;
- }
- #endif
- return true;
- }
- static bool Mutate(int fd, grpc_socket_mutator* mutator) {
- auto self = Cast(mutator);
- return self->SetOption(fd);
- }
- static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
- const auto* selfA = Cast(a);
- const auto* selfB = Cast(b);
- auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
- auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
- return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
- }
- static void Destroy(grpc_socket_mutator* mutator) {
- delete Cast(mutator);
- }
- static grpc_socket_mutator_vtable VTable;
- const int Idle_;
- const int Count_;
- const int Interval_;
- };
- grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
- {
- &TGRpcKeepAliveSocketMutator::Mutate,
- &TGRpcKeepAliveSocketMutator::Compare,
- &TGRpcKeepAliveSocketMutator::Destroy
- };
- TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
- : TcpKeepAliveSettings_(tcpKeepAliveSettings)
- , ExpireTime_(expireTime)
- , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
- {}
- void TChannelPool::GetStubsHolderLocked(
- const TString& channelId,
- const TGRpcClientConfig& config,
- std::function<void(TStubsHolder&)> cb)
- {
- {
- std::shared_lock readGuard(RWMutex_);
- const auto it = Pool_.find(channelId);
- if (it != Pool_.end()) {
- if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
- return cb(it->second);
- }
- }
- }
- {
- std::unique_lock writeGuard(RWMutex_);
- {
- auto it = Pool_.find(channelId);
- if (it != Pool_.end()) {
- if (!it->second.IsChannelBroken()) {
- EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
- auto now = Now();
- LastUsedQueue_.emplace(now, channelId);
- it->second.SetLastUseTime(now);
- return cb(it->second);
- } else {
- // This channel can't be used. Remove from pool to create new one
- EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
- Pool_.erase(it);
- }
- }
- }
- TGRpcKeepAliveSocketMutator* mutator = nullptr;
- // will be destroyed inside grpc
- if (TcpKeepAliveSettings_.Enabled) {
- mutator = new TGRpcKeepAliveSocketMutator(
- TcpKeepAliveSettings_.Idle,
- TcpKeepAliveSettings_.Count,
- TcpKeepAliveSettings_.Interval
- );
- }
- cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
- LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
- }
- }
- void TChannelPool::DeleteChannel(const TString& channelId) {
- std::unique_lock writeLock(RWMutex_);
- auto poolIt = Pool_.find(channelId);
- if (poolIt != Pool_.end()) {
- EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
- Pool_.erase(poolIt);
- }
- }
- void TChannelPool::DeleteExpiredStubsHolders() {
- std::unique_lock writeLock(RWMutex_);
- auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
- for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
- Pool_.erase(i->second);
- }
- LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
- }
- void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
- auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
- auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
- Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
- LastUsedQueue_.erase(pos);
- }
- static void PullEvents(grpc::CompletionQueue* cq) {
- TThread::SetCurrentThreadName("grpc_client");
- while (true) {
- void* tag;
- bool ok;
- if (!cq->Next(&tag, &ok)) {
- break;
- }
- if (auto* ev = static_cast<IQueueClientEvent*>(tag)) {
- if (!ev->Execute(ok)) {
- ev->Destroy();
- }
- }
- }
- }
- class TGRpcClientLow::TContextImpl final
- : public std::enable_shared_from_this<TContextImpl>
- , public IQueueClientContext
- {
- friend class TGRpcClientLow;
- using TCallback = std::function<void()>;
- using TContextPtr = std::shared_ptr<TContextImpl>;
- public:
- ~TContextImpl() override {
- Y_VERIFY(CountChildren() == 0,
- "Destructor called with non-empty children");
- if (Parent) {
- Parent->ForgetContext(this);
- } else if (Y_LIKELY(Owner)) {
- Owner->ForgetContext(this);
- }
- }
- /**
- * Helper for locking child pointer from a parent container
- */
- static TContextPtr LockChildPtr(TContextImpl* ptr) {
- if (ptr) {
- // N.B. it is safe to do as long as it's done under a mutex and
- // pointer is among valid children. When that's the case we
- // know that TContextImpl destructor has not finished yet, so
- // the object is valid. The lock() method may return nullptr
- // though, if the object is being destructed right now.
- return ptr->weak_from_this().lock();
- } else {
- return nullptr;
- }
- }
- void ForgetContext(TContextImpl* child) {
- std::unique_lock<std::mutex> guard(Mutex);
- auto removed = RemoveChild(child);
- Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child);
- }
- IQueueClientContextPtr CreateContext() override {
- auto self = shared_from_this();
- auto child = std::make_shared<TContextImpl>();
- {
- std::unique_lock<std::mutex> guard(Mutex);
- AddChild(child.get());
- // It's now safe to initialize parent and owner
- child->Parent = std::move(self);
- child->Owner = Owner;
- child->CQ = CQ;
- // Propagate cancellation to a child context
- if (Cancelled.load(std::memory_order_relaxed)) {
- child->Cancelled.store(true, std::memory_order_relaxed);
- }
- }
- return child;
- }
- grpc::CompletionQueue* CompletionQueue() override {
- Y_VERIFY(Owner, "Uninitialized context");
- return CQ;
- }
- bool IsCancelled() const override {
- return Cancelled.load(std::memory_order_acquire);
- }
- bool Cancel() override {
- TStackVec<TCallback, 1> callbacks;
- TStackVec<TContextPtr, 2> children;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- if (Cancelled.load(std::memory_order_relaxed)) {
- // Already cancelled in another thread
- return false;
- }
- callbacks.reserve(Callbacks.size());
- children.reserve(CountChildren());
- for (auto& callback : Callbacks) {
- callbacks.emplace_back().swap(callback);
- }
- Callbacks.clear();
- // Collect all children we need to cancel
- // N.B. we don't clear children links (cleared by destructors)
- // N.B. some children may be stuck in destructors at the moment
- for (TContextImpl* ptr : InlineChildren) {
- if (auto child = LockChildPtr(ptr)) {
- children.emplace_back(std::move(child));
- }
- }
- for (auto* ptr : Children) {
- if (auto child = LockChildPtr(ptr)) {
- children.emplace_back(std::move(child));
- }
- }
- Cancelled.store(true, std::memory_order_release);
- }
- // Call directly subscribed callbacks
- if (callbacks) {
- RunCallbacksNoExcept(callbacks);
- }
- // Cancel all children
- for (auto& child : children) {
- child->Cancel();
- child.reset();
- }
- return true;
- }
- void SubscribeCancel(TCallback callback) override {
- Y_VERIFY(callback, "SubscribeCancel called with an empty callback");
- {
- std::unique_lock<std::mutex> guard(Mutex);
- if (!Cancelled.load(std::memory_order_relaxed)) {
- Callbacks.emplace_back().swap(callback);
- return;
- }
- }
- // Already cancelled, run immediately
- callback();
- }
- private:
- void AddChild(TContextImpl* child) {
- for (TContextImpl*& slot : InlineChildren) {
- if (!slot) {
- slot = child;
- return;
- }
- }
- Children.insert(child);
- }
- bool RemoveChild(TContextImpl* child) {
- for (TContextImpl*& slot : InlineChildren) {
- if (slot == child) {
- slot = nullptr;
- return true;
- }
- }
- return Children.erase(child);
- }
- size_t CountChildren() {
- size_t count = 0;
- for (TContextImpl* ptr : InlineChildren) {
- if (ptr) {
- ++count;
- }
- }
- return count + Children.size();
- }
- template<class TCallbacks>
- static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept {
- for (auto& callback : callbacks) {
- if (callback) {
- callback();
- callback = nullptr;
- }
- }
- }
- private:
- // We want a simple lock here, without extra memory allocations
- std::mutex Mutex;
- // These fields are initialized on successful registration
- TContextPtr Parent;
- TGRpcClientLow* Owner = nullptr;
- grpc::CompletionQueue* CQ = nullptr;
- // Some children are stored inline, others are in a set
- std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
- std::unordered_set<TContextImpl*> Children;
- // Single callback is stored without extra allocations
- TStackVec<TCallback, 1> Callbacks;
- // Atomic flag for a faster IsCancelled() implementation
- std::atomic<bool> Cancelled;
- };
- TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
- : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
- {
- Init(numWorkerThread);
- }
- void TGRpcClientLow::Init(size_t numWorkerThread) {
- SetCqState(WORKING);
- if (UseCompletionQueuePerThread_) {
- for (size_t i = 0; i < numWorkerThread; i++) {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
- } else {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- for (size_t i = 0; i < numWorkerThread; i++) {
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
- }
- }
- void TGRpcClientLow::AddWorkerThreadForTest() {
- if (UseCompletionQueuePerThread_) {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- } else {
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
- }
- TGRpcClientLow::~TGRpcClientLow() {
- StopInternal(true);
- WaitInternal();
- }
- void TGRpcClientLow::Stop(bool wait) {
- StopInternal(false);
- if (wait) {
- WaitInternal();
- }
- }
- void TGRpcClientLow::StopInternal(bool silent) {
- bool shutdown;
- TVector<TContextImpl::TContextPtr> cancelQueue;
- {
- std::unique_lock<std::mutex> guard(Mtx_);
- auto allowStateChange = [&]() {
- switch (GetCqState()) {
- case WORKING:
- return true;
- return !silent;
- return false;
- }
- };
- if (!allowStateChange()) {
- // Completion queue is already stopping
- return;
- }
- SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
- if (!silent && !Contexts_.empty()) {
- cancelQueue.reserve(Contexts_.size());
- for (auto* ptr : Contexts_) {
- // N.B. some contexts may be stuck in destructors
- if (auto context = TContextImpl::LockChildPtr(ptr)) {
- cancelQueue.emplace_back(std::move(context));
- }
- }
- }
- shutdown = Contexts_.empty();
- }
- for (auto& context : cancelQueue) {
- context->Cancel();
- context.reset();
- }
- if (shutdown) {
- for (auto& cq : CQS_) {
- cq->Shutdown();
- }
- }
- }
- void TGRpcClientLow::WaitInternal() {
- std::unique_lock<std::mutex> guard(JoinMutex_);
- for (auto& ti : WorkerThreads_) {
- ti->Join();
- }
- }
- void TGRpcClientLow::WaitIdle() {
- std::unique_lock<std::mutex> guard(Mtx_);
- while (!Contexts_.empty()) {
- ContextsEmpty_.wait(guard);
- }
- }
- std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
- std::unique_lock<std::mutex> guard(Mtx_);
- auto allowCreateContext = [&]() {
- switch (GetCqState()) {
- case WORKING:
- return true;
- return false;
- }
- };
- if (!allowCreateContext()) {
- // New context creation is forbidden
- return nullptr;
- }
- auto context = std::make_shared<TContextImpl>();
- Contexts_.insert(context.get());
- context->Owner = this;
- if (UseCompletionQueuePerThread_) {
- context->CQ = CQS_[RandomNumber(CQS_.size())].get();
- } else {
- context->CQ = CQS_[0].get();
- }
- return context;
- }
- void TGRpcClientLow::ForgetContext(TContextImpl* context) {
- bool shutdown = false;
- {
- std::unique_lock<std::mutex> guard(Mtx_);
- if (!Contexts_.erase(context)) {
- Y_FAIL("Unexpected ForgetContext(%p)", context);
- }
- if (Contexts_.empty()) {
- if (IsStopping()) {
- shutdown = true;
- }
- ContextsEmpty_.notify_all();
- }
- }
- if (shutdown) {
- // This was the last context, shutdown CQ
- for (auto& cq : CQS_) {
- cq->Shutdown();
- }
- }
- }
- } // namespace NGrpc