123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- /*
- * This file is part of FFmpeg.
- *
- * FFmpeg is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * FFmpeg is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with FFmpeg; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- */
- #include <stdatomic.h>
- #include "slicethread.h"
- #include "mem.h"
- #include "thread.h"
- #include "avassert.h"
- #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
- typedef struct WorkerContext {
- AVSliceThread *ctx;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
- pthread_t thread;
- int done;
- } WorkerContext;
- struct AVSliceThread {
- WorkerContext *workers;
- int nb_threads;
- int nb_active_threads;
- int nb_jobs;
- atomic_uint first_job;
- atomic_uint current_job;
- pthread_mutex_t done_mutex;
- pthread_cond_t done_cond;
- int done;
- int finished;
- void *priv;
- void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
- void (*main_func)(void *priv);
- };
- static int run_jobs(AVSliceThread *ctx)
- {
- unsigned nb_jobs = ctx->nb_jobs;
- unsigned nb_active_threads = ctx->nb_active_threads;
- unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
- unsigned current_job = first_job;
- do {
- ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
- } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
- return current_job == nb_jobs + nb_active_threads - 1;
- }
- static void *attribute_align_arg thread_worker(void *v)
- {
- WorkerContext *w = v;
- AVSliceThread *ctx = w->ctx;
- pthread_mutex_lock(&w->mutex);
- pthread_cond_signal(&w->cond);
- while (1) {
- w->done = 1;
- while (w->done)
- pthread_cond_wait(&w->cond, &w->mutex);
- if (ctx->finished) {
- pthread_mutex_unlock(&w->mutex);
- return NULL;
- }
- if (run_jobs(ctx)) {
- pthread_mutex_lock(&ctx->done_mutex);
- ctx->done = 1;
- pthread_cond_signal(&ctx->done_cond);
- pthread_mutex_unlock(&ctx->done_mutex);
- }
- }
- }
- int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
- void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
- void (*main_func)(void *priv),
- int nb_threads)
- {
- AVSliceThread *ctx;
- int nb_workers, i;
- av_assert0(nb_threads >= 0);
- if (!nb_threads) {
- int nb_cpus = av_cpu_count();
- if (nb_cpus > 1)
- nb_threads = nb_cpus + 1;
- else
- nb_threads = 1;
- }
- nb_workers = nb_threads;
- if (!main_func)
- nb_workers--;
- *pctx = ctx = av_mallocz(sizeof(*ctx));
- if (!ctx)
- return AVERROR(ENOMEM);
- if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
- av_freep(pctx);
- return AVERROR(ENOMEM);
- }
- ctx->priv = priv;
- ctx->worker_func = worker_func;
- ctx->main_func = main_func;
- ctx->nb_threads = nb_threads;
- ctx->nb_active_threads = 0;
- ctx->nb_jobs = 0;
- ctx->finished = 0;
- atomic_init(&ctx->first_job, 0);
- atomic_init(&ctx->current_job, 0);
- pthread_mutex_init(&ctx->done_mutex, NULL);
- pthread_cond_init(&ctx->done_cond, NULL);
- ctx->done = 0;
- for (i = 0; i < nb_workers; i++) {
- WorkerContext *w = &ctx->workers[i];
- int ret;
- w->ctx = ctx;
- pthread_mutex_init(&w->mutex, NULL);
- pthread_cond_init(&w->cond, NULL);
- pthread_mutex_lock(&w->mutex);
- w->done = 0;
- if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
- ctx->nb_threads = main_func ? i : i + 1;
- pthread_mutex_unlock(&w->mutex);
- pthread_cond_destroy(&w->cond);
- pthread_mutex_destroy(&w->mutex);
- avpriv_slicethread_free(pctx);
- return AVERROR(ret);
- }
- while (!w->done)
- pthread_cond_wait(&w->cond, &w->mutex);
- pthread_mutex_unlock(&w->mutex);
- }
- return nb_threads;
- }
- void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
- {
- int nb_workers, i, is_last = 0;
- av_assert0(nb_jobs > 0);
- ctx->nb_jobs = nb_jobs;
- ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
- atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
- atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
- nb_workers = ctx->nb_active_threads;
- if (!ctx->main_func || !execute_main)
- nb_workers--;
- for (i = 0; i < nb_workers; i++) {
- WorkerContext *w = &ctx->workers[i];
- pthread_mutex_lock(&w->mutex);
- w->done = 0;
- pthread_cond_signal(&w->cond);
- pthread_mutex_unlock(&w->mutex);
- }
- if (ctx->main_func && execute_main)
- ctx->main_func(ctx->priv);
- else
- is_last = run_jobs(ctx);
- if (!is_last) {
- pthread_mutex_lock(&ctx->done_mutex);
- while (!ctx->done)
- pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
- ctx->done = 0;
- pthread_mutex_unlock(&ctx->done_mutex);
- }
- }
- void avpriv_slicethread_free(AVSliceThread **pctx)
- {
- AVSliceThread *ctx;
- int nb_workers, i;
- if (!pctx || !*pctx)
- return;
- ctx = *pctx;
- nb_workers = ctx->nb_threads;
- if (!ctx->main_func)
- nb_workers--;
- ctx->finished = 1;
- for (i = 0; i < nb_workers; i++) {
- WorkerContext *w = &ctx->workers[i];
- pthread_mutex_lock(&w->mutex);
- w->done = 0;
- pthread_cond_signal(&w->cond);
- pthread_mutex_unlock(&w->mutex);
- }
- for (i = 0; i < nb_workers; i++) {
- WorkerContext *w = &ctx->workers[i];
- pthread_join(w->thread, NULL);
- pthread_cond_destroy(&w->cond);
- pthread_mutex_destroy(&w->mutex);
- }
- pthread_cond_destroy(&ctx->done_cond);
- pthread_mutex_destroy(&ctx->done_mutex);
- av_freep(&ctx->workers);
- av_freep(pctx);
- }
- #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
- int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
- void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
- void (*main_func)(void *priv),
- int nb_threads)
- {
- *pctx = NULL;
- return AVERROR(EINVAL);
- }
- void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
- {
- av_assert0(0);
- }
- void avpriv_slicethread_free(AVSliceThread **pctx)
- {
- av_assert0(!pctx || !*pctx);
- }
- #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
|