thread.c 24 KB


  1. /*
  2. * The copyright in this software is being made available under the 2-clauses
  3. * BSD License, included below. This software may be subject to other third
  4. * party and contributor rights, including patent rights, and no such rights
  5. * are granted under this license.
  6. *
  7. * Copyright (c) 2016, Even Rouault
  8. * All rights reserved.
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions
  12. * are met:
  13. * 1. Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. * 2. Redistributions in binary form must reproduce the above copyright
  16. * notice, this list of conditions and the following disclaimer in the
  17. * documentation and/or other materials provided with the distribution.
  18. *
  19. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
  20. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  21. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  22. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  23. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  24. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  25. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  26. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  27. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  28. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  29. * POSSIBILITY OF SUCH DAMAGE.
  30. */
  31. #include <assert.h>
  32. #ifdef MUTEX_win32
  33. /* Some versions of x86_64-w64-mingw32-gc -m32 resolve InterlockedCompareExchange() */
  34. /* as __sync_val_compare_and_swap_4 but fails to link it. As this protects against */
  35. /* a rather unlikely race, skip it */
  36. #if !(defined(__MINGW32__) && defined(__i386__))
  37. #define HAVE_INTERLOCKED_COMPARE_EXCHANGE 1
  38. #endif
  39. #include <windows.h>
  40. #include <process.h>
  41. #include "opj_includes.h"
  42. OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
  43. {
  44. return OPJ_TRUE;
  45. }
  46. int OPJ_CALLCONV opj_get_num_cpus(void)
  47. {
  48. SYSTEM_INFO info;
  49. DWORD dwNum;
  50. GetSystemInfo(&info);
  51. dwNum = info.dwNumberOfProcessors;
  52. if (dwNum < 1) {
  53. return 1;
  54. }
  55. return (int)dwNum;
  56. }
  57. struct opj_mutex_t {
  58. CRITICAL_SECTION cs;
  59. };
  60. opj_mutex_t* opj_mutex_create(void)
  61. {
  62. opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t));
  63. if (!mutex) {
  64. return NULL;
  65. }
  66. InitializeCriticalSectionAndSpinCount(&(mutex->cs), 4000);
  67. return mutex;
  68. }
  69. void opj_mutex_lock(opj_mutex_t* mutex)
  70. {
  71. EnterCriticalSection(&(mutex->cs));
  72. }
  73. void opj_mutex_unlock(opj_mutex_t* mutex)
  74. {
  75. LeaveCriticalSection(&(mutex->cs));
  76. }
  77. void opj_mutex_destroy(opj_mutex_t* mutex)
  78. {
  79. if (!mutex) {
  80. return;
  81. }
  82. DeleteCriticalSection(&(mutex->cs));
  83. opj_free(mutex);
  84. }
  85. struct opj_cond_waiter_list_t {
  86. HANDLE hEvent;
  87. struct opj_cond_waiter_list_t* next;
  88. };
  89. typedef struct opj_cond_waiter_list_t opj_cond_waiter_list_t;
  90. struct opj_cond_t {
  91. opj_mutex_t *internal_mutex;
  92. opj_cond_waiter_list_t *waiter_list;
  93. };
  94. static DWORD TLSKey = 0;
  95. static volatile LONG inTLSLockedSection = 0;
  96. static volatile int TLSKeyInit = OPJ_FALSE;
  97. opj_cond_t* opj_cond_create(void)
  98. {
  99. opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
  100. if (!cond) {
  101. return NULL;
  102. }
  103. /* Make sure that the TLS key is allocated in a thread-safe way */
  104. /* We cannot use a global mutex/critical section since its creation itself would not be */
  105. /* thread-safe, so use InterlockedCompareExchange trick */
  106. while (OPJ_TRUE) {
  107. #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
  108. if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0)
  109. #endif
  110. {
  111. if (!TLSKeyInit) {
  112. TLSKey = TlsAlloc();
  113. TLSKeyInit = OPJ_TRUE;
  114. }
  115. #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
  116. InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
  117. #endif
  118. break;
  119. }
  120. }
  121. if (TLSKey == TLS_OUT_OF_INDEXES) {
  122. opj_free(cond);
  123. return NULL;
  124. }
  125. cond->internal_mutex = opj_mutex_create();
  126. if (cond->internal_mutex == NULL) {
  127. opj_free(cond);
  128. return NULL;
  129. }
  130. cond->waiter_list = NULL;
  131. return cond;
  132. }
  133. void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
  134. {
  135. opj_cond_waiter_list_t* item;
  136. HANDLE hEvent = (HANDLE) TlsGetValue(TLSKey);
  137. if (hEvent == NULL) {
  138. hEvent = CreateEvent(NULL, /* security attributes */
  139. 0, /* manual reset = no */
  140. 0, /* initial state = unsignaled */
  141. NULL /* no name */);
  142. assert(hEvent);
  143. TlsSetValue(TLSKey, hEvent);
  144. }
  145. /* Insert the waiter into the waiter list of the condition */
  146. opj_mutex_lock(cond->internal_mutex);
  147. item = (opj_cond_waiter_list_t*)opj_malloc(sizeof(opj_cond_waiter_list_t));
  148. assert(item != NULL);
  149. item->hEvent = hEvent;
  150. item->next = cond->waiter_list;
  151. cond->waiter_list = item;
  152. opj_mutex_unlock(cond->internal_mutex);
  153. /* Release the client mutex before waiting for the event being signaled */
  154. opj_mutex_unlock(mutex);
  155. /* Ideally we would check that we do not get WAIT_FAILED but it is hard */
  156. /* to report a failure. */
  157. WaitForSingleObject(hEvent, INFINITE);
  158. /* Reacquire the client mutex */
  159. opj_mutex_lock(mutex);
  160. }
  161. void opj_cond_signal(opj_cond_t* cond)
  162. {
  163. opj_cond_waiter_list_t* psIter;
  164. /* Signal the first registered event, and remove it from the list */
  165. opj_mutex_lock(cond->internal_mutex);
  166. psIter = cond->waiter_list;
  167. if (psIter != NULL) {
  168. SetEvent(psIter->hEvent);
  169. cond->waiter_list = psIter->next;
  170. opj_free(psIter);
  171. }
  172. opj_mutex_unlock(cond->internal_mutex);
  173. }
  174. void opj_cond_destroy(opj_cond_t* cond)
  175. {
  176. if (!cond) {
  177. return;
  178. }
  179. opj_mutex_destroy(cond->internal_mutex);
  180. assert(cond->waiter_list == NULL);
  181. opj_free(cond);
  182. }
  183. struct opj_thread_t {
  184. opj_thread_fn thread_fn;
  185. void* user_data;
  186. HANDLE hThread;
  187. };
  188. static unsigned int __stdcall opj_thread_callback_adapter(void *info)
  189. {
  190. opj_thread_t* thread = (opj_thread_t*) info;
  191. HANDLE hEvent = NULL;
  192. thread->thread_fn(thread->user_data);
  193. /* Free the handle possible allocated by a cond */
  194. while (OPJ_TRUE) {
  195. /* Make sure TLSKey is not being created just at that moment... */
  196. #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
  197. if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0)
  198. #endif
  199. {
  200. if (TLSKeyInit) {
  201. hEvent = (HANDLE) TlsGetValue(TLSKey);
  202. }
  203. #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
  204. InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
  205. #endif
  206. break;
  207. }
  208. }
  209. if (hEvent) {
  210. CloseHandle(hEvent);
  211. }
  212. return 0;
  213. }
  214. opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
  215. {
  216. opj_thread_t* thread;
  217. assert(thread_fn);
  218. thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t));
  219. if (!thread) {
  220. return NULL;
  221. }
  222. thread->thread_fn = thread_fn;
  223. thread->user_data = user_data;
  224. thread->hThread = (HANDLE)_beginthreadex(NULL, 0,
  225. opj_thread_callback_adapter, thread, 0, NULL);
  226. if (thread->hThread == NULL) {
  227. opj_free(thread);
  228. return NULL;
  229. }
  230. return thread;
  231. }
  232. void opj_thread_join(opj_thread_t* thread)
  233. {
  234. WaitForSingleObject(thread->hThread, INFINITE);
  235. CloseHandle(thread->hThread);
  236. opj_free(thread);
  237. }
  238. #elif MUTEX_pthread
  239. #include <pthread.h>
  240. #include <stdlib.h>
  241. #include <unistd.h>
  242. /* Moved after all system includes, and in particular pthread.h, so as to */
  243. /* avoid poisoning issuing with malloc() use in pthread.h with ulibc (#1013) */
  244. #include "opj_includes.h"
  245. OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
  246. {
  247. return OPJ_TRUE;
  248. }
  249. int OPJ_CALLCONV opj_get_num_cpus(void)
  250. {
  251. #ifdef _SC_NPROCESSORS_ONLN
  252. return (int)sysconf(_SC_NPROCESSORS_ONLN);
  253. #else
  254. return 1;
  255. #endif
  256. }
  257. struct opj_mutex_t {
  258. pthread_mutex_t mutex;
  259. };
  260. opj_mutex_t* opj_mutex_create(void)
  261. {
  262. opj_mutex_t* mutex = (opj_mutex_t*) opj_calloc(1U, sizeof(opj_mutex_t));
  263. if (mutex != NULL) {
  264. if (pthread_mutex_init(&mutex->mutex, NULL) != 0) {
  265. opj_free(mutex);
  266. mutex = NULL;
  267. }
  268. }
  269. return mutex;
  270. }
  271. void opj_mutex_lock(opj_mutex_t* mutex)
  272. {
  273. pthread_mutex_lock(&(mutex->mutex));
  274. }
  275. void opj_mutex_unlock(opj_mutex_t* mutex)
  276. {
  277. pthread_mutex_unlock(&(mutex->mutex));
  278. }
  279. void opj_mutex_destroy(opj_mutex_t* mutex)
  280. {
  281. if (!mutex) {
  282. return;
  283. }
  284. pthread_mutex_destroy(&(mutex->mutex));
  285. opj_free(mutex);
  286. }
  287. struct opj_cond_t {
  288. pthread_cond_t cond;
  289. };
  290. opj_cond_t* opj_cond_create(void)
  291. {
  292. opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
  293. if (!cond) {
  294. return NULL;
  295. }
  296. if (pthread_cond_init(&(cond->cond), NULL) != 0) {
  297. opj_free(cond);
  298. return NULL;
  299. }
  300. return cond;
  301. }
  302. void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
  303. {
  304. pthread_cond_wait(&(cond->cond), &(mutex->mutex));
  305. }
  306. void opj_cond_signal(opj_cond_t* cond)
  307. {
  308. int ret = pthread_cond_signal(&(cond->cond));
  309. (void)ret;
  310. assert(ret == 0);
  311. }
  312. void opj_cond_destroy(opj_cond_t* cond)
  313. {
  314. if (!cond) {
  315. return;
  316. }
  317. pthread_cond_destroy(&(cond->cond));
  318. opj_free(cond);
  319. }
  320. struct opj_thread_t {
  321. opj_thread_fn thread_fn;
  322. void* user_data;
  323. pthread_t thread;
  324. };
  325. static void* opj_thread_callback_adapter(void* info)
  326. {
  327. opj_thread_t* thread = (opj_thread_t*) info;
  328. thread->thread_fn(thread->user_data);
  329. return NULL;
  330. }
  331. opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
  332. {
  333. pthread_attr_t attr;
  334. opj_thread_t* thread;
  335. assert(thread_fn);
  336. thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t));
  337. if (!thread) {
  338. return NULL;
  339. }
  340. thread->thread_fn = thread_fn;
  341. thread->user_data = user_data;
  342. pthread_attr_init(&attr);
  343. pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  344. if (pthread_create(&(thread->thread), &attr,
  345. opj_thread_callback_adapter, (void *) thread) != 0) {
  346. opj_free(thread);
  347. return NULL;
  348. }
  349. return thread;
  350. }
  351. void opj_thread_join(opj_thread_t* thread)
  352. {
  353. void* status;
  354. pthread_join(thread->thread, &status);
  355. opj_free(thread);
  356. }
  357. #else
  358. /* Stub implementation */
  359. #include "opj_includes.h"
  360. OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
  361. {
  362. return OPJ_FALSE;
  363. }
  364. int OPJ_CALLCONV opj_get_num_cpus(void)
  365. {
  366. return 1;
  367. }
  368. opj_mutex_t* opj_mutex_create(void)
  369. {
  370. return NULL;
  371. }
  372. void opj_mutex_lock(opj_mutex_t* mutex)
  373. {
  374. (void) mutex;
  375. }
  376. void opj_mutex_unlock(opj_mutex_t* mutex)
  377. {
  378. (void) mutex;
  379. }
  380. void opj_mutex_destroy(opj_mutex_t* mutex)
  381. {
  382. (void) mutex;
  383. }
  384. opj_cond_t* opj_cond_create(void)
  385. {
  386. return NULL;
  387. }
  388. void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
  389. {
  390. (void) cond;
  391. (void) mutex;
  392. }
  393. void opj_cond_signal(opj_cond_t* cond)
  394. {
  395. (void) cond;
  396. }
  397. void opj_cond_destroy(opj_cond_t* cond)
  398. {
  399. (void) cond;
  400. }
  401. opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
  402. {
  403. (void) thread_fn;
  404. (void) user_data;
  405. return NULL;
  406. }
  407. void opj_thread_join(opj_thread_t* thread)
  408. {
  409. (void) thread;
  410. }
  411. #endif
  412. typedef struct {
  413. int key;
  414. void* value;
  415. opj_tls_free_func opj_free_func;
  416. } opj_tls_key_val_t;
  417. struct opj_tls_t {
  418. opj_tls_key_val_t* key_val;
  419. int key_val_count;
  420. };
  421. static opj_tls_t* opj_tls_new(void)
  422. {
  423. return (opj_tls_t*) opj_calloc(1, sizeof(opj_tls_t));
  424. }
  425. static void opj_tls_destroy(opj_tls_t* tls)
  426. {
  427. int i;
  428. if (!tls) {
  429. return;
  430. }
  431. for (i = 0; i < tls->key_val_count; i++) {
  432. if (tls->key_val[i].opj_free_func) {
  433. tls->key_val[i].opj_free_func(tls->key_val[i].value);
  434. }
  435. }
  436. opj_free(tls->key_val);
  437. opj_free(tls);
  438. }
  439. void* opj_tls_get(opj_tls_t* tls, int key)
  440. {
  441. int i;
  442. for (i = 0; i < tls->key_val_count; i++) {
  443. if (tls->key_val[i].key == key) {
  444. return tls->key_val[i].value;
  445. }
  446. }
  447. return NULL;
  448. }
  449. OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value,
  450. opj_tls_free_func opj_free_func)
  451. {
  452. opj_tls_key_val_t* new_key_val;
  453. int i;
  454. if (tls->key_val_count == INT_MAX) {
  455. return OPJ_FALSE;
  456. }
  457. for (i = 0; i < tls->key_val_count; i++) {
  458. if (tls->key_val[i].key == key) {
  459. if (tls->key_val[i].opj_free_func) {
  460. tls->key_val[i].opj_free_func(tls->key_val[i].value);
  461. }
  462. tls->key_val[i].value = value;
  463. tls->key_val[i].opj_free_func = opj_free_func;
  464. return OPJ_TRUE;
  465. }
  466. }
  467. new_key_val = (opj_tls_key_val_t*) opj_realloc(tls->key_val,
  468. ((size_t)tls->key_val_count + 1U) * sizeof(opj_tls_key_val_t));
  469. if (!new_key_val) {
  470. return OPJ_FALSE;
  471. }
  472. tls->key_val = new_key_val;
  473. new_key_val[tls->key_val_count].key = key;
  474. new_key_val[tls->key_val_count].value = value;
  475. new_key_val[tls->key_val_count].opj_free_func = opj_free_func;
  476. tls->key_val_count ++;
  477. return OPJ_TRUE;
  478. }
  479. typedef struct {
  480. opj_job_fn job_fn;
  481. void *user_data;
  482. } opj_worker_thread_job_t;
  483. typedef struct {
  484. opj_thread_pool_t *tp;
  485. opj_thread_t *thread;
  486. int marked_as_waiting;
  487. opj_mutex_t *mutex;
  488. opj_cond_t *cond;
  489. } opj_worker_thread_t;
  490. typedef enum {
  491. OPJWTS_OK,
  492. OPJWTS_STOP,
  493. OPJWTS_ERROR
  494. } opj_worker_thread_state;
  495. struct opj_job_list_t {
  496. opj_worker_thread_job_t* job;
  497. struct opj_job_list_t* next;
  498. };
  499. typedef struct opj_job_list_t opj_job_list_t;
  500. struct opj_worker_thread_list_t {
  501. opj_worker_thread_t* worker_thread;
  502. struct opj_worker_thread_list_t* next;
  503. };
  504. typedef struct opj_worker_thread_list_t opj_worker_thread_list_t;
  505. struct opj_thread_pool_t {
  506. opj_worker_thread_t* worker_threads;
  507. int worker_threads_count;
  508. opj_cond_t* cond;
  509. opj_mutex_t* mutex;
  510. volatile opj_worker_thread_state state;
  511. opj_job_list_t* job_queue;
  512. volatile int pending_jobs_count;
  513. opj_worker_thread_list_t* waiting_worker_thread_list;
  514. int waiting_worker_thread_count;
  515. opj_tls_t* tls;
  516. int signaling_threshold;
  517. };
  518. static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads);
  519. static opj_worker_thread_job_t* opj_thread_pool_get_next_job(
  520. opj_thread_pool_t* tp,
  521. opj_worker_thread_t* worker_thread,
  522. OPJ_BOOL signal_job_finished);
  523. opj_thread_pool_t* opj_thread_pool_create(int num_threads)
  524. {
  525. opj_thread_pool_t* tp;
  526. tp = (opj_thread_pool_t*) opj_calloc(1, sizeof(opj_thread_pool_t));
  527. if (!tp) {
  528. return NULL;
  529. }
  530. tp->state = OPJWTS_OK;
  531. if (num_threads <= 0) {
  532. tp->tls = opj_tls_new();
  533. if (!tp->tls) {
  534. opj_free(tp);
  535. tp = NULL;
  536. }
  537. return tp;
  538. }
  539. tp->mutex = opj_mutex_create();
  540. if (!tp->mutex) {
  541. opj_free(tp);
  542. return NULL;
  543. }
  544. if (!opj_thread_pool_setup(tp, num_threads)) {
  545. opj_thread_pool_destroy(tp);
  546. return NULL;
  547. }
  548. return tp;
  549. }
  550. static void opj_worker_thread_function(void* user_data)
  551. {
  552. opj_worker_thread_t* worker_thread;
  553. opj_thread_pool_t* tp;
  554. opj_tls_t* tls;
  555. OPJ_BOOL job_finished = OPJ_FALSE;
  556. worker_thread = (opj_worker_thread_t*) user_data;
  557. tp = worker_thread->tp;
  558. tls = opj_tls_new();
  559. while (OPJ_TRUE) {
  560. opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread,
  561. job_finished);
  562. if (job == NULL) {
  563. break;
  564. }
  565. if (job->job_fn) {
  566. job->job_fn(job->user_data, tls);
  567. }
  568. opj_free(job);
  569. job_finished = OPJ_TRUE;
  570. }
  571. opj_tls_destroy(tls);
  572. }
  573. static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads)
  574. {
  575. int i;
  576. OPJ_BOOL bRet = OPJ_TRUE;
  577. assert(num_threads > 0);
  578. tp->cond = opj_cond_create();
  579. if (tp->cond == NULL) {
  580. return OPJ_FALSE;
  581. }
  582. tp->worker_threads = (opj_worker_thread_t*) opj_calloc((size_t)num_threads,
  583. sizeof(opj_worker_thread_t));
  584. if (tp->worker_threads == NULL) {
  585. return OPJ_FALSE;
  586. }
  587. tp->worker_threads_count = num_threads;
  588. for (i = 0; i < num_threads; i++) {
  589. tp->worker_threads[i].tp = tp;
  590. tp->worker_threads[i].mutex = opj_mutex_create();
  591. if (tp->worker_threads[i].mutex == NULL) {
  592. tp->worker_threads_count = i;
  593. bRet = OPJ_FALSE;
  594. break;
  595. }
  596. tp->worker_threads[i].cond = opj_cond_create();
  597. if (tp->worker_threads[i].cond == NULL) {
  598. opj_mutex_destroy(tp->worker_threads[i].mutex);
  599. tp->worker_threads_count = i;
  600. bRet = OPJ_FALSE;
  601. break;
  602. }
  603. tp->worker_threads[i].marked_as_waiting = OPJ_FALSE;
  604. tp->worker_threads[i].thread = opj_thread_create(opj_worker_thread_function,
  605. &(tp->worker_threads[i]));
  606. if (tp->worker_threads[i].thread == NULL) {
  607. opj_mutex_destroy(tp->worker_threads[i].mutex);
  608. opj_cond_destroy(tp->worker_threads[i].cond);
  609. tp->worker_threads_count = i;
  610. bRet = OPJ_FALSE;
  611. break;
  612. }
  613. }
  614. /* Wait all threads to be started */
  615. /* printf("waiting for all threads to be started\n"); */
  616. opj_mutex_lock(tp->mutex);
  617. while (tp->waiting_worker_thread_count < tp->worker_threads_count) {
  618. opj_cond_wait(tp->cond, tp->mutex);
  619. }
  620. opj_mutex_unlock(tp->mutex);
  621. /* printf("all threads started\n"); */
  622. if (tp->state == OPJWTS_ERROR) {
  623. bRet = OPJ_FALSE;
  624. }
  625. return bRet;
  626. }
  627. /*
  628. void opj_waiting()
  629. {
  630. printf("waiting!\n");
  631. }
  632. */
  633. static opj_worker_thread_job_t* opj_thread_pool_get_next_job(
  634. opj_thread_pool_t* tp,
  635. opj_worker_thread_t* worker_thread,
  636. OPJ_BOOL signal_job_finished)
  637. {
  638. while (OPJ_TRUE) {
  639. opj_job_list_t* top_job_iter;
  640. opj_mutex_lock(tp->mutex);
  641. if (signal_job_finished) {
  642. signal_job_finished = OPJ_FALSE;
  643. tp->pending_jobs_count --;
  644. /*printf("tp=%p, remaining jobs: %d\n", tp, tp->pending_jobs_count);*/
  645. if (tp->pending_jobs_count <= tp->signaling_threshold) {
  646. opj_cond_signal(tp->cond);
  647. }
  648. }
  649. if (tp->state == OPJWTS_STOP) {
  650. opj_mutex_unlock(tp->mutex);
  651. return NULL;
  652. }
  653. top_job_iter = tp->job_queue;
  654. if (top_job_iter) {
  655. opj_worker_thread_job_t* job;
  656. tp->job_queue = top_job_iter->next;
  657. job = top_job_iter->job;
  658. opj_mutex_unlock(tp->mutex);
  659. opj_free(top_job_iter);
  660. return job;
  661. }
  662. /* opj_waiting(); */
  663. if (!worker_thread->marked_as_waiting) {
  664. opj_worker_thread_list_t* item;
  665. worker_thread->marked_as_waiting = OPJ_TRUE;
  666. tp->waiting_worker_thread_count ++;
  667. assert(tp->waiting_worker_thread_count <= tp->worker_threads_count);
  668. item = (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t));
  669. if (item == NULL) {
  670. tp->state = OPJWTS_ERROR;
  671. opj_cond_signal(tp->cond);
  672. opj_mutex_unlock(tp->mutex);
  673. return NULL;
  674. }
  675. item->worker_thread = worker_thread;
  676. item->next = tp->waiting_worker_thread_list;
  677. tp->waiting_worker_thread_list = item;
  678. }
  679. /* printf("signaling that worker thread is ready\n"); */
  680. opj_cond_signal(tp->cond);
  681. opj_mutex_lock(worker_thread->mutex);
  682. opj_mutex_unlock(tp->mutex);
  683. /* printf("waiting for job\n"); */
  684. opj_cond_wait(worker_thread->cond, worker_thread->mutex);
  685. opj_mutex_unlock(worker_thread->mutex);
  686. /* printf("got job\n"); */
  687. }
  688. }
  689. OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp,
  690. opj_job_fn job_fn,
  691. void* user_data)
  692. {
  693. opj_worker_thread_job_t* job;
  694. opj_job_list_t* item;
  695. if (tp->mutex == NULL) {
  696. job_fn(user_data, tp->tls);
  697. return OPJ_TRUE;
  698. }
  699. job = (opj_worker_thread_job_t*)opj_malloc(sizeof(opj_worker_thread_job_t));
  700. if (job == NULL) {
  701. return OPJ_FALSE;
  702. }
  703. job->job_fn = job_fn;
  704. job->user_data = user_data;
  705. item = (opj_job_list_t*) opj_malloc(sizeof(opj_job_list_t));
  706. if (item == NULL) {
  707. opj_free(job);
  708. return OPJ_FALSE;
  709. }
  710. item->job = job;
  711. opj_mutex_lock(tp->mutex);
  712. tp->signaling_threshold = 100 * tp->worker_threads_count;
  713. while (tp->pending_jobs_count > tp->signaling_threshold) {
  714. /* printf("%d jobs enqueued. Waiting\n", tp->pending_jobs_count); */
  715. opj_cond_wait(tp->cond, tp->mutex);
  716. /* printf("...%d jobs enqueued.\n", tp->pending_jobs_count); */
  717. }
  718. item->next = tp->job_queue;
  719. tp->job_queue = item;
  720. tp->pending_jobs_count ++;
  721. if (tp->waiting_worker_thread_list) {
  722. opj_worker_thread_t* worker_thread;
  723. opj_worker_thread_list_t* next;
  724. opj_worker_thread_list_t* to_opj_free;
  725. worker_thread = tp->waiting_worker_thread_list->worker_thread;
  726. assert(worker_thread->marked_as_waiting);
  727. worker_thread->marked_as_waiting = OPJ_FALSE;
  728. next = tp->waiting_worker_thread_list->next;
  729. to_opj_free = tp->waiting_worker_thread_list;
  730. tp->waiting_worker_thread_list = next;
  731. tp->waiting_worker_thread_count --;
  732. opj_mutex_lock(worker_thread->mutex);
  733. opj_mutex_unlock(tp->mutex);
  734. opj_cond_signal(worker_thread->cond);
  735. opj_mutex_unlock(worker_thread->mutex);
  736. opj_free(to_opj_free);
  737. } else {
  738. opj_mutex_unlock(tp->mutex);
  739. }
  740. return OPJ_TRUE;
  741. }
  742. void opj_thread_pool_wait_completion(opj_thread_pool_t* tp,
  743. int max_remaining_jobs)
  744. {
  745. if (tp->mutex == NULL) {
  746. return;
  747. }
  748. if (max_remaining_jobs < 0) {
  749. max_remaining_jobs = 0;
  750. }
  751. opj_mutex_lock(tp->mutex);
  752. tp->signaling_threshold = max_remaining_jobs;
  753. while (tp->pending_jobs_count > max_remaining_jobs) {
  754. /*printf("tp=%p, jobs before wait = %d, max_remaining_jobs = %d\n", tp, tp->pending_jobs_count, max_remaining_jobs);*/
  755. opj_cond_wait(tp->cond, tp->mutex);
  756. /*printf("tp=%p, jobs after wait = %d\n", tp, tp->pending_jobs_count);*/
  757. }
  758. opj_mutex_unlock(tp->mutex);
  759. }
  760. int opj_thread_pool_get_thread_count(opj_thread_pool_t* tp)
  761. {
  762. return tp->worker_threads_count;
  763. }
  764. void opj_thread_pool_destroy(opj_thread_pool_t* tp)
  765. {
  766. if (!tp) {
  767. return;
  768. }
  769. if (tp->cond) {
  770. int i;
  771. opj_thread_pool_wait_completion(tp, 0);
  772. opj_mutex_lock(tp->mutex);
  773. tp->state = OPJWTS_STOP;
  774. opj_mutex_unlock(tp->mutex);
  775. for (i = 0; i < tp->worker_threads_count; i++) {
  776. opj_mutex_lock(tp->worker_threads[i].mutex);
  777. opj_cond_signal(tp->worker_threads[i].cond);
  778. opj_mutex_unlock(tp->worker_threads[i].mutex);
  779. opj_thread_join(tp->worker_threads[i].thread);
  780. opj_cond_destroy(tp->worker_threads[i].cond);
  781. opj_mutex_destroy(tp->worker_threads[i].mutex);
  782. }
  783. opj_free(tp->worker_threads);
  784. while (tp->waiting_worker_thread_list != NULL) {
  785. opj_worker_thread_list_t* next = tp->waiting_worker_thread_list->next;
  786. opj_free(tp->waiting_worker_thread_list);
  787. tp->waiting_worker_thread_list = next;
  788. }
  789. opj_cond_destroy(tp->cond);
  790. }
  791. opj_mutex_destroy(tp->mutex);
  792. opj_tls_destroy(tp->tls);
  793. opj_free(tp);
  794. }