summaryrefslogtreecommitdiffstats
path: root/misc
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2017-04-01 20:32:01 +0200
committerwm4 <wm4@nowhere>2017-04-01 20:32:01 +0200
commitec3dd7164ceea9ceda71e05e774501874fd20924 (patch)
tree7be23c4ff8da7fb1e612b8aba66f0c512dcc632d /misc
parent7bab24b9b3b0621a3e91da0ac7585fd0c55085e3 (diff)
downloadmpv-ec3dd7164ceea9ceda71e05e774501874fd20924.tar.bz2
mpv-ec3dd7164ceea9ceda71e05e774501874fd20924.tar.xz
misc: add a thread pool
To be used by the following commits.
Diffstat (limited to 'misc')
-rw-r--r--misc/thread_pool.c125
-rw-r--r--misc/thread_pool.h10
2 files changed, 135 insertions, 0 deletions
diff --git a/misc/thread_pool.c b/misc/thread_pool.c
new file mode 100644
index 0000000000..dddfad6734
--- /dev/null
+++ b/misc/thread_pool.c
@@ -0,0 +1,125 @@
+/*
+ * This file is part of mpv.
+ *
+ * mpv is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * mpv is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with mpv. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <pthread.h>
+
+#include "common/common.h"
+
+#include "thread_pool.h"
+
+struct work {
+ void (*fn)(void *ctx);
+ void *fn_ctx;
+};
+
+struct mp_thread_pool {
+ pthread_t *threads;
+ int num_threads;
+
+ pthread_mutex_t lock;
+ pthread_cond_t wakeup;
+
+ // --- the following fields are protected by lock
+ bool terminate;
+ struct work *work;
+ int num_work;
+};
+
+static void *worker_thread(void *arg)
+{
+ struct mp_thread_pool *pool = arg;
+
+ pthread_mutex_lock(&pool->lock);
+ while (1) {
+ while (!pool->num_work && !pool->terminate)
+ pthread_cond_wait(&pool->wakeup, &pool->lock);
+
+ if (!pool->num_work && pool->terminate)
+ break;
+
+ assert(pool->num_work > 0);
+ struct work work = pool->work[pool->num_work - 1];
+ pool->num_work -= 1;
+
+ pthread_mutex_unlock(&pool->lock);
+ work.fn(work.fn_ctx);
+ pthread_mutex_lock(&pool->lock);
+ }
+ assert(pool->num_work == 0);
+ pthread_mutex_unlock(&pool->lock);
+
+ return NULL;
+}
+
+static void thread_pool_dtor(void *ctx)
+{
+ struct mp_thread_pool *pool = ctx;
+
+ pthread_mutex_lock(&pool->lock);
+ pool->terminate = true;
+ pthread_cond_broadcast(&pool->wakeup);
+ pthread_mutex_unlock(&pool->lock);
+
+ for (int n = 0; n < pool->num_threads; n++)
+ pthread_join(pool->threads[n], NULL);
+
+ assert(pool->num_work == 0);
+ pthread_cond_destroy(&pool->wakeup);
+ pthread_mutex_destroy(&pool->lock);
+}
+
+// Create a thread pool with the given number of worker threads. This can return
+// NULL if the worker threads could not be created. The thread pool can be
+// destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent).
+// If there are still work items on freeing, it will block until all work items
+// are done, and the threads terminate.
+struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads)
+{
+ assert(threads > 0);
+
+ struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool);
+ talloc_set_destructor(pool, thread_pool_dtor);
+
+ pthread_mutex_init(&pool->lock, NULL);
+ pthread_cond_init(&pool->wakeup, NULL);
+
+ for (int n = 0; n < threads; n++) {
+ pthread_t thread;
+ if (pthread_create(&thread, NULL, worker_thread, pool)) {
+ talloc_free(pool);
+ return NULL;
+ }
+ MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
+ }
+
+ return pool;
+}
+
+// Queue a function to be run on a worker thread: fn(fn_ctx)
+// If no worker thread is currently available, it's appended to a list in memory
+// with unbounded size. This function always returns immediately.
+// Concurrent queue calls are allowed, as long as it does not overlap with
+// pool destruction.
+void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+ void *fn_ctx)
+{
+ pthread_mutex_lock(&pool->lock);
+ struct work work = {fn, fn_ctx};
+ MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
+ pthread_cond_signal(&pool->wakeup);
+ pthread_mutex_unlock(&pool->lock);
+}
diff --git a/misc/thread_pool.h b/misc/thread_pool.h
new file mode 100644
index 0000000000..c7af7b2b57
--- /dev/null
+++ b/misc/thread_pool.h
@@ -0,0 +1,10 @@
+#ifndef MPV_MP_THREAD_POOL_H
+#define MPV_MP_THREAD_POOL_H
+
+struct mp_thread_pool;
+
+struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads);
+void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+ void *fn_ctx);
+
+#endif