stream_util.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Helpful utilities related to the stream module."""
import collections
import logging
import threading

from grpc.framework.foundation import stream
  18. _NO_VALUE = object()  # Sentinel handed to ThreadSwitchingConsumer._spin in place of a value when the sink should only be terminated.
  19. _LOGGER = logging.getLogger(__name__)  # Module logger; _spin reports exceptions raised by a sink through it.
  20. class TransformingConsumer(stream.Consumer):
  21. """A stream.Consumer that passes a transformation of its input to another."""
  22. def __init__(self, transformation, downstream):
  23. self._transformation = transformation
  24. self._downstream = downstream
  25. def consume(self, value):
  26. self._downstream.consume(self._transformation(value))
  27. def terminate(self):
  28. self._downstream.terminate()
  29. def consume_and_terminate(self, value):
  30. self._downstream.consume_and_terminate(self._transformation(value))
  31. class IterableConsumer(stream.Consumer):
  32. """A Consumer that when iterated over emits the values it has consumed."""
  33. def __init__(self):
  34. self._condition = threading.Condition()
  35. self._values = []
  36. self._active = True
  37. def consume(self, value):
  38. with self._condition:
  39. if self._active:
  40. self._values.append(value)
  41. self._condition.notify()
  42. def terminate(self):
  43. with self._condition:
  44. self._active = False
  45. self._condition.notify()
  46. def consume_and_terminate(self, value):
  47. with self._condition:
  48. if self._active:
  49. self._values.append(value)
  50. self._active = False
  51. self._condition.notify()
  52. def __iter__(self):
  53. return self
  54. def __next__(self):
  55. return self.next()
  56. def next(self):
  57. with self._condition:
  58. while self._active and not self._values:
  59. self._condition.wait()
  60. if self._values:
  61. return self._values.pop(0)
  62. else:
  63. raise StopIteration()
  64. class ThreadSwitchingConsumer(stream.Consumer):
  65. """A Consumer decorator that affords serialization and asynchrony."""
  66. def __init__(self, sink, pool):
  67. self._lock = threading.Lock()
  68. self._sink = sink
  69. self._pool = pool
  70. # True if self._spin has been submitted to the pool to be called once and
  71. # that call has not yet returned, False otherwise.
  72. self._spinning = False
  73. self._values = []
  74. self._active = True
  75. def _spin(self, sink, value, terminate):
  76. while True:
  77. try:
  78. if value is _NO_VALUE:
  79. sink.terminate()
  80. elif terminate:
  81. sink.consume_and_terminate(value)
  82. else:
  83. sink.consume(value)
  84. except Exception as e: # pylint:disable=broad-except
  85. _LOGGER.exception(e)
  86. with self._lock:
  87. if terminate:
  88. self._spinning = False
  89. return
  90. elif self._values:
  91. value = self._values.pop(0)
  92. terminate = not self._values and not self._active
  93. elif not self._active:
  94. value = _NO_VALUE
  95. terminate = True
  96. else:
  97. self._spinning = False
  98. return
  99. def consume(self, value):
  100. with self._lock:
  101. if self._active:
  102. if self._spinning:
  103. self._values.append(value)
  104. else:
  105. self._pool.submit(self._spin, self._sink, value, False)
  106. self._spinning = True
  107. def terminate(self):
  108. with self._lock:
  109. if self._active:
  110. self._active = False
  111. if not self._spinning:
  112. self._pool.submit(self._spin, self._sink, _NO_VALUE, True)
  113. self._spinning = True
  114. def consume_and_terminate(self, value):
  115. with self._lock:
  116. if self._active:
  117. self._active = False
  118. if self._spinning:
  119. self._values.append(value)
  120. else:
  121. self._pool.submit(self._spin, self._sink, value, True)
  122. self._spinning = True