_base_channel.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. # Copyright 2020 The gRPC Authors
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Abstract base classes for Channel objects and Multicallable objects."""
  15. import abc
  16. from typing import Any, Optional
  17. import grpc
  18. from . import _base_call
  19. from ._typing import DeserializingFunction
  20. from ._typing import MetadataType
  21. from ._typing import RequestIterableType
  22. from ._typing import SerializingFunction
  23. class UnaryUnaryMultiCallable(abc.ABC):
  24. """Enables asynchronous invocation of a unary-call RPC."""
  25. @abc.abstractmethod
  26. def __call__(
  27. self,
  28. request: Any,
  29. *,
  30. timeout: Optional[float] = None,
  31. metadata: Optional[MetadataType] = None,
  32. credentials: Optional[grpc.CallCredentials] = None,
  33. wait_for_ready: Optional[bool] = None,
  34. compression: Optional[grpc.Compression] = None
  35. ) -> _base_call.UnaryUnaryCall:
  36. """Asynchronously invokes the underlying RPC.
  37. Args:
  38. request: The request value for the RPC.
  39. timeout: An optional duration of time in seconds to allow
  40. for the RPC.
  41. metadata: Optional :term:`metadata` to be transmitted to the
  42. service-side of the RPC.
  43. credentials: An optional CallCredentials for the RPC. Only valid for
  44. secure Channel.
  45. wait_for_ready: This is an EXPERIMENTAL argument. An optional
  46. flag to enable :term:`wait_for_ready` mechanism.
  47. compression: An element of grpc.compression, e.g.
  48. grpc.compression.Gzip. This is an EXPERIMENTAL option.
  49. Returns:
  50. A UnaryUnaryCall object.
  51. Raises:
  52. RpcError: Indicates that the RPC terminated with non-OK status. The
  53. raised RpcError will also be a Call for the RPC affording the RPC's
  54. metadata, status code, and details.
  55. """
  56. class UnaryStreamMultiCallable(abc.ABC):
  57. """Enables asynchronous invocation of a server-streaming RPC."""
  58. @abc.abstractmethod
  59. def __call__(
  60. self,
  61. request: Any,
  62. *,
  63. timeout: Optional[float] = None,
  64. metadata: Optional[MetadataType] = None,
  65. credentials: Optional[grpc.CallCredentials] = None,
  66. wait_for_ready: Optional[bool] = None,
  67. compression: Optional[grpc.Compression] = None
  68. ) -> _base_call.UnaryStreamCall:
  69. """Asynchronously invokes the underlying RPC.
  70. Args:
  71. request: The request value for the RPC.
  72. timeout: An optional duration of time in seconds to allow
  73. for the RPC.
  74. metadata: Optional :term:`metadata` to be transmitted to the
  75. service-side of the RPC.
  76. credentials: An optional CallCredentials for the RPC. Only valid for
  77. secure Channel.
  78. wait_for_ready: This is an EXPERIMENTAL argument. An optional
  79. flag to enable :term:`wait_for_ready` mechanism.
  80. compression: An element of grpc.compression, e.g.
  81. grpc.compression.Gzip. This is an EXPERIMENTAL option.
  82. Returns:
  83. A UnaryStreamCall object.
  84. Raises:
  85. RpcError: Indicates that the RPC terminated with non-OK status. The
  86. raised RpcError will also be a Call for the RPC affording the RPC's
  87. metadata, status code, and details.
  88. """
  89. class StreamUnaryMultiCallable(abc.ABC):
  90. """Enables asynchronous invocation of a client-streaming RPC."""
  91. @abc.abstractmethod
  92. def __call__(
  93. self,
  94. request_iterator: Optional[RequestIterableType] = None,
  95. timeout: Optional[float] = None,
  96. metadata: Optional[MetadataType] = None,
  97. credentials: Optional[grpc.CallCredentials] = None,
  98. wait_for_ready: Optional[bool] = None,
  99. compression: Optional[grpc.Compression] = None
  100. ) -> _base_call.StreamUnaryCall:
  101. """Asynchronously invokes the underlying RPC.
  102. Args:
  103. request_iterator: An optional async iterable or iterable of request
  104. messages for the RPC.
  105. timeout: An optional duration of time in seconds to allow
  106. for the RPC.
  107. metadata: Optional :term:`metadata` to be transmitted to the
  108. service-side of the RPC.
  109. credentials: An optional CallCredentials for the RPC. Only valid for
  110. secure Channel.
  111. wait_for_ready: This is an EXPERIMENTAL argument. An optional
  112. flag to enable :term:`wait_for_ready` mechanism.
  113. compression: An element of grpc.compression, e.g.
  114. grpc.compression.Gzip. This is an EXPERIMENTAL option.
  115. Returns:
  116. A StreamUnaryCall object.
  117. Raises:
  118. RpcError: Indicates that the RPC terminated with non-OK status. The
  119. raised RpcError will also be a Call for the RPC affording the RPC's
  120. metadata, status code, and details.
  121. """
  122. class StreamStreamMultiCallable(abc.ABC):
  123. """Enables asynchronous invocation of a bidirectional-streaming RPC."""
  124. @abc.abstractmethod
  125. def __call__(
  126. self,
  127. request_iterator: Optional[RequestIterableType] = None,
  128. timeout: Optional[float] = None,
  129. metadata: Optional[MetadataType] = None,
  130. credentials: Optional[grpc.CallCredentials] = None,
  131. wait_for_ready: Optional[bool] = None,
  132. compression: Optional[grpc.Compression] = None
  133. ) -> _base_call.StreamStreamCall:
  134. """Asynchronously invokes the underlying RPC.
  135. Args:
  136. request_iterator: An optional async iterable or iterable of request
  137. messages for the RPC.
  138. timeout: An optional duration of time in seconds to allow
  139. for the RPC.
  140. metadata: Optional :term:`metadata` to be transmitted to the
  141. service-side of the RPC.
  142. credentials: An optional CallCredentials for the RPC. Only valid for
  143. secure Channel.
  144. wait_for_ready: This is an EXPERIMENTAL argument. An optional
  145. flag to enable :term:`wait_for_ready` mechanism.
  146. compression: An element of grpc.compression, e.g.
  147. grpc.compression.Gzip. This is an EXPERIMENTAL option.
  148. Returns:
  149. A StreamStreamCall object.
  150. Raises:
  151. RpcError: Indicates that the RPC terminated with non-OK status. The
  152. raised RpcError will also be a Call for the RPC affording the RPC's
  153. metadata, status code, and details.
  154. """
  155. class Channel(abc.ABC):
  156. """Enables asynchronous RPC invocation as a client.
  157. Channel objects implement the Asynchronous Context Manager (aka. async
  158. with) type, although they are not supportted to be entered and exited
  159. multiple times.
  160. """
  161. @abc.abstractmethod
  162. async def __aenter__(self):
  163. """Starts an asynchronous context manager.
  164. Returns:
  165. Channel the channel that was instantiated.
  166. """
  167. @abc.abstractmethod
  168. async def __aexit__(self, exc_type, exc_val, exc_tb):
  169. """Finishes the asynchronous context manager by closing the channel.
  170. Still active RPCs will be cancelled.
  171. """
  172. @abc.abstractmethod
  173. async def close(self, grace: Optional[float] = None):
  174. """Closes this Channel and releases all resources held by it.
  175. This method immediately stops the channel from executing new RPCs in
  176. all cases.
  177. If a grace period is specified, this method wait until all active
  178. RPCs are finshed, once the grace period is reached the ones that haven't
  179. been terminated are cancelled. If a grace period is not specified
  180. (by passing None for grace), all existing RPCs are cancelled immediately.
  181. This method is idempotent.
  182. """
  183. @abc.abstractmethod
  184. def get_state(self,
  185. try_to_connect: bool = False) -> grpc.ChannelConnectivity:
  186. """Checks the connectivity state of a channel.
  187. This is an EXPERIMENTAL API.
  188. If the channel reaches a stable connectivity state, it is guaranteed
  189. that the return value of this function will eventually converge to that
  190. state.
  191. Args:
  192. try_to_connect: a bool indicate whether the Channel should try to
  193. connect to peer or not.
  194. Returns: A ChannelConnectivity object.
  195. """
  196. @abc.abstractmethod
  197. async def wait_for_state_change(
  198. self,
  199. last_observed_state: grpc.ChannelConnectivity,
  200. ) -> None:
  201. """Waits for a change in connectivity state.
  202. This is an EXPERIMENTAL API.
  203. The function blocks until there is a change in the channel connectivity
  204. state from the "last_observed_state". If the state is already
  205. different, this function will return immediately.
  206. There is an inherent race between the invocation of
  207. "Channel.wait_for_state_change" and "Channel.get_state". The state can
  208. change arbitrary many times during the race, so there is no way to
  209. observe every state transition.
  210. If there is a need to put a timeout for this function, please refer to
  211. "asyncio.wait_for".
  212. Args:
  213. last_observed_state: A grpc.ChannelConnectivity object representing
  214. the last known state.
  215. """
  216. @abc.abstractmethod
  217. async def channel_ready(self) -> None:
  218. """Creates a coroutine that blocks until the Channel is READY."""
  219. @abc.abstractmethod
  220. def unary_unary(
  221. self,
  222. method: str,
  223. request_serializer: Optional[SerializingFunction] = None,
  224. response_deserializer: Optional[DeserializingFunction] = None
  225. ) -> UnaryUnaryMultiCallable:
  226. """Creates a UnaryUnaryMultiCallable for a unary-unary method.
  227. Args:
  228. method: The name of the RPC method.
  229. request_serializer: Optional :term:`serializer` for serializing the request
  230. message. Request goes unserialized in case None is passed.
  231. response_deserializer: Optional :term:`deserializer` for deserializing the
  232. response message. Response goes undeserialized in case None
  233. is passed.
  234. Returns:
  235. A UnaryUnaryMultiCallable value for the named unary-unary method.
  236. """
  237. @abc.abstractmethod
  238. def unary_stream(
  239. self,
  240. method: str,
  241. request_serializer: Optional[SerializingFunction] = None,
  242. response_deserializer: Optional[DeserializingFunction] = None
  243. ) -> UnaryStreamMultiCallable:
  244. """Creates a UnaryStreamMultiCallable for a unary-stream method.
  245. Args:
  246. method: The name of the RPC method.
  247. request_serializer: Optional :term:`serializer` for serializing the request
  248. message. Request goes unserialized in case None is passed.
  249. response_deserializer: Optional :term:`deserializer` for deserializing the
  250. response message. Response goes undeserialized in case None
  251. is passed.
  252. Returns:
  253. A UnarySteramMultiCallable value for the named unary-stream method.
  254. """
  255. @abc.abstractmethod
  256. def stream_unary(
  257. self,
  258. method: str,
  259. request_serializer: Optional[SerializingFunction] = None,
  260. response_deserializer: Optional[DeserializingFunction] = None
  261. ) -> StreamUnaryMultiCallable:
  262. """Creates a StreamUnaryMultiCallable for a stream-unary method.
  263. Args:
  264. method: The name of the RPC method.
  265. request_serializer: Optional :term:`serializer` for serializing the request
  266. message. Request goes unserialized in case None is passed.
  267. response_deserializer: Optional :term:`deserializer` for deserializing the
  268. response message. Response goes undeserialized in case None
  269. is passed.
  270. Returns:
  271. A StreamUnaryMultiCallable value for the named stream-unary method.
  272. """
  273. @abc.abstractmethod
  274. def stream_stream(
  275. self,
  276. method: str,
  277. request_serializer: Optional[SerializingFunction] = None,
  278. response_deserializer: Optional[DeserializingFunction] = None
  279. ) -> StreamStreamMultiCallable:
  280. """Creates a StreamStreamMultiCallable for a stream-stream method.
  281. Args:
  282. method: The name of the RPC method.
  283. request_serializer: Optional :term:`serializer` for serializing the request
  284. message. Request goes unserialized in case None is passed.
  285. response_deserializer: Optional :term:`deserializer` for deserializing the
  286. response message. Response goes undeserialized in case None
  287. is passed.
  288. Returns:
  289. A StreamStreamMultiCallable value for the named stream-stream method.
  290. """