123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- /* global TransformStream */
- export function transformStream(readable, transformer, oncancel) {
- try {
- return readable.pipeThrough(new TransformStream(transformer));
- } catch (e) {
- const reader = readable.getReader();
- return new ReadableStream({
- start(controller) {
- if (transformer.start) {
- return transformer.start(controller);
- }
- },
- async pull(controller) {
- let enqueued = false;
- const wrappedController = {
- enqueue(d) {
- enqueued = true;
- controller.enqueue(d);
- }
- };
- while (!enqueued) {
- const data = await reader.read();
- if (data.done) {
- if (transformer.flush) {
- await transformer.flush(controller);
- }
- return controller.close();
- }
- await transformer.transform(data.value, wrappedController);
- }
- },
- cancel(reason) {
- readable.cancel(reason);
- if (oncancel) {
- oncancel(reason);
- }
- }
- });
- }
- }
- class BlobStreamController {
- constructor(blob, size) {
- this.blob = blob;
- this.index = 0;
- this.chunkSize = size || 1024 * 64;
- }
- pull(controller) {
- return new Promise((resolve, reject) => {
- const bytesLeft = this.blob.size - this.index;
- if (bytesLeft <= 0) {
- controller.close();
- return resolve();
- }
- const size = Math.min(this.chunkSize, bytesLeft);
- const slice = this.blob.slice(this.index, this.index + size);
- const reader = new FileReader();
- reader.onload = () => {
- controller.enqueue(new Uint8Array(reader.result));
- resolve();
- };
- reader.onerror = reject;
- reader.readAsArrayBuffer(slice);
- this.index += size;
- });
- }
- }
- export function blobStream(blob, size) {
- return new ReadableStream(new BlobStreamController(blob, size));
- }
- class ConcatStreamController {
- constructor(streams) {
- this.streams = streams;
- this.index = 0;
- this.reader = null;
- this.nextReader();
- }
- nextReader() {
- const next = this.streams[this.index++];
- this.reader = next && next.getReader();
- }
- async pull(controller) {
- if (!this.reader) {
- return controller.close();
- }
- const data = await this.reader.read();
- if (data.done) {
- this.nextReader();
- return this.pull(controller);
- }
- controller.enqueue(data.value);
- }
- }
- export function concatStream(streams) {
- return new ReadableStream(new ConcatStreamController(streams));
- }
|