decompressoriterator.c 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. /**
  2. * Copyright (c) 2016-present, Gregory Szorc
  3. * All rights reserved.
  4. *
  5. * This software may be modified and distributed under the terms
  6. * of the BSD license. See the LICENSE file for details.
  7. */
  8. #include "python-zstandard.h"
  /* Smaller of two values. Function-like macro: each argument may be evaluated
     twice, so never pass expressions with side effects. */
  #define min(a, b) ((a) < (b) ? (a) : (b))
  10. extern PyObject* ZstdError;
  11. PyDoc_STRVAR(ZstdDecompressorIterator__doc__,
  12. "Represents an iterator of decompressed data.\n"
  13. );
  14. static void ZstdDecompressorIterator_dealloc(ZstdDecompressorIterator* self) {
  15. Py_XDECREF(self->decompressor);
  16. Py_XDECREF(self->reader);
  17. if (self->buffer.buf) {
  18. PyBuffer_Release(&self->buffer);
  19. memset(&self->buffer, 0, sizeof(self->buffer));
  20. }
  21. if (self->input.src) {
  22. PyMem_Free((void*)self->input.src);
  23. self->input.src = NULL;
  24. }
  25. PyObject_Del(self);
  26. }
  27. static PyObject* ZstdDecompressorIterator_iter(PyObject* self) {
  28. Py_INCREF(self);
  29. return self;
  30. }
  31. static DecompressorIteratorResult read_decompressor_iterator(ZstdDecompressorIterator* self) {
  32. size_t zresult;
  33. PyObject* chunk;
  34. DecompressorIteratorResult result;
  35. size_t oldInputPos = self->input.pos;
  36. result.chunk = NULL;
  37. chunk = PyBytes_FromStringAndSize(NULL, self->outSize);
  38. if (!chunk) {
  39. result.errored = 1;
  40. return result;
  41. }
  42. self->output.dst = PyBytes_AsString(chunk);
  43. self->output.size = self->outSize;
  44. self->output.pos = 0;
  45. Py_BEGIN_ALLOW_THREADS
  46. zresult = ZSTD_decompressStream(self->decompressor->dctx, &self->output, &self->input);
  47. Py_END_ALLOW_THREADS
  48. /* We're done with the pointer. Nullify to prevent anyone from getting a
  49. handle on a Python object. */
  50. self->output.dst = NULL;
  51. if (ZSTD_isError(zresult)) {
  52. Py_DECREF(chunk);
  53. PyErr_Format(ZstdError, "zstd decompress error: %s",
  54. ZSTD_getErrorName(zresult));
  55. result.errored = 1;
  56. return result;
  57. }
  58. self->readCount += self->input.pos - oldInputPos;
  59. /* Frame is fully decoded. Input exhausted and output sitting in buffer. */
  60. if (0 == zresult) {
  61. self->finishedInput = 1;
  62. self->finishedOutput = 1;
  63. }
  64. /* If it produced output data, return it. */
  65. if (self->output.pos) {
  66. if (self->output.pos < self->outSize) {
  67. if (safe_pybytes_resize(&chunk, self->output.pos)) {
  68. Py_XDECREF(chunk);
  69. result.errored = 1;
  70. return result;
  71. }
  72. }
  73. }
  74. else {
  75. Py_DECREF(chunk);
  76. chunk = NULL;
  77. }
  78. result.errored = 0;
  79. result.chunk = chunk;
  80. return result;
  81. }
  82. static PyObject* ZstdDecompressorIterator_iternext(ZstdDecompressorIterator* self) {
  83. PyObject* readResult = NULL;
  84. char* readBuffer;
  85. Py_ssize_t readSize;
  86. Py_ssize_t bufferRemaining;
  87. DecompressorIteratorResult result;
  88. if (self->finishedOutput) {
  89. PyErr_SetString(PyExc_StopIteration, "output flushed");
  90. return NULL;
  91. }
  92. /* If we have data left in the input, consume it. */
  93. if (self->input.pos < self->input.size) {
  94. result = read_decompressor_iterator(self);
  95. if (result.chunk || result.errored) {
  96. return result.chunk;
  97. }
  98. /* Else fall through to get more data from input. */
  99. }
  100. read_from_source:
  101. if (!self->finishedInput) {
  102. if (self->reader) {
  103. readResult = PyObject_CallMethod(self->reader, "read", "I", self->inSize);
  104. if (!readResult) {
  105. return NULL;
  106. }
  107. PyBytes_AsStringAndSize(readResult, &readBuffer, &readSize);
  108. }
  109. else {
  110. assert(self->buffer.buf);
  111. /* Only support contiguous C arrays for now */
  112. assert(self->buffer.strides == NULL && self->buffer.suboffsets == NULL);
  113. assert(self->buffer.itemsize == 1);
  114. /* TODO avoid memcpy() below */
  115. readBuffer = (char *)self->buffer.buf + self->bufferOffset;
  116. bufferRemaining = self->buffer.len - self->bufferOffset;
  117. readSize = min(bufferRemaining, (Py_ssize_t)self->inSize);
  118. self->bufferOffset += readSize;
  119. }
  120. if (readSize) {
  121. if (!self->readCount && self->skipBytes) {
  122. assert(self->skipBytes < self->inSize);
  123. if ((Py_ssize_t)self->skipBytes >= readSize) {
  124. PyErr_SetString(PyExc_ValueError,
  125. "skip_bytes larger than first input chunk; "
  126. "this scenario is currently unsupported");
  127. Py_XDECREF(readResult);
  128. return NULL;
  129. }
  130. readBuffer = readBuffer + self->skipBytes;
  131. readSize -= self->skipBytes;
  132. }
  133. /* Copy input into previously allocated buffer because it can live longer
  134. than a single function call and we don't want to keep a ref to a Python
  135. object around. This could be changed... */
  136. memcpy((void*)self->input.src, readBuffer, readSize);
  137. self->input.size = readSize;
  138. self->input.pos = 0;
  139. }
  140. /* No bytes on first read must mean an empty input stream. */
  141. else if (!self->readCount) {
  142. self->finishedInput = 1;
  143. self->finishedOutput = 1;
  144. Py_XDECREF(readResult);
  145. PyErr_SetString(PyExc_StopIteration, "empty input");
  146. return NULL;
  147. }
  148. else {
  149. self->finishedInput = 1;
  150. }
  151. /* We've copied the data managed by memory. Discard the Python object. */
  152. Py_XDECREF(readResult);
  153. }
  154. result = read_decompressor_iterator(self);
  155. if (result.errored || result.chunk) {
  156. return result.chunk;
  157. }
  158. /* No new output data. Try again unless we know there is no more data. */
  159. if (!self->finishedInput) {
  160. goto read_from_source;
  161. }
  162. PyErr_SetString(PyExc_StopIteration, "input exhausted");
  163. return NULL;
  164. }
  165. PyTypeObject ZstdDecompressorIteratorType = {
  166. PyVarObject_HEAD_INIT(NULL, 0)
  167. "zstd.ZstdDecompressorIterator", /* tp_name */
  168. sizeof(ZstdDecompressorIterator), /* tp_basicsize */
  169. 0, /* tp_itemsize */
  170. (destructor)ZstdDecompressorIterator_dealloc, /* tp_dealloc */
  171. 0, /* tp_print */
  172. 0, /* tp_getattr */
  173. 0, /* tp_setattr */
  174. 0, /* tp_compare */
  175. 0, /* tp_repr */
  176. 0, /* tp_as_number */
  177. 0, /* tp_as_sequence */
  178. 0, /* tp_as_mapping */
  179. 0, /* tp_hash */
  180. 0, /* tp_call */
  181. 0, /* tp_str */
  182. 0, /* tp_getattro */
  183. 0, /* tp_setattro */
  184. 0, /* tp_as_buffer */
  185. Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
  186. ZstdDecompressorIterator__doc__, /* tp_doc */
  187. 0, /* tp_traverse */
  188. 0, /* tp_clear */
  189. 0, /* tp_richcompare */
  190. 0, /* tp_weaklistoffset */
  191. ZstdDecompressorIterator_iter, /* tp_iter */
  192. (iternextfunc)ZstdDecompressorIterator_iternext, /* tp_iternext */
  193. 0, /* tp_methods */
  194. 0, /* tp_members */
  195. 0, /* tp_getset */
  196. 0, /* tp_base */
  197. 0, /* tp_dict */
  198. 0, /* tp_descr_get */
  199. 0, /* tp_descr_set */
  200. 0, /* tp_dictoffset */
  201. 0, /* tp_init */
  202. 0, /* tp_alloc */
  203. PyType_GenericNew, /* tp_new */
  204. };
  205. void decompressoriterator_module_init(PyObject* mod) {
  206. Py_TYPE(&ZstdDecompressorIteratorType) = &PyType_Type;
  207. if (PyType_Ready(&ZstdDecompressorIteratorType) < 0) {
  208. return;
  209. }
  210. }