author     wm4 <wm4@nowhere>    2016-09-04 17:00:21 +0200
committer  wm4 <wm4@nowhere>    2016-09-04 18:05:36 +0200
commit     3878a59e2c2ecca25ea9c94d0082ed2df3d52796 (patch)
tree       95c2feeac5e96aba5c25f3718a6dc2a8e5088487 /misc
parent     2619d8eff4e2c1cf54574acfbd0cd5895e362e45 (diff)
dispatch: redo locking, and allow reentrant processing
A deadlock bug was reported with the following test program:

  mpv_handle *mpv = mpv_create();
  mpv_set_option_string(mpv, "ytdl", "yes");
  mpv_initialize(mpv);
  mpv_terminate_destroy(mpv);

The cause of this is loading the ytdl.lua script, which triggers a certain
code path that calls mp_dispatch_queue_process() recursively. It does so to
wait until the script is loaded, and we want to keep that. Reentrancy was
not supported by mp_dispatch, which leads to the deadlock.

Rewrite the locking so that it does. We mainly get rid of the
"exclusive_lock" mutex. Instead we use the existing lock/condition variable
to wait until we can grab a logical lock.

Note that the lock_frame business can be replaced with a simple counter.
Instead of checking the lock_frame address, it'd simply increment and store
the counter when entering mp_dispatch_queue_process(), and then compare the
counter to decide whether or not to wait. But I think the additional error
checking done by the lock_frame list is valuable.

Fixes #3489.
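For illustration only (not part of the patch): a rough sketch of the counter-based alternative described above, with made-up names. It deliberately drops the wakeup_fn handling and the thread-identity/balance checks that the lock_frame list enables, i.e. exactly the error checking the commit message wants to keep:

  #include <pthread.h>
  #include <stdbool.h>
  #include <stdint.h>

  struct counter_queue {
      pthread_mutex_t lock;
      pthread_cond_t cond;
      bool idling;        // target thread is parked in its process() loop
      bool lock_request;  // an external locker wants to take over
      uint64_t depth;     // nesting level of process()/lock() "owners"
  };

  // Inside the target's process() loop, instead of comparing lock_frame
  // addresses, each call remembers the depth it entered with and waits
  // whenever it is no longer the innermost owner:
  //
  //     uint64_t my_depth = ++q->depth;               // on entry
  //     while (q->lock_request || q->depth != my_depth)
  //         pthread_cond_wait(&q->cond, &q->lock);    // someone took over
  //     ...
  //     q->depth--;                                   // on exit

  static void counter_lock(struct counter_queue *q)
  {
      pthread_mutex_lock(&q->lock);
      while (q->lock_request)            // serialize competing lockers
          pthread_cond_wait(&q->cond, &q->lock);
      q->lock_request = true;
      while (!q->idling)                 // wait until the target is trapped
          pthread_cond_wait(&q->cond, &q->lock);
      q->depth++;                        // we are now the innermost owner
      q->lock_request = false;
      q->idling = false;                 // allows reentrant process() calls
      pthread_mutex_unlock(&q->lock);
  }

  static void counter_unlock(struct counter_queue *q)
  {
      pthread_mutex_lock(&q->lock);
      q->depth--;                        // hand control back to the outer owner
      q->idling = true;
      pthread_cond_broadcast(&q->cond);  // wake the parked process() call
      pthread_mutex_unlock(&q->lock);
  }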
Diffstat (limited to 'misc')
-rw-r--r--  misc/dispatch.c  171
1 file changed, 101 insertions(+), 70 deletions(-)
diff --git a/misc/dispatch.c b/misc/dispatch.c
index 0c8727d053..37807d8d09 100644
--- a/misc/dispatch.c
+++ b/misc/dispatch.c
@@ -28,19 +28,22 @@ struct mp_dispatch_queue {
struct mp_dispatch_item *head, *tail;
pthread_mutex_t lock;
pthread_cond_t cond;
- int suspend_requested;
- bool suspended;
void (*wakeup_fn)(void *wakeup_ctx);
void *wakeup_ctx;
- // This lock grant access to the target thread's state during suspend mode.
- // During suspend mode, the target thread is blocked in the function
- // mp_dispatch_queue_process(), however this function may be processing
- // dispatch queue items. This lock serializes the dispatch queue processing
- // and external mp_dispatch_lock() calls.
- // Invariant: can be held only while suspended==true, and suspend_requested
- // must be >0 (unless mp_dispatch_queue_process() locks it). In particular,
- // suspend mode must not be left while the lock is held.
- pthread_mutex_t exclusive_lock;
+ // The target thread is blocked by mp_dispatch_queue_process().
+ bool idling;
+ // A mp_dispatch_lock() call is requesting an exclusive lock.
+ bool lock_request;
+ // Used to block out threads calling mp_dispatch_queue_process() while
+ // they're externally locked via mp_dispatch_lock().
+ // We could use a simple counter, but with this we can perform some
+ // minimal debug checks.
+ struct lock_frame *frame;
+};
+
+struct lock_frame {
+ struct lock_frame *prev;
+ pthread_t thread;
};
struct mp_dispatch_item {
@@ -55,11 +58,11 @@ static void queue_dtor(void *p)
{
struct mp_dispatch_queue *queue = p;
assert(!queue->head);
- assert(!queue->suspend_requested);
- assert(!queue->suspended);
+ assert(!queue->idling);
+ assert(!queue->lock_request);
+ assert(!queue->frame);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->lock);
- pthread_mutex_destroy(&queue->exclusive_lock);
}
// A dispatch queue lets other threads run callbacks in a target thread.
@@ -76,7 +79,6 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
*queue = (struct mp_dispatch_queue){0};
talloc_set_destructor(queue, queue_dtor);
- pthread_mutex_init(&queue->exclusive_lock, NULL);
pthread_mutex_init(&queue->lock, NULL);
pthread_cond_init(&queue->cond, NULL);
return queue;
@@ -170,7 +172,8 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
}
// Process any outstanding dispatch items in the queue. This also handles
-// suspending or locking the target thread.
+// suspending or locking this thread from another thread via
+// mp_dispatch_lock().
// The timeout specifies the minimum wait time. The actual time spent in this
// function can be much higher if the suspending/locking functions are used, or
// if executing the dispatch items takes time. On the other hand, this function
@@ -182,12 +185,33 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
+ struct lock_frame frame = {
+ .thread = pthread_self(),
+ };
+
pthread_mutex_lock(&queue->lock);
- queue->suspended = true;
- // Wake up thread which called mp_dispatch_suspend().
- pthread_cond_broadcast(&queue->cond);
- while (queue->head || queue->suspend_requested || wait > 0) {
- if (queue->head) {
+ frame.prev = queue->frame;
+ queue->frame = &frame;
+ // Logically, the queue is idling if the target thread is blocked in
+ // mp_dispatch_queue_process() doing nothing, so it's not possible to call
+ // it again. (Reentrant calls via callbacks temporarily reset the field.)
+ assert(!queue->idling);
+ queue->idling = true;
+ // Wake up thread which called mp_dispatch_lock().
+ if (queue->lock_request)
+ pthread_cond_broadcast(&queue->cond);
+ while (1) {
+ if (queue->lock_request || queue->frame != &frame) {
+ // Block due to something having called mp_dispatch_lock(). This
+ // is either a lock "acquire" (lock_request=true), or a lock in
+ // progress, with the possibility the thread which called
+ // mp_dispatch_lock() is now calling mp_dispatch_queue_process()
+ // (the latter means we must ignore any queue state changes,
+ // until it has been unlocked again).
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ if (queue->frame == &frame)
+ assert(queue->idling);
+ } else if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
if (!queue->head)
@@ -195,83 +219,90 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
item->next = NULL;
// Unlock, because we want to allow other threads to queue items
// while the dispatch item is processed.
- // At the same time, exclusive_lock must be held to protect the
- // thread's user state.
+ // At the same time, we must prevent other threads from returning
+ // from mp_dispatch_lock(), which is done by idling=false.
+ queue->idling = false;
pthread_mutex_unlock(&queue->lock);
- pthread_mutex_lock(&queue->exclusive_lock);
+
item->fn(item->fn_data);
- pthread_mutex_unlock(&queue->exclusive_lock);
+
pthread_mutex_lock(&queue->lock);
+ assert(!queue->idling);
+ queue->idling = true;
+ // Wakeup mp_dispatch_run(), also mp_dispatch_lock().
+ pthread_cond_broadcast(&queue->cond);
if (item->asynchronous) {
talloc_free(item);
} else {
item->completed = true;
- // Wakeup mp_dispatch_run()
- pthread_cond_broadcast(&queue->cond);
}
+ } else if (wait > 0) {
+ struct timespec ts = mp_time_us_to_timespec(wait);
+ pthread_cond_timedwait(&queue->cond, &queue->lock, &ts);
} else {
- if (wait > 0) {
- struct timespec ts = mp_time_us_to_timespec(wait);
- pthread_cond_timedwait(&queue->cond, &queue->lock, &ts);
- } else {
- pthread_cond_wait(&queue->cond, &queue->lock);
- }
+ break;
}
wait = 0;
}
- queue->suspended = false;
+ queue->idling = false;
+ assert(queue->frame == &frame);
+ queue->frame = frame.prev;
pthread_mutex_unlock(&queue->lock);
}
-// Set the target thread into suspend mode: in this mode, the thread will enter
-// mp_dispatch_queue_process(), process any outstanding dispatch items, and
-// wait for new items when done (instead of exiting the process function).
-// Multiple threads can enter suspend mode at the same time. Suspend mode is
-// not a synchronization mechanism; it merely makes sure the target thread does
-// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock()
-// can be used for exclusive access.
-static void mp_dispatch_suspend(struct mp_dispatch_queue *queue)
+// Grant exclusive access to the target thread's state. While this is active,
+// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
+// a pthread mutex), and no other thread can get dispatch items completed.
+// Other threads can still queue asynchronous dispatch items without waiting,
+// and the mutex behavior applies to this function only.
+void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
+ struct lock_frame *frame = talloc_zero(NULL, struct lock_frame);
+ frame->thread = pthread_self();
+
pthread_mutex_lock(&queue->lock);
- queue->suspend_requested++;
- while (!queue->suspended) {
+ // First grab the queue lock. Something else could be holding the lock.
+ while (queue->lock_request)
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ queue->lock_request = true;
+ // And now wait until the target thread gets "trapped" within the
+ // mp_dispatch_queue_process() call, which will mean we get exclusive
+ // access to the target thread's state.
+ while (!queue->idling) {
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
pthread_mutex_lock(&queue->lock);
- if (queue->suspended)
+ if (queue->idling)
break;
pthread_cond_wait(&queue->cond, &queue->lock);
}
+ assert(queue->lock_request);
+ // "Lock".
+ frame->prev = queue->frame;
+ queue->frame = frame;
+ // Reset state for recursive mp_dispatch_queue_process() calls.
+ queue->lock_request = false;
+ queue->idling = false;
pthread_mutex_unlock(&queue->lock);
}
-// Undo mp_dispatch_suspend().
-static void mp_dispatch_resume(struct mp_dispatch_queue *queue)
-{
- pthread_mutex_lock(&queue->lock);
- assert(queue->suspended);
- assert(queue->suspend_requested > 0);
- queue->suspend_requested--;
- if (queue->suspend_requested == 0)
- pthread_cond_broadcast(&queue->cond);
- pthread_mutex_unlock(&queue->lock);
-}
-
-// Grant exclusive access to the target thread's state. While this is active,
-// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
-// a pthread mutex), and no other thread can get dispatch items completed.
-// Other threads can still queue asynchronous dispatch items without waiting,
-// and the mutex behavior applies to this function only.
-void mp_dispatch_lock(struct mp_dispatch_queue *queue)
-{
- mp_dispatch_suspend(queue);
- pthread_mutex_lock(&queue->exclusive_lock);
-}
-
// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
- pthread_mutex_unlock(&queue->exclusive_lock);
- mp_dispatch_resume(queue);
+ pthread_mutex_lock(&queue->lock);
+ struct lock_frame *frame = queue->frame;
+ // Must be called after a mp_dispatch_lock().
+ assert(frame);
+ assert(pthread_equal(frame->thread, pthread_self()));
+ // "Unlock".
+ queue->frame = frame->prev;
+ talloc_free(frame);
+ // This must have been set to false during locking (except temporarily
+ // during recursive mp_dispatch_queue_process() calls).
+ assert(!queue->idling);
+ queue->idling = true;
+ // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
+ pthread_cond_broadcast(&queue->cond);
+ pthread_mutex_unlock(&queue->lock);
}
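
As an aside (not part of the commit), a minimal usage sketch of the locking API described in the comments above, assuming mpv's internal misc/dispatch.h and a target thread that regularly runs mp_dispatch_queue_process(); the struct and field names here are made up for illustration:

  #include "misc/dispatch.h"

  struct target_state {
      struct mp_dispatch_queue *queue;
      int counter;    // state owned by the target thread
  };

  // The target thread's main loop: process dispatch items, and allow other
  // threads to "trap" it via mp_dispatch_lock().
  static void target_thread_loop(struct target_state *t)
  {
      while (1)
          mp_dispatch_queue_process(t->queue, 1.0);
  }

  // Runs on some other thread: briefly take exclusive access to the target's
  // state. While the lock is held, no dispatch item completes and no other
  // mp_dispatch_lock() call returns, so touching t->counter is safe.
  static void poke_target(struct target_state *t)
  {
      mp_dispatch_lock(t->queue);
      t->counter += 1;
      mp_dispatch_unlock(t->queue);
  }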