diff options
-rw-r--r-- | misc/dispatch.c | 57 |
1 files changed, 20 insertions, 37 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c index e64dc425cd..53d1829e88 100644 --- a/misc/dispatch.c +++ b/misc/dispatch.c @@ -28,11 +28,18 @@ struct mp_dispatch_queue { pthread_mutex_t lock; pthread_cond_t cond; int suspend_requested; - int lock_requested; bool suspended; - bool locked; void (*wakeup_fn)(void *wakeup_ctx); void *wakeup_ctx; + // This lock grant access to the target thread's state during suspend mode. + // During suspend mode, the target thread is blocked in the function + // mp_dispatch_queue_process(), however this function may be processing + // dispatch queue items. This lock serializes the dispatch queue processing + // and external mp_dispatch_lock() calls. + // Invariant: can be held only while suspended==true, and suspend_requested + // must be >0 (unless mp_dispatch_queue_process() locks it). In particular, + // suspend mode must not be left while the lock is held. + pthread_mutex_t exclusive_lock; }; struct mp_dispatch_item { @@ -49,9 +56,9 @@ static void queue_dtor(void *p) assert(!queue->head); assert(!queue->suspend_requested); assert(!queue->suspended); - assert(!queue->locked); pthread_cond_destroy(&queue->cond); pthread_mutex_destroy(&queue->lock); + pthread_mutex_destroy(&queue->exclusive_lock); } // A dispatch queue lets other threads runs callbacks in a target thread. @@ -63,6 +70,7 @@ struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent) struct mp_dispatch_queue *queue = talloc_ptrtype(talloc_parent, queue); *queue = (struct mp_dispatch_queue){0}; talloc_set_destructor(queue, queue_dtor); + pthread_mutex_init(&queue->exclusive_lock, NULL); pthread_mutex_init(&queue->lock, NULL); pthread_cond_init(&queue->cond, NULL); return queue; @@ -166,7 +174,7 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) // Wake up thread which called mp_dispatch_suspend(). pthread_cond_broadcast(&queue->cond); while (queue->head || queue->suspend_requested) { - if (queue->head && !queue->locked) { + if (queue->head) { struct mp_dispatch_item *item = queue->head; queue->head = item->next; if (!queue->head) @@ -174,11 +182,13 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) item->next = NULL; // Unlock, because we want to allow other threads to queue items // while the dispatch item is processed. - queue->locked = true; + // At the same time, exclusive_lock must be held to protect the + // thread's user state. pthread_mutex_unlock(&queue->lock); + pthread_mutex_lock(&queue->exclusive_lock); item->fn(item->fn_data); + pthread_mutex_unlock(&queue->exclusive_lock); pthread_mutex_lock(&queue->lock); - queue->locked = false; if (item->asynchronous) { talloc_free(item); } else { @@ -187,8 +197,6 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) pthread_cond_broadcast(&queue->cond); } } else { - if (!queue->locked && queue->lock_requested) - pthread_cond_broadcast(&queue->cond); pthread_cond_wait(&queue->cond, &queue->lock); } } @@ -236,38 +244,13 @@ void mp_dispatch_resume(struct mp_dispatch_queue *queue) // and the mutex behavior applies to this function only. void mp_dispatch_lock(struct mp_dispatch_queue *queue) { - // TODO: acquiring a lock should probably be serialized with other - // dispatch items to guarantee minimum fairness. - pthread_mutex_lock(&queue->lock); - queue->suspend_requested++; - queue->lock_requested++; - while (1) { - if (queue->suspended && !queue->locked) { - queue->locked = true; - break; - } - if (!queue->suspended) { - pthread_mutex_unlock(&queue->lock); - if (queue->wakeup_fn) - queue->wakeup_fn(queue->wakeup_ctx); - pthread_mutex_lock(&queue->lock); - if (queue->suspended) - continue; - } - pthread_cond_wait(&queue->cond, &queue->lock); - } - queue->lock_requested--; - pthread_mutex_unlock(&queue->lock); + mp_dispatch_suspend(queue); + pthread_mutex_lock(&queue->exclusive_lock); } // Undo mp_dispatch_lock(). void mp_dispatch_unlock(struct mp_dispatch_queue *queue) { - pthread_mutex_lock(&queue->lock); - assert(queue->locked); - assert(queue->suspend_requested > 0); - queue->locked = false; - queue->suspend_requested--; - pthread_cond_broadcast(&queue->cond); - pthread_mutex_unlock(&queue->lock); + pthread_mutex_unlock(&queue->exclusive_lock); + mp_dispatch_resume(queue); } |