diff options
Diffstat (limited to 'misc/dispatch.c')
-rw-r--r-- | misc/dispatch.c | 205 |
1 files changed, 132 insertions, 73 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c index 2ddac2c92d..086896ba79 100644 --- a/misc/dispatch.c +++ b/misc/dispatch.c @@ -28,19 +28,31 @@ struct mp_dispatch_queue { struct mp_dispatch_item *head, *tail; pthread_mutex_t lock; pthread_cond_t cond; - int suspend_requested; - bool suspended; void (*wakeup_fn)(void *wakeup_ctx); void *wakeup_ctx; - // This lock grant access to the target thread's state during suspend mode. - // During suspend mode, the target thread is blocked in the function - // mp_dispatch_queue_process(), however this function may be processing - // dispatch queue items. This lock serializes the dispatch queue processing - // and external mp_dispatch_lock() calls. - // Invariant: can be held only while suspended==true, and suspend_requested - // must be >0 (unless mp_dispatch_queue_process() locks it). In particular, - // suspend mode must not be left while the lock is held. - pthread_mutex_t exclusive_lock; + // Make mp_dispatch_queue_process() exit if it's idle. + bool interrupted; + // The target thread is blocked by mp_dispatch_queue_process(). Note that + // mp_dispatch_lock() can set this from true to false to keep the thread + // blocked (this stops if from processing other dispatch items, and from + // other threads to return from mp_dispatch_lock(), making it an exclusive + // lock). + bool idling; + // A mp_dispatch_lock() call is requesting an exclusive lock. + bool lock_request; + // Used to block out threads calling mp_dispatch_queue_process() while + // they're externall locked via mp_dispatch_lock(). + // We could use a simple counter (increment it instead of adding a frame, + // also increment it when locking), but with this we can perform some + // minimal debug checks. + struct lock_frame *frame; +}; + +struct lock_frame { + struct lock_frame *prev; + pthread_t thread; + pthread_t locked_thread; + bool locked; }; struct mp_dispatch_item { @@ -55,11 +67,11 @@ static void queue_dtor(void *p) { struct mp_dispatch_queue *queue = p; assert(!queue->head); - assert(!queue->suspend_requested); - assert(!queue->suspended); + assert(!queue->idling); + assert(!queue->lock_request); + assert(!queue->frame); pthread_cond_destroy(&queue->cond); pthread_mutex_destroy(&queue->lock); - pthread_mutex_destroy(&queue->exclusive_lock); } // A dispatch queue lets other threads run callbacks in a target thread. @@ -76,7 +88,6 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent) struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue); *queue = (struct mp_dispatch_queue){0}; talloc_set_destructor(queue, queue_dtor); - pthread_mutex_init(&queue->exclusive_lock, NULL); pthread_mutex_init(&queue->lock, NULL); pthread_cond_init(&queue->cond, NULL); return queue; @@ -111,6 +122,10 @@ static void mp_dispatch_append(struct mp_dispatch_queue *queue, // Wake up the main thread; note that other threads might wait on this // condition for reasons, so broadcast the condition. pthread_cond_broadcast(&queue->cond); + // No wakeup callback -> assume mp_dispatch_queue_process() needs to be + // interrupted instead. + if (!queue->wakeup_fn) + queue->interrupted = true; pthread_mutex_unlock(&queue->lock); if (queue->wakeup_fn) queue->wakeup_fn(queue->wakeup_ctx); @@ -170,24 +185,47 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue, } // Process any outstanding dispatch items in the queue. This also handles -// suspending or locking the target thread. +// suspending or locking the this thread from another thread via +// mp_dispatch_lock(). // The timeout specifies the minimum wait time. The actual time spent in this // function can be much higher if the suspending/locking functions are used, or // if executing the dispatch items takes time. On the other hand, this function // can return much earlier than the timeout due to sporadic wakeups. -// It is also guaranteed that if at least one queue item was processed, the -// function will return as soon as possible, ignoring the timeout. This -// simplifies users, such as re-checking conditions before waiting. (It will -// still process the remaining queue items, and wait for unsuspend.) +// Note that this will strictly return only after: +// - timeout has passed, +// - all queue items were processed, +// - the possibly acquired lock has been released +// It's possible to cancel the timeout by calling mp_dispatch_interrupt(). void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) { int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0; + struct lock_frame frame = { + .thread = pthread_self(), + }; + pthread_mutex_lock(&queue->lock); - queue->suspended = true; - // Wake up thread which called mp_dispatch_suspend(). - pthread_cond_broadcast(&queue->cond); - while (queue->head || queue->suspend_requested || wait > 0) { - if (queue->head) { + frame.prev = queue->frame; + queue->frame = &frame; + // Logically, the queue is idling if the target thread is blocked in + // mp_dispatch_queue_process() doing nothing, so it's not possible to call + // it again. (Reentrant calls via callbacks temporarily reset the field.) + assert(!queue->idling); + queue->idling = true; + // Wake up thread which called mp_dispatch_lock(). + if (queue->lock_request) + pthread_cond_broadcast(&queue->cond); + while (1) { + if (queue->lock_request || queue->frame != &frame || frame.locked) { + // Block due to something having called mp_dispatch_lock(). This + // is either a lock "acquire" (lock_request=true), or a lock in + // progress, with the possibility the thread which called + // mp_dispatch_lock() is now calling mp_dispatch_queue_process() + // (the latter means we must ignore any queue state changes, + // until it has been unlocked again). + pthread_cond_wait(&queue->cond, &queue->lock); + if (queue->frame == &frame && !frame.locked) + assert(queue->idling); + } else if (queue->head) { struct mp_dispatch_item *item = queue->head; queue->head = item->next; if (!queue->head) @@ -195,66 +233,50 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) item->next = NULL; // Unlock, because we want to allow other threads to queue items // while the dispatch item is processed. - // At the same time, exclusive_lock must be held to protect the - // thread's user state. + // At the same time, we must prevent other threads from returning + // from mp_dispatch_lock(), which is done by idling=false. + queue->idling = false; pthread_mutex_unlock(&queue->lock); - pthread_mutex_lock(&queue->exclusive_lock); + item->fn(item->fn_data); - pthread_mutex_unlock(&queue->exclusive_lock); + pthread_mutex_lock(&queue->lock); + assert(!queue->idling); + queue->idling = true; + // Wakeup mp_dispatch_run(), also mp_dispatch_lock(). + pthread_cond_broadcast(&queue->cond); if (item->asynchronous) { talloc_free(item); } else { item->completed = true; - // Wakeup mp_dispatch_run() - pthread_cond_broadcast(&queue->cond); } + } else if (wait > 0 && !queue->interrupted) { + struct timespec ts = mp_time_us_to_timespec(wait); + if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts)) + wait = 0; } else { - if (wait > 0) { - struct timespec ts = mp_time_us_to_timespec(wait); - pthread_cond_timedwait(&queue->cond, &queue->lock, &ts); - } else { - pthread_cond_wait(&queue->cond, &queue->lock); - } - } - wait = 0; - } - queue->suspended = false; - pthread_mutex_unlock(&queue->lock); -} - -// Set the target thread into suspend mode: in this mode, the thread will enter -// mp_dispatch_queue_process(), process any outstanding dispatch items, and -// wait for new items when done (instead of exiting the process function). -// Multiple threads can enter suspend mode at the same time. Suspend mode is -// not a synchronization mechanism; it merely makes sure the target thread does -// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock() -// can be used for exclusive access. -void mp_dispatch_suspend(struct mp_dispatch_queue *queue) -{ - pthread_mutex_lock(&queue->lock); - queue->suspend_requested++; - while (!queue->suspended) { - pthread_mutex_unlock(&queue->lock); - if (queue->wakeup_fn) - queue->wakeup_fn(queue->wakeup_ctx); - pthread_mutex_lock(&queue->lock); - if (queue->suspended) break; - pthread_cond_wait(&queue->cond, &queue->lock); + } } + queue->idling = false; + assert(!frame.locked); + assert(queue->frame == &frame); + queue->frame = frame.prev; + queue->interrupted = false; pthread_mutex_unlock(&queue->lock); } -// Undo mp_dispatch_suspend(). -void mp_dispatch_resume(struct mp_dispatch_queue *queue) +// If the queue is inside of mp_dispatch_queue_process(), make it return as +// soon as all work items have been run, without waiting for the timeout. This +// does not make it return early if it's blocked by a mp_dispatch_lock(). +// If mp_dispatch_queue_process() is called in a reentrant way (including the +// case where another thread calls mp_dispatch_lock() and then +// mp_dispatch_queue_process()), this affects only the "topmost" invocation. +void mp_dispatch_interrupt(struct mp_dispatch_queue *queue) { pthread_mutex_lock(&queue->lock); - assert(queue->suspended); - assert(queue->suspend_requested > 0); - queue->suspend_requested--; - if (queue->suspend_requested == 0) - pthread_cond_broadcast(&queue->cond); + queue->interrupted = true; + pthread_cond_broadcast(&queue->cond); pthread_mutex_unlock(&queue->lock); } @@ -265,13 +287,50 @@ void mp_dispatch_resume(struct mp_dispatch_queue *queue) // and the mutex behavior applies to this function only. void mp_dispatch_lock(struct mp_dispatch_queue *queue) { - mp_dispatch_suspend(queue); - pthread_mutex_lock(&queue->exclusive_lock); + pthread_mutex_lock(&queue->lock); + // First grab the queue lock. Something else could be holding the lock. + while (queue->lock_request) + pthread_cond_wait(&queue->cond, &queue->lock); + queue->lock_request = true; + // And now wait until the target thread gets "trapped" within the + // mp_dispatch_queue_process() call, which will mean we get exclusive + // access to the target's thread state. + while (!queue->idling) { + pthread_mutex_unlock(&queue->lock); + if (queue->wakeup_fn) + queue->wakeup_fn(queue->wakeup_ctx); + pthread_mutex_lock(&queue->lock); + if (queue->idling) + break; + pthread_cond_wait(&queue->cond, &queue->lock); + } + assert(queue->lock_request); + assert(queue->frame); // must be set if idling + assert(!queue->frame->locked); // no recursive locking on the same level + // "Lock". + queue->frame->locked = true; + queue->frame->locked_thread = pthread_self(); + // Reset state for recursive mp_dispatch_queue_process() calls. + queue->lock_request = false; + queue->idling = false; + pthread_mutex_unlock(&queue->lock); } // Undo mp_dispatch_lock(). void mp_dispatch_unlock(struct mp_dispatch_queue *queue) { - pthread_mutex_unlock(&queue->exclusive_lock); - mp_dispatch_resume(queue); + pthread_mutex_lock(&queue->lock); + // Must be called atfer a mp_dispatch_lock(). + assert(queue->frame); + assert(queue->frame->locked); + assert(pthread_equal(queue->frame->locked_thread, pthread_self())); + // "Unlock". + queue->frame->locked = false; + // This must have been set to false during locking (except temporarily + // during recursive mp_dispatch_queue_process() calls). + assert(!queue->idling); + queue->idling = true; + // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s. + pthread_cond_broadcast(&queue->cond); + pthread_mutex_unlock(&queue->lock); } |