server_callback_handlers.h

//
//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
#define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H

#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/message_allocator.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/status.h>
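
// This header defines the MethodHandler implementations used by the callback
// (reactor-based) server API, one per RPC shape: CallbackUnaryHandler,
// CallbackClientStreamingHandler, CallbackServerStreamingHandler, and
// CallbackBidiHandler. Each handler arena-allocates a ServerCallback*Impl
// object on the call, obtains the application's reactor from its get_reactor_
// factory, and drives the reactor's On* methods from core completion tags.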
namespace grpc {
namespace internal {

template <class RequestType, class ResponseType>
class CallbackUnaryHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackUnaryHandler(
      std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
                                        const RequestType*, ResponseType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}

  void SetMessageAllocator(
      MessageAllocator<RequestType, ResponseType>* allocator) {
    allocator_ = allocator;
  }
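
  // As an illustrative sketch (not part of this header), a custom allocator
  // installed via SetMessageAllocator controls where the request/response
  // messages live. Assuming the MessageAllocator/MessageHolder interfaces in
  // <grpcpp/support/message_allocator.h>, a minimal allocator could look
  // roughly like this:
  //
  //   template <class Req, class Resp>
  //   class SimpleAllocator : public MessageAllocator<Req, Resp> {
  //     class SimpleHolder : public MessageHolder<Req, Resp> {
  //      public:
  //       SimpleHolder() {
  //         // Point the holder at the messages it owns.
  //         this->set_request(&request_);
  //         this->set_response(&response_);
  //       }
  //       void Release() override { delete this; }
  //
  //      private:
  //       Req request_;
  //       Resp response_;
  //     };
  //
  //    public:
  //     MessageHolder<Req, Resp>* AllocateMessages() override {
  //       return new SimpleHolder;
  //     }
  //   };
  //
  // The handler calls AllocateMessages() in Deserialize() below, and the
  // holder's Release() once the RPC completes (see CallOnDone()).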
  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a controller structure (that includes request/response)
    grpc_call_ref(param.call->call());
    auto* allocator_state =
        static_cast<MessageHolder<RequestType, ResponseType>*>(
            param.internal_data);

    auto* call = new (grpc_call_arena_alloc(param.call->call(),
                                            sizeof(ServerCallbackUnaryImpl)))
        ServerCallbackUnaryImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, allocator_state, param.call_requester);
    param.server_context->BeginCompletionOp(
        param.call, [call](bool) { call->MaybeDone(); }, call);

    ServerUnaryReactor* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context),
          call->request(), call->response());
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(param.call->call(),
                                           sizeof(UnimplementedUnaryReactor)))
          UnimplementedUnaryReactor(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    /// Invoke SetupReactor as the last part of the handler
    call->SetupReactor(reactor);
  }
  void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
                    grpc::Status* status, void** handler_data) final {
    grpc::ByteBuffer buf;
    buf.set_buffer(req);
    RequestType* request = nullptr;
    MessageHolder<RequestType, ResponseType>* allocator_state;
    if (allocator_ != nullptr) {
      allocator_state = allocator_->AllocateMessages();
    } else {
      allocator_state = new (grpc_call_arena_alloc(
          call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
          DefaultMessageHolder<RequestType, ResponseType>();
    }
    *handler_data = allocator_state;
    request = allocator_state->request();
    *status =
        grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
    buf.Release();
    if (status->ok()) {
      return request;
    }
    return nullptr;
  }
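
  // Deserialize() runs before RunHandler(): the MessageHolder it creates is
  // handed back through *handler_data and surfaces in RunHandler as
  // param.internal_data, while the request it returns becomes the request the
  // reactor factory sees. On a deserialization failure it returns nullptr and
  // leaves a non-OK *status, which RunHandler above turns into a failing
  // UnimplementedUnaryReactor.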
 private:
  std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
                                    const RequestType*, ResponseType*)>
      get_reactor_;
  MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;

  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
   public:
    void Finish(grpc::Status s) override {
      // A callback that only contains a call to MaybeDone can be run as an
      // inline callback regardless of whether or not OnDone is inlineable
      // because if the actual OnDone callback needs to be scheduled, MaybeDone
      // is responsible for dispatching to an executor thread if needed. Thus,
      // when setting up the finish_tag_, we can set its own callback to
      // inlineable.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            this->MaybeDone(
                reactor_.load(std::memory_order_relaxed)->InternalInlineable());
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);
      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // The response is dropped if the status is not OK.
      if (s.ok()) {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
                                     finish_ops_.SendMessagePtr(response()));
      } else {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      }
      finish_ops_.set_core_cq_tag(&finish_tag_);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be marked inline because it
      // is directly invoking a user-controlled reaction
      // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
      // thread. However, any OnDone needed after that can be inlined because
      // it is already running on an executor thread.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerUnaryReactor* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

   private:
    friend class CallbackUnaryHandler<RequestType, ResponseType>;

    ServerCallbackUnaryImpl(
        grpc::CallbackServerContext* ctx, grpc::internal::Call* call,
        MessageHolder<RequestType, ResponseType>* allocator_state,
        std::function<void()> call_requester)
        : ctx_(ctx),
          call_(*call),
          allocator_state_(allocator_state),
          call_requester_(std::move(call_requester)) {
      ctx_->set_message_allocator_state(allocator_state);
    }

    /// SetupReactor binds the reactor (which also releases any queued
    /// operations), maybe calls OnCancel if possible/needed, and maybe marks
    /// the completion of the RPC. This should be the last component of the
    /// handler.
    void SetupReactor(ServerUnaryReactor* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      this->MaybeDone(reactor->InternalInlineable());
    }

    const RequestType* request() { return allocator_state_->request(); }
    ResponseType* response() { return allocator_state_->response(); }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      allocator_state_->Release();
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackUnaryImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    MessageHolder<RequestType, ResponseType>* const allocator_state_;
    std::function<void()> call_requester_;
    // reactor_ can always be loaded/stored with relaxed memory ordering
    // because its value is only set once, independently of other data in the
    // object, and the loads that use it will always actually come provably
    // later even though they are from different threads since they are
    // triggered by actions initiated only by the setting up of the reactor_
    // variable. In a sense, it's a delayed "const": it gets its value from
    // the SetupReactor method (not the constructor, so it's not a true
    // const), but it doesn't change after that and it only gets used by
    // actions caused, directly or indirectly, by that setup. This comment
    // also applies to the reactor_ variables of the other streaming objects
    // in this file.
    std::atomic<ServerUnaryReactor*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for start, Finish, and CompletionOp
  };
};
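
// Illustrative sketch (not part of this header): generated callback service
// code registers CallbackUnaryHandler (above) roughly as follows; the method
// index and the Echo* message/method names are hypothetical.
//
//   MarkMethodCallback(
//       0, new grpc::internal::CallbackUnaryHandler<EchoRequest, EchoResponse>(
//              [this](grpc::CallbackServerContext* ctx, const EchoRequest* req,
//                     EchoResponse* resp) { return Echo(ctx, req, resp); }));
//
// where Echo() returns a grpc::ServerUnaryReactor* (for example,
// ctx->DefaultReactor() after filling in the response and calling Finish).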
template <class RequestType, class ResponseType>
class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackClientStreamingHandler(
      std::function<ServerReadReactor<RequestType>*(
          grpc::CallbackServerContext*, ResponseType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}

  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a reader structure (that includes response)
    grpc_call_ref(param.call->call());

    auto* reader = new (grpc_call_arena_alloc(param.call->call(),
                                              sizeof(ServerCallbackReaderImpl)))
        ServerCallbackReaderImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because
    // there is no read reactor that has an inlineable OnDone; this only
    // applies to the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
        reader);

    ServerReadReactor<RequestType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor =
          grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
              get_reactor_,
              static_cast<grpc::CallbackServerContext*>(param.server_context),
              reader->response());
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
          UnimplementedReadReactor<RequestType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    reader->SetupReactor(reactor);
  }

 private:
  std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
                                                ResponseType*)>
      get_reactor_;

  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag that only calls MaybeDone can have its callback inlined
      // even if OnDone is not inlineable, because this callback just checks a
      // ref and then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no read reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // The response is dropped if the status is not OK.
      if (s.ok()) {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
                                     finish_ops_.SendMessagePtr(&resp_));
      } else {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      }
      finish_ops_.set_core_cq_tag(&finish_tag_);
      call_.PerformOps(&finish_ops_);
    }
    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it
      // invokes a user-controlled reaction, but any resulting OnDone can be
      // inlined in the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerReadReactor<RequestType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Read(RequestType* req) override {
      this->Ref();
      read_ops_.RecvMessage(req);
      call_.PerformOps(&read_ops_);
    }

   private:
    friend class CallbackClientStreamingHandler<RequestType, ResponseType>;

    ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
                             grpc::internal::Call* call,
                             std::function<void()> call_requester)
        : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}

    void SetupReactor(ServerReadReactor<RequestType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callback for this function should not be inlined because it
      // invokes a user-controlled reaction, but any resulting OnDone can be
      // inlined in the executor to which this callback is dispatched.
      read_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            if (GPR_UNLIKELY(!ok)) {
              ctx_->MaybeMarkCancelledOnRead();
            }
            reactor->OnReadDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &read_ops_, /*can_inline=*/false);
      read_ops_.set_core_cq_tag(&read_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no read
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    ~ServerCallbackReaderImpl() {}

    ResponseType* response() { return &resp_; }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackReaderImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
        read_ops_;
    grpc::internal::CallbackWithSuccessTag read_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    ResponseType resp_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerReadReactor<RequestType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};
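
// Illustrative sketch (not part of this header): CallbackClientStreamingHandler
// (above) drives a grpc::ServerReadReactor<Request>* returned by the service
// method. The Sum* names below are hypothetical.
//
//   class Accumulator : public grpc::ServerReadReactor<SumRequest> {
//    public:
//     explicit Accumulator(SumResponse* resp) : resp_(resp) { StartRead(&req_); }
//     void OnReadDone(bool ok) override {
//       if (!ok) {  // the client finished writing (or the stream broke)
//         Finish(grpc::Status::OK);
//         return;
//       }
//       resp_->set_total(resp_->total() + req_.value());
//       StartRead(&req_);
//     }
//     void OnDone() override { delete this; }
//
//    private:
//     SumResponse* resp_;
//     SumRequest req_;
//   };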
template <class RequestType, class ResponseType>
class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackServerStreamingHandler(
      std::function<ServerWriteReactor<ResponseType>*(
          grpc::CallbackServerContext*, const RequestType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}

  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a writer structure
    grpc_call_ref(param.call->call());

    auto* writer = new (grpc_call_arena_alloc(param.call->call(),
                                              sizeof(ServerCallbackWriterImpl)))
        ServerCallbackWriterImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, static_cast<RequestType*>(param.request),
            param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because
    // there is no write reactor that has an inlineable OnDone; this only
    // applies to the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
        writer);

    ServerWriteReactor<ResponseType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<
          ServerWriteReactor<ResponseType>>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context),
          writer->request());
    }
    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
          UnimplementedWriteReactor<ResponseType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    writer->SetupReactor(reactor);
  }

  void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
                    grpc::Status* status, void** /*handler_data*/) final {
    grpc::ByteBuffer buf;
    buf.set_buffer(req);
    auto* request =
        new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
    *status =
        grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
    buf.Release();
    if (status->ok()) {
      return request;
    }
    request->~RequestType();
    return nullptr;
  }

 private:
  std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
                                                  const RequestType*)>
      get_reactor_;

  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag that only calls MaybeDone can have its callback inlined
      // even if OnDone is not inlineable, because this callback just checks a
      // ref and then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no write reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      call_.PerformOps(&finish_ops_);
    }
    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it
      // invokes a user-controlled reaction, but any resulting OnDone can be
      // inlined in the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerWriteReactor<ResponseType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Write(const ResponseType* resp, grpc::WriteOptions options) override {
      this->Ref();
      if (options.is_last_message()) {
        options.set_buffer_hint();
      }
      if (!ctx_->sent_initial_metadata_) {
        write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                       ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          write_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // TODO(vjpai): don't assert
      GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
      call_.PerformOps(&write_ops_);
    }

    void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
                        grpc::Status s) override {
      // This combines the write into the finish callback
      // TODO(vjpai): don't assert
      GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
      Finish(std::move(s));
    }

   private:
    friend class CallbackServerStreamingHandler<RequestType, ResponseType>;

    ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
                             grpc::internal::Call* call, const RequestType* req,
                             std::function<void()> call_requester)
        : ctx_(ctx),
          call_(*call),
          req_(req),
          call_requester_(std::move(call_requester)) {}

    void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callback for this function should not be inlined because it
      // invokes a user-controlled reaction, but any resulting OnDone can be
      // inlined in the executor to which this callback is dispatched.
      write_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            reactor->OnWriteDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &write_ops_, /*can_inline=*/false);
      write_ops_.set_core_cq_tag(&write_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no write
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    ~ServerCallbackWriterImpl() {
      if (req_ != nullptr) {
        req_->~RequestType();
      }
    }

    const RequestType* request() { return req_; }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackWriterImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage>
        write_ops_;
    grpc::internal::CallbackWithSuccessTag write_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    const RequestType* req_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};
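
// Illustrative sketch (not part of this header): CallbackServerStreamingHandler
// (above) drives a grpc::ServerWriteReactor<Response>* returned by the service
// method. The Lister/ListRequest/Row names below are hypothetical.
//
//   class Lister : public grpc::ServerWriteReactor<Row> {
//    public:
//     explicit Lister(const ListRequest* req) : req_(req) { NextWrite(); }
//     void OnWriteDone(bool ok) override {
//       if (!ok) {
//         Finish(grpc::Status(grpc::StatusCode::UNKNOWN, "write failed"));
//         return;
//       }
//       NextWrite();
//     }
//     void OnDone() override { delete this; }
//
//    private:
//     void NextWrite() {
//       if (index_ < req_->count()) {
//         row_.set_index(index_++);
//         StartWrite(&row_);
//       } else {
//         Finish(grpc::Status::OK);
//       }
//     }
//     const ListRequest* req_;
//     Row row_;
//     int index_ = 0;
//   };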
template <class RequestType, class ResponseType>
class CallbackBidiHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackBidiHandler(
      std::function<ServerBidiReactor<RequestType, ResponseType>*(
          grpc::CallbackServerContext*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}

  void RunHandler(const HandlerParameter& param) final {
    grpc_call_ref(param.call->call());

    auto* stream = new (grpc_call_arena_alloc(
        param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
        ServerCallbackReaderWriterImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because
    // there is no bidi reactor that has an inlineable OnDone; this only
    // applies to the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
        stream);

    ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<
          ServerBidiReactor<RequestType, ResponseType>>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context));
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(),
          sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
          UnimplementedBidiReactor<RequestType, ResponseType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    stream->SetupReactor(reactor);
  }

 private:
  std::function<ServerBidiReactor<RequestType, ResponseType>*(
      grpc::CallbackServerContext*)>
      get_reactor_;

  class ServerCallbackReaderWriterImpl
      : public ServerCallbackReaderWriter<RequestType, ResponseType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag that only calls MaybeDone can have its callback inlined
      // even if OnDone is not inlineable, because this callback just checks a
      // ref and then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no bidi reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      call_.PerformOps(&finish_ops_);
    }
    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it
      // invokes a user-controlled reaction, but any resulting OnDone can be
      // inlined in the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerBidiReactor<RequestType, ResponseType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Write(const ResponseType* resp, grpc::WriteOptions options) override {
      this->Ref();
      if (options.is_last_message()) {
        options.set_buffer_hint();
      }
      if (!ctx_->sent_initial_metadata_) {
        write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                       ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          write_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // TODO(vjpai): don't assert
      GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
      call_.PerformOps(&write_ops_);
    }

    void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
                        grpc::Status s) override {
      // TODO(vjpai): don't assert
      GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
      Finish(std::move(s));
    }

    void Read(RequestType* req) override {
      this->Ref();
      read_ops_.RecvMessage(req);
      call_.PerformOps(&read_ops_);
    }

   private:
    friend class CallbackBidiHandler<RequestType, ResponseType>;

    ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
                                   grpc::internal::Call* call,
                                   std::function<void()> call_requester)
        : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}

    void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callbacks for these functions should not be inlined because they
      // invoke user-controlled reactions, but any resulting OnDones can be
      // inlined in the executor to which a callback is dispatched.
      write_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            reactor->OnWriteDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &write_ops_, /*can_inline=*/false);
      write_ops_.set_core_cq_tag(&write_tag_);

      read_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            if (GPR_UNLIKELY(!ok)) {
              ctx_->MaybeMarkCancelledOnRead();
            }
            reactor->OnReadDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &read_ops_, /*can_inline=*/false);
      read_ops_.set_core_cq_tag(&read_tag_);

      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no bidi
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage>
        write_ops_;
    grpc::internal::CallbackWithSuccessTag write_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
        read_ops_;
    grpc::internal::CallbackWithSuccessTag read_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};
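
// Illustrative sketch (not part of this header): CallbackBidiHandler (above)
// drives a grpc::ServerBidiReactor<Request, Response>* returned by the service
// method. The ChatMessage name below is hypothetical; this echo-style reactor
// reads one message, writes it back, and repeats until the client is done.
//
//   class Echoer : public grpc::ServerBidiReactor<ChatMessage, ChatMessage> {
//    public:
//     Echoer() { StartRead(&msg_); }
//     void OnReadDone(bool ok) override {
//       if (!ok) {  // reads are done (or the stream broke)
//         Finish(grpc::Status::OK);
//         return;
//       }
//       StartWrite(&msg_);
//     }
//     void OnWriteDone(bool ok) override {
//       if (!ok) {
//         Finish(grpc::Status(grpc::StatusCode::UNKNOWN, "write failed"));
//         return;
//       }
//       StartRead(&msg_);
//     }
//     void OnDone() override { delete this; }
//
//    private:
//     ChatMessage msg_;
//   };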

}  // namespace internal
}  // namespace grpc

#endif  // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H