# stream.py (2.4 KB)

  1. """
  2. Buffer byte streams.
  3. """
  4. from itertools import count
  5. from typing import Dict, Iterator, List, TypeVar
  6. from attrs import Factory, define
  7. from twisted.protocols.amp import AMP, Command, Integer, String as Bytes
  8. T = TypeVar("T")
  9. class StreamOpen(Command):
  10. """
  11. Open a new stream.
  12. """
  13. response = [(b"streamId", Integer())]
  14. class StreamWrite(Command):
  15. """
  16. Write a chunk of data to a stream.
  17. """
  18. arguments = [
  19. (b"streamId", Integer()),
  20. (b"data", Bytes()),
  21. ]
  22. @define
  23. class StreamReceiver:
  24. """
  25. Buffering de-multiplexing byte stream receiver.
  26. """
  27. _counter: Iterator[int] = count()
  28. _streams: Dict[int, List[bytes]] = Factory(dict)
  29. def open(self) -> int:
  30. """
  31. Open a new stream and return its unique identifier.
  32. """
  33. newId = next(self._counter)
  34. self._streams[newId] = []
  35. return newId
  36. def write(self, streamId: int, chunk: bytes) -> None:
  37. """
  38. Write to an open stream using its unique identifier.
  39. @raise KeyError: If there is no such open stream.
  40. """
  41. self._streams[streamId].append(chunk)
  42. def finish(self, streamId: int) -> List[bytes]:
  43. """
  44. Indicate an open stream may receive no further data and return all of
  45. its current contents.
  46. @raise KeyError: If there is no such open stream.
  47. """
  48. return self._streams.pop(streamId)
  49. def chunk(data: bytes, chunkSize: int) -> Iterator[bytes]:
  50. """
  51. Break a byte string into pieces of no more than ``chunkSize`` length.
  52. @param data: The byte string.
  53. @param chunkSize: The maximum length of the resulting pieces. All pieces
  54. except possibly the last will be this length.
  55. @return: The pieces.
  56. """
  57. pos = 0
  58. while pos < len(data):
  59. yield data[pos : pos + chunkSize]
  60. pos += chunkSize
  61. async def stream(amp: AMP, chunks: Iterator[bytes]) -> int:
  62. """
  63. Send the given stream chunks, one by one, over the given connection.
  64. The chunks are sent using L{StreamWrite} over a stream opened using
  65. L{StreamOpen}.
  66. @return: The identifier of the stream over which the chunks were sent.
  67. """
  68. streamId = (await amp.callRemote(StreamOpen))["streamId"]
  69. assert isinstance(streamId, int)
  70. for oneChunk in chunks:
  71. await amp.callRemote(StreamWrite, streamId=streamId, data=oneChunk)
  72. return streamId