summaryrefslogtreecommitdiffstats
path: root/misc/thread_pool.c
diff options
context:
space:
mode:
Diffstat (limited to 'misc/thread_pool.c')
-rw-r--r--misc/thread_pool.c200
1 files changed, 149 insertions, 51 deletions
diff --git a/misc/thread_pool.c b/misc/thread_pool.c
index dddfad6734..217c990c19 100644
--- a/misc/thread_pool.c
+++ b/misc/thread_pool.c
@@ -1,40 +1,51 @@
-/*
- * This file is part of mpv.
+/* Copyright (C) 2018 the mpv developers
*
- * mpv is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
*
- * mpv is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with mpv. If not, see <http://www.gnu.org/licenses/>.
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <pthread.h>
#include "common/common.h"
+#include "osdep/threads.h"
+#include "osdep/timer.h"
#include "thread_pool.h"
+// Threads destroy themselves after this many seconds, if there's no new work
+// and the thread count is above the configured minimum.
+#define DESTROY_TIMEOUT 10
+
struct work {
void (*fn)(void *ctx);
void *fn_ctx;
};
struct mp_thread_pool {
- pthread_t *threads;
- int num_threads;
+ int min_threads, max_threads;
pthread_mutex_t lock;
pthread_cond_t wakeup;
// --- the following fields are protected by lock
+
+ pthread_t *threads;
+ int num_threads;
+
+ // Number of threads which have taken up work and are still processing it.
+ int busy_threads;
+
bool terminate;
+
struct work *work;
int num_work;
};
@@ -43,25 +54,61 @@ static void *worker_thread(void *arg)
{
struct mp_thread_pool *pool = arg;
+ mpthread_set_name("worker");
+
pthread_mutex_lock(&pool->lock);
- while (1) {
- while (!pool->num_work && !pool->terminate)
- pthread_cond_wait(&pool->wakeup, &pool->lock);
- if (!pool->num_work && pool->terminate)
- break;
+ struct timespec ts = {0};
+ bool got_timeout = false;
+ while (1) {
+ struct work work = {0};
+ if (pool->num_work > 0) {
+ work = pool->work[pool->num_work - 1];
+ pool->num_work -= 1;
+ }
- assert(pool->num_work > 0);
- struct work work = pool->work[pool->num_work - 1];
- pool->num_work -= 1;
+ if (!work.fn) {
+ if (got_timeout || pool->terminate)
+ break;
+
+ if (pool->num_threads > pool->min_threads) {
+ if (!ts.tv_sec && !ts.tv_nsec)
+ ts = mp_rel_time_to_timespec(DESTROY_TIMEOUT);
+ if (pthread_cond_timedwait(&pool->wakeup, &pool->lock, &ts))
+ got_timeout = pool->num_threads > pool->min_threads;
+ } else {
+ pthread_cond_wait(&pool->wakeup, &pool->lock);
+ }
+ continue;
+ }
+ pool->busy_threads += 1;
pthread_mutex_unlock(&pool->lock);
+
work.fn(work.fn_ctx);
+
pthread_mutex_lock(&pool->lock);
+ pool->busy_threads -= 1;
+
+ ts = (struct timespec){0};
+ got_timeout = false;
+ }
+
+ // If no termination signal was given, it must mean we died because of a
+ // timeout, and nobody is waiting for us. We have to remove ourselves.
+ if (!pool->terminate) {
+ for (int n = 0; n < pool->num_threads; n++) {
+ if (pthread_equal(pool->threads[n], pthread_self())) {
+ pthread_detach(pthread_self());
+ MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n);
+ pthread_mutex_unlock(&pool->lock);
+ return NULL;
+ }
+ }
+ assert(0);
}
- assert(pool->num_work == 0);
- pthread_mutex_unlock(&pool->lock);
+ pthread_mutex_unlock(&pool->lock);
return NULL;
}
@@ -69,27 +116,46 @@ static void thread_pool_dtor(void *ctx)
{
struct mp_thread_pool *pool = ctx;
+
pthread_mutex_lock(&pool->lock);
+
pool->terminate = true;
pthread_cond_broadcast(&pool->wakeup);
+
+ pthread_t *threads = pool->threads;
+ int num_threads = pool->num_threads;
+
+ pool->threads = NULL;
+ pool->num_threads = 0;
+
pthread_mutex_unlock(&pool->lock);
- for (int n = 0; n < pool->num_threads; n++)
- pthread_join(pool->threads[n], NULL);
+ for (int n = 0; n < num_threads; n++)
+ pthread_join(threads[n], NULL);
assert(pool->num_work == 0);
+ assert(pool->num_threads == 0);
pthread_cond_destroy(&pool->wakeup);
pthread_mutex_destroy(&pool->lock);
}
-// Create a thread pool with the given number of worker threads. This can return
-// NULL if the worker threads could not be created. The thread pool can be
-// destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent).
-// If there are still work items on freeing, it will block until all work items
-// are done, and the threads terminate.
-struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads)
+static bool add_thread(struct mp_thread_pool *pool)
{
- assert(threads > 0);
+ pthread_t thread;
+
+ if (pthread_create(&thread, NULL, worker_thread, pool) != 0)
+ return false;
+
+ MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
+ return true;
+}
+
+struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads,
+ int min_threads, int max_threads)
+{
+ assert(min_threads >= 0);
+ assert(init_threads <= min_threads);
+ assert(max_threads > 0 && max_threads >= min_threads);
struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool);
talloc_set_destructor(pool, thread_pool_dtor);
@@ -97,29 +163,61 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads)
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->wakeup, NULL);
- for (int n = 0; n < threads; n++) {
- pthread_t thread;
- if (pthread_create(&thread, NULL, worker_thread, pool)) {
- talloc_free(pool);
- return NULL;
- }
- MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
- }
+ pool->min_threads = min_threads;
+ pool->max_threads = max_threads;
+
+ pthread_mutex_lock(&pool->lock);
+ for (int n = 0; n < init_threads; n++)
+ add_thread(pool);
+ bool ok = pool->num_threads >= init_threads;
+ pthread_mutex_unlock(&pool->lock);
+
+ if (!ok)
+ TA_FREEP(&pool);
return pool;
}
-// Queue a function to be run on a worker thread: fn(fn_ctx)
-// If no worker thread is currently available, it's appended to a list in memory
-// with unbounded size. This function always returns immediately.
-// Concurrent queue calls are allowed, as long as it does not overlap with
-// pool destruction.
-void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
- void *fn_ctx)
+static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+ void *fn_ctx, bool allow_queue)
{
+ bool ok = true;
+
+ assert(fn);
+
pthread_mutex_lock(&pool->lock);
struct work work = {fn, fn_ctx};
- MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
- pthread_cond_signal(&pool->wakeup);
+
+ // If there are not enough threads to process all at once, but we can
+ // create a new thread, then do so. If work is queued quickly, it can
+ // happen that not all available threads have picked up work yet (up to
+ // num_threads - busy_threads threads), which has to be accounted for.
+ if (pool->busy_threads + pool->num_work + 1 > pool->num_threads &&
+ pool->num_threads < pool->max_threads)
+ {
+ if (!add_thread(pool)) {
+ // If we can queue it, it'll get done as long as there is 1 thread.
+ ok = allow_queue && pool->num_threads > 0;
+ }
+ }
+
+ if (ok) {
+ MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
+ pthread_cond_signal(&pool->wakeup);
+ }
+
pthread_mutex_unlock(&pool->lock);
+ return ok;
+}
+
+bool mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+ void *fn_ctx)
+{
+ return thread_pool_add(pool, fn, fn_ctx, true);
+}
+
+bool mp_thread_pool_run(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+ void *fn_ctx)
+{
+ return thread_pool_add(pool, fn, fn_ctx, false);
}