_channel.py 72 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767
  1. # Copyright 2016 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Invocation-side implementation of gRPC Python."""
  15. import copy
  16. import functools
  17. import logging
  18. import os
  19. import sys
  20. import threading
  21. import time
  22. import types
  23. from typing import (Any, Callable, Iterator, List, Optional, Sequence, Set,
  24. Tuple, Union)
  25. import grpc # pytype: disable=pyi-error
  26. from grpc import _common # pytype: disable=pyi-error
  27. from grpc import _compression # pytype: disable=pyi-error
  28. from grpc import _grpcio_metadata # pytype: disable=pyi-error
  29. from grpc._cython import cygrpc
  30. from grpc._typing import ChannelArgumentType
  31. from grpc._typing import DeserializingFunction
  32. from grpc._typing import IntegratedCallFactory
  33. from grpc._typing import MetadataType
  34. from grpc._typing import NullaryCallbackType
  35. from grpc._typing import ResponseType
  36. from grpc._typing import SerializingFunction
  37. from grpc._typing import UserTag
  38. import grpc.experimental # pytype: disable=pyi-error
  39. _LOGGER = logging.getLogger(__name__)
  40. _USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)
  41. _EMPTY_FLAGS = 0
  42. # NOTE(rbellevi): No guarantees are given about the maintenance of this
  43. # environment variable.
  44. _DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv(
  45. "GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
  46. _UNARY_UNARY_INITIAL_DUE = (
  47. cygrpc.OperationType.send_initial_metadata,
  48. cygrpc.OperationType.send_message,
  49. cygrpc.OperationType.send_close_from_client,
  50. cygrpc.OperationType.receive_initial_metadata,
  51. cygrpc.OperationType.receive_message,
  52. cygrpc.OperationType.receive_status_on_client,
  53. )
  54. _UNARY_STREAM_INITIAL_DUE = (
  55. cygrpc.OperationType.send_initial_metadata,
  56. cygrpc.OperationType.send_message,
  57. cygrpc.OperationType.send_close_from_client,
  58. cygrpc.OperationType.receive_initial_metadata,
  59. cygrpc.OperationType.receive_status_on_client,
  60. )
  61. _STREAM_UNARY_INITIAL_DUE = (
  62. cygrpc.OperationType.send_initial_metadata,
  63. cygrpc.OperationType.receive_initial_metadata,
  64. cygrpc.OperationType.receive_message,
  65. cygrpc.OperationType.receive_status_on_client,
  66. )
  67. _STREAM_STREAM_INITIAL_DUE = (
  68. cygrpc.OperationType.send_initial_metadata,
  69. cygrpc.OperationType.receive_initial_metadata,
  70. cygrpc.OperationType.receive_status_on_client,
  71. )
  72. _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
  73. 'Exception calling channel subscription callback!')
  74. _OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
  75. '\tstatus = {}\n'
  76. '\tdetails = "{}"\n'
  77. '>')
  78. _NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
  79. '\tstatus = {}\n'
  80. '\tdetails = "{}"\n'
  81. '\tdebug_error_string = "{}"\n'
  82. '>')
  83. def _deadline(timeout: Optional[float]) -> Optional[float]:
  84. return None if timeout is None else time.time() + timeout
  85. def _unknown_code_details(unknown_cygrpc_code: Optional[grpc.StatusCode],
  86. details: Optional[str]) -> str:
  87. return 'Server sent unknown code {} and details "{}"'.format(
  88. unknown_cygrpc_code, details)
  89. class _RPCState(object):
  90. condition: threading.Condition
  91. due: Set[cygrpc.OperationType]
  92. initial_metadata: Optional[MetadataType]
  93. response: Any
  94. trailing_metadata: Optional[MetadataType]
  95. code: Optional[grpc.StatusCode]
  96. details: Optional[str]
  97. debug_error_string: Optional[str]
  98. cancelled: bool
  99. callbacks: List[NullaryCallbackType]
  100. fork_epoch: Optional[int]
  101. def __init__(self, due: Sequence[cygrpc.OperationType],
  102. initial_metadata: Optional[MetadataType],
  103. trailing_metadata: Optional[MetadataType],
  104. code: Optional[grpc.StatusCode], details: Optional[str]):
  105. # `condition` guards all members of _RPCState. `notify_all` is called on
  106. # `condition` when the state of the RPC has changed.
  107. self.condition = threading.Condition()
  108. # The cygrpc.OperationType objects representing events due from the RPC's
  109. # completion queue. If an operation is in `due`, it is guaranteed that
  110. # `operate()` has been called on a corresponding operation. But the
  111. # converse is not true. That is, in the case of failed `operate()`
  112. # calls, there may briefly be events in `due` that do not correspond to
  113. # operations submitted to Core.
  114. self.due = set(due)
  115. self.initial_metadata = initial_metadata
  116. self.response = None
  117. self.trailing_metadata = trailing_metadata
  118. self.code = code
  119. self.details = details
  120. self.debug_error_string = None
  121. # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
  122. # slightly wonky, so they have to be tracked separately from the rest of the
  123. # result of the RPC. This field tracks whether cancellation was requested
  124. # prior to termination of the RPC.
  125. self.cancelled = False
  126. self.callbacks = []
  127. self.fork_epoch = cygrpc.get_fork_epoch()
  128. def reset_postfork_child(self):
  129. self.condition = threading.Condition()
  130. def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
  131. if state.code is None:
  132. state.code = code
  133. state.details = details
  134. if state.initial_metadata is None:
  135. state.initial_metadata = ()
  136. state.trailing_metadata = ()
  137. def _handle_event(
  138. event: cygrpc.BaseEvent, state: _RPCState,
  139. response_deserializer: Optional[DeserializingFunction]
  140. ) -> List[NullaryCallbackType]:
  141. callbacks = []
  142. for batch_operation in event.batch_operations:
  143. operation_type = batch_operation.type()
  144. state.due.remove(operation_type)
  145. if operation_type == cygrpc.OperationType.receive_initial_metadata:
  146. state.initial_metadata = batch_operation.initial_metadata()
  147. elif operation_type == cygrpc.OperationType.receive_message:
  148. serialized_response = batch_operation.message()
  149. if serialized_response is not None:
  150. response = _common.deserialize(serialized_response,
  151. response_deserializer)
  152. if response is None:
  153. details = 'Exception deserializing response!'
  154. _abort(state, grpc.StatusCode.INTERNAL, details)
  155. else:
  156. state.response = response
  157. elif operation_type == cygrpc.OperationType.receive_status_on_client:
  158. state.trailing_metadata = batch_operation.trailing_metadata()
  159. if state.code is None:
  160. code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
  161. batch_operation.code())
  162. if code is None:
  163. state.code = grpc.StatusCode.UNKNOWN
  164. state.details = _unknown_code_details(
  165. code, batch_operation.details())
  166. else:
  167. state.code = code
  168. state.details = batch_operation.details()
  169. state.debug_error_string = batch_operation.error_string()
  170. callbacks.extend(state.callbacks)
  171. state.callbacks = None
  172. return callbacks
  173. def _event_handler(
  174. state: _RPCState,
  175. response_deserializer: Optional[DeserializingFunction]) -> UserTag:
  176. def handle_event(event):
  177. with state.condition:
  178. callbacks = _handle_event(event, state, response_deserializer)
  179. state.condition.notify_all()
  180. done = not state.due
  181. for callback in callbacks:
  182. try:
  183. callback()
  184. except Exception as e: # pylint: disable=broad-except
  185. # NOTE(rbellevi): We suppress but log errors here so as not to
  186. # kill the channel spin thread.
  187. logging.error('Exception in callback %s: %s',
  188. repr(callback.func), repr(e))
  189. return done and state.fork_epoch >= cygrpc.get_fork_epoch()
  190. return handle_event
# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
#pylint: disable=too-many-statements
def _consume_request_iterator(request_iterator: Iterator, state: _RPCState,
                              call: Union[cygrpc.IntegratedCall,
                                          cygrpc.SegregatedCall],
                              request_serializer: SerializingFunction,
                              event_handler: Optional[UserTag]) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator`, serializes each one and submits it on `call` as a
    send-message operation, waiting for every send to leave `state.due`
    before fetching the next request. Once the iterator is exhausted, and
    provided the RPC has no status code yet, a send-close-from-client
    operation is submitted. This function returns as soon as the thread has
    been started.

    Args:
      request_iterator: Iterator yielding the user's request objects.
      state: The RPC state shared with the rest of the call machinery.
      call: The call on which operations are submitted and cancellation is
        requested.
      request_serializer: Serializer handed to `_common.serialize`.
      event_handler: Tag passed to `call.operate` with every operation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except-branch below already paired the
            # `enter_user_request_generator` call, so that `finally` does not
            # call `return_from_user_request_generator` a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Iterator exhausted: leave the loop and send the close.
                break
            except Exception:  # pylint: disable=broad-except
                # The user's iterator raised: log it, cancel the call with
                # UNKNOWN, record that status locally and stop the thread.
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = 'Exception iterating requests!'
                _LOGGER.exception(details)
                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        # A None result is treated as a serialization failure
                        # and terminates the RPC with INTERNAL.
                        code = grpc.StatusCode.INTERNAL
                        details = 'Exception serializing request!'
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                        _abort(state, code, details)
                        return
                    else:
                        # Register the operation as due before submitting it
                        # and roll the registration back if submission fails.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS),)
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            # Either the RPC has a status code or the send is
                            # no longer outstanding.
                            return (state.code is not None or
                                    cygrpc.OperationType.send_message
                                    not in state.due)

                        _common.wait(state.condition.wait,
                                     _done,
                                     spin_cb=functools.partial(
                                         cygrpc.block_if_fork_in_progress,
                                         state))
                        if state.code is not None:
                            return
                else:
                    # The RPC already has a status code or was cancelled;
                    # nothing more is sent.
                    return
        with state.condition:
            if state.code is None:
                # Same add-then-roll-back bookkeeping as for send_message.
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator)
    consumption_thread.setDaemon(True)
    consumption_thread.start()
  268. def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
  269. """Calculates error string for RPC."""
  270. with rpc_state.condition:
  271. if rpc_state.code is None:
  272. return '<{} object>'.format(class_name)
  273. elif rpc_state.code is grpc.StatusCode.OK:
  274. return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, rpc_state.code,
  275. rpc_state.details)
  276. else:
  277. return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
  278. class_name, rpc_state.code, rpc_state.details,
  279. rpc_state.debug_error_string)
  280. class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
  281. """An RPC error not tied to the execution of a particular RPC.
  282. The RPC represented by the state object must not be in-progress or
  283. cancelled.
  284. Attributes:
  285. _state: An instance of _RPCState.
  286. """
  287. _state: _RPCState
  288. def __init__(self, state: _RPCState):
  289. with state.condition:
  290. self._state = _RPCState((), copy.deepcopy(state.initial_metadata),
  291. copy.deepcopy(state.trailing_metadata),
  292. state.code, copy.deepcopy(state.details))
  293. self._state.response = copy.copy(state.response)
  294. self._state.debug_error_string = copy.copy(state.debug_error_string)
  295. def initial_metadata(self) -> Optional[MetadataType]:
  296. return self._state.initial_metadata
  297. def trailing_metadata(self) -> Optional[MetadataType]:
  298. return self._state.trailing_metadata
  299. def code(self) -> Optional[grpc.StatusCode]:
  300. return self._state.code
  301. def details(self) -> Optional[str]:
  302. return _common.decode(self._state.details)
  303. def debug_error_string(self) -> Optional[str]:
  304. return _common.decode(self._state.debug_error_string)
  305. def _repr(self) -> str:
  306. return _rpc_state_string(self.__class__.__name__, self._state)
  307. def __repr__(self) -> str:
  308. return self._repr()
  309. def __str__(self) -> str:
  310. return self._repr()
  311. def cancel(self) -> bool:
  312. """See grpc.Future.cancel."""
  313. return False
  314. def cancelled(self) -> bool:
  315. """See grpc.Future.cancelled."""
  316. return False
  317. def running(self) -> bool:
  318. """See grpc.Future.running."""
  319. return False
  320. def done(self) -> bool:
  321. """See grpc.Future.done."""
  322. return True
  323. def result(self, timeout: Optional[float] = None) -> Any: # pylint: disable=unused-argument
  324. """See grpc.Future.result."""
  325. raise self
  326. def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: # pylint: disable=unused-argument
  327. """See grpc.Future.exception."""
  328. return self
  329. def traceback(
  330. self,
  331. timeout: Optional[float] = None # pylint: disable=unused-argument
  332. ) -> Optional[types.TracebackType]:
  333. """See grpc.Future.traceback."""
  334. try:
  335. raise self
  336. except grpc.RpcError:
  337. return sys.exc_info()[2]
  338. def add_done_callback(
  339. self,
  340. fn: Callable[[grpc.Future], None],
  341. timeout: Optional[float] = None) -> None: # pylint: disable=unused-argument
  342. """See grpc.Future.add_done_callback."""
  343. fn(self)
  344. class _Rendezvous(grpc.RpcError, grpc.RpcContext):
  345. """An RPC iterator.
  346. Attributes:
  347. _state: An instance of _RPCState.
  348. _call: An instance of SegregatedCall or IntegratedCall.
  349. In either case, the _call object is expected to have operate, cancel,
  350. and next_event methods.
  351. _response_deserializer: A callable taking bytes and return a Python
  352. object.
  353. _deadline: A float representing the deadline of the RPC in seconds. Or
  354. possibly None, to represent an RPC with no deadline at all.
  355. """
  356. _state: _RPCState
  357. _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
  358. _response_deserializer: Optional[DeserializingFunction]
  359. _deadline: Optional[float]
  360. def __init__(self, state: _RPCState, call: Union[cygrpc.SegregatedCall,
  361. cygrpc.IntegratedCall],
  362. response_deserializer: Optional[DeserializingFunction],
  363. deadline: Optional[float]):
  364. super(_Rendezvous, self).__init__()
  365. self._state = state
  366. self._call = call
  367. self._response_deserializer = response_deserializer
  368. self._deadline = deadline
  369. def is_active(self) -> bool:
  370. """See grpc.RpcContext.is_active"""
  371. with self._state.condition:
  372. return self._state.code is None
  373. def time_remaining(self) -> Optional[float]:
  374. """See grpc.RpcContext.time_remaining"""
  375. with self._state.condition:
  376. if self._deadline is None:
  377. return None
  378. else:
  379. return max(self._deadline - time.time(), 0)
  380. def cancel(self) -> bool:
  381. """See grpc.RpcContext.cancel"""
  382. with self._state.condition:
  383. if self._state.code is None:
  384. code = grpc.StatusCode.CANCELLED
  385. details = 'Locally cancelled by application!'
  386. self._call.cancel(
  387. _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
  388. self._state.cancelled = True
  389. _abort(self._state, code, details)
  390. self._state.condition.notify_all()
  391. return True
  392. else:
  393. return False
  394. def add_callback(self, callback: NullaryCallbackType) -> bool:
  395. """See grpc.RpcContext.add_callback"""
  396. with self._state.condition:
  397. if self._state.callbacks is None:
  398. return False
  399. else:
  400. self._state.callbacks.append(callback)
  401. return True
  402. def __iter__(self):
  403. return self
  404. def next(self):
  405. return self._next()
  406. def __next__(self):
  407. return self._next()
  408. def _next(self):
  409. raise NotImplementedError()
  410. def debug_error_string(self) -> Optional[str]:
  411. raise NotImplementedError()
  412. def _repr(self) -> str:
  413. return _rpc_state_string(self.__class__.__name__, self._state)
  414. def __repr__(self) -> str:
  415. return self._repr()
  416. def __str__(self) -> str:
  417. return self._repr()
  418. def __del__(self) -> None:
  419. with self._state.condition:
  420. if self._state.code is None:
  421. self._state.code = grpc.StatusCode.CANCELLED
  422. self._state.details = 'Cancelled upon garbage collection!'
  423. self._state.cancelled = True
  424. self._call.cancel(
  425. _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
  426. self._state.details)
  427. self._state.condition.notify_all()
  428. class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors
  429. """An RPC iterator operating entirely on a single thread.
  430. The __next__ method of _SingleThreadedRendezvous does not depend on the
  431. existence of any other thread, including the "channel spin thread".
  432. However, this means that its interface is entirely synchronous. So this
  433. class cannot completely fulfill the grpc.Future interface. The result,
  434. exception, and traceback methods will never block and will instead raise
  435. an exception if calling the method would result in blocking.
  436. This means that these methods are safe to call from add_done_callback
  437. handlers.
  438. """
  439. _state: _RPCState
  440. def _is_complete(self) -> bool:
  441. return self._state.code is not None
  442. def cancelled(self) -> bool:
  443. with self._state.condition:
  444. return self._state.cancelled
  445. def running(self) -> bool:
  446. with self._state.condition:
  447. return self._state.code is None
  448. def done(self) -> bool:
  449. with self._state.condition:
  450. return self._state.code is not None
  451. def result(self, timeout: Optional[float] = None) -> Any:
  452. """Returns the result of the computation or raises its exception.
  453. This method will never block. Instead, it will raise an exception
  454. if calling this method would otherwise result in blocking.
  455. Since this method will never block, any `timeout` argument passed will
  456. be ignored.
  457. """
  458. del timeout
  459. with self._state.condition:
  460. if not self._is_complete():
  461. raise grpc.experimental.UsageError(
  462. "_SingleThreadedRendezvous only supports result() when the RPC is complete."
  463. )
  464. if self._state.code is grpc.StatusCode.OK:
  465. return self._state.response
  466. elif self._state.cancelled:
  467. raise grpc.FutureCancelledError()
  468. else:
  469. raise self
  470. def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
  471. """Return the exception raised by the computation.
  472. This method will never block. Instead, it will raise an exception
  473. if calling this method would otherwise result in blocking.
  474. Since this method will never block, any `timeout` argument passed will
  475. be ignored.
  476. """
  477. del timeout
  478. with self._state.condition:
  479. if not self._is_complete():
  480. raise grpc.experimental.UsageError(
  481. "_SingleThreadedRendezvous only supports exception() when the RPC is complete."
  482. )
  483. if self._state.code is grpc.StatusCode.OK:
  484. return None
  485. elif self._state.cancelled:
  486. raise grpc.FutureCancelledError()
  487. else:
  488. return self
  489. def traceback(
  490. self,
  491. timeout: Optional[float] = None) -> Optional[types.TracebackType]:
  492. """Access the traceback of the exception raised by the computation.
  493. This method will never block. Instead, it will raise an exception
  494. if calling this method would otherwise result in blocking.
  495. Since this method will never block, any `timeout` argument passed will
  496. be ignored.
  497. """
  498. del timeout
  499. with self._state.condition:
  500. if not self._is_complete():
  501. raise grpc.experimental.UsageError(
  502. "_SingleThreadedRendezvous only supports traceback() when the RPC is complete."
  503. )
  504. if self._state.code is grpc.StatusCode.OK:
  505. return None
  506. elif self._state.cancelled:
  507. raise grpc.FutureCancelledError()
  508. else:
  509. try:
  510. raise self
  511. except grpc.RpcError:
  512. return sys.exc_info()[2]
  513. def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
  514. with self._state.condition:
  515. if self._state.code is None:
  516. self._state.callbacks.append(functools.partial(fn, self))
  517. return
  518. fn(self)
  519. def initial_metadata(self) -> Optional[MetadataType]:
  520. """See grpc.Call.initial_metadata"""
  521. with self._state.condition:
  522. # NOTE(gnossen): Based on our initial call batch, we are guaranteed
  523. # to receive initial metadata before any messages.
  524. while self._state.initial_metadata is None:
  525. self._consume_next_event()
  526. return self._state.initial_metadata
  527. def trailing_metadata(self) -> Optional[MetadataType]:
  528. """See grpc.Call.trailing_metadata"""
  529. with self._state.condition:
  530. if self._state.trailing_metadata is None:
  531. raise grpc.experimental.UsageError(
  532. "Cannot get trailing metadata until RPC is completed.")
  533. return self._state.trailing_metadata
  534. def code(self) -> Optional[grpc.StatusCode]:
  535. """See grpc.Call.code"""
  536. with self._state.condition:
  537. if self._state.code is None:
  538. raise grpc.experimental.UsageError(
  539. "Cannot get code until RPC is completed.")
  540. return self._state.code
  541. def details(self) -> Optional[str]:
  542. """See grpc.Call.details"""
  543. with self._state.condition:
  544. if self._state.details is None:
  545. raise grpc.experimental.UsageError(
  546. "Cannot get details until RPC is completed.")
  547. return _common.decode(self._state.details)
  548. def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
  549. event = self._call.next_event()
  550. with self._state.condition:
  551. callbacks = _handle_event(event, self._state,
  552. self._response_deserializer)
  553. for callback in callbacks:
  554. # NOTE(gnossen): We intentionally allow exceptions to bubble up
  555. # to the user when running on a single thread.
  556. callback()
  557. return event
  558. def _next_response(self) -> Any:
  559. while True:
  560. self._consume_next_event()
  561. with self._state.condition:
  562. if self._state.response is not None:
  563. response = self._state.response
  564. self._state.response = None
  565. return response
  566. elif cygrpc.OperationType.receive_message not in self._state.due:
  567. if self._state.code is grpc.StatusCode.OK:
  568. raise StopIteration()
  569. elif self._state.code is not None:
  570. raise self
  571. def _next(self) -> Any:
  572. with self._state.condition:
  573. if self._state.code is None:
  574. # We tentatively add the operation as expected and remove
  575. # it if the enqueue operation fails. This allows us to guarantee that
  576. # if an event has been submitted to the core completion queue,
  577. # it is in `due`. If we waited until after a successful
  578. # enqueue operation then a signal could interrupt this
  579. # thread between the enqueue operation and the addition of the
  580. # operation to `due`. This would cause an exception on the
  581. # channel spin thread when the operation completes and no
  582. # corresponding operation would be present in state.due.
  583. # Note that, since `condition` is held through this block, there is
  584. # no data race on `due`.
  585. self._state.due.add(cygrpc.OperationType.receive_message)
  586. operating = self._call.operate(
  587. (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)
  588. if not operating:
  589. self._state.due.remove(cygrpc.OperationType.receive_message)
  590. elif self._state.code is grpc.StatusCode.OK:
  591. raise StopIteration()
  592. else:
  593. raise self
  594. return self._next_response()
  595. def debug_error_string(self) -> Optional[str]:
  596. with self._state.condition:
  597. if self._state.debug_error_string is None:
  598. raise grpc.experimental.UsageError(
  599. "Cannot get debug error string until RPC is completed.")
  600. return _common.decode(self._state.debug_error_string)
  601. class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors
  602. """An RPC iterator that depends on a channel spin thread.
  603. This iterator relies upon a per-channel thread running in the background,
  604. dequeueing events from the completion queue, and notifying threads waiting
  605. on the threading.Condition object in the _RPCState object.
  606. This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
  607. and to mediate a bidirection streaming RPC.
  608. """
  609. _state: _RPCState
  610. def initial_metadata(self) -> Optional[MetadataType]:
  611. """See grpc.Call.initial_metadata"""
  612. with self._state.condition:
  613. def _done():
  614. return self._state.initial_metadata is not None
  615. _common.wait(self._state.condition.wait, _done)
  616. return self._state.initial_metadata
  617. def trailing_metadata(self) -> Optional[MetadataType]:
  618. """See grpc.Call.trailing_metadata"""
  619. with self._state.condition:
  620. def _done():
  621. return self._state.trailing_metadata is not None
  622. _common.wait(self._state.condition.wait, _done)
  623. return self._state.trailing_metadata
  624. def code(self) -> Optional[grpc.StatusCode]:
  625. """See grpc.Call.code"""
  626. with self._state.condition:
  627. def _done():
  628. return self._state.code is not None
  629. _common.wait(self._state.condition.wait, _done)
  630. return self._state.code
  631. def details(self) -> Optional[str]:
  632. """See grpc.Call.details"""
  633. with self._state.condition:
  634. def _done():
  635. return self._state.details is not None
  636. _common.wait(self._state.condition.wait, _done)
  637. return _common.decode(self._state.details)
  638. def debug_error_string(self) -> Optional[str]:
  639. with self._state.condition:
  640. def _done():
  641. return self._state.debug_error_string is not None
  642. _common.wait(self._state.condition.wait, _done)
  643. return _common.decode(self._state.debug_error_string)
  644. def cancelled(self) -> bool:
  645. with self._state.condition:
  646. return self._state.cancelled
  647. def running(self) -> bool:
  648. with self._state.condition:
  649. return self._state.code is None
  650. def done(self) -> bool:
  651. with self._state.condition:
  652. return self._state.code is not None
  653. def _is_complete(self) -> bool:
  654. return self._state.code is not None
  655. def result(self, timeout: Optional[float] = None) -> Any:
  656. """Returns the result of the computation or raises its exception.
  657. See grpc.Future.result for the full API contract.
  658. """
  659. with self._state.condition:
  660. timed_out = _common.wait(self._state.condition.wait,
  661. self._is_complete,
  662. timeout=timeout)
  663. if timed_out:
  664. raise grpc.FutureTimeoutError()
  665. else:
  666. if self._state.code is grpc.StatusCode.OK:
  667. return self._state.response
  668. elif self._state.cancelled:
  669. raise grpc.FutureCancelledError()
  670. else:
  671. raise self
  672. def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
  673. """Return the exception raised by the computation.
  674. See grpc.Future.exception for the full API contract.
  675. """
  676. with self._state.condition:
  677. timed_out = _common.wait(self._state.condition.wait,
  678. self._is_complete,
  679. timeout=timeout)
  680. if timed_out:
  681. raise grpc.FutureTimeoutError()
  682. else:
  683. if self._state.code is grpc.StatusCode.OK:
  684. return None
  685. elif self._state.cancelled:
  686. raise grpc.FutureCancelledError()
  687. else:
  688. return self
  689. def traceback(
  690. self,
  691. timeout: Optional[float] = None) -> Optional[types.TracebackType]:
  692. """Access the traceback of the exception raised by the computation.
  693. See grpc.future.traceback for the full API contract.
  694. """
  695. with self._state.condition:
  696. timed_out = _common.wait(self._state.condition.wait,
  697. self._is_complete,
  698. timeout=timeout)
  699. if timed_out:
  700. raise grpc.FutureTimeoutError()
  701. else:
  702. if self._state.code is grpc.StatusCode.OK:
  703. return None
  704. elif self._state.cancelled:
  705. raise grpc.FutureCancelledError()
  706. else:
  707. try:
  708. raise self
  709. except grpc.RpcError:
  710. return sys.exc_info()[2]
  711. def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
  712. with self._state.condition:
  713. if self._state.code is None:
  714. self._state.callbacks.append(functools.partial(fn, self))
  715. return
  716. fn(self)
  717. def _next(self) -> Any:
  718. with self._state.condition:
  719. if self._state.code is None:
  720. event_handler = _event_handler(self._state,
  721. self._response_deserializer)
  722. self._state.due.add(cygrpc.OperationType.receive_message)
  723. operating = self._call.operate(
  724. (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
  725. event_handler)
  726. if not operating:
  727. self._state.due.remove(cygrpc.OperationType.receive_message)
  728. elif self._state.code is grpc.StatusCode.OK:
  729. raise StopIteration()
  730. else:
  731. raise self
  732. def _response_ready():
  733. return (self._state.response is not None or
  734. (cygrpc.OperationType.receive_message
  735. not in self._state.due and
  736. self._state.code is not None))
  737. _common.wait(self._state.condition.wait, _response_ready)
  738. if self._state.response is not None:
  739. response = self._state.response
  740. self._state.response = None
  741. return response
  742. elif cygrpc.OperationType.receive_message not in self._state.due:
  743. if self._state.code is grpc.StatusCode.OK:
  744. raise StopIteration()
  745. elif self._state.code is not None:
  746. raise self
  747. def _start_unary_request(
  748. request: Any, timeout: Optional[float],
  749. request_serializer: SerializingFunction
  750. ) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
  751. deadline = _deadline(timeout)
  752. serialized_request = _common.serialize(request, request_serializer)
  753. if serialized_request is None:
  754. state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
  755. 'Exception serializing request!')
  756. error = _InactiveRpcError(state)
  757. return deadline, None, error
  758. else:
  759. return deadline, serialized_request, None
  760. def _end_unary_response_blocking(
  761. state: _RPCState, call: cygrpc.SegregatedCall, with_call: bool,
  762. deadline: Optional[float]
  763. ) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
  764. if state.code is grpc.StatusCode.OK:
  765. if with_call:
  766. rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
  767. return state.response, rendezvous
  768. else:
  769. return state.response
  770. else:
  771. raise _InactiveRpcError(state) # pytype: disable=not-instantiable
  772. def _stream_unary_invocation_operations(
  773. metadata: Optional[MetadataType],
  774. initial_metadata_flags: int) -> Sequence[Sequence[cygrpc.Operation]]:
  775. return (
  776. (
  777. cygrpc.SendInitialMetadataOperation(metadata,
  778. initial_metadata_flags),
  779. cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
  780. cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
  781. ),
  782. (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
  783. )
  784. def _stream_unary_invocation_operations_and_tags(
  785. metadata: Optional[MetadataType], initial_metadata_flags: int
  786. ) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
  787. return tuple((
  788. operations,
  789. None,
  790. ) for operations in _stream_unary_invocation_operations(
  791. metadata, initial_metadata_flags))
  792. def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
  793. parent_deadline = cygrpc.get_deadline_from_context()
  794. if parent_deadline is None and user_deadline is None:
  795. return None
  796. elif parent_deadline is not None and user_deadline is None:
  797. return parent_deadline
  798. elif user_deadline is not None and parent_deadline is None:
  799. return user_deadline
  800. else:
  801. return min(parent_deadline, user_deadline)
  802. class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
  803. _channel: cygrpc.Channel
  804. _managed_call: IntegratedCallFactory
  805. _method: bytes
  806. _request_serializer: Optional[SerializingFunction]
  807. _response_deserializer: Optional[DeserializingFunction]
  808. _context: Any
  809. # pylint: disable=too-many-arguments
  810. def __init__(self, channel: cygrpc.Channel,
  811. managed_call: IntegratedCallFactory, method: bytes,
  812. request_serializer: Optional[SerializingFunction],
  813. response_deserializer: Optional[DeserializingFunction]):
  814. self._channel = channel
  815. self._managed_call = managed_call
  816. self._method = method
  817. self._request_serializer = request_serializer
  818. self._response_deserializer = response_deserializer
  819. self._context = cygrpc.build_census_context()
  820. def _prepare(
  821. self, request: Any, timeout: Optional[float],
  822. metadata: Optional[MetadataType], wait_for_ready: Optional[bool],
  823. compression: Optional[grpc.Compression]
  824. ) -> Tuple[Optional[_RPCState], Optional[Sequence[cygrpc.Operation]],
  825. Optional[float], Optional[grpc.RpcError]]:
  826. deadline, serialized_request, rendezvous = _start_unary_request(
  827. request, timeout, self._request_serializer)
  828. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  829. wait_for_ready)
  830. augmented_metadata = _compression.augment_metadata(
  831. metadata, compression)
  832. if serialized_request is None:
  833. return None, None, None, rendezvous
  834. else:
  835. state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
  836. operations = (
  837. cygrpc.SendInitialMetadataOperation(augmented_metadata,
  838. initial_metadata_flags),
  839. cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
  840. cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
  841. cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
  842. cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
  843. cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
  844. )
  845. return state, operations, deadline, None
  846. def _blocking(
  847. self,
  848. request: Any,
  849. timeout: Optional[float] = None,
  850. metadata: Optional[MetadataType] = None,
  851. credentials: Optional[grpc.CallCredentials] = None,
  852. wait_for_ready: Optional[bool] = None,
  853. compression: Optional[grpc.Compression] = None
  854. ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
  855. state, operations, deadline, rendezvous = self._prepare(
  856. request, timeout, metadata, wait_for_ready, compression)
  857. if state is None:
  858. raise rendezvous # pylint: disable-msg=raising-bad-type
  859. else:
  860. call = self._channel.segregated_call(
  861. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
  862. self._method, None, _determine_deadline(deadline), metadata,
  863. None if credentials is None else credentials._credentials, ((
  864. operations,
  865. None,
  866. ),), self._context)
  867. event = call.next_event()
  868. _handle_event(event, state, self._response_deserializer)
  869. return state, call
  870. def __call__(self,
  871. request: Any,
  872. timeout: Optional[float] = None,
  873. metadata: Optional[MetadataType] = None,
  874. credentials: Optional[grpc.CallCredentials] = None,
  875. wait_for_ready: Optional[bool] = None,
  876. compression: Optional[grpc.Compression] = None) -> Any:
  877. state, call, = self._blocking(request, timeout, metadata, credentials,
  878. wait_for_ready, compression)
  879. return _end_unary_response_blocking(state, call, False, None)
  880. def with_call(
  881. self,
  882. request: Any,
  883. timeout: Optional[float] = None,
  884. metadata: Optional[MetadataType] = None,
  885. credentials: Optional[grpc.CallCredentials] = None,
  886. wait_for_ready: Optional[bool] = None,
  887. compression: Optional[grpc.Compression] = None
  888. ) -> Tuple[Any, grpc.Call]:
  889. state, call, = self._blocking(request, timeout, metadata, credentials,
  890. wait_for_ready, compression)
  891. return _end_unary_response_blocking(state, call, True, None)
  892. def future(
  893. self,
  894. request: Any,
  895. timeout: Optional[float] = None,
  896. metadata: Optional[MetadataType] = None,
  897. credentials: Optional[grpc.CallCredentials] = None,
  898. wait_for_ready: Optional[bool] = None,
  899. compression: Optional[grpc.Compression] = None
  900. ) -> _MultiThreadedRendezvous:
  901. state, operations, deadline, rendezvous = self._prepare(
  902. request, timeout, metadata, wait_for_ready, compression)
  903. if state is None:
  904. raise rendezvous # pylint: disable-msg=raising-bad-type
  905. else:
  906. event_handler = _event_handler(state, self._response_deserializer)
  907. call = self._managed_call(
  908. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
  909. self._method, None, deadline, metadata,
  910. None if credentials is None else credentials._credentials,
  911. (operations,), event_handler, self._context)
  912. return _MultiThreadedRendezvous(state, call,
  913. self._response_deserializer,
  914. deadline)
  915. class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
  916. _channel: cygrpc.Channel
  917. _method: bytes
  918. _request_serializer: Optional[SerializingFunction]
  919. _response_deserializer: Optional[DeserializingFunction]
  920. _context: Any
  921. # pylint: disable=too-many-arguments
  922. def __init__(self, channel: cygrpc.Channel, method: bytes,
  923. request_serializer: SerializingFunction,
  924. response_deserializer: DeserializingFunction):
  925. self._channel = channel
  926. self._method = method
  927. self._request_serializer = request_serializer
  928. self._response_deserializer = response_deserializer
  929. self._context = cygrpc.build_census_context()
  930. def __call__( # pylint: disable=too-many-locals
  931. self,
  932. request: Any,
  933. timeout: Optional[float] = None,
  934. metadata: Optional[MetadataType] = None,
  935. credentials: Optional[grpc.CallCredentials] = None,
  936. wait_for_ready: Optional[bool] = None,
  937. compression: Optional[grpc.Compression] = None
  938. ) -> _SingleThreadedRendezvous:
  939. deadline = _deadline(timeout)
  940. serialized_request = _common.serialize(request,
  941. self._request_serializer)
  942. if serialized_request is None:
  943. state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
  944. 'Exception serializing request!')
  945. raise _InactiveRpcError(state)
  946. state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
  947. call_credentials = None if credentials is None else credentials._credentials
  948. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  949. wait_for_ready)
  950. augmented_metadata = _compression.augment_metadata(
  951. metadata, compression)
  952. operations = (
  953. (cygrpc.SendInitialMetadataOperation(augmented_metadata,
  954. initial_metadata_flags),
  955. cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
  956. cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS)),
  957. (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
  958. (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
  959. )
  960. operations_and_tags = tuple((ops, None) for ops in operations)
  961. call = self._channel.segregated_call(
  962. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
  963. None, _determine_deadline(deadline), metadata, call_credentials,
  964. operations_and_tags, self._context)
  965. return _SingleThreadedRendezvous(state, call,
  966. self._response_deserializer, deadline)
  967. class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
  968. _channel: cygrpc.Channel
  969. _managed_call: IntegratedCallFactory
  970. _method: bytes
  971. _request_serializer: Optional[SerializingFunction]
  972. _response_deserializer: Optional[DeserializingFunction]
  973. _context: Any
  974. # pylint: disable=too-many-arguments
  975. def __init__(self, channel: cygrpc.Channel,
  976. managed_call: IntegratedCallFactory, method: bytes,
  977. request_serializer: SerializingFunction,
  978. response_deserializer: DeserializingFunction):
  979. self._channel = channel
  980. self._managed_call = managed_call
  981. self._method = method
  982. self._request_serializer = request_serializer
  983. self._response_deserializer = response_deserializer
  984. self._context = cygrpc.build_census_context()
  985. def __call__( # pylint: disable=too-many-locals
  986. self,
  987. request: Any,
  988. timeout: Optional[float] = None,
  989. metadata: Optional[MetadataType] = None,
  990. credentials: Optional[grpc.CallCredentials] = None,
  991. wait_for_ready: Optional[bool] = None,
  992. compression: Optional[
  993. grpc.Compression] = None) -> _MultiThreadedRendezvous:
  994. deadline, serialized_request, rendezvous = _start_unary_request(
  995. request, timeout, self._request_serializer)
  996. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  997. wait_for_ready)
  998. if serialized_request is None:
  999. raise rendezvous # pylint: disable-msg=raising-bad-type
  1000. else:
  1001. augmented_metadata = _compression.augment_metadata(
  1002. metadata, compression)
  1003. state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
  1004. operations = (
  1005. (
  1006. cygrpc.SendInitialMetadataOperation(augmented_metadata,
  1007. initial_metadata_flags),
  1008. cygrpc.SendMessageOperation(serialized_request,
  1009. _EMPTY_FLAGS),
  1010. cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
  1011. cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
  1012. ),
  1013. (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
  1014. )
  1015. call = self._managed_call(
  1016. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
  1017. self._method, None, _determine_deadline(deadline), metadata,
  1018. None if credentials is None else credentials._credentials,
  1019. operations, _event_handler(state, self._response_deserializer),
  1020. self._context)
  1021. return _MultiThreadedRendezvous(state, call,
  1022. self._response_deserializer,
  1023. deadline)
  1024. class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
  1025. _channel: cygrpc.Channel
  1026. _managed_call: IntegratedCallFactory
  1027. _method: bytes
  1028. _request_serializer: Optional[SerializingFunction]
  1029. _response_deserializer: Optional[DeserializingFunction]
  1030. _context: Any
  1031. # pylint: disable=too-many-arguments
  1032. def __init__(self, channel: cygrpc.Channel,
  1033. managed_call: IntegratedCallFactory, method: bytes,
  1034. request_serializer: Optional[SerializingFunction],
  1035. response_deserializer: Optional[DeserializingFunction]):
  1036. self._channel = channel
  1037. self._managed_call = managed_call
  1038. self._method = method
  1039. self._request_serializer = request_serializer
  1040. self._response_deserializer = response_deserializer
  1041. self._context = cygrpc.build_census_context()
  1042. def _blocking(
  1043. self, request_iterator: Iterator, timeout: Optional[float],
  1044. metadata: Optional[MetadataType],
  1045. credentials: Optional[grpc.CallCredentials],
  1046. wait_for_ready: Optional[bool], compression: Optional[grpc.Compression]
  1047. ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
  1048. deadline = _deadline(timeout)
  1049. state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
  1050. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  1051. wait_for_ready)
  1052. augmented_metadata = _compression.augment_metadata(
  1053. metadata, compression)
  1054. call = self._channel.segregated_call(
  1055. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
  1056. None, _determine_deadline(deadline), augmented_metadata,
  1057. None if credentials is None else credentials._credentials,
  1058. _stream_unary_invocation_operations_and_tags(
  1059. augmented_metadata, initial_metadata_flags), self._context)
  1060. _consume_request_iterator(request_iterator, state, call,
  1061. self._request_serializer, None)
  1062. while True:
  1063. event = call.next_event()
  1064. with state.condition:
  1065. _handle_event(event, state, self._response_deserializer)
  1066. state.condition.notify_all()
  1067. if not state.due:
  1068. break
  1069. return state, call
  1070. def __call__(self,
  1071. request_iterator: Iterator,
  1072. timeout: Optional[float] = None,
  1073. metadata: Optional[MetadataType] = None,
  1074. credentials: Optional[grpc.CallCredentials] = None,
  1075. wait_for_ready: Optional[bool] = None,
  1076. compression: Optional[grpc.Compression] = None) -> Any:
  1077. state, call, = self._blocking(request_iterator, timeout, metadata,
  1078. credentials, wait_for_ready, compression)
  1079. return _end_unary_response_blocking(state, call, False, None)
  1080. def with_call(
  1081. self,
  1082. request_iterator: Iterator,
  1083. timeout: Optional[float] = None,
  1084. metadata: Optional[MetadataType] = None,
  1085. credentials: Optional[grpc.CallCredentials] = None,
  1086. wait_for_ready: Optional[bool] = None,
  1087. compression: Optional[grpc.Compression] = None
  1088. ) -> Tuple[Any, grpc.Call]:
  1089. state, call, = self._blocking(request_iterator, timeout, metadata,
  1090. credentials, wait_for_ready, compression)
  1091. return _end_unary_response_blocking(state, call, True, None)
  1092. def future(
  1093. self,
  1094. request_iterator: Iterator,
  1095. timeout: Optional[float] = None,
  1096. metadata: Optional[MetadataType] = None,
  1097. credentials: Optional[grpc.CallCredentials] = None,
  1098. wait_for_ready: Optional[bool] = None,
  1099. compression: Optional[grpc.Compression] = None
  1100. ) -> _MultiThreadedRendezvous:
  1101. deadline = _deadline(timeout)
  1102. state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
  1103. event_handler = _event_handler(state, self._response_deserializer)
  1104. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  1105. wait_for_ready)
  1106. augmented_metadata = _compression.augment_metadata(
  1107. metadata, compression)
  1108. call = self._managed_call(
  1109. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
  1110. None, deadline, augmented_metadata,
  1111. None if credentials is None else credentials._credentials,
  1112. _stream_unary_invocation_operations(metadata,
  1113. initial_metadata_flags),
  1114. event_handler, self._context)
  1115. _consume_request_iterator(request_iterator, state, call,
  1116. self._request_serializer, event_handler)
  1117. return _MultiThreadedRendezvous(state, call,
  1118. self._response_deserializer, deadline)
  1119. class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
  1120. _channel: cygrpc.Channel
  1121. _managed_call: IntegratedCallFactory
  1122. _method: bytes
  1123. _request_serializer: Optional[SerializingFunction]
  1124. _response_deserializer: Optional[DeserializingFunction]
  1125. _context: Any
  1126. # pylint: disable=too-many-arguments
  1127. def __init__(self,
  1128. channel: cygrpc.Channel,
  1129. managed_call: IntegratedCallFactory,
  1130. method: bytes,
  1131. request_serializer: Optional[SerializingFunction] = None,
  1132. response_deserializer: Optional[DeserializingFunction] = None):
  1133. self._channel = channel
  1134. self._managed_call = managed_call
  1135. self._method = method
  1136. self._request_serializer = request_serializer
  1137. self._response_deserializer = response_deserializer
  1138. self._context = cygrpc.build_census_context()
  1139. def __call__(
  1140. self,
  1141. request_iterator: Iterator,
  1142. timeout: Optional[float] = None,
  1143. metadata: Optional[MetadataType] = None,
  1144. credentials: Optional[grpc.CallCredentials] = None,
  1145. wait_for_ready: Optional[bool] = None,
  1146. compression: Optional[grpc.Compression] = None
  1147. ) -> _MultiThreadedRendezvous:
  1148. deadline = _deadline(timeout)
  1149. state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
  1150. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  1151. wait_for_ready)
  1152. augmented_metadata = _compression.augment_metadata(
  1153. metadata, compression)
  1154. operations = (
  1155. (
  1156. cygrpc.SendInitialMetadataOperation(augmented_metadata,
  1157. initial_metadata_flags),
  1158. cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
  1159. ),
  1160. (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
  1161. )
  1162. event_handler = _event_handler(state, self._response_deserializer)
  1163. call = self._managed_call(
  1164. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
  1165. None, _determine_deadline(deadline), augmented_metadata,
  1166. None if credentials is None else credentials._credentials,
  1167. operations, event_handler, self._context)
  1168. _consume_request_iterator(request_iterator, state, call,
  1169. self._request_serializer, event_handler)
  1170. return _MultiThreadedRendezvous(state, call,
  1171. self._response_deserializer, deadline)
  1172. class _InitialMetadataFlags(int):
  1173. """Stores immutable initial metadata flags"""
  1174. def __new__(cls, value: int = _EMPTY_FLAGS):
  1175. value &= cygrpc.InitialMetadataFlags.used_mask
  1176. return super(_InitialMetadataFlags, cls).__new__(cls, value)
  1177. def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
  1178. if wait_for_ready is not None:
  1179. if wait_for_ready:
  1180. return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \
  1181. cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
  1182. elif not wait_for_ready:
  1183. return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \
  1184. cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
  1185. return self
  1186. class _ChannelCallState(object):
  1187. channel: cygrpc.Channel
  1188. managed_calls: int
  1189. threading: bool
  1190. def __init__(self, channel: cygrpc.Channel):
  1191. self.lock = threading.Lock()
  1192. self.channel = channel
  1193. self.managed_calls = 0
  1194. self.threading = False
  1195. def reset_postfork_child(self) -> None:
  1196. self.managed_calls = 0
  1197. def __del__(self):
  1198. try:
  1199. self.channel.close(cygrpc.StatusCode.cancelled,
  1200. 'Channel deallocated!')
  1201. except (TypeError, AttributeError):
  1202. pass
  1203. def _run_channel_spin_thread(state: _ChannelCallState) -> None:
  1204. def channel_spin():
  1205. while True:
  1206. cygrpc.block_if_fork_in_progress(state)
  1207. event = state.channel.next_call_event()
  1208. if event.completion_type == cygrpc.CompletionType.queue_timeout:
  1209. continue
  1210. call_completed = event.tag(event)
  1211. if call_completed:
  1212. with state.lock:
  1213. state.managed_calls -= 1
  1214. if state.managed_calls == 0:
  1215. return
  1216. channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
  1217. channel_spin_thread.setDaemon(True)
  1218. channel_spin_thread.start()
  1219. def _channel_managed_call_management(state: _ChannelCallState):
  1220. # pylint: disable=too-many-arguments
  1221. def create(flags: int, method: bytes, host: Optional[str],
  1222. deadline: Optional[float], metadata: Optional[MetadataType],
  1223. credentials: Optional[cygrpc.CallCredentials],
  1224. operations: Sequence[Sequence[cygrpc.Operation]],
  1225. event_handler: UserTag, context) -> cygrpc.IntegratedCall:
  1226. """Creates a cygrpc.IntegratedCall.
  1227. Args:
  1228. flags: An integer bitfield of call flags.
  1229. method: The RPC method.
  1230. host: A host string for the created call.
  1231. deadline: A float to be the deadline of the created call or None if
  1232. the call is to have an infinite deadline.
  1233. metadata: The metadata for the call or None.
  1234. credentials: A cygrpc.CallCredentials or None.
  1235. operations: A sequence of sequences of cygrpc.Operations to be
  1236. started on the call.
  1237. event_handler: A behavior to call to handle the events resultant from
  1238. the operations on the call.
  1239. context: Context object for distributed tracing.
  1240. Returns:
  1241. A cygrpc.IntegratedCall with which to conduct an RPC.
  1242. """
  1243. operations_and_tags = tuple((
  1244. operation,
  1245. event_handler,
  1246. ) for operation in operations)
  1247. with state.lock:
  1248. call = state.channel.integrated_call(flags, method, host, deadline,
  1249. metadata, credentials,
  1250. operations_and_tags, context)
  1251. if state.managed_calls == 0:
  1252. state.managed_calls = 1
  1253. _run_channel_spin_thread(state)
  1254. else:
  1255. state.managed_calls += 1
  1256. return call
  1257. return create
  1258. class _ChannelConnectivityState(object):
  1259. lock: threading.RLock
  1260. channel: grpc.Channel
  1261. polling: bool
  1262. connectivity: grpc.ChannelConnectivity
  1263. try_to_connect: bool
  1264. # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
  1265. callbacks_and_connectivities: List[Sequence[Union[Callable[
  1266. [grpc.ChannelConnectivity], None], Optional[grpc.ChannelConnectivity]]]]
  1267. delivering: bool
  1268. def __init__(self, channel: grpc.Channel):
  1269. self.lock = threading.RLock()
  1270. self.channel = channel
  1271. self.polling = False
  1272. self.connectivity = None
  1273. self.try_to_connect = False
  1274. self.callbacks_and_connectivities = []
  1275. self.delivering = False
  1276. def reset_postfork_child(self) -> None:
  1277. self.polling = False
  1278. self.connectivity = None
  1279. self.try_to_connect = False
  1280. self.callbacks_and_connectivities = []
  1281. self.delivering = False
  1282. def _deliveries(
  1283. state: _ChannelConnectivityState
  1284. ) -> List[Callable[[grpc.ChannelConnectivity], None]]:
  1285. callbacks_needing_update = []
  1286. for callback_and_connectivity in state.callbacks_and_connectivities:
  1287. callback, callback_connectivity, = callback_and_connectivity
  1288. if callback_connectivity is not state.connectivity:
  1289. callbacks_needing_update.append(callback)
  1290. callback_and_connectivity[1] = state.connectivity
  1291. return callbacks_needing_update
  1292. def _deliver(
  1293. state: _ChannelConnectivityState,
  1294. initial_connectivity: grpc.ChannelConnectivity,
  1295. initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]]
  1296. ) -> None:
  1297. connectivity = initial_connectivity
  1298. callbacks = initial_callbacks
  1299. while True:
  1300. for callback in callbacks:
  1301. cygrpc.block_if_fork_in_progress(state)
  1302. try:
  1303. callback(connectivity)
  1304. except Exception: # pylint: disable=broad-except
  1305. _LOGGER.exception(
  1306. _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)
  1307. with state.lock:
  1308. callbacks = _deliveries(state)
  1309. if callbacks:
  1310. connectivity = state.connectivity
  1311. else:
  1312. state.delivering = False
  1313. return
  1314. def _spawn_delivery(
  1315. state: _ChannelConnectivityState,
  1316. callbacks: Sequence[Callable[[grpc.ChannelConnectivity],
  1317. None]]) -> None:
  1318. delivering_thread = cygrpc.ForkManagedThread(target=_deliver,
  1319. args=(
  1320. state,
  1321. state.connectivity,
  1322. callbacks,
  1323. ))
  1324. delivering_thread.setDaemon(True)
  1325. delivering_thread.start()
  1326. state.delivering = True
  1327. # NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
  1328. def _poll_connectivity(state: _ChannelConnectivityState, channel: grpc.Channel,
  1329. initial_try_to_connect: bool) -> None:
  1330. try_to_connect = initial_try_to_connect
  1331. connectivity = channel.check_connectivity_state(try_to_connect)
  1332. with state.lock:
  1333. state.connectivity = (
  1334. _common.
  1335. CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[connectivity])
  1336. callbacks = tuple(
  1337. callback for callback, unused_but_known_to_be_none_connectivity in
  1338. state.callbacks_and_connectivities)
  1339. for callback_and_connectivity in state.callbacks_and_connectivities:
  1340. callback_and_connectivity[1] = state.connectivity
  1341. if callbacks:
  1342. _spawn_delivery(state, callbacks)
  1343. while True:
  1344. event = channel.watch_connectivity_state(connectivity,
  1345. time.time() + 0.2)
  1346. cygrpc.block_if_fork_in_progress(state)
  1347. with state.lock:
  1348. if not state.callbacks_and_connectivities and not state.try_to_connect:
  1349. state.polling = False
  1350. state.connectivity = None
  1351. break
  1352. try_to_connect = state.try_to_connect
  1353. state.try_to_connect = False
  1354. if event.success or try_to_connect:
  1355. connectivity = channel.check_connectivity_state(try_to_connect)
  1356. with state.lock:
  1357. state.connectivity = (
  1358. _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
  1359. connectivity])
  1360. if not state.delivering:
  1361. callbacks = _deliveries(state)
  1362. if callbacks:
  1363. _spawn_delivery(state, callbacks)
  1364. def _subscribe(state: _ChannelConnectivityState,
  1365. callback: Callable[[grpc.ChannelConnectivity],
  1366. None], try_to_connect: bool) -> None:
  1367. with state.lock:
  1368. if not state.callbacks_and_connectivities and not state.polling:
  1369. polling_thread = cygrpc.ForkManagedThread(
  1370. target=_poll_connectivity,
  1371. args=(state, state.channel, bool(try_to_connect)))
  1372. polling_thread.setDaemon(True)
  1373. polling_thread.start()
  1374. state.polling = True
  1375. state.callbacks_and_connectivities.append([callback, None])
  1376. elif not state.delivering and state.connectivity is not None:
  1377. _spawn_delivery(state, (callback,))
  1378. state.try_to_connect |= bool(try_to_connect)
  1379. state.callbacks_and_connectivities.append(
  1380. [callback, state.connectivity])
  1381. else:
  1382. state.try_to_connect |= bool(try_to_connect)
  1383. state.callbacks_and_connectivities.append([callback, None])
  1384. def _unsubscribe(state: _ChannelConnectivityState,
  1385. callback: Callable[[grpc.ChannelConnectivity], None]) -> None:
  1386. with state.lock:
  1387. for index, (subscribed_callback, unused_connectivity) in enumerate(
  1388. state.callbacks_and_connectivities):
  1389. if callback == subscribed_callback:
  1390. state.callbacks_and_connectivities.pop(index)
  1391. break
  1392. def _augment_options(
  1393. base_options: Sequence[ChannelArgumentType],
  1394. compression: Optional[grpc.Compression]
  1395. ) -> Sequence[ChannelArgumentType]:
  1396. compression_option = _compression.create_channel_option(compression)
  1397. return tuple(base_options) + compression_option + ((
  1398. cygrpc.ChannelArgKey.primary_user_agent_string,
  1399. _USER_AGENT,
  1400. ),)
  1401. def _separate_channel_options(
  1402. options: Sequence[ChannelArgumentType]
  1403. ) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
  1404. """Separates core channel options from Python channel options."""
  1405. core_options = []
  1406. python_options = []
  1407. for pair in options:
  1408. if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
  1409. python_options.append(pair)
  1410. else:
  1411. core_options.append(pair)
  1412. return python_options, core_options
  1413. class Channel(grpc.Channel):
  1414. """A cygrpc.Channel-backed implementation of grpc.Channel."""
  1415. _single_threaded_unary_stream: bool
  1416. _channel: cygrpc.Channel
  1417. _call_state: _ChannelCallState
  1418. _connectivity_state: _ChannelConnectivityState
  1419. def __init__(self, target: str, options: Sequence[ChannelArgumentType],
  1420. credentials: Optional[grpc.ChannelCredentials],
  1421. compression: Optional[grpc.Compression]):
  1422. """Constructor.
  1423. Args:
  1424. target: The target to which to connect.
  1425. options: Configuration options for the channel.
  1426. credentials: A cygrpc.ChannelCredentials or None.
  1427. compression: An optional value indicating the compression method to be
  1428. used over the lifetime of the channel.
  1429. """
  1430. python_options, core_options = _separate_channel_options(options)
  1431. self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM
  1432. self._process_python_options(python_options)
  1433. self._channel = cygrpc.Channel(
  1434. _common.encode(target), _augment_options(core_options, compression),
  1435. credentials)
  1436. self._call_state = _ChannelCallState(self._channel)
  1437. self._connectivity_state = _ChannelConnectivityState(self._channel)
  1438. cygrpc.fork_register_channel(self)
  1439. if cygrpc.g_gevent_activated:
  1440. cygrpc.gevent_increment_channel_count()
  1441. def _process_python_options(
  1442. self, python_options: Sequence[ChannelArgumentType]) -> None:
  1443. """Sets channel attributes according to python-only channel options."""
  1444. for pair in python_options:
  1445. if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
  1446. self._single_threaded_unary_stream = True
  1447. def subscribe(self,
  1448. callback: Callable[[grpc.ChannelConnectivity], None],
  1449. try_to_connect: Optional[bool] = None) -> None:
  1450. _subscribe(self._connectivity_state, callback, try_to_connect)
  1451. def unsubscribe(
  1452. self, callback: Callable[[grpc.ChannelConnectivity], None]) -> None:
  1453. _unsubscribe(self._connectivity_state, callback)
  1454. def unary_unary(
  1455. self,
  1456. method: str,
  1457. request_serializer: Optional[SerializingFunction] = None,
  1458. response_deserializer: Optional[DeserializingFunction] = None
  1459. ) -> grpc.UnaryUnaryMultiCallable:
  1460. return _UnaryUnaryMultiCallable(
  1461. self._channel, _channel_managed_call_management(self._call_state),
  1462. _common.encode(method), request_serializer, response_deserializer)
  1463. def unary_stream(
  1464. self,
  1465. method: str,
  1466. request_serializer: Optional[SerializingFunction] = None,
  1467. response_deserializer: Optional[DeserializingFunction] = None
  1468. ) -> grpc.UnaryStreamMultiCallable:
  1469. # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
  1470. # on a single Python thread results in an appreciable speed-up. However,
  1471. # due to slight differences in capability, the multi-threaded variant
  1472. # remains the default.
  1473. if self._single_threaded_unary_stream:
  1474. return _SingleThreadedUnaryStreamMultiCallable(
  1475. self._channel, _common.encode(method), request_serializer,
  1476. response_deserializer)
  1477. else:
  1478. return _UnaryStreamMultiCallable(
  1479. self._channel,
  1480. _channel_managed_call_management(self._call_state),
  1481. _common.encode(method), request_serializer,
  1482. response_deserializer)
  1483. def stream_unary(
  1484. self,
  1485. method: str,
  1486. request_serializer: Optional[SerializingFunction] = None,
  1487. response_deserializer: Optional[DeserializingFunction] = None
  1488. ) -> grpc.StreamUnaryMultiCallable:
  1489. return _StreamUnaryMultiCallable(
  1490. self._channel, _channel_managed_call_management(self._call_state),
  1491. _common.encode(method), request_serializer, response_deserializer)
  1492. def stream_stream(
  1493. self,
  1494. method: str,
  1495. request_serializer: Optional[SerializingFunction] = None,
  1496. response_deserializer: Optional[DeserializingFunction] = None
  1497. ) -> grpc.StreamStreamMultiCallable:
  1498. return _StreamStreamMultiCallable(
  1499. self._channel, _channel_managed_call_management(self._call_state),
  1500. _common.encode(method), request_serializer, response_deserializer)
  1501. def _unsubscribe_all(self) -> None:
  1502. state = self._connectivity_state
  1503. if state:
  1504. with state.lock:
  1505. del state.callbacks_and_connectivities[:]
  1506. def _close(self) -> None:
  1507. self._unsubscribe_all()
  1508. self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
  1509. cygrpc.fork_unregister_channel(self)
  1510. if cygrpc.g_gevent_activated:
  1511. cygrpc.gevent_decrement_channel_count()
  1512. def _close_on_fork(self) -> None:
  1513. self._unsubscribe_all()
  1514. self._channel.close_on_fork(cygrpc.StatusCode.cancelled,
  1515. 'Channel closed due to fork')
  1516. def __enter__(self):
  1517. return self
  1518. def __exit__(self, exc_type, exc_val, exc_tb):
  1519. self._close()
  1520. return False
  1521. def close(self) -> None:
  1522. self._close()
  1523. def __del__(self):
  1524. # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
  1525. # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
  1526. # here (or more likely, call self._close() here). We don't do this today
  1527. # because many valid use cases today allow the channel to be deleted
  1528. # immediately after stubs are created. After a sufficient period of time
  1529. # has passed for all users to be trusted to freeze out to their channels
  1530. # for as long as they are in use and to close them after using them,
  1531. # then deletion of this grpc._channel.Channel instance can be made to
  1532. # effect closure of the underlying cygrpc.Channel instance.
  1533. try:
  1534. self._unsubscribe_all()
  1535. except: # pylint: disable=bare-except
  1536. # Exceptions in __del__ are ignored by Python anyway, but they can
  1537. # keep spamming logs. Just silence them.
  1538. pass