outqueue.c 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. ///////////////////////////////////////////////////////////////////////////////
  2. //
  3. /// \file outqueue.c
  4. /// \brief Output queue handling in multithreaded coding
  5. //
  6. // Author: Lasse Collin
  7. //
  8. // This file has been put into the public domain.
  9. // You can do whatever you want with this file.
  10. //
  11. ///////////////////////////////////////////////////////////////////////////////
  12. #include "outqueue.h"
  /// Upper bound on how many output buffers may be allocated for a given
  /// number of threads. At the moment this is two buffers per thread, which
  /// is a trade-off between RAM usage and keeping the worker threads busy
  /// when buffers are completed out of order.
  #define GET_BUFS_LIMIT(threads) ((threads) * 2)
  18. extern uint64_t
  19. lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
  20. {
  21. // This is to ease integer overflow checking: We may allocate up to
  22. // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra
  23. // memory for other data structures too (that's the /2).
  24. //
  25. // lzma_outq_prealloc_buf() will still accept bigger buffers than this.
  26. const uint64_t limit
  27. = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2;
  28. if (threads > LZMA_THREADS_MAX || buf_size_max > limit)
  29. return UINT64_MAX;
  30. return GET_BUFS_LIMIT(threads)
  31. * lzma_outq_outbuf_memusage(buf_size_max);
  32. }
  33. static void
  34. move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator)
  35. {
  36. assert(outq->head != NULL);
  37. assert(outq->tail != NULL);
  38. assert(outq->bufs_in_use > 0);
  39. lzma_outbuf *buf = outq->head;
  40. outq->head = buf->next;
  41. if (outq->head == NULL)
  42. outq->tail = NULL;
  43. if (outq->cache != NULL && outq->cache->allocated != buf->allocated)
  44. lzma_outq_clear_cache(outq, allocator);
  45. buf->next = outq->cache;
  46. outq->cache = buf;
  47. --outq->bufs_in_use;
  48. outq->mem_in_use -= lzma_outq_outbuf_memusage(buf->allocated);
  49. return;
  50. }
  51. static void
  52. free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator)
  53. {
  54. assert(outq->cache != NULL);
  55. lzma_outbuf *buf = outq->cache;
  56. outq->cache = buf->next;
  57. --outq->bufs_allocated;
  58. outq->mem_allocated -= lzma_outq_outbuf_memusage(buf->allocated);
  59. lzma_free(buf, allocator);
  60. return;
  61. }
  62. extern void
  63. lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator)
  64. {
  65. while (outq->cache != NULL)
  66. free_one_cached_buffer(outq, allocator);
  67. return;
  68. }
  69. extern void
  70. lzma_outq_clear_cache2(lzma_outq *outq, const lzma_allocator *allocator,
  71. size_t keep_size)
  72. {
  73. if (outq->cache == NULL)
  74. return;
  75. // Free all but one.
  76. while (outq->cache->next != NULL)
  77. free_one_cached_buffer(outq, allocator);
  78. // Free the last one only if its size doesn't equal to keep_size.
  79. if (outq->cache->allocated != keep_size)
  80. free_one_cached_buffer(outq, allocator);
  81. return;
  82. }
  83. extern lzma_ret
  84. lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
  85. uint32_t threads)
  86. {
  87. if (threads > LZMA_THREADS_MAX)
  88. return LZMA_OPTIONS_ERROR;
  89. const uint32_t bufs_limit = GET_BUFS_LIMIT(threads);
  90. // Clear head/tail.
  91. while (outq->head != NULL)
  92. move_head_to_cache(outq, allocator);
  93. // If new buf_limit is lower than the old one, we may need to free
  94. // a few cached buffers.
  95. while (bufs_limit < outq->bufs_allocated)
  96. free_one_cached_buffer(outq, allocator);
  97. outq->bufs_limit = bufs_limit;
  98. outq->read_pos = 0;
  99. return LZMA_OK;
  100. }
  101. extern void
  102. lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator)
  103. {
  104. while (outq->head != NULL)
  105. move_head_to_cache(outq, allocator);
  106. lzma_outq_clear_cache(outq, allocator);
  107. return;
  108. }
  109. extern lzma_ret
  110. lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator,
  111. size_t size)
  112. {
  113. // Caller must have checked it with lzma_outq_has_buf().
  114. assert(outq->bufs_in_use < outq->bufs_limit);
  115. // If there already is appropriately-sized buffer in the cache,
  116. // we need to do nothing.
  117. if (outq->cache != NULL && outq->cache->allocated == size)
  118. return LZMA_OK;
  119. if (size > SIZE_MAX - sizeof(lzma_outbuf))
  120. return LZMA_MEM_ERROR;
  121. const size_t alloc_size = lzma_outq_outbuf_memusage(size);
  122. // The cache may have buffers but their size is wrong.
  123. lzma_outq_clear_cache(outq, allocator);
  124. outq->cache = lzma_alloc(alloc_size, allocator);
  125. if (outq->cache == NULL)
  126. return LZMA_MEM_ERROR;
  127. outq->cache->next = NULL;
  128. outq->cache->allocated = size;
  129. ++outq->bufs_allocated;
  130. outq->mem_allocated += alloc_size;
  131. return LZMA_OK;
  132. }
  133. extern lzma_outbuf *
  134. lzma_outq_get_buf(lzma_outq *outq, void *worker)
  135. {
  136. // Caller must have used lzma_outq_prealloc_buf() to ensure these.
  137. assert(outq->bufs_in_use < outq->bufs_limit);
  138. assert(outq->bufs_in_use < outq->bufs_allocated);
  139. assert(outq->cache != NULL);
  140. lzma_outbuf *buf = outq->cache;
  141. outq->cache = buf->next;
  142. buf->next = NULL;
  143. if (outq->tail != NULL) {
  144. assert(outq->head != NULL);
  145. outq->tail->next = buf;
  146. } else {
  147. assert(outq->head == NULL);
  148. outq->head = buf;
  149. }
  150. outq->tail = buf;
  151. buf->worker = worker;
  152. buf->finished = false;
  153. buf->finish_ret = LZMA_STREAM_END;
  154. buf->pos = 0;
  155. buf->decoder_in_pos = 0;
  156. buf->unpadded_size = 0;
  157. buf->uncompressed_size = 0;
  158. ++outq->bufs_in_use;
  159. outq->mem_in_use += lzma_outq_outbuf_memusage(buf->allocated);
  160. return buf;
  161. }
  162. extern bool
  163. lzma_outq_is_readable(const lzma_outq *outq)
  164. {
  165. if (outq->head == NULL)
  166. return false;
  167. return outq->read_pos < outq->head->pos || outq->head->finished;
  168. }
  169. extern lzma_ret
  170. lzma_outq_read(lzma_outq *restrict outq,
  171. const lzma_allocator *restrict allocator,
  172. uint8_t *restrict out, size_t *restrict out_pos,
  173. size_t out_size,
  174. lzma_vli *restrict unpadded_size,
  175. lzma_vli *restrict uncompressed_size)
  176. {
  177. // There must be at least one buffer from which to read.
  178. if (outq->bufs_in_use == 0)
  179. return LZMA_OK;
  180. // Get the buffer.
  181. lzma_outbuf *buf = outq->head;
  182. // Copy from the buffer to output.
  183. //
  184. // FIXME? In threaded decoder it may be bad to do this copy while
  185. // the mutex is being held.
  186. lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos,
  187. out, out_pos, out_size);
  188. // Return if we didn't get all the data from the buffer.
  189. if (!buf->finished || outq->read_pos < buf->pos)
  190. return LZMA_OK;
  191. // The buffer was finished. Tell the caller its size information.
  192. if (unpadded_size != NULL)
  193. *unpadded_size = buf->unpadded_size;
  194. if (uncompressed_size != NULL)
  195. *uncompressed_size = buf->uncompressed_size;
  196. // Remember the return value.
  197. const lzma_ret finish_ret = buf->finish_ret;
  198. // Free this buffer for further use.
  199. move_head_to_cache(outq, allocator);
  200. outq->read_pos = 0;
  201. return finish_ret;
  202. }
  203. extern void
  204. lzma_outq_enable_partial_output(lzma_outq *outq,
  205. void (*enable_partial_output)(void *worker))
  206. {
  207. if (outq->head != NULL && !outq->head->finished
  208. && outq->head->worker != NULL) {
  209. enable_partial_output(outq->head->worker);
  210. // Set it to NULL since calling it twice is pointless.
  211. outq->head->worker = NULL;
  212. }
  213. return;
  214. }