summaryrefslogtreecommitdiffstats
path: root/misc/dispatch.c
diff options
context:
space:
mode:
Diffstat (limited to 'misc/dispatch.c')
-rw-r--r--misc/dispatch.c120
1 files changed, 70 insertions, 50 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c
index 1102c564ae..6fd9fe1a43 100644
--- a/misc/dispatch.c
+++ b/misc/dispatch.c
@@ -26,10 +26,12 @@
struct mp_dispatch_queue {
struct mp_dispatch_item *head, *tail;
- pthread_mutex_t lock;
- pthread_cond_t cond;
+ mp_mutex lock;
+ mp_cond cond;
void (*wakeup_fn)(void *wakeup_ctx);
void *wakeup_ctx;
+ void (*onlock_fn)(void *onlock_ctx);
+ void *onlock_ctx;
// Time at which mp_dispatch_queue_process() should return.
int64_t wait;
// Make mp_dispatch_queue_process() exit if it's idle.
@@ -37,7 +39,7 @@ struct mp_dispatch_queue {
// The target thread is in mp_dispatch_queue_process() (and either idling,
// locked, or running a dispatch callback).
bool in_process;
- pthread_t in_process_thread;
+ mp_thread_id in_process_thread_id;
// The target thread is in mp_dispatch_queue_process(), and currently
// something has exclusive access to it (e.g. running a dispatch callback,
// or a different thread got it with mp_dispatch_lock()).
@@ -46,7 +48,7 @@ struct mp_dispatch_queue {
size_t lock_requests;
// locked==true is due to a mp_dispatch_lock() call (for debugging).
bool locked_explicit;
- pthread_t locked_explicit_thread;
+ mp_thread_id locked_explicit_thread_id;
};
struct mp_dispatch_item {
@@ -65,8 +67,8 @@ static void queue_dtor(void *p)
assert(!queue->in_process);
assert(!queue->lock_requests);
assert(!queue->locked);
- pthread_cond_destroy(&queue->cond);
- pthread_mutex_destroy(&queue->lock);
+ mp_cond_destroy(&queue->cond);
+ mp_mutex_destroy(&queue->lock);
}
// A dispatch queue lets other threads run callbacks in a target thread.
@@ -74,7 +76,7 @@ static void queue_dtor(void *p)
// Free the dispatch queue with talloc_free(). At the time of destruction,
// the queue must be empty. The easiest way to guarantee this is to
// terminate all potential senders, then call mp_dispatch_run() with a
-// function that e.g. makes the target thread exit, then pthread_join() the
+// function that e.g. makes the target thread exit, then mp_thread_join() the
// target thread, and finally destroy the queue. Another way is calling
// mp_dispatch_queue_process() after terminating all potential senders, and
// then destroying the queue.
@@ -83,8 +85,8 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
*queue = (struct mp_dispatch_queue){0};
talloc_set_destructor(queue, queue_dtor);
- pthread_mutex_init(&queue->lock, NULL);
- pthread_cond_init(&queue->cond, NULL);
+ mp_mutex_init(&queue->lock);
+ mp_cond_init(&queue->cond);
return queue;
}
@@ -94,8 +96,9 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
// the wakeup_fn could for example write a byte into a "wakeup" pipe in order
// to unblock the select(). The wakeup_fn is called from the dispatch queue
// when there are new dispatch items, and the target thread should then enter
-// mp_dispatch_queue_process() as soon as possible. Note that wakeup_fn is
-// called under no lock, so you might have to do synchronization yourself.
+// mp_dispatch_queue_process() as soon as possible.
+// Note that this setter does not do internal synchronization, so you must set
+// it before other threads see it.
void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
void (*wakeup_fn)(void *wakeup_ctx),
void *wakeup_ctx)
@@ -104,17 +107,33 @@ void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
queue->wakeup_ctx = wakeup_ctx;
}
+// Set a function that will be called by mp_dispatch_lock() if the target thread
+// is not calling mp_dispatch_queue_process() right now. This is an obscure,
+// optional mechanism to make a worker thread react to external events more
+// quickly. The idea is that the callback will make the worker thread to stop
+// doing whatever (e.g. by setting a flag), and call mp_dispatch_queue_process()
+// in order to let mp_dispatch_lock() calls continue sooner.
+// Like wakeup_fn, this setter does no internal synchronization, and you must
+// not access the dispatch queue itself from the callback.
+void mp_dispatch_set_onlock_fn(struct mp_dispatch_queue *queue,
+ void (*onlock_fn)(void *onlock_ctx),
+ void *onlock_ctx)
+{
+ queue->onlock_fn = onlock_fn;
+ queue->onlock_ctx = onlock_ctx;
+}
+
static void mp_dispatch_append(struct mp_dispatch_queue *queue,
struct mp_dispatch_item *item)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
if (item->mergeable) {
for (struct mp_dispatch_item *cur = queue->head; cur; cur = cur->next) {
if (cur->mergeable && cur->fn == item->fn &&
cur->fn_data == item->fn_data)
{
talloc_free(item);
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
return;
}
}
@@ -129,12 +148,12 @@ static void mp_dispatch_append(struct mp_dispatch_queue *queue,
// Wake up the main thread; note that other threads might wait on this
// condition for reasons, so broadcast the condition.
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
// No wakeup callback -> assume mp_dispatch_queue_process() needs to be
// interrupted instead.
if (!queue->wakeup_fn)
queue->interrupted = true;
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
@@ -199,7 +218,7 @@ void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue,
void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
struct mp_dispatch_item **pcur = &queue->head;
queue->tail = NULL;
while (*pcur) {
@@ -212,7 +231,7 @@ void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue,
pcur = &cur->next;
}
}
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// Run fn(fn_data) on the target thread synchronously. This function enqueues
@@ -228,10 +247,10 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
};
mp_dispatch_append(queue, &item);
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
while (!item.completed)
- pthread_cond_wait(&queue->cond, &queue->lock);
- pthread_mutex_unlock(&queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// Process any outstanding dispatch items in the queue. This also handles
@@ -252,18 +271,18 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
// no enqueued callback can call the lock/unlock functions).
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
- pthread_mutex_lock(&queue->lock);
- queue->wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
+ mp_mutex_lock(&queue->lock);
+ queue->wait = timeout > 0 ? mp_time_ns_add(mp_time_ns(), timeout) : 0;
assert(!queue->in_process); // recursion not allowed
queue->in_process = true;
- queue->in_process_thread = pthread_self();
+ queue->in_process_thread_id = mp_thread_current_id();
// Wake up thread which called mp_dispatch_lock().
if (queue->lock_requests)
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
while (1) {
if (queue->lock_requests) {
// Block due to something having called mp_dispatch_lock().
- pthread_cond_wait(&queue->cond, &queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
} else if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
@@ -276,23 +295,22 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
// from mp_dispatch_lock(), which is done by locked=true.
assert(!queue->locked);
queue->locked = true;
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
item->fn(item->fn_data);
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
assert(queue->locked);
queue->locked = false;
// Wakeup mp_dispatch_run(), also mp_dispatch_lock().
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
if (item->asynchronous) {
talloc_free(item);
} else {
item->completed = true;
}
} else if (queue->wait > 0 && !queue->interrupted) {
- struct timespec ts = mp_time_us_to_timespec(queue->wait);
- if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts))
+ if (mp_cond_timedwait_until(&queue->cond, &queue->lock, queue->wait))
queue->wait = 0;
} else {
break;
@@ -301,7 +319,7 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
assert(!queue->locked);
queue->in_process = false;
queue->interrupted = false;
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// If the queue is inside of mp_dispatch_queue_process(), make it return as
@@ -312,10 +330,10 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
// wakeup the main thread from another thread in a race free way).
void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
queue->interrupted = true;
- pthread_cond_broadcast(&queue->cond);
- pthread_mutex_unlock(&queue->lock);
+ mp_cond_broadcast(&queue->cond);
+ mp_mutex_unlock(&queue->lock);
}
// If a mp_dispatch_queue_process() call is in progress, then adjust the maximum
@@ -328,12 +346,12 @@ void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
// to wait in external APIs.
void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
if (queue->in_process && queue->wait > until) {
queue->wait = until;
- pthread_cond_broadcast(&queue->cond);
+ mp_cond_broadcast(&queue->cond);
}
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
}
// Grant exclusive access to the target thread's state. While this is active,
@@ -345,53 +363,55 @@ void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until)
// already holding the dispatch lock.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
// Must not be called recursively from dispatched callbacks.
if (queue->in_process)
- assert(!pthread_equal(queue->in_process_thread, pthread_self()));
+ assert(!mp_thread_id_equal(queue->in_process_thread_id, mp_thread_current_id()));
// Must not be called recursively at all.
if (queue->locked_explicit)
- assert(!pthread_equal(queue->locked_explicit_thread, pthread_self()));
+ assert(!mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id()));
queue->lock_requests += 1;
// And now wait until the target thread gets "trapped" within the
// mp_dispatch_queue_process() call, which will mean we get exclusive
// access to the target's thread state.
+ if (queue->onlock_fn)
+ queue->onlock_fn(queue->onlock_ctx);
while (!queue->in_process) {
- pthread_mutex_unlock(&queue->lock);
+ mp_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
if (queue->in_process)
break;
- pthread_cond_wait(&queue->cond, &queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
}
// Wait until we can get the lock.
while (!queue->in_process || queue->locked)
- pthread_cond_wait(&queue->cond, &queue->lock);
+ mp_cond_wait(&queue->cond, &queue->lock);
// "Lock".
assert(queue->lock_requests);
assert(!queue->locked);
assert(!queue->locked_explicit);
queue->locked = true;
queue->locked_explicit = true;
- queue->locked_explicit_thread = pthread_self();
- pthread_mutex_unlock(&queue->lock);
+ queue->locked_explicit_thread_id = mp_thread_current_id();
+ mp_mutex_unlock(&queue->lock);
}
// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
- pthread_mutex_lock(&queue->lock);
+ mp_mutex_lock(&queue->lock);
assert(queue->locked);
// Must be called after a mp_dispatch_lock(), from the same thread.
assert(queue->locked_explicit);
- assert(pthread_equal(queue->locked_explicit_thread, pthread_self()));
+ assert(mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id()));
// "Unlock".
queue->locked = false;
queue->locked_explicit = false;
queue->lock_requests -= 1;
// Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
// (Would be nice to wake up only 1 other locker if lock_requests>0.)
- pthread_cond_broadcast(&queue->cond);
- pthread_mutex_unlock(&queue->lock);
+ mp_cond_broadcast(&queue->cond);
+ mp_mutex_unlock(&queue->lock);
}