decompressoriterator.c 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. /**
  2. * Copyright (c) 2016-present, Gregory Szorc
  3. * All rights reserved.
  4. *
  5. * This software may be modified and distributed under the terms
  6. * of the BSD license. See the LICENSE file for details.
  7. */
  8. #include "python-zstandard.h"
  9. #define min(a, b) (((a) < (b)) ? (a) : (b))
/* Exception object used by read_decompressor_iterator() to report zstd
   decompression errors. Declared here and defined elsewhere. */
extern PyObject *ZstdError;
  11. static void ZstdDecompressorIterator_dealloc(ZstdDecompressorIterator *self) {
  12. Py_XDECREF(self->decompressor);
  13. Py_XDECREF(self->reader);
  14. if (self->buffer.buf) {
  15. PyBuffer_Release(&self->buffer);
  16. memset(&self->buffer, 0, sizeof(self->buffer));
  17. }
  18. if (self->input.src) {
  19. PyMem_Free((void *)self->input.src);
  20. self->input.src = NULL;
  21. }
  22. PyObject_Del(self);
  23. }
  24. static PyObject *ZstdDecompressorIterator_iter(PyObject *self) {
  25. Py_INCREF(self);
  26. return self;
  27. }
  28. static DecompressorIteratorResult
  29. read_decompressor_iterator(ZstdDecompressorIterator *self) {
  30. size_t zresult;
  31. PyObject *chunk;
  32. DecompressorIteratorResult result;
  33. size_t oldInputPos = self->input.pos;
  34. result.chunk = NULL;
  35. chunk = PyBytes_FromStringAndSize(NULL, self->outSize);
  36. if (!chunk) {
  37. result.errored = 1;
  38. return result;
  39. }
  40. self->output.dst = PyBytes_AsString(chunk);
  41. self->output.size = self->outSize;
  42. self->output.pos = 0;
  43. Py_BEGIN_ALLOW_THREADS zresult = ZSTD_decompressStream(
  44. self->decompressor->dctx, &self->output, &self->input);
  45. Py_END_ALLOW_THREADS
  46. /* We're done with the pointer. Nullify to prevent anyone from getting a
  47. handle on a Python object. */
  48. self->output.dst = NULL;
  49. if (ZSTD_isError(zresult)) {
  50. Py_DECREF(chunk);
  51. PyErr_Format(ZstdError, "zstd decompress error: %s",
  52. ZSTD_getErrorName(zresult));
  53. result.errored = 1;
  54. return result;
  55. }
  56. self->readCount += self->input.pos - oldInputPos;
  57. /* Frame is fully decoded. Input exhausted and output sitting in buffer. */
  58. if (0 == zresult) {
  59. self->finishedInput = 1;
  60. self->finishedOutput = 1;
  61. }
  62. /* If it produced output data, return it. */
  63. if (self->output.pos) {
  64. if (self->output.pos < self->outSize) {
  65. if (safe_pybytes_resize(&chunk, self->output.pos)) {
  66. Py_XDECREF(chunk);
  67. result.errored = 1;
  68. return result;
  69. }
  70. }
  71. }
  72. else {
  73. Py_DECREF(chunk);
  74. chunk = NULL;
  75. }
  76. result.errored = 0;
  77. result.chunk = chunk;
  78. return result;
  79. }
  80. static PyObject *
  81. ZstdDecompressorIterator_iternext(ZstdDecompressorIterator *self) {
  82. PyObject *readResult = NULL;
  83. char *readBuffer;
  84. Py_ssize_t readSize;
  85. Py_ssize_t bufferRemaining;
  86. DecompressorIteratorResult result;
  87. if (self->finishedOutput) {
  88. PyErr_SetString(PyExc_StopIteration, "output flushed");
  89. return NULL;
  90. }
  91. /* If we have data left in the input, consume it. */
  92. if (self->input.pos < self->input.size) {
  93. result = read_decompressor_iterator(self);
  94. if (result.chunk || result.errored) {
  95. return result.chunk;
  96. }
  97. /* Else fall through to get more data from input. */
  98. }
  99. read_from_source:
  100. if (!self->finishedInput) {
  101. if (self->reader) {
  102. readResult =
  103. PyObject_CallMethod(self->reader, "read", "I", self->inSize);
  104. if (!readResult) {
  105. return NULL;
  106. }
  107. PyBytes_AsStringAndSize(readResult, &readBuffer, &readSize);
  108. }
  109. else {
  110. assert(self->buffer.buf);
  111. /* Only support contiguous C arrays for now */
  112. assert(self->buffer.strides == NULL &&
  113. self->buffer.suboffsets == NULL);
  114. assert(self->buffer.itemsize == 1);
  115. /* TODO avoid memcpy() below */
  116. readBuffer = (char *)self->buffer.buf + self->bufferOffset;
  117. bufferRemaining = self->buffer.len - self->bufferOffset;
  118. readSize = min(bufferRemaining, (Py_ssize_t)self->inSize);
  119. self->bufferOffset += readSize;
  120. }
  121. if (readSize) {
  122. if (!self->readCount && self->skipBytes) {
  123. assert(self->skipBytes < self->inSize);
  124. if ((Py_ssize_t)self->skipBytes >= readSize) {
  125. PyErr_SetString(PyExc_ValueError,
  126. "skip_bytes larger than first input chunk; "
  127. "this scenario is currently unsupported");
  128. Py_XDECREF(readResult);
  129. return NULL;
  130. }
  131. readBuffer = readBuffer + self->skipBytes;
  132. readSize -= self->skipBytes;
  133. }
  134. /* Copy input into previously allocated buffer because it can live
  135. longer than a single function call and we don't want to keep a ref
  136. to a Python object around. This could be changed... */
  137. memcpy((void *)self->input.src, readBuffer, readSize);
  138. self->input.size = readSize;
  139. self->input.pos = 0;
  140. }
  141. /* No bytes on first read must mean an empty input stream. */
  142. else if (!self->readCount) {
  143. self->finishedInput = 1;
  144. self->finishedOutput = 1;
  145. Py_XDECREF(readResult);
  146. PyErr_SetString(PyExc_StopIteration, "empty input");
  147. return NULL;
  148. }
  149. else {
  150. self->finishedInput = 1;
  151. }
  152. /* We've copied the data managed by memory. Discard the Python object.
  153. */
  154. Py_XDECREF(readResult);
  155. }
  156. result = read_decompressor_iterator(self);
  157. if (result.errored || result.chunk) {
  158. return result.chunk;
  159. }
  160. /* No new output data. Try again unless we know there is no more data. */
  161. if (!self->finishedInput) {
  162. goto read_from_source;
  163. }
  164. PyErr_SetString(PyExc_StopIteration, "input exhausted");
  165. return NULL;
  166. }
  167. PyType_Slot ZstdDecompressorIteratorSlots[] = {
  168. {Py_tp_dealloc, ZstdDecompressorIterator_dealloc},
  169. {Py_tp_iter, ZstdDecompressorIterator_iter},
  170. {Py_tp_iternext, ZstdDecompressorIterator_iternext},
  171. {Py_tp_new, PyType_GenericNew},
  172. {0, NULL},
  173. };
  174. PyType_Spec ZstdDecompressorIteratorSpec = {
  175. "zstd.ZstdDecompressorIterator",
  176. sizeof(ZstdDecompressorIterator),
  177. 0,
  178. Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
  179. ZstdDecompressorIteratorSlots,
  180. };
  181. PyTypeObject *ZstdDecompressorIteratorType;
  182. void decompressoriterator_module_init(PyObject *mod) {
  183. ZstdDecompressorIteratorType =
  184. (PyTypeObject *)PyType_FromSpec(&ZstdDecompressorIteratorSpec);
  185. if (PyType_Ready(ZstdDecompressorIteratorType) < 0) {
  186. return;
  187. }
  188. }