_producer_helpers.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. # -*- test-case-name: twisted.protocols.test.test_tls,twisted.web.test.test_http2 -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Helpers for working with producers.
  6. """
  7. from typing import List
  8. from zope.interface import implementer
  9. from twisted.internet.interfaces import IPushProducer
  10. from twisted.internet.task import cooperate
  11. from twisted.logger import Logger
  12. _log = Logger()
  13. # This module exports nothing public, it's for internal Twisted use only.
  14. __all__: List[str] = []
  15. @implementer(IPushProducer)
  16. class _PullToPush:
  17. """
  18. An adapter that converts a non-streaming to a streaming producer.
  19. Because of limitations of the producer API, this adapter requires the
  20. cooperation of the consumer. When the consumer's C{registerProducer} is
  21. called with a non-streaming producer, it must wrap it with L{_PullToPush}
  22. and then call C{startStreaming} on the resulting object. When the
  23. consumer's C{unregisterProducer} is called, it must call
  24. C{stopStreaming} on the L{_PullToPush} instance.
  25. If the underlying producer throws an exception from C{resumeProducing},
  26. the producer will be unregistered from the consumer.
  27. @ivar _producer: the underling non-streaming producer.
  28. @ivar _consumer: the consumer with which the underlying producer was
  29. registered.
  30. @ivar _finished: C{bool} indicating whether the producer has finished.
  31. @ivar _coopTask: the result of calling L{cooperate}, the task driving the
  32. streaming producer.
  33. """
  34. _finished = False
  35. def __init__(self, pullProducer, consumer):
  36. self._producer = pullProducer
  37. self._consumer = consumer
  38. def _pull(self):
  39. """
  40. A generator that calls C{resumeProducing} on the underlying producer
  41. forever.
  42. If C{resumeProducing} throws an exception, the producer is
  43. unregistered, which should result in streaming stopping.
  44. """
  45. while True:
  46. with _log.failuresHandled(
  47. "while calling resumeProducing on {producer}", producer=self._producer
  48. ) as op:
  49. self._producer.resumeProducing()
  50. if op.failed:
  51. with _log.failuresHandled(
  52. "while calling unregisterProducer on {consumer}",
  53. consumer=self._consumer,
  54. ) as handlingop:
  55. self._consumer.unregisterProducer()
  56. if handlingop.failed:
  57. # The consumer should now call stopStreaming() on us,
  58. # thus stopping the streaming.
  59. self._finished = True
  60. return
  61. yield None
  62. def startStreaming(self):
  63. """
  64. This should be called by the consumer when the producer is registered.
  65. Start streaming data to the consumer.
  66. """
  67. self._coopTask = cooperate(self._pull())
  68. def stopStreaming(self):
  69. """
  70. This should be called by the consumer when the producer is
  71. unregistered.
  72. Stop streaming data to the consumer.
  73. """
  74. if self._finished:
  75. return
  76. self._finished = True
  77. self._coopTask.stop()
  78. def pauseProducing(self):
  79. """
  80. @see: C{IPushProducer.pauseProducing}
  81. """
  82. self._coopTask.pause()
  83. def resumeProducing(self):
  84. """
  85. @see: C{IPushProducer.resumeProducing}
  86. """
  87. self._coopTask.resume()
  88. def stopProducing(self):
  89. """
  90. @see: C{IPushProducer.stopProducing}
  91. """
  92. self.stopStreaming()
  93. self._producer.stopProducing()