diff options
author | Kacper Michajłow <kasper93@gmail.com> | 2023-10-21 04:55:41 +0200 |
---|---|---|
committer | Dudemanguy <random342@airmail.cc> | 2023-11-05 17:36:17 +0000 |
commit | 174df99ffa53f1091589eaa4fa0c16cdd55a9326 (patch) | |
tree | 3a60d45615f18beed98a9b08267c28ed7e05dd5f /misc | |
parent | 3a8b107f6216b38a151d5ca1e9d4f2727e3418f5 (diff) | |
download | mpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.bz2 mpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.xz |
ALL: use new mp_thread abstraction
Diffstat (limited to 'misc')
-rw-r--r-- | misc/dispatch.c | 93 | ||||
-rw-r--r-- | misc/jni.c | 12 | ||||
-rw-r--r-- | misc/random.c | 12 | ||||
-rw-r--r-- | misc/rendezvous.c | 15 | ||||
-rw-r--r-- | misc/thread_pool.c | 74 | ||||
-rw-r--r-- | misc/thread_tools.c | 70 | ||||
-rw-r--r-- | misc/thread_tools.h | 11 |
7 files changed, 143 insertions, 144 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c index d88d775d97..56348f2b13 100644 --- a/misc/dispatch.c +++ b/misc/dispatch.c @@ -26,8 +26,8 @@ struct mp_dispatch_queue { struct mp_dispatch_item *head, *tail; - pthread_mutex_t lock; - pthread_cond_t cond; + mp_mutex lock; + mp_cond cond; void (*wakeup_fn)(void *wakeup_ctx); void *wakeup_ctx; void (*onlock_fn)(void *onlock_ctx); @@ -39,7 +39,7 @@ struct mp_dispatch_queue { // The target thread is in mp_dispatch_queue_process() (and either idling, // locked, or running a dispatch callback). bool in_process; - pthread_t in_process_thread; + mp_thread in_process_thread; // The target thread is in mp_dispatch_queue_process(), and currently // something has exclusive access to it (e.g. running a dispatch callback, // or a different thread got it with mp_dispatch_lock()). @@ -48,7 +48,7 @@ struct mp_dispatch_queue { size_t lock_requests; // locked==true is due to a mp_dispatch_lock() call (for debugging). bool locked_explicit; - pthread_t locked_explicit_thread; + mp_thread locked_explicit_thread; }; struct mp_dispatch_item { @@ -67,8 +67,8 @@ static void queue_dtor(void *p) assert(!queue->in_process); assert(!queue->lock_requests); assert(!queue->locked); - pthread_cond_destroy(&queue->cond); - pthread_mutex_destroy(&queue->lock); + mp_cond_destroy(&queue->cond); + mp_mutex_destroy(&queue->lock); } // A dispatch queue lets other threads run callbacks in a target thread. @@ -76,7 +76,7 @@ static void queue_dtor(void *p) // Free the dispatch queue with talloc_free(). At the time of destruction, // the queue must be empty. The easiest way to guarantee this is to // terminate all potential senders, then call mp_dispatch_run() with a -// function that e.g. makes the target thread exit, then pthread_join() the +// function that e.g. makes the target thread exit, then mp_thread_join() the // target thread, and finally destroy the queue. Another way is calling // mp_dispatch_queue_process() after terminating all potential senders, and // then destroying the queue. @@ -85,8 +85,8 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent) struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue); *queue = (struct mp_dispatch_queue){0}; talloc_set_destructor(queue, queue_dtor); - pthread_mutex_init(&queue->lock, NULL); - pthread_cond_init(&queue->cond, NULL); + mp_mutex_init(&queue->lock); + mp_cond_init(&queue->cond); return queue; } @@ -126,14 +126,14 @@ void mp_dispatch_set_onlock_fn(struct mp_dispatch_queue *queue, static void mp_dispatch_append(struct mp_dispatch_queue *queue, struct mp_dispatch_item *item) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); if (item->mergeable) { for (struct mp_dispatch_item *cur = queue->head; cur; cur = cur->next) { if (cur->mergeable && cur->fn == item->fn && cur->fn_data == item->fn_data) { talloc_free(item); - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); return; } } @@ -148,12 +148,12 @@ static void mp_dispatch_append(struct mp_dispatch_queue *queue, // Wake up the main thread; note that other threads might wait on this // condition for reasons, so broadcast the condition. - pthread_cond_broadcast(&queue->cond); + mp_cond_broadcast(&queue->cond); // No wakeup callback -> assume mp_dispatch_queue_process() needs to be // interrupted instead. if (!queue->wakeup_fn) queue->interrupted = true; - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); if (queue->wakeup_fn) queue->wakeup_fn(queue->wakeup_ctx); @@ -218,7 +218,7 @@ void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue, void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue, mp_dispatch_fn fn, void *fn_data) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); struct mp_dispatch_item **pcur = &queue->head; queue->tail = NULL; while (*pcur) { @@ -231,7 +231,7 @@ void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue, pcur = &cur->next; } } - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); } // Run fn(fn_data) on the target thread synchronously. This function enqueues @@ -247,10 +247,10 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue, }; mp_dispatch_append(queue, &item); - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); while (!item.completed) - pthread_cond_wait(&queue->cond, &queue->lock); - pthread_mutex_unlock(&queue->lock); + mp_cond_wait(&queue->cond, &queue->lock); + mp_mutex_unlock(&queue->lock); } // Process any outstanding dispatch items in the queue. This also handles @@ -271,18 +271,18 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue, // no enqueued callback can call the lock/unlock functions). void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); queue->wait = timeout > 0 ? mp_time_ns_add(mp_time_ns(), timeout) : 0; assert(!queue->in_process); // recursion not allowed queue->in_process = true; - queue->in_process_thread = pthread_self(); + queue->in_process_thread = mp_thread_self(); // Wake up thread which called mp_dispatch_lock(). if (queue->lock_requests) - pthread_cond_broadcast(&queue->cond); + mp_cond_broadcast(&queue->cond); while (1) { if (queue->lock_requests) { // Block due to something having called mp_dispatch_lock(). - pthread_cond_wait(&queue->cond, &queue->lock); + mp_cond_wait(&queue->cond, &queue->lock); } else if (queue->head) { struct mp_dispatch_item *item = queue->head; queue->head = item->next; @@ -295,23 +295,22 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) // from mp_dispatch_lock(), which is done by locked=true. assert(!queue->locked); queue->locked = true; - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); item->fn(item->fn_data); - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); assert(queue->locked); queue->locked = false; // Wakeup mp_dispatch_run(), also mp_dispatch_lock(). - pthread_cond_broadcast(&queue->cond); + mp_cond_broadcast(&queue->cond); if (item->asynchronous) { talloc_free(item); } else { item->completed = true; } } else if (queue->wait > 0 && !queue->interrupted) { - struct timespec ts = mp_time_ns_to_realtime(queue->wait); - if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts)) + if (mp_cond_timedwait_until(&queue->cond, &queue->lock, queue->wait)) queue->wait = 0; } else { break; @@ -320,7 +319,7 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) assert(!queue->locked); queue->in_process = false; queue->interrupted = false; - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); } // If the queue is inside of mp_dispatch_queue_process(), make it return as @@ -331,10 +330,10 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) // wakeup the main thread from another thread in a race free way). void mp_dispatch_interrupt(struct mp_dispatch_queue *queue) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); queue->interrupted = true; - pthread_cond_broadcast(&queue->cond); - pthread_mutex_unlock(&queue->lock); + mp_cond_broadcast(&queue->cond); + mp_mutex_unlock(&queue->lock); } // If a mp_dispatch_queue_process() call is in progress, then adjust the maximum @@ -347,12 +346,12 @@ void mp_dispatch_interrupt(struct mp_dispatch_queue *queue) // to wait in external APIs. void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); if (queue->in_process && queue->wait > until) { queue->wait = until; - pthread_cond_broadcast(&queue->cond); + mp_cond_broadcast(&queue->cond); } - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); } // Grant exclusive access to the target thread's state. While this is active, @@ -364,13 +363,13 @@ void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until) // already holding the dispatch lock. void mp_dispatch_lock(struct mp_dispatch_queue *queue) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); // Must not be called recursively from dispatched callbacks. if (queue->in_process) - assert(!pthread_equal(queue->in_process_thread, pthread_self())); + assert(!mp_thread_equal(queue->in_process_thread, mp_thread_self())); // Must not be called recursively at all. if (queue->locked_explicit) - assert(!pthread_equal(queue->locked_explicit_thread, pthread_self())); + assert(!mp_thread_equal(queue->locked_explicit_thread, mp_thread_self())); queue->lock_requests += 1; // And now wait until the target thread gets "trapped" within the // mp_dispatch_queue_process() call, which will mean we get exclusive @@ -378,41 +377,41 @@ void mp_dispatch_lock(struct mp_dispatch_queue *queue) if (queue->onlock_fn) queue->onlock_fn(queue->onlock_ctx); while (!queue->in_process) { - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); if (queue->wakeup_fn) queue->wakeup_fn(queue->wakeup_ctx); - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); if (queue->in_process) break; - pthread_cond_wait(&queue->cond, &queue->lock); + mp_cond_wait(&queue->cond, &queue->lock); } // Wait until we can get the lock. while (!queue->in_process || queue->locked) - pthread_cond_wait(&queue->cond, &queue->lock); + mp_cond_wait(&queue->cond, &queue->lock); // "Lock". assert(queue->lock_requests); assert(!queue->locked); assert(!queue->locked_explicit); queue->locked = true; queue->locked_explicit = true; - queue->locked_explicit_thread = pthread_self(); - pthread_mutex_unlock(&queue->lock); + queue->locked_explicit_thread = mp_thread_self(); + mp_mutex_unlock(&queue->lock); } // Undo mp_dispatch_lock(). void mp_dispatch_unlock(struct mp_dispatch_queue *queue) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); assert(queue->locked); // Must be called after a mp_dispatch_lock(), from the same thread. assert(queue->locked_explicit); - assert(pthread_equal(queue->locked_explicit_thread, pthread_self())); + assert(mp_thread_equal(queue->locked_explicit_thread, mp_thread_self())); // "Unlock". queue->locked = false; queue->locked_explicit = false; queue->lock_requests -= 1; // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s. // (Would be nice to wake up only 1 other locker if lock_requests>0.) - pthread_cond_broadcast(&queue->cond); - pthread_mutex_unlock(&queue->lock); + mp_cond_broadcast(&queue->cond); + mp_mutex_unlock(&queue->lock); } diff --git a/misc/jni.c b/misc/jni.c index 3f76f379cb..82f63565b5 100644 --- a/misc/jni.c +++ b/misc/jni.c @@ -22,15 +22,15 @@ #include <libavcodec/jni.h> #include <libavutil/mem.h> #include <libavutil/bprint.h> -#include <pthread.h> #include <stdlib.h> #include "jni.h" +#include "osdep/threads.h" static JavaVM *java_vm; static pthread_key_t current_env; -static pthread_once_t once = PTHREAD_ONCE_INIT; -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; +static mp_once once = MP_STATIC_ONCE_INITIALIZER; +static mp_static_mutex lock = MP_STATIC_MUTEX_INITIALIZER; static void jni_detach_env(void *data) { @@ -49,7 +49,7 @@ JNIEnv *mp_jni_get_env(struct mp_log *log) int ret = 0; JNIEnv *env = NULL; - pthread_mutex_lock(&lock); + mp_mutex_lock(&lock); if (java_vm == NULL) { java_vm = av_jni_get_java_vm(NULL); } @@ -59,7 +59,7 @@ JNIEnv *mp_jni_get_env(struct mp_log *log) goto done; } - pthread_once(&once, jni_create_pthread_key); + mp_exec_once(&once, jni_create_pthread_key); if ((env = pthread_getspecific(current_env)) != NULL) { goto done; @@ -86,7 +86,7 @@ JNIEnv *mp_jni_get_env(struct mp_log *log) } done: - pthread_mutex_unlock(&lock); + mp_mutex_unlock(&lock); return env; } diff --git a/misc/random.c b/misc/random.c index 780c67ec10..e622ab7f3b 100644 --- a/misc/random.c +++ b/misc/random.c @@ -19,12 +19,12 @@ */ #include <stdint.h> -#include <pthread.h> +#include "osdep/threads.h" #include "random.h" static uint64_t state[4]; -static pthread_mutex_t state_mutex = PTHREAD_MUTEX_INITIALIZER; +static mp_static_mutex state_mutex = MP_STATIC_MUTEX_INITIALIZER; static inline uint64_t rotl_u64(const uint64_t x, const int k) { @@ -41,18 +41,18 @@ static inline uint64_t splitmix64(uint64_t *const x) void mp_rand_seed(uint64_t seed) { - pthread_mutex_lock(&state_mutex); + mp_mutex_lock(&state_mutex); state[0] = seed; for (int i = 1; i < 4; i++) state[i] = splitmix64(&seed); - pthread_mutex_unlock(&state_mutex); + mp_mutex_unlock(&state_mutex); } uint64_t mp_rand_next(void) { uint64_t result, t; - pthread_mutex_lock(&state_mutex); + mp_mutex_lock(&state_mutex); result = rotl_u64(state[1] * 5, 7) * 9; t = state[1] << 17; @@ -64,7 +64,7 @@ uint64_t mp_rand_next(void) state[2] ^= t; state[3] = rotl_u64(state[3], 45); - pthread_mutex_unlock(&state_mutex); + mp_mutex_unlock(&state_mutex); return result; } diff --git a/misc/rendezvous.c b/misc/rendezvous.c index aa94a9042d..1fe5724dad 100644 --- a/misc/rendezvous.c +++ b/misc/rendezvous.c @@ -1,9 +1,10 @@ -#include <pthread.h> #include "rendezvous.h" -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t wakeup = PTHREAD_COND_INITIALIZER; +#include "osdep/threads.h" + +static mp_static_mutex lock = MP_STATIC_MUTEX_INITIALIZER; +static mp_cond wakeup = MP_STATIC_COND_INITIALIZER; static struct waiter *waiters; @@ -31,7 +32,7 @@ struct waiter { intptr_t mp_rendezvous(void *tag, intptr_t value) { struct waiter wait = { .tag = tag, .value = &value }; - pthread_mutex_lock(&lock); + mp_mutex_lock(&lock); struct waiter **prev = &waiters; while (*prev) { if ((*prev)->tag == tag) { @@ -40,15 +41,15 @@ intptr_t mp_rendezvous(void *tag, intptr_t value) value = tmp; (*prev)->value = NULL; // signals completion *prev = (*prev)->next; // unlink - pthread_cond_broadcast(&wakeup); + mp_cond_broadcast(&wakeup); goto done; } prev = &(*prev)->next; } *prev = &wait; while (wait.value) - pthread_cond_wait(&wakeup, &lock); + mp_cond_wait(&wakeup, &lock); done: - pthread_mutex_unlock(&lock); + mp_mutex_unlock(&lock); return value; } diff --git a/misc/thread_pool.c b/misc/thread_pool.c index 9e47f13279..51ca22356d 100644 --- a/misc/thread_pool.c +++ b/misc/thread_pool.c @@ -13,8 +13,6 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -#include <pthread.h> - #include "common/common.h" #include "osdep/threads.h" #include "osdep/timer.h" @@ -33,12 +31,12 @@ struct work { struct mp_thread_pool { int min_threads, max_threads; - pthread_mutex_t lock; - pthread_cond_t wakeup; + mp_mutex lock; + mp_cond wakeup; // --- the following fields are protected by lock - pthread_t *threads; + mp_thread *threads; int num_threads; // Number of threads which have taken up work and are still processing it. @@ -50,15 +48,15 @@ struct mp_thread_pool { int num_work; }; -static void *worker_thread(void *arg) +static MP_THREAD_VOID worker_thread(void *arg) { struct mp_thread_pool *pool = arg; - mpthread_set_name("worker"); + mp_thread_set_name("worker"); - pthread_mutex_lock(&pool->lock); + mp_mutex_lock(&pool->lock); - struct timespec ts = {0}; + int64_t destroy_deadline = 0; bool got_timeout = false; while (1) { struct work work = {0}; @@ -72,25 +70,25 @@ static void *worker_thread(void *arg) break; if (pool->num_threads > pool->min_threads) { - if (!ts.tv_sec && !ts.tv_nsec) - ts = mp_rel_time_to_timespec(DESTROY_TIMEOUT); - if (pthread_cond_timedwait(&pool->wakeup, &pool->lock, &ts)) + if (!destroy_deadline) + destroy_deadline = mp_time_ns_add(mp_time_ns(), DESTROY_TIMEOUT); + if (mp_cond_timedwait_until(&pool->wakeup, &pool->lock, destroy_deadline)) got_timeout = pool->num_threads > pool->min_threads; } else { - pthread_cond_wait(&pool->wakeup, &pool->lock); + mp_cond_wait(&pool->wakeup, &pool->lock); } continue; } pool->busy_threads += 1; - pthread_mutex_unlock(&pool->lock); + mp_mutex_unlock(&pool->lock); work.fn(work.fn_ctx); - pthread_mutex_lock(&pool->lock); + mp_mutex_lock(&pool->lock); pool->busy_threads -= 1; - ts = (struct timespec){0}; + destroy_deadline = 0; got_timeout = false; } @@ -98,18 +96,18 @@ static void *worker_thread(void *arg) // timeout, and nobody is waiting for us. We have to remove ourselves. if (!pool->terminate) { for (int n = 0; n < pool->num_threads; n++) { - if (pthread_equal(pool->threads[n], pthread_self())) { - pthread_detach(pthread_self()); + if (mp_thread_equal(pool->threads[n], mp_thread_self())) { + pthread_detach(mp_thread_self()); MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n); - pthread_mutex_unlock(&pool->lock); - return NULL; + mp_mutex_unlock(&pool->lock); + MP_THREAD_RETURN(); } } MP_ASSERT_UNREACHABLE(); } - pthread_mutex_unlock(&pool->lock); - return NULL; + mp_mutex_unlock(&pool->lock); + MP_THREAD_RETURN(); } static void thread_pool_dtor(void *ctx) @@ -117,33 +115,33 @@ static void thread_pool_dtor(void *ctx) struct mp_thread_pool *pool = ctx; - pthread_mutex_lock(&pool->lock); + mp_mutex_lock(&pool->lock); pool->terminate = true; - pthread_cond_broadcast(&pool->wakeup); + mp_cond_broadcast(&pool->wakeup); - pthread_t *threads = pool->threads; + mp_thread *threads = pool->threads; int num_threads = pool->num_threads; pool->threads = NULL; pool->num_threads = 0; - pthread_mutex_unlock(&pool->lock); + mp_mutex_unlock(&pool->lock); for (int n = 0; n < num_threads; n++) - pthread_join(threads[n], NULL); + mp_thread_join(threads[n]); assert(pool->num_work == 0); assert(pool->num_threads == 0); - pthread_cond_destroy(&pool->wakeup); - pthread_mutex_destroy(&pool->lock); + mp_cond_destroy(&pool->wakeup); + mp_mutex_destroy(&pool->lock); } static bool add_thread(struct mp_thread_pool *pool) { - pthread_t thread; + mp_thread thread; - if (pthread_create(&thread, NULL, worker_thread, pool) != 0) + if (mp_thread_create(&thread, worker_thread, pool) != 0) return false; MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread); @@ -160,17 +158,17 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads, struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool); talloc_set_destructor(pool, thread_pool_dtor); - pthread_mutex_init(&pool->lock, NULL); - pthread_cond_init(&pool->wakeup, NULL); + mp_mutex_init(&pool->lock); + mp_cond_init(&pool->wakeup); pool->min_threads = min_threads; pool->max_threads = max_threads; - pthread_mutex_lock(&pool->lock); + mp_mutex_lock(&pool->lock); for (int n = 0; n < init_threads; n++) add_thread(pool); bool ok = pool->num_threads >= init_threads; - pthread_mutex_unlock(&pool->lock); + mp_mutex_unlock(&pool->lock); if (!ok) TA_FREEP(&pool); @@ -185,7 +183,7 @@ static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx), assert(fn); - pthread_mutex_lock(&pool->lock); + mp_mutex_lock(&pool->lock); struct work work = {fn, fn_ctx}; // If there are not enough threads to process all at once, but we can @@ -203,10 +201,10 @@ static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx), if (ok) { MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work); - pthread_cond_signal(&pool->wakeup); + mp_cond_signal(&pool->wakeup); } - pthread_mutex_unlock(&pool->lock); + mp_mutex_unlock(&pool->lock); return ok; } diff --git a/misc/thread_tools.c b/misc/thread_tools.c index b12ec4d603..0f7fe8f77a 100644 --- a/misc/thread_tools.c +++ b/misc/thread_tools.c @@ -35,10 +35,10 @@ uintptr_t mp_waiter_wait(struct mp_waiter *waiter) { - pthread_mutex_lock(&waiter->lock); + mp_mutex_lock(&waiter->lock); while (!waiter->done) - pthread_cond_wait(&waiter->wakeup, &waiter->lock); - pthread_mutex_unlock(&waiter->lock); + mp_cond_wait(&waiter->wakeup, &waiter->lock); + mp_mutex_unlock(&waiter->lock); uintptr_t ret = waiter->value; @@ -50,8 +50,8 @@ uintptr_t mp_waiter_wait(struct mp_waiter *waiter) // following functions will do nearly nothing. This is true for Windows // and Linux. But some lesser OSes still might allocate kernel objects // when initializing mutexes, so destroy them here. - pthread_mutex_destroy(&waiter->lock); - pthread_cond_destroy(&waiter->wakeup); + mp_mutex_destroy(&waiter->lock); + mp_cond_destroy(&waiter->wakeup); memset(waiter, 0xCA, sizeof(*waiter)); // for debugging @@ -60,25 +60,25 @@ uintptr_t mp_waiter_wait(struct mp_waiter *waiter) void mp_waiter_wakeup(struct mp_waiter *waiter, uintptr_t value) { - pthread_mutex_lock(&waiter->lock); + mp_mutex_lock(&waiter->lock); assert(!waiter->done); waiter->done = true; waiter->value = value; - pthread_cond_signal(&waiter->wakeup); - pthread_mutex_unlock(&waiter->lock); + mp_cond_signal(&waiter->wakeup); + mp_mutex_unlock(&waiter->lock); } bool mp_waiter_poll(struct mp_waiter *waiter) { - pthread_mutex_lock(&waiter->lock); + mp_mutex_lock(&waiter->lock); bool r = waiter->done; - pthread_mutex_unlock(&waiter->lock); + mp_mutex_unlock(&waiter->lock); return r; } struct mp_cancel { - pthread_mutex_t lock; - pthread_cond_t wakeup; + mp_mutex lock; + mp_cond wakeup; // Semaphore state and "mirrors". atomic_bool triggered; @@ -117,8 +117,8 @@ static void cancel_destroy(void *p) CloseHandle(c->win32_event); #endif - pthread_mutex_destroy(&c->lock); - pthread_cond_destroy(&c->wakeup); + mp_mutex_destroy(&c->lock); + mp_cond_destroy(&c->wakeup); } struct mp_cancel *mp_cancel_new(void *talloc_ctx) @@ -129,8 +129,8 @@ struct mp_cancel *mp_cancel_new(void *talloc_ctx) .triggered = false, .wakeup_pipe = {-1, -1}, }; - pthread_mutex_init(&c->lock, NULL); - pthread_cond_init(&c->wakeup, NULL); + mp_mutex_init(&c->lock); + mp_cond_init(&c->wakeup); return c; } @@ -138,7 +138,7 @@ static void trigger_locked(struct mp_cancel *c) { atomic_store(&c->triggered, true); - pthread_cond_broadcast(&c->wakeup); // condition bound to c->triggered + mp_cond_broadcast(&c->wakeup); // condition bound to c->triggered if (c->cb) c->cb(c->cb_ctx); @@ -157,14 +157,14 @@ static void trigger_locked(struct mp_cancel *c) void mp_cancel_trigger(struct mp_cancel *c) { - pthread_mutex_lock(&c->lock); + mp_mutex_lock(&c->lock); trigger_locked(c); - pthread_mutex_unlock(&c->lock); + mp_mutex_unlock(&c->lock); } void mp_cancel_reset(struct mp_cancel *c) { - pthread_mutex_lock(&c->lock); + mp_mutex_lock(&c->lock); atomic_store(&c->triggered, false); @@ -182,7 +182,7 @@ void mp_cancel_reset(struct mp_cancel *c) ResetEvent(c->win32_event); #endif - pthread_mutex_unlock(&c->lock); + mp_mutex_unlock(&c->lock); } bool mp_cancel_test(struct mp_cancel *c) @@ -192,13 +192,13 @@ bool mp_cancel_test(struct mp_cancel *c) bool mp_cancel_wait(struct mp_cancel *c, double timeout) { - struct timespec ts = mp_rel_time_to_timespec(timeout); - pthread_mutex_lock(&c->lock); + int64_t wait_until = mp_time_ns_add(mp_time_ns(), timeout); + mp_mutex_lock(&c->lock); while (!mp_cancel_test(c)) { - if (pthread_cond_timedwait(&c->wakeup, &c->lock, &ts)) + if (mp_cond_timedwait_until(&c->wakeup, &c->lock, wait_until)) break; } - pthread_mutex_unlock(&c->lock); + mp_mutex_unlock(&c->lock); return mp_cancel_test(c); } @@ -213,11 +213,11 @@ static void retrigger_locked(struct mp_cancel *c) void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx) { - pthread_mutex_lock(&c->lock); + mp_mutex_lock(&c->lock); c->cb = cb; c->cb_ctx = ctx; retrigger_locked(c); - pthread_mutex_unlock(&c->lock); + mp_mutex_unlock(&c->lock); } void mp_cancel_set_parent(struct mp_cancel *slave, struct mp_cancel *parent) @@ -228,22 +228,22 @@ void mp_cancel_set_parent(struct mp_cancel *slave, struct mp_cancel *parent) if (slave->parent == parent) return; if (slave->parent) { - pthread_mutex_lock(&slave->parent->lock); + mp_mutex_lock(&slave->parent->lock); LL_REMOVE(siblings, &slave->parent->slaves, slave); - pthread_mutex_unlock(&slave->parent->lock); + mp_mutex_unlock(&slave->parent->lock); } slave->parent = parent; if (slave->parent) { - pthread_mutex_lock(&slave->parent->lock); + mp_mutex_lock(&slave->parent->lock); LL_APPEND(siblings, &slave->parent->slaves, slave); retrigger_locked(slave->parent); - pthread_mutex_unlock(&slave->parent->lock); + mp_mutex_unlock(&slave->parent->lock); } } int mp_cancel_get_fd(struct mp_cancel *c) { - pthread_mutex_lock(&c->lock); + mp_mutex_lock(&c->lock); if (c->wakeup_pipe[0] < 0) { #if defined(__GNUC__) && !defined(__clang__) # pragma GCC diagnostic push @@ -255,7 +255,7 @@ int mp_cancel_get_fd(struct mp_cancel *c) #endif retrigger_locked(c); } - pthread_mutex_unlock(&c->lock); + mp_mutex_unlock(&c->lock); return c->wakeup_pipe[0]; @@ -264,12 +264,12 @@ int mp_cancel_get_fd(struct mp_cancel *c) #ifdef __MINGW32__ void *mp_cancel_get_event(struct mp_cancel *c) { - pthread_mutex_lock(&c->lock); + mp_mutex_lock(&c->lock); if (!c->win32_event) { c->win32_event = CreateEventW(NULL, TRUE, FALSE, NULL); retrigger_locked(c); } - pthread_mutex_unlock(&c->lock); + mp_mutex_unlock(&c->lock); return c->win32_event; } diff --git a/misc/thread_tools.h b/misc/thread_tools.h index 89d84ce0b6..a07257b09b 100644 --- a/misc/thread_tools.h +++ b/misc/thread_tools.h @@ -2,22 +2,23 @@ #include <stdint.h> #include <stdbool.h> -#include <pthread.h> + +#include "osdep/threads.h" // This is basically a single-shot semaphore, intended as light-weight solution // for just making a thread wait for another thread. struct mp_waiter { // All fields are considered private. Use MP_WAITER_INITIALIZER to init. - pthread_mutex_t lock; - pthread_cond_t wakeup; + mp_mutex lock; + mp_cond wakeup; bool done; uintptr_t value; }; // Initialize a mp_waiter object for use with mp_waiter_*(). #define MP_WAITER_INITIALIZER { \ - .lock = PTHREAD_MUTEX_INITIALIZER, \ - .wakeup = PTHREAD_COND_INITIALIZER, \ + .lock = MP_STATIC_MUTEX_INITIALIZER, \ + .wakeup = MP_STATIC_COND_INITIALIZER, \ } // Block until some other thread calls mp_waiter_wakeup(). The function returns |