summaryrefslogtreecommitdiffstats
path: root/misc
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2018-05-06 16:17:47 +0200
committerwm4 <wm4@nowhere>2018-05-24 19:56:34 +0200
commita1ed1f8be09c927994b58399e77e99336ec7f436 (patch)
tree8989650d9c899dcd140223e10f15a756ad9f0c7f /misc
parenta0308d31697c5dd601ee036acbc937c361746cc3 (diff)
downloadmpv-a1ed1f8be09c927994b58399e77e99336ec7f436.tar.bz2
mpv-a1ed1f8be09c927994b58399e77e99336ec7f436.tar.xz
thread_pool: make it slightly less dumb
The existing thread pool code is the most primitive thread pool possible. That's fine, but one annoying thing was that it used a static number of threads. Make it dynamic, so we don't need to "waste" idle threads. This tries to add threads as needed. If threads are idle for some time, destroy them again until a minimum number of threads is reached. Also change the license to ISC.
Diffstat (limited to 'misc')
-rw-r--r--misc/thread_pool.c177
-rw-r--r--misc/thread_pool.h6
2 files changed, 141 insertions, 42 deletions
diff --git a/misc/thread_pool.c b/misc/thread_pool.c
index dddfad6734..25693a7f25 100644
--- a/misc/thread_pool.c
+++ b/misc/thread_pool.c
@@ -1,40 +1,50 @@
-/*
- * This file is part of mpv.
+/* Copyright (C) 2018 the mpv developers
*
- * mpv is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
*
- * mpv is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with mpv. If not, see <http://www.gnu.org/licenses/>.
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <pthread.h>
#include "common/common.h"
+#include "osdep/timer.h"
#include "thread_pool.h"
+// Threads destroy themselves after this many seconds, if there's no new work
+// and the thread count is above the configured minimum.
+#define DESTROY_TIMEOUT 10
+
struct work {
void (*fn)(void *ctx);
void *fn_ctx;
};
struct mp_thread_pool {
- pthread_t *threads;
- int num_threads;
+ int min_threads, max_threads;
pthread_mutex_t lock;
pthread_cond_t wakeup;
// --- the following fields are protected by lock
+
+ pthread_t *threads;
+ int num_threads;
+
+ // Number of threads which have taken up work and are still processing it.
+ int busy_threads;
+
bool terminate;
+
struct work *work;
int num_work;
};
@@ -44,24 +54,58 @@ static void *worker_thread(void *arg)
struct mp_thread_pool *pool = arg;
pthread_mutex_lock(&pool->lock);
- while (1) {
- while (!pool->num_work && !pool->terminate)
- pthread_cond_wait(&pool->wakeup, &pool->lock);
- if (!pool->num_work && pool->terminate)
- break;
+ struct timespec ts = {0};
+ bool got_timeout = false;
+ while (1) {
+ struct work work = {0};
+ if (pool->num_work > 0) {
+ work = pool->work[pool->num_work - 1];
+ pool->num_work -= 1;
+ }
- assert(pool->num_work > 0);
- struct work work = pool->work[pool->num_work - 1];
- pool->num_work -= 1;
+ if (!work.fn) {
+ if (got_timeout || pool->terminate)
+ break;
+
+ if (pool->num_threads > pool->min_threads) {
+ if (!ts.tv_sec && !ts.tv_nsec)
+ ts = mp_rel_time_to_timespec(DESTROY_TIMEOUT);
+ if (pthread_cond_timedwait(&pool->wakeup, &pool->lock, &ts))
+ got_timeout = pool->num_threads > pool->min_threads;
+ } else {
+ pthread_cond_wait(&pool->wakeup, &pool->lock);
+ }
+ continue;
+ }
+ pool->busy_threads += 1;
pthread_mutex_unlock(&pool->lock);
+
work.fn(work.fn_ctx);
+
pthread_mutex_lock(&pool->lock);
+ pool->busy_threads -= 1;
+
+ ts = (struct timespec){0};
+ got_timeout = false;
+ }
+
+ // If no termination signal was given, it must mean we died because of a
+ // timeout, and nobody is waiting for us. We have to remove ourselves.
+ if (!pool->terminate) {
+ for (int n = 0; n < pool->num_threads; n++) {
+ if (pthread_equal(pool->threads[n], pthread_self())) {
+ pthread_detach(pthread_self());
+ MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n);
+ pthread_mutex_unlock(&pool->lock);
+ return NULL;
+ }
+ }
+ assert(0);
}
- assert(pool->num_work == 0);
- pthread_mutex_unlock(&pool->lock);
+ pthread_mutex_unlock(&pool->lock);
return NULL;
}
@@ -69,27 +113,53 @@ static void thread_pool_dtor(void *ctx)
{
struct mp_thread_pool *pool = ctx;
+
pthread_mutex_lock(&pool->lock);
+
pool->terminate = true;
pthread_cond_broadcast(&pool->wakeup);
+
+ pthread_t *threads = pool->threads;
+ int num_threads = pool->num_threads;
+
+ pool->threads = NULL;
+ pool->num_threads = 0;
+
pthread_mutex_unlock(&pool->lock);
- for (int n = 0; n < pool->num_threads; n++)
- pthread_join(pool->threads[n], NULL);
+ for (int n = 0; n < num_threads; n++)
+ pthread_join(threads[n], NULL);
assert(pool->num_work == 0);
+ assert(pool->num_threads == 0);
pthread_cond_destroy(&pool->wakeup);
pthread_mutex_destroy(&pool->lock);
}
+static void add_thread(struct mp_thread_pool *pool)
+{
+ pthread_t thread;
+
+ if (pthread_create(&thread, NULL, worker_thread, pool) == 0)
+ MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
+}
+
// Create a thread pool with the given number of worker threads. This can return
// NULL if the worker threads could not be created. The thread pool can be
// destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent).
// If there are still work items on freeing, it will block until all work items
// are done, and the threads terminate.
-struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads)
+// init_threads is the number of threads created in this function (and it fails
+// if it could not be done). min_threads must be >=, if it's >, then the
+// remaining threads will be created on demand, but never destroyed.
+// If init_threads > 0, then mp_thread_pool_queue() can never fail.
+// If init_threads == 0, mp_thread_pool_create() itself can never fail.
+struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads,
+ int min_threads, int max_threads)
{
- assert(threads > 0);
+ assert(min_threads >= 0);
+ assert(init_threads <= min_threads);
+ assert(max_threads > 0 && max_threads >= min_threads);
struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool);
talloc_set_destructor(pool, thread_pool_dtor);
@@ -97,14 +167,17 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads)
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->wakeup, NULL);
- for (int n = 0; n < threads; n++) {
- pthread_t thread;
- if (pthread_create(&thread, NULL, worker_thread, pool)) {
- talloc_free(pool);
- return NULL;
- }
- MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
- }
+ pool->min_threads = min_threads;
+ pool->max_threads = max_threads;
+
+ pthread_mutex_lock(&pool->lock);
+ for (int n = 0; n < init_threads; n++)
+ add_thread(pool);
+ bool ok = pool->num_threads >= init_threads;
+ pthread_mutex_unlock(&pool->lock);
+
+ if (!ok)
+ TA_FREEP(&pool);
return pool;
}
@@ -114,12 +187,36 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads)
// with unbounded size. This function always returns immediately.
// Concurrent queue calls are allowed, as long as it does not overlap with
// pool destruction.
-void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+// This function is explicitly thread-safe.
+// Cannot fail if thread pool was created with at least 1 thread.
+bool mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
void *fn_ctx)
{
+ bool ok = true;
+
+ assert(fn);
+
pthread_mutex_lock(&pool->lock);
struct work work = {fn, fn_ctx};
- MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
- pthread_cond_signal(&pool->wakeup);
+
+ // If there are not enough threads to process all at once, but we can
+ // create a new thread, then do so. If work is queued quickly, it can
+ // happen that not all available threads have picked up work yet (up to
+ // num_threads - busy_threads threads), which has to be accounted for.
+ if (pool->busy_threads + pool->num_work + 1 > pool->num_threads &&
+ pool->num_threads < pool->max_threads)
+ {
+ // We ignore failures, unless there are no threads available (below).
+ add_thread(pool);
+ }
+
+ if (pool->num_threads) {
+ MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
+ pthread_cond_signal(&pool->wakeup);
+ } else {
+ ok = false;
+ }
+
pthread_mutex_unlock(&pool->lock);
+ return ok;
}
diff --git a/misc/thread_pool.h b/misc/thread_pool.h
index c7af7b2b57..7d5aed0531 100644
--- a/misc/thread_pool.h
+++ b/misc/thread_pool.h
@@ -3,8 +3,10 @@
struct mp_thread_pool;
-struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads);
-void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads,
+ int min_threads, int max_threads);
+
+bool mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
void *fn_ctx);
#endif