method_handler.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. //
  2. //
  3. // Copyright 2015 gRPC authors.
  4. //
  5. // Licensed under the Apache License, Version 2.0 (the "License");
  6. // you may not use this file except in compliance with the License.
  7. // You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. //
  17. //
  18. #ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H
  19. #define GRPCPP_SUPPORT_METHOD_HANDLER_H
  20. #include <grpc/byte_buffer.h>
  21. #include <grpc/support/log.h>
  22. #include <grpcpp/impl/rpc_service_method.h>
  23. #include <grpcpp/support/byte_buffer.h>
  24. #include <grpcpp/support/sync_stream.h>
  25. namespace grpc {
  26. namespace internal {
  27. // Invoke the method handler, fill in the status, and
  28. // return whether or not we finished safely (without an exception).
  29. // Note that exception handling is 0-cost in most compiler/library
  30. // implementations (except when an exception is actually thrown),
  31. // so this process doesn't require additional overhead in the common case.
  32. // Additionally, we don't need to return if we caught an exception or not;
  33. // the handling is the same in either case.
  34. template <class Callable>
  35. ::grpc::Status CatchingFunctionHandler(Callable&& handler) {
  36. #if GRPC_ALLOW_EXCEPTIONS
  37. try {
  38. return handler();
  39. } catch (...) {
  40. return grpc::Status(grpc::StatusCode::UNKNOWN,
  41. "Unexpected error in RPC handling");
  42. }
  43. #else // GRPC_ALLOW_EXCEPTIONS
  44. return handler();
  45. #endif // GRPC_ALLOW_EXCEPTIONS
  46. }
  47. /// A helper function with reduced templating to do the common work needed to
  48. /// actually send the server response. Uses non-const parameter for Status since
  49. /// this should only ever be called from the end of the RunHandler method.
  50. template <class ResponseType>
  51. void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param,
  52. ResponseType* rsp, grpc::Status& status) {
  53. GPR_ASSERT(!param.server_context->sent_initial_metadata_);
  54. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  55. grpc::internal::CallOpSendMessage,
  56. grpc::internal::CallOpServerSendStatus>
  57. ops;
  58. ops.SendInitialMetadata(&param.server_context->initial_metadata_,
  59. param.server_context->initial_metadata_flags());
  60. if (param.server_context->compression_level_set()) {
  61. ops.set_compression_level(param.server_context->compression_level());
  62. }
  63. if (status.ok()) {
  64. status = ops.SendMessagePtr(rsp);
  65. }
  66. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  67. param.call->PerformOps(&ops);
  68. param.call->cq()->Pluck(&ops);
  69. }
  70. /// A helper function with reduced templating to do deserializing.
  71. template <class RequestType>
  72. void* UnaryDeserializeHelper(grpc_byte_buffer* req, grpc::Status* status,
  73. RequestType* request) {
  74. grpc::ByteBuffer buf;
  75. buf.set_buffer(req);
  76. *status = grpc::SerializationTraits<RequestType>::Deserialize(
  77. &buf, static_cast<RequestType*>(request));
  78. buf.Release();
  79. if (status->ok()) {
  80. return request;
  81. }
  82. request->~RequestType();
  83. return nullptr;
  84. }
  85. /// A wrapper class of an application provided rpc method handler.
  86. template <class ServiceType, class RequestType, class ResponseType,
  87. class BaseRequestType = RequestType,
  88. class BaseResponseType = ResponseType>
  89. class RpcMethodHandler : public grpc::internal::MethodHandler {
  90. public:
  91. RpcMethodHandler(
  92. std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
  93. const RequestType*, ResponseType*)>
  94. func,
  95. ServiceType* service)
  96. : func_(func), service_(service) {}
  97. void RunHandler(const HandlerParameter& param) final {
  98. ResponseType rsp;
  99. grpc::Status status = param.status;
  100. if (status.ok()) {
  101. status = CatchingFunctionHandler([this, &param, &rsp] {
  102. return func_(service_,
  103. static_cast<grpc::ServerContext*>(param.server_context),
  104. static_cast<RequestType*>(param.request), &rsp);
  105. });
  106. static_cast<RequestType*>(param.request)->~RequestType();
  107. }
  108. UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
  109. }
  110. void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
  111. grpc::Status* status, void** /*handler_data*/) final {
  112. auto* request =
  113. new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType;
  114. return UnaryDeserializeHelper(req, status,
  115. static_cast<BaseRequestType*>(request));
  116. }
  117. private:
  118. /// Application provided rpc handler function.
  119. std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
  120. const RequestType*, ResponseType*)>
  121. func_;
  122. // The class the above handler function lives in.
  123. ServiceType* service_;
  124. };
  125. /// A wrapper class of an application provided client streaming handler.
  126. template <class ServiceType, class RequestType, class ResponseType>
  127. class ClientStreamingHandler : public grpc::internal::MethodHandler {
  128. public:
  129. ClientStreamingHandler(
  130. std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
  131. ServerReader<RequestType>*, ResponseType*)>
  132. func,
  133. ServiceType* service)
  134. : func_(func), service_(service) {}
  135. void RunHandler(const HandlerParameter& param) final {
  136. ServerReader<RequestType> reader(
  137. param.call, static_cast<grpc::ServerContext*>(param.server_context));
  138. ResponseType rsp;
  139. grpc::Status status =
  140. CatchingFunctionHandler([this, &param, &reader, &rsp] {
  141. return func_(service_,
  142. static_cast<grpc::ServerContext*>(param.server_context),
  143. &reader, &rsp);
  144. });
  145. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  146. grpc::internal::CallOpSendMessage,
  147. grpc::internal::CallOpServerSendStatus>
  148. ops;
  149. if (!param.server_context->sent_initial_metadata_) {
  150. ops.SendInitialMetadata(&param.server_context->initial_metadata_,
  151. param.server_context->initial_metadata_flags());
  152. if (param.server_context->compression_level_set()) {
  153. ops.set_compression_level(param.server_context->compression_level());
  154. }
  155. }
  156. if (status.ok()) {
  157. status = ops.SendMessagePtr(&rsp);
  158. }
  159. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  160. param.call->PerformOps(&ops);
  161. param.call->cq()->Pluck(&ops);
  162. }
  163. private:
  164. std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
  165. ServerReader<RequestType>*, ResponseType*)>
  166. func_;
  167. ServiceType* service_;
  168. };
  169. /// A wrapper class of an application provided server streaming handler.
  170. template <class ServiceType, class RequestType, class ResponseType>
  171. class ServerStreamingHandler : public grpc::internal::MethodHandler {
  172. public:
  173. ServerStreamingHandler(std::function<grpc::Status(
  174. ServiceType*, grpc::ServerContext*,
  175. const RequestType*, ServerWriter<ResponseType>*)>
  176. func,
  177. ServiceType* service)
  178. : func_(func), service_(service) {}
  179. void RunHandler(const HandlerParameter& param) final {
  180. grpc::Status status = param.status;
  181. if (status.ok()) {
  182. ServerWriter<ResponseType> writer(
  183. param.call, static_cast<grpc::ServerContext*>(param.server_context));
  184. status = CatchingFunctionHandler([this, &param, &writer] {
  185. return func_(service_,
  186. static_cast<grpc::ServerContext*>(param.server_context),
  187. static_cast<RequestType*>(param.request), &writer);
  188. });
  189. static_cast<RequestType*>(param.request)->~RequestType();
  190. }
  191. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  192. grpc::internal::CallOpServerSendStatus>
  193. ops;
  194. if (!param.server_context->sent_initial_metadata_) {
  195. ops.SendInitialMetadata(&param.server_context->initial_metadata_,
  196. param.server_context->initial_metadata_flags());
  197. if (param.server_context->compression_level_set()) {
  198. ops.set_compression_level(param.server_context->compression_level());
  199. }
  200. }
  201. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  202. param.call->PerformOps(&ops);
  203. if (param.server_context->has_pending_ops_) {
  204. param.call->cq()->Pluck(&param.server_context->pending_ops_);
  205. }
  206. param.call->cq()->Pluck(&ops);
  207. }
  208. void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
  209. grpc::Status* status, void** /*handler_data*/) final {
  210. grpc::ByteBuffer buf;
  211. buf.set_buffer(req);
  212. auto* request =
  213. new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
  214. *status =
  215. grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
  216. buf.Release();
  217. if (status->ok()) {
  218. return request;
  219. }
  220. request->~RequestType();
  221. return nullptr;
  222. }
  223. private:
  224. std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
  225. const RequestType*, ServerWriter<ResponseType>*)>
  226. func_;
  227. ServiceType* service_;
  228. };
  229. /// A wrapper class of an application provided bidi-streaming handler.
  230. /// This also applies to server-streamed implementation of a unary method
  231. /// with the additional requirement that such methods must have done a
  232. /// write for status to be ok
  233. /// Since this is used by more than 1 class, the service is not passed in.
  234. /// Instead, it is expected to be an implicitly-captured argument of func
  235. /// (through bind or something along those lines)
  236. template <class Streamer, bool WriteNeeded>
  237. class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler {
  238. public:
  239. explicit TemplatedBidiStreamingHandler(
  240. std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func)
  241. : func_(func), write_needed_(WriteNeeded) {}
  242. void RunHandler(const HandlerParameter& param) final {
  243. Streamer stream(param.call,
  244. static_cast<grpc::ServerContext*>(param.server_context));
  245. grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
  246. return func_(static_cast<grpc::ServerContext*>(param.server_context),
  247. &stream);
  248. });
  249. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  250. grpc::internal::CallOpServerSendStatus>
  251. ops;
  252. if (!param.server_context->sent_initial_metadata_) {
  253. ops.SendInitialMetadata(&param.server_context->initial_metadata_,
  254. param.server_context->initial_metadata_flags());
  255. if (param.server_context->compression_level_set()) {
  256. ops.set_compression_level(param.server_context->compression_level());
  257. }
  258. if (write_needed_ && status.ok()) {
  259. // If we needed a write but never did one, we need to mark the
  260. // status as a fail
  261. status = grpc::Status(grpc::StatusCode::INTERNAL,
  262. "Service did not provide response message");
  263. }
  264. }
  265. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  266. param.call->PerformOps(&ops);
  267. if (param.server_context->has_pending_ops_) {
  268. param.call->cq()->Pluck(&param.server_context->pending_ops_);
  269. }
  270. param.call->cq()->Pluck(&ops);
  271. }
  272. private:
  273. std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
  274. const bool write_needed_;
  275. };
  276. template <class ServiceType, class RequestType, class ResponseType>
  277. class BidiStreamingHandler
  278. : public TemplatedBidiStreamingHandler<
  279. ServerReaderWriter<ResponseType, RequestType>, false> {
  280. public:
  281. BidiStreamingHandler(std::function<grpc::Status(
  282. ServiceType*, grpc::ServerContext*,
  283. ServerReaderWriter<ResponseType, RequestType>*)>
  284. func,
  285. ServiceType* service)
  286. // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
  287. : TemplatedBidiStreamingHandler<
  288. ServerReaderWriter<ResponseType, RequestType>, false>(
  289. [func, service](
  290. grpc::ServerContext* ctx,
  291. ServerReaderWriter<ResponseType, RequestType>* streamer) {
  292. return func(service, ctx, streamer);
  293. }) {}
  294. };
  295. template <class RequestType, class ResponseType>
  296. class StreamedUnaryHandler
  297. : public TemplatedBidiStreamingHandler<
  298. ServerUnaryStreamer<RequestType, ResponseType>, true> {
  299. public:
  300. explicit StreamedUnaryHandler(
  301. std::function<
  302. grpc::Status(grpc::ServerContext*,
  303. ServerUnaryStreamer<RequestType, ResponseType>*)>
  304. func)
  305. : TemplatedBidiStreamingHandler<
  306. ServerUnaryStreamer<RequestType, ResponseType>, true>(
  307. std::move(func)) {}
  308. };
  309. template <class RequestType, class ResponseType>
  310. class SplitServerStreamingHandler
  311. : public TemplatedBidiStreamingHandler<
  312. ServerSplitStreamer<RequestType, ResponseType>, false> {
  313. public:
  314. explicit SplitServerStreamingHandler(
  315. std::function<
  316. grpc::Status(grpc::ServerContext*,
  317. ServerSplitStreamer<RequestType, ResponseType>*)>
  318. func)
  319. : TemplatedBidiStreamingHandler<
  320. ServerSplitStreamer<RequestType, ResponseType>, false>(
  321. std::move(func)) {}
  322. };
  323. /// General method handler class for errors that prevent real method use
  324. /// e.g., handle unknown method by returning UNIMPLEMENTED error.
  325. template <grpc::StatusCode code>
  326. class ErrorMethodHandler : public grpc::internal::MethodHandler {
  327. public:
  328. explicit ErrorMethodHandler(const TString& message) : message_(message) {}
  329. template <class T>
  330. static void FillOps(grpc::ServerContextBase* context,
  331. const TString& message, T* ops) {
  332. grpc::Status status(code, message);
  333. if (!context->sent_initial_metadata_) {
  334. ops->SendInitialMetadata(&context->initial_metadata_,
  335. context->initial_metadata_flags());
  336. if (context->compression_level_set()) {
  337. ops->set_compression_level(context->compression_level());
  338. }
  339. context->sent_initial_metadata_ = true;
  340. }
  341. ops->ServerSendStatus(&context->trailing_metadata_, status);
  342. }
  343. void RunHandler(const HandlerParameter& param) final {
  344. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
  345. grpc::internal::CallOpServerSendStatus>
  346. ops;
  347. FillOps(param.server_context, message_, &ops);
  348. param.call->PerformOps(&ops);
  349. param.call->cq()->Pluck(&ops);
  350. }
  351. void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
  352. grpc::Status* /*status*/, void** /*handler_data*/) final {
  353. // We have to destroy any request payload
  354. if (req != nullptr) {
  355. grpc_byte_buffer_destroy(req);
  356. }
  357. return nullptr;
  358. }
  359. private:
  360. const TString message_;
  361. };
  362. typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
  363. UnknownMethodHandler;
  364. typedef ErrorMethodHandler<grpc::StatusCode::RESOURCE_EXHAUSTED>
  365. ResourceExhaustedHandler;
  366. } // namespace internal
  367. } // namespace grpc
  368. #endif // GRPCPP_SUPPORT_METHOD_HANDLER_H