From 3878a59e2c2ecca25ea9c94d0082ed2df3d52796 Mon Sep 17 00:00:00 2001 From: wm4 Date: Sun, 4 Sep 2016 17:00:21 +0200 Subject: dispatch: redo locking, and allow reentrant processing A deadlock bug was reported with the following test program: mpv_handle *mpv = mpv_create(); mpv_set_option_string(mpv, "ytdl", "yes"); mpv_initialize(mpv); mpv_terminate_destroy(mpv); The cause of this is loading the ytdl.lua script, which triggers a certain code path that calls mp_dispatch_queue_process() recursively. It does so to wait until the script is loaded, and we want to keep that. Reentrancy was not supported by mp_dispatch, which leads to the deadlock. Rewrite the locking so that it does. We mainly get rid of the "exclusive_lock" mutex. Instead we use the existing lock/condition variable to wait until we can grab a logical lock. Note that the lock_frame business can be replaced with a simple counter. Instead of checking the lock_frame address, it'd simply increment and store the counter when entering mp_dispatch_queue_process(), and then compare the counter to decide whether or not to wait. But I think the additional error checking done by the lock_frame list is valuable. Fixes #3489. --- misc/dispatch.c | 171 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 101 insertions(+), 70 deletions(-) (limited to 'misc') diff --git a/misc/dispatch.c b/misc/dispatch.c index 0c8727d053..37807d8d09 100644 --- a/misc/dispatch.c +++ b/misc/dispatch.c @@ -28,19 +28,22 @@ struct mp_dispatch_queue { struct mp_dispatch_item *head, *tail; pthread_mutex_t lock; pthread_cond_t cond; - int suspend_requested; - bool suspended; void (*wakeup_fn)(void *wakeup_ctx); void *wakeup_ctx; - // This lock grant access to the target thread's state during suspend mode. - // During suspend mode, the target thread is blocked in the function - // mp_dispatch_queue_process(), however this function may be processing - // dispatch queue items. This lock serializes the dispatch queue processing - // and external mp_dispatch_lock() calls. - // Invariant: can be held only while suspended==true, and suspend_requested - // must be >0 (unless mp_dispatch_queue_process() locks it). In particular, - // suspend mode must not be left while the lock is held. - pthread_mutex_t exclusive_lock; + // The target thread is blocked by mp_dispatch_queue_process(). + bool idling; + // A mp_dispatch_lock() call is requesting an exclusive lock. + bool lock_request; + // Used to block out threads calling mp_dispatch_queue_process() while + // they're externall locked via mp_dispatch_lock(). + // We could use a simple counter, but with this we can perform some + // minimal debug checks. + struct lock_frame *frame; +}; + +struct lock_frame { + struct lock_frame *prev; + pthread_t thread; }; struct mp_dispatch_item { @@ -55,11 +58,11 @@ static void queue_dtor(void *p) { struct mp_dispatch_queue *queue = p; assert(!queue->head); - assert(!queue->suspend_requested); - assert(!queue->suspended); + assert(!queue->idling); + assert(!queue->lock_request); + assert(!queue->frame); pthread_cond_destroy(&queue->cond); pthread_mutex_destroy(&queue->lock); - pthread_mutex_destroy(&queue->exclusive_lock); } // A dispatch queue lets other threads run callbacks in a target thread. @@ -76,7 +79,6 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent) struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue); *queue = (struct mp_dispatch_queue){0}; talloc_set_destructor(queue, queue_dtor); - pthread_mutex_init(&queue->exclusive_lock, NULL); pthread_mutex_init(&queue->lock, NULL); pthread_cond_init(&queue->cond, NULL); return queue; @@ -170,7 +172,8 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue, } // Process any outstanding dispatch items in the queue. This also handles -// suspending or locking the target thread. +// suspending or locking the this thread from another thread via +// mp_dispatch_lock(). // The timeout specifies the minimum wait time. The actual time spent in this // function can be much higher if the suspending/locking functions are used, or // if executing the dispatch items takes time. On the other hand, this function @@ -182,12 +185,33 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue, void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) { int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0; + struct lock_frame frame = { + .thread = pthread_self(), + }; + pthread_mutex_lock(&queue->lock); - queue->suspended = true; - // Wake up thread which called mp_dispatch_suspend(). - pthread_cond_broadcast(&queue->cond); - while (queue->head || queue->suspend_requested || wait > 0) { - if (queue->head) { + frame.prev = queue->frame; + queue->frame = &frame; + // Logically, the queue is idling if the target thread is blocked in + // mp_dispatch_queue_process() doing nothing, so it's not possible to call + // it again. (Reentrant calls via callbacks temporarily reset the field.) + assert(!queue->idling); + queue->idling = true; + // Wake up thread which called mp_dispatch_lock(). + if (queue->lock_request) + pthread_cond_broadcast(&queue->cond); + while (1) { + if (queue->lock_request || queue->frame != &frame) { + // Block due to something having called mp_dispatch_lock(). This + // is either a lock "acquire" (lock_request=true), or a lock in + // progress, with the possibility the thread which called + // mp_dispatch_lock() is now calling mp_dispatch_queue_process() + // (the latter means we must ignore any queue state changes, + // until it has been unlocked again). + pthread_cond_wait(&queue->cond, &queue->lock); + if (queue->frame == &frame) + assert(queue->idling); + } else if (queue->head) { struct mp_dispatch_item *item = queue->head; queue->head = item->next; if (!queue->head) @@ -195,83 +219,90 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) item->next = NULL; // Unlock, because we want to allow other threads to queue items // while the dispatch item is processed. - // At the same time, exclusive_lock must be held to protect the - // thread's user state. + // At the same time, we must prevent other threads from returning + // from mp_dispatch_lock(), which is done by idling=false. + queue->idling = false; pthread_mutex_unlock(&queue->lock); - pthread_mutex_lock(&queue->exclusive_lock); + item->fn(item->fn_data); - pthread_mutex_unlock(&queue->exclusive_lock); + pthread_mutex_lock(&queue->lock); + assert(!queue->idling); + queue->idling = true; + // Wakeup mp_dispatch_run(), also mp_dispatch_lock(). + pthread_cond_broadcast(&queue->cond); if (item->asynchronous) { talloc_free(item); } else { item->completed = true; - // Wakeup mp_dispatch_run() - pthread_cond_broadcast(&queue->cond); } + } else if (wait > 0) { + struct timespec ts = mp_time_us_to_timespec(wait); + pthread_cond_timedwait(&queue->cond, &queue->lock, &ts); } else { - if (wait > 0) { - struct timespec ts = mp_time_us_to_timespec(wait); - pthread_cond_timedwait(&queue->cond, &queue->lock, &ts); - } else { - pthread_cond_wait(&queue->cond, &queue->lock); - } + break; } wait = 0; } - queue->suspended = false; + queue->idling = false; + assert(queue->frame == &frame); + queue->frame = frame.prev; pthread_mutex_unlock(&queue->lock); } -// Set the target thread into suspend mode: in this mode, the thread will enter -// mp_dispatch_queue_process(), process any outstanding dispatch items, and -// wait for new items when done (instead of exiting the process function). -// Multiple threads can enter suspend mode at the same time. Suspend mode is -// not a synchronization mechanism; it merely makes sure the target thread does -// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock() -// can be used for exclusive access. -static void mp_dispatch_suspend(struct mp_dispatch_queue *queue) +// Grant exclusive access to the target thread's state. While this is active, +// no other thread can return from mp_dispatch_lock() (i.e. it behaves like +// a pthread mutex), and no other thread can get dispatch items completed. +// Other threads can still queue asynchronous dispatch items without waiting, +// and the mutex behavior applies to this function only. +void mp_dispatch_lock(struct mp_dispatch_queue *queue) { + struct lock_frame *frame = talloc_zero(NULL, struct lock_frame); + frame->thread = pthread_self(); + pthread_mutex_lock(&queue->lock); - queue->suspend_requested++; - while (!queue->suspended) { + // First grab the queue lock. Something else could be holding the lock. + while (queue->lock_request) + pthread_cond_wait(&queue->cond, &queue->lock); + queue->lock_request = true; + // And now wait until the target thread gets "trapped" within the + // mp_dispatch_queue_process() call, which will mean we get exclusive + // access to the target's thread state. + while (!queue->idling) { pthread_mutex_unlock(&queue->lock); if (queue->wakeup_fn) queue->wakeup_fn(queue->wakeup_ctx); pthread_mutex_lock(&queue->lock); - if (queue->suspended) + if (queue->idling) break; pthread_cond_wait(&queue->cond, &queue->lock); } + assert(queue->lock_request); + // "Lock". + frame->prev = queue->frame; + queue->frame = frame; + // Reset state for recursive mp_dispatch_queue_process() calls. + queue->lock_request = false; + queue->idling = false; pthread_mutex_unlock(&queue->lock); } -// Undo mp_dispatch_suspend(). -static void mp_dispatch_resume(struct mp_dispatch_queue *queue) -{ - pthread_mutex_lock(&queue->lock); - assert(queue->suspended); - assert(queue->suspend_requested > 0); - queue->suspend_requested--; - if (queue->suspend_requested == 0) - pthread_cond_broadcast(&queue->cond); - pthread_mutex_unlock(&queue->lock); -} - -// Grant exclusive access to the target thread's state. While this is active, -// no other thread can return from mp_dispatch_lock() (i.e. it behaves like -// a pthread mutex), and no other thread can get dispatch items completed. -// Other threads can still queue asynchronous dispatch items without waiting, -// and the mutex behavior applies to this function only. -void mp_dispatch_lock(struct mp_dispatch_queue *queue) -{ - mp_dispatch_suspend(queue); - pthread_mutex_lock(&queue->exclusive_lock); -} - // Undo mp_dispatch_lock(). void mp_dispatch_unlock(struct mp_dispatch_queue *queue) { - pthread_mutex_unlock(&queue->exclusive_lock); - mp_dispatch_resume(queue); + pthread_mutex_lock(&queue->lock); + struct lock_frame *frame = queue->frame; + // Must be called atfer a mp_dispatch_lock(). + assert(frame); + assert(pthread_equal(frame->thread, pthread_self())); + // "Unlock". + queue->frame = frame->prev; + talloc_free(frame); + // This must have been set to false during locking (except temporarily + // during recursive mp_dispatch_queue_process() calls). + assert(!queue->idling); + queue->idling = true; + // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s. + pthread_cond_broadcast(&queue->cond); + pthread_mutex_unlock(&queue->lock); } -- cgit v1.2.3