diff options
Diffstat (limited to 'misc/dispatch.c')
-rw-r--r-- | misc/dispatch.c | 120 |
1 file changed, 70 insertions, 50 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c index 1102c564ae..6fd9fe1a43 100644 --- a/misc/dispatch.c +++ b/misc/dispatch.c @@ -26,10 +26,12 @@ struct mp_dispatch_queue { struct mp_dispatch_item *head, *tail; - pthread_mutex_t lock; - pthread_cond_t cond; + mp_mutex lock; + mp_cond cond; void (*wakeup_fn)(void *wakeup_ctx); void *wakeup_ctx; + void (*onlock_fn)(void *onlock_ctx); + void *onlock_ctx; // Time at which mp_dispatch_queue_process() should return. int64_t wait; // Make mp_dispatch_queue_process() exit if it's idle. @@ -37,7 +39,7 @@ struct mp_dispatch_queue { // The target thread is in mp_dispatch_queue_process() (and either idling, // locked, or running a dispatch callback). bool in_process; - pthread_t in_process_thread; + mp_thread_id in_process_thread_id; // The target thread is in mp_dispatch_queue_process(), and currently // something has exclusive access to it (e.g. running a dispatch callback, // or a different thread got it with mp_dispatch_lock()). @@ -46,7 +48,7 @@ struct mp_dispatch_queue { size_t lock_requests; // locked==true is due to a mp_dispatch_lock() call (for debugging). bool locked_explicit; - pthread_t locked_explicit_thread; + mp_thread_id locked_explicit_thread_id; }; struct mp_dispatch_item { @@ -65,8 +67,8 @@ static void queue_dtor(void *p) assert(!queue->in_process); assert(!queue->lock_requests); assert(!queue->locked); - pthread_cond_destroy(&queue->cond); - pthread_mutex_destroy(&queue->lock); + mp_cond_destroy(&queue->cond); + mp_mutex_destroy(&queue->lock); } // A dispatch queue lets other threads run callbacks in a target thread. @@ -74,7 +76,7 @@ static void queue_dtor(void *p) // Free the dispatch queue with talloc_free(). At the time of destruction, // the queue must be empty. The easiest way to guarantee this is to // terminate all potential senders, then call mp_dispatch_run() with a -// function that e.g. makes the target thread exit, then pthread_join() the +// function that e.g. 
makes the target thread exit, then mp_thread_join() the // target thread, and finally destroy the queue. Another way is calling // mp_dispatch_queue_process() after terminating all potential senders, and // then destroying the queue. @@ -83,8 +85,8 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent) struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue); *queue = (struct mp_dispatch_queue){0}; talloc_set_destructor(queue, queue_dtor); - pthread_mutex_init(&queue->lock, NULL); - pthread_cond_init(&queue->cond, NULL); + mp_mutex_init(&queue->lock); + mp_cond_init(&queue->cond); return queue; } @@ -94,8 +96,9 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent) // the wakeup_fn could for example write a byte into a "wakeup" pipe in order // to unblock the select(). The wakeup_fn is called from the dispatch queue // when there are new dispatch items, and the target thread should then enter -// mp_dispatch_queue_process() as soon as possible. Note that wakeup_fn is -// called under no lock, so you might have to do synchronization yourself. +// mp_dispatch_queue_process() as soon as possible. +// Note that this setter does not do internal synchronization, so you must set +// it before other threads see it. void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue, void (*wakeup_fn)(void *wakeup_ctx), void *wakeup_ctx) @@ -104,17 +107,33 @@ void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue, queue->wakeup_ctx = wakeup_ctx; } +// Set a function that will be called by mp_dispatch_lock() if the target thread +// is not calling mp_dispatch_queue_process() right now. This is an obscure, +// optional mechanism to make a worker thread react to external events more +// quickly. The idea is that the callback will make the worker thread to stop +// doing whatever (e.g. by setting a flag), and call mp_dispatch_queue_process() +// in order to let mp_dispatch_lock() calls continue sooner. 
+// Like wakeup_fn, this setter does no internal synchronization, and you must +// not access the dispatch queue itself from the callback. +void mp_dispatch_set_onlock_fn(struct mp_dispatch_queue *queue, + void (*onlock_fn)(void *onlock_ctx), + void *onlock_ctx) +{ + queue->onlock_fn = onlock_fn; + queue->onlock_ctx = onlock_ctx; +} + static void mp_dispatch_append(struct mp_dispatch_queue *queue, struct mp_dispatch_item *item) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); if (item->mergeable) { for (struct mp_dispatch_item *cur = queue->head; cur; cur = cur->next) { if (cur->mergeable && cur->fn == item->fn && cur->fn_data == item->fn_data) { talloc_free(item); - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); return; } } @@ -129,12 +148,12 @@ static void mp_dispatch_append(struct mp_dispatch_queue *queue, // Wake up the main thread; note that other threads might wait on this // condition for reasons, so broadcast the condition. - pthread_cond_broadcast(&queue->cond); + mp_cond_broadcast(&queue->cond); // No wakeup callback -> assume mp_dispatch_queue_process() needs to be // interrupted instead. if (!queue->wakeup_fn) queue->interrupted = true; - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); if (queue->wakeup_fn) queue->wakeup_fn(queue->wakeup_ctx); @@ -199,7 +218,7 @@ void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue, void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue, mp_dispatch_fn fn, void *fn_data) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); struct mp_dispatch_item **pcur = &queue->head; queue->tail = NULL; while (*pcur) { @@ -212,7 +231,7 @@ void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue, pcur = &cur->next; } } - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); } // Run fn(fn_data) on the target thread synchronously. 
This function enqueues @@ -228,10 +247,10 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue, }; mp_dispatch_append(queue, &item); - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); while (!item.completed) - pthread_cond_wait(&queue->cond, &queue->lock); - pthread_mutex_unlock(&queue->lock); + mp_cond_wait(&queue->cond, &queue->lock); + mp_mutex_unlock(&queue->lock); } // Process any outstanding dispatch items in the queue. This also handles @@ -252,18 +271,18 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue, // no enqueued callback can call the lock/unlock functions). void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) { - pthread_mutex_lock(&queue->lock); - queue->wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0; + mp_mutex_lock(&queue->lock); + queue->wait = timeout > 0 ? mp_time_ns_add(mp_time_ns(), timeout) : 0; assert(!queue->in_process); // recursion not allowed queue->in_process = true; - queue->in_process_thread = pthread_self(); + queue->in_process_thread_id = mp_thread_current_id(); // Wake up thread which called mp_dispatch_lock(). if (queue->lock_requests) - pthread_cond_broadcast(&queue->cond); + mp_cond_broadcast(&queue->cond); while (1) { if (queue->lock_requests) { // Block due to something having called mp_dispatch_lock(). - pthread_cond_wait(&queue->cond, &queue->lock); + mp_cond_wait(&queue->cond, &queue->lock); } else if (queue->head) { struct mp_dispatch_item *item = queue->head; queue->head = item->next; @@ -276,23 +295,22 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) // from mp_dispatch_lock(), which is done by locked=true. assert(!queue->locked); queue->locked = true; - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); item->fn(item->fn_data); - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); assert(queue->locked); queue->locked = false; // Wakeup mp_dispatch_run(), also mp_dispatch_lock(). 
- pthread_cond_broadcast(&queue->cond); + mp_cond_broadcast(&queue->cond); if (item->asynchronous) { talloc_free(item); } else { item->completed = true; } } else if (queue->wait > 0 && !queue->interrupted) { - struct timespec ts = mp_time_us_to_timespec(queue->wait); - if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts)) + if (mp_cond_timedwait_until(&queue->cond, &queue->lock, queue->wait)) queue->wait = 0; } else { break; @@ -301,7 +319,7 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) assert(!queue->locked); queue->in_process = false; queue->interrupted = false; - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); } // If the queue is inside of mp_dispatch_queue_process(), make it return as @@ -312,10 +330,10 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) // wakeup the main thread from another thread in a race free way). void mp_dispatch_interrupt(struct mp_dispatch_queue *queue) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); queue->interrupted = true; - pthread_cond_broadcast(&queue->cond); - pthread_mutex_unlock(&queue->lock); + mp_cond_broadcast(&queue->cond); + mp_mutex_unlock(&queue->lock); } // If a mp_dispatch_queue_process() call is in progress, then adjust the maximum @@ -328,12 +346,12 @@ void mp_dispatch_interrupt(struct mp_dispatch_queue *queue) // to wait in external APIs. void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); if (queue->in_process && queue->wait > until) { queue->wait = until; - pthread_cond_broadcast(&queue->cond); + mp_cond_broadcast(&queue->cond); } - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); } // Grant exclusive access to the target thread's state. 
While this is active, @@ -345,53 +363,55 @@ void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until) // already holding the dispatch lock. void mp_dispatch_lock(struct mp_dispatch_queue *queue) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); // Must not be called recursively from dispatched callbacks. if (queue->in_process) - assert(!pthread_equal(queue->in_process_thread, pthread_self())); + assert(!mp_thread_id_equal(queue->in_process_thread_id, mp_thread_current_id())); // Must not be called recursively at all. if (queue->locked_explicit) - assert(!pthread_equal(queue->locked_explicit_thread, pthread_self())); + assert(!mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id())); queue->lock_requests += 1; // And now wait until the target thread gets "trapped" within the // mp_dispatch_queue_process() call, which will mean we get exclusive // access to the target's thread state. + if (queue->onlock_fn) + queue->onlock_fn(queue->onlock_ctx); while (!queue->in_process) { - pthread_mutex_unlock(&queue->lock); + mp_mutex_unlock(&queue->lock); if (queue->wakeup_fn) queue->wakeup_fn(queue->wakeup_ctx); - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); if (queue->in_process) break; - pthread_cond_wait(&queue->cond, &queue->lock); + mp_cond_wait(&queue->cond, &queue->lock); } // Wait until we can get the lock. while (!queue->in_process || queue->locked) - pthread_cond_wait(&queue->cond, &queue->lock); + mp_cond_wait(&queue->cond, &queue->lock); // "Lock". assert(queue->lock_requests); assert(!queue->locked); assert(!queue->locked_explicit); queue->locked = true; queue->locked_explicit = true; - queue->locked_explicit_thread = pthread_self(); - pthread_mutex_unlock(&queue->lock); + queue->locked_explicit_thread_id = mp_thread_current_id(); + mp_mutex_unlock(&queue->lock); } // Undo mp_dispatch_lock(). 
void mp_dispatch_unlock(struct mp_dispatch_queue *queue) { - pthread_mutex_lock(&queue->lock); + mp_mutex_lock(&queue->lock); assert(queue->locked); // Must be called after a mp_dispatch_lock(), from the same thread. assert(queue->locked_explicit); - assert(pthread_equal(queue->locked_explicit_thread, pthread_self())); + assert(mp_thread_id_equal(queue->locked_explicit_thread_id, mp_thread_current_id())); // "Unlock". queue->locked = false; queue->locked_explicit = false; queue->lock_requests -= 1; // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s. // (Would be nice to wake up only 1 other locker if lock_requests>0.) - pthread_cond_broadcast(&queue->cond); - pthread_mutex_unlock(&queue->lock); + mp_cond_broadcast(&queue->cond); + mp_mutex_unlock(&queue->lock); } |