diff options
author | Kacper Michajłow <kasper93@gmail.com> | 2023-10-21 04:55:41 +0200 |
---|---|---|
committer | Dudemanguy <random342@airmail.cc> | 2023-11-05 17:36:17 +0000 |
commit | 174df99ffa53f1091589eaa4fa0c16cdd55a9326 (patch) | |
tree | 3a60d45615f18beed98a9b08267c28ed7e05dd5f /misc/thread_pool.c | |
parent | 3a8b107f6216b38a151d5ca1e9d4f2727e3418f5 (diff) | |
download | mpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.bz2 mpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.xz |
ALL: use new mp_thread abstraction
Diffstat (limited to 'misc/thread_pool.c')
-rw-r--r-- | misc/thread_pool.c | 74 |
1 files changed, 36 insertions, 38 deletions
diff --git a/misc/thread_pool.c b/misc/thread_pool.c index 9e47f13279..51ca22356d 100644 --- a/misc/thread_pool.c +++ b/misc/thread_pool.c @@ -13,8 +13,6 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -#include <pthread.h> - #include "common/common.h" #include "osdep/threads.h" #include "osdep/timer.h" @@ -33,12 +31,12 @@ struct work { struct mp_thread_pool { int min_threads, max_threads; - pthread_mutex_t lock; - pthread_cond_t wakeup; + mp_mutex lock; + mp_cond wakeup; // --- the following fields are protected by lock - pthread_t *threads; + mp_thread *threads; int num_threads; // Number of threads which have taken up work and are still processing it. @@ -50,15 +48,15 @@ struct mp_thread_pool { int num_work; }; -static void *worker_thread(void *arg) +static MP_THREAD_VOID worker_thread(void *arg) { struct mp_thread_pool *pool = arg; - mpthread_set_name("worker"); + mp_thread_set_name("worker"); - pthread_mutex_lock(&pool->lock); + mp_mutex_lock(&pool->lock); - struct timespec ts = {0}; + int64_t destroy_deadline = 0; bool got_timeout = false; while (1) { struct work work = {0}; @@ -72,25 +70,25 @@ static void *worker_thread(void *arg) break; if (pool->num_threads > pool->min_threads) { - if (!ts.tv_sec && !ts.tv_nsec) - ts = mp_rel_time_to_timespec(DESTROY_TIMEOUT); - if (pthread_cond_timedwait(&pool->wakeup, &pool->lock, &ts)) + if (!destroy_deadline) + destroy_deadline = mp_time_ns_add(mp_time_ns(), DESTROY_TIMEOUT); + if (mp_cond_timedwait_until(&pool->wakeup, &pool->lock, destroy_deadline)) got_timeout = pool->num_threads > pool->min_threads; } else { - pthread_cond_wait(&pool->wakeup, &pool->lock); + mp_cond_wait(&pool->wakeup, &pool->lock); } continue; } pool->busy_threads += 1; - pthread_mutex_unlock(&pool->lock); + mp_mutex_unlock(&pool->lock); work.fn(work.fn_ctx); - pthread_mutex_lock(&pool->lock); + mp_mutex_lock(&pool->lock); pool->busy_threads -= 1; - ts = (struct timespec){0}; + destroy_deadline = 0; got_timeout = false; } @@ -98,18 +96,18 @@ static void *worker_thread(void *arg) // timeout, and nobody is waiting for us. We have to remove ourselves. if (!pool->terminate) { for (int n = 0; n < pool->num_threads; n++) { - if (pthread_equal(pool->threads[n], pthread_self())) { - pthread_detach(pthread_self()); + if (mp_thread_equal(pool->threads[n], mp_thread_self())) { + pthread_detach(mp_thread_self()); MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n); - pthread_mutex_unlock(&pool->lock); - return NULL; + mp_mutex_unlock(&pool->lock); + MP_THREAD_RETURN(); } } MP_ASSERT_UNREACHABLE(); } - pthread_mutex_unlock(&pool->lock); - return NULL; + mp_mutex_unlock(&pool->lock); + MP_THREAD_RETURN(); } static void thread_pool_dtor(void *ctx) @@ -117,33 +115,33 @@ static void thread_pool_dtor(void *ctx) struct mp_thread_pool *pool = ctx; - pthread_mutex_lock(&pool->lock); + mp_mutex_lock(&pool->lock); pool->terminate = true; - pthread_cond_broadcast(&pool->wakeup); + mp_cond_broadcast(&pool->wakeup); - pthread_t *threads = pool->threads; + mp_thread *threads = pool->threads; int num_threads = pool->num_threads; pool->threads = NULL; pool->num_threads = 0; - pthread_mutex_unlock(&pool->lock); + mp_mutex_unlock(&pool->lock); for (int n = 0; n < num_threads; n++) - pthread_join(threads[n], NULL); + mp_thread_join(threads[n]); assert(pool->num_work == 0); assert(pool->num_threads == 0); - pthread_cond_destroy(&pool->wakeup); - pthread_mutex_destroy(&pool->lock); + mp_cond_destroy(&pool->wakeup); + mp_mutex_destroy(&pool->lock); } static bool add_thread(struct mp_thread_pool *pool) { - pthread_t thread; + mp_thread thread; - if (pthread_create(&thread, NULL, worker_thread, pool) != 0) + if (mp_thread_create(&thread, worker_thread, pool) != 0) return false; MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread); @@ -160,17 +158,17 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads, struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool); talloc_set_destructor(pool, thread_pool_dtor); - pthread_mutex_init(&pool->lock, NULL); - pthread_cond_init(&pool->wakeup, NULL); + mp_mutex_init(&pool->lock); + mp_cond_init(&pool->wakeup); pool->min_threads = min_threads; pool->max_threads = max_threads; - pthread_mutex_lock(&pool->lock); + mp_mutex_lock(&pool->lock); for (int n = 0; n < init_threads; n++) add_thread(pool); bool ok = pool->num_threads >= init_threads; - pthread_mutex_unlock(&pool->lock); + mp_mutex_unlock(&pool->lock); if (!ok) TA_FREEP(&pool); @@ -185,7 +183,7 @@ static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx), assert(fn); - pthread_mutex_lock(&pool->lock); + mp_mutex_lock(&pool->lock); struct work work = {fn, fn_ctx}; // If there are not enough threads to process all at once, but we can @@ -203,10 +201,10 @@ static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx), if (ok) { MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work); - pthread_cond_signal(&pool->wakeup); + mp_cond_signal(&pool->wakeup); } - pthread_mutex_unlock(&pool->lock); + mp_mutex_unlock(&pool->lock); return ok; } |