summaryrefslogtreecommitdiffstats
path: root/misc
diff options
context:
space:
mode:
Diffstat (limited to 'misc')
-rw-r--r--misc/dispatch.c57
1 files changed, 20 insertions, 37 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c
index e64dc425cd..53d1829e88 100644
--- a/misc/dispatch.c
+++ b/misc/dispatch.c
@@ -28,11 +28,18 @@ struct mp_dispatch_queue {
pthread_mutex_t lock;
pthread_cond_t cond;
int suspend_requested;
- int lock_requested;
bool suspended;
- bool locked;
void (*wakeup_fn)(void *wakeup_ctx);
void *wakeup_ctx;
+ // This lock grant access to the target thread's state during suspend mode.
+ // During suspend mode, the target thread is blocked in the function
+ // mp_dispatch_queue_process(), however this function may be processing
+ // dispatch queue items. This lock serializes the dispatch queue processing
+ // and external mp_dispatch_lock() calls.
+ // Invariant: can be held only while suspended==true, and suspend_requested
+ // must be >0 (unless mp_dispatch_queue_process() locks it). In particular,
+ // suspend mode must not be left while the lock is held.
+ pthread_mutex_t exclusive_lock;
};
struct mp_dispatch_item {
@@ -49,9 +56,9 @@ static void queue_dtor(void *p)
assert(!queue->head);
assert(!queue->suspend_requested);
assert(!queue->suspended);
- assert(!queue->locked);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->lock);
+ pthread_mutex_destroy(&queue->exclusive_lock);
}
// A dispatch queue lets other threads runs callbacks in a target thread.
@@ -63,6 +70,7 @@ struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent)
struct mp_dispatch_queue *queue = talloc_ptrtype(talloc_parent, queue);
*queue = (struct mp_dispatch_queue){0};
talloc_set_destructor(queue, queue_dtor);
+ pthread_mutex_init(&queue->exclusive_lock, NULL);
pthread_mutex_init(&queue->lock, NULL);
pthread_cond_init(&queue->cond, NULL);
return queue;
@@ -166,7 +174,7 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
// Wake up thread which called mp_dispatch_suspend().
pthread_cond_broadcast(&queue->cond);
while (queue->head || queue->suspend_requested) {
- if (queue->head && !queue->locked) {
+ if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
if (!queue->head)
@@ -174,11 +182,13 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
item->next = NULL;
// Unlock, because we want to allow other threads to queue items
// while the dispatch item is processed.
- queue->locked = true;
+ // At the same time, exclusive_lock must be held to protect the
+ // thread's user state.
pthread_mutex_unlock(&queue->lock);
+ pthread_mutex_lock(&queue->exclusive_lock);
item->fn(item->fn_data);
+ pthread_mutex_unlock(&queue->exclusive_lock);
pthread_mutex_lock(&queue->lock);
- queue->locked = false;
if (item->asynchronous) {
talloc_free(item);
} else {
@@ -187,8 +197,6 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
pthread_cond_broadcast(&queue->cond);
}
} else {
- if (!queue->locked && queue->lock_requested)
- pthread_cond_broadcast(&queue->cond);
pthread_cond_wait(&queue->cond, &queue->lock);
}
}
@@ -236,38 +244,13 @@ void mp_dispatch_resume(struct mp_dispatch_queue *queue)
// and the mutex behavior applies to this function only.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
- // TODO: acquiring a lock should probably be serialized with other
- // dispatch items to guarantee minimum fairness.
- pthread_mutex_lock(&queue->lock);
- queue->suspend_requested++;
- queue->lock_requested++;
- while (1) {
- if (queue->suspended && !queue->locked) {
- queue->locked = true;
- break;
- }
- if (!queue->suspended) {
- pthread_mutex_unlock(&queue->lock);
- if (queue->wakeup_fn)
- queue->wakeup_fn(queue->wakeup_ctx);
- pthread_mutex_lock(&queue->lock);
- if (queue->suspended)
- continue;
- }
- pthread_cond_wait(&queue->cond, &queue->lock);
- }
- queue->lock_requested--;
- pthread_mutex_unlock(&queue->lock);
+ mp_dispatch_suspend(queue);
+ pthread_mutex_lock(&queue->exclusive_lock);
}
// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
- pthread_mutex_lock(&queue->lock);
- assert(queue->locked);
- assert(queue->suspend_requested > 0);
- queue->locked = false;
- queue->suspend_requested--;
- pthread_cond_broadcast(&queue->cond);
- pthread_mutex_unlock(&queue->lock);
+ pthread_mutex_unlock(&queue->exclusive_lock);
+ mp_dispatch_resume(queue);
}