_simple_stubs.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. # Copyright 2020 The gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Functions that obviate explicit stubs and explicit channels."""
  15. import collections
  16. import datetime
  17. import logging
  18. import os
  19. import threading
  20. from typing import (Any, AnyStr, Callable, Dict, Iterator, Optional, Sequence,
  21. Tuple, TypeVar, Union)
  22. import grpc
  23. from grpc.experimental import experimental_api
# Generic placeholders for the request and response message types of an RPC.
RequestType = TypeVar('RequestType')
ResponseType = TypeVar('ResponseType')
# Channel arguments expressed as a sequence of (key, value) pairs.
OptionsType = Sequence[Tuple[str, str]]
# Key type used to annotate ChannelCache._mapping: the target, channel
# options, channel credentials and compression setting of a cached channel.
CacheKey = Tuple[str, OptionsType, Optional[grpc.ChannelCredentials],
                 Optional[grpc.Compression]]
  29. _LOGGER = logging.getLogger(__name__)
  30. _EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
  31. if _EVICTION_PERIOD_KEY in os.environ:
  32. _EVICTION_PERIOD = datetime.timedelta(
  33. seconds=float(os.environ[_EVICTION_PERIOD_KEY]))
  34. _LOGGER.debug("Setting managed channel eviction period to %s",
  35. _EVICTION_PERIOD)
  36. else:
  37. _EVICTION_PERIOD = datetime.timedelta(minutes=10)
  38. _MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
  39. if _MAXIMUM_CHANNELS_KEY in os.environ:
  40. _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
  41. _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)
  42. else:
  43. _MAXIMUM_CHANNELS = 2**8
  44. _DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
  45. if _DEFAULT_TIMEOUT_KEY in os.environ:
  46. _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
  47. _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)
  48. else:
  49. _DEFAULT_TIMEOUT = 60.0
  50. def _create_channel(target: str, options: Sequence[Tuple[str, str]],
  51. channel_credentials: Optional[grpc.ChannelCredentials],
  52. compression: Optional[grpc.Compression]) -> grpc.Channel:
  53. _LOGGER.debug(
  54. f"Creating secure channel with credentials '{channel_credentials}', " +
  55. f"options '{options}' and compression '{compression}'")
  56. return grpc.secure_channel(target,
  57. credentials=channel_credentials,
  58. options=options,
  59. compression=compression)
  60. class ChannelCache:
  61. # NOTE(rbellevi): Untyped due to reference cycle.
  62. _singleton = None
  63. _lock: threading.RLock = threading.RLock()
  64. _condition: threading.Condition = threading.Condition(lock=_lock)
  65. _eviction_ready: threading.Event = threading.Event()
  66. _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
  67. _eviction_thread: threading.Thread
  68. def __init__(self):
  69. self._mapping = collections.OrderedDict()
  70. self._eviction_thread = threading.Thread(
  71. target=ChannelCache._perform_evictions, daemon=True)
  72. self._eviction_thread.start()
  73. @staticmethod
  74. def get():
  75. with ChannelCache._lock:
  76. if ChannelCache._singleton is None:
  77. ChannelCache._singleton = ChannelCache()
  78. ChannelCache._eviction_ready.wait()
  79. return ChannelCache._singleton
  80. def _evict_locked(self, key: CacheKey):
  81. channel, _ = self._mapping.pop(key)
  82. _LOGGER.debug("Evicting channel %s with configuration %s.", channel,
  83. key)
  84. channel.close()
  85. del channel
  86. @staticmethod
  87. def _perform_evictions():
  88. while True:
  89. with ChannelCache._lock:
  90. ChannelCache._eviction_ready.set()
  91. if not ChannelCache._singleton._mapping:
  92. ChannelCache._condition.wait()
  93. elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS:
  94. key = next(iter(ChannelCache._singleton._mapping.keys()))
  95. ChannelCache._singleton._evict_locked(key)
  96. # And immediately reevaluate.
  97. else:
  98. key, (_, eviction_time) = next(
  99. iter(ChannelCache._singleton._mapping.items()))
  100. now = datetime.datetime.now()
  101. if eviction_time <= now:
  102. ChannelCache._singleton._evict_locked(key)
  103. continue
  104. else:
  105. time_to_eviction = (eviction_time - now).total_seconds()
  106. # NOTE: We aim to *eventually* coalesce to a state in
  107. # which no overdue channels are in the cache and the
  108. # length of the cache is longer than _MAXIMUM_CHANNELS.
  109. # We tolerate momentary states in which these two
  110. # criteria are not met.
  111. ChannelCache._condition.wait(timeout=time_to_eviction)
  112. def get_channel(self, target: str, options: Sequence[Tuple[str, str]],
  113. channel_credentials: Optional[grpc.ChannelCredentials],
  114. insecure: bool,
  115. compression: Optional[grpc.Compression]) -> grpc.Channel:
  116. if insecure and channel_credentials:
  117. raise ValueError("The insecure option is mutually exclusive with " +
  118. "the channel_credentials option. Please use one " +
  119. "or the other.")
  120. if insecure:
  121. channel_credentials = grpc.experimental.insecure_channel_credentials(
  122. )
  123. elif channel_credentials is None:
  124. _LOGGER.debug("Defaulting to SSL channel credentials.")
  125. channel_credentials = grpc.ssl_channel_credentials()
  126. key = (target, options, channel_credentials, compression)
  127. with self._lock:
  128. channel_data = self._mapping.get(key, None)
  129. if channel_data is not None:
  130. channel = channel_data[0]
  131. self._mapping.pop(key)
  132. self._mapping[key] = (channel, datetime.datetime.now() +
  133. _EVICTION_PERIOD)
  134. return channel
  135. else:
  136. channel = _create_channel(target, options, channel_credentials,
  137. compression)
  138. self._mapping[key] = (channel, datetime.datetime.now() +
  139. _EVICTION_PERIOD)
  140. if len(self._mapping) == 1 or len(
  141. self._mapping) >= _MAXIMUM_CHANNELS:
  142. self._condition.notify()
  143. return channel
  144. def _test_only_channel_count(self) -> int:
  145. with self._lock:
  146. return len(self._mapping)
  147. @experimental_api
  148. def unary_unary(
  149. request: RequestType,
  150. target: str,
  151. method: str,
  152. request_serializer: Optional[Callable[[Any], bytes]] = None,
  153. response_deserializer: Optional[Callable[[bytes], Any]] = None,
  154. options: Sequence[Tuple[AnyStr, AnyStr]] = (),
  155. channel_credentials: Optional[grpc.ChannelCredentials] = None,
  156. insecure: bool = False,
  157. call_credentials: Optional[grpc.CallCredentials] = None,
  158. compression: Optional[grpc.Compression] = None,
  159. wait_for_ready: Optional[bool] = None,
  160. timeout: Optional[float] = _DEFAULT_TIMEOUT,
  161. metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
  162. ) -> ResponseType:
  163. """Invokes a unary-unary RPC without an explicitly specified channel.
  164. THIS IS AN EXPERIMENTAL API.
  165. This is backed by a per-process cache of channels. Channels are evicted
  166. from the cache after a fixed period by a background. Channels will also be
  167. evicted if more than a configured maximum accumulate.
  168. The default eviction period is 10 minutes. One may set the environment
  169. variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
  170. The default maximum number of channels is 256. One may set the
  171. environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
  172. this.
  173. Args:
  174. request: An iterator that yields request values for the RPC.
  175. target: The server address.
  176. method: The name of the RPC method.
  177. request_serializer: Optional :term:`serializer` for serializing the request
  178. message. Request goes unserialized in case None is passed.
  179. response_deserializer: Optional :term:`deserializer` for deserializing the response
  180. message. Response goes undeserialized in case None is passed.
  181. options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
  182. runtime) to configure the channel.
  183. channel_credentials: A credential applied to the whole channel, e.g. the
  184. return value of grpc.ssl_channel_credentials() or
  185. grpc.insecure_channel_credentials().
  186. insecure: If True, specifies channel_credentials as
  187. :term:`grpc.insecure_channel_credentials()`. This option is mutually
  188. exclusive with the `channel_credentials` option.
  189. call_credentials: A call credential applied to each call individually,
  190. e.g. the output of grpc.metadata_call_credentials() or
  191. grpc.access_token_call_credentials().
  192. compression: An optional value indicating the compression method to be
  193. used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
  194. wait_for_ready: An optional flag indicating whether the RPC should fail
  195. immediately if the connection is not ready at the time the RPC is
  196. invoked, or if it should wait until the connection to the server
  197. becomes ready. When using this option, the user will likely also want
  198. to set a timeout. Defaults to True.
  199. timeout: An optional duration of time in seconds to allow for the RPC,
  200. after which an exception will be raised. If timeout is unspecified,
  201. defaults to a timeout controlled by the
  202. GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
  203. unset, defaults to 60 seconds. Supply a value of None to indicate that
  204. no timeout should be enforced.
  205. metadata: Optional metadata to send to the server.
  206. Returns:
  207. The response to the RPC.
  208. """
  209. channel = ChannelCache.get().get_channel(target, options,
  210. channel_credentials, insecure,
  211. compression)
  212. multicallable = channel.unary_unary(method, request_serializer,
  213. response_deserializer)
  214. wait_for_ready = wait_for_ready if wait_for_ready is not None else True
  215. return multicallable(request,
  216. metadata=metadata,
  217. wait_for_ready=wait_for_ready,
  218. credentials=call_credentials,
  219. timeout=timeout)
  220. @experimental_api
  221. def unary_stream(
  222. request: RequestType,
  223. target: str,
  224. method: str,
  225. request_serializer: Optional[Callable[[Any], bytes]] = None,
  226. response_deserializer: Optional[Callable[[bytes], Any]] = None,
  227. options: Sequence[Tuple[AnyStr, AnyStr]] = (),
  228. channel_credentials: Optional[grpc.ChannelCredentials] = None,
  229. insecure: bool = False,
  230. call_credentials: Optional[grpc.CallCredentials] = None,
  231. compression: Optional[grpc.Compression] = None,
  232. wait_for_ready: Optional[bool] = None,
  233. timeout: Optional[float] = _DEFAULT_TIMEOUT,
  234. metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
  235. ) -> Iterator[ResponseType]:
  236. """Invokes a unary-stream RPC without an explicitly specified channel.
  237. THIS IS AN EXPERIMENTAL API.
  238. This is backed by a per-process cache of channels. Channels are evicted
  239. from the cache after a fixed period by a background. Channels will also be
  240. evicted if more than a configured maximum accumulate.
  241. The default eviction period is 10 minutes. One may set the environment
  242. variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
  243. The default maximum number of channels is 256. One may set the
  244. environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
  245. this.
  246. Args:
  247. request: An iterator that yields request values for the RPC.
  248. target: The server address.
  249. method: The name of the RPC method.
  250. request_serializer: Optional :term:`serializer` for serializing the request
  251. message. Request goes unserialized in case None is passed.
  252. response_deserializer: Optional :term:`deserializer` for deserializing the response
  253. message. Response goes undeserialized in case None is passed.
  254. options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
  255. runtime) to configure the channel.
  256. channel_credentials: A credential applied to the whole channel, e.g. the
  257. return value of grpc.ssl_channel_credentials().
  258. insecure: If True, specifies channel_credentials as
  259. :term:`grpc.insecure_channel_credentials()`. This option is mutually
  260. exclusive with the `channel_credentials` option.
  261. call_credentials: A call credential applied to each call individually,
  262. e.g. the output of grpc.metadata_call_credentials() or
  263. grpc.access_token_call_credentials().
  264. compression: An optional value indicating the compression method to be
  265. used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
  266. wait_for_ready: An optional flag indicating whether the RPC should fail
  267. immediately if the connection is not ready at the time the RPC is
  268. invoked, or if it should wait until the connection to the server
  269. becomes ready. When using this option, the user will likely also want
  270. to set a timeout. Defaults to True.
  271. timeout: An optional duration of time in seconds to allow for the RPC,
  272. after which an exception will be raised. If timeout is unspecified,
  273. defaults to a timeout controlled by the
  274. GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
  275. unset, defaults to 60 seconds. Supply a value of None to indicate that
  276. no timeout should be enforced.
  277. metadata: Optional metadata to send to the server.
  278. Returns:
  279. An iterator of responses.
  280. """
  281. channel = ChannelCache.get().get_channel(target, options,
  282. channel_credentials, insecure,
  283. compression)
  284. multicallable = channel.unary_stream(method, request_serializer,
  285. response_deserializer)
  286. wait_for_ready = wait_for_ready if wait_for_ready is not None else True
  287. return multicallable(request,
  288. metadata=metadata,
  289. wait_for_ready=wait_for_ready,
  290. credentials=call_credentials,
  291. timeout=timeout)
  292. @experimental_api
  293. def stream_unary(
  294. request_iterator: Iterator[RequestType],
  295. target: str,
  296. method: str,
  297. request_serializer: Optional[Callable[[Any], bytes]] = None,
  298. response_deserializer: Optional[Callable[[bytes], Any]] = None,
  299. options: Sequence[Tuple[AnyStr, AnyStr]] = (),
  300. channel_credentials: Optional[grpc.ChannelCredentials] = None,
  301. insecure: bool = False,
  302. call_credentials: Optional[grpc.CallCredentials] = None,
  303. compression: Optional[grpc.Compression] = None,
  304. wait_for_ready: Optional[bool] = None,
  305. timeout: Optional[float] = _DEFAULT_TIMEOUT,
  306. metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
  307. ) -> ResponseType:
  308. """Invokes a stream-unary RPC without an explicitly specified channel.
  309. THIS IS AN EXPERIMENTAL API.
  310. This is backed by a per-process cache of channels. Channels are evicted
  311. from the cache after a fixed period by a background. Channels will also be
  312. evicted if more than a configured maximum accumulate.
  313. The default eviction period is 10 minutes. One may set the environment
  314. variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
  315. The default maximum number of channels is 256. One may set the
  316. environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
  317. this.
  318. Args:
  319. request_iterator: An iterator that yields request values for the RPC.
  320. target: The server address.
  321. method: The name of the RPC method.
  322. request_serializer: Optional :term:`serializer` for serializing the request
  323. message. Request goes unserialized in case None is passed.
  324. response_deserializer: Optional :term:`deserializer` for deserializing the response
  325. message. Response goes undeserialized in case None is passed.
  326. options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
  327. runtime) to configure the channel.
  328. channel_credentials: A credential applied to the whole channel, e.g. the
  329. return value of grpc.ssl_channel_credentials().
  330. call_credentials: A call credential applied to each call individually,
  331. e.g. the output of grpc.metadata_call_credentials() or
  332. grpc.access_token_call_credentials().
  333. insecure: If True, specifies channel_credentials as
  334. :term:`grpc.insecure_channel_credentials()`. This option is mutually
  335. exclusive with the `channel_credentials` option.
  336. compression: An optional value indicating the compression method to be
  337. used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
  338. wait_for_ready: An optional flag indicating whether the RPC should fail
  339. immediately if the connection is not ready at the time the RPC is
  340. invoked, or if it should wait until the connection to the server
  341. becomes ready. When using this option, the user will likely also want
  342. to set a timeout. Defaults to True.
  343. timeout: An optional duration of time in seconds to allow for the RPC,
  344. after which an exception will be raised. If timeout is unspecified,
  345. defaults to a timeout controlled by the
  346. GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
  347. unset, defaults to 60 seconds. Supply a value of None to indicate that
  348. no timeout should be enforced.
  349. metadata: Optional metadata to send to the server.
  350. Returns:
  351. The response to the RPC.
  352. """
  353. channel = ChannelCache.get().get_channel(target, options,
  354. channel_credentials, insecure,
  355. compression)
  356. multicallable = channel.stream_unary(method, request_serializer,
  357. response_deserializer)
  358. wait_for_ready = wait_for_ready if wait_for_ready is not None else True
  359. return multicallable(request_iterator,
  360. metadata=metadata,
  361. wait_for_ready=wait_for_ready,
  362. credentials=call_credentials,
  363. timeout=timeout)
  364. @experimental_api
  365. def stream_stream(
  366. request_iterator: Iterator[RequestType],
  367. target: str,
  368. method: str,
  369. request_serializer: Optional[Callable[[Any], bytes]] = None,
  370. response_deserializer: Optional[Callable[[bytes], Any]] = None,
  371. options: Sequence[Tuple[AnyStr, AnyStr]] = (),
  372. channel_credentials: Optional[grpc.ChannelCredentials] = None,
  373. insecure: bool = False,
  374. call_credentials: Optional[grpc.CallCredentials] = None,
  375. compression: Optional[grpc.Compression] = None,
  376. wait_for_ready: Optional[bool] = None,
  377. timeout: Optional[float] = _DEFAULT_TIMEOUT,
  378. metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
  379. ) -> Iterator[ResponseType]:
  380. """Invokes a stream-stream RPC without an explicitly specified channel.
  381. THIS IS AN EXPERIMENTAL API.
  382. This is backed by a per-process cache of channels. Channels are evicted
  383. from the cache after a fixed period by a background. Channels will also be
  384. evicted if more than a configured maximum accumulate.
  385. The default eviction period is 10 minutes. One may set the environment
  386. variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
  387. The default maximum number of channels is 256. One may set the
  388. environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
  389. this.
  390. Args:
  391. request_iterator: An iterator that yields request values for the RPC.
  392. target: The server address.
  393. method: The name of the RPC method.
  394. request_serializer: Optional :term:`serializer` for serializing the request
  395. message. Request goes unserialized in case None is passed.
  396. response_deserializer: Optional :term:`deserializer` for deserializing the response
  397. message. Response goes undeserialized in case None is passed.
  398. options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
  399. runtime) to configure the channel.
  400. channel_credentials: A credential applied to the whole channel, e.g. the
  401. return value of grpc.ssl_channel_credentials().
  402. call_credentials: A call credential applied to each call individually,
  403. e.g. the output of grpc.metadata_call_credentials() or
  404. grpc.access_token_call_credentials().
  405. insecure: If True, specifies channel_credentials as
  406. :term:`grpc.insecure_channel_credentials()`. This option is mutually
  407. exclusive with the `channel_credentials` option.
  408. compression: An optional value indicating the compression method to be
  409. used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
  410. wait_for_ready: An optional flag indicating whether the RPC should fail
  411. immediately if the connection is not ready at the time the RPC is
  412. invoked, or if it should wait until the connection to the server
  413. becomes ready. When using this option, the user will likely also want
  414. to set a timeout. Defaults to True.
  415. timeout: An optional duration of time in seconds to allow for the RPC,
  416. after which an exception will be raised. If timeout is unspecified,
  417. defaults to a timeout controlled by the
  418. GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
  419. unset, defaults to 60 seconds. Supply a value of None to indicate that
  420. no timeout should be enforced.
  421. metadata: Optional metadata to send to the server.
  422. Returns:
  423. An iterator of responses.
  424. """
  425. channel = ChannelCache.get().get_channel(target, options,
  426. channel_credentials, insecure,
  427. compression)
  428. multicallable = channel.stream_stream(method, request_serializer,
  429. response_deserializer)
  430. wait_for_ready = wait_for_ready if wait_for_ready is not None else True
  431. return multicallable(request_iterator,
  432. metadata=metadata,
  433. wait_for_ready=wait_for_ready,
  434. credentials=call_credentials,
  435. timeout=timeout)