_utilities.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Internal utilities for gRPC Python."""
  15. import collections
  16. import logging
  17. import threading
  18. import time
  19. from typing import Callable, Dict, Optional, Sequence
  20. import grpc # pytype: disable=pyi-error
  21. from grpc import _common # pytype: disable=pyi-error
  22. from grpc._typing import DoneCallbackType
  # Logger for this module, named after the module's import path.
  _LOGGER = logging.getLogger(__name__)

  # Message logged (together with the traceback) when a "done" callback
  # registered on a connectivity future raises an exception.
  _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling connectivity future "done" callback!'
  class RpcMethodHandler(
          collections.namedtuple(
              '_RpcMethodHandler',
              [
                  'request_streaming',
                  'response_streaming',
                  'request_deserializer',
                  'response_serializer',
                  'unary_unary',
                  'unary_stream',
                  'stream_unary',
                  'stream_stream',
              ],
          ),
          grpc.RpcMethodHandler):
      """Named-tuple implementation of ``grpc.RpcMethodHandler``.

      The record carries eight positional fields, in this order:
      ``request_streaming``, ``response_streaming``,
      ``request_deserializer``, ``response_serializer``, ``unary_unary``,
      ``unary_stream``, ``stream_unary`` and ``stream_stream``.  The class
      adds no behaviour of its own beyond what its two bases provide.
      """
  class DictionaryGenericHandler(grpc.ServiceRpcHandler):
      """Service handler that resolves methods through a dictionary.

      At construction every entry of ``method_handlers`` is re-keyed with the
      value returned by ``_common.fully_qualified_method(service, method)``;
      lookups are then done with the ``method`` attribute of the incoming
      call details.
      """

      _name: str
      _method_handlers: Dict[str, grpc.RpcMethodHandler]

      def __init__(self, service: str,
                   method_handlers: Dict[str, grpc.RpcMethodHandler]):
          """Store the service name and index the handlers by qualified name.

          Args:
            service: Name of the service; returned by ``service_name``.
            method_handlers: Mapping from method name to its handler.
          """
          self._name = service
          qualified_handlers = {}
          for method_name, handler in method_handlers.items():
              qualified_name = _common.fully_qualified_method(
                  service, method_name)
              qualified_handlers[qualified_name] = handler
          self._method_handlers = qualified_handlers

      def service_name(self) -> str:
          """Return the service name given at construction."""
          return self._name

      def service(
              self, handler_call_details: grpc.HandlerCallDetails
      ) -> Optional[grpc.RpcMethodHandler]:
          """Return the handler registered for the requested method.

          Args:
            handler_call_details: Call details whose ``method`` attribute is
              used as the lookup key.

          Returns:
            The matching handler, or None when no handler is registered under
            that key.
          """
          return self._method_handlers.get(
              handler_call_details.method)  # pytype: disable=attribute-error
  class _ChannelReadyFuture(grpc.Future):
      """Future that matures when its channel reports READY connectivity.

      The future subscribes ``_update`` to the channel's connectivity
      notifications (see ``start``).  It finishes in one of two ways: it
      "matures" when ``_update`` receives ``grpc.ChannelConnectivity.READY``,
      or it is cancelled through ``cancel``.  Either transition unsubscribes
      from the channel, wakes all blocked waiters and runs the registered
      "done" callbacks exactly once.

      All state is guarded by ``_condition``.
      """

      _condition: threading.Condition
      _channel: grpc.Channel
      _matured: bool
      _cancelled: bool
      # Replaced by None once the future has matured or been cancelled.
      _done_callbacks: Sequence[Callable]

      def __init__(self, channel: grpc.Channel):
          """Create a pending future for ``channel`` (not yet subscribed)."""
          self._condition = threading.Condition()
          self._channel = channel
          self._matured = False
          self._cancelled = False
          self._done_callbacks = []

      def _block(self, timeout: Optional[float]) -> None:
          """Wait until the future matures.

          Args:
            timeout: Maximum number of seconds to wait, or None to wait
              without a deadline.

          Raises:
            grpc.FutureCancelledError: If the future is (or becomes)
              cancelled.
            grpc.FutureTimeoutError: If the deadline passes first.
          """
          # Use the monotonic clock so that adjustments of the system
          # wall-clock time cannot shorten or lengthen the wait.
          until = None if timeout is None else time.monotonic() + timeout
          with self._condition:
              while True:
                  if self._cancelled:
                      raise grpc.FutureCancelledError()
                  if self._matured:
                      return
                  if until is None:
                      self._condition.wait()
                  else:
                      remaining = until - time.monotonic()
                      if remaining < 0:
                          raise grpc.FutureTimeoutError()
                      self._condition.wait(timeout=remaining)

      def _invoke_done_callbacks(self, done_callbacks) -> None:
          """Call each callback with this future, logging any exception."""
          for done_callback in done_callbacks:
              try:
                  done_callback(self)
              except Exception:  # pylint: disable=broad-except
                  _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)

      def _update(self,
                  connectivity: Optional[grpc.ChannelConnectivity]) -> None:
          """Connectivity callback: mature the future on READY.

          Updates other than READY, and updates arriving after the future has
          already been cancelled or matured, are ignored.
          """
          with self._condition:
              # The `_matured` check matters: `_done_callbacks` is None once
              # the future has finished, so a late READY notification must
              # not reach the `tuple(...)` call below.
              if (self._cancelled or self._matured or
                      connectivity is not grpc.ChannelConnectivity.READY):
                  return
              self._matured = True
              self._channel.unsubscribe(self._update)
              self._condition.notify_all()
              done_callbacks = tuple(self._done_callbacks)
              self._done_callbacks = None
          # Callbacks run outside the lock.
          self._invoke_done_callbacks(done_callbacks)

      def cancel(self) -> bool:
          """Cancel the future unless it has already matured.

          Returns:
            False if the future had already matured; True if it is cancelled
            (including when it had been cancelled by an earlier call, in
            which case nothing further is done).
          """
          with self._condition:
              if self._matured:
                  return False
              if self._cancelled:
                  # Already cancelled: the callbacks have been handed off
                  # and `_done_callbacks` is None, so there is nothing to do.
                  return True
              self._cancelled = True
              self._channel.unsubscribe(self._update)
              self._condition.notify_all()
              done_callbacks = tuple(self._done_callbacks)
              self._done_callbacks = None
          # Callbacks run outside the lock.
          self._invoke_done_callbacks(done_callbacks)
          return True

      def cancelled(self) -> bool:
          """Return True if the future has been cancelled."""
          with self._condition:
              return self._cancelled

      def running(self) -> bool:
          """Return True while the future is neither cancelled nor matured."""
          with self._condition:
              return not self._cancelled and not self._matured

      def done(self) -> bool:
          """Return True once the future is cancelled or matured."""
          with self._condition:
              return self._cancelled or self._matured

      def result(self, timeout: Optional[float] = None) -> None:
          """Block until the future matures; see ``_block`` for errors."""
          self._block(timeout)

      def exception(self, timeout: Optional[float] = None) -> None:
          """Block until the future matures, then return None."""
          self._block(timeout)

      def traceback(self, timeout: Optional[float] = None) -> None:
          """Block until the future matures, then return None."""
          self._block(timeout)

      def add_done_callback(self, fn: DoneCallbackType):
          """Register ``fn`` to be called with this future when it finishes.

          If the future is already cancelled or matured, ``fn`` is called
          immediately on the calling thread, outside the lock.
          """
          with self._condition:
              if not self._cancelled and not self._matured:
                  self._done_callbacks.append(fn)
                  return
          fn(self)

      def start(self):
          """Subscribe to the channel's connectivity, requesting a connect."""
          with self._condition:
              self._channel.subscribe(self._update, try_to_connect=True)

      def __del__(self):
          """Unsubscribe from the channel if the future never finished."""
          with self._condition:
              if not self._cancelled and not self._matured:
                  self._channel.unsubscribe(self._update)
  def channel_ready_future(channel: grpc.Channel) -> _ChannelReadyFuture:
      """Build a ``_ChannelReadyFuture`` for ``channel`` and start it.

      Args:
        channel: The channel whose connectivity the future will watch.

      Returns:
        The future, after its ``start`` method has been called.
      """
      future = _ChannelReadyFuture(channel)
      future.start()
      return future