pcp.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. # -*- test-case-name: twisted.test.test_pcp -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Producer-Consumer Proxy.
  6. """
  7. from zope.interface import implementer
  8. from twisted.internet import interfaces
  9. @implementer(interfaces.IProducer, interfaces.IConsumer)
  10. class BasicProducerConsumerProxy:
  11. """
  12. I can act as a man in the middle between any Producer and Consumer.
  13. @ivar producer: the Producer I subscribe to.
  14. @type producer: L{IProducer<interfaces.IProducer>}
  15. @ivar consumer: the Consumer I publish to.
  16. @type consumer: L{IConsumer<interfaces.IConsumer>}
  17. @ivar paused: As a Producer, am I paused?
  18. @type paused: bool
  19. """
  20. consumer = None
  21. producer = None
  22. producerIsStreaming = None
  23. iAmStreaming = True
  24. outstandingPull = False
  25. paused = False
  26. stopped = False
  27. def __init__(self, consumer):
  28. self._buffer = []
  29. if consumer is not None:
  30. self.consumer = consumer
  31. consumer.registerProducer(self, self.iAmStreaming)
  32. # Producer methods:
  33. def pauseProducing(self):
  34. self.paused = True
  35. if self.producer:
  36. self.producer.pauseProducing()
  37. def resumeProducing(self):
  38. self.paused = False
  39. if self._buffer:
  40. # TODO: Check to see if consumer supports writeSeq.
  41. self.consumer.write("".join(self._buffer))
  42. self._buffer[:] = []
  43. else:
  44. if not self.iAmStreaming:
  45. self.outstandingPull = True
  46. if self.producer is not None:
  47. self.producer.resumeProducing()
  48. def stopProducing(self):
  49. if self.producer is not None:
  50. self.producer.stopProducing()
  51. if self.consumer is not None:
  52. del self.consumer
  53. # Consumer methods:
  54. def write(self, data):
  55. if self.paused or (not self.iAmStreaming and not self.outstandingPull):
  56. # We could use that fifo queue here.
  57. self._buffer.append(data)
  58. elif self.consumer is not None:
  59. self.consumer.write(data)
  60. self.outstandingPull = False
  61. def finish(self):
  62. if self.consumer is not None:
  63. self.consumer.finish()
  64. self.unregisterProducer()
  65. def registerProducer(self, producer, streaming):
  66. self.producer = producer
  67. self.producerIsStreaming = streaming
  68. def unregisterProducer(self):
  69. if self.producer is not None:
  70. del self.producer
  71. del self.producerIsStreaming
  72. if self.consumer:
  73. self.consumer.unregisterProducer()
  74. def __repr__(self) -> str:
  75. return f"<{self.__class__}@{id(self):x} around {self.consumer}>"
  76. class ProducerConsumerProxy(BasicProducerConsumerProxy):
  77. """ProducerConsumerProxy with a finite buffer.
  78. When my buffer fills up, I have my parent Producer pause until my buffer
  79. has room in it again.
  80. """
  81. # Copies much from abstract.FileDescriptor
  82. bufferSize = 2**2**2**2
  83. producerPaused = False
  84. unregistered = False
  85. def pauseProducing(self):
  86. # Does *not* call up to ProducerConsumerProxy to relay the pause
  87. # message through to my parent Producer.
  88. self.paused = True
  89. def resumeProducing(self):
  90. self.paused = False
  91. if self._buffer:
  92. data = "".join(self._buffer)
  93. bytesSent = self._writeSomeData(data)
  94. if bytesSent < len(data):
  95. unsent = data[bytesSent:]
  96. assert (
  97. not self.iAmStreaming
  98. ), "Streaming producer did not write all its data."
  99. self._buffer[:] = [unsent]
  100. else:
  101. self._buffer[:] = []
  102. else:
  103. bytesSent = 0
  104. if (
  105. self.unregistered
  106. and bytesSent
  107. and not self._buffer
  108. and self.consumer is not None
  109. ):
  110. self.consumer.unregisterProducer()
  111. if not self.iAmStreaming:
  112. self.outstandingPull = not bytesSent
  113. if self.producer is not None:
  114. bytesBuffered = sum(len(s) for s in self._buffer)
  115. # TODO: You can see here the potential for high and low
  116. # watermarks, where bufferSize would be the high mark when we
  117. # ask the upstream producer to pause, and we wouldn't have
  118. # it resume again until it hit the low mark. Or if producer
  119. # is Pull, maybe we'd like to pull from it as much as necessary
  120. # to keep our buffer full to the low mark, so we're never caught
  121. # without something to send.
  122. if self.producerPaused and (bytesBuffered < self.bufferSize):
  123. # Now that our buffer is empty,
  124. self.producerPaused = False
  125. self.producer.resumeProducing()
  126. elif self.outstandingPull:
  127. # I did not have any data to write in response to a pull,
  128. # so I'd better pull some myself.
  129. self.producer.resumeProducing()
  130. def write(self, data):
  131. if self.paused or (not self.iAmStreaming and not self.outstandingPull):
  132. # We could use that fifo queue here.
  133. self._buffer.append(data)
  134. elif self.consumer is not None:
  135. assert (
  136. not self._buffer
  137. ), "Writing fresh data to consumer before my buffer is empty!"
  138. # I'm going to use _writeSomeData here so that there is only one
  139. # path to self.consumer.write. But it doesn't actually make sense,
  140. # if I am streaming, for some data to not be all data. But maybe I
  141. # am not streaming, but I am writing here anyway, because there was
  142. # an earlier request for data which was not answered.
  143. bytesSent = self._writeSomeData(data)
  144. self.outstandingPull = False
  145. if not bytesSent == len(data):
  146. assert (
  147. not self.iAmStreaming
  148. ), "Streaming producer did not write all its data."
  149. self._buffer.append(data[bytesSent:])
  150. if (self.producer is not None) and self.producerIsStreaming:
  151. bytesBuffered = sum(len(s) for s in self._buffer)
  152. if bytesBuffered >= self.bufferSize:
  153. self.producer.pauseProducing()
  154. self.producerPaused = True
  155. def registerProducer(self, producer, streaming):
  156. self.unregistered = False
  157. BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
  158. if not streaming:
  159. producer.resumeProducing()
  160. def unregisterProducer(self):
  161. if self.producer is not None:
  162. del self.producer
  163. del self.producerIsStreaming
  164. self.unregistered = True
  165. if self.consumer and not self._buffer:
  166. self.consumer.unregisterProducer()
  167. def _writeSomeData(self, data):
  168. """Write as much of this data as possible.
  169. @returns: The number of bytes written.
  170. """
  171. if self.consumer is None:
  172. return 0
  173. self.consumer.write(data)
  174. return len(data)