_producer_helpers.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # -*- test-case-name: twisted.test.test_producer_helpers -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Helpers for working with producers.
  6. """
  7. from __future__ import division, absolute_import
  8. from zope.interface import implementer
  9. from twisted.internet.interfaces import IPushProducer
  10. from twisted.internet.task import cooperate
  11. from twisted.python import log
  12. from twisted.python.reflect import safe_str
  13. # This module exports nothing public, it's for internal Twisted use only.
  14. __all__ = []
  15. @implementer(IPushProducer)
  16. class _PullToPush(object):
  17. """
  18. An adapter that converts a non-streaming to a streaming producer.
  19. Because of limitations of the producer API, this adapter requires the
  20. cooperation of the consumer. When the consumer's C{registerProducer} is
  21. called with a non-streaming producer, it must wrap it with L{_PullToPush}
  22. and then call C{startStreaming} on the resulting object. When the
  23. consumer's C{unregisterProducer} is called, it must call
  24. C{stopStreaming} on the L{_PullToPush} instance.
  25. If the underlying producer throws an exception from C{resumeProducing},
  26. the producer will be unregistered from the consumer.
  27. @ivar _producer: the underling non-streaming producer.
  28. @ivar _consumer: the consumer with which the underlying producer was
  29. registered.
  30. @ivar _finished: C{bool} indicating whether the producer has finished.
  31. @ivar _coopTask: the result of calling L{cooperate}, the task driving the
  32. streaming producer.
  33. """
  34. _finished = False
  35. def __init__(self, pullProducer, consumer):
  36. self._producer = pullProducer
  37. self._consumer = consumer
  38. def _pull(self):
  39. """
  40. A generator that calls C{resumeProducing} on the underlying producer
  41. forever.
  42. If C{resumeProducing} throws an exception, the producer is
  43. unregistered, which should result in streaming stopping.
  44. """
  45. while True:
  46. try:
  47. self._producer.resumeProducing()
  48. except:
  49. log.err(None, "%s failed, producing will be stopped:" %
  50. (safe_str(self._producer),))
  51. try:
  52. self._consumer.unregisterProducer()
  53. # The consumer should now call stopStreaming() on us,
  54. # thus stopping the streaming.
  55. except:
  56. # Since the consumer blew up, we may not have had
  57. # stopStreaming() called, so we just stop on our own:
  58. log.err(None, "%s failed to unregister producer:" %
  59. (safe_str(self._consumer),))
  60. self._finished = True
  61. return
  62. yield None
  63. def startStreaming(self):
  64. """
  65. This should be called by the consumer when the producer is registered.
  66. Start streaming data to the consumer.
  67. """
  68. self._coopTask = cooperate(self._pull())
  69. def stopStreaming(self):
  70. """
  71. This should be called by the consumer when the producer is
  72. unregistered.
  73. Stop streaming data to the consumer.
  74. """
  75. if self._finished:
  76. return
  77. self._finished = True
  78. self._coopTask.stop()
  79. def pauseProducing(self):
  80. """
  81. @see: C{IPushProducer.pauseProducing}
  82. """
  83. self._coopTask.pause()
  84. def resumeProducing(self):
  85. """
  86. @see: C{IPushProducer.resumeProducing}
  87. """
  88. self._coopTask.resume()
  89. def stopProducing(self):
  90. """
  91. @see: C{IPushProducer.stopProducing}
  92. """
  93. self.stopStreaming()
  94. self._producer.stopProducing()