stream_encoder_mt.c 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280
  1. // SPDX-License-Identifier: 0BSD
  2. ///////////////////////////////////////////////////////////////////////////////
  3. //
  4. /// \file stream_encoder_mt.c
  5. /// \brief Multithreaded .xz Stream encoder
  6. //
  7. // Author: Lasse Collin
  8. //
  9. ///////////////////////////////////////////////////////////////////////////////
  10. #include "filter_encoder.h"
  11. #include "easy_preset.h"
  12. #include "block_encoder.h"
  13. #include "block_buffer_encoder.h"
  14. #include "index_encoder.h"
  15. #include "outqueue.h"
  /// Maximum supported block size. This makes it simpler to prevent integer
  /// overflows if we are given an unusually large block size: any block size
  /// up to this limit can be multiplied by a thread count of at most
  /// LZMA_THREADS_MAX without exceeding UINT64_MAX.
  #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
  /// State of a worker thread.
  ///
  /// The order of the enumerators is significant: the code in this file
  /// compares states numerically ("state >= THR_STOP" means "stop or exit",
  /// "state <= THR_FINISH" means "there is encoding work to do"), so new
  /// states must not be inserted without checking those comparisons.
  typedef enum {
      /// Waiting for work.
      THR_IDLE,

      /// Encoding is in progress.
      THR_RUN,

      /// Encoding is in progress but no more input data will
      /// be read.
      THR_FINISH,

      /// The main thread wants the thread to stop whatever it was doing
      /// but not exit.
      THR_STOP,

      /// The main thread wants the thread to exit. We could use
      /// cancellation but since the threads are already controlled via
      /// this state variable anyway, this is lazier.
      THR_EXIT,

  } worker_state;
  typedef struct lzma_stream_coder_s lzma_stream_coder;

  typedef struct worker_thread_s worker_thread;

  /// Per-thread data of one worker thread. One Block at a time is encoded
  /// with each structure; the structure is reused for later Blocks.
  struct worker_thread_s {
      /// Current state of the thread. After the thread has been created,
      /// this is read and written only while holding "mutex" below.
      worker_state state;

      /// Input buffer of coder->block_size bytes. The main thread will
      /// put new input into this and update in_size accordingly. Once
      /// no more input is coming, state will be set to THR_FINISH.
      uint8_t *in;

      /// Amount of data available in the input buffer. This is modified
      /// only by the main thread.
      size_t in_size;

      /// Output buffer for this thread. This is set by the main
      /// thread every time a new Block is started with this thread
      /// structure.
      lzma_outbuf *outbuf;

      /// Pointer to the main structure is needed when putting this
      /// thread back to the stack of free threads.
      lzma_stream_coder *coder;

      /// The allocator is set by the main thread. Since a copy of the
      /// pointer is kept here, the application must not change the
      /// allocator before calling lzma_end().
      const lzma_allocator *allocator;

      /// Amount of uncompressed data that has already been compressed.
      uint64_t progress_in;

      /// Amount of compressed data that is ready.
      uint64_t progress_out;

      /// Block encoder
      lzma_next_coder block_encoder;

      /// Compression options for this Block
      lzma_block block_options;

      /// Filter chain for this thread. By copying the filters array
      /// to each thread it is possible to change the filter chain
      /// between Blocks using lzma_filters_update().
      lzma_filter filters[LZMA_FILTERS_MAX + 1];

      /// Next structure in the stack of free worker threads.
      worker_thread *next;

      /// Held while reading or changing "state" and "in_size" once the
      /// thread is running; worker_encode() also publishes progress_in
      /// and progress_out while holding it.
      mythread_mutex mutex;

      /// Signaled (with "mutex" held) when "state" or "in_size" changes
      /// so that whichever side is waiting on this thread wakes up.
      mythread_cond cond;

      /// The ID of this thread is used to join the thread
      /// when it's not needed anymore.
      mythread thread_id;
  };
  /// State of the multithreaded Stream encoder. This is owned by the main
  /// thread; worker threads reach it via worker_thread.coder.
  struct lzma_stream_coder_s {
      /// Which part of the .xz Stream stream_encode_mt() is currently
      /// producing. The values are in the order in which the parts are
      /// written.
      enum {
          SEQ_STREAM_HEADER,
          SEQ_BLOCK,
          SEQ_INDEX,
          SEQ_STREAM_FOOTER,
      } sequence;

      /// Start a new Block every block_size bytes of input unless
      /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
      size_t block_size;

      /// The filter chain to use for the next Block.
      /// This can be updated using lzma_filters_update()
      /// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
      lzma_filter filters[LZMA_FILTERS_MAX + 1];

      /// A copy of filters[] will be put here when attempting to get
      /// a new worker thread. This will be copied to a worker thread
      /// when a thread becomes free and then this cache is marked as
      /// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
      /// the filter options from filters[] would get uselessly copied
      /// multiple times (allocated and freed) when waiting for a new free
      /// worker thread.
      ///
      /// This is freed if filters[] is updated via lzma_filters_update().
      lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];

      /// Index to hold sizes of the Blocks
      lzma_index *index;

      /// Index encoder
      lzma_next_coder index_encoder;

      /// Stream Flags for encoding the Stream Header and Stream Footer.
      lzma_stream_flags stream_flags;

      /// Buffer to hold Stream Header and Stream Footer.
      uint8_t header[LZMA_STREAM_HEADER_SIZE];

      /// Read position in header[]
      size_t header_pos;

      /// Output buffer queue for compressed data
      lzma_outq outq;

      /// How much memory to allocate for each lzma_outbuf.buf
      size_t outbuf_alloc_size;

      /// Maximum wait time if cannot use all the input and cannot
      /// fill the output buffer. This is in milliseconds.
      /// Zero means that wait_for_work() waits without a time limit.
      uint32_t timeout;

      /// Error code from a worker thread
      lzma_ret thread_error;

      /// Array of allocated thread-specific structures
      worker_thread *threads;

      /// Number of structures in "threads" above. This is also the
      /// number of threads that will be created at maximum.
      uint32_t threads_max;

      /// Number of thread structures that have been initialized, and
      /// thus the number of worker threads actually created so far.
      uint32_t threads_initialized;

      /// Stack of free threads. When a thread finishes, it puts itself
      /// back into this stack. This starts as empty because threads
      /// are created only when actually needed.
      worker_thread *threads_free;

      /// The most recent worker thread to which the main thread writes
      /// the new input from the application.
      worker_thread *thr;

      /// Amount of uncompressed data in Blocks that have already
      /// been finished.
      uint64_t progress_in;

      /// Amount of compressed data in Stream Header + Blocks that
      /// have already been finished.
      uint64_t progress_out;

      /// Held by both the main thread and the worker threads while
      /// touching thread_error and threads_free. Worker threads also
      /// hold it when they mark their outbuf as finished and add to
      /// progress_in and progress_out.
      mythread_mutex mutex;

      /// Signaled by worker threads when they report an error or return
      /// themselves to threads_free; the main thread waits on this in
      /// wait_for_work().
      mythread_cond cond;
  };
  143. /// Tell the main thread that something has gone wrong.
  144. static void
  145. worker_error(worker_thread *thr, lzma_ret ret)
  146. {
  147. assert(ret != LZMA_OK);
  148. assert(ret != LZMA_STREAM_END);
  149. mythread_sync(thr->coder->mutex) {
  150. if (thr->coder->thread_error == LZMA_OK)
  151. thr->coder->thread_error = ret;
  152. mythread_cond_signal(&thr->coder->cond);
  153. }
  154. return;
  155. }
  /// Encode one Block in a worker thread.
  ///
  /// Input is consumed from thr->in as the main thread makes it available
  /// (thr->in_size grows) and the output goes to thr->outbuf->buf.
  ///
  /// \param      thr       Worker thread structure
  /// \param[out] out_pos   Set to the number of bytes written to
  ///                       thr->outbuf->buf
  /// \param      state     State the thread had when it was woken up
  ///
  /// \return     THR_FINISH if the Block was encoded successfully. THR_STOP
  ///             if an error occurred (it has been reported with
  ///             worker_error()). If the main thread requested stopping or
  ///             exiting while encoding, that state (THR_STOP or THR_EXIT)
  ///             is returned as is.
  static worker_state
  worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
  {
      // The progress counters are reset to zero in worker_start() after
      // every Block.
      assert(thr->progress_in == 0);
      assert(thr->progress_out == 0);

      // Set the Block options.
      // The sizes given here are upper limits (the whole output buffer and
      // a full block_size of input); they are used for calculating the
      // maximum Block Header size below.
      thr->block_options = (lzma_block){
          .version = 0,
          .check = thr->coder->stream_flags.check,
          .compressed_size = thr->outbuf->allocated,
          .uncompressed_size = thr->coder->block_size,
          .filters = thr->filters,
      };

      // Calculate maximum size of the Block Header. This amount is
      // reserved in the beginning of the buffer so that Block Header
      // along with Compressed Size and Uncompressed Size can be
      // written there.
      lzma_ret ret = lzma_block_header_size(&thr->block_options);
      if (ret != LZMA_OK) {
          worker_error(thr, ret);
          return THR_STOP;
      }

      // Initialize the Block encoder.
      ret = lzma_block_encoder_init(&thr->block_encoder,
              thr->allocator, &thr->block_options);
      if (ret != LZMA_OK) {
          worker_error(thr, ret);
          return THR_STOP;
      }

      size_t in_pos = 0;
      size_t in_size = 0;

      // Leave room for the Block Header in the beginning of the buffer.
      *out_pos = thr->block_options.header_size;
      const size_t out_size = thr->outbuf->allocated;

      do {
          mythread_sync(thr->mutex) {
              // Store in_pos and *out_pos into *thr so that
              // an application may read them via
              // lzma_get_progress() to get progress information.
              //
              // NOTE: These aren't updated when the encoding
              // finishes. Instead, the final values are taken
              // later from thr->outbuf.
              thr->progress_in = in_pos;
              thr->progress_out = *out_pos;

              // Sleep until the main thread has provided more
              // input or has changed the state.
              while (in_size == thr->in_size
                      && thr->state == THR_RUN)
                  mythread_cond_wait(&thr->cond, &thr->mutex);

              // Take local copies so that they can be used
              // after the mutex has been unlocked.
              state = thr->state;
              in_size = thr->in_size;
          }

          // Return if we were asked to stop or exit.
          if (state >= THR_STOP)
              return state;

          lzma_action action = state == THR_FINISH
                  ? LZMA_FINISH : LZMA_RUN;

          // Limit the amount of input given to the Block encoder
          // at once. This way this thread can react fairly quickly
          // if the main thread wants us to stop or exit.
          static const size_t in_chunk_max = 16384;
          size_t in_limit = in_size;
          if (in_size - in_pos > in_chunk_max) {
              in_limit = in_pos + in_chunk_max;

              // Not all available input is given to the encoder
              // on this round so it must not finish yet.
              action = LZMA_RUN;
          }

          ret = thr->block_encoder.code(
                  thr->block_encoder.coder, thr->allocator,
                  thr->in, &in_pos, in_limit, thr->outbuf->buf,
                  out_pos, out_size, action);

      // Stop when the encoder returns something else than LZMA_OK or
      // when the output buffer has become full.
      } while (ret == LZMA_OK && *out_pos < out_size);

      switch (ret) {
      case LZMA_STREAM_END:
          assert(state == THR_FINISH);

          // Encode the Block Header. By doing it after
          // the compression, we can store the Compressed Size
          // and Uncompressed Size fields.
          ret = lzma_block_header_encode(&thr->block_options,
                  thr->outbuf->buf);
          if (ret != LZMA_OK) {
              worker_error(thr, ret);
              return THR_STOP;
          }

          break;

      case LZMA_OK:
          // The output buffer got full before the encoder finished.
          // The data was incompressible. Encode it using uncompressed
          // LZMA2 chunks.
          //
          // First wait that we have gotten all the input.
          mythread_sync(thr->mutex) {
              while (thr->state == THR_RUN)
                  mythread_cond_wait(&thr->cond, &thr->mutex);

              state = thr->state;
              in_size = thr->in_size;
          }

          if (state >= THR_STOP)
              return state;

          // Do the encoding. This takes care of the Block Header too.
          // The output written so far is discarded by starting again
          // from the beginning of the output buffer.
          *out_pos = 0;
          ret = lzma_block_uncomp_encode(&thr->block_options,
                  thr->in, in_size, thr->outbuf->buf,
                  out_pos, out_size);

          // It shouldn't fail.
          if (ret != LZMA_OK) {
              worker_error(thr, LZMA_PROG_ERROR);
              return THR_STOP;
          }

          break;

      default:
          worker_error(thr, ret);
          return THR_STOP;
      }

      // Set the size information that will be read by the main thread
      // to write the Index field.
      thr->outbuf->unpadded_size
              = lzma_block_unpadded_size(&thr->block_options);
      assert(thr->outbuf->unpadded_size != 0);
      thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

      return THR_FINISH;
  }
  /// Entry point of a worker thread.
  ///
  /// The thread sleeps until the main thread gives it work, encodes one
  /// Block with worker_encode(), puts itself back to the stack of free
  /// threads, and repeats this until it is told to exit (THR_EXIT). On exit
  /// the thread frees its own resources (filters, mutex, condition variable,
  /// Block encoder, and input buffer).
  ///
  /// \param      thr_ptr   Pointer to the worker_thread structure of
  ///                       this thread
  static MYTHREAD_RET_TYPE
  worker_start(void *thr_ptr)
  {
      worker_thread *thr = thr_ptr;
      worker_state state = THR_IDLE; // Init to silence a warning

      while (true) {
          // Wait for work.
          mythread_sync(thr->mutex) {
              while (true) {
                  // The thread is already idle so if we are
                  // requested to stop, just set the state.
                  // The signal wakes up threads_stop() if it
                  // is waiting for this thread to become idle.
                  if (thr->state == THR_STOP) {
                      thr->state = THR_IDLE;
                      mythread_cond_signal(&thr->cond);
                  }

                  state = thr->state;
                  if (state != THR_IDLE)
                      break;

                  mythread_cond_wait(&thr->cond, &thr->mutex);
              }
          }

          size_t out_pos = 0;

          // Here the state is THR_RUN, THR_FINISH, or THR_EXIT.
          assert(state != THR_IDLE);
          assert(state != THR_STOP);

          if (state <= THR_FINISH)
              state = worker_encode(thr, &out_pos, state);

          if (state == THR_EXIT)
              break;

          // Mark the thread as idle unless the main thread has
          // told us to exit. Signal is needed for the case
          // where the main thread is waiting for the threads to stop.
          mythread_sync(thr->mutex) {
              if (thr->state != THR_EXIT) {
                  thr->state = THR_IDLE;
                  mythread_cond_signal(&thr->cond);
              }
          }

          mythread_sync(thr->coder->mutex) {
              // If no errors occurred, make the encoded data
              // available to be copied out.
              if (state == THR_FINISH) {
                  thr->outbuf->pos = out_pos;
                  thr->outbuf->finished = true;
              }

              // Update the main progress info.
              thr->coder->progress_in
                      += thr->outbuf->uncompressed_size;
              thr->coder->progress_out += out_pos;

              // The per-thread counters must be zero when the next
              // Block is started; worker_encode() asserts this.
              thr->progress_in = 0;
              thr->progress_out = 0;

              // Return this thread to the stack of free threads.
              thr->next = thr->coder->threads_free;
              thr->coder->threads_free = thr;

              // Wake up the main thread in case it is waiting
              // for a free thread or for finished output.
              mythread_cond_signal(&thr->coder->cond);
          }
      }

      // Exiting, free the resources.
      lzma_filters_free(thr->filters, thr->allocator);

      mythread_mutex_destroy(&thr->mutex);
      mythread_cond_destroy(&thr->cond);

      lzma_next_end(&thr->block_encoder, thr->allocator);
      lzma_free(thr->in, thr->allocator);
      return MYTHREAD_RET_VALUE;
  }
  338. /// Make the threads stop but not exit. Optionally wait for them to stop.
  339. static void
  340. threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
  341. {
  342. // Tell the threads to stop.
  343. for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
  344. mythread_sync(coder->threads[i].mutex) {
  345. coder->threads[i].state = THR_STOP;
  346. mythread_cond_signal(&coder->threads[i].cond);
  347. }
  348. }
  349. if (!wait_for_threads)
  350. return;
  351. // Wait for the threads to settle in the idle state.
  352. for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
  353. mythread_sync(coder->threads[i].mutex) {
  354. while (coder->threads[i].state != THR_IDLE)
  355. mythread_cond_wait(&coder->threads[i].cond,
  356. &coder->threads[i].mutex);
  357. }
  358. }
  359. return;
  360. }
  361. /// Stop the threads and free the resources associated with them.
  362. /// Wait until the threads have exited.
  363. static void
  364. threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
  365. {
  366. for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
  367. mythread_sync(coder->threads[i].mutex) {
  368. coder->threads[i].state = THR_EXIT;
  369. mythread_cond_signal(&coder->threads[i].cond);
  370. }
  371. }
  372. for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
  373. int ret = mythread_join(coder->threads[i].thread_id);
  374. assert(ret == 0);
  375. (void)ret;
  376. }
  377. lzma_free(coder->threads, allocator);
  378. return;
  379. }
  380. /// Initialize a new worker_thread structure and create a new thread.
  381. static lzma_ret
  382. initialize_new_thread(lzma_stream_coder *coder,
  383. const lzma_allocator *allocator)
  384. {
  385. worker_thread *thr = &coder->threads[coder->threads_initialized];
  386. thr->in = lzma_alloc(coder->block_size, allocator);
  387. if (thr->in == NULL)
  388. return LZMA_MEM_ERROR;
  389. if (mythread_mutex_init(&thr->mutex))
  390. goto error_mutex;
  391. if (mythread_cond_init(&thr->cond))
  392. goto error_cond;
  393. thr->state = THR_IDLE;
  394. thr->allocator = allocator;
  395. thr->coder = coder;
  396. thr->progress_in = 0;
  397. thr->progress_out = 0;
  398. thr->block_encoder = LZMA_NEXT_CODER_INIT;
  399. thr->filters[0].id = LZMA_VLI_UNKNOWN;
  400. if (mythread_create(&thr->thread_id, &worker_start, thr))
  401. goto error_thread;
  402. ++coder->threads_initialized;
  403. coder->thr = thr;
  404. return LZMA_OK;
  405. error_thread:
  406. mythread_cond_destroy(&thr->cond);
  407. error_cond:
  408. mythread_mutex_destroy(&thr->mutex);
  409. error_mutex:
  410. lzma_free(thr->in, allocator);
  411. return LZMA_MEM_ERROR;
  412. }
  413. static lzma_ret
  414. get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
  415. {
  416. // If there are no free output subqueues, there is no
  417. // point to try getting a thread.
  418. if (!lzma_outq_has_buf(&coder->outq))
  419. return LZMA_OK;
  420. // That's also true if we cannot allocate memory for the output
  421. // buffer in the output queue.
  422. return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
  423. coder->outbuf_alloc_size));
  424. // Make a thread-specific copy of the filter chain. Put it in
  425. // the cache array first so that if we cannot get a new thread yet,
  426. // the allocation is ready when we try again.
  427. if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
  428. return_if_error(lzma_filters_copy(
  429. coder->filters, coder->filters_cache, allocator));
  430. // If there is a free structure on the stack, use it.
  431. mythread_sync(coder->mutex) {
  432. if (coder->threads_free != NULL) {
  433. coder->thr = coder->threads_free;
  434. coder->threads_free = coder->threads_free->next;
  435. }
  436. }
  437. if (coder->thr == NULL) {
  438. // If there are no uninitialized structures left, return.
  439. if (coder->threads_initialized == coder->threads_max)
  440. return LZMA_OK;
  441. // Initialize a new thread.
  442. return_if_error(initialize_new_thread(coder, allocator));
  443. }
  444. // Reset the parts of the thread state that have to be done
  445. // in the main thread.
  446. mythread_sync(coder->thr->mutex) {
  447. coder->thr->state = THR_RUN;
  448. coder->thr->in_size = 0;
  449. coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
  450. // Free the old thread-specific filter options and replace
  451. // them with the already-allocated new options from
  452. // coder->filters_cache[]. Then mark the cache as empty.
  453. lzma_filters_free(coder->thr->filters, allocator);
  454. memcpy(coder->thr->filters, coder->filters_cache,
  455. sizeof(coder->filters_cache));
  456. coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
  457. mythread_cond_signal(&coder->thr->cond);
  458. }
  459. return LZMA_OK;
  460. }
  461. static lzma_ret
  462. stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
  463. const uint8_t *restrict in, size_t *restrict in_pos,
  464. size_t in_size, lzma_action action)
  465. {
  466. while (*in_pos < in_size
  467. || (coder->thr != NULL && action != LZMA_RUN)) {
  468. if (coder->thr == NULL) {
  469. // Get a new thread.
  470. const lzma_ret ret = get_thread(coder, allocator);
  471. if (coder->thr == NULL)
  472. return ret;
  473. }
  474. // Copy the input data to thread's buffer.
  475. size_t thr_in_size = coder->thr->in_size;
  476. lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
  477. &thr_in_size, coder->block_size);
  478. // Tell the Block encoder to finish if
  479. // - it has got block_size bytes of input; or
  480. // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
  481. // or LZMA_FULL_BARRIER was used.
  482. //
  483. // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
  484. const bool finish = thr_in_size == coder->block_size
  485. || (*in_pos == in_size && action != LZMA_RUN);
  486. bool block_error = false;
  487. mythread_sync(coder->thr->mutex) {
  488. if (coder->thr->state == THR_IDLE) {
  489. // Something has gone wrong with the Block
  490. // encoder. It has set coder->thread_error
  491. // which we will read a few lines later.
  492. block_error = true;
  493. } else {
  494. // Tell the Block encoder its new amount
  495. // of input and update the state if needed.
  496. coder->thr->in_size = thr_in_size;
  497. if (finish)
  498. coder->thr->state = THR_FINISH;
  499. mythread_cond_signal(&coder->thr->cond);
  500. }
  501. }
  502. if (block_error) {
  503. lzma_ret ret = LZMA_OK; // Init to silence a warning.
  504. mythread_sync(coder->mutex) {
  505. ret = coder->thread_error;
  506. }
  507. return ret;
  508. }
  509. if (finish)
  510. coder->thr = NULL;
  511. }
  512. return LZMA_OK;
  513. }
  514. /// Wait until more input can be consumed, more output can be read, or
  515. /// an optional timeout is reached.
  516. static bool
  517. wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
  518. bool *has_blocked, bool has_input)
  519. {
  520. if (coder->timeout != 0 && !*has_blocked) {
  521. // Every time when stream_encode_mt() is called via
  522. // lzma_code(), *has_blocked starts as false. We set it
  523. // to true here and calculate the absolute time when
  524. // we must return if there's nothing to do.
  525. //
  526. // This way if we block multiple times for short moments
  527. // less than "timeout" milliseconds, we will return once
  528. // "timeout" amount of time has passed since the *first*
  529. // blocking occurred. If the absolute time was calculated
  530. // again every time we block, "timeout" would effectively
  531. // be meaningless if we never consecutively block longer
  532. // than "timeout" ms.
  533. *has_blocked = true;
  534. mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
  535. }
  536. bool timed_out = false;
  537. mythread_sync(coder->mutex) {
  538. // There are four things that we wait. If one of them
  539. // becomes possible, we return.
  540. // - If there is input left, we need to get a free
  541. // worker thread and an output buffer for it.
  542. // - Data ready to be read from the output queue.
  543. // - A worker thread indicates an error.
  544. // - Time out occurs.
  545. while ((!has_input || coder->threads_free == NULL
  546. || !lzma_outq_has_buf(&coder->outq))
  547. && !lzma_outq_is_readable(&coder->outq)
  548. && coder->thread_error == LZMA_OK
  549. && !timed_out) {
  550. if (coder->timeout != 0)
  551. timed_out = mythread_cond_timedwait(
  552. &coder->cond, &coder->mutex,
  553. wait_abs) != 0;
  554. else
  555. mythread_cond_wait(&coder->cond,
  556. &coder->mutex);
  557. }
  558. }
  559. return timed_out;
  560. }
  /// The main encoding function of the multithreaded Stream encoder.
  ///
  /// This is a state machine driven by coder->sequence: Stream Header,
  /// Blocks (encoded by the worker threads), Index, and Stream Footer.
  /// The cases fall through to the next one when a part gets finished.
  ///
  /// \return     LZMA_OK when more calls are needed (more input or more
  ///             output space), LZMA_STREAM_END when the requested action
  ///             (LZMA_FINISH, LZMA_FULL_FLUSH, or LZMA_FULL_BARRIER) has
  ///             been completed, LZMA_TIMED_OUT if coder->timeout was
  ///             reached while waiting, or an error code. On errors in the
  ///             Block sequence the worker threads are told to stop.
  static lzma_ret
  stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
          const uint8_t *restrict in, size_t *restrict in_pos,
          size_t in_size, uint8_t *restrict out,
          size_t *restrict out_pos, size_t out_size, lzma_action action)
  {
      lzma_stream_coder *coder = coder_ptr;

      switch (coder->sequence) {
      case SEQ_STREAM_HEADER:
          // Copy as much of the already-encoded Stream Header
          // as fits into out[].
          lzma_bufcpy(coder->header, &coder->header_pos,
                  sizeof(coder->header),
                  out, out_pos, out_size);
          if (coder->header_pos < sizeof(coder->header))
              return LZMA_OK;

          // header[] and header_pos are reused for the Stream Footer.
          coder->header_pos = 0;
          coder->sequence = SEQ_BLOCK;

      // Fall through

      case SEQ_BLOCK: {
          // Initialized to silence warnings.
          lzma_vli unpadded_size = 0;
          lzma_vli uncompressed_size = 0;
          lzma_ret ret = LZMA_OK;

          // These are for wait_for_work().
          bool has_blocked = false;
          mythread_condtime wait_abs = { 0 };

          while (true) {
              mythread_sync(coder->mutex) {
                  // Check for Block encoder errors.
                  ret = coder->thread_error;
                  if (ret != LZMA_OK) {
                      assert(ret != LZMA_STREAM_END);
                      break; // Break out of mythread_sync.
                  }

                  // Try to read compressed data to out[].
                  ret = lzma_outq_read(&coder->outq, allocator,
                          out, out_pos, out_size,
                          &unpadded_size,
                          &uncompressed_size);
              }

              if (ret == LZMA_STREAM_END) {
                  // End of Block. Add it to the Index.
                  ret = lzma_index_append(coder->index,
                          allocator, unpadded_size,
                          uncompressed_size);
                  if (ret != LZMA_OK) {
                      threads_stop(coder, false);
                      return ret;
                  }

                  // If we didn't fill the output buffer yet,
                  // try to read more data. Maybe the next
                  // outbuf has been finished already too.
                  if (*out_pos < out_size)
                      continue;
              }

              if (ret != LZMA_OK) {
                  // coder->thread_error was set.
                  threads_stop(coder, false);
                  return ret;
              }

              // Try to give uncompressed data to a worker thread.
              ret = stream_encode_in(coder, allocator,
                      in, in_pos, in_size, action);
              if (ret != LZMA_OK) {
                  threads_stop(coder, false);
                  return ret;
              }

              // See if we should wait or return.
              //
              // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
              if (*in_pos == in_size) {
                  // LZMA_RUN: More data is probably coming
                  // so return to let the caller fill the
                  // input buffer.
                  if (action == LZMA_RUN)
                      return LZMA_OK;

                  // LZMA_FULL_BARRIER: The same as with
                  // LZMA_RUN but tell the caller that the
                  // barrier was completed.
                  if (action == LZMA_FULL_BARRIER)
                      return LZMA_STREAM_END;

                  // Finishing or flushing isn't completed until
                  // all input data has been encoded and copied
                  // to the output buffer.
                  if (lzma_outq_is_empty(&coder->outq)) {
                      // LZMA_FINISH: Continue to encode
                      // the Index field.
                      if (action == LZMA_FINISH)
                          break;

                      // LZMA_FULL_FLUSH: Return to tell
                      // the caller that flushing was
                      // completed.
                      if (action == LZMA_FULL_FLUSH)
                          return LZMA_STREAM_END;
                  }
              }

              // Return if there is no output space left.
              // This check must be done after testing the input
              // buffer, because we might want to use a different
              // return code.
              if (*out_pos == out_size)
                  return LZMA_OK;

              // Neither in nor out has been used completely.
              // Wait until there's something we can do.
              if (wait_for_work(coder, &wait_abs, &has_blocked,
                      *in_pos < in_size))
                  return LZMA_TIMED_OUT;
          }

          // All Blocks have been encoded and the threads have stopped.
          // Prepare to encode the Index field.
          return_if_error(lzma_index_encoder_init(
                  &coder->index_encoder, allocator,
                  coder->index));
          coder->sequence = SEQ_INDEX;

          // Update the progress info to take the Index and
          // Stream Footer into account. Those are very fast to encode
          // so in terms of progress information they can be thought
          // to be ready to be copied out.
          //
          // LZMA_STREAM_HEADER_SIZE is used for the Stream Footer:
          // the same header[] buffer of that size holds both the
          // Stream Header and the Stream Footer.
          coder->progress_out += lzma_index_size(coder->index)
                  + LZMA_STREAM_HEADER_SIZE;
      }

      // Fall through

      case SEQ_INDEX: {
          // Call the Index encoder. It doesn't take any input, so
          // those pointers can be NULL.
          const lzma_ret ret = coder->index_encoder.code(
                  coder->index_encoder.coder, allocator,
                  NULL, NULL, 0,
                  out, out_pos, out_size, LZMA_RUN);
          if (ret != LZMA_STREAM_END)
              return ret;

          // Encode the Stream Footer into coder->header.
          coder->stream_flags.backward_size
                  = lzma_index_size(coder->index);
          if (lzma_stream_footer_encode(&coder->stream_flags,
                  coder->header) != LZMA_OK)
              return LZMA_PROG_ERROR;

          coder->sequence = SEQ_STREAM_FOOTER;
      }

      // Fall through

      case SEQ_STREAM_FOOTER:
          // Copy out the Stream Footer. The Stream is complete once
          // all of it has been copied.
          lzma_bufcpy(coder->header, &coder->header_pos,
                  sizeof(coder->header),
                  out, out_pos, out_size);
          return coder->header_pos < sizeof(coder->header)
                  ? LZMA_OK : LZMA_STREAM_END;
      }

      // Every case above returns, so this is reached only if
      // coder->sequence has an invalid value.
      assert(0);
      return LZMA_PROG_ERROR;
  }
  710. static void
  711. stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
  712. {
  713. lzma_stream_coder *coder = coder_ptr;
  714. // Threads must be killed before the output queue can be freed.
  715. threads_end(coder, allocator);
  716. lzma_outq_end(&coder->outq, allocator);
  717. lzma_filters_free(coder->filters, allocator);
  718. lzma_filters_free(coder->filters_cache, allocator);
  719. lzma_next_end(&coder->index_encoder, allocator);
  720. lzma_index_end(coder->index, allocator);
  721. mythread_cond_destroy(&coder->cond);
  722. mythread_mutex_destroy(&coder->mutex);
  723. lzma_free(coder, allocator);
  724. return;
  725. }
  726. static lzma_ret
  727. stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
  728. const lzma_filter *filters,
  729. const lzma_filter *reversed_filters
  730. lzma_attribute((__unused__)))
  731. {
  732. lzma_stream_coder *coder = coder_ptr;
  733. // Applications shouldn't attempt to change the options when
  734. // we are already encoding the Index or Stream Footer.
  735. if (coder->sequence > SEQ_BLOCK)
  736. return LZMA_PROG_ERROR;
  737. // For now the threaded encoder doesn't support changing
  738. // the options in the middle of a Block.
  739. if (coder->thr != NULL)
  740. return LZMA_PROG_ERROR;
  741. // Check if the filter chain seems mostly valid. See the comment
  742. // in stream_encoder_mt_init().
  743. if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
  744. return LZMA_OPTIONS_ERROR;
  745. // Make a copy to a temporary buffer first. This way the encoder
  746. // state stays unchanged if an error occurs in lzma_filters_copy().
  747. lzma_filter temp[LZMA_FILTERS_MAX + 1];
  748. return_if_error(lzma_filters_copy(filters, temp, allocator));
  749. // Free the options of the old chain as well as the cache.
  750. lzma_filters_free(coder->filters, allocator);
  751. lzma_filters_free(coder->filters_cache, allocator);
  752. // Copy the new filter chain in place.
  753. memcpy(coder->filters, temp, sizeof(temp));
  754. return LZMA_OK;
  755. }
  756. /// Options handling for lzma_stream_encoder_mt_init() and
  757. /// lzma_stream_encoder_mt_memusage()
  758. static lzma_ret
  759. get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
  760. const lzma_filter **filters, uint64_t *block_size,
  761. uint64_t *outbuf_size_max)
  762. {
  763. // Validate some of the options.
  764. if (options == NULL)
  765. return LZMA_PROG_ERROR;
  766. if (options->flags != 0 || options->threads == 0
  767. || options->threads > LZMA_THREADS_MAX)
  768. return LZMA_OPTIONS_ERROR;
  769. if (options->filters != NULL) {
  770. // Filter chain was given, use it as is.
  771. *filters = options->filters;
  772. } else {
  773. // Use a preset.
  774. if (lzma_easy_preset(opt_easy, options->preset))
  775. return LZMA_OPTIONS_ERROR;
  776. *filters = opt_easy->filters;
  777. }
  778. // If the Block size is not set, determine it from the filter chain.
  779. if (options->block_size > 0)
  780. *block_size = options->block_size;
  781. else
  782. *block_size = lzma_mt_block_size(*filters);
  783. // UINT64_MAX > BLOCK_SIZE_MAX, so the second condition
  784. // should be optimized out by any reasonable compiler.
  785. // The second condition should be there in the unlikely event that
  786. // the macros change and UINT64_MAX < BLOCK_SIZE_MAX.
  787. if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX)
  788. return LZMA_OPTIONS_ERROR;
  789. // Calculate the maximum amount output that a single output buffer
  790. // may need to hold. This is the same as the maximum total size of
  791. // a Block.
  792. *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
  793. if (*outbuf_size_max == 0)
  794. return LZMA_MEM_ERROR;
  795. return LZMA_OK;
  796. }
  797. static void
  798. get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
  799. {
  800. lzma_stream_coder *coder = coder_ptr;
  801. // Lock coder->mutex to prevent finishing threads from moving their
  802. // progress info from the worker_thread structure to lzma_stream_coder.
  803. mythread_sync(coder->mutex) {
  804. *progress_in = coder->progress_in;
  805. *progress_out = coder->progress_out;
  806. for (size_t i = 0; i < coder->threads_initialized; ++i) {
  807. mythread_sync(coder->threads[i].mutex) {
  808. *progress_in += coder->threads[i].progress_in;
  809. *progress_out += coder->threads[i]
  810. .progress_out;
  811. }
  812. }
  813. }
  814. return;
  815. }
  816. static lzma_ret
  817. stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
  818. const lzma_mt *options)
  819. {
  820. lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
  821. // Get the filter chain.
  822. lzma_options_easy easy;
  823. const lzma_filter *filters;
  824. uint64_t block_size;
  825. uint64_t outbuf_size_max;
  826. return_if_error(get_options(options, &easy, &filters,
  827. &block_size, &outbuf_size_max));
  828. #if SIZE_MAX < UINT64_MAX
  829. if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
  830. return LZMA_MEM_ERROR;
  831. #endif
  832. // Validate the filter chain so that we can give an error in this
  833. // function instead of delaying it to the first call to lzma_code().
  834. // The memory usage calculation verifies the filter chain as
  835. // a side effect so we take advantage of that. It's not a perfect
  836. // check though as raw encoder allows LZMA1 too but such problems
  837. // will be caught eventually with Block Header encoder.
  838. if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
  839. return LZMA_OPTIONS_ERROR;
  840. // Validate the Check ID.
  841. if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
  842. return LZMA_PROG_ERROR;
  843. if (!lzma_check_is_supported(options->check))
  844. return LZMA_UNSUPPORTED_CHECK;
  845. // Allocate and initialize the base structure if needed.
  846. lzma_stream_coder *coder = next->coder;
  847. if (coder == NULL) {
  848. coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
  849. if (coder == NULL)
  850. return LZMA_MEM_ERROR;
  851. next->coder = coder;
  852. // For the mutex and condition variable initializations
  853. // the error handling has to be done here because
  854. // stream_encoder_mt_end() doesn't know if they have
  855. // already been initialized or not.
  856. if (mythread_mutex_init(&coder->mutex)) {
  857. lzma_free(coder, allocator);
  858. next->coder = NULL;
  859. return LZMA_MEM_ERROR;
  860. }
  861. if (mythread_cond_init(&coder->cond)) {
  862. mythread_mutex_destroy(&coder->mutex);
  863. lzma_free(coder, allocator);
  864. next->coder = NULL;
  865. return LZMA_MEM_ERROR;
  866. }
  867. next->code = &stream_encode_mt;
  868. next->end = &stream_encoder_mt_end;
  869. next->get_progress = &get_progress;
  870. next->update = &stream_encoder_mt_update;
  871. coder->filters[0].id = LZMA_VLI_UNKNOWN;
  872. coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
  873. coder->index_encoder = LZMA_NEXT_CODER_INIT;
  874. coder->index = NULL;
  875. memzero(&coder->outq, sizeof(coder->outq));
  876. coder->threads = NULL;
  877. coder->threads_max = 0;
  878. coder->threads_initialized = 0;
  879. }
  880. // Basic initializations
  881. coder->sequence = SEQ_STREAM_HEADER;
  882. coder->block_size = (size_t)(block_size);
  883. coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
  884. coder->thread_error = LZMA_OK;
  885. coder->thr = NULL;
  886. // Allocate the thread-specific base structures.
  887. assert(options->threads > 0);
  888. if (coder->threads_max != options->threads) {
  889. threads_end(coder, allocator);
  890. coder->threads = NULL;
  891. coder->threads_max = 0;
  892. coder->threads_initialized = 0;
  893. coder->threads_free = NULL;
  894. coder->threads = lzma_alloc(
  895. options->threads * sizeof(worker_thread),
  896. allocator);
  897. if (coder->threads == NULL)
  898. return LZMA_MEM_ERROR;
  899. coder->threads_max = options->threads;
  900. } else {
  901. // Reuse the old structures and threads. Tell the running
  902. // threads to stop and wait until they have stopped.
  903. threads_stop(coder, true);
  904. }
  905. // Output queue
  906. return_if_error(lzma_outq_init(&coder->outq, allocator,
  907. options->threads));
  908. // Timeout
  909. coder->timeout = options->timeout;
  910. // Free the old filter chain and the cache.
  911. lzma_filters_free(coder->filters, allocator);
  912. lzma_filters_free(coder->filters_cache, allocator);
  913. // Copy the new filter chain.
  914. return_if_error(lzma_filters_copy(
  915. filters, coder->filters, allocator));
  916. // Index
  917. lzma_index_end(coder->index, allocator);
  918. coder->index = lzma_index_init(allocator);
  919. if (coder->index == NULL)
  920. return LZMA_MEM_ERROR;
  921. // Stream Header
  922. coder->stream_flags.version = 0;
  923. coder->stream_flags.check = options->check;
  924. return_if_error(lzma_stream_header_encode(
  925. &coder->stream_flags, coder->header));
  926. coder->header_pos = 0;
  927. // Progress info
  928. coder->progress_in = 0;
  929. coder->progress_out = LZMA_STREAM_HEADER_SIZE;
  930. return LZMA_OK;
  931. }
  932. #ifdef HAVE_SYMBOL_VERSIONS_LINUX
  933. // These are for compatibility with binaries linked against liblzma that
  934. // has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
  935. // Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
  936. // but it has been added here anyway since someone might misread the
  937. // RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
  938. LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
  939. lzma_ret, lzma_stream_encoder_mt_512a)(
  940. lzma_stream *strm, const lzma_mt *options)
  941. lzma_nothrow lzma_attr_warn_unused_result
  942. __attribute__((__alias__("lzma_stream_encoder_mt_52")));
  943. LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
  944. lzma_ret, lzma_stream_encoder_mt_522)(
  945. lzma_stream *strm, const lzma_mt *options)
  946. lzma_nothrow lzma_attr_warn_unused_result
  947. __attribute__((__alias__("lzma_stream_encoder_mt_52")));
  948. LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
  949. lzma_ret, lzma_stream_encoder_mt_52)(
  950. lzma_stream *strm, const lzma_mt *options)
  951. lzma_nothrow lzma_attr_warn_unused_result;
  952. #define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
  953. #endif
  954. extern LZMA_API(lzma_ret)
  955. lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
  956. {
  957. lzma_next_strm_init(stream_encoder_mt_init, strm, options);
  958. strm->internal->supported_actions[LZMA_RUN] = true;
  959. // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
  960. strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
  961. strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
  962. strm->internal->supported_actions[LZMA_FINISH] = true;
  963. return LZMA_OK;
  964. }
  965. #ifdef HAVE_SYMBOL_VERSIONS_LINUX
  966. LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
  967. uint64_t, lzma_stream_encoder_mt_memusage_512a)(
  968. const lzma_mt *options) lzma_nothrow lzma_attr_pure
  969. __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
  970. LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
  971. uint64_t, lzma_stream_encoder_mt_memusage_522)(
  972. const lzma_mt *options) lzma_nothrow lzma_attr_pure
  973. __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
  974. LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
  975. uint64_t, lzma_stream_encoder_mt_memusage_52)(
  976. const lzma_mt *options) lzma_nothrow lzma_attr_pure;
  977. #define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
  978. #endif
  979. // This function name is a monster but it's consistent with the older
  980. // monster names. :-( 31 chars is the max that C99 requires so in that
  981. // sense it's not too long. ;-)
  982. extern LZMA_API(uint64_t)
  983. lzma_stream_encoder_mt_memusage(const lzma_mt *options)
  984. {
  985. lzma_options_easy easy;
  986. const lzma_filter *filters;
  987. uint64_t block_size;
  988. uint64_t outbuf_size_max;
  989. if (get_options(options, &easy, &filters, &block_size,
  990. &outbuf_size_max) != LZMA_OK)
  991. return UINT64_MAX;
  992. // Memory usage of the input buffers
  993. const uint64_t inbuf_memusage = options->threads * block_size;
  994. // Memory usage of the filter encoders
  995. uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
  996. if (filters_memusage == UINT64_MAX)
  997. return UINT64_MAX;
  998. filters_memusage *= options->threads;
  999. // Memory usage of the output queue
  1000. const uint64_t outq_memusage = lzma_outq_memusage(
  1001. outbuf_size_max, options->threads);
  1002. if (outq_memusage == UINT64_MAX)
  1003. return UINT64_MAX;
  1004. // Sum them with overflow checking.
  1005. uint64_t total_memusage = LZMA_MEMUSAGE_BASE
  1006. + sizeof(lzma_stream_coder)
  1007. + options->threads * sizeof(worker_thread);
  1008. if (UINT64_MAX - total_memusage < inbuf_memusage)
  1009. return UINT64_MAX;
  1010. total_memusage += inbuf_memusage;
  1011. if (UINT64_MAX - total_memusage < filters_memusage)
  1012. return UINT64_MAX;
  1013. total_memusage += filters_memusage;
  1014. if (UINT64_MAX - total_memusage < outq_memusage)
  1015. return UINT64_MAX;
  1016. return total_memusage + outq_memusage;
  1017. }