12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828 |
- /**
- * Copyright (c) 2016-present, Gregory Szorc
- * All rights reserved.
- *
- * This software may be modified and distributed under the terms
- * of the BSD license. See the LICENSE file for details.
- */
- #include "python-zstandard.h"
- #include "pool.h"
- extern PyObject* ZstdError;
- /**
- * Ensure the ZSTD_DCtx on a decompressor is initiated and ready for a new operation.
- */
- int ensure_dctx(ZstdDecompressor* decompressor, int loadDict) {
- size_t zresult;
- ZSTD_DCtx_reset(decompressor->dctx, ZSTD_reset_session_only);
- if (decompressor->maxWindowSize) {
- zresult = ZSTD_DCtx_setMaxWindowSize(decompressor->dctx, decompressor->maxWindowSize);
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "unable to set max window size: %s",
- ZSTD_getErrorName(zresult));
- return 1;
- }
- }
- zresult = ZSTD_DCtx_setFormat(decompressor->dctx, decompressor->format);
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "unable to set decoding format: %s",
- ZSTD_getErrorName(zresult));
- return 1;
- }
- if (loadDict && decompressor->dict) {
- if (ensure_ddict(decompressor->dict)) {
- return 1;
- }
- zresult = ZSTD_DCtx_refDDict(decompressor->dctx, decompressor->dict->ddict);
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "unable to reference prepared dictionary: %s",
- ZSTD_getErrorName(zresult));
- return 1;
- }
- }
- return 0;
- }
- PyDoc_STRVAR(Decompressor__doc__,
- "ZstdDecompressor(dict_data=None)\n"
- "\n"
- "Create an object used to perform Zstandard decompression.\n"
- "\n"
- "An instance can perform multiple decompression operations."
- );
- static int Decompressor_init(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
- static char* kwlist[] = {
- "dict_data",
- "max_window_size",
- "format",
- NULL
- };
- ZstdCompressionDict* dict = NULL;
- Py_ssize_t maxWindowSize = 0;
- ZSTD_format_e format = ZSTD_f_zstd1;
- self->dctx = NULL;
- self->dict = NULL;
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O!nI:ZstdDecompressor", kwlist,
- &ZstdCompressionDictType, &dict, &maxWindowSize, &format)) {
- return -1;
- }
- self->dctx = ZSTD_createDCtx();
- if (!self->dctx) {
- PyErr_NoMemory();
- goto except;
- }
- self->maxWindowSize = maxWindowSize;
- self->format = format;
- if (dict) {
- self->dict = dict;
- Py_INCREF(dict);
- }
- if (ensure_dctx(self, 1)) {
- goto except;
- }
- return 0;
- except:
- Py_CLEAR(self->dict);
- if (self->dctx) {
- ZSTD_freeDCtx(self->dctx);
- self->dctx = NULL;
- }
- return -1;
- }
- static void Decompressor_dealloc(ZstdDecompressor* self) {
- Py_CLEAR(self->dict);
- if (self->dctx) {
- ZSTD_freeDCtx(self->dctx);
- self->dctx = NULL;
- }
- PyObject_Del(self);
- }
- PyDoc_STRVAR(Decompressor_memory_size__doc__,
- "memory_size() -- Size of decompression context, in bytes\n"
- );
- static PyObject* Decompressor_memory_size(ZstdDecompressor* self) {
- if (self->dctx) {
- return PyLong_FromSize_t(ZSTD_sizeof_DCtx(self->dctx));
- }
- else {
- PyErr_SetString(ZstdError, "no decompressor context found; this should never happen");
- return NULL;
- }
- }
- PyDoc_STRVAR(Decompressor_copy_stream__doc__,
- "copy_stream(ifh, ofh[, read_size=default, write_size=default]) -- decompress data between streams\n"
- "\n"
- "Compressed data will be read from ``ifh``, decompressed, and written to\n"
- "``ofh``. ``ifh`` must have a ``read(size)`` method. ``ofh`` must have a\n"
- "``write(data)`` method.\n"
- "\n"
- "The optional ``read_size`` and ``write_size`` arguments control the chunk\n"
- "size of data that is ``read()`` and ``write()`` between streams. They default\n"
- "to the default input and output sizes of zstd decompressor streams.\n"
- );
- static PyObject* Decompressor_copy_stream(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
- static char* kwlist[] = {
- "ifh",
- "ofh",
- "read_size",
- "write_size",
- NULL
- };
- PyObject* source;
- PyObject* dest;
- size_t inSize = ZSTD_DStreamInSize();
- size_t outSize = ZSTD_DStreamOutSize();
- ZSTD_inBuffer input;
- ZSTD_outBuffer output;
- Py_ssize_t totalRead = 0;
- Py_ssize_t totalWrite = 0;
- char* readBuffer;
- Py_ssize_t readSize;
- PyObject* readResult = NULL;
- PyObject* res = NULL;
- size_t zresult = 0;
- PyObject* writeResult;
- PyObject* totalReadPy;
- PyObject* totalWritePy;
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|kk:copy_stream", kwlist,
- &source, &dest, &inSize, &outSize)) {
- return NULL;
- }
- if (!PyObject_HasAttrString(source, "read")) {
- PyErr_SetString(PyExc_ValueError, "first argument must have a read() method");
- return NULL;
- }
- if (!PyObject_HasAttrString(dest, "write")) {
- PyErr_SetString(PyExc_ValueError, "second argument must have a write() method");
- return NULL;
- }
- /* Prevent free on uninitialized memory in finally. */
- output.dst = NULL;
- if (ensure_dctx(self, 1)) {
- res = NULL;
- goto finally;
- }
- output.dst = PyMem_Malloc(outSize);
- if (!output.dst) {
- PyErr_NoMemory();
- res = NULL;
- goto finally;
- }
- output.size = outSize;
- output.pos = 0;
- /* Read source stream until EOF */
- while (1) {
- readResult = PyObject_CallMethod(source, "read", "n", inSize);
- if (!readResult) {
- PyErr_SetString(ZstdError, "could not read() from source");
- goto finally;
- }
- PyBytes_AsStringAndSize(readResult, &readBuffer, &readSize);
- /* If no data was read, we're at EOF. */
- if (0 == readSize) {
- break;
- }
- totalRead += readSize;
- /* Send data to decompressor */
- input.src = readBuffer;
- input.size = readSize;
- input.pos = 0;
- while (input.pos < input.size) {
- Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_decompressStream(self->dctx, &output, &input);
- Py_END_ALLOW_THREADS
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "zstd decompressor error: %s",
- ZSTD_getErrorName(zresult));
- res = NULL;
- goto finally;
- }
- if (output.pos) {
- #if PY_MAJOR_VERSION >= 3
- writeResult = PyObject_CallMethod(dest, "write", "y#",
- #else
- writeResult = PyObject_CallMethod(dest, "write", "s#",
- #endif
- output.dst, output.pos);
- Py_XDECREF(writeResult);
- totalWrite += output.pos;
- output.pos = 0;
- }
- }
- Py_CLEAR(readResult);
- }
- /* Source stream is exhausted. Finish up. */
- totalReadPy = PyLong_FromSsize_t(totalRead);
- totalWritePy = PyLong_FromSsize_t(totalWrite);
- res = PyTuple_Pack(2, totalReadPy, totalWritePy);
- Py_DECREF(totalReadPy);
- Py_DECREF(totalWritePy);
- finally:
- if (output.dst) {
- PyMem_Free(output.dst);
- }
- Py_XDECREF(readResult);
- return res;
- }
- PyDoc_STRVAR(Decompressor_decompress__doc__,
- "decompress(data[, max_output_size=None]) -- Decompress data in its entirety\n"
- "\n"
- "This method will decompress the entirety of the argument and return the\n"
- "result.\n"
- "\n"
- "The input bytes are expected to contain a full Zstandard frame (something\n"
- "compressed with ``ZstdCompressor.compress()`` or similar). If the input does\n"
- "not contain a full frame, an exception will be raised.\n"
- "\n"
- "If the frame header of the compressed data does not contain the content size\n"
- "``max_output_size`` must be specified or ``ZstdError`` will be raised. An\n"
- "allocation of size ``max_output_size`` will be performed and an attempt will\n"
- "be made to perform decompression into that buffer. If the buffer is too\n"
- "small or cannot be allocated, ``ZstdError`` will be raised. The buffer will\n"
- "be resized if it is too large.\n"
- "\n"
- "Uncompressed data could be much larger than compressed data. As a result,\n"
- "calling this function could result in a very large memory allocation being\n"
- "performed to hold the uncompressed data. Therefore it is **highly**\n"
- "recommended to use a streaming decompression method instead of this one.\n"
- );
- PyObject* Decompressor_decompress(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
- static char* kwlist[] = {
- "data",
- "max_output_size",
- NULL
- };
- Py_buffer source;
- Py_ssize_t maxOutputSize = 0;
- unsigned long long decompressedSize;
- size_t destCapacity;
- PyObject* result = NULL;
- size_t zresult;
- ZSTD_outBuffer outBuffer;
- ZSTD_inBuffer inBuffer;
- #if PY_MAJOR_VERSION >= 3
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "y*|n:decompress",
- #else
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s*|n:decompress",
- #endif
- kwlist, &source, &maxOutputSize)) {
- return NULL;
- }
- if (!PyBuffer_IsContiguous(&source, 'C') || source.ndim > 1) {
- PyErr_SetString(PyExc_ValueError,
- "data buffer should be contiguous and have at most one dimension");
- goto finally;
- }
- if (ensure_dctx(self, 1)) {
- goto finally;
- }
- decompressedSize = ZSTD_getFrameContentSize(source.buf, source.len);
- if (ZSTD_CONTENTSIZE_ERROR == decompressedSize) {
- PyErr_SetString(ZstdError, "error determining content size from frame header");
- goto finally;
- }
- /* Special case of empty frame. */
- else if (0 == decompressedSize) {
- result = PyBytes_FromStringAndSize("", 0);
- goto finally;
- }
- /* Missing content size in frame header. */
- if (ZSTD_CONTENTSIZE_UNKNOWN == decompressedSize) {
- if (0 == maxOutputSize) {
- PyErr_SetString(ZstdError, "could not determine content size in frame header");
- goto finally;
- }
- result = PyBytes_FromStringAndSize(NULL, maxOutputSize);
- destCapacity = maxOutputSize;
- decompressedSize = 0;
- }
- /* Size is recorded in frame header. */
- else {
- assert(SIZE_MAX >= PY_SSIZE_T_MAX);
- if (decompressedSize > PY_SSIZE_T_MAX) {
- PyErr_SetString(ZstdError, "frame is too large to decompress on this platform");
- goto finally;
- }
- result = PyBytes_FromStringAndSize(NULL, (Py_ssize_t)decompressedSize);
- destCapacity = (size_t)decompressedSize;
- }
- if (!result) {
- goto finally;
- }
- outBuffer.dst = PyBytes_AsString(result);
- outBuffer.size = destCapacity;
- outBuffer.pos = 0;
- inBuffer.src = source.buf;
- inBuffer.size = source.len;
- inBuffer.pos = 0;
- Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_decompressStream(self->dctx, &outBuffer, &inBuffer);
- Py_END_ALLOW_THREADS
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "decompression error: %s", ZSTD_getErrorName(zresult));
- Py_CLEAR(result);
- goto finally;
- }
- else if (zresult) {
- PyErr_Format(ZstdError, "decompression error: did not decompress full frame");
- Py_CLEAR(result);
- goto finally;
- }
- else if (decompressedSize && outBuffer.pos != decompressedSize) {
- PyErr_Format(ZstdError, "decompression error: decompressed %zu bytes; expected %llu",
- zresult, decompressedSize);
- Py_CLEAR(result);
- goto finally;
- }
- else if (outBuffer.pos < destCapacity) {
- if (safe_pybytes_resize(&result, outBuffer.pos)) {
- Py_CLEAR(result);
- goto finally;
- }
- }
- finally:
- PyBuffer_Release(&source);
- return result;
- }
- PyDoc_STRVAR(Decompressor_decompressobj__doc__,
- "decompressobj([write_size=default])\n"
- "\n"
- "Incrementally feed data into a decompressor.\n"
- "\n"
- "The returned object exposes a ``decompress(data)`` method. This makes it\n"
- "compatible with ``zlib.decompressobj`` and ``bz2.BZ2Decompressor`` so that\n"
- "callers can swap in the zstd decompressor while using the same API.\n"
- );
- static ZstdDecompressionObj* Decompressor_decompressobj(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
- static char* kwlist[] = {
- "write_size",
- NULL
- };
- ZstdDecompressionObj* result = NULL;
- size_t outSize = ZSTD_DStreamOutSize();
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|k:decompressobj", kwlist, &outSize)) {
- return NULL;
- }
- if (!outSize) {
- PyErr_SetString(PyExc_ValueError, "write_size must be positive");
- return NULL;
- }
- result = (ZstdDecompressionObj*)PyObject_CallObject((PyObject*)&ZstdDecompressionObjType, NULL);
- if (!result) {
- return NULL;
- }
- if (ensure_dctx(self, 1)) {
- Py_DECREF(result);
- return NULL;
- }
- result->decompressor = self;
- Py_INCREF(result->decompressor);
- result->outSize = outSize;
- return result;
- }
- PyDoc_STRVAR(Decompressor_read_to_iter__doc__,
- "read_to_iter(reader[, read_size=default, write_size=default, skip_bytes=0])\n"
- "Read compressed data and return an iterator\n"
- "\n"
- "Returns an iterator of decompressed data chunks produced from reading from\n"
- "the ``reader``.\n"
- "\n"
- "Compressed data will be obtained from ``reader`` by calling the\n"
- "``read(size)`` method of it. The source data will be streamed into a\n"
- "decompressor. As decompressed data is available, it will be exposed to the\n"
- "returned iterator.\n"
- "\n"
- "Data is ``read()`` in chunks of size ``read_size`` and exposed to the\n"
- "iterator in chunks of size ``write_size``. The default values are the input\n"
- "and output sizes for a zstd streaming decompressor.\n"
- "\n"
- "There is also support for skipping the first ``skip_bytes`` of data from\n"
- "the source.\n"
- );
- static ZstdDecompressorIterator* Decompressor_read_to_iter(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
- static char* kwlist[] = {
- "reader",
- "read_size",
- "write_size",
- "skip_bytes",
- NULL
- };
- PyObject* reader;
- size_t inSize = ZSTD_DStreamInSize();
- size_t outSize = ZSTD_DStreamOutSize();
- ZstdDecompressorIterator* result;
- size_t skipBytes = 0;
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|kkk:read_to_iter", kwlist,
- &reader, &inSize, &outSize, &skipBytes)) {
- return NULL;
- }
- if (skipBytes >= inSize) {
- PyErr_SetString(PyExc_ValueError,
- "skip_bytes must be smaller than read_size");
- return NULL;
- }
- result = (ZstdDecompressorIterator*)PyObject_CallObject((PyObject*)&ZstdDecompressorIteratorType, NULL);
- if (!result) {
- return NULL;
- }
- if (PyObject_HasAttrString(reader, "read")) {
- result->reader = reader;
- Py_INCREF(result->reader);
- }
- else if (1 == PyObject_CheckBuffer(reader)) {
- /* Object claims it is a buffer. Try to get a handle to it. */
- if (0 != PyObject_GetBuffer(reader, &result->buffer, PyBUF_CONTIG_RO)) {
- goto except;
- }
- }
- else {
- PyErr_SetString(PyExc_ValueError,
- "must pass an object with a read() method or conforms to buffer protocol");
- goto except;
- }
- result->decompressor = self;
- Py_INCREF(result->decompressor);
- result->inSize = inSize;
- result->outSize = outSize;
- result->skipBytes = skipBytes;
- if (ensure_dctx(self, 1)) {
- goto except;
- }
- result->input.src = PyMem_Malloc(inSize);
- if (!result->input.src) {
- PyErr_NoMemory();
- goto except;
- }
- goto finally;
- except:
- Py_CLEAR(result);
- finally:
- return result;
- }
- PyDoc_STRVAR(Decompressor_stream_reader__doc__,
- "stream_reader(source, [read_size=default, [read_across_frames=False]])\n"
- "\n"
- "Obtain an object that behaves like an I/O stream that can be used for\n"
- "reading decompressed output from an object.\n"
- "\n"
- "The source object can be any object with a ``read(size)`` method or that\n"
- "conforms to the buffer protocol.\n"
- "\n"
- "``read_across_frames`` controls the behavior of ``read()`` when the end\n"
- "of a zstd frame is reached. When ``True``, ``read()`` can potentially\n"
- "return data belonging to multiple zstd frames. When ``False``, ``read()``\n"
- "will return when the end of a frame is reached.\n"
- );
- static ZstdDecompressionReader* Decompressor_stream_reader(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
- static char* kwlist[] = {
- "source",
- "read_size",
- "read_across_frames",
- NULL
- };
- PyObject* source;
- size_t readSize = ZSTD_DStreamInSize();
- PyObject* readAcrossFrames = NULL;
- ZstdDecompressionReader* result;
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|kO:stream_reader", kwlist,
- &source, &readSize, &readAcrossFrames)) {
- return NULL;
- }
- if (ensure_dctx(self, 1)) {
- return NULL;
- }
- result = (ZstdDecompressionReader*)PyObject_CallObject((PyObject*)&ZstdDecompressionReaderType, NULL);
- if (NULL == result) {
- return NULL;
- }
- result->entered = 0;
- result->closed = 0;
- if (PyObject_HasAttrString(source, "read")) {
- result->reader = source;
- Py_INCREF(source);
- result->readSize = readSize;
- }
- else if (1 == PyObject_CheckBuffer(source)) {
- if (0 != PyObject_GetBuffer(source, &result->buffer, PyBUF_CONTIG_RO)) {
- Py_CLEAR(result);
- return NULL;
- }
- }
- else {
- PyErr_SetString(PyExc_TypeError,
- "must pass an object with a read() method or that conforms to the buffer protocol");
- Py_CLEAR(result);
- return NULL;
- }
- result->decompressor = self;
- Py_INCREF(self);
- result->readAcrossFrames = readAcrossFrames ? PyObject_IsTrue(readAcrossFrames) : 0;
- return result;
- }
- PyDoc_STRVAR(Decompressor_stream_writer__doc__,
- "Create a context manager to write decompressed data to an object.\n"
- "\n"
- "The passed object must have a ``write()`` method.\n"
- "\n"
- "The caller feeds intput data to the object by calling ``write(data)``.\n"
- "Decompressed data is written to the argument given as it is decompressed.\n"
- "\n"
- "An optional ``write_size`` argument defines the size of chunks to\n"
- "``write()`` to the writer. It defaults to the default output size for a zstd\n"
- "streaming decompressor.\n"
- );
- static ZstdDecompressionWriter* Decompressor_stream_writer(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
- static char* kwlist[] = {
- "writer",
- "write_size",
- "write_return_read",
- NULL
- };
- PyObject* writer;
- size_t outSize = ZSTD_DStreamOutSize();
- PyObject* writeReturnRead = NULL;
- ZstdDecompressionWriter* result;
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|kO:stream_writer", kwlist,
- &writer, &outSize, &writeReturnRead)) {
- return NULL;
- }
- if (!PyObject_HasAttrString(writer, "write")) {
- PyErr_SetString(PyExc_ValueError, "must pass an object with a write() method");
- return NULL;
- }
- if (ensure_dctx(self, 1)) {
- return NULL;
- }
- result = (ZstdDecompressionWriter*)PyObject_CallObject((PyObject*)&ZstdDecompressionWriterType, NULL);
- if (!result) {
- return NULL;
- }
- result->entered = 0;
- result->closed = 0;
- result->decompressor = self;
- Py_INCREF(result->decompressor);
- result->writer = writer;
- Py_INCREF(result->writer);
- result->outSize = outSize;
- result->writeReturnRead = writeReturnRead ? PyObject_IsTrue(writeReturnRead) : 0;
- return result;
- }
- PyDoc_STRVAR(Decompressor_decompress_content_dict_chain__doc__,
- "Decompress a series of chunks using the content dictionary chaining technique\n"
- );
- static PyObject* Decompressor_decompress_content_dict_chain(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
- static char* kwlist[] = {
- "frames",
- NULL
- };
- PyObject* chunks;
- Py_ssize_t chunksLen;
- Py_ssize_t chunkIndex;
- char parity = 0;
- PyObject* chunk;
- char* chunkData;
- Py_ssize_t chunkSize;
- size_t zresult;
- ZSTD_frameHeader frameHeader;
- void* buffer1 = NULL;
- size_t buffer1Size = 0;
- size_t buffer1ContentSize = 0;
- void* buffer2 = NULL;
- size_t buffer2Size = 0;
- size_t buffer2ContentSize = 0;
- void* destBuffer = NULL;
- PyObject* result = NULL;
- ZSTD_outBuffer outBuffer;
- ZSTD_inBuffer inBuffer;
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!:decompress_content_dict_chain",
- kwlist, &PyList_Type, &chunks)) {
- return NULL;
- }
- chunksLen = PyList_Size(chunks);
- if (!chunksLen) {
- PyErr_SetString(PyExc_ValueError, "empty input chain");
- return NULL;
- }
- /* The first chunk should not be using a dictionary. We handle it specially. */
- chunk = PyList_GetItem(chunks, 0);
- if (!PyBytes_Check(chunk)) {
- PyErr_SetString(PyExc_ValueError, "chunk 0 must be bytes");
- return NULL;
- }
- /* We require that all chunks be zstd frames and that they have content size set. */
- PyBytes_AsStringAndSize(chunk, &chunkData, &chunkSize);
- zresult = ZSTD_getFrameHeader(&frameHeader, (void*)chunkData, chunkSize);
- if (ZSTD_isError(zresult)) {
- PyErr_SetString(PyExc_ValueError, "chunk 0 is not a valid zstd frame");
- return NULL;
- }
- else if (zresult) {
- PyErr_SetString(PyExc_ValueError, "chunk 0 is too small to contain a zstd frame");
- return NULL;
- }
- if (ZSTD_CONTENTSIZE_UNKNOWN == frameHeader.frameContentSize) {
- PyErr_SetString(PyExc_ValueError, "chunk 0 missing content size in frame");
- return NULL;
- }
- assert(ZSTD_CONTENTSIZE_ERROR != frameHeader.frameContentSize);
- /* We check against PY_SSIZE_T_MAX here because we ultimately cast the
- * result to a Python object and it's length can be no greater than
- * Py_ssize_t. In theory, we could have an intermediate frame that is
- * larger. But a) why would this API be used for frames that large b)
- * it isn't worth the complexity to support. */
- assert(SIZE_MAX >= PY_SSIZE_T_MAX);
- if (frameHeader.frameContentSize > PY_SSIZE_T_MAX) {
- PyErr_SetString(PyExc_ValueError,
- "chunk 0 is too large to decompress on this platform");
- return NULL;
- }
- if (ensure_dctx(self, 0)) {
- goto finally;
- }
- buffer1Size = (size_t)frameHeader.frameContentSize;
- buffer1 = PyMem_Malloc(buffer1Size);
- if (!buffer1) {
- goto finally;
- }
- outBuffer.dst = buffer1;
- outBuffer.size = buffer1Size;
- outBuffer.pos = 0;
- inBuffer.src = chunkData;
- inBuffer.size = chunkSize;
- inBuffer.pos = 0;
- Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_decompressStream(self->dctx, &outBuffer, &inBuffer);
- Py_END_ALLOW_THREADS
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "could not decompress chunk 0: %s", ZSTD_getErrorName(zresult));
- goto finally;
- }
- else if (zresult) {
- PyErr_Format(ZstdError, "chunk 0 did not decompress full frame");
- goto finally;
- }
- buffer1ContentSize = outBuffer.pos;
- /* Special case of a simple chain. */
- if (1 == chunksLen) {
- result = PyBytes_FromStringAndSize(buffer1, buffer1Size);
- goto finally;
- }
- /* This should ideally look at next chunk. But this is slightly simpler. */
- buffer2Size = (size_t)frameHeader.frameContentSize;
- buffer2 = PyMem_Malloc(buffer2Size);
- if (!buffer2) {
- goto finally;
- }
- /* For each subsequent chunk, use the previous fulltext as a content dictionary.
- Our strategy is to have 2 buffers. One holds the previous fulltext (to be
- used as a content dictionary) and the other holds the new fulltext. The
- buffers grow when needed but never decrease in size. This limits the
- memory allocator overhead.
- */
- for (chunkIndex = 1; chunkIndex < chunksLen; chunkIndex++) {
- chunk = PyList_GetItem(chunks, chunkIndex);
- if (!PyBytes_Check(chunk)) {
- PyErr_Format(PyExc_ValueError, "chunk %zd must be bytes", chunkIndex);
- goto finally;
- }
- PyBytes_AsStringAndSize(chunk, &chunkData, &chunkSize);
- zresult = ZSTD_getFrameHeader(&frameHeader, (void*)chunkData, chunkSize);
- if (ZSTD_isError(zresult)) {
- PyErr_Format(PyExc_ValueError, "chunk %zd is not a valid zstd frame", chunkIndex);
- goto finally;
- }
- else if (zresult) {
- PyErr_Format(PyExc_ValueError, "chunk %zd is too small to contain a zstd frame", chunkIndex);
- goto finally;
- }
- if (ZSTD_CONTENTSIZE_UNKNOWN == frameHeader.frameContentSize) {
- PyErr_Format(PyExc_ValueError, "chunk %zd missing content size in frame", chunkIndex);
- goto finally;
- }
- assert(ZSTD_CONTENTSIZE_ERROR != frameHeader.frameContentSize);
- if (frameHeader.frameContentSize > PY_SSIZE_T_MAX) {
- PyErr_Format(PyExc_ValueError,
- "chunk %zd is too large to decompress on this platform", chunkIndex);
- goto finally;
- }
- inBuffer.src = chunkData;
- inBuffer.size = chunkSize;
- inBuffer.pos = 0;
- parity = chunkIndex % 2;
- /* This could definitely be abstracted to reduce code duplication. */
- if (parity) {
- /* Resize destination buffer to hold larger content. */
- if (buffer2Size < frameHeader.frameContentSize) {
- buffer2Size = (size_t)frameHeader.frameContentSize;
- destBuffer = PyMem_Realloc(buffer2, buffer2Size);
- if (!destBuffer) {
- goto finally;
- }
- buffer2 = destBuffer;
- }
- Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_DCtx_refPrefix_advanced(self->dctx,
- buffer1, buffer1ContentSize, ZSTD_dct_rawContent);
- Py_END_ALLOW_THREADS
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError,
- "failed to load prefix dictionary at chunk %zd", chunkIndex);
- goto finally;
- }
- outBuffer.dst = buffer2;
- outBuffer.size = buffer2Size;
- outBuffer.pos = 0;
- Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_decompressStream(self->dctx, &outBuffer, &inBuffer);
- Py_END_ALLOW_THREADS
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "could not decompress chunk %zd: %s",
- chunkIndex, ZSTD_getErrorName(zresult));
- goto finally;
- }
- else if (zresult) {
- PyErr_Format(ZstdError, "chunk %zd did not decompress full frame",
- chunkIndex);
- goto finally;
- }
- buffer2ContentSize = outBuffer.pos;
- }
- else {
- if (buffer1Size < frameHeader.frameContentSize) {
- buffer1Size = (size_t)frameHeader.frameContentSize;
- destBuffer = PyMem_Realloc(buffer1, buffer1Size);
- if (!destBuffer) {
- goto finally;
- }
- buffer1 = destBuffer;
- }
- Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_DCtx_refPrefix_advanced(self->dctx,
- buffer2, buffer2ContentSize, ZSTD_dct_rawContent);
- Py_END_ALLOW_THREADS
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError,
- "failed to load prefix dictionary at chunk %zd", chunkIndex);
- goto finally;
- }
- outBuffer.dst = buffer1;
- outBuffer.size = buffer1Size;
- outBuffer.pos = 0;
- Py_BEGIN_ALLOW_THREADS
- zresult = ZSTD_decompressStream(self->dctx, &outBuffer, &inBuffer);
- Py_END_ALLOW_THREADS
- if (ZSTD_isError(zresult)) {
- PyErr_Format(ZstdError, "could not decompress chunk %zd: %s",
- chunkIndex, ZSTD_getErrorName(zresult));
- goto finally;
- }
- else if (zresult) {
- PyErr_Format(ZstdError, "chunk %zd did not decompress full frame",
- chunkIndex);
- goto finally;
- }
- buffer1ContentSize = outBuffer.pos;
- }
- }
- result = PyBytes_FromStringAndSize(parity ? buffer2 : buffer1,
- parity ? buffer2ContentSize : buffer1ContentSize);
- finally:
- if (buffer2) {
- PyMem_Free(buffer2);
- }
- if (buffer1) {
- PyMem_Free(buffer1);
- }
- return result;
- }
/**
 * Describes a single input frame.
 *
 * NOTE(review): the code using these fields is outside this view; the
 * descriptions below are inferred from the field names - confirm.
 */
typedef struct {
    /* Start of the frame's source data. */
    void* sourceData;
    /* Size of the source data, presumably in bytes. */
    size_t sourceSize;
    /* Presumably the size of the frame's decompressed output. */
    size_t destSize;
} FramePointer;
- typedef struct {
- FramePointer* frames;
- Py_ssize_t framesSize;
- unsigned long long compressedSize;
- } FrameSources;
- typedef struct {
- void* dest;
- Py_ssize_t destSize;
- BufferSegment* segments;
- Py_ssize_t segmentsSize;
- } DestBuffer;
/*
 * Outcome recorded by a decompression worker in WorkerState.error.
 */
typedef enum {
    /* Worker finished (or never ran) without a problem. */
    WorkerError_none = 0,
    /* zstd reported an error; the code is stored in WorkerState.zresult. */
    WorkerError_zstd = 1,
    /* An allocation failed or a size does not fit in size_t. */
    WorkerError_memory = 2,
    /* A frame did not decompress to exactly the expected number of bytes. */
    WorkerError_sizeMismatch = 3,
    /* The decompressed size of a frame could not be determined. */
    WorkerError_unknownSize = 4
} WorkerError;
- typedef struct {
- /* Source records and length */
- FramePointer* framePointers;
- /* Which records to process. */
- Py_ssize_t startOffset;
- Py_ssize_t endOffset;
- unsigned long long totalSourceSize;
- /* Compression state and settings. */
- ZSTD_DCtx* dctx;
- int requireOutputSizes;
- /* Output storage. */
- DestBuffer* destBuffers;
- Py_ssize_t destCount;
- /* Item that error occurred on. */
- Py_ssize_t errorOffset;
- /* If an error occurred. */
- WorkerError error;
- /* result from zstd decompression operation */
- size_t zresult;
- } WorkerState;
- static void decompress_worker(WorkerState* state) {
- size_t allocationSize;
- DestBuffer* destBuffer;
- Py_ssize_t frameIndex;
- Py_ssize_t localOffset = 0;
- Py_ssize_t currentBufferStartIndex = state->startOffset;
- Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
- void* tmpBuf;
- Py_ssize_t destOffset = 0;
- FramePointer* framePointers = state->framePointers;
- size_t zresult;
- unsigned long long totalOutputSize = 0;
- assert(NULL == state->destBuffers);
- assert(0 == state->destCount);
- assert(state->endOffset - state->startOffset >= 0);
- /* We could get here due to the way work is allocated. Ideally we wouldn't
- get here. But that would require a bit of a refactor in the caller. */
- if (state->totalSourceSize > SIZE_MAX) {
- state->error = WorkerError_memory;
- state->errorOffset = 0;
- return;
- }
- /*
- * We need to allocate a buffer to hold decompressed data. How we do this
- * depends on what we know about the output. The following scenarios are
- * possible:
- *
- * 1. All structs defining frames declare the output size.
- * 2. The decompressed size is embedded within the zstd frame.
- * 3. The decompressed size is not stored anywhere.
- *
- * For now, we only support #1 and #2.
- */
- /* Resolve ouput segments. */
- for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
- FramePointer* fp = &framePointers[frameIndex];
- unsigned long long decompressedSize;
- if (0 == fp->destSize) {
- decompressedSize = ZSTD_getFrameContentSize(fp->sourceData, fp->sourceSize);
- if (ZSTD_CONTENTSIZE_ERROR == decompressedSize) {
- state->error = WorkerError_unknownSize;
- state->errorOffset = frameIndex;
- return;
- }
- else if (ZSTD_CONTENTSIZE_UNKNOWN == decompressedSize) {
- if (state->requireOutputSizes) {
- state->error = WorkerError_unknownSize;
- state->errorOffset = frameIndex;
- return;
- }
- /* This will fail the assert for .destSize > 0 below. */
- decompressedSize = 0;
- }
- if (decompressedSize > SIZE_MAX) {
- state->error = WorkerError_memory;
- state->errorOffset = frameIndex;
- return;
- }
- fp->destSize = (size_t)decompressedSize;
- }
- totalOutputSize += fp->destSize;
- }
- state->destBuffers = calloc(1, sizeof(DestBuffer));
- if (NULL == state->destBuffers) {
- state->error = WorkerError_memory;
- return;
- }
- state->destCount = 1;
- destBuffer = &state->destBuffers[state->destCount - 1];
- assert(framePointers[state->startOffset].destSize > 0); /* For now. */
- allocationSize = roundpow2((size_t)state->totalSourceSize);
- if (framePointers[state->startOffset].destSize > allocationSize) {
- allocationSize = roundpow2(framePointers[state->startOffset].destSize);
- }
- destBuffer->dest = malloc(allocationSize);
- if (NULL == destBuffer->dest) {
- state->error = WorkerError_memory;
- return;
- }
- destBuffer->destSize = allocationSize;
- destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
- if (NULL == destBuffer->segments) {
- /* Caller will free state->dest as part of cleanup. */
- state->error = WorkerError_memory;
- return;
- }
- destBuffer->segmentsSize = remainingItems;
- for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
- ZSTD_outBuffer outBuffer;
- ZSTD_inBuffer inBuffer;
- const void* source = framePointers[frameIndex].sourceData;
- const size_t sourceSize = framePointers[frameIndex].sourceSize;
- void* dest;
- const size_t decompressedSize = framePointers[frameIndex].destSize;
- size_t destAvailable = destBuffer->destSize - destOffset;
- assert(decompressedSize > 0); /* For now. */
- /*
- * Not enough space in current buffer. Finish current before and allocate and
- * switch to a new one.
- */
- if (decompressedSize > destAvailable) {
- /*
- * Shrinking the destination buffer is optional. But it should be cheap,
- * so we just do it.
- */
- if (destAvailable) {
- tmpBuf = realloc(destBuffer->dest, destOffset);
- if (NULL == tmpBuf) {
- state->error = WorkerError_memory;
- return;
- }
- destBuffer->dest = tmpBuf;
- destBuffer->destSize = destOffset;
- }
- /* Truncate segments buffer. */
- tmpBuf = realloc(destBuffer->segments,
- (frameIndex - currentBufferStartIndex) * sizeof(BufferSegment));
- if (NULL == tmpBuf) {
- state->error = WorkerError_memory;
- return;
- }
- destBuffer->segments = tmpBuf;
- destBuffer->segmentsSize = frameIndex - currentBufferStartIndex;
- /* Grow space for new DestBuffer. */
- tmpBuf = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
- if (NULL == tmpBuf) {
- state->error = WorkerError_memory;
- return;
- }
- state->destBuffers = tmpBuf;
- state->destCount++;
- destBuffer = &state->destBuffers[state->destCount - 1];
- /* Don't take any chances will non-NULL pointers. */
- memset(destBuffer, 0, sizeof(DestBuffer));
- allocationSize = roundpow2((size_t)state->totalSourceSize);
- if (decompressedSize > allocationSize) {
- allocationSize = roundpow2(decompressedSize);
- }
- destBuffer->dest = malloc(allocationSize);
- if (NULL == destBuffer->dest) {
- state->error = WorkerError_memory;
- return;
- }
- destBuffer->destSize = allocationSize;
- destAvailable = allocationSize;
- destOffset = 0;
- localOffset = 0;
- destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
- if (NULL == destBuffer->segments) {
- state->error = WorkerError_memory;
- return;
- }
- destBuffer->segmentsSize = remainingItems;
- currentBufferStartIndex = frameIndex;
- }
- dest = (char*)destBuffer->dest + destOffset;
- outBuffer.dst = dest;
- outBuffer.size = decompressedSize;
- outBuffer.pos = 0;
- inBuffer.src = source;
- inBuffer.size = sourceSize;
- inBuffer.pos = 0;
- zresult = ZSTD_decompressStream(state->dctx, &outBuffer, &inBuffer);
- if (ZSTD_isError(zresult)) {
- state->error = WorkerError_zstd;
- state->zresult = zresult;
- state->errorOffset = frameIndex;
- return;
- }
- else if (zresult || outBuffer.pos != decompressedSize) {
- state->error = WorkerError_sizeMismatch;
- state->zresult = outBuffer.pos;
- state->errorOffset = frameIndex;
- return;
- }
- destBuffer->segments[localOffset].offset = destOffset;
- destBuffer->segments[localOffset].length = outBuffer.pos;
- destOffset += outBuffer.pos;
- localOffset++;
- remainingItems--;
- }
- if (destBuffer->destSize > destOffset) {
- tmpBuf = realloc(destBuffer->dest, destOffset);
- if (NULL == tmpBuf) {
- state->error = WorkerError_memory;
- return;
- }
- destBuffer->dest = tmpBuf;
- destBuffer->destSize = destOffset;
- }
- }
- ZstdBufferWithSegmentsCollection* decompress_from_framesources(ZstdDecompressor* decompressor, FrameSources* frames,
- Py_ssize_t threadCount) {
- Py_ssize_t i = 0;
- int errored = 0;
- Py_ssize_t segmentsCount;
- ZstdBufferWithSegments* bws = NULL;
- PyObject* resultArg = NULL;
- Py_ssize_t resultIndex;
- ZstdBufferWithSegmentsCollection* result = NULL;
- FramePointer* framePointers = frames->frames;
- unsigned long long workerBytes = 0;
- Py_ssize_t currentThread = 0;
- Py_ssize_t workerStartOffset = 0;
- POOL_ctx* pool = NULL;
- WorkerState* workerStates = NULL;
- unsigned long long bytesPerWorker;
- /* Caller should normalize 0 and negative values to 1 or larger. */
- assert(threadCount >= 1);
- /* More threads than inputs makes no sense under any conditions. */
- threadCount = frames->framesSize < threadCount ? frames->framesSize
- : threadCount;
- /* TODO lower thread count if input size is too small and threads would just
- add overhead. */
- if (decompressor->dict) {
- if (ensure_ddict(decompressor->dict)) {
- return NULL;
- }
- }
- /* If threadCount==1, we don't start a thread pool. But we do leverage the
- same API for dispatching work. */
- workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
- if (NULL == workerStates) {
- PyErr_NoMemory();
- goto finally;
- }
- memset(workerStates, 0, threadCount * sizeof(WorkerState));
- if (threadCount > 1) {
- pool = POOL_create(threadCount, 1);
- if (NULL == pool) {
- PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
- goto finally;
- }
- }
- bytesPerWorker = frames->compressedSize / threadCount;
- if (bytesPerWorker > SIZE_MAX) {
- PyErr_SetString(ZstdError, "too much data per worker for this platform");
- goto finally;
- }
- for (i = 0; i < threadCount; i++) {
- size_t zresult;
- workerStates[i].dctx = ZSTD_createDCtx();
- if (NULL == workerStates[i].dctx) {
- PyErr_NoMemory();
- goto finally;
- }
- ZSTD_copyDCtx(workerStates[i].dctx, decompressor->dctx);
- if (decompressor->dict) {
- zresult = ZSTD_DCtx_refDDict(workerStates[i].dctx, decompressor->dict->ddict);
- if (zresult) {
- PyErr_Format(ZstdError, "unable to reference prepared dictionary: %s",
- ZSTD_getErrorName(zresult));
- goto finally;
- }
- }
- workerStates[i].framePointers = framePointers;
- workerStates[i].requireOutputSizes = 1;
- }
- Py_BEGIN_ALLOW_THREADS
- /* There are many ways to split work among workers.
- For now, we take a simple approach of splitting work so each worker
- gets roughly the same number of input bytes. This will result in more
- starvation than running N>threadCount jobs. But it avoids complications
- around state tracking, which could involve extra locking.
- */
- for (i = 0; i < frames->framesSize; i++) {
- workerBytes += frames->frames[i].sourceSize;
- /*
- * The last worker/thread needs to handle all remaining work. Don't
- * trigger it prematurely. Defer to the block outside of the loop.
- * (But still process this loop so workerBytes is correct.
- */
- if (currentThread == threadCount - 1) {
- continue;
- }
- if (workerBytes >= bytesPerWorker) {
- workerStates[currentThread].startOffset = workerStartOffset;
- workerStates[currentThread].endOffset = i;
- workerStates[currentThread].totalSourceSize = workerBytes;
- if (threadCount > 1) {
- POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
- }
- else {
- decompress_worker(&workerStates[currentThread]);
- }
- currentThread++;
- workerStartOffset = i + 1;
- workerBytes = 0;
- }
- }
- if (workerBytes) {
- workerStates[currentThread].startOffset = workerStartOffset;
- workerStates[currentThread].endOffset = frames->framesSize - 1;
- workerStates[currentThread].totalSourceSize = workerBytes;
- if (threadCount > 1) {
- POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
- }
- else {
- decompress_worker(&workerStates[currentThread]);
- }
- }
- if (threadCount > 1) {
- POOL_free(pool);
- pool = NULL;
- }
- Py_END_ALLOW_THREADS
- for (i = 0; i < threadCount; i++) {
- switch (workerStates[i].error) {
- case WorkerError_none:
- break;
- case WorkerError_zstd:
- PyErr_Format(ZstdError, "error decompressing item %zd: %s",
- workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
- errored = 1;
- break;
- case WorkerError_memory:
- PyErr_NoMemory();
- errored = 1;
- break;
- case WorkerError_sizeMismatch:
- PyErr_Format(ZstdError, "error decompressing item %zd: decompressed %zu bytes; expected %zu",
- workerStates[i].errorOffset, workerStates[i].zresult,
- framePointers[workerStates[i].errorOffset].destSize);
- errored = 1;
- break;
- case WorkerError_unknownSize:
- PyErr_Format(PyExc_ValueError, "could not determine decompressed size of item %zd",
- workerStates[i].errorOffset);
- errored = 1;
- break;
- default:
- PyErr_Format(ZstdError, "unhandled error type: %d; this is a bug",
- workerStates[i].error);
- errored = 1;
- break;
- }
- if (errored) {
- break;
- }
- }
- if (errored) {
- goto finally;
- }
- segmentsCount = 0;
- for (i = 0; i < threadCount; i++) {
- segmentsCount += workerStates[i].destCount;
- }
- resultArg = PyTuple_New(segmentsCount);
- if (NULL == resultArg) {
- goto finally;
- }
- resultIndex = 0;
- for (i = 0; i < threadCount; i++) {
- Py_ssize_t bufferIndex;
- WorkerState* state = &workerStates[i];
- for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
- DestBuffer* destBuffer = &state->destBuffers[bufferIndex];
- bws = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
- destBuffer->segments, destBuffer->segmentsSize);
- if (NULL == bws) {
- goto finally;
- }
- /*
- * Memory for buffer and segments was allocated using malloc() in worker
- * and the memory is transferred to the BufferWithSegments instance. So
- * tell instance to use free() and NULL the reference in the state struct
- * so it isn't freed below.
- */
- bws->useFree = 1;
- destBuffer->dest = NULL;
- destBuffer->segments = NULL;
- PyTuple_SET_ITEM(resultArg, resultIndex++, (PyObject*)bws);
- }
- }
- result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
- (PyObject*)&ZstdBufferWithSegmentsCollectionType, resultArg);
- finally:
- Py_CLEAR(resultArg);
- if (workerStates) {
- for (i = 0; i < threadCount; i++) {
- Py_ssize_t bufferIndex;
- WorkerState* state = &workerStates[i];
- if (state->dctx) {
- ZSTD_freeDCtx(state->dctx);
- }
- for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
- if (state->destBuffers) {
- /*
- * Will be NULL if memory transfered to a BufferWithSegments.
- * Otherwise it is left over after an error occurred.
- */
- free(state->destBuffers[bufferIndex].dest);
- free(state->destBuffers[bufferIndex].segments);
- }
- }
- free(state->destBuffers);
- }
- PyMem_Free(workerStates);
- }
- POOL_free(pool);
- return result;
- }
- PyDoc_STRVAR(Decompressor_multi_decompress_to_buffer__doc__,
- "Decompress multiple frames to output buffers\n"
- "\n"
- "Receives a ``BufferWithSegments``, a ``BufferWithSegmentsCollection`` or a\n"
- "list of bytes-like objects. Each item in the passed collection should be a\n"
- "compressed zstd frame.\n"
- "\n"
- "Unless ``decompressed_sizes`` is specified, the content size *must* be\n"
- "written into the zstd frame header. If ``decompressed_sizes`` is specified,\n"
- "it is an object conforming to the buffer protocol that represents an array\n"
- "of 64-bit unsigned integers in the machine's native format. Specifying\n"
- "``decompressed_sizes`` avoids a pre-scan of each frame to determine its\n"
- "output size.\n"
- "\n"
- "Returns a ``BufferWithSegmentsCollection`` containing the decompressed\n"
- "data. All decompressed data is allocated in a single memory buffer. The\n"
- "``BufferWithSegments`` instance tracks which objects are at which offsets\n"
- "and their respective lengths.\n"
- "\n"
- "The ``threads`` argument controls how many threads to use for operations.\n"
- "Negative values will use the same number of threads as logical CPUs on the\n"
- "machine.\n"
- );
- static ZstdBufferWithSegmentsCollection* Decompressor_multi_decompress_to_buffer(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
- static char* kwlist[] = {
- "frames",
- "decompressed_sizes",
- "threads",
- NULL
- };
- PyObject* frames;
- Py_buffer frameSizes;
- int threads = 0;
- Py_ssize_t frameCount;
- Py_buffer* frameBuffers = NULL;
- FramePointer* framePointers = NULL;
- unsigned long long* frameSizesP = NULL;
- unsigned long long totalInputSize = 0;
- FrameSources frameSources;
- ZstdBufferWithSegmentsCollection* result = NULL;
- Py_ssize_t i;
- memset(&frameSizes, 0, sizeof(frameSizes));
- #if PY_MAJOR_VERSION >= 3
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|y*i:multi_decompress_to_buffer",
- #else
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|s*i:multi_decompress_to_buffer",
- #endif
- kwlist, &frames, &frameSizes, &threads)) {
- return NULL;
- }
- if (frameSizes.buf) {
- if (!PyBuffer_IsContiguous(&frameSizes, 'C') || frameSizes.ndim > 1) {
- PyErr_SetString(PyExc_ValueError, "decompressed_sizes buffer should be contiguous and have a single dimension");
- goto finally;
- }
- frameSizesP = (unsigned long long*)frameSizes.buf;
- }
- if (threads < 0) {
- threads = cpu_count();
- }
- if (threads < 2) {
- threads = 1;
- }
- if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsType)) {
- ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)frames;
- frameCount = buffer->segmentCount;
- if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
- PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
- frameCount * sizeof(unsigned long long), frameSizes.len);
- goto finally;
- }
- framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
- if (!framePointers) {
- PyErr_NoMemory();
- goto finally;
- }
- for (i = 0; i < frameCount; i++) {
- void* sourceData;
- unsigned long long sourceSize;
- unsigned long long decompressedSize = 0;
- if (buffer->segments[i].offset + buffer->segments[i].length > buffer->dataSize) {
- PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area", i);
- goto finally;
- }
- sourceData = (char*)buffer->data + buffer->segments[i].offset;
- sourceSize = buffer->segments[i].length;
- totalInputSize += sourceSize;
- if (frameSizesP) {
- decompressedSize = frameSizesP[i];
- }
- if (sourceSize > SIZE_MAX) {
- PyErr_Format(PyExc_ValueError,
- "item %zd is too large for this platform", i);
- goto finally;
- }
- if (decompressedSize > SIZE_MAX) {
- PyErr_Format(PyExc_ValueError,
- "decompressed size of item %zd is too large for this platform", i);
- goto finally;
- }
- framePointers[i].sourceData = sourceData;
- framePointers[i].sourceSize = (size_t)sourceSize;
- framePointers[i].destSize = (size_t)decompressedSize;
- }
- }
- else if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsCollectionType)) {
- Py_ssize_t offset = 0;
- ZstdBufferWithSegments* buffer;
- ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)frames;
- frameCount = BufferWithSegmentsCollection_length(collection);
- if (frameSizes.buf && frameSizes.len != frameCount) {
- PyErr_Format(PyExc_ValueError,
- "decompressed_sizes size mismatch; expected %zd; got %zd",
- frameCount * sizeof(unsigned long long), frameSizes.len);
- goto finally;
- }
- framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
- if (NULL == framePointers) {
- PyErr_NoMemory();
- goto finally;
- }
- /* Iterate the data structure directly because it is faster. */
- for (i = 0; i < collection->bufferCount; i++) {
- Py_ssize_t segmentIndex;
- buffer = collection->buffers[i];
- for (segmentIndex = 0; segmentIndex < buffer->segmentCount; segmentIndex++) {
- unsigned long long decompressedSize = frameSizesP ? frameSizesP[offset] : 0;
- if (buffer->segments[segmentIndex].offset + buffer->segments[segmentIndex].length > buffer->dataSize) {
- PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area",
- offset);
- goto finally;
- }
- if (buffer->segments[segmentIndex].length > SIZE_MAX) {
- PyErr_Format(PyExc_ValueError,
- "item %zd in buffer %zd is too large for this platform",
- segmentIndex, i);
- goto finally;
- }
- if (decompressedSize > SIZE_MAX) {
- PyErr_Format(PyExc_ValueError,
- "decompressed size of item %zd in buffer %zd is too large for this platform",
- segmentIndex, i);
- goto finally;
- }
- totalInputSize += buffer->segments[segmentIndex].length;
- framePointers[offset].sourceData = (char*)buffer->data + buffer->segments[segmentIndex].offset;
- framePointers[offset].sourceSize = (size_t)buffer->segments[segmentIndex].length;
- framePointers[offset].destSize = (size_t)decompressedSize;
- offset++;
- }
- }
- }
- else if (PyList_Check(frames)) {
- frameCount = PyList_GET_SIZE(frames);
- if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
- PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
- frameCount * sizeof(unsigned long long), frameSizes.len);
- goto finally;
- }
- framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
- if (!framePointers) {
- PyErr_NoMemory();
- goto finally;
- }
- frameBuffers = PyMem_Malloc(frameCount * sizeof(Py_buffer));
- if (NULL == frameBuffers) {
- PyErr_NoMemory();
- goto finally;
- }
- memset(frameBuffers, 0, frameCount * sizeof(Py_buffer));
- /* Do a pass to assemble info about our input buffers and output sizes. */
- for (i = 0; i < frameCount; i++) {
- unsigned long long decompressedSize = frameSizesP ? frameSizesP[i] : 0;
- if (0 != PyObject_GetBuffer(PyList_GET_ITEM(frames, i),
- &frameBuffers[i], PyBUF_CONTIG_RO)) {
- PyErr_Clear();
- PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
- goto finally;
- }
- if (decompressedSize > SIZE_MAX) {
- PyErr_Format(PyExc_ValueError,
- "decompressed size of item %zd is too large for this platform", i);
- goto finally;
- }
- totalInputSize += frameBuffers[i].len;
- framePointers[i].sourceData = frameBuffers[i].buf;
- framePointers[i].sourceSize = frameBuffers[i].len;
- framePointers[i].destSize = (size_t)decompressedSize;
- }
- }
- else {
- PyErr_SetString(PyExc_TypeError, "argument must be list or BufferWithSegments");
- goto finally;
- }
- /* We now have an array with info about our inputs and outputs. Feed it into
- our generic decompression function. */
- frameSources.frames = framePointers;
- frameSources.framesSize = frameCount;
- frameSources.compressedSize = totalInputSize;
- result = decompress_from_framesources(self, &frameSources, threads);
- finally:
- if (frameSizes.buf) {
- PyBuffer_Release(&frameSizes);
- }
- PyMem_Free(framePointers);
- if (frameBuffers) {
- for (i = 0; i < frameCount; i++) {
- PyBuffer_Release(&frameBuffers[i]);
- }
- PyMem_Free(frameBuffers);
- }
- return result;
- }
- static PyMethodDef Decompressor_methods[] = {
- { "copy_stream", (PyCFunction)Decompressor_copy_stream, METH_VARARGS | METH_KEYWORDS,
- Decompressor_copy_stream__doc__ },
- { "decompress", (PyCFunction)Decompressor_decompress, METH_VARARGS | METH_KEYWORDS,
- Decompressor_decompress__doc__ },
- { "decompressobj", (PyCFunction)Decompressor_decompressobj, METH_VARARGS | METH_KEYWORDS,
- Decompressor_decompressobj__doc__ },
- { "read_to_iter", (PyCFunction)Decompressor_read_to_iter, METH_VARARGS | METH_KEYWORDS,
- Decompressor_read_to_iter__doc__ },
- /* TODO Remove deprecated API */
- { "read_from", (PyCFunction)Decompressor_read_to_iter, METH_VARARGS | METH_KEYWORDS,
- Decompressor_read_to_iter__doc__ },
- { "stream_reader", (PyCFunction)Decompressor_stream_reader,
- METH_VARARGS | METH_KEYWORDS, Decompressor_stream_reader__doc__ },
- { "stream_writer", (PyCFunction)Decompressor_stream_writer, METH_VARARGS | METH_KEYWORDS,
- Decompressor_stream_writer__doc__ },
- /* TODO remove deprecated API */
- { "write_to", (PyCFunction)Decompressor_stream_writer, METH_VARARGS | METH_KEYWORDS,
- Decompressor_stream_writer__doc__ },
- { "decompress_content_dict_chain", (PyCFunction)Decompressor_decompress_content_dict_chain,
- METH_VARARGS | METH_KEYWORDS, Decompressor_decompress_content_dict_chain__doc__ },
- { "multi_decompress_to_buffer", (PyCFunction)Decompressor_multi_decompress_to_buffer,
- METH_VARARGS | METH_KEYWORDS, Decompressor_multi_decompress_to_buffer__doc__ },
- { "memory_size", (PyCFunction)Decompressor_memory_size, METH_NOARGS,
- Decompressor_memory_size__doc__ },
- { NULL, NULL }
- };
- PyTypeObject ZstdDecompressorType = {
- PyVarObject_HEAD_INIT(NULL, 0)
- "zstd.ZstdDecompressor", /* tp_name */
- sizeof(ZstdDecompressor), /* tp_basicsize */
- 0, /* tp_itemsize */
- (destructor)Decompressor_dealloc, /* tp_dealloc */
- 0, /* tp_print */
- 0, /* tp_getattr */
- 0, /* tp_setattr */
- 0, /* tp_compare */
- 0, /* tp_repr */
- 0, /* tp_as_number */
- 0, /* tp_as_sequence */
- 0, /* tp_as_mapping */
- 0, /* tp_hash */
- 0, /* tp_call */
- 0, /* tp_str */
- 0, /* tp_getattro */
- 0, /* tp_setattro */
- 0, /* tp_as_buffer */
- Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
- Decompressor__doc__, /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- Decompressor_methods, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- (initproc)Decompressor_init, /* tp_init */
- 0, /* tp_alloc */
- PyType_GenericNew, /* tp_new */
- };
- void decompressor_module_init(PyObject* mod) {
- Py_TYPE(&ZstdDecompressorType) = &PyType_Type;
- if (PyType_Ready(&ZstdDecompressorType) < 0) {
- return;
- }
- Py_INCREF((PyObject*)&ZstdDecompressorType);
- PyModule_AddObject(mod, "ZstdDecompressor",
- (PyObject*)&ZstdDecompressorType);
- }
|