client_callback.h 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227
  1. //
  2. //
  3. // Copyright 2018 gRPC authors.
  4. //
  5. // Licensed under the Apache License, Version 2.0 (the "License");
  6. // you may not use this file except in compliance with the License.
  7. // You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. //
  17. //
  18. #ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H
  19. #define GRPCPP_SUPPORT_CLIENT_CALLBACK_H
  #include <atomic>
  #include <functional>
  #include <utility>

  #include <grpc/grpc.h>
  #include <grpc/support/log.h>
  #include <grpcpp/impl/call.h>
  #include <grpcpp/impl/call_op_set.h>
  #include <grpcpp/impl/sync.h>
  #include <grpcpp/support/callback_common.h>
  #include <grpcpp/support/config.h>
  #include <grpcpp/support/status.h>
  30. namespace grpc {
  31. class Channel;
  32. class ClientContext;
  33. namespace internal {
  34. class RpcMethod;
  35. /// Perform a callback-based unary call. May optionally specify the base
  36. /// class of the Request and Response so that the internal calls and structures
  37. /// below this may be based on those base classes and thus achieve code reuse
  38. /// across different RPCs (e.g., for protobuf, MessageLite would be a base
  39. /// class).
  40. /// TODO(vjpai): Combine as much as possible with the blocking unary call code
  41. template <class InputMessage, class OutputMessage,
  42. class BaseInputMessage = InputMessage,
  43. class BaseOutputMessage = OutputMessage>
  44. void CallbackUnaryCall(grpc::ChannelInterface* channel,
  45. const grpc::internal::RpcMethod& method,
  46. grpc::ClientContext* context,
  47. const InputMessage* request, OutputMessage* result,
  48. std::function<void(grpc::Status)> on_completion) {
  49. static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
  50. "Invalid input message specification");
  51. static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
  52. "Invalid output message specification");
  53. CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> x(
  54. channel, method, context, request, result, on_completion);
  55. }
  56. template <class InputMessage, class OutputMessage>
  57. class CallbackUnaryCallImpl {
  58. public:
  59. CallbackUnaryCallImpl(grpc::ChannelInterface* channel,
  60. const grpc::internal::RpcMethod& method,
  61. grpc::ClientContext* context,
  62. const InputMessage* request, OutputMessage* result,
  63. std::function<void(grpc::Status)> on_completion) {
  64. grpc::CompletionQueue* cq = channel->CallbackCQ();
  65. GPR_ASSERT(cq != nullptr);
  66. grpc::internal::Call call(channel->CreateCall(method, context, cq));
  67. using FullCallOpSet = grpc::internal::CallOpSet<
  68. grpc::internal::CallOpSendInitialMetadata,
  69. grpc::internal::CallOpSendMessage,
  70. grpc::internal::CallOpRecvInitialMetadata,
  71. grpc::internal::CallOpRecvMessage<OutputMessage>,
  72. grpc::internal::CallOpClientSendClose,
  73. grpc::internal::CallOpClientRecvStatus>;
  74. struct OpSetAndTag {
  75. FullCallOpSet opset;
  76. grpc::internal::CallbackWithStatusTag tag;
  77. };
  78. const size_t alloc_sz = sizeof(OpSetAndTag);
  79. auto* const alloced =
  80. static_cast<OpSetAndTag*>(grpc_call_arena_alloc(call.call(), alloc_sz));
  81. auto* ops = new (&alloced->opset) FullCallOpSet;
  82. auto* tag = new (&alloced->tag)
  83. grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
  84. // TODO(vjpai): Unify code with sync API as much as possible
  85. grpc::Status s = ops->SendMessagePtr(request);
  86. if (!s.ok()) {
  87. tag->force_run(s);
  88. return;
  89. }
  90. ops->SendInitialMetadata(&context->send_initial_metadata_,
  91. context->initial_metadata_flags());
  92. ops->RecvInitialMetadata(context);
  93. ops->RecvMessage(result);
  94. ops->AllowNoMessage();
  95. ops->ClientSendClose();
  96. ops->ClientRecvStatus(context, tag->status_ptr());
  97. ops->set_core_cq_tag(tag);
  98. call.PerformOps(ops);
  99. }
  100. };
  101. // Base class for public API classes.
  102. class ClientReactor {
  103. public:
  104. virtual ~ClientReactor() = default;
  105. /// Called by the library when all operations associated with this RPC have
  106. /// completed and all Holds have been removed. OnDone provides the RPC status
  107. /// outcome for both successful and failed RPCs. If it is never called on an
  108. /// RPC, it indicates an application-level problem (like failure to remove a
  109. /// hold).
  110. ///
  111. /// \param[in] s The status outcome of this RPC
  112. virtual void OnDone(const grpc::Status& /*s*/) = 0;
  113. /// InternalScheduleOnDone is not part of the API and is not meant to be
  114. /// overridden. It is virtual to allow successful builds for certain bazel
  115. /// build users that only want to depend on gRPC codegen headers and not the
  116. /// full library (although this is not a generally-supported option). Although
  117. /// the virtual call is slower than a direct call, this function is
  118. /// heavyweight and the cost of the virtual call is not much in comparison.
  119. /// This function may be removed or devirtualized in the future.
  120. virtual void InternalScheduleOnDone(grpc::Status s);
  121. /// InternalTrailersOnly is not part of the API and is not meant to be
  122. /// overridden. It is virtual to allow successful builds for certain bazel
  123. /// build users that only want to depend on gRPC codegen headers and not the
  124. /// full library (although this is not a generally-supported option). Although
  125. /// the virtual call is slower than a direct call, this function is
  126. /// heavyweight and the cost of the virtual call is not much in comparison.
  127. /// This function may be removed or devirtualized in the future.
  128. virtual bool InternalTrailersOnly(const grpc_call* call) const;
  129. };
  130. } // namespace internal
  131. // Forward declarations
  132. template <class Request, class Response>
  133. class ClientBidiReactor;
  134. template <class Response>
  135. class ClientReadReactor;
  136. template <class Request>
  137. class ClientWriteReactor;
  138. class ClientUnaryReactor;
  139. // NOTE: The streaming objects are not actually implemented in the public API.
  140. // These interfaces are provided for mocking only. Typical applications
  141. // will interact exclusively with the reactors that they define.
  142. template <class Request, class Response>
  143. class ClientCallbackReaderWriter {
  144. public:
  145. virtual ~ClientCallbackReaderWriter() {}
  146. virtual void StartCall() = 0;
  147. virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
  148. virtual void WritesDone() = 0;
  149. virtual void Read(Response* resp) = 0;
  150. virtual void AddHold(int holds) = 0;
  151. virtual void RemoveHold() = 0;
  152. protected:
  153. void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
  154. reactor->BindStream(this);
  155. }
  156. };
  157. template <class Response>
  158. class ClientCallbackReader {
  159. public:
  160. virtual ~ClientCallbackReader() {}
  161. virtual void StartCall() = 0;
  162. virtual void Read(Response* resp) = 0;
  163. virtual void AddHold(int holds) = 0;
  164. virtual void RemoveHold() = 0;
  165. protected:
  166. void BindReactor(ClientReadReactor<Response>* reactor) {
  167. reactor->BindReader(this);
  168. }
  169. };
  170. template <class Request>
  171. class ClientCallbackWriter {
  172. public:
  173. virtual ~ClientCallbackWriter() {}
  174. virtual void StartCall() = 0;
  175. void Write(const Request* req) { Write(req, grpc::WriteOptions()); }
  176. virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
  177. void WriteLast(const Request* req, grpc::WriteOptions options) {
  178. Write(req, options.set_last_message());
  179. }
  180. virtual void WritesDone() = 0;
  181. virtual void AddHold(int holds) = 0;
  182. virtual void RemoveHold() = 0;
  183. protected:
  184. void BindReactor(ClientWriteReactor<Request>* reactor) {
  185. reactor->BindWriter(this);
  186. }
  187. };
  188. class ClientCallbackUnary {
  189. public:
  190. virtual ~ClientCallbackUnary() {}
  191. virtual void StartCall() = 0;
  192. protected:
  193. void BindReactor(ClientUnaryReactor* reactor);
  194. };
  195. // The following classes are the reactor interfaces that are to be implemented
  196. // by the user. They are passed in to the library as an argument to a call on a
  197. // stub (either a codegen-ed call or a generic call). The streaming RPC is
  198. // activated by calling StartCall, possibly after initiating StartRead,
  199. // StartWrite, or AddHold operations on the streaming object. Note that none of
  200. // the classes are pure; all reactions have a default empty reaction so that the
  201. // user class only needs to override those reactions that it cares about.
  202. // The reactor must be passed to the stub invocation before any of the below
  203. // operations can be called and its reactions will be invoked by the library in
  204. // response to the completion of various operations. Reactions must not include
  205. // blocking operations (such as blocking I/O, starting synchronous RPCs, or
  206. // waiting on condition variables). Reactions may be invoked concurrently,
  207. // except that OnDone is called after all others (assuming proper API usage).
  208. // The reactor may not be deleted until OnDone is called.
  209. /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
  210. template <class Request, class Response>
  211. class ClientBidiReactor : public internal::ClientReactor {
  212. public:
  213. /// Activate the RPC and initiate any reads or writes that have been Start'ed
  214. /// before this call. All streaming RPCs issued by the client MUST have
  215. /// StartCall invoked on them (even if they are canceled) as this call is the
  216. /// activation of their lifecycle.
  217. void StartCall() { stream_->StartCall(); }
  218. /// Initiate a read operation (or post it for later initiation if StartCall
  219. /// has not yet been invoked).
  220. ///
  221. /// \param[out] resp Where to eventually store the read message. Valid when
  222. /// the library calls OnReadDone
  223. void StartRead(Response* resp) { stream_->Read(resp); }
  224. /// Initiate a write operation (or post it for later initiation if StartCall
  225. /// has not yet been invoked).
  226. ///
  227. /// \param[in] req The message to be written. The library does not take
  228. /// ownership but the caller must ensure that the message is
  229. /// not deleted or modified until OnWriteDone is called.
  230. void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
  231. /// Initiate/post a write operation with specified options.
  232. ///
  233. /// \param[in] req The message to be written. The library does not take
  234. /// ownership but the caller must ensure that the message is
  235. /// not deleted or modified until OnWriteDone is called.
  236. /// \param[in] options The WriteOptions to use for writing this message
  237. void StartWrite(const Request* req, grpc::WriteOptions options) {
  238. stream_->Write(req, options);
  239. }
  240. /// Initiate/post a write operation with specified options and an indication
  241. /// that this is the last write (like StartWrite and StartWritesDone, merged).
  242. /// Note that calling this means that no more calls to StartWrite,
  243. /// StartWriteLast, or StartWritesDone are allowed.
  244. ///
  245. /// \param[in] req The message to be written. The library does not take
  246. /// ownership but the caller must ensure that the message is
  247. /// not deleted or modified until OnWriteDone is called.
  248. /// \param[in] options The WriteOptions to use for writing this message
  249. void StartWriteLast(const Request* req, grpc::WriteOptions options) {
  250. StartWrite(req, options.set_last_message());
  251. }
  252. /// Indicate that the RPC will have no more write operations. This can only be
  253. /// issued once for a given RPC. This is not required or allowed if
  254. /// StartWriteLast is used since that already has the same implication.
  255. /// Note that calling this means that no more calls to StartWrite,
  256. /// StartWriteLast, or StartWritesDone are allowed.
  257. void StartWritesDone() { stream_->WritesDone(); }
  258. /// Holds are needed if (and only if) this stream has operations that take
  259. /// place on it after StartCall but from outside one of the reactions
  260. /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
  261. ///
  262. /// Holds must be added before calling StartCall. If a stream still has a hold
  263. /// in place, its resources will not be destroyed even if the status has
  264. /// already come in from the wire and there are currently no active callbacks
  265. /// outstanding. Similarly, the stream will not call OnDone if there are still
  266. /// holds on it.
  267. ///
  268. /// For example, if a StartRead or StartWrite operation is going to be
  269. /// initiated from elsewhere in the application, the application should call
  270. /// AddHold or AddMultipleHolds before StartCall. If there is going to be,
  271. /// for example, a read-flow and a write-flow taking place outside the
  272. /// reactions, then call AddMultipleHolds(2) before StartCall. When the
  273. /// application knows that it won't issue any more read operations (such as
  274. /// when a read comes back as not ok), it should issue a RemoveHold(). It
  275. /// should also call RemoveHold() again after it does StartWriteLast or
  276. /// StartWritesDone that indicates that there will be no more write ops.
  277. /// The number of RemoveHold calls must match the total number of AddHold
  278. /// calls plus the number of holds added by AddMultipleHolds.
  279. /// The argument to AddMultipleHolds must be positive.
  280. void AddHold() { AddMultipleHolds(1); }
  281. void AddMultipleHolds(int holds) {
  282. GPR_DEBUG_ASSERT(holds > 0);
  283. stream_->AddHold(holds);
  284. }
  285. void RemoveHold() { stream_->RemoveHold(); }
  286. /// Notifies the application that all operations associated with this RPC
  287. /// have completed and all Holds have been removed. OnDone provides the RPC
  288. /// status outcome for both successful and failed RPCs and will be called in
  289. /// all cases. If it is not called, it indicates an application-level problem
  290. /// (like failure to remove a hold).
  291. ///
  292. /// \param[in] s The status outcome of this RPC
  293. void OnDone(const grpc::Status& /*s*/) override {}
  294. /// Notifies the application that a read of initial metadata from the
  295. /// server is done. If the application chooses not to implement this method,
  296. /// it can assume that the initial metadata has been read before the first
  297. /// call of OnReadDone or OnDone.
  298. ///
  299. /// \param[in] ok Was the initial metadata read successfully? If false, no
  300. /// new read/write operation will succeed, and any further
  301. /// Start* operations should not be called.
  302. virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
  303. /// Notifies the application that a StartRead operation completed.
  304. ///
  305. /// \param[in] ok Was it successful? If false, no new read/write operation
  306. /// will succeed, and any further Start* should not be called.
  307. virtual void OnReadDone(bool /*ok*/) {}
  308. /// Notifies the application that a StartWrite or StartWriteLast operation
  309. /// completed.
  310. ///
  311. /// \param[in] ok Was it successful? If false, no new read/write operation
  312. /// will succeed, and any further Start* should not be called.
  313. virtual void OnWriteDone(bool /*ok*/) {}
  314. /// Notifies the application that a StartWritesDone operation completed. Note
  315. /// that this is only used on explicit StartWritesDone operations and not for
  316. /// those that are implicitly invoked as part of a StartWriteLast.
  317. ///
  318. /// \param[in] ok Was it successful? If false, the application will later see
  319. /// the failure reflected as a bad status in OnDone and no
  320. /// further Start* should be called.
  321. virtual void OnWritesDoneDone(bool /*ok*/) {}
  322. private:
  323. friend class ClientCallbackReaderWriter<Request, Response>;
  324. void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
  325. stream_ = stream;
  326. }
  327. ClientCallbackReaderWriter<Request, Response>* stream_;
  328. };
  329. /// \a ClientReadReactor is the interface for a server-streaming RPC.
  330. /// All public methods behave as in ClientBidiReactor.
  331. template <class Response>
  332. class ClientReadReactor : public internal::ClientReactor {
  333. public:
  334. void StartCall() { reader_->StartCall(); }
  335. void StartRead(Response* resp) { reader_->Read(resp); }
  336. void AddHold() { AddMultipleHolds(1); }
  337. void AddMultipleHolds(int holds) {
  338. GPR_DEBUG_ASSERT(holds > 0);
  339. reader_->AddHold(holds);
  340. }
  341. void RemoveHold() { reader_->RemoveHold(); }
  342. void OnDone(const grpc::Status& /*s*/) override {}
  343. virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
  344. virtual void OnReadDone(bool /*ok*/) {}
  345. private:
  346. friend class ClientCallbackReader<Response>;
  347. void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
  348. ClientCallbackReader<Response>* reader_;
  349. };
  350. /// \a ClientWriteReactor is the interface for a client-streaming RPC.
  351. /// All public methods behave as in ClientBidiReactor.
  352. template <class Request>
  353. class ClientWriteReactor : public internal::ClientReactor {
  354. public:
  355. void StartCall() { writer_->StartCall(); }
  356. void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
  357. void StartWrite(const Request* req, grpc::WriteOptions options) {
  358. writer_->Write(req, options);
  359. }
  360. void StartWriteLast(const Request* req, grpc::WriteOptions options) {
  361. StartWrite(req, options.set_last_message());
  362. }
  363. void StartWritesDone() { writer_->WritesDone(); }
  364. void AddHold() { AddMultipleHolds(1); }
  365. void AddMultipleHolds(int holds) {
  366. GPR_DEBUG_ASSERT(holds > 0);
  367. writer_->AddHold(holds);
  368. }
  369. void RemoveHold() { writer_->RemoveHold(); }
  370. void OnDone(const grpc::Status& /*s*/) override {}
  371. virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
  372. virtual void OnWriteDone(bool /*ok*/) {}
  373. virtual void OnWritesDoneDone(bool /*ok*/) {}
  374. private:
  375. friend class ClientCallbackWriter<Request>;
  376. void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
  377. ClientCallbackWriter<Request>* writer_;
  378. };
  379. /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
  380. /// This is _not_ a common way of invoking a unary RPC. In practice, this
  381. /// option should be used only if the unary RPC wants to receive initial
  382. /// metadata without waiting for the response to complete. Most deployments of
  383. /// RPC systems do not use this option, but it is needed for generality.
  384. /// All public methods behave as in ClientBidiReactor.
  385. /// StartCall is included for consistency with the other reactor flavors: even
  386. /// though there are no StartRead or StartWrite operations to queue before the
  387. /// call (that is part of the unary call itself) and there is no reactor object
  388. /// being created as a result of this call, we keep a consistent 2-phase
  389. /// initiation API among all the reactor flavors.
  390. class ClientUnaryReactor : public internal::ClientReactor {
  391. public:
  392. void StartCall() { call_->StartCall(); }
  393. void OnDone(const grpc::Status& /*s*/) override {}
  394. virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
  395. private:
  396. friend class ClientCallbackUnary;
  397. void BindCall(ClientCallbackUnary* call) { call_ = call; }
  398. ClientCallbackUnary* call_;
  399. };
  400. // Define function out-of-line from class to avoid forward declaration issue
  401. inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
  402. reactor->BindCall(this);
  403. }
  404. namespace internal {
  405. // Forward declare factory classes for friendship
  406. template <class Request, class Response>
  407. class ClientCallbackReaderWriterFactory;
  408. template <class Response>
  409. class ClientCallbackReaderFactory;
  410. template <class Request>
  411. class ClientCallbackWriterFactory;
  412. template <class Request, class Response>
  413. class ClientCallbackReaderWriterImpl
  414. : public ClientCallbackReaderWriter<Request, Response> {
  415. public:
  416. // always allocated against a call arena, no memory free required
  417. static void operator delete(void* /*ptr*/, std::size_t size) {
  418. GPR_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
  419. }
  420. // This operator should never be called as the memory should be freed as part
  421. // of the arena destruction. It only exists to provide a matching operator
  422. // delete to the operator new so that some compilers will not complain (see
  423. // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
  424. // there are no tests catching the compiler warning.
  425. static void operator delete(void*, void*) { GPR_ASSERT(false); }
  // Activates the RPC: performs the start batch, replays every operation that
  // was recorded in backlog_ before activation, performs the finish batch, and
  // publishes started_ so that later Read/Write/WritesDone calls perform their
  // batch directly instead of recording a backlog entry.
  void StartCall() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override {
    // This call initiates two batches, plus any backlog, each with a callback
    // 1. Send initial metadata (unless corked) + recv initial metadata
    // 2. Any read backlog
    // 3. Any write backlog
    // 4. Recv trailing metadata (unless corked)
    if (!start_corked_) {
      start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                     context_->initial_metadata_flags());
    }
    call_.PerformOps(&start_ops_);
    {
      grpc::internal::MutexLock lock(&start_mu_);
      // Replay, in this fixed order, whatever Read/Write/WritesDone recorded
      // while started_ was still false.
      if (backlog_.read_ops) {
        call_.PerformOps(&read_ops_);
      }
      if (backlog_.write_ops) {
        call_.PerformOps(&write_ops_);
      }
      if (backlog_.writes_done_ops) {
        call_.PerformOps(&writes_done_ops_);
      }
      call_.PerformOps(&finish_ops_);
      // The last thing in this critical section is to set started_ so that it
      // can be used lock-free as well.
      started_.store(true, std::memory_order_release);
    }
    // MaybeFinish outside the lock to make sure that destruction of this object
    // doesn't take place while holding the lock (which would cause the lock to
    // be released after destruction)
    this->MaybeFinish(/*from_reaction=*/false);
  }
  // Posts a read into `msg`. The read batch is performed immediately if the
  // call has been started; otherwise it is recorded in backlog_ for StartCall
  // to perform.
  void Read(Response* msg) override {
    read_ops_.RecvMessage(msg);
    // Account for the read's completion callback before it can possibly run.
    callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
    // started_ is first checked without the lock; only if it reads false is
    // start_mu_ taken and the flag checked again.
    if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
      grpc::internal::MutexLock lock(&start_mu_);
      if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
        backlog_.read_ops = true;
        return;
      }
    }
    call_.PerformOps(&read_ops_);
  }
  // Posts a write of `msg` with `options`. The write batch is performed
  // immediately if the call has been started; otherwise it is recorded in
  // backlog_ for StartCall to perform.
  void Write(const Request* msg, grpc::WriteOptions options)
      Y_ABSL_LOCKS_EXCLUDED(start_mu_) override {
    if (options.is_last_message()) {
      // A last-message write also carries the client's send-close in the same
      // batch.
      options.set_buffer_hint();
      write_ops_.ClientSendClose();
    }
    // TODO(vjpai): don't assert
    GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
    // Account for the write's completion callback before it can possibly run.
    callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
    if (GPR_UNLIKELY(corked_write_needed_)) {
      // The start batch did not send initial metadata (corked), so attach it to
      // this batch; the flag is cleared so this happens only once.
      write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                     context_->initial_metadata_flags());
      corked_write_needed_ = false;
    }
    // started_ is first checked without the lock; only if it reads false is
    // start_mu_ taken and the flag checked again.
    if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
      grpc::internal::MutexLock lock(&start_mu_);
      if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
        backlog_.write_ops = true;
        return;
      }
    }
    call_.PerformOps(&write_ops_);
  }
  // Posts the client's send-close as its own batch. The batch is performed
  // immediately if the call has been started; otherwise it is recorded in
  // backlog_ for StartCall to perform.
  void WritesDone() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override {
    writes_done_ops_.ClientSendClose();
    // Unlike the start/read/write/finish tags, this tag is configured here
    // rather than in the constructor.
    writes_done_tag_.Set(
        call_.call(),
        [this](bool ok) {
          reactor_->OnWritesDoneDone(ok);
          MaybeFinish(/*from_reaction=*/true);
        },
        &writes_done_ops_, /*can_inline=*/false);
    writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
    // Account for the completion callback before it can possibly run.
    callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
    if (GPR_UNLIKELY(corked_write_needed_)) {
      // The start batch did not send initial metadata (corked) and no write has
      // sent it yet, so attach it to this batch.
      writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                           context_->initial_metadata_flags());
      corked_write_needed_ = false;
    }
    // started_ is first checked without the lock; only if it reads false is
    // start_mu_ taken and the flag checked again.
    if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
      grpc::internal::MutexLock lock(&start_mu_);
      if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
        backlog_.writes_done_ops = true;
        return;
      }
    }
    call_.PerformOps(&writes_done_ops_);
  }
  518. void AddHold(int holds) override {
  519. callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
  520. }
  521. void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
  522. private:
  523. friend class ClientCallbackReaderWriterFactory<Request, Response>;
  // Binds this stream to `reactor` and wires up the start, write, read and
  // finish tags and op sets. Nothing is sent here; batches are only performed
  // from StartCall/Read/Write/WritesDone. Each tag's callback forwards to the
  // matching reaction (where there is one) and then calls MaybeFinish.
  ClientCallbackReaderWriterImpl(grpc::internal::Call call,
                                 grpc::ClientContext* context,
                                 ClientBidiReactor<Request, Response>* reactor)
      : context_(context),
        call_(call),
        reactor_(reactor),
        // Reads through the already-initialized context_ member.
        start_corked_(context_->initial_metadata_corked_),
        // When corked, initial metadata still has to go out with the first
        // Write or WritesDone.
        corked_write_needed_(start_corked_) {
    this->BindReactor(reactor);

    // Set up the unchanging parts of the start, read, and write tags and ops.
    start_tag_.Set(
        call_.call(),
        [this](bool ok) {
          // Report success only if the batch succeeded and
          // InternalTrailersOnly returns false for this call.
          reactor_->OnReadInitialMetadataDone(
              ok && !reactor_->InternalTrailersOnly(call_.call()));
          MaybeFinish(/*from_reaction=*/true);
        },
        &start_ops_, /*can_inline=*/false);
    start_ops_.RecvInitialMetadata(context_);
    start_ops_.set_core_cq_tag(&start_tag_);

    write_tag_.Set(
        call_.call(),
        [this](bool ok) {
          reactor_->OnWriteDone(ok);
          MaybeFinish(/*from_reaction=*/true);
        },
        &write_ops_, /*can_inline=*/false);
    write_ops_.set_core_cq_tag(&write_tag_);

    read_tag_.Set(
        call_.call(),
        [this](bool ok) {
          reactor_->OnReadDone(ok);
          MaybeFinish(/*from_reaction=*/true);
        },
        &read_ops_, /*can_inline=*/false);
    read_ops_.set_core_cq_tag(&read_tag_);

    // Also set up the Finish tag and op set.
    // The finish callback has no reaction of its own; the received status is
    // stored in finish_status_ and delivered by MaybeFinish through OnDone.
    finish_tag_.Set(
        call_.call(),
        [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
        &finish_ops_,
        /*can_inline=*/false);
    finish_ops_.ClientRecvStatus(context_, &finish_status_);
    finish_ops_.set_core_cq_tag(&finish_tag_);
  }
  // MaybeFinish can be called from reactions or from user-initiated operations
  // like StartCall or RemoveHold. If this is the last operation or hold on this
  // object, it will invoke the OnDone reaction. If MaybeFinish was called from
  // a reaction, it can call OnDone directly. If not, it would need to schedule
  // OnDone onto an executor thread to avoid the possibility of deadlocking with
  // any locks in the user code that invoked it.
  //
  // Every call decrements callbacks_outstanding_ by one; only the call that
  // observes the previous value 1 performs the teardown below.
  void MaybeFinish(bool from_reaction) {
    if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
                         1, std::memory_order_acq_rel) == 1)) {
      // Copy/move everything still needed into locals first: the members must
      // not be touched once the destructor has run.
      grpc::Status s = std::move(finish_status_);
      auto* reactor = reactor_;
      auto* call = call_.call();
      // Explicit destructor call without freeing any memory.
      this->~ClientCallbackReaderWriterImpl();
      grpc_call_unref(call);
      if (GPR_LIKELY(from_reaction)) {
        reactor->OnDone(s);
      } else {
        reactor->InternalScheduleOnDone(std::move(s));
      }
    }
  }
  590. grpc::ClientContext* const context_;
  591. grpc::internal::Call call_;
  592. ClientBidiReactor<Request, Response>* const reactor_;
  593. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  594. grpc::internal::CallOpRecvInitialMetadata>
  595. start_ops_;
  596. grpc::internal::CallbackWithSuccessTag start_tag_;
  597. const bool start_corked_;
  598. bool corked_write_needed_; // no lock needed since only accessed in
  599. // Write/WritesDone which cannot be concurrent
  600. grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
  601. grpc::internal::CallbackWithSuccessTag finish_tag_;
  602. grpc::Status finish_status_;
  603. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  604. grpc::internal::CallOpSendMessage,
  605. grpc::internal::CallOpClientSendClose>
  606. write_ops_;
  607. grpc::internal::CallbackWithSuccessTag write_tag_;
  608. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  609. grpc::internal::CallOpClientSendClose>
  610. writes_done_ops_;
  611. grpc::internal::CallbackWithSuccessTag writes_done_tag_;
  612. grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
  613. read_ops_;
  614. grpc::internal::CallbackWithSuccessTag read_tag_;
  615. struct StartCallBacklog {
  616. bool write_ops = false;
  617. bool writes_done_ops = false;
  618. bool read_ops = false;
  619. };
  620. StartCallBacklog backlog_ Y_ABSL_GUARDED_BY(start_mu_);
  621. // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
  622. std::atomic<intptr_t> callbacks_outstanding_{3};
  623. std::atomic_bool started_{false};
  624. grpc::internal::Mutex start_mu_;
  625. };
  626. template <class Request, class Response>
  627. class ClientCallbackReaderWriterFactory {
  628. public:
  629. static void Create(grpc::ChannelInterface* channel,
  630. const grpc::internal::RpcMethod& method,
  631. grpc::ClientContext* context,
  632. ClientBidiReactor<Request, Response>* reactor) {
  633. grpc::internal::Call call =
  634. channel->CreateCall(method, context, channel->CallbackCQ());
  635. grpc_call_ref(call.call());
  636. new (grpc_call_arena_alloc(
  637. call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
  638. ClientCallbackReaderWriterImpl<Request, Response>(call, context,
  639. reactor);
  640. }
  641. };
  642. template <class Response>
  643. class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
  644. public:
  645. // always allocated against a call arena, no memory free required
  646. static void operator delete(void* /*ptr*/, std::size_t size) {
  647. GPR_ASSERT(size == sizeof(ClientCallbackReaderImpl));
  648. }
  649. // This operator should never be called as the memory should be freed as part
  650. // of the arena destruction. It only exists to provide a matching operator
  651. // delete to the operator new so that some compilers will not complain (see
  652. // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
  653. // there are no tests catching the compiler warning.
  654. static void operator delete(void*, void*) { GPR_ASSERT(false); }
  655. void StartCall() override {
  656. // This call initiates two batches, plus any backlog, each with a callback
  657. // 1. Send initial metadata (unless corked) + recv initial metadata
  658. // 2. Any backlog
  659. // 3. Recv trailing metadata
  660. start_tag_.Set(
  661. call_.call(),
  662. [this](bool ok) {
  663. reactor_->OnReadInitialMetadataDone(
  664. ok && !reactor_->InternalTrailersOnly(call_.call()));
  665. MaybeFinish(/*from_reaction=*/true);
  666. },
  667. &start_ops_, /*can_inline=*/false);
  668. start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
  669. context_->initial_metadata_flags());
  670. start_ops_.RecvInitialMetadata(context_);
  671. start_ops_.set_core_cq_tag(&start_tag_);
  672. call_.PerformOps(&start_ops_);
  673. // Also set up the read tag so it doesn't have to be set up each time
  674. read_tag_.Set(
  675. call_.call(),
  676. [this](bool ok) {
  677. reactor_->OnReadDone(ok);
  678. MaybeFinish(/*from_reaction=*/true);
  679. },
  680. &read_ops_, /*can_inline=*/false);
  681. read_ops_.set_core_cq_tag(&read_tag_);
  682. {
  683. grpc::internal::MutexLock lock(&start_mu_);
  684. if (backlog_.read_ops) {
  685. call_.PerformOps(&read_ops_);
  686. }
  687. started_.store(true, std::memory_order_release);
  688. }
  689. finish_tag_.Set(
  690. call_.call(),
  691. [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
  692. &finish_ops_, /*can_inline=*/false);
  693. finish_ops_.ClientRecvStatus(context_, &finish_status_);
  694. finish_ops_.set_core_cq_tag(&finish_tag_);
  695. call_.PerformOps(&finish_ops_);
  696. }
  697. void Read(Response* msg) override {
  698. read_ops_.RecvMessage(msg);
  699. callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
  700. if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
  701. grpc::internal::MutexLock lock(&start_mu_);
  702. if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
  703. backlog_.read_ops = true;
  704. return;
  705. }
  706. }
  707. call_.PerformOps(&read_ops_);
  708. }
  709. void AddHold(int holds) override {
  710. callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
  711. }
  712. void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
  713. private:
  714. friend class ClientCallbackReaderFactory<Response>;
  715. template <class Request>
  716. ClientCallbackReaderImpl(grpc::internal::Call call,
  717. grpc::ClientContext* context, Request* request,
  718. ClientReadReactor<Response>* reactor)
  719. : context_(context), call_(call), reactor_(reactor) {
  720. this->BindReactor(reactor);
  721. // TODO(vjpai): don't assert
  722. GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
  723. start_ops_.ClientSendClose();
  724. }
  725. // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
  726. void MaybeFinish(bool from_reaction) {
  727. if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
  728. 1, std::memory_order_acq_rel) == 1)) {
  729. grpc::Status s = std::move(finish_status_);
  730. auto* reactor = reactor_;
  731. auto* call = call_.call();
  732. this->~ClientCallbackReaderImpl();
  733. grpc_call_unref(call);
  734. if (GPR_LIKELY(from_reaction)) {
  735. reactor->OnDone(s);
  736. } else {
  737. reactor->InternalScheduleOnDone(std::move(s));
  738. }
  739. }
  740. }
  741. grpc::ClientContext* const context_;
  742. grpc::internal::Call call_;
  743. ClientReadReactor<Response>* const reactor_;
  744. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  745. grpc::internal::CallOpSendMessage,
  746. grpc::internal::CallOpClientSendClose,
  747. grpc::internal::CallOpRecvInitialMetadata>
  748. start_ops_;
  749. grpc::internal::CallbackWithSuccessTag start_tag_;
  750. grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
  751. grpc::internal::CallbackWithSuccessTag finish_tag_;
  752. grpc::Status finish_status_;
  753. grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
  754. read_ops_;
  755. grpc::internal::CallbackWithSuccessTag read_tag_;
  756. struct StartCallBacklog {
  757. bool read_ops = false;
  758. };
  759. StartCallBacklog backlog_ Y_ABSL_GUARDED_BY(start_mu_);
  760. // Minimum of 2 callbacks to pre-register for start and finish
  761. std::atomic<intptr_t> callbacks_outstanding_{2};
  762. std::atomic_bool started_{false};
  763. grpc::internal::Mutex start_mu_;
  764. };
  765. template <class Response>
  766. class ClientCallbackReaderFactory {
  767. public:
  768. template <class Request>
  769. static void Create(grpc::ChannelInterface* channel,
  770. const grpc::internal::RpcMethod& method,
  771. grpc::ClientContext* context, const Request* request,
  772. ClientReadReactor<Response>* reactor) {
  773. grpc::internal::Call call =
  774. channel->CreateCall(method, context, channel->CallbackCQ());
  775. grpc_call_ref(call.call());
  776. new (grpc_call_arena_alloc(call.call(),
  777. sizeof(ClientCallbackReaderImpl<Response>)))
  778. ClientCallbackReaderImpl<Response>(call, context, request, reactor);
  779. }
  780. };
  781. template <class Request>
  782. class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
  783. public:
  784. // always allocated against a call arena, no memory free required
  785. static void operator delete(void* /*ptr*/, std::size_t size) {
  786. GPR_ASSERT(size == sizeof(ClientCallbackWriterImpl));
  787. }
  788. // This operator should never be called as the memory should be freed as part
  789. // of the arena destruction. It only exists to provide a matching operator
  790. // delete to the operator new so that some compilers will not complain (see
  791. // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
  792. // there are no tests catching the compiler warning.
  793. static void operator delete(void*, void*) { GPR_ASSERT(false); }
  794. void StartCall() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override {
  795. // This call initiates two batches, plus any backlog, each with a callback
  796. // 1. Send initial metadata (unless corked) + recv initial metadata
  797. // 2. Any backlog
  798. // 3. Recv trailing metadata
  799. if (!start_corked_) {
  800. start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
  801. context_->initial_metadata_flags());
  802. }
  803. call_.PerformOps(&start_ops_);
  804. {
  805. grpc::internal::MutexLock lock(&start_mu_);
  806. if (backlog_.write_ops) {
  807. call_.PerformOps(&write_ops_);
  808. }
  809. if (backlog_.writes_done_ops) {
  810. call_.PerformOps(&writes_done_ops_);
  811. }
  812. call_.PerformOps(&finish_ops_);
  813. // The last thing in this critical section is to set started_ so that it
  814. // can be used lock-free as well.
  815. started_.store(true, std::memory_order_release);
  816. }
  817. // MaybeFinish outside the lock to make sure that destruction of this object
  818. // doesn't take place while holding the lock (which would cause the lock to
  819. // be released after destruction)
  820. this->MaybeFinish(/*from_reaction=*/false);
  821. }
  822. void Write(const Request* msg, grpc::WriteOptions options)
  823. Y_ABSL_LOCKS_EXCLUDED(start_mu_) override {
  824. if (GPR_UNLIKELY(options.is_last_message())) {
  825. options.set_buffer_hint();
  826. write_ops_.ClientSendClose();
  827. }
  828. // TODO(vjpai): don't assert
  829. GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
  830. callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
  831. if (GPR_UNLIKELY(corked_write_needed_)) {
  832. write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
  833. context_->initial_metadata_flags());
  834. corked_write_needed_ = false;
  835. }
  836. if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
  837. grpc::internal::MutexLock lock(&start_mu_);
  838. if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
  839. backlog_.write_ops = true;
  840. return;
  841. }
  842. }
  843. call_.PerformOps(&write_ops_);
  844. }
  845. void WritesDone() Y_ABSL_LOCKS_EXCLUDED(start_mu_) override {
  846. writes_done_ops_.ClientSendClose();
  847. writes_done_tag_.Set(
  848. call_.call(),
  849. [this](bool ok) {
  850. reactor_->OnWritesDoneDone(ok);
  851. MaybeFinish(/*from_reaction=*/true);
  852. },
  853. &writes_done_ops_, /*can_inline=*/false);
  854. writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
  855. callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
  856. if (GPR_UNLIKELY(corked_write_needed_)) {
  857. writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
  858. context_->initial_metadata_flags());
  859. corked_write_needed_ = false;
  860. }
  861. if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
  862. grpc::internal::MutexLock lock(&start_mu_);
  863. if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
  864. backlog_.writes_done_ops = true;
  865. return;
  866. }
  867. }
  868. call_.PerformOps(&writes_done_ops_);
  869. }
  870. void AddHold(int holds) override {
  871. callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
  872. }
  873. void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
  874. private:
  875. friend class ClientCallbackWriterFactory<Request>;
  876. template <class Response>
  877. ClientCallbackWriterImpl(grpc::internal::Call call,
  878. grpc::ClientContext* context, Response* response,
  879. ClientWriteReactor<Request>* reactor)
  880. : context_(context),
  881. call_(call),
  882. reactor_(reactor),
  883. start_corked_(context_->initial_metadata_corked_),
  884. corked_write_needed_(start_corked_) {
  885. this->BindReactor(reactor);
  886. // Set up the unchanging parts of the start and write tags and ops.
  887. start_tag_.Set(
  888. call_.call(),
  889. [this](bool ok) {
  890. reactor_->OnReadInitialMetadataDone(
  891. ok && !reactor_->InternalTrailersOnly(call_.call()));
  892. MaybeFinish(/*from_reaction=*/true);
  893. },
  894. &start_ops_, /*can_inline=*/false);
  895. start_ops_.RecvInitialMetadata(context_);
  896. start_ops_.set_core_cq_tag(&start_tag_);
  897. write_tag_.Set(
  898. call_.call(),
  899. [this](bool ok) {
  900. reactor_->OnWriteDone(ok);
  901. MaybeFinish(/*from_reaction=*/true);
  902. },
  903. &write_ops_, /*can_inline=*/false);
  904. write_ops_.set_core_cq_tag(&write_tag_);
  905. // Also set up the Finish tag and op set.
  906. finish_ops_.RecvMessage(response);
  907. finish_ops_.AllowNoMessage();
  908. finish_tag_.Set(
  909. call_.call(),
  910. [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
  911. &finish_ops_,
  912. /*can_inline=*/false);
  913. finish_ops_.ClientRecvStatus(context_, &finish_status_);
  914. finish_ops_.set_core_cq_tag(&finish_tag_);
  915. }
  916. // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
  917. void MaybeFinish(bool from_reaction) {
  918. if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
  919. 1, std::memory_order_acq_rel) == 1)) {
  920. grpc::Status s = std::move(finish_status_);
  921. auto* reactor = reactor_;
  922. auto* call = call_.call();
  923. this->~ClientCallbackWriterImpl();
  924. grpc_call_unref(call);
  925. if (GPR_LIKELY(from_reaction)) {
  926. reactor->OnDone(s);
  927. } else {
  928. reactor->InternalScheduleOnDone(std::move(s));
  929. }
  930. }
  931. }
  932. grpc::ClientContext* const context_;
  933. grpc::internal::Call call_;
  934. ClientWriteReactor<Request>* const reactor_;
  935. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  936. grpc::internal::CallOpRecvInitialMetadata>
  937. start_ops_;
  938. grpc::internal::CallbackWithSuccessTag start_tag_;
  939. const bool start_corked_;
  940. bool corked_write_needed_; // no lock needed since only accessed in
  941. // Write/WritesDone which cannot be concurrent
  942. grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
  943. grpc::internal::CallOpClientRecvStatus>
  944. finish_ops_;
  945. grpc::internal::CallbackWithSuccessTag finish_tag_;
  946. grpc::Status finish_status_;
  947. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  948. grpc::internal::CallOpSendMessage,
  949. grpc::internal::CallOpClientSendClose>
  950. write_ops_;
  951. grpc::internal::CallbackWithSuccessTag write_tag_;
  952. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  953. grpc::internal::CallOpClientSendClose>
  954. writes_done_ops_;
  955. grpc::internal::CallbackWithSuccessTag writes_done_tag_;
  956. struct StartCallBacklog {
  957. bool write_ops = false;
  958. bool writes_done_ops = false;
  959. };
  960. StartCallBacklog backlog_ Y_ABSL_GUARDED_BY(start_mu_);
  961. // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
  962. std::atomic<intptr_t> callbacks_outstanding_{3};
  963. std::atomic_bool started_{false};
  964. grpc::internal::Mutex start_mu_;
  965. };
  966. template <class Request>
  967. class ClientCallbackWriterFactory {
  968. public:
  969. template <class Response>
  970. static void Create(grpc::ChannelInterface* channel,
  971. const grpc::internal::RpcMethod& method,
  972. grpc::ClientContext* context, Response* response,
  973. ClientWriteReactor<Request>* reactor) {
  974. grpc::internal::Call call =
  975. channel->CreateCall(method, context, channel->CallbackCQ());
  976. grpc_call_ref(call.call());
  977. new (grpc_call_arena_alloc(call.call(),
  978. sizeof(ClientCallbackWriterImpl<Request>)))
  979. ClientCallbackWriterImpl<Request>(call, context, response, reactor);
  980. }
  981. };
  982. class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
  983. public:
  984. // always allocated against a call arena, no memory free required
  985. static void operator delete(void* /*ptr*/, std::size_t size) {
  986. GPR_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
  987. }
  988. // This operator should never be called as the memory should be freed as part
  989. // of the arena destruction. It only exists to provide a matching operator
  990. // delete to the operator new so that some compilers will not complain (see
  991. // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
  992. // there are no tests catching the compiler warning.
  993. static void operator delete(void*, void*) { GPR_ASSERT(false); }
  994. void StartCall() override {
  995. // This call initiates two batches, each with a callback
  996. // 1. Send initial metadata + write + writes done + recv initial metadata
  997. // 2. Read message, recv trailing metadata
  998. start_tag_.Set(
  999. call_.call(),
  1000. [this](bool ok) {
  1001. reactor_->OnReadInitialMetadataDone(
  1002. ok && !reactor_->InternalTrailersOnly(call_.call()));
  1003. MaybeFinish();
  1004. },
  1005. &start_ops_, /*can_inline=*/false);
  1006. start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
  1007. context_->initial_metadata_flags());
  1008. start_ops_.RecvInitialMetadata(context_);
  1009. start_ops_.set_core_cq_tag(&start_tag_);
  1010. call_.PerformOps(&start_ops_);
  1011. finish_tag_.Set(
  1012. call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
  1013. /*can_inline=*/false);
  1014. finish_ops_.ClientRecvStatus(context_, &finish_status_);
  1015. finish_ops_.set_core_cq_tag(&finish_tag_);
  1016. call_.PerformOps(&finish_ops_);
  1017. }
  1018. private:
  1019. friend class ClientCallbackUnaryFactory;
  1020. template <class Request, class Response>
  1021. ClientCallbackUnaryImpl(grpc::internal::Call call,
  1022. grpc::ClientContext* context, Request* request,
  1023. Response* response, ClientUnaryReactor* reactor)
  1024. : context_(context), call_(call), reactor_(reactor) {
  1025. this->BindReactor(reactor);
  1026. // TODO(vjpai): don't assert
  1027. GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
  1028. start_ops_.ClientSendClose();
  1029. finish_ops_.RecvMessage(response);
  1030. finish_ops_.AllowNoMessage();
  1031. }
  1032. // In the unary case, MaybeFinish is only ever invoked from a
  1033. // library-initiated reaction, so it will just directly call OnDone if this is
  1034. // the last reaction for this RPC.
  1035. void MaybeFinish() {
  1036. if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
  1037. 1, std::memory_order_acq_rel) == 1)) {
  1038. grpc::Status s = std::move(finish_status_);
  1039. auto* reactor = reactor_;
  1040. auto* call = call_.call();
  1041. this->~ClientCallbackUnaryImpl();
  1042. grpc_call_unref(call);
  1043. reactor->OnDone(s);
  1044. }
  1045. }
  1046. grpc::ClientContext* const context_;
  1047. grpc::internal::Call call_;
  1048. ClientUnaryReactor* const reactor_;
  1049. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  1050. grpc::internal::CallOpSendMessage,
  1051. grpc::internal::CallOpClientSendClose,
  1052. grpc::internal::CallOpRecvInitialMetadata>
  1053. start_ops_;
  1054. grpc::internal::CallbackWithSuccessTag start_tag_;
  1055. grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
  1056. grpc::internal::CallOpClientRecvStatus>
  1057. finish_ops_;
  1058. grpc::internal::CallbackWithSuccessTag finish_tag_;
  1059. grpc::Status finish_status_;
  1060. // This call will have 2 callbacks: start and finish
  1061. std::atomic<intptr_t> callbacks_outstanding_{2};
  1062. };
  1063. class ClientCallbackUnaryFactory {
  1064. public:
  1065. template <class Request, class Response, class BaseRequest = Request,
  1066. class BaseResponse = Response>
  1067. static void Create(grpc::ChannelInterface* channel,
  1068. const grpc::internal::RpcMethod& method,
  1069. grpc::ClientContext* context, const Request* request,
  1070. Response* response, ClientUnaryReactor* reactor) {
  1071. grpc::internal::Call call =
  1072. channel->CreateCall(method, context, channel->CallbackCQ());
  1073. grpc_call_ref(call.call());
  1074. new (grpc_call_arena_alloc(call.call(), sizeof(ClientCallbackUnaryImpl)))
  1075. ClientCallbackUnaryImpl(call, context,
  1076. static_cast<const BaseRequest*>(request),
  1077. static_cast<BaseResponse*>(response), reactor);
  1078. }
  1079. };
  1080. } // namespace internal
  1081. } // namespace grpc
  1082. #endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H