summaryrefslogtreecommitdiffstats
path: root/misc/dispatch.c
diff options
context:
space:
mode:
Diffstat (limited to 'misc/dispatch.c')
-rw-r--r--misc/dispatch.c95
1 files changed, 47 insertions, 48 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c
index 0c3c574afa..6fd9fe1a43 100644
--- a/misc/dispatch.c
+++ b/misc/dispatch.c
@@ -26,8 +26,8 @@
struct mp_dispatch_queue {
struct mp_dispatch_item *head, *tail;
- pthread_mutex_t lock;
- pthread_cond_t cond;
+ mp_mutex lock;
+ mp_cond cond;
void (*wakeup_fn)(void *wakeup_ctx);
void *wakeup_ctx;
void (*onlock_fn)(void *onlock_ctx);
@@ -39,7 +39,7 @@ struct mp_dispatch_queue {
// The target thread is in mp_dispatch_queue_process() (and either idling,
// locked, or running a dispatch callback).
bool in_process;
- pthread_t in_process_thread;
+ mp_thread_id in_process_thread_id;
// The target thread is in mp_dispatch_queue_process(), and currently
// something has exclusive access to it (e.g. running a dispatch callback,
// or a different thread got it with mp_dispatch_lock()).
@@ -48,7 +48,7 @@ struct mp_dispatch_queue {
size_t lock_requests;
// locked==true is due to a mp_dispatch_lock() call (for debugging).
bool locked_explicit;
- pthread_t locked_explicit_thread;
+ mp_thread_id locked_explicit_thread_id;
};
struct mp_dispatch_item {
@@ -67,8 +67,8 @@ static void queue_dtor(void *p)
assert(!queue->in_process);
assert(!queue->lock_requests);
assert(!queue->locked);
- pthread_cond_destroy(&queue->cond);
- pthread_mutex_destroy(&queue->lock);
+ mp_cond_destroy(&queue->cond);
+ mp_mutex_destroy(&queue->lock);
}
// A dispatch queue lets other threads run callbacks in a target thread.
@@ -76,7 +76,7 @@ static void queue_dtor(void *p)
// Free the dispatch queue with talloc_free(). At the time of destruction,
// the queue must be empty. The easiest way to guarantee this is to
// terminate all potential senders, then call mp_dispatch_run() with a
-// function that e.g. makes the target thread exit, then pthread_join() the
+// function that e.g. makes the target thread exit, then mp_thread_join() the
// target thread, and finally destroy the queue. Another way is calling
// mp_dispatch_queue_process() after terminating all potential senders, and
// then destroying the queue.
@@ -85,8 +85,8 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
*queue = (struct mp_dispatch_queue){0};
talloc_set_destructor(queue, queue_dtor);
- pthread_mutex_init(&queue->lock, NULL);
- pthread_cond_init(&queue->cond, NULL);
+ mp_mutex_init(&queue->lock);
+ mp_cond_init(&queue->cond);
return queue;
}
@@ -126,14 +126,14 @@ void mp_dispatch_set_onlock_fn(struct mp_dispatch_queue *queue,
static void mp_dispatch_append(struct mp_dispatch_queue *queue,
struct mp_dispatch_item *item)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
if (item->mergeable) {
for (struct mp_dispatch_item *cur = queue->head; cur; cur = cur->next) {
if (cur->mergeable && cur->fn == item->fn &&
cur->fn_data == item->fn_data)
{
talloc_free(item);
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
return;
}
}
@@ -148,12 +148,12 @@ static void mp_dispatch_append(struct mp_dispatch_queue *queue,
// Wake up the main thread; note that other threads might wait on this
// condition for reasons, so broadcast the condition.
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
// No wakeup callback -> assume mp_dispatch_queue_process() needs to be
// interrupted instead.
if (!queue->wakeup_fn)
queue->interrupted = true;
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
@@ -218,7 +218,7 @@ void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue,
void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
struct mp_dispatch_item **pcur = &queue->head;
queue->tail = NULL;
while (*pcur) {
@@ -231,7 +231,7 @@ void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue,
pcur = &cur->next;
}
}
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// Run fn(fn_data) on the target thread synchronously. This function enqueues
@@ -247,10 +247,10 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
};
mp_dispatch_append(queue, &item);
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
while (!item.completed)
- pthread_cond_wait(&queue->cond, &queue->lock);
- pthread_mutex_unlock(&queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// Process any outstanding dispatch items in the queue. This also handles
@@ -271,18 +271,18 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
// no enqueued callback can call the lock/unlock functions).
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
- pthread_mutex_lock(&queue->lock);
- queue->wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
+ mp_mutex_lock(&queue->lock);
+ queue->wait = timeout > 0 ? mp_time_ns_add(mp_time_ns(), timeout) : 0;
assert(!queue->in_process); // recursion not allowed
queue->in_process = true;
- queue->in_process_thread = pthread_self();
+ queue->in_process_thread_id = mp_thread_current_id();
// Wake up thread which called mp_dispatch_lock().
if (queue->lock_requests)
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
while (1) {
if (queue->lock_requests) {
// Block due to something having called mp_dispatch_lock().
- pthread_cond_wait(&queue->cond, &queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
} else if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
@@ -295,23 +295,22 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
// from mp_dispatch_lock(), which is done by locked=true.
assert(!queue->locked);
queue->locked = true;
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
item->fn(item->fn_data);
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
assert(queue->locked);
queue->locked = false;
// Wakeup mp_dispatch_run(), also mp_dispatch_lock().
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
if (item->asynchronous) {
talloc_free(item);
} else {
item->completed = true;
}
} else if (queue->wait > 0 && !queue->interrupted) {
- struct timespec ts = mp_time_us_to_timespec(queue->wait);
- if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts))
+ if (mp_cond_timedwait_until(&queue->cond, &queue->lock, queue->wait))
queue->wait = 0;
} else {
break;
@@ -320,7 +319,7 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
assert(!queue->locked);
queue->in_process = false;
queue->interrupted = false;
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// If the queue is inside of mp_dispatch_queue_process(), make it return as
@@ -331,10 +330,10 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
// wakeup the main thread from another thread in a race free way).
void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
queue->interrupted = true;
- pthread_cond_broadcast(&queue->cond);
- pthread_mutex_unlock(&queue->lock);
+ mp_cond_broadcast(&queue->cond);
+ mp_mutex_unlock(&queue->lock);
}
// If a mp_dispatch_queue_process() call is in progress, then adjust the maximum
@@ -347,12 +346,12 @@ void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
// to wait in external APIs.
void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
if (queue->in_process && queue->wait > until) {
queue->wait = until;
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
}
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// Grant exclusive access to the target thread's state. While this is active,
@@ -364,13 +363,13 @@ void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until)
// already holding the dispatch lock.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
// Must not be called recursively from dispatched callbacks.
if (queue->in_process)
- assert(!pthread_equal(queue->in_process_thread, pthread_self()));
+ assert(!mp_thread_id_equal(queue->in_process_thread_id, mp_thread_current_id()));
// Must not be called recursively at all.
if (queue->locked_explicit)
- assert(!pthread_equal(queue->locked_explicit_thread, pthread_self()));
+ assert(!mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id()));
queue->lock_requests += 1;
// And now wait until the target thread gets "trapped" within the
// mp_dispatch_queue_process() call, which will mean we get exclusive
@@ -378,41 +377,41 @@ void mp_dispatch_lock(struct mp_dispatch_queue *queue)
if (queue->onlock_fn)
queue->onlock_fn(queue->onlock_ctx);
while (!queue->in_process) {
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
if (queue->in_process)
break;
- pthread_cond_wait(&queue->cond, &queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
}
// Wait until we can get the lock.
while (!queue->in_process || queue->locked)
- pthread_cond_wait(&queue->cond, &queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
// "Lock".
assert(queue->lock_requests);
assert(!queue->locked);
assert(!queue->locked_explicit);
queue->locked = true;
queue->locked_explicit = true;
- queue->locked_explicit_thread = pthread_self();
- pthread_mutex_unlock(&queue->lock);
+ queue->locked_explicit_thread_id = mp_thread_current_id();
+ mp_mutex_unlock(&queue->lock);
}
// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
assert(queue->locked);
// Must be called after a mp_dispatch_lock(), from the same thread.
assert(queue->locked_explicit);
- assert(pthread_equal(queue->locked_explicit_thread, pthread_self()));
+ assert(mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id()));
// "Unlock".
queue->locked = false;
queue->locked_explicit = false;
queue->lock_requests -= 1;
// Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
// (Would be nice to wake up only 1 other locker if lock_requests>0.)
- pthread_cond_broadcast(&queue->cond);
- pthread_mutex_unlock(&queue->lock);
+ mp_cond_broadcast(&queue->cond);
+ mp_mutex_unlock(&queue->lock);
}