slicethread.c 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. /*
  2. * This file is part of FFmpeg.
  3. *
  4. * FFmpeg is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * FFmpeg is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with FFmpeg; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include <stdatomic.h>
  19. #include "slicethread.h"
  20. #include "mem.h"
  21. #include "thread.h"
  22. #include "avassert.h"
  23. #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
  24. typedef struct WorkerContext {
  25. AVSliceThread *ctx;
  26. pthread_mutex_t mutex;
  27. pthread_cond_t cond;
  28. pthread_t thread;
  29. int done;
  30. } WorkerContext;
  31. struct AVSliceThread {
  32. WorkerContext *workers;
  33. int nb_threads;
  34. int nb_active_threads;
  35. int nb_jobs;
  36. atomic_uint first_job;
  37. atomic_uint current_job;
  38. pthread_mutex_t done_mutex;
  39. pthread_cond_t done_cond;
  40. int done;
  41. int finished;
  42. void *priv;
  43. void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
  44. void (*main_func)(void *priv);
  45. };
  46. static int run_jobs(AVSliceThread *ctx)
  47. {
  48. unsigned nb_jobs = ctx->nb_jobs;
  49. unsigned nb_active_threads = ctx->nb_active_threads;
  50. unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
  51. unsigned current_job = first_job;
  52. do {
  53. ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
  54. } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
  55. return current_job == nb_jobs + nb_active_threads - 1;
  56. }
  57. static void *attribute_align_arg thread_worker(void *v)
  58. {
  59. WorkerContext *w = v;
  60. AVSliceThread *ctx = w->ctx;
  61. pthread_mutex_lock(&w->mutex);
  62. pthread_cond_signal(&w->cond);
  63. while (1) {
  64. w->done = 1;
  65. while (w->done)
  66. pthread_cond_wait(&w->cond, &w->mutex);
  67. if (ctx->finished) {
  68. pthread_mutex_unlock(&w->mutex);
  69. return NULL;
  70. }
  71. if (run_jobs(ctx)) {
  72. pthread_mutex_lock(&ctx->done_mutex);
  73. ctx->done = 1;
  74. pthread_cond_signal(&ctx->done_cond);
  75. pthread_mutex_unlock(&ctx->done_mutex);
  76. }
  77. }
  78. }
  79. int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
  80. void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
  81. void (*main_func)(void *priv),
  82. int nb_threads)
  83. {
  84. AVSliceThread *ctx;
  85. int nb_workers, i;
  86. av_assert0(nb_threads >= 0);
  87. if (!nb_threads) {
  88. int nb_cpus = av_cpu_count();
  89. if (nb_cpus > 1)
  90. nb_threads = nb_cpus + 1;
  91. else
  92. nb_threads = 1;
  93. }
  94. nb_workers = nb_threads;
  95. if (!main_func)
  96. nb_workers--;
  97. *pctx = ctx = av_mallocz(sizeof(*ctx));
  98. if (!ctx)
  99. return AVERROR(ENOMEM);
  100. if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
  101. av_freep(pctx);
  102. return AVERROR(ENOMEM);
  103. }
  104. ctx->priv = priv;
  105. ctx->worker_func = worker_func;
  106. ctx->main_func = main_func;
  107. ctx->nb_threads = nb_threads;
  108. ctx->nb_active_threads = 0;
  109. ctx->nb_jobs = 0;
  110. ctx->finished = 0;
  111. atomic_init(&ctx->first_job, 0);
  112. atomic_init(&ctx->current_job, 0);
  113. pthread_mutex_init(&ctx->done_mutex, NULL);
  114. pthread_cond_init(&ctx->done_cond, NULL);
  115. ctx->done = 0;
  116. for (i = 0; i < nb_workers; i++) {
  117. WorkerContext *w = &ctx->workers[i];
  118. int ret;
  119. w->ctx = ctx;
  120. pthread_mutex_init(&w->mutex, NULL);
  121. pthread_cond_init(&w->cond, NULL);
  122. pthread_mutex_lock(&w->mutex);
  123. w->done = 0;
  124. if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
  125. ctx->nb_threads = main_func ? i : i + 1;
  126. pthread_mutex_unlock(&w->mutex);
  127. pthread_cond_destroy(&w->cond);
  128. pthread_mutex_destroy(&w->mutex);
  129. avpriv_slicethread_free(pctx);
  130. return AVERROR(ret);
  131. }
  132. while (!w->done)
  133. pthread_cond_wait(&w->cond, &w->mutex);
  134. pthread_mutex_unlock(&w->mutex);
  135. }
  136. return nb_threads;
  137. }
  138. void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
  139. {
  140. int nb_workers, i, is_last = 0;
  141. av_assert0(nb_jobs > 0);
  142. ctx->nb_jobs = nb_jobs;
  143. ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
  144. atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
  145. atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
  146. nb_workers = ctx->nb_active_threads;
  147. if (!ctx->main_func || !execute_main)
  148. nb_workers--;
  149. for (i = 0; i < nb_workers; i++) {
  150. WorkerContext *w = &ctx->workers[i];
  151. pthread_mutex_lock(&w->mutex);
  152. w->done = 0;
  153. pthread_cond_signal(&w->cond);
  154. pthread_mutex_unlock(&w->mutex);
  155. }
  156. if (ctx->main_func && execute_main)
  157. ctx->main_func(ctx->priv);
  158. else
  159. is_last = run_jobs(ctx);
  160. if (!is_last) {
  161. pthread_mutex_lock(&ctx->done_mutex);
  162. while (!ctx->done)
  163. pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
  164. ctx->done = 0;
  165. pthread_mutex_unlock(&ctx->done_mutex);
  166. }
  167. }
  168. void avpriv_slicethread_free(AVSliceThread **pctx)
  169. {
  170. AVSliceThread *ctx;
  171. int nb_workers, i;
  172. if (!pctx || !*pctx)
  173. return;
  174. ctx = *pctx;
  175. nb_workers = ctx->nb_threads;
  176. if (!ctx->main_func)
  177. nb_workers--;
  178. ctx->finished = 1;
  179. for (i = 0; i < nb_workers; i++) {
  180. WorkerContext *w = &ctx->workers[i];
  181. pthread_mutex_lock(&w->mutex);
  182. w->done = 0;
  183. pthread_cond_signal(&w->cond);
  184. pthread_mutex_unlock(&w->mutex);
  185. }
  186. for (i = 0; i < nb_workers; i++) {
  187. WorkerContext *w = &ctx->workers[i];
  188. pthread_join(w->thread, NULL);
  189. pthread_cond_destroy(&w->cond);
  190. pthread_mutex_destroy(&w->mutex);
  191. }
  192. pthread_cond_destroy(&ctx->done_cond);
  193. pthread_mutex_destroy(&ctx->done_mutex);
  194. av_freep(&ctx->workers);
  195. av_freep(pctx);
  196. }
  197. #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
  198. int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
  199. void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
  200. void (*main_func)(void *priv),
  201. int nb_threads)
  202. {
  203. *pctx = NULL;
  204. return AVERROR(EINVAL);
  205. }
  206. void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
  207. {
  208. av_assert0(0);
  209. }
  210. void avpriv_slicethread_free(AVSliceThread **pctx)
  211. {
  212. av_assert0(!pctx || !*pctx);
  213. }
  214. #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */