streams.js 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. /* global TransformStream */
  2. export function transformStream(readable, transformer, oncancel) {
  3. try {
  4. return readable.pipeThrough(new TransformStream(transformer));
  5. } catch (e) {
  6. const reader = readable.getReader();
  7. return new ReadableStream({
  8. start(controller) {
  9. if (transformer.start) {
  10. return transformer.start(controller);
  11. }
  12. },
  13. async pull(controller) {
  14. let enqueued = false;
  15. const wrappedController = {
  16. enqueue(d) {
  17. enqueued = true;
  18. controller.enqueue(d);
  19. }
  20. };
  21. while (!enqueued) {
  22. const data = await reader.read();
  23. if (data.done) {
  24. if (transformer.flush) {
  25. await transformer.flush(controller);
  26. }
  27. return controller.close();
  28. }
  29. await transformer.transform(data.value, wrappedController);
  30. }
  31. },
  32. cancel(reason) {
  33. readable.cancel(reason);
  34. if (oncancel) {
  35. oncancel(reason);
  36. }
  37. }
  38. });
  39. }
  40. }
  41. class BlobStreamController {
  42. constructor(blob, size) {
  43. this.blob = blob;
  44. this.index = 0;
  45. this.chunkSize = size || 1024 * 64;
  46. }
  47. pull(controller) {
  48. return new Promise((resolve, reject) => {
  49. const bytesLeft = this.blob.size - this.index;
  50. if (bytesLeft <= 0) {
  51. controller.close();
  52. return resolve();
  53. }
  54. const size = Math.min(this.chunkSize, bytesLeft);
  55. const slice = this.blob.slice(this.index, this.index + size);
  56. const reader = new FileReader();
  57. reader.onload = () => {
  58. controller.enqueue(new Uint8Array(reader.result));
  59. resolve();
  60. };
  61. reader.onerror = reject;
  62. reader.readAsArrayBuffer(slice);
  63. this.index += size;
  64. });
  65. }
  66. }
  67. export function blobStream(blob, size) {
  68. return new ReadableStream(new BlobStreamController(blob, size));
  69. }
  70. class ConcatStreamController {
  71. constructor(streams) {
  72. this.streams = streams;
  73. this.index = 0;
  74. this.reader = null;
  75. this.nextReader();
  76. }
  77. nextReader() {
  78. const next = this.streams[this.index++];
  79. this.reader = next && next.getReader();
  80. }
  81. async pull(controller) {
  82. if (!this.reader) {
  83. return controller.close();
  84. }
  85. const data = await this.reader.read();
  86. if (data.done) {
  87. this.nextReader();
  88. return this.pull(controller);
  89. }
  90. controller.enqueue(data.value);
  91. }
  92. }
  93. export function concatStream(streams) {
  94. return new ReadableStream(new ConcatStreamController(streams));
  95. }