Diffstat (limited to 'misc/dispatch.c')
 misc/dispatch.c | 205
 1 file changed, 132 insertions(+), 73 deletions(-)
diff --git a/misc/dispatch.c b/misc/dispatch.c
index 2ddac2c92d..086896ba79 100644
--- a/misc/dispatch.c
+++ b/misc/dispatch.c
@@ -28,19 +28,31 @@ struct mp_dispatch_queue {
struct mp_dispatch_item *head, *tail;
pthread_mutex_t lock;
pthread_cond_t cond;
- int suspend_requested;
- bool suspended;
void (*wakeup_fn)(void *wakeup_ctx);
void *wakeup_ctx;
- // This lock grant access to the target thread's state during suspend mode.
- // During suspend mode, the target thread is blocked in the function
- // mp_dispatch_queue_process(), however this function may be processing
- // dispatch queue items. This lock serializes the dispatch queue processing
- // and external mp_dispatch_lock() calls.
- // Invariant: can be held only while suspended==true, and suspend_requested
- // must be >0 (unless mp_dispatch_queue_process() locks it). In particular,
- // suspend mode must not be left while the lock is held.
- pthread_mutex_t exclusive_lock;
+ // Make mp_dispatch_queue_process() exit if it's idle.
+ bool interrupted;
+ // The target thread is blocked by mp_dispatch_queue_process(). Note that
+ // mp_dispatch_lock() can set this from true to false to keep the thread
+ // blocked (this stops it from processing other dispatch items, and stops
+ // other threads from returning from mp_dispatch_lock(), making it an
+ // exclusive lock).
+ bool idling;
+ // A mp_dispatch_lock() call is requesting an exclusive lock.
+ bool lock_request;
+ // Used to block out threads calling mp_dispatch_queue_process() while
+ // they're externally locked via mp_dispatch_lock().
+ // We could use a simple counter (increment it instead of adding a frame,
+ // also increment it when locking), but with this we can perform some
+ // minimal debug checks.
+ struct lock_frame *frame;
+};
+
+struct lock_frame {
+ struct lock_frame *prev;
+ pthread_t thread;
+ pthread_t locked_thread;
+ bool locked;
};
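
The lock_frame stack above mainly exists for reentrancy: a thread that holds the dispatch lock may itself call mp_dispatch_queue_process(), which pushes a second frame while the target thread's own frame stays locked. A minimal sketch of that pattern, assuming mpv's internal misc/dispatch.h is available (the helper name is hypothetical):

    #include "misc/dispatch.h"

    // Hypothetical helper, called from a thread other than the target thread.
    static void locked_work(struct mp_dispatch_queue *queue)
    {
        mp_dispatch_lock(queue);   // the target thread's frame gets locked=true
        // Reentrant call: pushes a second lock_frame for this thread and
        // processes pending items here, while the target thread stays blocked.
        mp_dispatch_queue_process(queue, 0);
        mp_dispatch_unlock(queue); // back to normal operation
    }
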
struct mp_dispatch_item {
@@ -55,11 +67,11 @@ static void queue_dtor(void *p)
{
struct mp_dispatch_queue *queue = p;
assert(!queue->head);
- assert(!queue->suspend_requested);
- assert(!queue->suspended);
+ assert(!queue->idling);
+ assert(!queue->lock_request);
+ assert(!queue->frame);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->lock);
- pthread_mutex_destroy(&queue->exclusive_lock);
}
// A dispatch queue lets other threads run callbacks in a target thread.
@@ -76,7 +88,6 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
*queue = (struct mp_dispatch_queue){0};
talloc_set_destructor(queue, queue_dtor);
- pthread_mutex_init(&queue->exclusive_lock, NULL);
pthread_mutex_init(&queue->lock, NULL);
pthread_cond_init(&queue->cond, NULL);
return queue;
@@ -111,6 +122,10 @@ static void mp_dispatch_append(struct mp_dispatch_queue *queue,
// Wake up the main thread; note that other threads might wait on this
// condition for reasons, so broadcast the condition.
pthread_cond_broadcast(&queue->cond);
+ // No wakeup callback -> assume mp_dispatch_queue_process() needs to be
+ // interrupted instead.
+ if (!queue->wakeup_fn)
+ queue->interrupted = true;
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
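
In effect, a target thread that never registers a wakeup callback can simply sleep in mp_dispatch_queue_process() with a timeout, and enqueuing work doubles as the wakeup. A rough sketch, assuming misc/dispatch.h (the loop itself is hypothetical):

    #include "misc/dispatch.h"

    // Hypothetical target-thread loop with no wakeup callback registered.
    static void target_thread_loop(struct mp_dispatch_queue *queue)
    {
        for (;;) {
            // Appending an item sets queue->interrupted (wakeup_fn is NULL),
            // so this returns early instead of sleeping out the full second.
            mp_dispatch_queue_process(queue, 1.0);
        }
    }

    // From another thread, mp_dispatch_run(queue, fn, fn_data) queues the
    // callback, wakes this loop, and waits for the callback to complete.
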
@@ -170,24 +185,47 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
}
// Process any outstanding dispatch items in the queue. This also handles
-// suspending or locking the target thread.
+// suspending or locking this thread from another thread via
+// mp_dispatch_lock().
// The timeout specifies the minimum wait time. The actual time spent in this
// function can be much higher if the suspending/locking functions are used, or
// if executing the dispatch items takes time. On the other hand, this function
// can return much earlier than the timeout due to sporadic wakeups.
-// It is also guaranteed that if at least one queue item was processed, the
-// function will return as soon as possible, ignoring the timeout. This
-// simplifies users, such as re-checking conditions before waiting. (It will
-// still process the remaining queue items, and wait for unsuspend.)
+// Note that this will strictly return only after:
+// - timeout has passed,
+// - all queue items were processed,
+// - the possibly acquired lock has been released
+// It's possible to cancel the timeout by calling mp_dispatch_interrupt().
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
+ struct lock_frame frame = {
+ .thread = pthread_self(),
+ };
+
pthread_mutex_lock(&queue->lock);
- queue->suspended = true;
- // Wake up thread which called mp_dispatch_suspend().
- pthread_cond_broadcast(&queue->cond);
- while (queue->head || queue->suspend_requested || wait > 0) {
- if (queue->head) {
+ frame.prev = queue->frame;
+ queue->frame = &frame;
+ // Logically, the queue is idling if the target thread is blocked in
+ // mp_dispatch_queue_process() doing nothing, so it's not possible to call
+ // it again. (Reentrant calls via callbacks temporarily reset the field.)
+ assert(!queue->idling);
+ queue->idling = true;
+ // Wake up the thread which called mp_dispatch_lock().
+ if (queue->lock_request)
+ pthread_cond_broadcast(&queue->cond);
+ while (1) {
+ if (queue->lock_request || queue->frame != &frame || frame.locked) {
+ // Block due to something having called mp_dispatch_lock(). This
+ // is either a lock "acquire" (lock_request=true), or a lock in
+ // progress, with the possibility the thread which called
+ // mp_dispatch_lock() is now calling mp_dispatch_queue_process()
+ // (the latter means we must ignore any queue state changes,
+ // until it has been unlocked again).
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ if (queue->frame == &frame && !frame.locked)
+ assert(queue->idling);
+ } else if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
if (!queue->head)
@@ -195,66 +233,50 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
item->next = NULL;
// Unlock, because we want to allow other threads to queue items
// while the dispatch item is processed.
- // At the same time, exclusive_lock must be held to protect the
- // thread's user state.
+ // At the same time, we must prevent other threads from returning
+ // from mp_dispatch_lock(), which is done by idling=false.
+ queue->idling = false;
pthread_mutex_unlock(&queue->lock);
- pthread_mutex_lock(&queue->exclusive_lock);
+
item->fn(item->fn_data);
- pthread_mutex_unlock(&queue->exclusive_lock);
+
pthread_mutex_lock(&queue->lock);
+ assert(!queue->idling);
+ queue->idling = true;
+ // Wakeup mp_dispatch_run(), also mp_dispatch_lock().
+ pthread_cond_broadcast(&queue->cond);
if (item->asynchronous) {
talloc_free(item);
} else {
item->completed = true;
- // Wakeup mp_dispatch_run()
- pthread_cond_broadcast(&queue->cond);
}
+ } else if (wait > 0 && !queue->interrupted) {
+ struct timespec ts = mp_time_us_to_timespec(wait);
+ if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts))
+ wait = 0;
} else {
- if (wait > 0) {
- struct timespec ts = mp_time_us_to_timespec(wait);
- pthread_cond_timedwait(&queue->cond, &queue->lock, &ts);
- } else {
- pthread_cond_wait(&queue->cond, &queue->lock);
- }
- }
- wait = 0;
- }
- queue->suspended = false;
- pthread_mutex_unlock(&queue->lock);
-}
-
-// Set the target thread into suspend mode: in this mode, the thread will enter
-// mp_dispatch_queue_process(), process any outstanding dispatch items, and
-// wait for new items when done (instead of exiting the process function).
-// Multiple threads can enter suspend mode at the same time. Suspend mode is
-// not a synchronization mechanism; it merely makes sure the target thread does
-// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock()
-// can be used for exclusive access.
-void mp_dispatch_suspend(struct mp_dispatch_queue *queue)
-{
- pthread_mutex_lock(&queue->lock);
- queue->suspend_requested++;
- while (!queue->suspended) {
- pthread_mutex_unlock(&queue->lock);
- if (queue->wakeup_fn)
- queue->wakeup_fn(queue->wakeup_ctx);
- pthread_mutex_lock(&queue->lock);
- if (queue->suspended)
break;
- pthread_cond_wait(&queue->cond, &queue->lock);
+ }
}
+ queue->idling = false;
+ assert(!frame.locked);
+ assert(queue->frame == &frame);
+ queue->frame = frame.prev;
+ queue->interrupted = false;
pthread_mutex_unlock(&queue->lock);
}
-// Undo mp_dispatch_suspend().
-void mp_dispatch_resume(struct mp_dispatch_queue *queue)
+// If the target thread is inside mp_dispatch_queue_process(), make it return
+// as soon as all work items have been run, without waiting for the timeout.
+// This does not make it return early if it's blocked by an mp_dispatch_lock().
+// If mp_dispatch_queue_process() is called in a reentrant way (including the
+// case where another thread calls mp_dispatch_lock() and then
+// mp_dispatch_queue_process()), this affects only the "topmost" invocation.
+void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
- assert(queue->suspended);
- assert(queue->suspend_requested > 0);
- queue->suspend_requested--;
- if (queue->suspend_requested == 0)
- pthread_cond_broadcast(&queue->cond);
+ queue->interrupted = true;
+ pthread_cond_broadcast(&queue->cond);
pthread_mutex_unlock(&queue->lock);
}
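
For illustration, a sketch of how mp_dispatch_interrupt() is typically paired with a condition that the processing loop re-checks; the quit flag and helper names are not part of the dispatch API:

    #include <stdatomic.h>
    #include "misc/dispatch.h"

    static atomic_bool quit;

    // Target thread: re-check the condition each time process() returns.
    static void event_loop(struct mp_dispatch_queue *queue)
    {
        while (!atomic_load(&quit))
            mp_dispatch_queue_process(queue, 10.0);
    }

    // Any other thread: flip the flag, then cut the timeout short.
    static void request_quit(struct mp_dispatch_queue *queue)
    {
        atomic_store(&quit, true);
        mp_dispatch_interrupt(queue);
    }
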
@@ -265,13 +287,50 @@ void mp_dispatch_resume(struct mp_dispatch_queue *queue)
// and the mutex behavior applies to this function only.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
- mp_dispatch_suspend(queue);
- pthread_mutex_lock(&queue->exclusive_lock);
+ pthread_mutex_lock(&queue->lock);
+ // First grab the lock request; another mp_dispatch_lock() could be pending.
+ while (queue->lock_request)
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ queue->lock_request = true;
+ // And now wait until the target thread gets "trapped" within the
+ // mp_dispatch_queue_process() call, which will mean we get exclusive
+ // access to the target thread's state.
+ while (!queue->idling) {
+ pthread_mutex_unlock(&queue->lock);
+ if (queue->wakeup_fn)
+ queue->wakeup_fn(queue->wakeup_ctx);
+ pthread_mutex_lock(&queue->lock);
+ if (queue->idling)
+ break;
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ }
+ assert(queue->lock_request);
+ assert(queue->frame); // must be set if idling
+ assert(!queue->frame->locked); // no recursive locking on the same level
+ // "Lock".
+ queue->frame->locked = true;
+ queue->frame->locked_thread = pthread_self();
+ // Reset state for recursive mp_dispatch_queue_process() calls.
+ queue->lock_request = false;
+ queue->idling = false;
+ pthread_mutex_unlock(&queue->lock);
}
// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
- pthread_mutex_unlock(&queue->exclusive_lock);
- mp_dispatch_resume(queue);
+ pthread_mutex_lock(&queue->lock);
+ // Must be called after mp_dispatch_lock().
+ assert(queue->frame);
+ assert(queue->frame->locked);
+ assert(pthread_equal(queue->frame->locked_thread, pthread_self()));
+ // "Unlock".
+ queue->frame->locked = false;
+ // This must have been set to false during locking (except temporarily
+ // during recursive mp_dispatch_queue_process() calls).
+ assert(!queue->idling);
+ queue->idling = true;
+ // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
+ pthread_cond_broadcast(&queue->cond);
+ pthread_mutex_unlock(&queue->lock);
}
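
Finally, a minimal sketch of the lock API as seen from a non-target thread, again assuming misc/dispatch.h; the state struct is hypothetical and stands for whatever data the target thread normally owns:

    #include "misc/dispatch.h"

    struct target_state {
        int volume; // hypothetical data normally touched only by the target thread
    };

    static void set_volume(struct mp_dispatch_queue *queue,
                           struct target_state *st, int volume)
    {
        // Blocks until the target thread is idling inside
        // mp_dispatch_queue_process(), then keeps it there.
        mp_dispatch_lock(queue);
        st->volume = volume; // safe: the target thread cannot run right now
        mp_dispatch_unlock(queue);
    }
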