_base_channel.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. # Copyright 2020 The gRPC Authors
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Abstract base classes for Channel objects and Multicallable objects."""
  15. import abc
  16. from typing import Any, Optional
  17. import grpc
  18. from . import _base_call
  19. from ._typing import DeserializingFunction
  20. from ._typing import MetadataType
  21. from ._typing import RequestIterableType
  22. from ._typing import SerializingFunction
  class UnaryUnaryMultiCallable(abc.ABC):
      """Abstract callable that asynchronously invokes a unary-call RPC."""

      @abc.abstractmethod
      def __call__(
          self,
          request: Any,
          *,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> _base_call.UnaryUnaryCall:
          """Invoke the underlying RPC asynchronously.

          Everything after ``request`` must be passed by keyword.

          Args:
            request: The request value for the RPC.
            timeout: Optional duration, in seconds, to allow for the RPC.
            metadata: Optional :term:`metadata` to be transmitted to the
              service-side of the RPC.
            credentials: Optional CallCredentials for the RPC. Only valid
              for a secure Channel.
            wait_for_ready: Optional flag enabling the
              :term:`wait_for_ready` mechanism.
            compression: An element of grpc.compression, e.g.
              grpc.compression.Gzip.

          Returns:
            A UnaryUnaryCall object.

          Raises:
            RpcError: If the RPC terminated with a non-OK status. The
              raised RpcError is also a Call for the RPC, affording the
              RPC's metadata, status code, and details.
          """
  class UnaryStreamMultiCallable(abc.ABC):
      """Abstract callable that asynchronously invokes a server-streaming RPC."""

      @abc.abstractmethod
      def __call__(
          self,
          request: Any,
          *,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> _base_call.UnaryStreamCall:
          """Invoke the underlying RPC asynchronously.

          Everything after ``request`` must be passed by keyword.

          Args:
            request: The request value for the RPC.
            timeout: Optional duration, in seconds, to allow for the RPC.
            metadata: Optional :term:`metadata` to be transmitted to the
              service-side of the RPC.
            credentials: Optional CallCredentials for the RPC. Only valid
              for a secure Channel.
            wait_for_ready: Optional flag enabling the
              :term:`wait_for_ready` mechanism.
            compression: An element of grpc.compression, e.g.
              grpc.compression.Gzip.

          Returns:
            A UnaryStreamCall object.

          Raises:
            RpcError: If the RPC terminated with a non-OK status. The
              raised RpcError is also a Call for the RPC, affording the
              RPC's metadata, status code, and details.
          """
  class StreamUnaryMultiCallable(abc.ABC):
      """Abstract callable that asynchronously invokes a client-streaming RPC."""

      @abc.abstractmethod
      def __call__(
          self,
          request_iterator: Optional[RequestIterableType] = None,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> _base_call.StreamUnaryCall:
          """Invoke the underlying RPC asynchronously.

          Args:
            request_iterator: Optional async iterable or iterable of
              request messages for the RPC.
            timeout: Optional duration, in seconds, to allow for the RPC.
            metadata: Optional :term:`metadata` to be transmitted to the
              service-side of the RPC.
            credentials: Optional CallCredentials for the RPC. Only valid
              for a secure Channel.
            wait_for_ready: Optional flag enabling the
              :term:`wait_for_ready` mechanism.
            compression: An element of grpc.compression, e.g.
              grpc.compression.Gzip.

          Returns:
            A StreamUnaryCall object.

          Raises:
            RpcError: If the RPC terminated with a non-OK status. The
              raised RpcError is also a Call for the RPC, affording the
              RPC's metadata, status code, and details.
          """
  class StreamStreamMultiCallable(abc.ABC):
      """Abstract callable that asynchronously invokes a bidirectional-streaming RPC."""

      @abc.abstractmethod
      def __call__(
          self,
          request_iterator: Optional[RequestIterableType] = None,
          timeout: Optional[float] = None,
          metadata: Optional[MetadataType] = None,
          credentials: Optional[grpc.CallCredentials] = None,
          wait_for_ready: Optional[bool] = None,
          compression: Optional[grpc.Compression] = None,
      ) -> _base_call.StreamStreamCall:
          """Invoke the underlying RPC asynchronously.

          Args:
            request_iterator: Optional async iterable or iterable of
              request messages for the RPC.
            timeout: Optional duration, in seconds, to allow for the RPC.
            metadata: Optional :term:`metadata` to be transmitted to the
              service-side of the RPC.
            credentials: Optional CallCredentials for the RPC. Only valid
              for a secure Channel.
            wait_for_ready: Optional flag enabling the
              :term:`wait_for_ready` mechanism.
            compression: An element of grpc.compression, e.g.
              grpc.compression.Gzip.

          Returns:
            A StreamStreamCall object.

          Raises:
            RpcError: If the RPC terminated with a non-OK status. The
              raised RpcError is also a Call for the RPC, affording the
              RPC's metadata, status code, and details.
          """
  class Channel(abc.ABC):
      """Enables asynchronous RPC invocation as a client.

      Channel objects implement the asynchronous context manager protocol
      (``async with``), although entering and exiting the same channel
      multiple times is not supported.
      """

      @abc.abstractmethod
      async def __aenter__(self) -> "Channel":
          """Starts an asynchronous context manager.

          Returns:
            The channel that was instantiated.
          """

      @abc.abstractmethod
      async def __aexit__(self, exc_type, exc_val, exc_tb):
          """Finishes the asynchronous context manager by closing the channel.

          Still active RPCs will be cancelled.
          """

      @abc.abstractmethod
      async def close(self, grace: Optional[float] = None) -> None:
          """Closes this Channel and releases all resources held by it.

          This method immediately stops the channel from executing new RPCs
          in all cases.

          If a grace period is specified, this method waits until all active
          RPCs are finished; once the grace period is reached, the ones that
          haven't terminated are cancelled. If a grace period is not
          specified (by passing None for grace), all existing RPCs are
          cancelled immediately.

          This method is idempotent.

          Args:
            grace: Optional grace period; None cancels all existing RPCs
              immediately.
          """

      @abc.abstractmethod
      def get_state(
          self,
          try_to_connect: bool = False,
      ) -> grpc.ChannelConnectivity:
          """Checks the connectivity state of a channel.

          This is an EXPERIMENTAL API.

          If the channel reaches a stable connectivity state, it is
          guaranteed that the return value of this function will eventually
          converge to that state.

          Args:
            try_to_connect: A bool indicating whether the Channel should try
              to connect to the peer or not.

          Returns:
            A ChannelConnectivity object.
          """

      @abc.abstractmethod
      async def wait_for_state_change(
          self,
          last_observed_state: grpc.ChannelConnectivity,
      ) -> None:
          """Waits for a change in connectivity state.

          This is an EXPERIMENTAL API.

          The function blocks until there is a change in the channel
          connectivity state from the "last_observed_state". If the state is
          already different, this function will return immediately.

          There is an inherent race between the invocation of
          "Channel.wait_for_state_change" and "Channel.get_state". The state
          can change arbitrarily many times during the race, so there is no
          way to observe every state transition.

          If there is a need to put a timeout on this function, please refer
          to "asyncio.wait_for".

          Args:
            last_observed_state: A grpc.ChannelConnectivity object
              representing the last known state.
          """

      @abc.abstractmethod
      async def channel_ready(self) -> None:
          """Creates a coroutine that blocks until the Channel is READY."""

      @abc.abstractmethod
      def unary_unary(
          self,
          method: str,
          request_serializer: Optional[SerializingFunction] = None,
          response_deserializer: Optional[DeserializingFunction] = None,
      ) -> UnaryUnaryMultiCallable:
          """Creates a UnaryUnaryMultiCallable for a unary-unary method.

          Args:
            method: The name of the RPC method.
            request_serializer: Optional :term:`serializer` for serializing
              the request message. Request goes unserialized in case None
              is passed.
            response_deserializer: Optional :term:`deserializer` for
              deserializing the response message. Response goes
              undeserialized in case None is passed.

          Returns:
            A UnaryUnaryMultiCallable value for the named unary-unary method.
          """

      @abc.abstractmethod
      def unary_stream(
          self,
          method: str,
          request_serializer: Optional[SerializingFunction] = None,
          response_deserializer: Optional[DeserializingFunction] = None,
      ) -> UnaryStreamMultiCallable:
          """Creates a UnaryStreamMultiCallable for a unary-stream method.

          Args:
            method: The name of the RPC method.
            request_serializer: Optional :term:`serializer` for serializing
              the request message. Request goes unserialized in case None
              is passed.
            response_deserializer: Optional :term:`deserializer` for
              deserializing the response message. Response goes
              undeserialized in case None is passed.

          Returns:
            A UnaryStreamMultiCallable value for the named unary-stream
            method.
          """

      @abc.abstractmethod
      def stream_unary(
          self,
          method: str,
          request_serializer: Optional[SerializingFunction] = None,
          response_deserializer: Optional[DeserializingFunction] = None,
      ) -> StreamUnaryMultiCallable:
          """Creates a StreamUnaryMultiCallable for a stream-unary method.

          Args:
            method: The name of the RPC method.
            request_serializer: Optional :term:`serializer` for serializing
              the request message. Request goes unserialized in case None
              is passed.
            response_deserializer: Optional :term:`deserializer` for
              deserializing the response message. Response goes
              undeserialized in case None is passed.

          Returns:
            A StreamUnaryMultiCallable value for the named stream-unary
            method.
          """

      @abc.abstractmethod
      def stream_stream(
          self,
          method: str,
          request_serializer: Optional[SerializingFunction] = None,
          response_deserializer: Optional[DeserializingFunction] = None,
      ) -> StreamStreamMultiCallable:
          """Creates a StreamStreamMultiCallable for a stream-stream method.

          Args:
            method: The name of the RPC method.
            request_serializer: Optional :term:`serializer` for serializing
              the request message. Request goes unserialized in case None
              is passed.
            response_deserializer: Optional :term:`deserializer` for
              deserializing the response message. Response goes
              undeserialized in case None is passed.

          Returns:
            A StreamStreamMultiCallable value for the named stream-stream
            method.
          """