author    Kacper Michajłow <kasper93@gmail.com>    2023-10-21 04:55:41 +0200
committer Dudemanguy <random342@airmail.cc>        2023-11-05 17:36:17 +0000
commit    174df99ffa53f1091589eaa4fa0c16cdd55a9326 (patch)
tree      3a60d45615f18beed98a9b08267c28ed7e05dd5f /misc
parent    3a8b107f6216b38a151d5ca1e9d4f2727e3418f5 (diff)
ALL: use new mp_thread abstraction
Diffstat (limited to 'misc')
-rw-r--r--  misc/dispatch.c      93
-rw-r--r--  misc/jni.c           12
-rw-r--r--  misc/random.c        12
-rw-r--r--  misc/rendezvous.c    15
-rw-r--r--  misc/thread_pool.c   74
-rw-r--r--  misc/thread_tools.c  70
-rw-r--r--  misc/thread_tools.h  11
7 files changed, 143 insertions, 144 deletions
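
The new wrappers come from osdep/threads.h, which is outside the 'misc' filter of this diffstat. As orientation for the hunks below, here is a minimal sketch of how the names used in this commit could map onto plain pthreads; only the names and call shapes are taken from the diff itself, the bodies are illustrative assumptions rather than mpv's actual implementation. mp_cond_timedwait_until() and the MP_THREAD_VOID/MP_THREAD_RETURN() pair are sketched separately after the dispatch.c and thread_pool.c hunks.

/* Orientation sketch, not mpv's code: a plausible pthreads-only backend for
 * the mp_thread names used below. */
#include <pthread.h>

typedef pthread_mutex_t mp_mutex;
typedef pthread_mutex_t mp_static_mutex; /* may be distinct on other backends */
typedef pthread_cond_t  mp_cond;
typedef pthread_once_t  mp_once;
typedef pthread_t       mp_thread;

#define MP_STATIC_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
#define MP_STATIC_COND_INITIALIZER  PTHREAD_COND_INITIALIZER
#define MP_STATIC_ONCE_INITIALIZER  PTHREAD_ONCE_INIT

#define mp_mutex_init(m)     pthread_mutex_init(m, NULL)
#define mp_mutex_destroy(m)  pthread_mutex_destroy(m)
#define mp_mutex_lock(m)     pthread_mutex_lock(m)
#define mp_mutex_unlock(m)   pthread_mutex_unlock(m)

/* The real mp_cond_init() likely also picks the clock that
 * mp_cond_timedwait_until() waits against; see the note after dispatch.c. */
#define mp_cond_init(c)      pthread_cond_init(c, NULL)
#define mp_cond_destroy(c)   pthread_cond_destroy(c)
#define mp_cond_signal(c)    pthread_cond_signal(c)
#define mp_cond_broadcast(c) pthread_cond_broadcast(c)
#define mp_cond_wait(c, m)   pthread_cond_wait(c, m)

#define mp_exec_once(o, fn)  pthread_once(o, fn)

#define mp_thread_create(t, fn, arg) pthread_create(t, NULL, fn, arg)
#define mp_thread_join(t)            pthread_join(t, NULL)
#define mp_thread_self()             pthread_self()
#define mp_thread_equal(a, b)        pthread_equal(a, b)
/* Linux-flavored; pthread_setname_np() differs per OS: */
#define mp_thread_set_name(name)     pthread_setname_np(pthread_self(), name)
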
diff --git a/misc/dispatch.c b/misc/dispatch.c
index d88d775d97..56348f2b13 100644
--- a/misc/dispatch.c
+++ b/misc/dispatch.c
@@ -26,8 +26,8 @@
struct mp_dispatch_queue {
struct mp_dispatch_item *head, *tail;
- pthread_mutex_t lock;
- pthread_cond_t cond;
+ mp_mutex lock;
+ mp_cond cond;
void (*wakeup_fn)(void *wakeup_ctx);
void *wakeup_ctx;
void (*onlock_fn)(void *onlock_ctx);
@@ -39,7 +39,7 @@ struct mp_dispatch_queue {
// The target thread is in mp_dispatch_queue_process() (and either idling,
// locked, or running a dispatch callback).
bool in_process;
- pthread_t in_process_thread;
+ mp_thread in_process_thread;
// The target thread is in mp_dispatch_queue_process(), and currently
// something has exclusive access to it (e.g. running a dispatch callback,
// or a different thread got it with mp_dispatch_lock()).
@@ -48,7 +48,7 @@ struct mp_dispatch_queue {
size_t lock_requests;
// locked==true is due to a mp_dispatch_lock() call (for debugging).
bool locked_explicit;
- pthread_t locked_explicit_thread;
+ mp_thread locked_explicit_thread;
};
struct mp_dispatch_item {
@@ -67,8 +67,8 @@ static void queue_dtor(void *p)
assert(!queue->in_process);
assert(!queue->lock_requests);
assert(!queue->locked);
- pthread_cond_destroy(&queue->cond);
- pthread_mutex_destroy(&queue->lock);
+ mp_cond_destroy(&queue->cond);
+ mp_mutex_destroy(&queue->lock);
}
// A dispatch queue lets other threads run callbacks in a target thread.
@@ -76,7 +76,7 @@ static void queue_dtor(void *p)
// Free the dispatch queue with talloc_free(). At the time of destruction,
// the queue must be empty. The easiest way to guarantee this is to
// terminate all potential senders, then call mp_dispatch_run() with a
-// function that e.g. makes the target thread exit, then pthread_join() the
+// function that e.g. makes the target thread exit, then mp_thread_join() the
// target thread, and finally destroy the queue. Another way is calling
// mp_dispatch_queue_process() after terminating all potential senders, and
// then destroying the queue.
@@ -85,8 +85,8 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
*queue = (struct mp_dispatch_queue){0};
talloc_set_destructor(queue, queue_dtor);
- pthread_mutex_init(&queue->lock, NULL);
- pthread_cond_init(&queue->cond, NULL);
+ mp_mutex_init(&queue->lock);
+ mp_cond_init(&queue->cond);
return queue;
}
@@ -126,14 +126,14 @@ void mp_dispatch_set_onlock_fn(struct mp_dispatch_queue *queue,
static void mp_dispatch_append(struct mp_dispatch_queue *queue,
struct mp_dispatch_item *item)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
if (item->mergeable) {
for (struct mp_dispatch_item *cur = queue->head; cur; cur = cur->next) {
if (cur->mergeable && cur->fn == item->fn &&
cur->fn_data == item->fn_data)
{
talloc_free(item);
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
return;
}
}
@@ -148,12 +148,12 @@ static void mp_dispatch_append(struct mp_dispatch_queue *queue,
// Wake up the main thread; note that other threads might wait on this
// condition for reasons, so broadcast the condition.
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
// No wakeup callback -> assume mp_dispatch_queue_process() needs to be
// interrupted instead.
if (!queue->wakeup_fn)
queue->interrupted = true;
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
@@ -218,7 +218,7 @@ void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue,
void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
struct mp_dispatch_item **pcur = &queue->head;
queue->tail = NULL;
while (*pcur) {
@@ -231,7 +231,7 @@ void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue,
pcur = &cur->next;
}
}
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// Run fn(fn_data) on the target thread synchronously. This function enqueues
@@ -247,10 +247,10 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
};
mp_dispatch_append(queue, &item);
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
while (!item.completed)
- pthread_cond_wait(&queue->cond, &queue->lock);
- pthread_mutex_unlock(&queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// Process any outstanding dispatch items in the queue. This also handles
@@ -271,18 +271,18 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
// no enqueued callback can call the lock/unlock functions).
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
queue->wait = timeout > 0 ? mp_time_ns_add(mp_time_ns(), timeout) : 0;
assert(!queue->in_process); // recursion not allowed
queue->in_process = true;
- queue->in_process_thread = pthread_self();
+ queue->in_process_thread = mp_thread_self();
// Wake up thread which called mp_dispatch_lock().
if (queue->lock_requests)
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
while (1) {
if (queue->lock_requests) {
// Block due to something having called mp_dispatch_lock().
- pthread_cond_wait(&queue->cond, &queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
} else if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
@@ -295,23 +295,22 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
// from mp_dispatch_lock(), which is done by locked=true.
assert(!queue->locked);
queue->locked = true;
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
item->fn(item->fn_data);
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
assert(queue->locked);
queue->locked = false;
// Wakeup mp_dispatch_run(), also mp_dispatch_lock().
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
if (item->asynchronous) {
talloc_free(item);
} else {
item->completed = true;
}
} else if (queue->wait > 0 && !queue->interrupted) {
- struct timespec ts = mp_time_ns_to_realtime(queue->wait);
- if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts))
+ if (mp_cond_timedwait_until(&queue->cond, &queue->lock, queue->wait))
queue->wait = 0;
} else {
break;
@@ -320,7 +319,7 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
assert(!queue->locked);
queue->in_process = false;
queue->interrupted = false;
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// If the queue is inside of mp_dispatch_queue_process(), make it return as
@@ -331,10 +330,10 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
// wakeup the main thread from another thread in a race free way).
void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
queue->interrupted = true;
- pthread_cond_broadcast(&queue->cond);
- pthread_mutex_unlock(&queue->lock);
+ mp_cond_broadcast(&queue->cond);
+ mp_mutex_unlock(&queue->lock);
}
// If a mp_dispatch_queue_process() call is in progress, then adjust the maximum
@@ -347,12 +346,12 @@ void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
// to wait in external APIs.
void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
if (queue->in_process && queue->wait > until) {
queue->wait = until;
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
}
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// Grant exclusive access to the target thread's state. While this is active,
@@ -364,13 +363,13 @@ void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until)
// already holding the dispatch lock.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
// Must not be called recursively from dispatched callbacks.
if (queue->in_process)
- assert(!pthread_equal(queue->in_process_thread, pthread_self()));
+ assert(!mp_thread_equal(queue->in_process_thread, mp_thread_self()));
// Must not be called recursively at all.
if (queue->locked_explicit)
- assert(!pthread_equal(queue->locked_explicit_thread, pthread_self()));
+ assert(!mp_thread_equal(queue->locked_explicit_thread, mp_thread_self()));
queue->lock_requests += 1;
// And now wait until the target thread gets "trapped" within the
// mp_dispatch_queue_process() call, which will mean we get exclusive
@@ -378,41 +377,41 @@ void mp_dispatch_lock(struct mp_dispatch_queue *queue)
if (queue->onlock_fn)
queue->onlock_fn(queue->onlock_ctx);
while (!queue->in_process) {
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
if (queue->in_process)
break;
- pthread_cond_wait(&queue->cond, &queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
}
// Wait until we can get the lock.
while (!queue->in_process || queue->locked)
- pthread_cond_wait(&queue->cond, &queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
// "Lock".
assert(queue->lock_requests);
assert(!queue->locked);
assert(!queue->locked_explicit);
queue->locked = true;
queue->locked_explicit = true;
- queue->locked_explicit_thread = pthread_self();
- pthread_mutex_unlock(&queue->lock);
+ queue->locked_explicit_thread = mp_thread_self();
+ mp_mutex_unlock(&queue->lock);
}
// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
assert(queue->locked);
// Must be called after a mp_dispatch_lock(), from the same thread.
assert(queue->locked_explicit);
- assert(pthread_equal(queue->locked_explicit_thread, pthread_self()));
+ assert(mp_thread_equal(queue->locked_explicit_thread, mp_thread_self()));
// "Unlock".
queue->locked = false;
queue->locked_explicit = false;
queue->lock_requests -= 1;
// Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
// (Would be nice to wake up only 1 other locker if lock_requests>0.)
- pthread_cond_broadcast(&queue->cond);
- pthread_mutex_unlock(&queue->lock);
+ mp_cond_broadcast(&queue->cond);
+ mp_mutex_unlock(&queue->lock);
}
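
Most of dispatch.c is a mechanical rename; the one hunk with a real signature change is the timed wait. pthread_cond_timedwait() needs an absolute struct timespec on the realtime clock, so the old code converted queue->wait through mp_time_ns_to_realtime(), while mp_cond_timedwait_until() accepts mpv's int64_t nanosecond deadline directly (as built by mp_time_ns_add(mp_time_ns(), timeout)). A possible POSIX shape, assuming the condition variable was created to wait on the same monotonic clock that mp_time_ns() reads:

#include <pthread.h>
#include <stdint.h>
#include <time.h>

/* Sketch only. Returns nonzero on timeout, like pthread_cond_timedwait(),
 * which is exactly how the caller above resets queue->wait to 0. */
static int mp_cond_timedwait_until(pthread_cond_t *cond,
                                   pthread_mutex_t *mutex, int64_t until_ns)
{
    struct timespec ts = {
        .tv_sec  = until_ns / INT64_C(1000000000),
        .tv_nsec = until_ns % INT64_C(1000000000),
    };
    return pthread_cond_timedwait(cond, mutex, &ts);
}

Keeping the deadline as a plain integer also lets mp_dispatch_adjust_timeout() compare and lower queue->wait with ordinary arithmetic, no timespec handling required.
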
diff --git a/misc/jni.c b/misc/jni.c
index 3f76f379cb..82f63565b5 100644
--- a/misc/jni.c
+++ b/misc/jni.c
@@ -22,15 +22,15 @@
#include <libavcodec/jni.h>
#include <libavutil/mem.h>
#include <libavutil/bprint.h>
-#include <pthread.h>
#include <stdlib.h>
#include "jni.h"
+#include "osdep/threads.h"
static JavaVM *java_vm;
static pthread_key_t current_env;
-static pthread_once_t once = PTHREAD_ONCE_INIT;
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+static mp_once once = MP_STATIC_ONCE_INITIALIZER;
+static mp_static_mutex lock = MP_STATIC_MUTEX_INITIALIZER;
static void jni_detach_env(void *data)
{
@@ -49,7 +49,7 @@ JNIEnv *mp_jni_get_env(struct mp_log *log)
int ret = 0;
JNIEnv *env = NULL;
- pthread_mutex_lock(&lock);
+ mp_mutex_lock(&lock);
if (java_vm == NULL) {
java_vm = av_jni_get_java_vm(NULL);
}
@@ -59,7 +59,7 @@ JNIEnv *mp_jni_get_env(struct mp_log *log)
goto done;
}
- pthread_once(&once, jni_create_pthread_key);
+ mp_exec_once(&once, jni_create_pthread_key);
if ((env = pthread_getspecific(current_env)) != NULL) {
goto done;
@@ -86,7 +86,7 @@ JNIEnv *mp_jni_get_env(struct mp_log *log)
}
done:
- pthread_mutex_unlock(&lock);
+ mp_mutex_unlock(&lock);
return env;
}
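
jni.c still uses pthread_key_t/pthread_getspecific() for the thread-local JNIEnv, but the one-time key creation and the lock move to the wrappers. The resulting pattern, as a standalone sketch (init_fn and do_work are placeholder names, not mpv functions):

#include "osdep/threads.h"

static mp_once once = MP_STATIC_ONCE_INITIALIZER;
static mp_static_mutex lock = MP_STATIC_MUTEX_INITIALIZER;

static void init_fn(void)
{
    /* runs exactly once, on whichever thread gets here first */
}

static void do_work(void)
{
    mp_exec_once(&once, init_fn); // replaces pthread_once()
    mp_mutex_lock(&lock);         // statically initialized, no mp_mutex_init()
    /* ... guarded state ... */
    mp_mutex_unlock(&lock);
}
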
diff --git a/misc/random.c b/misc/random.c
index 780c67ec10..e622ab7f3b 100644
--- a/misc/random.c
+++ b/misc/random.c
@@ -19,12 +19,12 @@
*/
#include <stdint.h>
-#include <pthread.h>
+#include "osdep/threads.h"
#include "random.h"
static uint64_t state[4];
-static pthread_mutex_t state_mutex = PTHREAD_MUTEX_INITIALIZER;
+static mp_static_mutex state_mutex = MP_STATIC_MUTEX_INITIALIZER;
static inline uint64_t rotl_u64(const uint64_t x, const int k)
{
@@ -41,18 +41,18 @@ static inline uint64_t splitmix64(uint64_t *const x)
void mp_rand_seed(uint64_t seed)
{
- pthread_mutex_lock(&state_mutex);
+ mp_mutex_lock(&state_mutex);
state[0] = seed;
for (int i = 1; i < 4; i++)
state[i] = splitmix64(&seed);
- pthread_mutex_unlock(&state_mutex);
+ mp_mutex_unlock(&state_mutex);
}
uint64_t mp_rand_next(void)
{
uint64_t result, t;
- pthread_mutex_lock(&state_mutex);
+ mp_mutex_lock(&state_mutex);
result = rotl_u64(state[1] * 5, 7) * 9;
t = state[1] << 17;
@@ -64,7 +64,7 @@ uint64_t mp_rand_next(void)
state[2] ^= t;
state[3] = rotl_u64(state[3], 45);
- pthread_mutex_unlock(&state_mutex);
+ mp_mutex_unlock(&state_mutex);
return result;
}
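
The generator itself (the recurrence matches xoshiro256**) is untouched; the commit only swaps the guard around the shared state[]. Both entry points keep locking internally, so callers need no changes - a usage sketch:

#include <stdint.h>
#include "misc/random.h"

static uint64_t example(void)
{
    mp_rand_seed(0xdeadbeef);  // both calls take state_mutex internally,
    return mp_rand_next();     // so any thread may use them concurrently
}
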
diff --git a/misc/rendezvous.c b/misc/rendezvous.c
index aa94a9042d..1fe5724dad 100644
--- a/misc/rendezvous.c
+++ b/misc/rendezvous.c
@@ -1,9 +1,10 @@
-#include <pthread.h>
#include "rendezvous.h"
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t wakeup = PTHREAD_COND_INITIALIZER;
+#include "osdep/threads.h"
+
+static mp_static_mutex lock = MP_STATIC_MUTEX_INITIALIZER;
+static mp_cond wakeup = MP_STATIC_COND_INITIALIZER;
static struct waiter *waiters;
@@ -31,7 +32,7 @@ struct waiter {
intptr_t mp_rendezvous(void *tag, intptr_t value)
{
struct waiter wait = { .tag = tag, .value = &value };
- pthread_mutex_lock(&lock);
+ mp_mutex_lock(&lock);
struct waiter **prev = &waiters;
while (*prev) {
if ((*prev)->tag == tag) {
@@ -40,15 +41,15 @@ intptr_t mp_rendezvous(void *tag, intptr_t value)
value = tmp;
(*prev)->value = NULL; // signals completion
*prev = (*prev)->next; // unlink
- pthread_cond_broadcast(&wakeup);
+ mp_cond_broadcast(&wakeup);
goto done;
}
prev = &(*prev)->next;
}
*prev = &wait;
while (wait.value)
- pthread_cond_wait(&wakeup, &lock);
+ mp_cond_wait(&wakeup, &lock);
done:
- pthread_mutex_unlock(&lock);
+ mp_mutex_unlock(&lock);
return value;
}
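
mp_rendezvous() pairs up two threads that call it with the same tag: each blocks until its partner arrives, then both return the other's value. The diff only replaces the primitives underneath. A usage sketch (thread_a/thread_b are hypothetical callers):

#include <stdint.h>
#include "misc/rendezvous.h"

static char tag; // any address both sides agree on works as the tag

static void *thread_a(void *arg)
{
    intptr_t got = mp_rendezvous(&tag, 1); // blocks until thread_b arrives
    return (void *)got;                    // got == 2, thread_b's value
}

static void *thread_b(void *arg)
{
    intptr_t got = mp_rendezvous(&tag, 2); // returns 1, thread_a's value
    return (void *)got;
}
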
diff --git a/misc/thread_pool.c b/misc/thread_pool.c
index 9e47f13279..51ca22356d 100644
--- a/misc/thread_pool.c
+++ b/misc/thread_pool.c
@@ -13,8 +13,6 @@
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
-#include <pthread.h>
-
#include "common/common.h"
#include "osdep/threads.h"
#include "osdep/timer.h"
@@ -33,12 +31,12 @@ struct work {
struct mp_thread_pool {
int min_threads, max_threads;
- pthread_mutex_t lock;
- pthread_cond_t wakeup;
+ mp_mutex lock;
+ mp_cond wakeup;
// --- the following fields are protected by lock
- pthread_t *threads;
+ mp_thread *threads;
int num_threads;
// Number of threads which have taken up work and are still processing it.
@@ -50,15 +48,15 @@ struct mp_thread_pool {
int num_work;
};
-static void *worker_thread(void *arg)
+static MP_THREAD_VOID worker_thread(void *arg)
{
struct mp_thread_pool *pool = arg;
- mpthread_set_name("worker");
+ mp_thread_set_name("worker");
- pthread_mutex_lock(&pool->lock);
+ mp_mutex_lock(&pool->lock);
- struct timespec ts = {0};
+ int64_t destroy_deadline = 0;
bool got_timeout = false;
while (1) {
struct work work = {0};
@@ -72,25 +70,25 @@ static void *worker_thread(void *arg)
break;
if (pool->num_threads > pool->min_threads) {
- if (!ts.tv_sec && !ts.tv_nsec)
- ts = mp_rel_time_to_timespec(DESTROY_TIMEOUT);
- if (pthread_cond_timedwait(&pool->wakeup, &pool->lock, &ts))
+ if (!destroy_deadline)
+ destroy_deadline = mp_time_ns_add(mp_time_ns(), DESTROY_TIMEOUT);
+ if (mp_cond_timedwait_until(&pool->wakeup, &pool->lock, destroy_deadline))
got_timeout = pool->num_threads > pool->min_threads;
} else {
- pthread_cond_wait(&pool->wakeup, &pool->lock);
+ mp_cond_wait(&pool->wakeup, &pool->lock);
}
continue;
}
pool->busy_threads += 1;
- pthread_mutex_unlock(&pool->lock);
+ mp_mutex_unlock(&pool->lock);
work.fn(work.fn_ctx);
- pthread_mutex_lock(&pool->lock);
+ mp_mutex_lock(&pool->lock);
pool->busy_threads -= 1;
- ts = (struct timespec){0};
+ destroy_deadline = 0;
got_timeout = false;
}
@@ -98,18 +96,18 @@ static void *worker_thread(void *arg)
// timeout, and nobody is waiting for us. We have to remove ourselves.
if (!pool->terminate) {
for (int n = 0; n < pool->num_threads; n++) {
- if (pthread_equal(pool->threads[n], pthread_self())) {
- pthread_detach(pthread_self());
+ if (mp_thread_equal(pool->threads[n], mp_thread_self())) {
+ pthread_detach(mp_thread_self());
MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n);
- pthread_mutex_unlock(&pool->lock);
- return NULL;
+ mp_mutex_unlock(&pool->lock);
+ MP_THREAD_RETURN();
}
}
MP_ASSERT_UNREACHABLE();
}
- pthread_mutex_unlock(&pool->lock);
- return NULL;
+ mp_mutex_unlock(&pool->lock);
+ MP_THREAD_RETURN();
}
static void thread_pool_dtor(void *ctx)
@@ -117,33 +115,33 @@ static void thread_pool_dtor(void *ctx)
struct mp_thread_pool *pool = ctx;
- pthread_mutex_lock(&pool->lock);
+ mp_mutex_lock(&pool->lock);
pool->terminate = true;
- pthread_cond_broadcast(&pool->wakeup);
+ mp_cond_broadcast(&pool->wakeup);
- pthread_t *threads = pool->threads;
+ mp_thread *threads = pool->threads;
int num_threads = pool->num_threads;
pool->threads = NULL;
pool->num_threads = 0;
- pthread_mutex_unlock(&pool->lock);
+ mp_mutex_unlock(&pool->lock);
for (int n = 0; n < num_threads; n++)
- pthread_join(threads[n], NULL);
+ mp_thread_join(threads[n]);
assert(pool->num_work == 0);
assert(pool->num_threads == 0);
- pthread_cond_destroy(&pool->wakeup);
- pthread_mutex_destroy(&pool->lock);
+ mp_cond_destroy(&pool->wakeup);
+ mp_mutex_destroy(&pool->lock);
}
static bool add_thread(struct mp_thread_pool *pool)
{
- pthread_t thread;
+ mp_thread thread;
- if (pthread_create(&thread, NULL, worker_thread, pool) != 0)
+ if (mp_thread_create(&thread, worker_thread, pool) != 0)
return false;
MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
@@ -160,17 +158,17 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads,
struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool);
talloc_set_destructor(pool, thread_pool_dtor);
- pthread_mutex_init(&pool->lock, NULL);
- pthread_cond_init(&pool->wakeup, NULL);
+ mp_mutex_init(&pool->lock);
+ mp_cond_init(&pool->wakeup);
pool->min_threads = min_threads;
pool->max_threads = max_threads;
- pthread_mutex_lock(&pool->lock);
+ mp_mutex_lock(&pool->lock);
for (int n = 0; n < init_threads; n++)
add_thread(pool);
bool ok = pool->num_threads >= init_threads;
- pthread_mutex_unlock(&pool->lock);
+ mp_mutex_unlock(&pool->lock);
if (!ok)
TA_FREEP(&pool);
@@ -185,7 +183,7 @@ static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx),
assert(fn);
- pthread_mutex_lock(&pool->lock);
+ mp_mutex_lock(&pool->lock);
struct work work = {fn, fn_ctx};
// If there are not enough threads to process all at once, but we can
@@ -203,10 +201,10 @@ static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx),
if (ok) {
MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
- pthread_cond_signal(&pool->wakeup);
+ mp_cond_signal(&pool->wakeup);
}
- pthread_mutex_unlock(&pool->lock);
+ mp_mutex_unlock(&pool->lock);
return ok;
}
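
The signature change on worker_thread() is the portability-relevant part of this file: a pthreads entry point returns void *, while a native win32 thread procedure returns a DWORD with a different calling convention, so the return type and return statement are hidden behind macros. A plausible mapping (an assumption - the actual macros live in osdep/threads*.h, outside this diff):

#ifdef _WIN32
#include <windows.h>
#define MP_THREAD_VOID     DWORD WINAPI /* native win32 thread procedure */
#define MP_THREAD_RETURN() return 0
#else
#define MP_THREAD_VOID     void *       /* pthreads thread routine */
#define MP_THREAD_RETURN() return NULL
#endif

Note the one call left on the raw API in this hunk, pthread_detach(mp_thread_self()), which only works while mp_thread is still backed by pthread_t.
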
diff --git a/misc/thread_tools.c b/misc/thread_tools.c
index b12ec4d603..0f7fe8f77a 100644
--- a/misc/thread_tools.c
+++ b/misc/thread_tools.c
@@ -35,10 +35,10 @@
uintptr_t mp_waiter_wait(struct mp_waiter *waiter)
{
- pthread_mutex_lock(&waiter->lock);
+ mp_mutex_lock(&waiter->lock);
while (!waiter->done)
- pthread_cond_wait(&waiter->wakeup, &waiter->lock);
- pthread_mutex_unlock(&waiter->lock);
+ mp_cond_wait(&waiter->wakeup, &waiter->lock);
+ mp_mutex_unlock(&waiter->lock);
uintptr_t ret = waiter->value;
@@ -50,8 +50,8 @@ uintptr_t mp_waiter_wait(struct mp_waiter *waiter)
// following functions will do nearly nothing. This is true for Windows
// and Linux. But some lesser OSes still might allocate kernel objects
// when initializing mutexes, so destroy them here.
- pthread_mutex_destroy(&waiter->lock);
- pthread_cond_destroy(&waiter->wakeup);
+ mp_mutex_destroy(&waiter->lock);
+ mp_cond_destroy(&waiter->wakeup);
memset(waiter, 0xCA, sizeof(*waiter)); // for debugging
@@ -60,25 +60,25 @@ uintptr_t mp_waiter_wait(struct mp_waiter *waiter)
void mp_waiter_wakeup(struct mp_waiter *waiter, uintptr_t value)
{
- pthread_mutex_lock(&waiter->lock);
+ mp_mutex_lock(&waiter->lock);
assert(!waiter->done);
waiter->done = true;
waiter->value = value;
- pthread_cond_signal(&waiter->wakeup);
- pthread_mutex_unlock(&waiter->lock);
+ mp_cond_signal(&waiter->wakeup);
+ mp_mutex_unlock(&waiter->lock);
}
bool mp_waiter_poll(struct mp_waiter *waiter)
{
- pthread_mutex_lock(&waiter->lock);
+ mp_mutex_lock(&waiter->lock);
bool r = waiter->done;
- pthread_mutex_unlock(&waiter->lock);
+ mp_mutex_unlock(&waiter->lock);
return r;
}
struct mp_cancel {
- pthread_mutex_t lock;
- pthread_cond_t wakeup;
+ mp_mutex lock;
+ mp_cond wakeup;
// Semaphore state and "mirrors".
atomic_bool triggered;
@@ -117,8 +117,8 @@ static void cancel_destroy(void *p)
CloseHandle(c->win32_event);
#endif
- pthread_mutex_destroy(&c->lock);
- pthread_cond_destroy(&c->wakeup);
+ mp_mutex_destroy(&c->lock);
+ mp_cond_destroy(&c->wakeup);
}
struct mp_cancel *mp_cancel_new(void *talloc_ctx)
@@ -129,8 +129,8 @@ struct mp_cancel *mp_cancel_new(void *talloc_ctx)
.triggered = false,
.wakeup_pipe = {-1, -1},
};
- pthread_mutex_init(&c->lock, NULL);
- pthread_cond_init(&c->wakeup, NULL);
+ mp_mutex_init(&c->lock);
+ mp_cond_init(&c->wakeup);
return c;
}
@@ -138,7 +138,7 @@ static void trigger_locked(struct mp_cancel *c)
{
atomic_store(&c->triggered, true);
- pthread_cond_broadcast(&c->wakeup); // condition bound to c->triggered
+ mp_cond_broadcast(&c->wakeup); // condition bound to c->triggered
if (c->cb)
c->cb(c->cb_ctx);
@@ -157,14 +157,14 @@ static void trigger_locked(struct mp_cancel *c)
void mp_cancel_trigger(struct mp_cancel *c)
{
- pthread_mutex_lock(&c->lock);
+ mp_mutex_lock(&c->lock);
trigger_locked(c);
- pthread_mutex_unlock(&c->lock);
+ mp_mutex_unlock(&c->lock);
}
void mp_cancel_reset(struct mp_cancel *c)
{
- pthread_mutex_lock(&c->lock);
+ mp_mutex_lock(&c->lock);
atomic_store(&c->triggered, false);
@@ -182,7 +182,7 @@ void mp_cancel_reset(struct mp_cancel *c)
ResetEvent(c->win32_event);
#endif
- pthread_mutex_unlock(&c->lock);
+ mp_mutex_unlock(&c->lock);
}
bool mp_cancel_test(struct mp_cancel *c)
@@ -192,13 +192,13 @@ bool mp_cancel_test(struct mp_cancel *c)
bool mp_cancel_wait(struct mp_cancel *c, double timeout)
{
- struct timespec ts = mp_rel_time_to_timespec(timeout);
- pthread_mutex_lock(&c->lock);
+ int64_t wait_until = mp_time_ns_add(mp_time_ns(), timeout);
+ mp_mutex_lock(&c->lock);
while (!mp_cancel_test(c)) {
- if (pthread_cond_timedwait(&c->wakeup, &c->lock, &ts))
+ if (mp_cond_timedwait_until(&c->wakeup, &c->lock, wait_until))
break;
}
- pthread_mutex_unlock(&c->lock);
+ mp_mutex_unlock(&c->lock);
return mp_cancel_test(c);
}
@@ -213,11 +213,11 @@ static void retrigger_locked(struct mp_cancel *c)
void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx)
{
- pthread_mutex_lock(&c->lock);
+ mp_mutex_lock(&c->lock);
c->cb = cb;
c->cb_ctx = ctx;
retrigger_locked(c);
- pthread_mutex_unlock(&c->lock);
+ mp_mutex_unlock(&c->lock);
}
void mp_cancel_set_parent(struct mp_cancel *slave, struct mp_cancel *parent)
@@ -228,22 +228,22 @@ void mp_cancel_set_parent(struct mp_cancel *slave, struct mp_cancel *parent)
if (slave->parent == parent)
return;
if (slave->parent) {
- pthread_mutex_lock(&slave->parent->lock);
+ mp_mutex_lock(&slave->parent->lock);
LL_REMOVE(siblings, &slave->parent->slaves, slave);
- pthread_mutex_unlock(&slave->parent->lock);
+ mp_mutex_unlock(&slave->parent->lock);
}
slave->parent = parent;
if (slave->parent) {
- pthread_mutex_lock(&slave->parent->lock);
+ mp_mutex_lock(&slave->parent->lock);
LL_APPEND(siblings, &slave->parent->slaves, slave);
retrigger_locked(slave->parent);
- pthread_mutex_unlock(&slave->parent->lock);
+ mp_mutex_unlock(&slave->parent->lock);
}
}
int mp_cancel_get_fd(struct mp_cancel *c)
{
- pthread_mutex_lock(&c->lock);
+ mp_mutex_lock(&c->lock);
if (c->wakeup_pipe[0] < 0) {
#if defined(__GNUC__) && !defined(__clang__)
# pragma GCC diagnostic push
@@ -255,7 +255,7 @@ int mp_cancel_get_fd(struct mp_cancel *c)
#endif
retrigger_locked(c);
}
- pthread_mutex_unlock(&c->lock);
+ mp_mutex_unlock(&c->lock);
return c->wakeup_pipe[0];
@@ -264,12 +264,12 @@ int mp_cancel_get_fd(struct mp_cancel *c)
#ifdef __MINGW32__
void *mp_cancel_get_event(struct mp_cancel *c)
{
- pthread_mutex_lock(&c->lock);
+ mp_mutex_lock(&c->lock);
if (!c->win32_event) {
c->win32_event = CreateEventW(NULL, TRUE, FALSE, NULL);
retrigger_locked(c);
}
- pthread_mutex_unlock(&c->lock);
+ mp_mutex_unlock(&c->lock);
return c->win32_event;
}
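
thread_tools.h (next) documents mp_waiter as a single-shot semaphore, and the 0xCA poisoning above shows why it is single-shot: mp_waiter_wait() destroys the object on return. A typical handoff, as a sketch (producer/consumer are illustrative names):

#include <stdint.h>
#include "misc/thread_tools.h"

static struct mp_waiter waiter = MP_WAITER_INITIALIZER; // no runtime init

static void consumer(void)
{
    // Blocks until mp_waiter_wakeup(); afterwards the waiter is destroyed
    // and must not be reused.
    uintptr_t value = mp_waiter_wait(&waiter); // value == 42 here
    (void)value;
}

static void producer(void) // runs on some other thread, exactly once
{
    mp_waiter_wakeup(&waiter, 42);
}
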
diff --git a/misc/thread_tools.h b/misc/thread_tools.h
index 89d84ce0b6..a07257b09b 100644
--- a/misc/thread_tools.h
+++ b/misc/thread_tools.h
@@ -2,22 +2,23 @@
#include <stdint.h>
#include <stdbool.h>
-#include <pthread.h>
+
+#include "osdep/threads.h"
// This is basically a single-shot semaphore, intended as light-weight solution
// for just making a thread wait for another thread.
struct mp_waiter {
// All fields are considered private. Use MP_WAITER_INITIALIZER to init.
- pthread_mutex_t lock;
- pthread_cond_t wakeup;
+ mp_mutex lock;
+ mp_cond wakeup;
bool done;
uintptr_t value;
};
// Initialize a mp_waiter object for use with mp_waiter_*().
#define MP_WAITER_INITIALIZER { \
- .lock = PTHREAD_MUTEX_INITIALIZER, \
- .wakeup = PTHREAD_COND_INITIALIZER, \
+ .lock = MP_STATIC_MUTEX_INITIALIZER, \
+ .wakeup = MP_STATIC_COND_INITIALIZER, \
}
// Block until some other thread calls mp_waiter_wakeup(). The function returns