utilities.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Utilities for the gRPC Python Beta API."""
  15. import threading
  16. import time
  17. # implementations is referenced from specification in this module.
  18. from grpc.beta import implementations # pylint: disable=unused-import
  19. from grpc.beta import interfaces
  20. from grpc.framework.foundation import callable_util
  21. from grpc.framework.foundation import future
  22. _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
  23. 'Exception calling connectivity future "done" callback!')
  24. class _ChannelReadyFuture(future.Future):
  25. def __init__(self, channel):
  26. self._condition = threading.Condition()
  27. self._channel = channel
  28. self._matured = False
  29. self._cancelled = False
  30. self._done_callbacks = []
  31. def _block(self, timeout):
  32. until = None if timeout is None else time.time() + timeout
  33. with self._condition:
  34. while True:
  35. if self._cancelled:
  36. raise future.CancelledError()
  37. elif self._matured:
  38. return
  39. else:
  40. if until is None:
  41. self._condition.wait()
  42. else:
  43. remaining = until - time.time()
  44. if remaining < 0:
  45. raise future.TimeoutError()
  46. else:
  47. self._condition.wait(timeout=remaining)
  48. def _update(self, connectivity):
  49. with self._condition:
  50. if (not self._cancelled and
  51. connectivity is interfaces.ChannelConnectivity.READY):
  52. self._matured = True
  53. self._channel.unsubscribe(self._update)
  54. self._condition.notify_all()
  55. done_callbacks = tuple(self._done_callbacks)
  56. self._done_callbacks = None
  57. else:
  58. return
  59. for done_callback in done_callbacks:
  60. callable_util.call_logging_exceptions(
  61. done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
  62. def cancel(self):
  63. with self._condition:
  64. if not self._matured:
  65. self._cancelled = True
  66. self._channel.unsubscribe(self._update)
  67. self._condition.notify_all()
  68. done_callbacks = tuple(self._done_callbacks)
  69. self._done_callbacks = None
  70. else:
  71. return False
  72. for done_callback in done_callbacks:
  73. callable_util.call_logging_exceptions(
  74. done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
  75. return True
  76. def cancelled(self):
  77. with self._condition:
  78. return self._cancelled
  79. def running(self):
  80. with self._condition:
  81. return not self._cancelled and not self._matured
  82. def done(self):
  83. with self._condition:
  84. return self._cancelled or self._matured
  85. def result(self, timeout=None):
  86. self._block(timeout)
  87. return None
  88. def exception(self, timeout=None):
  89. self._block(timeout)
  90. return None
  91. def traceback(self, timeout=None):
  92. self._block(timeout)
  93. return None
  94. def add_done_callback(self, fn):
  95. with self._condition:
  96. if not self._cancelled and not self._matured:
  97. self._done_callbacks.append(fn)
  98. return
  99. fn(self)
  100. def start(self):
  101. with self._condition:
  102. self._channel.subscribe(self._update, try_to_connect=True)
  103. def __del__(self):
  104. with self._condition:
  105. if not self._cancelled and not self._matured:
  106. self._channel.unsubscribe(self._update)
  107. def channel_ready_future(channel):
  108. """Creates a future.Future tracking when an implementations.Channel is ready.
  109. Cancelling the returned future.Future does not tell the given
  110. implementations.Channel to abandon attempts it may have been making to
  111. connect; cancelling merely deactivates the return future.Future's
  112. subscription to the given implementations.Channel's connectivity.
  113. Args:
  114. channel: An implementations.Channel.
  115. Returns:
  116. A future.Future that matures when the given Channel has connectivity
  117. interfaces.ChannelConnectivity.READY.
  118. """
  119. ready_future = _ChannelReadyFuture(channel)
  120. ready_future.start()
  121. return ready_future