grpc_client_low.h

#pragma once

#include "grpc_common.h"

#include <library/cpp/deprecated/atomic/atomic.h>

#include <util/thread/factory.h>
#include <util/string/builder.h>

#include <grpc++/grpc++.h>
#include <grpc++/support/async_stream.h>
#include <grpc++/support/async_unary_call.h>

#include <deque>
#include <typeindex>
#include <typeinfo>
#include <variant>
#include <vector>
#include <unordered_map>
#include <unordered_set>
#include <mutex>
#include <shared_mutex>

/*
 * This file contains the low-level client logic for grpc.
 * It should not be used in high-level code without a special reason.
 */
namespace NGrpc {

const size_t DEFAULT_NUM_THREADS = 2;

////////////////////////////////////////////////////////////////////////////////

void EnableGRpcTracing();

////////////////////////////////////////////////////////////////////////////////

struct TTcpKeepAliveSettings {
    bool Enabled;
    size_t Idle;
    size_t Count;
    size_t Interval;
};

////////////////////////////////////////////////////////////////////////////////

// Common interface used to execute an action from a grpc completion queue routine
class IQueueClientEvent {
public:
    virtual ~IQueueClientEvent() = default;

    //! Executes an action defined by the implementation
    virtual bool Execute(bool ok) = 0;

    //! Finishes and destroys the event
    virtual void Destroy() = 0;
};

// Implementation of IQueueClientEvent that reduces allocations
template<class TSelf>
class TQueueClientFixedEvent : private IQueueClientEvent {
    using TCallback = void (TSelf::*)(bool);

public:
    TQueueClientFixedEvent(TSelf* self, TCallback callback)
        : Self(self)
        , Callback(callback)
    { }

    IQueueClientEvent* Prepare() {
        Self->Ref();
        return this;
    }

private:
    bool Execute(bool ok) override {
        ((*Self).*Callback)(ok);
        return false;
    }

    void Destroy() override {
        Self->UnRef();
    }

private:
    TSelf* const Self;
    TCallback const Callback;
};

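// Usage sketch (assumption, not part of this header): a processor keeps a
// TQueueClientFixedEvent as a member and passes Prepare() as the completion
// tag, so each pending operation only bumps the owner's refcount instead of
// allocating a new event object. "TMyProcessor" and "TMyResponse" are
// hypothetical placeholders.
//
//     class TMyProcessor : public TThrRefBase {
//         using TFixedEvent = TQueueClientFixedEvent<TMyProcessor>;
//         TFixedEvent OnDoneTag = { this, &TMyProcessor::OnDone };
//
//         void IssueRead(grpc::ClientAsyncReader<TMyResponse>* stream, TMyResponse* msg) {
//             stream->Read(msg, OnDoneTag.Prepare()); // tag comes back via Execute(ok)
//         }
//
//         void OnDone(bool ok) {
//             // handle completion
//         }
//     };
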
class IQueueClientContext;
using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>;

// Provider of IQueueClientContext instances
class IQueueClientContextProvider {
public:
    virtual ~IQueueClientContextProvider() = default;

    virtual IQueueClientContextPtr CreateContext() = 0;
};

// Activity context for a low-level client
class IQueueClientContext : public IQueueClientContextProvider {
public:
    virtual ~IQueueClientContext() = default;

    //! Returns the CompletionQueue associated with the client
    virtual grpc::CompletionQueue* CompletionQueue() = 0;

    //! Returns true if the context has been cancelled
    virtual bool IsCancelled() const = 0;

    //! Tries to cancel the context, calling all registered callbacks
    virtual bool Cancel() = 0;

    //! Subscribes a callback to cancellation
    //
    // Note there is no way to unsubscribe. If the subscription is temporary,
    // make sure you create a new context with CreateContext and release it
    // as soon as it is no longer needed.
    virtual void SubscribeCancel(std::function<void()> callback) = 0;

    //! Subscribes a callback to cancellation
    //
    // This alias is for compatibility with older code.
    void SubscribeStop(std::function<void()> callback) {
        SubscribeCancel(std::move(callback));
    }
};

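// Usage sketch (assumption): how the request processors later in this file
// consume a context obtained from an IQueueClientContextProvider (normally a
// TGRpcClientLow instance). "stub", "asyncRequest", "request" and the member
// fields are placeholders taken from the processors below.
//
//     auto context = provider->CreateContext();
//     if (!context) {
//         // the client is shutting down; fail the call without touching grpc
//         return;
//     }
//     LocalContext = context; // keep the context alive while the call is in flight
//     Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
//     Reader_->Finish(&Reply_, &Status, FinishedEvent());
//     context->SubscribeStop([self = TPtr(this)] {
//         self->Stop(); // TryCancel() the grpc::ClientContext on client shutdown
//     });
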
// Represents grpc status and error message string
struct TGrpcStatus {
    TString Msg;
    TString Details;
    int GRpcStatusCode;
    bool InternalError;
    std::multimap<TString, TString> ServerTrailingMetadata;

    TGrpcStatus()
        : GRpcStatusCode(grpc::StatusCode::OK)
        , InternalError(false)
    { }

    TGrpcStatus(TString msg, int statusCode, bool internalError)
        : Msg(std::move(msg))
        , GRpcStatusCode(statusCode)
        , InternalError(internalError)
    { }

    TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {})
        : Msg(std::move(msg))
        , Details(std::move(details))
        , GRpcStatusCode(status)
        , InternalError(false)
    { }

    TGrpcStatus(const grpc::Status& status)
        : TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details()))
    { }

    TGrpcStatus& operator=(const grpc::Status& status) {
        Msg = TString(status.error_message());
        Details = TString(status.error_details());
        GRpcStatusCode = status.error_code();
        InternalError = false;
        return *this;
    }

    static TGrpcStatus Internal(TString msg) {
        return { std::move(msg), -1, true };
    }

    bool Ok() const {
        return !InternalError && GRpcStatusCode == grpc::StatusCode::OK;
    }

    TStringBuilder ToDebugString() const {
        TStringBuilder ret;
        ret << "gRpcStatusCode: " << GRpcStatusCode;
        if (!Ok())
            ret << ", Msg: " << Msg << ", Details: " << Details << ", InternalError: " << InternalError;
        return ret;
    }
};

bool inline IsGRpcStatusGood(const TGrpcStatus& status) {
    return status.Ok();
}

// Response callback type - this callback will be called when the request is finished
// (or after each chunk is received in streaming mode)
template<typename TResponse>
using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>;

template<typename TResponse>
using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>;

// Call associated metadata
struct TCallMeta {
    std::shared_ptr<grpc::CallCredentials> CallCredentials;
    std::vector<std::pair<TString, TString>> Aux;
    std::variant<TDuration, TInstant> Timeout; // timeout as a duration from now or as a time point in the future
};

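// Usage sketch (assumption): filling in a TCallMeta before starting a request.
// ApplyMeta() below copies Aux into the grpc::ClientContext metadata, attaches
// the call credentials and converts Timeout into a deadline. The key/value
// strings here are placeholders.
//
//     NGrpc::TCallMeta meta;
//     meta.Aux.emplace_back("x-request-id", "42");          // sent as client metadata
//     meta.Timeout = TDuration::Seconds(5);                  // deadline = now + 5s
//     // alternatively: meta.Timeout = TInstant::Now() + TDuration::Seconds(5);
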
class TGRpcRequestProcessorCommon {
protected:
    void ApplyMeta(const TCallMeta& meta) {
        for (const auto& rec : meta.Aux) {
            Context.AddMetadata(rec.first, rec.second);
        }
        if (meta.CallCredentials) {
            Context.set_credentials(meta.CallCredentials);
        }
        if (const TDuration* timeout = std::get_if<TDuration>(&meta.Timeout)) {
            if (*timeout) {
                auto deadline = gpr_time_add(
                    gpr_now(GPR_CLOCK_MONOTONIC),
                    gpr_time_from_micros(timeout->MicroSeconds(), GPR_TIMESPAN));
                Context.set_deadline(deadline);
            }
        } else if (const TInstant* deadline = std::get_if<TInstant>(&meta.Timeout)) {
            if (*deadline) {
                Context.set_deadline(gpr_time_from_micros(deadline->MicroSeconds(), GPR_CLOCK_MONOTONIC));
            }
        }
    }

    void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) {
        for (const auto& [key, value] : Context.GetServerInitialMetadata()) {
            metadata->emplace(
                TString(key.begin(), key.end()),
                TString(value.begin(), value.end())
            );
        }
    }

    grpc::Status Status;
    grpc::ClientContext Context;
    std::shared_ptr<IQueueClientContext> LocalContext;
};

template<typename TStub, typename TRequest, typename TResponse>
class TSimpleRequestProcessor
    : public TThrRefBase
    , public IQueueClientEvent
    , public TGRpcRequestProcessorCommon {
    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
    template<typename> friend class TServiceConnection;

public:
    using TPtr = TIntrusivePtr<TSimpleRequestProcessor>;
    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);

    explicit TSimpleRequestProcessor(TResponseCallback<TResponse>&& callback)
        : Callback_(std::move(callback))
    { }

    ~TSimpleRequestProcessor() {
        if (!Replied_ && Callback_) {
            Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
            Callback_ = nullptr; // free resources as early as possible
        }
    }

    bool Execute(bool ok) override {
        {
            std::unique_lock<std::mutex> guard(Mutex_);
            LocalContext.reset();
        }
        TGrpcStatus status;
        if (ok) {
            status = Status;
        } else {
            status = TGrpcStatus::Internal("Unexpected error");
        }
        Replied_ = true;
        Callback_(std::move(status), std::move(Reply_));
        Callback_ = nullptr; // free resources as early as possible
        return false;
    }

    void Destroy() override {
        UnRef();
    }

private:
    IQueueClientEvent* FinishedEvent() {
        Ref();
        return this;
    }

    void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
        auto context = provider->CreateContext();
        if (!context) {
            Replied_ = true;
            Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
            Callback_ = nullptr;
            return;
        }
        {
            std::unique_lock<std::mutex> guard(Mutex_);
            LocalContext = context;
            Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
            Reader_->Finish(&Reply_, &Status, FinishedEvent());
        }
        context->SubscribeStop([self = TPtr(this)] {
            self->Stop();
        });
    }

    void Stop() {
        Context.TryCancel();
    }

    TResponseCallback<TResponse> Callback_;
    TResponse Reply_;
    std::mutex Mutex_;
    TAsyncReaderPtr Reader_;
    bool Replied_ = false;
};

template<typename TStub, typename TRequest, typename TResponse>
class TAdvancedRequestProcessor
    : public TThrRefBase
    , public IQueueClientEvent
    , public TGRpcRequestProcessorCommon {
    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
    template<typename> friend class TServiceConnection;

public:
    using TPtr = TIntrusivePtr<TAdvancedRequestProcessor>;
    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);

    explicit TAdvancedRequestProcessor(TAdvancedResponseCallback<TResponse>&& callback)
        : Callback_(std::move(callback))
    { }

    ~TAdvancedRequestProcessor() {
        if (!Replied_ && Callback_) {
            Callback_(Context, TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
            Callback_ = nullptr; // free resources as early as possible
        }
    }

    bool Execute(bool ok) override {
        {
            std::unique_lock<std::mutex> guard(Mutex_);
            LocalContext.reset();
        }
        TGrpcStatus status;
        if (ok) {
            status = Status;
        } else {
            status = TGrpcStatus::Internal("Unexpected error");
        }
        Replied_ = true;
        Callback_(Context, std::move(status), std::move(Reply_));
        Callback_ = nullptr; // free resources as early as possible
        return false;
    }

    void Destroy() override {
        UnRef();
    }

private:
    IQueueClientEvent* FinishedEvent() {
        Ref();
        return this;
    }

    void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
        auto context = provider->CreateContext();
        if (!context) {
            Replied_ = true;
            Callback_(Context, TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
            Callback_ = nullptr;
            return;
        }
        {
            std::unique_lock<std::mutex> guard(Mutex_);
            LocalContext = context;
            Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
            Reader_->Finish(&Reply_, &Status, FinishedEvent());
        }
        context->SubscribeStop([self = TPtr(this)] {
            self->Stop();
        });
    }

    void Stop() {
        Context.TryCancel();
    }

    TAdvancedResponseCallback<TResponse> Callback_;
    TResponse Reply_;
    std::mutex Mutex_;
    TAsyncReaderPtr Reader_;
    bool Replied_ = false;
};

class IStreamRequestCtrl : public TThrRefBase {
public:
    using TPtr = TIntrusivePtr<IStreamRequestCtrl>;

    /**
     * Asynchronously cancels the request
     */
    virtual void Cancel() = 0;
};

template<class TResponse>
class IStreamRequestReadProcessor : public IStreamRequestCtrl {
public:
    using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
    using TReadCallback = std::function<void(TGrpcStatus&&)>;

    /**
     * Schedules a read of the initial server metadata from the stream
     */
    virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0;

    /**
     * Schedules a response read from the stream
     * The callback is called with a non-OK status if the read failed
     * Only one Read or Finish call may be active at a time
     */
    virtual void Read(TResponse* response, TReadCallback callback) = 0;

    /**
     * Stops reading and gracefully finishes the stream
     * Only one Read or Finish call may be active at a time
     */
    virtual void Finish(TReadCallback callback) = 0;

    /**
     * Additional callback to be called when the stream has finished
     */
    virtual void AddFinishedCallback(TReadCallback callback) = 0;
};

template<class TRequest, class TResponse>
class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> {
public:
    using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>;
    using TWriteCallback = std::function<void(TGrpcStatus&&)>;

    /**
     * Schedules a request write to the stream
     */
    virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0;
};

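// Usage sketch (assumption): a typical read loop over an
// IStreamRequestReadProcessor obtained from DoStreamRequest below. Only one
// Read/Finish may be outstanding, so the next Read is issued from the previous
// read's callback; "TMyResponse" and "ReadNext" are hypothetical placeholders.
//
//     void ReadNext(NGrpc::IStreamRequestReadProcessor<TMyResponse>::TPtr processor) {
//         auto response = std::make_shared<TMyResponse>();
//         processor->Read(response.get(), [processor, response](NGrpc::TGrpcStatus&& status) {
//             if (!status.Ok()) {
//                 // OUT_OF_RANGE "Read EOF" means the stream ended gracefully
//                 return;
//             }
//             // ... consume *response ...
//             ReadNext(processor);
//         });
//     }
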
class TGRpcKeepAliveSocketMutator;

// Class to hold stubs allocated on a channel.
// This is a poorly documented part of grpc. See KIKIMR-6109 and the comment to this commit.
// A stub holds a shared_ptr<ChannelInterface>, so we can destroy this holder even if
// a request processor is still using the stub.
class TStubsHolder : public TNonCopyable {
    using TypeInfoRef = std::reference_wrapper<const std::type_info>;

    struct THasher {
        std::size_t operator()(TypeInfoRef code) const {
            return code.get().hash_code();
        }
    };

    struct TEqualTo {
        bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const {
            return lhs.get() == rhs.get();
        }
    };

public:
    TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel)
        : ChannelInterface_(channel)
    {}

    // Returns true if the channel can't be used to perform a request right now
    bool IsChannelBroken() const {
        auto state = ChannelInterface_->GetState(false);
        return state == GRPC_CHANNEL_SHUTDOWN ||
               state == GRPC_CHANNEL_TRANSIENT_FAILURE;
    }

    template<typename TStub>
    std::shared_ptr<TStub> GetOrCreateStub() {
        const auto& stubId = typeid(TStub);
        {
            std::shared_lock readGuard(RWMutex_);
            const auto it = Stubs_.find(stubId);
            if (it != Stubs_.end()) {
                return std::static_pointer_cast<TStub>(it->second);
            }
        }
        {
            std::unique_lock writeGuard(RWMutex_);
            auto it = Stubs_.emplace(stubId, nullptr);
            if (!it.second) {
                return std::static_pointer_cast<TStub>(it.first->second);
            } else {
                it.first->second = std::make_shared<TStub>(ChannelInterface_);
                return std::static_pointer_cast<TStub>(it.first->second);
            }
        }
    }

    const TInstant& GetLastUseTime() const {
        return LastUsed_;
    }

    void SetLastUseTime(const TInstant& time) {
        LastUsed_ = time;
    }

private:
    TInstant LastUsed_ = Now();
    std::shared_mutex RWMutex_;
    std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_;
    std::shared_ptr<grpc::ChannelInterface> ChannelInterface_;
};

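// Usage sketch (assumption): stubs of different service types sharing one
// channel through a TStubsHolder. "TMyService" and "config" are placeholders;
// CreateChannelInterface is assumed to come from grpc_common.h, as used by
// TGRpcClientLow::CreateGRpcServiceConnection below.
//
//     NGrpc::TStubsHolder holder(NGrpc::CreateChannelInterface(config));
//     auto stub = holder.GetOrCreateStub<TMyService::Stub>(); // created once, then cached
//     if (holder.IsChannelBroken()) {
//         // drop the holder and build a new channel
//     }
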
class TChannelPool {
public:
    TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6));

    // Allows calling CreateStub on a TStubsHolder under a lock.
    // The callback is invoked only for the duration of the GetStubsHolderLocked call.
    void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb);

    void DeleteChannel(const TString& channelId);
    void DeleteExpiredStubsHolders();

private:
    std::shared_mutex RWMutex_;
    std::unordered_map<TString, TStubsHolder> Pool_;
    std::multimap<TInstant, TString> LastUsedQueue_;
    TTcpKeepAliveSettings TcpKeepAliveSettings_;
    TDuration ExpireTime_;
    TDuration UpdateReUseTime_;

    void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId);
};

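// Usage sketch (assumption): picking a per-endpoint stub out of the pool.
// "endpoint", "config", "keepAliveSettings" and "TMyService" are placeholders.
// The TStubsHolder reference is only valid inside the callback, but the stub
// is a shared_ptr and may be copied out for later use.
//
//     NGrpc::TChannelPool pool(keepAliveSettings, TDuration::Minutes(6));
//     std::shared_ptr<TMyService::Stub> stub;
//     pool.GetStubsHolderLocked(endpoint, config, [&stub](NGrpc::TStubsHolder& holder) {
//         stub = holder.GetOrCreateStub<TMyService::Stub>();
//     });
//     pool.DeleteExpiredStubsHolders(); // periodic cleanup of unused channels
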
template<class TResponse>
using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>;

template<typename TStub, typename TRequest, typename TResponse>
class TStreamRequestReadProcessor
    : public IStreamRequestReadProcessor<TResponse>
    , public TGRpcRequestProcessorCommon {
    template<typename> friend class TServiceConnection;

public:
    using TSelf = TStreamRequestReadProcessor;
    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>;
    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*);
    using TReaderCallback = TStreamReaderCallback<TResponse>;
    using TPtr = TIntrusivePtr<TSelf>;
    using TBase = IStreamRequestReadProcessor<TResponse>;
    using TReadCallback = typename TBase::TReadCallback;

    explicit TStreamRequestReadProcessor(TReaderCallback&& callback)
        : Callback(std::move(callback))
    {
        Y_ABORT_UNLESS(Callback, "Missing connected callback");
    }

    void Cancel() override {
        Context.TryCancel();

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Cancelled = true;
            if (Started && !ReadFinished) {
                if (!ReadActive) {
                    ReadFinished = true;
                }
                if (ReadFinished) {
                    Stream->Finish(&Status, OnFinishedTag.Prepare());
                }
            }
        }
    }

    void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
        TGrpcStatus status;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
            if (!Finished && !HasInitialMetadata) {
                ReadActive = true;
                ReadCallback = std::move(callback);
                InitialMetadata = metadata;
                if (!ReadFinished) {
                    Stream->ReadInitialMetadata(OnReadDoneTag.Prepare());
                }
                return;
            }
            if (!HasInitialMetadata) {
                if (FinishedOk) {
                    status = Status;
                } else {
                    status = TGrpcStatus::Internal("Unexpected error");
                }
            } else {
                GetInitialMetadata(metadata);
            }
        }

        callback(std::move(status));
    }

    void Read(TResponse* message, TReadCallback callback) override {
        TGrpcStatus status;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
            if (!Finished) {
                ReadActive = true;
                ReadCallback = std::move(callback);
                if (!ReadFinished) {
                    Stream->Read(message, OnReadDoneTag.Prepare());
                }
                return;
            }
            if (FinishedOk) {
                status = Status;
            } else {
                status = TGrpcStatus::Internal("Unexpected error");
            }
        }

        if (status.Ok()) {
            status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
        }

        callback(std::move(status));
    }

    void Finish(TReadCallback callback) override {
        TGrpcStatus status;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
            if (!Finished) {
                ReadActive = true;
                FinishCallback = std::move(callback);
                if (!ReadFinished) {
                    ReadFinished = true;
                }
                Stream->Finish(&Status, OnFinishedTag.Prepare());
                return;
            }
            if (FinishedOk) {
                status = Status;
            } else {
                status = TGrpcStatus::Internal("Unexpected error");
            }
        }

        callback(std::move(status));
    }

    void AddFinishedCallback(TReadCallback callback) override {
        Y_ABORT_UNLESS(callback, "Unexpected empty callback");

        TGrpcStatus status;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            if (!Finished) {
                FinishedCallbacks.emplace_back().swap(callback);
                return;
            }
            if (FinishedOk) {
                status = Status;
            } else if (Cancelled) {
                status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
            } else {
                status = TGrpcStatus::Internal("Unexpected error");
            }
        }

        callback(std::move(status));
    }

private:
    void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
        auto context = provider->CreateContext();
        if (!context) {
            auto callback = std::move(Callback);
            TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
            callback(std::move(status), nullptr);
            return;
        }

        {
            std::unique_lock<std::mutex> guard(Mutex);
            LocalContext = context;
            Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare());
        }

        context->SubscribeStop([self = TPtr(this)] {
            self->Cancel();
        });
    }

    void OnReadDone(bool ok) {
        TGrpcStatus status;
        TReadCallback callback;
        std::unordered_multimap<TString, TString>* initialMetadata = nullptr;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Y_ABORT_UNLESS(ReadActive, "Unexpected Read done callback");
            Y_ABORT_UNLESS(!ReadFinished, "Unexpected ReadFinished flag");

            if (!ok || Cancelled) {
                ReadFinished = true;
                Stream->Finish(&Status, OnFinishedTag.Prepare());
                if (!ok) {
                    // Keep ReadActive=true, so the callback is called
                    // after the call is finished with an error
                    return;
                }
            }

            callback = std::move(ReadCallback);
            ReadCallback = nullptr;
            ReadActive = false;
            initialMetadata = InitialMetadata;
            InitialMetadata = nullptr;
            HasInitialMetadata = true;
        }

        if (initialMetadata) {
            GetInitialMetadata(initialMetadata);
        }

        callback(std::move(status));
    }

    void OnStartDone(bool ok) {
        TReaderCallback callback;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Started = true;
            if (!ok || Cancelled) {
                ReadFinished = true;
                Stream->Finish(&Status, OnFinishedTag.Prepare());
                return;
            }
            callback = std::move(Callback);
            Callback = nullptr;
        }

        callback({ }, typename TBase::TPtr(this));
    }

    void OnFinished(bool ok) {
        TGrpcStatus status;
        std::vector<TReadCallback> finishedCallbacks;
        TReaderCallback startCallback;
        TReadCallback readCallback;
        TReadCallback finishCallback;

        {
            std::unique_lock<std::mutex> guard(Mutex);

            Finished = true;
            FinishedOk = ok;
            LocalContext.reset();

            if (ok) {
                status = Status;
            } else if (Cancelled) {
                status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
            } else {
                status = TGrpcStatus::Internal("Unexpected error");
            }

            finishedCallbacks.swap(FinishedCallbacks);

            if (Callback) {
                Y_ABORT_UNLESS(!ReadActive);
                startCallback = std::move(Callback);
                Callback = nullptr;
            } else if (ReadActive) {
                if (ReadCallback) {
                    readCallback = std::move(ReadCallback);
                    ReadCallback = nullptr;
                } else {
                    finishCallback = std::move(FinishCallback);
                    FinishCallback = nullptr;
                }
                ReadActive = false;
            }
        }

        for (auto& finishedCallback : finishedCallbacks) {
            auto statusCopy = status;
            finishedCallback(std::move(statusCopy));
        }

        if (startCallback) {
            if (status.Ok()) {
                status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
            }
            startCallback(std::move(status), nullptr);
        } else if (readCallback) {
            if (status.Ok()) {
                status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
                for (const auto& [name, value] : Context.GetServerTrailingMetadata()) {
                    status.ServerTrailingMetadata.emplace(
                        TString(name.begin(), name.end()),
                        TString(value.begin(), value.end()));
                }
            }
            readCallback(std::move(status));
        } else if (finishCallback) {
            finishCallback(std::move(status));
        }
    }

    TReaderCallback Callback;
    TAsyncReaderPtr Stream;

    using TFixedEvent = TQueueClientFixedEvent<TSelf>;
    std::mutex Mutex;
    TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
    TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone };
    TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };

    TReadCallback ReadCallback;
    TReadCallback FinishCallback;
    std::vector<TReadCallback> FinishedCallbacks;
    std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
    bool Started = false;
    bool HasInitialMetadata = false;
    bool ReadActive = false;
    bool ReadFinished = false;
    bool Finished = false;
    bool Cancelled = false;
    bool FinishedOk = false;
};

template<class TRequest, class TResponse>
using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>;

template<class TStub, class TRequest, class TResponse>
class TStreamRequestReadWriteProcessor
    : public IStreamRequestReadWriteProcessor<TRequest, TResponse>
    , public TGRpcRequestProcessorCommon {
public:
    using TSelf = TStreamRequestReadWriteProcessor;
    using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>;
    using TPtr = TIntrusivePtr<TSelf>;
    using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>;
    using TReadCallback = typename TBase::TReadCallback;
    using TWriteCallback = typename TBase::TWriteCallback;
    using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>;
    using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*);

    explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)
        : ConnectedCallback(std::move(callback))
    {
        Y_ABORT_UNLESS(ConnectedCallback, "Missing connected callback");
    }

    void Cancel() override {
        Context.TryCancel();

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Cancelled = true;
            if (Started && !(ReadFinished && WriteFinished)) {
                if (!ReadActive) {
                    ReadFinished = true;
                }
                if (!WriteActive) {
                    WriteFinished = true;
                }
                if (ReadFinished && WriteFinished) {
                    Stream->Finish(&Status, OnFinishedTag.Prepare());
                }
            }
        }
    }

    void Write(TRequest&& request, TWriteCallback callback) override {
        TGrpcStatus status;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            if (Cancelled || ReadFinished || WriteFinished) {
                status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
            } else if (WriteActive) {
                auto& item = WriteQueue.emplace_back();
                item.Callback.swap(callback);
                item.Request.Swap(&request);
            } else {
                WriteActive = true;
                WriteCallback.swap(callback);
                Stream->Write(request, OnWriteDoneTag.Prepare());
            }
        }

        if (!status.Ok() && callback) {
            callback(std::move(status));
        }
    }

    void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
        TGrpcStatus status;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
            if (!Finished && !HasInitialMetadata) {
                ReadActive = true;
                ReadCallback = std::move(callback);
                InitialMetadata = metadata;
                if (!ReadFinished) {
                    Stream->ReadInitialMetadata(OnReadDoneTag.Prepare());
                }
                return;
            }
            if (!HasInitialMetadata) {
                if (FinishedOk) {
                    status = Status;
                } else {
                    status = TGrpcStatus::Internal("Unexpected error");
                }
            } else {
                GetInitialMetadata(metadata);
            }
        }

        callback(std::move(status));
    }

    void Read(TResponse* message, TReadCallback callback) override {
        TGrpcStatus status;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
            if (!Finished) {
                ReadActive = true;
                ReadCallback = std::move(callback);
                if (!ReadFinished) {
                    Stream->Read(message, OnReadDoneTag.Prepare());
                }
                return;
            }
            if (FinishedOk) {
                status = Status;
            } else {
                status = TGrpcStatus::Internal("Unexpected error");
            }
        }

        if (status.Ok()) {
            status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
        }

        callback(std::move(status));
    }

    void Finish(TReadCallback callback) override {
        TGrpcStatus status;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
            if (!Finished) {
                ReadActive = true;
                FinishCallback = std::move(callback);
                if (!ReadFinished) {
                    ReadFinished = true;
                    if (!WriteActive) {
                        WriteFinished = true;
                    }
                    if (WriteFinished) {
                        Stream->Finish(&Status, OnFinishedTag.Prepare());
                    }
                }
                return;
            }
            if (FinishedOk) {
                status = Status;
            } else {
                status = TGrpcStatus::Internal("Unexpected error");
            }
        }

        callback(std::move(status));
    }

    void AddFinishedCallback(TReadCallback callback) override {
        Y_ABORT_UNLESS(callback, "Unexpected empty callback");

        TGrpcStatus status;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            if (!Finished) {
                FinishedCallbacks.emplace_back().swap(callback);
                return;
            }
            if (FinishedOk) {
                status = Status;
            } else if (Cancelled) {
                status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
            } else {
                status = TGrpcStatus::Internal("Unexpected error");
            }
        }

        callback(std::move(status));
    }

private:
    template<typename> friend class TServiceConnection;

    void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
        auto context = provider->CreateContext();
        if (!context) {
            auto callback = std::move(ConnectedCallback);
            TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
            callback(std::move(status), nullptr);
            return;
        }

        {
            std::unique_lock<std::mutex> guard(Mutex);
            LocalContext = context;
            Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare());
        }

        context->SubscribeStop([self = TPtr(this)] {
            self->Cancel();
        });
    }

private:
    void OnConnected(bool ok) {
        TConnectedCallback callback;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Started = true;
            if (!ok || Cancelled) {
                ReadFinished = true;
                WriteFinished = true;
                Stream->Finish(&Status, OnFinishedTag.Prepare());
                return;
            }

            callback = std::move(ConnectedCallback);
            ConnectedCallback = nullptr;
        }

        callback({ }, typename TBase::TPtr(this));
    }

    void OnReadDone(bool ok) {
        TGrpcStatus status;
        TReadCallback callback;
        std::unordered_multimap<TString, TString>* initialMetadata = nullptr;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Y_ABORT_UNLESS(ReadActive, "Unexpected Read done callback");
            Y_ABORT_UNLESS(!ReadFinished, "Unexpected ReadFinished flag");

            if (!ok || Cancelled || WriteFinished) {
                ReadFinished = true;
                if (!WriteActive) {
                    WriteFinished = true;
                }
                if (WriteFinished) {
                    Stream->Finish(&Status, OnFinishedTag.Prepare());
                }
                if (!ok) {
                    // Keep ReadActive=true, so the callback is called
                    // after the call is finished with an error
                    return;
                }
            }

            callback = std::move(ReadCallback);
            ReadCallback = nullptr;
            ReadActive = false;
            initialMetadata = InitialMetadata;
            InitialMetadata = nullptr;
            HasInitialMetadata = true;
        }

        if (initialMetadata) {
            GetInitialMetadata(initialMetadata);
        }

        callback(std::move(status));
    }

    void OnWriteDone(bool ok) {
        TWriteCallback okCallback;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Y_ABORT_UNLESS(WriteActive, "Unexpected Write done callback");
            Y_ABORT_UNLESS(!WriteFinished, "Unexpected WriteFinished flag");

            if (ok) {
                okCallback.swap(WriteCallback);
            } else if (WriteCallback) {
                // Put the callback back on the queue until OnFinished
                auto& item = WriteQueue.emplace_front();
                item.Callback.swap(WriteCallback);
            }

            if (!ok || Cancelled) {
                WriteActive = false;
                WriteFinished = true;
                if (!ReadActive) {
                    ReadFinished = true;
                }
                if (ReadFinished) {
                    Stream->Finish(&Status, OnFinishedTag.Prepare());
                }
            } else if (!WriteQueue.empty()) {
                WriteCallback.swap(WriteQueue.front().Callback);
                Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare());
                WriteQueue.pop_front();
            } else {
                WriteActive = false;
                if (ReadFinished) {
                    WriteFinished = true;
                    Stream->Finish(&Status, OnFinishedTag.Prepare());
                }
            }
        }

        if (okCallback) {
            okCallback(TGrpcStatus());
        }
    }

    void OnFinished(bool ok) {
        TGrpcStatus status;
        std::deque<TWriteItem> writesDropped;
        std::vector<TReadCallback> finishedCallbacks;
        TConnectedCallback connectedCallback;
        TReadCallback readCallback;
        TReadCallback finishCallback;

        {
            std::unique_lock<std::mutex> guard(Mutex);
            Finished = true;
            FinishedOk = ok;
            LocalContext.reset();

            if (ok) {
                status = Status;
            } else if (Cancelled) {
                status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
            } else {
                status = TGrpcStatus::Internal("Unexpected error");
            }

            writesDropped.swap(WriteQueue);
            finishedCallbacks.swap(FinishedCallbacks);

            if (ConnectedCallback) {
                Y_ABORT_UNLESS(!ReadActive);
                connectedCallback = std::move(ConnectedCallback);
                ConnectedCallback = nullptr;
            } else if (ReadActive) {
                if (ReadCallback) {
                    readCallback = std::move(ReadCallback);
                    ReadCallback = nullptr;
                } else {
                    finishCallback = std::move(FinishCallback);
                    FinishCallback = nullptr;
                }
                ReadActive = false;
            }
        }

        for (auto& item : writesDropped) {
            if (item.Callback) {
                TGrpcStatus writeStatus = status;
                if (writeStatus.Ok()) {
                    writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
                }
                item.Callback(std::move(writeStatus));
            }
        }

        for (auto& finishedCallback : finishedCallbacks) {
            TGrpcStatus statusCopy = status;
            finishedCallback(std::move(statusCopy));
        }

        if (connectedCallback) {
            if (status.Ok()) {
                status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
            }
            connectedCallback(std::move(status), nullptr);
        } else if (readCallback) {
            if (status.Ok()) {
                status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
                for (const auto& [name, value] : Context.GetServerTrailingMetadata()) {
                    status.ServerTrailingMetadata.emplace(
                        TString(name.begin(), name.end()),
                        TString(value.begin(), value.end()));
                }
            }
            readCallback(std::move(status));
        } else if (finishCallback) {
            finishCallback(std::move(status));
        }
    }

private:
    struct TWriteItem {
        TWriteCallback Callback;
        TRequest Request;
    };

private:
    using TFixedEvent = TQueueClientFixedEvent<TSelf>;

    TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected };
    TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
    TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone };
    TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };

private:
    std::mutex Mutex;
    TAsyncReaderWriterPtr Stream;
    TConnectedCallback ConnectedCallback;
    TReadCallback ReadCallback;
    TReadCallback FinishCallback;
    std::vector<TReadCallback> FinishedCallbacks;
    std::deque<TWriteItem> WriteQueue;
    TWriteCallback WriteCallback;
    std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
    bool Started = false;
    bool HasInitialMetadata = false;
    bool ReadActive = false;
    bool ReadFinished = false;
    bool WriteActive = false;
    bool WriteFinished = false;
    bool Finished = false;
    bool Cancelled = false;
    bool FinishedOk = false;
};

class TGRpcClientLow;

template<typename TGRpcService>
class TServiceConnection {
    using TStub = typename TGRpcService::Stub;
    friend class TGRpcClientLow;

public:
    /*
     * Start a simple request
     */
    template<typename TRequest, typename TResponse>
    void DoRequest(const TRequest& request,
                   TResponseCallback<TResponse> callback,
                   typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
                   const TCallMeta& metas = { },
                   IQueueClientContextProvider* provider = nullptr)
    {
        auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
        processor->ApplyMeta(metas);
        processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
    }

    /*
     * Start an advanced request (the callback also receives the grpc::ClientContext)
     */
    template<typename TRequest, typename TResponse>
    void DoAdvancedRequest(const TRequest& request,
                           TAdvancedResponseCallback<TResponse> callback,
                           typename TAdvancedRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
                           const TCallMeta& metas = { },
                           IQueueClientContextProvider* provider = nullptr)
    {
        auto processor = MakeIntrusive<TAdvancedRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
        processor->ApplyMeta(metas);
        processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
    }

    /*
     * Start bidirectional streaming
     */
    template<typename TRequest, typename TResponse>
    void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback,
                         typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
                         const TCallMeta& metas = { },
                         IQueueClientContextProvider* provider = nullptr)
    {
        auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback));
        processor->ApplyMeta(metas);
        processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_);
    }

    /*
     * Start streaming response reading (one request, many responses)
     */
    template<typename TRequest, typename TResponse>
    void DoStreamRequest(const TRequest& request,
                         TStreamReaderCallback<TResponse> callback,
                         typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
                         const TCallMeta& metas = { },
                         IQueueClientContextProvider* provider = nullptr)
    {
        auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback));
        processor->ApplyMeta(metas);
        processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_);
    }

private:
    TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci,
                       IQueueClientContextProvider* provider)
        : Stub_(TGRpcService::NewStub(ci))
        , Provider_(provider)
    {
        Y_ABORT_UNLESS(Provider_, "Connection does not have a queue provider");
    }

    TServiceConnection(TStubsHolder& holder,
                       IQueueClientContextProvider* provider)
        : Stub_(holder.GetOrCreateStub<TStub>())
        , Provider_(provider)
    {
        Y_ABORT_UNLESS(Provider_, "Connection does not have a queue provider");
    }

    std::shared_ptr<TStub> Stub_;
    IQueueClientContextProvider* Provider_;
};

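// Usage sketch (assumption): issuing a unary call through a connection.
// "TMyService", its request/response messages and the AsyncSayHello stub
// method are hypothetical protoc-generated names; only the NGrpc types come
// from this header.
//
//     NGrpc::TGRpcClientLow client;
//     auto connection = client.CreateGRpcServiceConnection<TMyService>(config);
//
//     TMyRequest request;
//     connection->DoRequest<TMyRequest, TMyResponse>(
//         request,
//         [](NGrpc::TGrpcStatus&& status, TMyResponse&& response) {
//             if (!status.Ok()) {
//                 // status.GRpcStatusCode / status.Msg describe the failure
//                 return;
//             }
//             // ... use response ...
//         },
//         &TMyService::Stub::AsyncSayHello);
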
class TGRpcClientLow
    : public IQueueClientContextProvider
{
    class TContextImpl;
    friend class TContextImpl;

    enum ECqState : TAtomicBase {
        WORKING = 0,
        STOP_SILENT = 1,
        STOP_EXPLICIT = 2,
    };

public:
    explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false);
    ~TGRpcClientLow();

    // Tries to stop all currently running requests (via their stop callbacks)
    // Will shutdown CQ and drain events once all requests have finished
    // No new requests may be started after this call
    void Stop(bool wait = false);

    // Waits until all currently running requests finish execution
    void WaitIdle();

    inline bool IsStopping() const {
        switch (GetCqState()) {
            case WORKING:
                return false;
            case STOP_SILENT:
            case STOP_EXPLICIT:
                return true;
        }

        Y_UNREACHABLE();
    }

    IQueueClientContextPtr CreateContext() override;

    template<typename TGRpcService>
    std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) {
        return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));
    }

    template<typename TGRpcService>
    std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) {
        return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));
    }

    // Tests only, not thread-safe
    void AddWorkerThreadForTest();

private:
    using IThreadRef = std::unique_ptr<IThreadFactory::IThread>;
    using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>;
    void Init(size_t numWorkerThread);

    inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }
    inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); }

    void StopInternal(bool silent);
    void WaitInternal();

    void ForgetContext(TContextImpl* context);

private:
    bool UseCompletionQueuePerThread_;
    std::vector<CompletionQueueRef> CQS_;
    std::vector<IThreadRef> WorkerThreads_;
    TAtomic CqState_ = -1;

    std::mutex Mtx_;
    std::condition_variable ContextsEmpty_;
    std::unordered_set<TContextImpl*> Contexts_;

    std::mutex JoinMutex_;
};

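// Lifecycle sketch (assumption): the client owns the completion-queue worker
// threads, so it must outlive every connection created from it, and should be
// stopped explicitly before destruction if requests may still be in flight.
// "TMyService" and "config" are placeholders.
//
//     NGrpc::TGRpcClientLow client(/* numWorkerThread = */ 4);
//     {
//         auto connection = client.CreateGRpcServiceConnection<TMyService>(config);
//         // ... issue requests ...
//     }
//     client.Stop(/* wait = */ true); // cancel running requests and drain the queues
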
} // namespace NGrpc