123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- /**
- * Copyright (c) 2016-present, Gregory Szorc
- * All rights reserved.
- *
- * This software may be modified and distributed under the terms
- * of the BSD license. See the LICENSE file for details.
- */
- #include "python-zstandard.h"
- /* Smaller of two values. Function-like macro: each argument may be
-    evaluated more than once, so only pass side-effect-free expressions. */
- #define min(a, b) (((a) < (b)) ? (a) : (b))
- /* Exception type used for zstd compression failures in this file; the
-    object itself is defined elsewhere (declared extern here). */
- extern PyObject *ZstdError;
- static void ZstdCompressorIterator_dealloc(ZstdCompressorIterator *self) {
- Py_XDECREF(self->readResult);
- Py_XDECREF(self->compressor);
- Py_XDECREF(self->reader);
- if (self->buffer.buf) {
- PyBuffer_Release(&self->buffer);
- memset(&self->buffer, 0, sizeof(self->buffer));
- }
- if (self->output.dst) {
- PyMem_Free(self->output.dst);
- self->output.dst = NULL;
- }
- PyObject_Del(self);
- }
- static PyObject *ZstdCompressorIterator_iter(PyObject *self) {
- Py_INCREF(self);
- return self;
- }
- static PyObject *ZstdCompressorIterator_iternext(ZstdCompressorIterator *self) {
- size_t zresult;
- PyObject *readResult = NULL;
- PyObject *chunk;
- char *readBuffer;
- Py_ssize_t readSize = 0;
- Py_ssize_t bufferRemaining;
- if (self->finishedOutput) {
- PyErr_SetString(PyExc_StopIteration, "output flushed");
- return NULL;
- }
- feedcompressor:
- /* If we have data left in the input, consume it. */
- if (self->input.pos < self->input.size) {
- Py_BEGIN_ALLOW_THREADS zresult =
- ZSTD_compressStream2(self->compressor->cctx, &self->output,
- &self->input, ZSTD_e_continue);
- Py_END_ALLOW_THREADS
- /* Release the Python object holding the input buffer. */
- if (self->input.pos == self->input.size) {
- self->input.src = NULL;
- self->input.pos = 0;
- self->input.size = 0;
- Py_DECREF(self->readResult);
- self->readResult = NULL;
- }
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "zstd compress error: %s",
- ZSTD_getErrorName(zresult));
- return NULL;
- }
- /* If it produced output data, emit it. */
- if (self->output.pos) {
- chunk =
- PyBytes_FromStringAndSize(self->output.dst, self->output.pos);
- self->output.pos = 0;
- return chunk;
- }
- }
- /* We should never have output data sitting around after a previous call. */
- assert(self->output.pos == 0);
- /* The code above should have either emitted a chunk and returned or
- consumed the entire input buffer. So the state of the input buffer is not
- relevant. */
- if (!self->finishedInput) {
- if (self->reader) {
- readResult =
- PyObject_CallMethod(self->reader, "read", "I", self->inSize);
- if (!readResult) {
- return NULL;
- }
- PyBytes_AsStringAndSize(readResult, &readBuffer, &readSize);
- }
- else {
- assert(self->buffer.buf);
- /* Only support contiguous C arrays. */
- assert(self->buffer.strides == NULL &&
- self->buffer.suboffsets == NULL);
- assert(self->buffer.itemsize == 1);
- readBuffer = (char *)self->buffer.buf + self->bufferOffset;
- bufferRemaining = self->buffer.len - self->bufferOffset;
- readSize = min(bufferRemaining, (Py_ssize_t)self->inSize);
- self->bufferOffset += readSize;
- }
- if (0 == readSize) {
- Py_XDECREF(readResult);
- self->finishedInput = 1;
- }
- else {
- self->readResult = readResult;
- }
- }
- /* EOF */
- if (0 == readSize) {
- self->input.src = NULL;
- self->input.size = 0;
- self->input.pos = 0;
- zresult = ZSTD_compressStream2(self->compressor->cctx, &self->output,
- &self->input, ZSTD_e_end);
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "error ending compression stream: %s",
- ZSTD_getErrorName(zresult));
- return NULL;
- }
- assert(self->output.pos);
- if (0 == zresult) {
- self->finishedOutput = 1;
- }
- chunk = PyBytes_FromStringAndSize(self->output.dst, self->output.pos);
- self->output.pos = 0;
- return chunk;
- }
- /* New data from reader. Feed into compressor. */
- self->input.src = readBuffer;
- self->input.size = readSize;
- self->input.pos = 0;
- Py_BEGIN_ALLOW_THREADS zresult = ZSTD_compressStream2(
- self->compressor->cctx, &self->output, &self->input, ZSTD_e_continue);
- Py_END_ALLOW_THREADS
- /* The input buffer currently points to memory managed by Python
- (readBuffer). This object was allocated by this function. If it wasn't
- fully consumed, we need to release it in a subsequent function call.
- If it is fully consumed, do that now.
- */
- if (self->input.pos == self->input.size) {
- self->input.src = NULL;
- self->input.pos = 0;
- self->input.size = 0;
- Py_XDECREF(self->readResult);
- self->readResult = NULL;
- }
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "zstd compress error: %s",
- ZSTD_getErrorName(zresult));
- return NULL;
- }
- assert(self->input.pos <= self->input.size);
- /* If we didn't write anything, start the process over. */
- if (0 == self->output.pos) {
- goto feedcompressor;
- }
- chunk = PyBytes_FromStringAndSize(self->output.dst, self->output.pos);
- self->output.pos = 0;
- return chunk;
- }
- PyType_Slot ZstdCompressorIteratorSlots[] = {
- {Py_tp_dealloc, ZstdCompressorIterator_dealloc},
- {Py_tp_iter, ZstdCompressorIterator_iter},
- {Py_tp_iternext, ZstdCompressorIterator_iternext},
- {Py_tp_new, PyType_GenericNew},
- {0, NULL},
- };
- PyType_Spec ZstdCompressorIteratorSpec = {
- "zstd.ZstdCompressorIterator",
- sizeof(ZstdCompressorIterator),
- 0,
- Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
- ZstdCompressorIteratorSlots,
- };
- PyTypeObject *ZstdCompressorIteratorType;
- void compressoriterator_module_init(PyObject *mod) {
- ZstdCompressorIteratorType =
- (PyTypeObject *)PyType_FromSpec(&ZstdCompressorIteratorSpec);
- if (PyType_Ready(ZstdCompressorIteratorType) < 0) {
- return;
- }
- }
|