_server_adaptations.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. # Copyright 2016 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Translates gRPC's server-side API into gRPC's server-side Beta API."""
  15. import collections
  16. import threading
  17. import grpc
  18. from grpc import _common
  19. from grpc.beta import _metadata
  20. from grpc.beta import interfaces
  21. from grpc.framework.common import cardinality
  22. from grpc.framework.common import style
  23. from grpc.framework.foundation import abandonment
  24. from grpc.framework.foundation import logging_pool
  25. from grpc.framework.foundation import stream
  26. from grpc.framework.interfaces.face import face
  27. # pylint: disable=too-many-return-statements
  28. _DEFAULT_POOL_SIZE = 8
  29. class _ServerProtocolContext(interfaces.GRPCServicerContext):
  30. def __init__(self, servicer_context):
  31. self._servicer_context = servicer_context
  32. def peer(self):
  33. return self._servicer_context.peer()
  34. def disable_next_response_compression(self):
  35. pass # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
  36. class _FaceServicerContext(face.ServicerContext):
  37. def __init__(self, servicer_context):
  38. self._servicer_context = servicer_context
  39. def is_active(self):
  40. return self._servicer_context.is_active()
  41. def time_remaining(self):
  42. return self._servicer_context.time_remaining()
  43. def add_abortion_callback(self, abortion_callback):
  44. raise NotImplementedError(
  45. 'add_abortion_callback no longer supported server-side!')
  46. def cancel(self):
  47. self._servicer_context.cancel()
  48. def protocol_context(self):
  49. return _ServerProtocolContext(self._servicer_context)
  50. def invocation_metadata(self):
  51. return _metadata.beta(self._servicer_context.invocation_metadata())
  52. def initial_metadata(self, initial_metadata):
  53. self._servicer_context.send_initial_metadata(
  54. _metadata.unbeta(initial_metadata))
  55. def terminal_metadata(self, terminal_metadata):
  56. self._servicer_context.set_terminal_metadata(
  57. _metadata.unbeta(terminal_metadata))
  58. def code(self, code):
  59. self._servicer_context.set_code(code)
  60. def details(self, details):
  61. self._servicer_context.set_details(details)
  62. def _adapt_unary_request_inline(unary_request_inline):
  63. def adaptation(request, servicer_context):
  64. return unary_request_inline(request,
  65. _FaceServicerContext(servicer_context))
  66. return adaptation
  67. def _adapt_stream_request_inline(stream_request_inline):
  68. def adaptation(request_iterator, servicer_context):
  69. return stream_request_inline(request_iterator,
  70. _FaceServicerContext(servicer_context))
  71. return adaptation
  72. class _Callback(stream.Consumer):
  73. def __init__(self):
  74. self._condition = threading.Condition()
  75. self._values = []
  76. self._terminated = False
  77. self._cancelled = False
  78. def consume(self, value):
  79. with self._condition:
  80. self._values.append(value)
  81. self._condition.notify_all()
  82. def terminate(self):
  83. with self._condition:
  84. self._terminated = True
  85. self._condition.notify_all()
  86. def consume_and_terminate(self, value):
  87. with self._condition:
  88. self._values.append(value)
  89. self._terminated = True
  90. self._condition.notify_all()
  91. def cancel(self):
  92. with self._condition:
  93. self._cancelled = True
  94. self._condition.notify_all()
  95. def draw_one_value(self):
  96. with self._condition:
  97. while True:
  98. if self._cancelled:
  99. raise abandonment.Abandoned()
  100. elif self._values:
  101. return self._values.pop(0)
  102. elif self._terminated:
  103. return None
  104. else:
  105. self._condition.wait()
  106. def draw_all_values(self):
  107. with self._condition:
  108. while True:
  109. if self._cancelled:
  110. raise abandonment.Abandoned()
  111. elif self._terminated:
  112. all_values = tuple(self._values)
  113. self._values = None
  114. return all_values
  115. else:
  116. self._condition.wait()
  117. def _run_request_pipe_thread(request_iterator, request_consumer,
  118. servicer_context):
  119. thread_joined = threading.Event()
  120. def pipe_requests():
  121. for request in request_iterator:
  122. if not servicer_context.is_active() or thread_joined.is_set():
  123. return
  124. request_consumer.consume(request)
  125. if not servicer_context.is_active() or thread_joined.is_set():
  126. return
  127. request_consumer.terminate()
  128. request_pipe_thread = threading.Thread(target=pipe_requests)
  129. request_pipe_thread.daemon = True
  130. request_pipe_thread.start()
  131. def _adapt_unary_unary_event(unary_unary_event):
  132. def adaptation(request, servicer_context):
  133. callback = _Callback()
  134. if not servicer_context.add_callback(callback.cancel):
  135. raise abandonment.Abandoned()
  136. unary_unary_event(request, callback.consume_and_terminate,
  137. _FaceServicerContext(servicer_context))
  138. return callback.draw_all_values()[0]
  139. return adaptation
  140. def _adapt_unary_stream_event(unary_stream_event):
  141. def adaptation(request, servicer_context):
  142. callback = _Callback()
  143. if not servicer_context.add_callback(callback.cancel):
  144. raise abandonment.Abandoned()
  145. unary_stream_event(request, callback,
  146. _FaceServicerContext(servicer_context))
  147. while True:
  148. response = callback.draw_one_value()
  149. if response is None:
  150. return
  151. else:
  152. yield response
  153. return adaptation
  154. def _adapt_stream_unary_event(stream_unary_event):
  155. def adaptation(request_iterator, servicer_context):
  156. callback = _Callback()
  157. if not servicer_context.add_callback(callback.cancel):
  158. raise abandonment.Abandoned()
  159. request_consumer = stream_unary_event(
  160. callback.consume_and_terminate,
  161. _FaceServicerContext(servicer_context))
  162. _run_request_pipe_thread(request_iterator, request_consumer,
  163. servicer_context)
  164. return callback.draw_all_values()[0]
  165. return adaptation
  166. def _adapt_stream_stream_event(stream_stream_event):
  167. def adaptation(request_iterator, servicer_context):
  168. callback = _Callback()
  169. if not servicer_context.add_callback(callback.cancel):
  170. raise abandonment.Abandoned()
  171. request_consumer = stream_stream_event(
  172. callback, _FaceServicerContext(servicer_context))
  173. _run_request_pipe_thread(request_iterator, request_consumer,
  174. servicer_context)
  175. while True:
  176. response = callback.draw_one_value()
  177. if response is None:
  178. return
  179. else:
  180. yield response
  181. return adaptation
  182. class _SimpleMethodHandler(
  183. collections.namedtuple('_MethodHandler', (
  184. 'request_streaming',
  185. 'response_streaming',
  186. 'request_deserializer',
  187. 'response_serializer',
  188. 'unary_unary',
  189. 'unary_stream',
  190. 'stream_unary',
  191. 'stream_stream',
  192. )), grpc.RpcMethodHandler):
  193. pass
  194. def _simple_method_handler(implementation, request_deserializer,
  195. response_serializer):
  196. if implementation.style is style.Service.INLINE:
  197. if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
  198. return _SimpleMethodHandler(
  199. False, False, request_deserializer, response_serializer,
  200. _adapt_unary_request_inline(implementation.unary_unary_inline),
  201. None, None, None)
  202. elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
  203. return _SimpleMethodHandler(
  204. False, True, request_deserializer, response_serializer, None,
  205. _adapt_unary_request_inline(implementation.unary_stream_inline),
  206. None, None)
  207. elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
  208. return _SimpleMethodHandler(
  209. True, False, request_deserializer, response_serializer, None,
  210. None,
  211. _adapt_stream_request_inline(
  212. implementation.stream_unary_inline), None)
  213. elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
  214. return _SimpleMethodHandler(
  215. True, True, request_deserializer, response_serializer, None,
  216. None, None,
  217. _adapt_stream_request_inline(
  218. implementation.stream_stream_inline))
  219. elif implementation.style is style.Service.EVENT:
  220. if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
  221. return _SimpleMethodHandler(
  222. False, False, request_deserializer, response_serializer,
  223. _adapt_unary_unary_event(implementation.unary_unary_event),
  224. None, None, None)
  225. elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
  226. return _SimpleMethodHandler(
  227. False, True, request_deserializer, response_serializer, None,
  228. _adapt_unary_stream_event(implementation.unary_stream_event),
  229. None, None)
  230. elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
  231. return _SimpleMethodHandler(
  232. True, False, request_deserializer, response_serializer, None,
  233. None,
  234. _adapt_stream_unary_event(implementation.stream_unary_event),
  235. None)
  236. elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
  237. return _SimpleMethodHandler(
  238. True, True, request_deserializer, response_serializer, None,
  239. None, None,
  240. _adapt_stream_stream_event(implementation.stream_stream_event))
  241. raise ValueError()
  242. def _flatten_method_pair_map(method_pair_map):
  243. method_pair_map = method_pair_map or {}
  244. flat_map = {}
  245. for method_pair in method_pair_map:
  246. method = _common.fully_qualified_method(method_pair[0], method_pair[1])
  247. flat_map[method] = method_pair_map[method_pair]
  248. return flat_map
  249. class _GenericRpcHandler(grpc.GenericRpcHandler):
  250. def __init__(self, method_implementations, multi_method_implementation,
  251. request_deserializers, response_serializers):
  252. self._method_implementations = _flatten_method_pair_map(
  253. method_implementations)
  254. self._request_deserializers = _flatten_method_pair_map(
  255. request_deserializers)
  256. self._response_serializers = _flatten_method_pair_map(
  257. response_serializers)
  258. self._multi_method_implementation = multi_method_implementation
  259. def service(self, handler_call_details):
  260. method_implementation = self._method_implementations.get(
  261. handler_call_details.method)
  262. if method_implementation is not None:
  263. return _simple_method_handler(
  264. method_implementation,
  265. self._request_deserializers.get(handler_call_details.method),
  266. self._response_serializers.get(handler_call_details.method))
  267. elif self._multi_method_implementation is None:
  268. return None
  269. else:
  270. try:
  271. return None #TODO(nathaniel): call the multimethod.
  272. except face.NoSuchMethodError:
  273. return None
  274. class _Server(interfaces.Server):
  275. def __init__(self, grpc_server):
  276. self._grpc_server = grpc_server
  277. def add_insecure_port(self, address):
  278. return self._grpc_server.add_insecure_port(address)
  279. def add_secure_port(self, address, server_credentials):
  280. return self._grpc_server.add_secure_port(address, server_credentials)
  281. def start(self):
  282. self._grpc_server.start()
  283. def stop(self, grace):
  284. return self._grpc_server.stop(grace)
  285. def __enter__(self):
  286. self._grpc_server.start()
  287. return self
  288. def __exit__(self, exc_type, exc_val, exc_tb):
  289. self._grpc_server.stop(None)
  290. return False
  291. def server(service_implementations, multi_method_implementation,
  292. request_deserializers, response_serializers, thread_pool,
  293. thread_pool_size):
  294. generic_rpc_handler = _GenericRpcHandler(service_implementations,
  295. multi_method_implementation,
  296. request_deserializers,
  297. response_serializers)
  298. if thread_pool is None:
  299. effective_thread_pool = logging_pool.pool(
  300. _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size
  301. )
  302. else:
  303. effective_thread_pool = thread_pool
  304. return _Server(
  305. grpc.server(effective_thread_pool, handlers=(generic_rpc_handler,)))