12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415 |
- #pragma once
- #include "grpc_common.h"
- #include <library/cpp/deprecated/atomic/atomic.h>
- #include <util/thread/factory.h>
- #include <util/string/builder.h>
- #include <grpc++/grpc++.h>
- #include <grpc++/support/async_stream.h>
- #include <grpc++/support/async_unary_call.h>
- #include <deque>
- #include <typeindex>
- #include <typeinfo>
- #include <variant>
- #include <vector>
- #include <unordered_map>
- #include <unordered_set>
- #include <mutex>
- #include <shared_mutex>
- /*
- * This file contains low level logic for grpc
- * This file should not be used in high level code without special reason
- */
- namespace NGrpc {
- const size_t DEFAULT_NUM_THREADS = 2;
- ////////////////////////////////////////////////////////////////////////////////
- void EnableGRpcTracing();
- ////////////////////////////////////////////////////////////////////////////////
- struct TTcpKeepAliveSettings {
- bool Enabled;
- size_t Idle;
- size_t Count;
- size_t Interval;
- };
- ////////////////////////////////////////////////////////////////////////////////
- // Common interface used to execute action from grpc cq routine
- class IQueueClientEvent {
- public:
- virtual ~IQueueClientEvent() = default;
- //! Execute an action defined by implementation
- virtual bool Execute(bool ok) = 0;
- //! Finish and destroy event
- virtual void Destroy() = 0;
- };
- // Implementation of IQueueClientEvent that reduces allocations
- template<class TSelf>
- class TQueueClientFixedEvent : private IQueueClientEvent {
- using TCallback = void (TSelf::*)(bool);
- public:
- TQueueClientFixedEvent(TSelf* self, TCallback callback)
- : Self(self)
- , Callback(callback)
- { }
- IQueueClientEvent* Prepare() {
- Self->Ref();
- return this;
- }
- private:
- bool Execute(bool ok) override {
- ((*Self).*Callback)(ok);
- return false;
- }
- void Destroy() override {
- Self->UnRef();
- }
- private:
- TSelf* const Self;
- TCallback const Callback;
- };
- class IQueueClientContext;
- using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>;
- // Provider of IQueueClientContext instances
- class IQueueClientContextProvider {
- public:
- virtual ~IQueueClientContextProvider() = default;
- virtual IQueueClientContextPtr CreateContext() = 0;
- };
- // Activity context for a low-level client
- class IQueueClientContext : public IQueueClientContextProvider {
- public:
- virtual ~IQueueClientContext() = default;
- //! Returns CompletionQueue associated with the client
- virtual grpc::CompletionQueue* CompletionQueue() = 0;
- //! Returns true if context has been cancelled
- virtual bool IsCancelled() const = 0;
- //! Tries to cancel context, calling all registered callbacks
- virtual bool Cancel() = 0;
- //! Subscribes callback to cancellation
- //
- // Note there's no way to unsubscribe, if subscription is temporary
- // make sure you create a new context with CreateContext and release
- // it as soon as it's no longer needed.
- virtual void SubscribeCancel(std::function<void()> callback) = 0;
- //! Subscribes callback to cancellation
- //
- // This alias is for compatibility with older code.
- void SubscribeStop(std::function<void()> callback) {
- SubscribeCancel(std::move(callback));
- }
- };
- // Represents grpc status and error message string
- struct TGrpcStatus {
- TString Msg;
- TString Details;
- int GRpcStatusCode;
- bool InternalError;
- TGrpcStatus()
- : GRpcStatusCode(grpc::StatusCode::OK)
- , InternalError(false)
- { }
- TGrpcStatus(TString msg, int statusCode, bool internalError)
- : Msg(std::move(msg))
- , GRpcStatusCode(statusCode)
- , InternalError(internalError)
- { }
- TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {})
- : Msg(std::move(msg))
- , Details(std::move(details))
- , GRpcStatusCode(status)
- , InternalError(false)
- { }
- TGrpcStatus(const grpc::Status& status)
- : TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details()))
- { }
- TGrpcStatus& operator=(const grpc::Status& status) {
- Msg = TString(status.error_message());
- Details = TString(status.error_details());
- GRpcStatusCode = status.error_code();
- InternalError = false;
- return *this;
- }
- static TGrpcStatus Internal(TString msg) {
- return { std::move(msg), -1, true };
- }
- bool Ok() const {
- return !InternalError && GRpcStatusCode == grpc::StatusCode::OK;
- }
- TStringBuilder ToDebugString() const {
- TStringBuilder ret;
- ret << "gRpcStatusCode: " << GRpcStatusCode;
- if(!Ok())
- ret << ", Msg: " << Msg << ", Details: " << Details << ", InternalError: " << InternalError;
- return ret;
- }
- };
- bool inline IsGRpcStatusGood(const TGrpcStatus& status) {
- return status.Ok();
- }
- // Response callback type - this callback will be called when request is finished
- // (or after getting each chunk in case of streaming mode)
- template<typename TResponse>
- using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>;
- template<typename TResponse>
- using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>;
- // Call associated metadata
- struct TCallMeta {
- std::shared_ptr<grpc::CallCredentials> CallCredentials;
- std::vector<std::pair<TString, TString>> Aux;
- std::variant<TDuration, TInstant> Timeout; // timeout as duration from now or time point in future
- };
- class TGRpcRequestProcessorCommon {
- protected:
- void ApplyMeta(const TCallMeta& meta) {
- for (const auto& rec : meta.Aux) {
- Context.AddMetadata(rec.first, rec.second);
- }
- if (meta.CallCredentials) {
- Context.set_credentials(meta.CallCredentials);
- }
- if (const TDuration* timeout = std::get_if<TDuration>(&meta.Timeout)) {
- if (*timeout) {
- auto deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_micros(timeout->MicroSeconds(), GPR_TIMESPAN));
- Context.set_deadline(deadline);
- }
- } else if (const TInstant* deadline = std::get_if<TInstant>(&meta.Timeout)) {
- if (*deadline) {
- Context.set_deadline(gpr_time_from_micros(deadline->MicroSeconds(), GPR_CLOCK_MONOTONIC));
- }
- }
- }
- void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) {
- for (const auto& [key, value] : Context.GetServerInitialMetadata()) {
- metadata->emplace(
- TString(key.begin(), key.end()),
- TString(value.begin(), value.end())
- );
- }
- }
- grpc::Status Status;
- grpc::ClientContext Context;
- std::shared_ptr<IQueueClientContext> LocalContext;
- };
- template<typename TStub, typename TRequest, typename TResponse>
- class TSimpleRequestProcessor
- : public TThrRefBase
- , public IQueueClientEvent
- , public TGRpcRequestProcessorCommon {
- using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
- template<typename> friend class TServiceConnection;
- public:
- using TPtr = TIntrusivePtr<TSimpleRequestProcessor>;
- using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);
- explicit TSimpleRequestProcessor(TResponseCallback<TResponse>&& callback)
- : Callback_(std::move(callback))
- { }
- ~TSimpleRequestProcessor() {
- if (!Replied_ && Callback_) {
- Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
- }
- }
- bool Execute(bool ok) override {
- {
- std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext.reset();
- }
- TGrpcStatus status;
- if (ok) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- Replied_ = true;
- Callback_(std::move(status), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
- return false;
- }
- void Destroy() override {
- UnRef();
- }
- private:
- IQueueClientEvent* FinishedEvent() {
- Ref();
- return this;
- }
- void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- Replied_ = true;
- Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
- Callback_ = nullptr;
- return;
- }
- {
- std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext = context;
- Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
- Reader_->Finish(&Reply_, &Status, FinishedEvent());
- }
- context->SubscribeStop([self = TPtr(this)] {
- self->Stop();
- });
- }
- void Stop() {
- Context.TryCancel();
- }
- TResponseCallback<TResponse> Callback_;
- TResponse Reply_;
- std::mutex Mutex_;
- TAsyncReaderPtr Reader_;
- bool Replied_ = false;
- };
- template<typename TStub, typename TRequest, typename TResponse>
- class TAdvancedRequestProcessor
- : public TThrRefBase
- , public IQueueClientEvent
- , public TGRpcRequestProcessorCommon {
- using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
- template<typename> friend class TServiceConnection;
- public:
- using TPtr = TIntrusivePtr<TAdvancedRequestProcessor>;
- using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);
- explicit TAdvancedRequestProcessor(TAdvancedResponseCallback<TResponse>&& callback)
- : Callback_(std::move(callback))
- { }
- ~TAdvancedRequestProcessor() {
- if (!Replied_ && Callback_) {
- Callback_(Context, TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
- }
- }
- bool Execute(bool ok) override {
- {
- std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext.reset();
- }
- TGrpcStatus status;
- if (ok) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- Replied_ = true;
- Callback_(Context, std::move(status), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
- return false;
- }
- void Destroy() override {
- UnRef();
- }
- private:
- IQueueClientEvent* FinishedEvent() {
- Ref();
- return this;
- }
- void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- Replied_ = true;
- Callback_(Context, TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
- Callback_ = nullptr;
- return;
- }
- {
- std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext = context;
- Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
- Reader_->Finish(&Reply_, &Status, FinishedEvent());
- }
- context->SubscribeStop([self = TPtr(this)] {
- self->Stop();
- });
- }
- void Stop() {
- Context.TryCancel();
- }
- TAdvancedResponseCallback<TResponse> Callback_;
- TResponse Reply_;
- std::mutex Mutex_;
- TAsyncReaderPtr Reader_;
- bool Replied_ = false;
- };
- class IStreamRequestCtrl : public TThrRefBase {
- public:
- using TPtr = TIntrusivePtr<IStreamRequestCtrl>;
- /**
- * Asynchronously cancel the request
- */
- virtual void Cancel() = 0;
- };
- template<class TResponse>
- class IStreamRequestReadProcessor : public IStreamRequestCtrl {
- public:
- using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
- using TReadCallback = std::function<void(TGrpcStatus&&)>;
- /**
- * Scheduled initial server metadata read from the stream
- */
- virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0;
- /**
- * Scheduled response read from the stream
- * Callback will be called with the status if it failed
- * Only one Read or Finish call may be active at a time
- */
- virtual void Read(TResponse* response, TReadCallback callback) = 0;
- /**
- * Stop reading and gracefully finish the stream
- * Only one Read or Finish call may be active at a time
- */
- virtual void Finish(TReadCallback callback) = 0;
- /**
- * Additional callback to be called when stream has finished
- */
- virtual void AddFinishedCallback(TReadCallback callback) = 0;
- };
- template<class TRequest, class TResponse>
- class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> {
- public:
- using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>;
- using TWriteCallback = std::function<void(TGrpcStatus&&)>;
- /**
- * Scheduled request write to the stream
- */
- virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0;
- };
- class TGRpcKeepAliveSocketMutator;
- // Class to hold stubs allocated on channel.
- // It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit
- // Stub holds shared_ptr<ChannelInterface>, so we can destroy this holder even if
- // request processor using stub
- class TStubsHolder : public TNonCopyable {
- using TypeInfoRef = std::reference_wrapper<const std::type_info>;
- struct THasher {
- std::size_t operator()(TypeInfoRef code) const {
- return code.get().hash_code();
- }
- };
- struct TEqualTo {
- bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const {
- return lhs.get() == rhs.get();
- }
- };
- public:
- TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel)
- : ChannelInterface_(channel)
- {}
- // Returns true if channel can't be used to perform request now
- bool IsChannelBroken() const {
- auto state = ChannelInterface_->GetState(false);
- return state == GRPC_CHANNEL_SHUTDOWN ||
- state == GRPC_CHANNEL_TRANSIENT_FAILURE;
- }
- template<typename TStub>
- std::shared_ptr<TStub> GetOrCreateStub() {
- const auto& stubId = typeid(TStub);
- {
- std::shared_lock readGuard(RWMutex_);
- const auto it = Stubs_.find(stubId);
- if (it != Stubs_.end()) {
- return std::static_pointer_cast<TStub>(it->second);
- }
- }
- {
- std::unique_lock writeGuard(RWMutex_);
- auto it = Stubs_.emplace(stubId, nullptr);
- if (!it.second) {
- return std::static_pointer_cast<TStub>(it.first->second);
- } else {
- it.first->second = std::make_shared<TStub>(ChannelInterface_);
- return std::static_pointer_cast<TStub>(it.first->second);
- }
- }
- }
- const TInstant& GetLastUseTime() const {
- return LastUsed_;
- }
- void SetLastUseTime(const TInstant& time) {
- LastUsed_ = time;
- }
- private:
- TInstant LastUsed_ = Now();
- std::shared_mutex RWMutex_;
- std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_;
- std::shared_ptr<grpc::ChannelInterface> ChannelInterface_;
- };
- class TChannelPool {
- public:
- TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6));
- //Allows to CreateStub from TStubsHolder under lock
- //The callback will be called just during GetStubsHolderLocked call
- void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb);
- void DeleteChannel(const TString& channelId);
- void DeleteExpiredStubsHolders();
- private:
- std::shared_mutex RWMutex_;
- std::unordered_map<TString, TStubsHolder> Pool_;
- std::multimap<TInstant, TString> LastUsedQueue_;
- TTcpKeepAliveSettings TcpKeepAliveSettings_;
- TDuration ExpireTime_;
- TDuration UpdateReUseTime_;
- void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId);
- };
- template<class TResponse>
- using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>;
- template<typename TStub, typename TRequest, typename TResponse>
- class TStreamRequestReadProcessor
- : public IStreamRequestReadProcessor<TResponse>
- , public TGRpcRequestProcessorCommon {
- template<typename> friend class TServiceConnection;
- public:
- using TSelf = TStreamRequestReadProcessor;
- using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>;
- using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*);
- using TReaderCallback = TStreamReaderCallback<TResponse>;
- using TPtr = TIntrusivePtr<TSelf>;
- using TBase = IStreamRequestReadProcessor<TResponse>;
- using TReadCallback = typename TBase::TReadCallback;
- explicit TStreamRequestReadProcessor(TReaderCallback&& callback)
- : Callback(std::move(callback))
- {
- Y_VERIFY(Callback, "Missing connected callback");
- }
- void Cancel() override {
- Context.TryCancel();
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Cancelled = true;
- if (Started && !ReadFinished) {
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (ReadFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
- }
- void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
- TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished && !HasInitialMetadata) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- InitialMetadata = metadata;
- if (!ReadFinished) {
- Stream->ReadInitialMetadata(OnReadDoneTag.Prepare());
- }
- return;
- }
- if (!HasInitialMetadata) {
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- } else {
- GetInitialMetadata(metadata);
- }
- }
- callback(std::move(status));
- }
- void Read(TResponse* message, TReadCallback callback) override {
- TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- if (!ReadFinished) {
- Stream->Read(message, OnReadDoneTag.Prepare());
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
- callback(std::move(status));
- }
- void Finish(TReadCallback callback) override {
- TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- FinishCallback = std::move(callback);
- if (!ReadFinished) {
- ReadFinished = true;
- }
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
- callback(std::move(status));
- }
- void AddFinishedCallback(TReadCallback callback) override {
- Y_VERIFY(callback, "Unexpected empty callback");
- TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- if (!Finished) {
- FinishedCallbacks.emplace_back().swap(callback);
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
- callback(std::move(status));
- }
- private:
- void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- auto callback = std::move(Callback);
- TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
- callback(std::move(status), nullptr);
- return;
- }
- {
- std::unique_lock<std::mutex> guard(Mutex);
- LocalContext = context;
- Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare());
- }
- context->SubscribeStop([self = TPtr(this)] {
- self->Cancel();
- });
- }
- void OnReadDone(bool ok) {
- TGrpcStatus status;
- TReadCallback callback;
- std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(ReadActive, "Unexpected Read done callback");
- Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
- if (!ok || Cancelled) {
- ReadFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- if (!ok) {
- // Keep ReadActive=true, so callback is called
- // after the call is finished with an error
- return;
- }
- }
- callback = std::move(ReadCallback);
- ReadCallback = nullptr;
- ReadActive = false;
- initialMetadata = InitialMetadata;
- InitialMetadata = nullptr;
- HasInitialMetadata = true;
- }
- if (initialMetadata) {
- GetInitialMetadata(initialMetadata);
- }
- callback(std::move(status));
- }
- void OnStartDone(bool ok) {
- TReaderCallback callback;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Started = true;
- if (!ok || Cancelled) {
- ReadFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- return;
- }
- callback = std::move(Callback);
- Callback = nullptr;
- }
- callback({ }, typename TBase::TPtr(this));
- }
- void OnFinished(bool ok) {
- TGrpcStatus status;
- std::vector<TReadCallback> finishedCallbacks;
- TReaderCallback startCallback;
- TReadCallback readCallback;
- TReadCallback finishCallback;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Finished = true;
- FinishedOk = ok;
- LocalContext.reset();
- if (ok) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- finishedCallbacks.swap(FinishedCallbacks);
- if (Callback) {
- Y_VERIFY(!ReadActive);
- startCallback = std::move(Callback);
- Callback = nullptr;
- } else if (ReadActive) {
- if (ReadCallback) {
- readCallback = std::move(ReadCallback);
- ReadCallback = nullptr;
- } else {
- finishCallback = std::move(FinishCallback);
- FinishCallback = nullptr;
- }
- ReadActive = false;
- }
- }
- for (auto& finishedCallback : finishedCallbacks) {
- auto statusCopy = status;
- finishedCallback(std::move(statusCopy));
- }
- if (startCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
- }
- startCallback(std::move(status), nullptr);
- } else if (readCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
- readCallback(std::move(status));
- } else if (finishCallback) {
- finishCallback(std::move(status));
- }
- }
- TReaderCallback Callback;
- TAsyncReaderPtr Stream;
- using TFixedEvent = TQueueClientFixedEvent<TSelf>;
- std::mutex Mutex;
- TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
- TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone };
- TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
- TReadCallback ReadCallback;
- TReadCallback FinishCallback;
- std::vector<TReadCallback> FinishedCallbacks;
- std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
- bool Started = false;
- bool HasInitialMetadata = false;
- bool ReadActive = false;
- bool ReadFinished = false;
- bool Finished = false;
- bool Cancelled = false;
- bool FinishedOk = false;
- };
- template<class TRequest, class TResponse>
- using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>;
- template<class TStub, class TRequest, class TResponse>
- class TStreamRequestReadWriteProcessor
- : public IStreamRequestReadWriteProcessor<TRequest, TResponse>
- , public TGRpcRequestProcessorCommon {
- public:
- using TSelf = TStreamRequestReadWriteProcessor;
- using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>;
- using TPtr = TIntrusivePtr<TSelf>;
- using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>;
- using TReadCallback = typename TBase::TReadCallback;
- using TWriteCallback = typename TBase::TWriteCallback;
- using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>;
- using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*);
- explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)
- : ConnectedCallback(std::move(callback))
- {
- Y_VERIFY(ConnectedCallback, "Missing connected callback");
- }
- void Cancel() override {
- Context.TryCancel();
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Cancelled = true;
- if (Started && !(ReadFinished && WriteFinished)) {
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (ReadFinished && WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
- }
- void Write(TRequest&& request, TWriteCallback callback) override {
- TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- if (Cancelled || ReadFinished || WriteFinished) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
- } else if (WriteActive) {
- auto& item = WriteQueue.emplace_back();
- item.Callback.swap(callback);
- item.Request.Swap(&request);
- } else {
- WriteActive = true;
- WriteCallback.swap(callback);
- Stream->Write(request, OnWriteDoneTag.Prepare());
- }
- }
- if (!status.Ok() && callback) {
- callback(std::move(status));
- }
- }
- void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
- TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished && !HasInitialMetadata) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- InitialMetadata = metadata;
- if (!ReadFinished) {
- Stream->ReadInitialMetadata(OnReadDoneTag.Prepare());
- }
- return;
- }
- if (!HasInitialMetadata) {
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- } else {
- GetInitialMetadata(metadata);
- }
- }
- callback(std::move(status));
- }
- void Read(TResponse* message, TReadCallback callback) override {
- TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- if (!ReadFinished) {
- Stream->Read(message, OnReadDoneTag.Prepare());
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
- callback(std::move(status));
- }
- void Finish(TReadCallback callback) override {
- TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- FinishCallback = std::move(callback);
- if (!ReadFinished) {
- ReadFinished = true;
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
- callback(std::move(status));
- }
- void AddFinishedCallback(TReadCallback callback) override {
- Y_VERIFY(callback, "Unexpected empty callback");
- TGrpcStatus status;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- if (!Finished) {
- FinishedCallbacks.emplace_back().swap(callback);
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
- callback(std::move(status));
- }
- private:
- template<typename> friend class TServiceConnection;
- void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- auto callback = std::move(ConnectedCallback);
- TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
- callback(std::move(status), nullptr);
- return;
- }
- {
- std::unique_lock<std::mutex> guard(Mutex);
- LocalContext = context;
- Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare());
- }
- context->SubscribeStop([self = TPtr(this)] {
- self->Cancel();
- });
- }
- private:
- void OnConnected(bool ok) {
- TConnectedCallback callback;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Started = true;
- if (!ok || Cancelled) {
- ReadFinished = true;
- WriteFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- return;
- }
- callback = std::move(ConnectedCallback);
- ConnectedCallback = nullptr;
- }
- callback({ }, typename TBase::TPtr(this));
- }
- void OnReadDone(bool ok) {
- TGrpcStatus status;
- TReadCallback callback;
- std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(ReadActive, "Unexpected Read done callback");
- Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
- if (!ok || Cancelled || WriteFinished) {
- ReadFinished = true;
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- if (!ok) {
- // Keep ReadActive=true, so callback is called
- // after the call is finished with an error
- return;
- }
- }
- callback = std::move(ReadCallback);
- ReadCallback = nullptr;
- ReadActive = false;
- initialMetadata = InitialMetadata;
- InitialMetadata = nullptr;
- HasInitialMetadata = true;
- }
- if (initialMetadata) {
- GetInitialMetadata(initialMetadata);
- }
- callback(std::move(status));
- }
- void OnWriteDone(bool ok) {
- TWriteCallback okCallback;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(WriteActive, "Unexpected Write done callback");
- Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag");
- if (ok) {
- okCallback.swap(WriteCallback);
- } else if (WriteCallback) {
- // Put callback back on the queue until OnFinished
- auto& item = WriteQueue.emplace_front();
- item.Callback.swap(WriteCallback);
- }
- if (!ok || Cancelled) {
- WriteActive = false;
- WriteFinished = true;
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (ReadFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- } else if (!WriteQueue.empty()) {
- WriteCallback.swap(WriteQueue.front().Callback);
- Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare());
- WriteQueue.pop_front();
- } else {
- WriteActive = false;
- if (ReadFinished) {
- WriteFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
- if (okCallback) {
- okCallback(TGrpcStatus());
- }
- }
- void OnFinished(bool ok) {
- TGrpcStatus status;
- std::deque<TWriteItem> writesDropped;
- std::vector<TReadCallback> finishedCallbacks;
- TConnectedCallback connectedCallback;
- TReadCallback readCallback;
- TReadCallback finishCallback;
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Finished = true;
- FinishedOk = ok;
- LocalContext.reset();
- if (ok) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- writesDropped.swap(WriteQueue);
- finishedCallbacks.swap(FinishedCallbacks);
- if (ConnectedCallback) {
- Y_VERIFY(!ReadActive);
- connectedCallback = std::move(ConnectedCallback);
- ConnectedCallback = nullptr;
- } else if (ReadActive) {
- if (ReadCallback) {
- readCallback = std::move(ReadCallback);
- ReadCallback = nullptr;
- } else {
- finishCallback = std::move(FinishCallback);
- FinishCallback = nullptr;
- }
- ReadActive = false;
- }
- }
- for (auto& item : writesDropped) {
- if (item.Callback) {
- TGrpcStatus writeStatus = status;
- if (writeStatus.Ok()) {
- writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
- }
- item.Callback(std::move(writeStatus));
- }
- }
- for (auto& finishedCallback : finishedCallbacks) {
- TGrpcStatus statusCopy = status;
- finishedCallback(std::move(statusCopy));
- }
- if (connectedCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
- }
- connectedCallback(std::move(status), nullptr);
- } else if (readCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
- readCallback(std::move(status));
- } else if (finishCallback) {
- finishCallback(std::move(status));
- }
- }
- private:
- struct TWriteItem {
- TWriteCallback Callback;
- TRequest Request;
- };
- private:
- using TFixedEvent = TQueueClientFixedEvent<TSelf>;
- TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected };
- TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
- TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone };
- TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
- private:
- std::mutex Mutex;
- TAsyncReaderWriterPtr Stream;
- TConnectedCallback ConnectedCallback;
- TReadCallback ReadCallback;
- TReadCallback FinishCallback;
- std::vector<TReadCallback> FinishedCallbacks;
- std::deque<TWriteItem> WriteQueue;
- TWriteCallback WriteCallback;
- std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
- bool Started = false;
- bool HasInitialMetadata = false;
- bool ReadActive = false;
- bool ReadFinished = false;
- bool WriteActive = false;
- bool WriteFinished = false;
- bool Finished = false;
- bool Cancelled = false;
- bool FinishedOk = false;
- };
- class TGRpcClientLow;
- template<typename TGRpcService>
- class TServiceConnection {
- using TStub = typename TGRpcService::Stub;
- friend class TGRpcClientLow;
- public:
- /*
- * Start simple request
- */
- template<typename TRequest, typename TResponse>
- void DoRequest(const TRequest& request,
- TResponseCallback<TResponse> callback,
- typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
- }
- /*
- * Start simple request
- */
- template<typename TRequest, typename TResponse>
- void DoAdvancedRequest(const TRequest& request,
- TAdvancedResponseCallback<TResponse> callback,
- typename TAdvancedRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TAdvancedRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
- }
- /*
- * Start bidirectional streamming
- */
- template<typename TRequest, typename TResponse>
- void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback,
- typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_);
- }
- /*
- * Start streaming response reading (one request, many responses)
- */
- template<typename TRequest, typename TResponse>
- void DoStreamRequest(const TRequest& request,
- TStreamReaderCallback<TResponse> callback,
- typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_);
- }
- private:
- TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci,
- IQueueClientContextProvider* provider)
- : Stub_(TGRpcService::NewStub(ci))
- , Provider_(provider)
- {
- Y_VERIFY(Provider_, "Connection does not have a queue provider");
- }
- TServiceConnection(TStubsHolder& holder,
- IQueueClientContextProvider* provider)
- : Stub_(holder.GetOrCreateStub<TStub>())
- , Provider_(provider)
- {
- Y_VERIFY(Provider_, "Connection does not have a queue provider");
- }
- std::shared_ptr<TStub> Stub_;
- IQueueClientContextProvider* Provider_;
- };
- class TGRpcClientLow
- : public IQueueClientContextProvider
- {
- class TContextImpl;
- friend class TContextImpl;
- enum ECqState : TAtomicBase {
- WORKING = 0,
- STOP_SILENT = 1,
- STOP_EXPLICIT = 2,
- };
- public:
- explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false);
- ~TGRpcClientLow();
- // Tries to stop all currently running requests (via their stop callbacks)
- // Will shutdown CQ and drain events once all requests have finished
- // No new requests may be started after this call
- void Stop(bool wait = false);
- // Waits until all currently running requests finish execution
- void WaitIdle();
- inline bool IsStopping() const {
- switch (GetCqState()) {
- case WORKING:
- return false;
- case STOP_SILENT:
- case STOP_EXPLICIT:
- return true;
- }
- Y_UNREACHABLE();
- }
- IQueueClientContextPtr CreateContext() override;
- template<typename TGRpcService>
- std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) {
- return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));
- }
- template<typename TGRpcService>
- std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) {
- return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));
- }
- // Tests only, not thread-safe
- void AddWorkerThreadForTest();
- private:
- using IThreadRef = std::unique_ptr<IThreadFactory::IThread>;
- using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>;
- void Init(size_t numWorkerThread);
- inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }
- inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); }
- void StopInternal(bool silent);
- void WaitInternal();
- void ForgetContext(TContextImpl* context);
- private:
- bool UseCompletionQueuePerThread_;
- std::vector<CompletionQueueRef> CQS_;
- std::vector<IThreadRef> WorkerThreads_;
- TAtomic CqState_ = -1;
- std::mutex Mtx_;
- std::condition_variable ContextsEmpty_;
- std::unordered_set<TContextImpl*> Contexts_;
- std::mutex JoinMutex_;
- };
- } // namespace NGRpc
|