// server_callback.h
  1. //
  2. //
  3. // Copyright 2018 gRPC authors.
  4. //
  5. // Licensed under the Apache License, Version 2.0 (the "License");
  6. // you may not use this file except in compliance with the License.
  7. // You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. //
  17. //
  18. #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
  19. #define GRPCPP_SUPPORT_SERVER_CALLBACK_H
  20. #include <atomic>
  21. #include <functional>
  22. #include <type_traits>
  23. #include <grpcpp/impl/call.h>
  24. #include <grpcpp/impl/call_op_set.h>
  25. #include <grpcpp/impl/sync.h>
  26. #include <grpcpp/support/callback_common.h>
  27. #include <grpcpp/support/config.h>
  28. #include <grpcpp/support/message_allocator.h>
  29. #include <grpcpp/support/status.h>
  30. namespace grpc {
  31. // Declare base class of all reactors as internal
  32. namespace internal {
  33. // Forward declarations
  34. template <class Request, class Response>
  35. class CallbackUnaryHandler;
  36. template <class Request, class Response>
  37. class CallbackClientStreamingHandler;
  38. template <class Request, class Response>
  39. class CallbackServerStreamingHandler;
  40. template <class Request, class Response>
  41. class CallbackBidiHandler;
  42. class ServerReactor {
  43. public:
  44. virtual ~ServerReactor() = default;
  45. virtual void OnDone() = 0;
  46. virtual void OnCancel() = 0;
  47. // The following is not API. It is for internal use only and specifies whether
  48. // all reactions of this Reactor can be run without an extra executor
  49. // scheduling. This should only be used for internally-defined reactors with
  50. // trivial reactions.
  51. virtual bool InternalInlineable() { return false; }
  52. private:
  53. template <class Request, class Response>
  54. friend class CallbackUnaryHandler;
  55. template <class Request, class Response>
  56. friend class CallbackClientStreamingHandler;
  57. template <class Request, class Response>
  58. friend class CallbackServerStreamingHandler;
  59. template <class Request, class Response>
  60. friend class CallbackBidiHandler;
  61. };
  62. /// The base class of ServerCallbackUnary etc.
  63. class ServerCallbackCall {
  64. public:
  65. virtual ~ServerCallbackCall() {}
  66. // This object is responsible for tracking when it is safe to call OnDone and
  67. // OnCancel. OnDone should not be called until the method handler is complete,
  68. // Finish has been called, the ServerContext CompletionOp (which tracks
  69. // cancellation or successful completion) has completed, and all outstanding
  70. // Read/Write actions have seen their reactions. OnCancel should not be called
  71. // until after the method handler is done and the RPC has completed with a
  72. // cancellation. This is tracked by counting how many of these conditions have
  73. // been met and calling OnCancel when none remain unmet.
  74. // Public versions of MaybeDone: one where we don't know the reactor in
  75. // advance (used for the ServerContext CompletionOp), and one for where we
  76. // know the inlineability of the OnDone reaction. You should set the inline
  77. // flag to true if either the Reactor is InternalInlineable() or if this
  78. // callback is already being forced to run dispatched to an executor
  79. // (typically because it contains additional work than just the MaybeDone).
  80. void MaybeDone() {
  81. if (GPR_UNLIKELY(Unref() == 1)) {
  82. ScheduleOnDone(reactor()->InternalInlineable());
  83. }
  84. }
  85. void MaybeDone(bool inline_ondone) {
  86. if (GPR_UNLIKELY(Unref() == 1)) {
  87. ScheduleOnDone(inline_ondone);
  88. }
  89. }
  90. // Fast version called with known reactor passed in, used from derived
  91. // classes, typically in non-cancel case
  92. void MaybeCallOnCancel(ServerReactor* reactor) {
  93. if (GPR_UNLIKELY(UnblockCancellation())) {
  94. CallOnCancel(reactor);
  95. }
  96. }
  97. // Slower version called from object that doesn't know the reactor a priori
  98. // (such as the ServerContext CompletionOp which is formed before the
  99. // reactor). This is used in cancel cases only, so it's ok to be slower and
  100. // invoke a virtual function.
  101. void MaybeCallOnCancel() {
  102. if (GPR_UNLIKELY(UnblockCancellation())) {
  103. CallOnCancel(reactor());
  104. }
  105. }
  106. protected:
  107. /// Increases the reference count
  108. void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
  109. private:
  110. virtual ServerReactor* reactor() = 0;
  111. // CallOnDone performs the work required at completion of the RPC: invoking
  112. // the OnDone function and doing all necessary cleanup. This function is only
  113. // ever invoked on a fully-Unref'fed ServerCallbackCall.
  114. virtual void CallOnDone() = 0;
  115. // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
  116. // to an executor.
  117. void ScheduleOnDone(bool inline_ondone);
  118. // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
  119. // it to an executor.
  120. void CallOnCancel(ServerReactor* reactor);
  121. // Implement the cancellation constraint counter. Return true if OnCancel
  122. // should be called, false otherwise.
  123. bool UnblockCancellation() {
  124. return on_cancel_conditions_remaining_.fetch_sub(
  125. 1, std::memory_order_acq_rel) == 1;
  126. }
  127. /// Decreases the reference count and returns the previous value
  128. int Unref() {
  129. return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
  130. }
  131. std::atomic_int on_cancel_conditions_remaining_{2};
  132. std::atomic_int callbacks_outstanding_{
  133. 3}; // reserve for start, Finish, and CompletionOp
  134. };
  135. template <class Request, class Response>
  136. class DefaultMessageHolder : public MessageHolder<Request, Response> {
  137. public:
  138. DefaultMessageHolder() {
  139. this->set_request(&request_obj_);
  140. this->set_response(&response_obj_);
  141. }
  142. void Release() override {
  143. // the object is allocated in the call arena.
  144. this->~DefaultMessageHolder<Request, Response>();
  145. }
  146. private:
  147. Request request_obj_;
  148. Response response_obj_;
  149. };
  150. } // namespace internal
  151. // Forward declarations
  152. class ServerUnaryReactor;
  153. template <class Request>
  154. class ServerReadReactor;
  155. template <class Response>
  156. class ServerWriteReactor;
  157. template <class Request, class Response>
  158. class ServerBidiReactor;
  159. // NOTE: The actual call/stream object classes are provided as API only to
  160. // support mocking. There are no implementations of these class interfaces in
  161. // the API.
  162. class ServerCallbackUnary : public internal::ServerCallbackCall {
  163. public:
  164. ~ServerCallbackUnary() override {}
  165. virtual void Finish(grpc::Status s) = 0;
  166. virtual void SendInitialMetadata() = 0;
  167. protected:
  168. // Use a template rather than explicitly specifying ServerUnaryReactor to
  169. // delay binding and avoid a circular forward declaration issue
  170. template <class Reactor>
  171. void BindReactor(Reactor* reactor) {
  172. reactor->InternalBindCall(this);
  173. }
  174. };
  175. template <class Request>
  176. class ServerCallbackReader : public internal::ServerCallbackCall {
  177. public:
  178. ~ServerCallbackReader() override {}
  179. virtual void Finish(grpc::Status s) = 0;
  180. virtual void SendInitialMetadata() = 0;
  181. virtual void Read(Request* msg) = 0;
  182. protected:
  183. void BindReactor(ServerReadReactor<Request>* reactor) {
  184. reactor->InternalBindReader(this);
  185. }
  186. };
  187. template <class Response>
  188. class ServerCallbackWriter : public internal::ServerCallbackCall {
  189. public:
  190. ~ServerCallbackWriter() override {}
  191. virtual void Finish(grpc::Status s) = 0;
  192. virtual void SendInitialMetadata() = 0;
  193. virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
  194. virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
  195. grpc::Status s) = 0;
  196. protected:
  197. void BindReactor(ServerWriteReactor<Response>* reactor) {
  198. reactor->InternalBindWriter(this);
  199. }
  200. };
  201. template <class Request, class Response>
  202. class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
  203. public:
  204. ~ServerCallbackReaderWriter() override {}
  205. virtual void Finish(grpc::Status s) = 0;
  206. virtual void SendInitialMetadata() = 0;
  207. virtual void Read(Request* msg) = 0;
  208. virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
  209. virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
  210. grpc::Status s) = 0;
  211. protected:
  212. void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
  213. reactor->InternalBindStream(this);
  214. }
  215. };
  216. // The following classes are the reactor interfaces that are to be implemented
  217. // by the user, returned as the output parameter of the method handler for a
  218. // callback method. Note that none of the classes are pure; all reactions have a
  219. // default empty reaction so that the user class only needs to override those
  220. // reactions that it cares about. The reaction methods will be invoked by the
  221. // library in response to the completion of various operations. Reactions must
  222. // not include blocking operations (such as blocking I/O, starting synchronous
  223. // RPCs, or waiting on condition variables). Reactions may be invoked
  224. // concurrently, except that OnDone is called after all others (assuming proper
  225. // API usage). The reactor may not be deleted until OnDone is called.
  226. /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
  227. template <class Request, class Response>
  228. class ServerBidiReactor : public internal::ServerReactor {
  229. public:
  230. // NOTE: Initializing stream_ as a constructor initializer rather than a
  231. // default initializer because gcc-4.x requires a copy constructor for
  232. // default initializing a templated member, which isn't ok for atomic.
  233. // TODO(vjpai): Switch to default constructor and default initializer when
  234. // gcc-4.x is no longer supported
  235. ServerBidiReactor() : stream_(nullptr) {}
  236. ~ServerBidiReactor() override = default;
  237. /// Send any initial metadata stored in the RPC context. If not invoked,
  238. /// any initial metadata will be passed along with the first Write or the
  239. /// Finish (if there are no writes).
  240. void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(stream_mu_) {
  241. ServerCallbackReaderWriter<Request, Response>* stream =
  242. stream_.load(std::memory_order_acquire);
  243. if (stream == nullptr) {
  244. grpc::internal::MutexLock l(&stream_mu_);
  245. stream = stream_.load(std::memory_order_relaxed);
  246. if (stream == nullptr) {
  247. backlog_.send_initial_metadata_wanted = true;
  248. return;
  249. }
  250. }
  251. stream->SendInitialMetadata();
  252. }
  253. /// Initiate a read operation.
  254. ///
  255. /// \param[out] req Where to eventually store the read message. Valid when
  256. /// the library calls OnReadDone
  257. void StartRead(Request* req) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) {
  258. ServerCallbackReaderWriter<Request, Response>* stream =
  259. stream_.load(std::memory_order_acquire);
  260. if (stream == nullptr) {
  261. grpc::internal::MutexLock l(&stream_mu_);
  262. stream = stream_.load(std::memory_order_relaxed);
  263. if (stream == nullptr) {
  264. backlog_.read_wanted = req;
  265. return;
  266. }
  267. }
  268. stream->Read(req);
  269. }
  270. /// Initiate a write operation.
  271. ///
  272. /// \param[in] resp The message to be written. The library does not take
  273. /// ownership but the caller must ensure that the message is
  274. /// not deleted or modified until OnWriteDone is called.
  275. void StartWrite(const Response* resp) {
  276. StartWrite(resp, grpc::WriteOptions());
  277. }
  278. /// Initiate a write operation with specified options.
  279. ///
  280. /// \param[in] resp The message to be written. The library does not take
  281. /// ownership but the caller must ensure that the message is
  282. /// not deleted or modified until OnWriteDone is called.
  283. /// \param[in] options The WriteOptions to use for writing this message
  284. void StartWrite(const Response* resp, grpc::WriteOptions options)
  285. Y_ABSL_LOCKS_EXCLUDED(stream_mu_) {
  286. ServerCallbackReaderWriter<Request, Response>* stream =
  287. stream_.load(std::memory_order_acquire);
  288. if (stream == nullptr) {
  289. grpc::internal::MutexLock l(&stream_mu_);
  290. stream = stream_.load(std::memory_order_relaxed);
  291. if (stream == nullptr) {
  292. backlog_.write_wanted = resp;
  293. backlog_.write_options_wanted = options;
  294. return;
  295. }
  296. }
  297. stream->Write(resp, options);
  298. }
  299. /// Initiate a write operation with specified options and final RPC Status,
  300. /// which also causes any trailing metadata for this RPC to be sent out.
  301. /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
  302. /// single step. A key difference, though, is that this operation doesn't have
  303. /// an OnWriteDone reaction - it is considered complete only when OnDone is
  304. /// available. An RPC can either have StartWriteAndFinish or Finish, but not
  305. /// both.
  306. ///
  307. /// \param[in] resp The message to be written. The library does not take
  308. /// ownership but the caller must ensure that the message is
  309. /// not deleted or modified until OnDone is called.
  310. /// \param[in] options The WriteOptions to use for writing this message
  311. /// \param[in] s The status outcome of this RPC
  312. void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
  313. grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) {
  314. ServerCallbackReaderWriter<Request, Response>* stream =
  315. stream_.load(std::memory_order_acquire);
  316. if (stream == nullptr) {
  317. grpc::internal::MutexLock l(&stream_mu_);
  318. stream = stream_.load(std::memory_order_relaxed);
  319. if (stream == nullptr) {
  320. backlog_.write_and_finish_wanted = true;
  321. backlog_.write_wanted = resp;
  322. backlog_.write_options_wanted = options;
  323. backlog_.status_wanted = std::move(s);
  324. return;
  325. }
  326. }
  327. stream->WriteAndFinish(resp, options, std::move(s));
  328. }
  329. /// Inform system of a planned write operation with specified options, but
  330. /// allow the library to schedule the actual write coalesced with the writing
  331. /// of trailing metadata (which takes place on a Finish call).
  332. ///
  333. /// \param[in] resp The message to be written. The library does not take
  334. /// ownership but the caller must ensure that the message is
  335. /// not deleted or modified until OnWriteDone is called.
  336. /// \param[in] options The WriteOptions to use for writing this message
  337. void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
  338. StartWrite(resp, options.set_last_message());
  339. }
  340. /// Indicate that the stream is to be finished and the trailing metadata and
  341. /// RPC status are to be sent. Every RPC MUST be finished using either Finish
  342. /// or StartWriteAndFinish (but not both), even if the RPC is already
  343. /// cancelled.
  344. ///
  345. /// \param[in] s The status outcome of this RPC
  346. void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(stream_mu_) {
  347. ServerCallbackReaderWriter<Request, Response>* stream =
  348. stream_.load(std::memory_order_acquire);
  349. if (stream == nullptr) {
  350. grpc::internal::MutexLock l(&stream_mu_);
  351. stream = stream_.load(std::memory_order_relaxed);
  352. if (stream == nullptr) {
  353. backlog_.finish_wanted = true;
  354. backlog_.status_wanted = std::move(s);
  355. return;
  356. }
  357. }
  358. stream->Finish(std::move(s));
  359. }
  360. /// Notifies the application that an explicit StartSendInitialMetadata
  361. /// operation completed. Not used when the sending of initial metadata
  362. /// piggybacks onto the first write.
  363. ///
  364. /// \param[in] ok Was it successful? If false, no further write-side operation
  365. /// will succeed.
  366. virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  367. /// Notifies the application that a StartRead operation completed.
  368. ///
  369. /// \param[in] ok Was it successful? If false, no further read-side operation
  370. /// will succeed.
  371. virtual void OnReadDone(bool /*ok*/) {}
  372. /// Notifies the application that a StartWrite (or StartWriteLast) operation
  373. /// completed.
  374. ///
  375. /// \param[in] ok Was it successful? If false, no further write-side operation
  376. /// will succeed.
  377. virtual void OnWriteDone(bool /*ok*/) {}
  378. /// Notifies the application that all operations associated with this RPC
  379. /// have completed. This is an override (from the internal base class) but
  380. /// still abstract, so derived classes MUST override it to be instantiated.
  381. void OnDone() override = 0;
  382. /// Notifies the application that this RPC has been cancelled. This is an
  383. /// override (from the internal base class) but not final, so derived classes
  384. /// should override it if they want to take action.
  385. void OnCancel() override {}
  386. private:
  387. friend class ServerCallbackReaderWriter<Request, Response>;
  388. // May be overridden by internal implementation details. This is not a public
  389. // customization point.
  390. virtual void InternalBindStream(
  391. ServerCallbackReaderWriter<Request, Response>* stream) {
  392. grpc::internal::MutexLock l(&stream_mu_);
  393. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
  394. stream->SendInitialMetadata();
  395. }
  396. if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
  397. stream->Read(backlog_.read_wanted);
  398. }
  399. if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
  400. stream->WriteAndFinish(backlog_.write_wanted,
  401. std::move(backlog_.write_options_wanted),
  402. std::move(backlog_.status_wanted));
  403. } else {
  404. if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
  405. stream->Write(backlog_.write_wanted,
  406. std::move(backlog_.write_options_wanted));
  407. }
  408. if (GPR_UNLIKELY(backlog_.finish_wanted)) {
  409. stream->Finish(std::move(backlog_.status_wanted));
  410. }
  411. }
  412. // Set stream_ last so that other functions can use it lock-free
  413. stream_.store(stream, std::memory_order_release);
  414. }
  415. grpc::internal::Mutex stream_mu_;
  416. // TODO(vjpai): Make stream_or_backlog_ into a std::variant or y_absl::variant
  417. // once C++17 or ABSL is supported since stream and backlog are
  418. // mutually exclusive in this class. Do likewise with the
  419. // remaining reactor classes and their backlogs as well.
  420. std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
  421. struct PreBindBacklog {
  422. bool send_initial_metadata_wanted = false;
  423. bool write_and_finish_wanted = false;
  424. bool finish_wanted = false;
  425. Request* read_wanted = nullptr;
  426. const Response* write_wanted = nullptr;
  427. grpc::WriteOptions write_options_wanted;
  428. grpc::Status status_wanted;
  429. };
  430. PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(stream_mu_);
  431. };
  432. /// \a ServerReadReactor is the interface for a client-streaming RPC.
  433. template <class Request>
  434. class ServerReadReactor : public internal::ServerReactor {
  435. public:
  436. ServerReadReactor() : reader_(nullptr) {}
  437. ~ServerReadReactor() override = default;
  438. /// The following operation initiations are exactly like ServerBidiReactor.
  439. void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(reader_mu_) {
  440. ServerCallbackReader<Request>* reader =
  441. reader_.load(std::memory_order_acquire);
  442. if (reader == nullptr) {
  443. grpc::internal::MutexLock l(&reader_mu_);
  444. reader = reader_.load(std::memory_order_relaxed);
  445. if (reader == nullptr) {
  446. backlog_.send_initial_metadata_wanted = true;
  447. return;
  448. }
  449. }
  450. reader->SendInitialMetadata();
  451. }
  452. void StartRead(Request* req) Y_ABSL_LOCKS_EXCLUDED(reader_mu_) {
  453. ServerCallbackReader<Request>* reader =
  454. reader_.load(std::memory_order_acquire);
  455. if (reader == nullptr) {
  456. grpc::internal::MutexLock l(&reader_mu_);
  457. reader = reader_.load(std::memory_order_relaxed);
  458. if (reader == nullptr) {
  459. backlog_.read_wanted = req;
  460. return;
  461. }
  462. }
  463. reader->Read(req);
  464. }
  465. void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(reader_mu_) {
  466. ServerCallbackReader<Request>* reader =
  467. reader_.load(std::memory_order_acquire);
  468. if (reader == nullptr) {
  469. grpc::internal::MutexLock l(&reader_mu_);
  470. reader = reader_.load(std::memory_order_relaxed);
  471. if (reader == nullptr) {
  472. backlog_.finish_wanted = true;
  473. backlog_.status_wanted = std::move(s);
  474. return;
  475. }
  476. }
  477. reader->Finish(std::move(s));
  478. }
  479. /// The following notifications are exactly like ServerBidiReactor.
  480. virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  481. virtual void OnReadDone(bool /*ok*/) {}
  482. void OnDone() override = 0;
  483. void OnCancel() override {}
  484. private:
  485. friend class ServerCallbackReader<Request>;
  486. // May be overridden by internal implementation details. This is not a public
  487. // customization point.
  488. virtual void InternalBindReader(ServerCallbackReader<Request>* reader)
  489. Y_ABSL_LOCKS_EXCLUDED(reader_mu_) {
  490. grpc::internal::MutexLock l(&reader_mu_);
  491. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
  492. reader->SendInitialMetadata();
  493. }
  494. if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
  495. reader->Read(backlog_.read_wanted);
  496. }
  497. if (GPR_UNLIKELY(backlog_.finish_wanted)) {
  498. reader->Finish(std::move(backlog_.status_wanted));
  499. }
  500. // Set reader_ last so that other functions can use it lock-free
  501. reader_.store(reader, std::memory_order_release);
  502. }
  503. grpc::internal::Mutex reader_mu_;
  504. std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
  505. struct PreBindBacklog {
  506. bool send_initial_metadata_wanted = false;
  507. bool finish_wanted = false;
  508. Request* read_wanted = nullptr;
  509. grpc::Status status_wanted;
  510. };
  511. PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(reader_mu_);
  512. };
  513. /// \a ServerWriteReactor is the interface for a server-streaming RPC.
  514. template <class Response>
  515. class ServerWriteReactor : public internal::ServerReactor {
  516. public:
  517. ServerWriteReactor() : writer_(nullptr) {}
  518. ~ServerWriteReactor() override = default;
  519. /// The following operation initiations are exactly like ServerBidiReactor.
  520. void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(writer_mu_) {
  521. ServerCallbackWriter<Response>* writer =
  522. writer_.load(std::memory_order_acquire);
  523. if (writer == nullptr) {
  524. grpc::internal::MutexLock l(&writer_mu_);
  525. writer = writer_.load(std::memory_order_relaxed);
  526. if (writer == nullptr) {
  527. backlog_.send_initial_metadata_wanted = true;
  528. return;
  529. }
  530. }
  531. writer->SendInitialMetadata();
  532. }
  533. void StartWrite(const Response* resp) {
  534. StartWrite(resp, grpc::WriteOptions());
  535. }
  536. void StartWrite(const Response* resp, grpc::WriteOptions options)
  537. Y_ABSL_LOCKS_EXCLUDED(writer_mu_) {
  538. ServerCallbackWriter<Response>* writer =
  539. writer_.load(std::memory_order_acquire);
  540. if (writer == nullptr) {
  541. grpc::internal::MutexLock l(&writer_mu_);
  542. writer = writer_.load(std::memory_order_relaxed);
  543. if (writer == nullptr) {
  544. backlog_.write_wanted = resp;
  545. backlog_.write_options_wanted = options;
  546. return;
  547. }
  548. }
  549. writer->Write(resp, options);
  550. }
  551. void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
  552. grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(writer_mu_) {
  553. ServerCallbackWriter<Response>* writer =
  554. writer_.load(std::memory_order_acquire);
  555. if (writer == nullptr) {
  556. grpc::internal::MutexLock l(&writer_mu_);
  557. writer = writer_.load(std::memory_order_relaxed);
  558. if (writer == nullptr) {
  559. backlog_.write_and_finish_wanted = true;
  560. backlog_.write_wanted = resp;
  561. backlog_.write_options_wanted = options;
  562. backlog_.status_wanted = std::move(s);
  563. return;
  564. }
  565. }
  566. writer->WriteAndFinish(resp, options, std::move(s));
  567. }
  568. void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
  569. StartWrite(resp, options.set_last_message());
  570. }
  571. void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(writer_mu_) {
  572. ServerCallbackWriter<Response>* writer =
  573. writer_.load(std::memory_order_acquire);
  574. if (writer == nullptr) {
  575. grpc::internal::MutexLock l(&writer_mu_);
  576. writer = writer_.load(std::memory_order_relaxed);
  577. if (writer == nullptr) {
  578. backlog_.finish_wanted = true;
  579. backlog_.status_wanted = std::move(s);
  580. return;
  581. }
  582. }
  583. writer->Finish(std::move(s));
  584. }
  585. /// The following notifications are exactly like ServerBidiReactor.
  586. virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  587. virtual void OnWriteDone(bool /*ok*/) {}
  588. void OnDone() override = 0;
  589. void OnCancel() override {}
  590. private:
  591. friend class ServerCallbackWriter<Response>;
  592. // May be overridden by internal implementation details. This is not a public
  593. // customization point.
  594. virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer)
  595. Y_ABSL_LOCKS_EXCLUDED(writer_mu_) {
  596. grpc::internal::MutexLock l(&writer_mu_);
  597. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
  598. writer->SendInitialMetadata();
  599. }
  600. if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
  601. writer->WriteAndFinish(backlog_.write_wanted,
  602. std::move(backlog_.write_options_wanted),
  603. std::move(backlog_.status_wanted));
  604. } else {
  605. if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
  606. writer->Write(backlog_.write_wanted,
  607. std::move(backlog_.write_options_wanted));
  608. }
  609. if (GPR_UNLIKELY(backlog_.finish_wanted)) {
  610. writer->Finish(std::move(backlog_.status_wanted));
  611. }
  612. }
  613. // Set writer_ last so that other functions can use it lock-free
  614. writer_.store(writer, std::memory_order_release);
  615. }
  616. grpc::internal::Mutex writer_mu_;
  617. std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
  618. struct PreBindBacklog {
  619. bool send_initial_metadata_wanted = false;
  620. bool write_and_finish_wanted = false;
  621. bool finish_wanted = false;
  622. const Response* write_wanted = nullptr;
  623. grpc::WriteOptions write_options_wanted;
  624. grpc::Status status_wanted;
  625. };
  626. PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(writer_mu_);
  627. };
  628. class ServerUnaryReactor : public internal::ServerReactor {
  629. public:
  630. ServerUnaryReactor() : call_(nullptr) {}
  631. ~ServerUnaryReactor() override = default;
  632. /// StartSendInitialMetadata is exactly like ServerBidiReactor.
  633. void StartSendInitialMetadata() Y_ABSL_LOCKS_EXCLUDED(call_mu_) {
  634. ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
  635. if (call == nullptr) {
  636. grpc::internal::MutexLock l(&call_mu_);
  637. call = call_.load(std::memory_order_relaxed);
  638. if (call == nullptr) {
  639. backlog_.send_initial_metadata_wanted = true;
  640. return;
  641. }
  642. }
  643. call->SendInitialMetadata();
  644. }
  645. /// Finish is similar to ServerBidiReactor except for one detail.
  646. /// If the status is non-OK, any message will not be sent. Instead,
  647. /// the client will only receive the status and any trailing metadata.
  648. void Finish(grpc::Status s) Y_ABSL_LOCKS_EXCLUDED(call_mu_) {
  649. ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
  650. if (call == nullptr) {
  651. grpc::internal::MutexLock l(&call_mu_);
  652. call = call_.load(std::memory_order_relaxed);
  653. if (call == nullptr) {
  654. backlog_.finish_wanted = true;
  655. backlog_.status_wanted = std::move(s);
  656. return;
  657. }
  658. }
  659. call->Finish(std::move(s));
  660. }
  661. /// The following notifications are exactly like ServerBidiReactor.
  662. virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  663. void OnDone() override = 0;
  664. void OnCancel() override {}
  665. private:
  666. friend class ServerCallbackUnary;
  667. // May be overridden by internal implementation details. This is not a public
  668. // customization point.
  669. virtual void InternalBindCall(ServerCallbackUnary* call)
  670. Y_ABSL_LOCKS_EXCLUDED(call_mu_) {
  671. grpc::internal::MutexLock l(&call_mu_);
  672. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
  673. call->SendInitialMetadata();
  674. }
  675. if (GPR_UNLIKELY(backlog_.finish_wanted)) {
  676. call->Finish(std::move(backlog_.status_wanted));
  677. }
  678. // Set call_ last so that other functions can use it lock-free
  679. call_.store(call, std::memory_order_release);
  680. }
  681. grpc::internal::Mutex call_mu_;
  682. std::atomic<ServerCallbackUnary*> call_{nullptr};
  683. struct PreBindBacklog {
  684. bool send_initial_metadata_wanted = false;
  685. bool finish_wanted = false;
  686. grpc::Status status_wanted;
  687. };
  688. PreBindBacklog backlog_ Y_ABSL_GUARDED_BY(call_mu_);
  689. };
  690. namespace internal {
  691. template <class Base>
  692. class FinishOnlyReactor : public Base {
  693. public:
  694. explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); }
  695. void OnDone() override { this->~FinishOnlyReactor(); }
  696. };
  697. using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
  698. template <class Request>
  699. using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
  700. template <class Response>
  701. using UnimplementedWriteReactor =
  702. FinishOnlyReactor<ServerWriteReactor<Response>>;
  703. template <class Request, class Response>
  704. using UnimplementedBidiReactor =
  705. FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
  706. } // namespace internal
  707. // TODO(vjpai): Remove namespace experimental when last known users are migrated
  708. // off.
  709. namespace experimental {
  710. template <class Request, class Response>
  711. using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>;
  712. } // namespace experimental
  713. } // namespace grpc
  714. #endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H