_utilities.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Internal utilities for gRPC Python."""
  15. import collections
  16. import logging
  17. import threading
  18. import time
  19. import grpc
  20. from grpc import _common
  21. import six
  22. _LOGGER = logging.getLogger(__name__)
  23. _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
  24. 'Exception calling connectivity future "done" callback!')
  25. class RpcMethodHandler(
  26. collections.namedtuple('_RpcMethodHandler', (
  27. 'request_streaming',
  28. 'response_streaming',
  29. 'request_deserializer',
  30. 'response_serializer',
  31. 'unary_unary',
  32. 'unary_stream',
  33. 'stream_unary',
  34. 'stream_stream',
  35. )), grpc.RpcMethodHandler):
  36. pass
  37. class DictionaryGenericHandler(grpc.ServiceRpcHandler):
  38. def __init__(self, service, method_handlers):
  39. self._name = service
  40. self._method_handlers = {
  41. _common.fully_qualified_method(service, method): method_handler
  42. for method, method_handler in six.iteritems(method_handlers)
  43. }
  44. def service_name(self):
  45. return self._name
  46. def service(self, handler_call_details):
  47. return self._method_handlers.get(handler_call_details.method)
  48. class _ChannelReadyFuture(grpc.Future):
  49. def __init__(self, channel):
  50. self._condition = threading.Condition()
  51. self._channel = channel
  52. self._matured = False
  53. self._cancelled = False
  54. self._done_callbacks = []
  55. def _block(self, timeout):
  56. until = None if timeout is None else time.time() + timeout
  57. with self._condition:
  58. while True:
  59. if self._cancelled:
  60. raise grpc.FutureCancelledError()
  61. elif self._matured:
  62. return
  63. else:
  64. if until is None:
  65. self._condition.wait()
  66. else:
  67. remaining = until - time.time()
  68. if remaining < 0:
  69. raise grpc.FutureTimeoutError()
  70. else:
  71. self._condition.wait(timeout=remaining)
  72. def _update(self, connectivity):
  73. with self._condition:
  74. if (not self._cancelled and
  75. connectivity is grpc.ChannelConnectivity.READY):
  76. self._matured = True
  77. self._channel.unsubscribe(self._update)
  78. self._condition.notify_all()
  79. done_callbacks = tuple(self._done_callbacks)
  80. self._done_callbacks = None
  81. else:
  82. return
  83. for done_callback in done_callbacks:
  84. try:
  85. done_callback(self)
  86. except Exception: # pylint: disable=broad-except
  87. _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)
  88. def cancel(self):
  89. with self._condition:
  90. if not self._matured:
  91. self._cancelled = True
  92. self._channel.unsubscribe(self._update)
  93. self._condition.notify_all()
  94. done_callbacks = tuple(self._done_callbacks)
  95. self._done_callbacks = None
  96. else:
  97. return False
  98. for done_callback in done_callbacks:
  99. try:
  100. done_callback(self)
  101. except Exception: # pylint: disable=broad-except
  102. _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)
  103. return True
  104. def cancelled(self):
  105. with self._condition:
  106. return self._cancelled
  107. def running(self):
  108. with self._condition:
  109. return not self._cancelled and not self._matured
  110. def done(self):
  111. with self._condition:
  112. return self._cancelled or self._matured
  113. def result(self, timeout=None):
  114. self._block(timeout)
  115. def exception(self, timeout=None):
  116. self._block(timeout)
  117. def traceback(self, timeout=None):
  118. self._block(timeout)
  119. def add_done_callback(self, fn):
  120. with self._condition:
  121. if not self._cancelled and not self._matured:
  122. self._done_callbacks.append(fn)
  123. return
  124. fn(self)
  125. def start(self):
  126. with self._condition:
  127. self._channel.subscribe(self._update, try_to_connect=True)
  128. def __del__(self):
  129. with self._condition:
  130. if not self._cancelled and not self._matured:
  131. self._channel.unsubscribe(self._update)
  132. def channel_ready_future(channel):
  133. ready_future = _ChannelReadyFuture(channel)
  134. ready_future.start()
  135. return ready_future