diff options
Diffstat (limited to 'misc/thread_pool.c')
-rw-r--r-- | misc/thread_pool.c | 177 |
1 files changed, 137 insertions, 40 deletions
diff --git a/misc/thread_pool.c b/misc/thread_pool.c index dddfad6734..25693a7f25 100644 --- a/misc/thread_pool.c +++ b/misc/thread_pool.c @@ -1,40 +1,50 @@ -/* - * This file is part of mpv. +/* Copyright (C) 2018 the mpv developers * - * mpv is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. * - * mpv is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with mpv. If not, see <http://www.gnu.org/licenses/>. + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ #include <pthread.h> #include "common/common.h" +#include "osdep/timer.h" #include "thread_pool.h" +// Threads destroy themselves after this many seconds, if there's no new work +// and the thread count is above the configured minimum. +#define DESTROY_TIMEOUT 10 + struct work { void (*fn)(void *ctx); void *fn_ctx; }; struct mp_thread_pool { - pthread_t *threads; - int num_threads; + int min_threads, max_threads; pthread_mutex_t lock; pthread_cond_t wakeup; // --- the following fields are protected by lock + + pthread_t *threads; + int num_threads; + + // Number of threads which have taken up work and are still processing it. + int busy_threads; + bool terminate; + struct work *work; int num_work; }; @@ -44,24 +54,58 @@ static void *worker_thread(void *arg) struct mp_thread_pool *pool = arg; pthread_mutex_lock(&pool->lock); - while (1) { - while (!pool->num_work && !pool->terminate) - pthread_cond_wait(&pool->wakeup, &pool->lock); - if (!pool->num_work && pool->terminate) - break; + struct timespec ts = {0}; + bool got_timeout = false; + while (1) { + struct work work = {0}; + if (pool->num_work > 0) { + work = pool->work[pool->num_work - 1]; + pool->num_work -= 1; + } - assert(pool->num_work > 0); - struct work work = pool->work[pool->num_work - 1]; - pool->num_work -= 1; + if (!work.fn) { + if (got_timeout || pool->terminate) + break; + + if (pool->num_threads > pool->min_threads) { + if (!ts.tv_sec && !ts.tv_nsec) + ts = mp_rel_time_to_timespec(DESTROY_TIMEOUT); + if (pthread_cond_timedwait(&pool->wakeup, &pool->lock, &ts)) + got_timeout = pool->num_threads > pool->min_threads; + } else { + pthread_cond_wait(&pool->wakeup, &pool->lock); + } + continue; + } + pool->busy_threads += 1; pthread_mutex_unlock(&pool->lock); + work.fn(work.fn_ctx); + pthread_mutex_lock(&pool->lock); + pool->busy_threads -= 1; + + ts = (struct timespec){0}; + got_timeout = false; + } + + // If no termination signal was given, it must mean we died because of a + // timeout, and nobody is waiting for us. We have to remove ourselves. + if (!pool->terminate) { + for (int n = 0; n < pool->num_threads; n++) { + if (pthread_equal(pool->threads[n], pthread_self())) { + pthread_detach(pthread_self()); + MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n); + pthread_mutex_unlock(&pool->lock); + return NULL; + } + } + assert(0); } - assert(pool->num_work == 0); - pthread_mutex_unlock(&pool->lock); + pthread_mutex_unlock(&pool->lock); return NULL; } @@ -69,27 +113,53 @@ static void thread_pool_dtor(void *ctx) { struct mp_thread_pool *pool = ctx; + pthread_mutex_lock(&pool->lock); + pool->terminate = true; pthread_cond_broadcast(&pool->wakeup); + + pthread_t *threads = pool->threads; + int num_threads = pool->num_threads; + + pool->threads = NULL; + pool->num_threads = 0; + pthread_mutex_unlock(&pool->lock); - for (int n = 0; n < pool->num_threads; n++) - pthread_join(pool->threads[n], NULL); + for (int n = 0; n < num_threads; n++) + pthread_join(threads[n], NULL); assert(pool->num_work == 0); + assert(pool->num_threads == 0); pthread_cond_destroy(&pool->wakeup); pthread_mutex_destroy(&pool->lock); } +static void add_thread(struct mp_thread_pool *pool) +{ + pthread_t thread; + + if (pthread_create(&thread, NULL, worker_thread, pool) == 0) + MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread); +} + // Create a thread pool with the given number of worker threads. This can return // NULL if the worker threads could not be created. The thread pool can be // destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent). // If there are still work items on freeing, it will block until all work items // are done, and the threads terminate. -struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads) +// init_threads is the number of threads created in this function (and it fails +// if it could not be done). min_threads must be >=, if it's >, then the +// remaining threads will be created on demand, but never destroyed. +// If init_threads > 0, then mp_thread_pool_queue() can never fail. +// If init_threads == 0, mp_thread_pool_create() itself can never fail. +struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads, + int min_threads, int max_threads) { - assert(threads > 0); + assert(min_threads >= 0); + assert(init_threads <= min_threads); + assert(max_threads > 0 && max_threads >= min_threads); struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool); talloc_set_destructor(pool, thread_pool_dtor); @@ -97,14 +167,17 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads) pthread_mutex_init(&pool->lock, NULL); pthread_cond_init(&pool->wakeup, NULL); - for (int n = 0; n < threads; n++) { - pthread_t thread; - if (pthread_create(&thread, NULL, worker_thread, pool)) { - talloc_free(pool); - return NULL; - } - MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread); - } + pool->min_threads = min_threads; + pool->max_threads = max_threads; + + pthread_mutex_lock(&pool->lock); + for (int n = 0; n < init_threads; n++) + add_thread(pool); + bool ok = pool->num_threads >= init_threads; + pthread_mutex_unlock(&pool->lock); + + if (!ok) + TA_FREEP(&pool); return pool; } @@ -114,12 +187,36 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads) // with unbounded size. This function always returns immediately. // Concurrent queue calls are allowed, as long as it does not overlap with // pool destruction. -void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx), +// This function is explicitly thread-safe. +// Cannot fail if thread pool was created with at least 1 thread. +bool mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx), void *fn_ctx) { + bool ok = true; + + assert(fn); + pthread_mutex_lock(&pool->lock); struct work work = {fn, fn_ctx}; - MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work); - pthread_cond_signal(&pool->wakeup); + + // If there are not enough threads to process all at once, but we can + // create a new thread, then do so. If work is queued quickly, it can + // happen that not all available threads have picked up work yet (up to + // num_threads - busy_threads threads), which has to be accounted for. + if (pool->busy_threads + pool->num_work + 1 > pool->num_threads && + pool->num_threads < pool->max_threads) + { + // We ignore failures, unless there are no threads available (below). + add_thread(pool); + } + + if (pool->num_threads) { + MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work); + pthread_cond_signal(&pool->wakeup); + } else { + ok = false; + } + pthread_mutex_unlock(&pool->lock); + return ok; } |