diff options
-rw-r--r-- | misc/dispatch.c | 251 | ||||
-rw-r--r-- | misc/dispatch.h | 23 | ||||
-rw-r--r-- | old-makefile | 1 | ||||
-rw-r--r-- | osdep/threads.c | 228 | ||||
-rw-r--r-- | osdep/threads.h | 21 | ||||
-rw-r--r-- | player/client.c | 1 | ||||
-rw-r--r-- | player/main.c | 2 | ||||
-rw-r--r-- | player/playloop.c | 2 | ||||
-rw-r--r-- | wscript_build.py | 3 |
9 files changed, 280 insertions, 252 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c new file mode 100644 index 0000000000..69192e8514 --- /dev/null +++ b/misc/dispatch.c @@ -0,0 +1,251 @@ +/* + * This file is part of mpv. + * + * mpv is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * mpv is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with mpv. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdbool.h> +#include <assert.h> + +#include "common/common.h" +#include "osdep/threads.h" + +#include "dispatch.h" + +struct mp_dispatch_queue { + struct mp_dispatch_item *head, *tail; + pthread_mutex_t lock; + pthread_cond_t cond; + int suspend_requested; + bool suspended; + bool locked; + void (*wakeup_fn)(void *wakeup_ctx); + void *wakeup_ctx; +}; + +struct mp_dispatch_item { + mp_dispatch_fn fn; + void *fn_data; + bool asynchronous; + bool completed; + struct mp_dispatch_item *next; +}; + +static void queue_dtor(void *p) +{ + struct mp_dispatch_queue *queue = p; + assert(!queue->head); + assert(!queue->suspend_requested); + assert(!queue->suspended); + assert(!queue->locked); + pthread_cond_destroy(&queue->cond); + pthread_mutex_destroy(&queue->lock); +} + +// A dispatch queue lets other threads runs callbacks in a target thread. +// The target thread is the thread which created the queue and which calls +// mp_dispatch_queue_process(). +// Free the dispatch queue with talloc_free(). (It must be empty.) +struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent) +{ + struct mp_dispatch_queue *queue = talloc_ptrtype(talloc_parent, queue); + *queue = (struct mp_dispatch_queue){0}; + talloc_set_destructor(queue, queue_dtor); + pthread_mutex_init(&queue->lock, NULL); + pthread_cond_init(&queue->cond, NULL); + return queue; +} + +// Set a custom function that should be called to guarantee that the target +// thread wakes up. This is intended for use with code that needs to block +// on non-pthread primitives, such as e.g. select(). In the case of select(), +// the wakeup_fn could for example write a byte into a "wakeup" pipe in order +// to unblock the select(). The wakeup_fn is called from the dispatch queue +// when there are new dispatch items, and the target thread should then enter +// mp_dispatch_queue_process() as soon as possible. Note that wakeup_fn is +// called under no lock, so you might have to do synchronization yourself. +void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue, + void (*wakeup_fn)(void *wakeup_ctx), + void *wakeup_ctx) +{ + queue->wakeup_fn = wakeup_fn; + queue->wakeup_ctx = wakeup_ctx; +} + +static void mp_dispatch_append(struct mp_dispatch_queue *queue, + struct mp_dispatch_item *item) +{ + pthread_mutex_lock(&queue->lock); + if (queue->tail) { + queue->tail->next = item; + } else { + queue->head = item; + } + queue->tail = item; + // Wake up the main thread; note that other threads might wait on this + // condition for reasons, so broadcast the condition. + pthread_cond_broadcast(&queue->cond); + pthread_mutex_unlock(&queue->lock); + if (queue->wakeup_fn) + queue->wakeup_fn(queue->wakeup_ctx); +} + +// Let the dispatch item process asynchronously. item->fn will be run in the +// target thread's context, but note that mp_dispatch_enqueue() will usually +// return long before that happens. It's up to the user to signal completion +// of the callback. It's also up to the user to guarantee that the context +// (fn_data) has correct lifetime, i.e. lives until the callback is run, and +// is freed after that. +void mp_dispatch_enqueue(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); + *item = (struct mp_dispatch_item){ + .fn = fn, + .fn_data = fn_data, + .asynchronous = true, + }; + mp_dispatch_append(queue, item); +} + +// Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data) +// after the fn callback has been run. (The callback could trivially do that +// itself, but it makes it easier to implement synchronous and asynchronous +// requests with the same callback implementation.) +void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); + *item = (struct mp_dispatch_item){ + .fn = fn, + .fn_data = talloc_steal(item, fn_data), + .asynchronous = true, + }; + mp_dispatch_append(queue, item); +} + +// Run the dispatch item synchronously. item->fn will be run in the target +// thread's context, and this function will wait until it's done. +void mp_dispatch_run(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + struct mp_dispatch_item item = { + .fn = fn, + .fn_data = fn_data, + }; + mp_dispatch_append(queue, &item); + + pthread_mutex_lock(&queue->lock); + while (!item.completed) + pthread_cond_wait(&queue->cond, &queue->lock); + pthread_mutex_unlock(&queue->lock); +} + +// Process any outstanding dispatch items in the queue. This also handles +// suspending or locking the target thread. +// The timeout specifies the maximum wait time, but the actual time spent in +// this function can be much higher if the suspending/locking functions are +// used, or if executing the dispatch items takes time. +// TODO: implement timeout +void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) +{ + pthread_mutex_lock(&queue->lock); + queue->suspended = true; + // Wake up thread which called mp_dispatch_suspend(). + pthread_cond_broadcast(&queue->cond); + while (queue->head || queue->suspend_requested) { + if (queue->head && !queue->locked) { + struct mp_dispatch_item *item = queue->head; + queue->head = item->next; + if (!queue->head) + queue->tail = NULL; + item->next = NULL; + // Unlock, because we want to allow other threads to queue items + // while the dispatch item is processed. + pthread_mutex_unlock(&queue->lock); + item->fn(item->fn_data); + pthread_mutex_lock(&queue->lock); + if (item->asynchronous) { + talloc_free(item); + } else { + item->completed = true; + // Wakeup mp_dispatch_run() + pthread_cond_broadcast(&queue->cond); + } + } else { + pthread_cond_wait(&queue->cond, &queue->lock); + } + } + queue->suspended = false; + pthread_mutex_unlock(&queue->lock); +} + +// Set the target thread into suspend mode: in this mode, the thread will enter +// mp_dispatch_queue_process(), process any outstanding dispatch items, and +// wait for new items when done (instead of exiting the process function). +// Multiple threads can enter suspend mode at the same time. Suspend mode is +// not a synchronization mechanism; it merely makes sure the target thread does +// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock() +// can be used for exclusive access. +void mp_dispatch_suspend(struct mp_dispatch_queue *queue) +{ + pthread_mutex_lock(&queue->lock); + queue->suspend_requested++; + while (!queue->suspended) + pthread_cond_wait(&queue->cond, &queue->lock); + pthread_mutex_unlock(&queue->lock); +} + +// Undo mp_dispatch_suspend(). +void mp_dispatch_resume(struct mp_dispatch_queue *queue) +{ + pthread_mutex_lock(&queue->lock); + assert(queue->suspend_requested > 0); + queue->suspend_requested--; + pthread_cond_broadcast(&queue->cond); + pthread_mutex_unlock(&queue->lock); +} + +// Grant exclusive access to the target thread's state. While this is active, +// no other thread can return from mp_dispatch_lock() (i.e. it behaves like +// a pthread mutex), and no other thread can get dispatch items completed. +// Other threads can still queue asynchronous dispatch items without waiting, +// and the mutex behavior applies to this function only. +void mp_dispatch_lock(struct mp_dispatch_queue *queue) +{ + // TODO: acquiring a lock should probably be serialized with other + // dispatch items to guarantee minimum fairness. + pthread_mutex_lock(&queue->lock); + queue->suspend_requested++; + while (1) { + if (queue->suspended && !queue->locked) { + queue->locked = true; + break; + } + pthread_cond_wait(&queue->cond, &queue->lock); + } + pthread_mutex_unlock(&queue->lock); +} + +// Undo mp_dispatch_lock(). +void mp_dispatch_unlock(struct mp_dispatch_queue *queue) +{ + pthread_mutex_lock(&queue->lock); + assert(queue->locked); + assert(queue->suspend_requested > 0); + queue->locked = false; + queue->suspend_requested--; + pthread_cond_broadcast(&queue->cond); + pthread_mutex_unlock(&queue->lock); +} diff --git a/misc/dispatch.h b/misc/dispatch.h new file mode 100644 index 0000000000..96a5d7cf4b --- /dev/null +++ b/misc/dispatch.h @@ -0,0 +1,23 @@ +#ifndef MP_DISPATCH_H_ +#define MP_DISPATCH_H_ + +typedef void (*mp_dispatch_fn)(void *data); +struct mp_dispatch_queue; + +struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent); +void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue, + void (*wakeup_fn)(void *wakeup_ctx), + void *wakeup_ctx); +void mp_dispatch_enqueue(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data); +void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data); +void mp_dispatch_run(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data); +void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout); +void mp_dispatch_suspend(struct mp_dispatch_queue *queue); +void mp_dispatch_resume(struct mp_dispatch_queue *queue); +void mp_dispatch_lock(struct mp_dispatch_queue *queue); +void mp_dispatch_unlock(struct mp_dispatch_queue *queue); + +#endif diff --git a/old-makefile b/old-makefile index 8267b81322..61dce6dc46 100644 --- a/old-makefile +++ b/old-makefile @@ -202,6 +202,7 @@ SOURCES = audio/audio.c \ input/input.c \ input/keycodes.c \ misc/charset_conv.c \ + misc/dispatch.c \ misc/ring.c \ options/m_config.c \ options/m_option.c \ diff --git a/osdep/threads.c b/osdep/threads.c index 2beb67df14..6b19f83f1d 100644 --- a/osdep/threads.c +++ b/osdep/threads.c @@ -19,7 +19,6 @@ #include <unistd.h> #include <sys/time.h> #include <limits.h> -#include <assert.h> #include "common/common.h" #include "threads.h" @@ -80,230 +79,3 @@ int mpthread_mutex_init_recursive(pthread_mutex_t *mutex) pthread_mutexattr_destroy(&attr); return r; } - -struct mp_dispatch_queue { - struct mp_dispatch_item *head, *tail; - pthread_mutex_t lock; - pthread_cond_t cond; - int suspend_requested; - bool suspended; - bool locked; - void (*wakeup_fn)(void *wakeup_ctx); - void *wakeup_ctx; -}; - -struct mp_dispatch_item { - mp_dispatch_fn fn; - void *fn_data; - bool asynchronous; - bool completed; - struct mp_dispatch_item *next; -}; - -static void queue_dtor(void *p) -{ - struct mp_dispatch_queue *queue = p; - assert(!queue->head); - assert(!queue->suspend_requested); - assert(!queue->suspended); - assert(!queue->locked); - pthread_cond_destroy(&queue->cond); - pthread_mutex_destroy(&queue->lock); -} - -// A dispatch queue lets other threads runs callbacks in s target thread. -// The target thread is the thread which created the queue and which calls -// mp_dispatch_queue_process(). -// Free the dispatch queue with talloc_free(). (It must be empty.) -struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent) -{ - struct mp_dispatch_queue *queue = talloc_ptrtype(talloc_parent, queue); - *queue = (struct mp_dispatch_queue){0}; - talloc_set_destructor(queue, queue_dtor); - pthread_mutex_init(&queue->lock, NULL); - pthread_cond_init(&queue->cond, NULL); - return queue; -} - -// Set a custom function that should be called to guarantee that the target -// thread wakes up. This is intended for use with code that needs to block -// on non-pthread primitives, such as e.g. select(). In the case of select(), -// the wakeup_fn could for example write a byte into a "wakeup" pipe in order -// to unblock the select(). The wakeup_fn is called from the dispatch queue -// when there are new dispatch items, and the target thread should then enter -// mp_dispatch_queue_process() as soon as possible. Note that wakeup_fn is -// called under no lock, so you might have to do synchronization yourself. -void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue, - void (*wakeup_fn)(void *wakeup_ctx), - void *wakeup_ctx) -{ - queue->wakeup_fn = wakeup_fn; - queue->wakeup_ctx = wakeup_ctx; -} - -static void mp_dispatch_append(struct mp_dispatch_queue *queue, - struct mp_dispatch_item *item) -{ - pthread_mutex_lock(&queue->lock); - if (queue->tail) { - queue->tail->next = item; - } else { - queue->head = item; - } - queue->tail = item; - // Wake up the main thread; note that other threads might wait on this - // condition for reasons, so broadcast the condition. - pthread_cond_broadcast(&queue->cond); - pthread_mutex_unlock(&queue->lock); - if (queue->wakeup_fn) - queue->wakeup_fn(queue->wakeup_ctx); -} - -// Let the dispatch item process asynchronously. item->fn will be run in the -// target thread's context, but note that mp_dispatch_enqueue() will usually -// return long before that happens. It's up to the user to signal completion -// of the callback. It's also up to the user to guarantee that the context -// (fn_data) has correct lifetime, i.e. lives until the callback is run, and -// is freed after that. -void mp_dispatch_enqueue(struct mp_dispatch_queue *queue, - mp_dispatch_fn fn, void *fn_data) -{ - struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); - *item = (struct mp_dispatch_item){ - .fn = fn, - .fn_data = fn_data, - .asynchronous = true, - }; - mp_dispatch_append(queue, item); -} - -// Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data) -// after the fn callback has been run. (The callback could trivially do that -// itself, but it makes it easier to implement synchronous and asynchronous -// requests with the same callback implementation.) -void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue, - mp_dispatch_fn fn, void *fn_data) -{ - struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); - *item = (struct mp_dispatch_item){ - .fn = fn, - .fn_data = talloc_steal(item, fn_data), - .asynchronous = true, - }; - mp_dispatch_append(queue, item); -} - -// Run the dispatch item synchronously. item->fn will be run in the target -// thread's context, and this function will wait until it's done. -void mp_dispatch_run(struct mp_dispatch_queue *queue, - mp_dispatch_fn fn, void *fn_data) -{ - struct mp_dispatch_item item = { - .fn = fn, - .fn_data = fn_data, - }; - mp_dispatch_append(queue, &item); - - pthread_mutex_lock(&queue->lock); - while (!item.completed) - pthread_cond_wait(&queue->cond, &queue->lock); - pthread_mutex_unlock(&queue->lock); -} - -// Process any outstanding dispatch items in the queue. This also handles -// suspending or locking the target thread. -// The timeout specifies the maximum wait time, but the actual time spent in -// this function can be much higher if the suspending/locking functions are -// used, or if executing the dispatch items takes time. -// TODO: implement timeout -void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) -{ - pthread_mutex_lock(&queue->lock); - queue->suspended = true; - // Wake up thread which called mp_dispatch_suspend(). - pthread_cond_broadcast(&queue->cond); - while (queue->head || queue->suspend_requested) { - if (queue->head && !queue->locked) { - struct mp_dispatch_item *item = queue->head; - queue->head = item->next; - if (!queue->head) - queue->tail = NULL; - item->next = NULL; - // Unlock, because we want to allow other threads to queue items - // while the dispatch item is processed. - pthread_mutex_unlock(&queue->lock); - item->fn(item->fn_data); - pthread_mutex_lock(&queue->lock); - if (item->asynchronous) { - talloc_free(item); - } else { - item->completed = true; - // Wakeup mp_dispatch_run() - pthread_cond_broadcast(&queue->cond); - } - } else { - pthread_cond_wait(&queue->cond, &queue->lock); - } - } - queue->suspended = false; - pthread_mutex_unlock(&queue->lock); -} - -// Set the target thread into suspend mode: in this mode, the thread will enter -// mp_dispatch_queue_process(), process any outstanding dispatch items, and -// wait for new items when done (instead of exiting the process function). -// Multiple threads can enter suspend mode at the same time. Suspend mode is -// not a synchronization mechanism; it merely makes sure the target thread does -// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock() -// can be used for exclusive access. -void mp_dispatch_suspend(struct mp_dispatch_queue *queue) -{ - pthread_mutex_lock(&queue->lock); - queue->suspend_requested++; - while (!queue->suspended) - pthread_cond_wait(&queue->cond, &queue->lock); - pthread_mutex_unlock(&queue->lock); -} - -// Undo mp_dispatch_suspend(). -void mp_dispatch_resume(struct mp_dispatch_queue *queue) -{ - pthread_mutex_lock(&queue->lock); - assert(queue->suspend_requested > 0); - queue->suspend_requested--; - pthread_cond_broadcast(&queue->cond); - pthread_mutex_unlock(&queue->lock); -} - -// Grant exclusive access to the target thread's state. While this is active, -// no other thread can return from mp_dispatch_lock() (i.e. it behaves like -// a pthread mutex), and no other thread can get dispatch items completed. -// Other threads can still queue asynchronous dispatch items without waiting, -// and the mutex behavior applies to this function only. -void mp_dispatch_lock(struct mp_dispatch_queue *queue) -{ - // TODO: acquiring a lock should probably be serialized with other - // dispatch items to guarantee minimum fairness. - pthread_mutex_lock(&queue->lock); - queue->suspend_requested++; - while (1) { - if (queue->suspended && !queue->locked) { - queue->locked = true; - break; - } - pthread_cond_wait(&queue->cond, &queue->lock); - } - pthread_mutex_unlock(&queue->lock); -} - -// Undo mp_dispatch_lock(). -void mp_dispatch_unlock(struct mp_dispatch_queue *queue) -{ - pthread_mutex_lock(&queue->lock); - assert(queue->locked); - assert(queue->suspend_requested > 0); - queue->locked = false; - queue->suspend_requested--; - pthread_cond_broadcast(&queue->cond); - pthread_mutex_unlock(&queue->lock); -} diff --git a/osdep/threads.h b/osdep/threads.h index 3eb4ce2ffc..3d060009cb 100644 --- a/osdep/threads.h +++ b/osdep/threads.h @@ -1,7 +1,6 @@ #ifndef MP_OSDEP_THREADS_H_ #define MP_OSDEP_THREADS_H_ -#include <stdbool.h> #include <pthread.h> struct timespec mpthread_get_deadline(double timeout); @@ -11,24 +10,4 @@ int mpthread_cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mutex, int mpthread_mutex_init_recursive(pthread_mutex_t *mutex); - -typedef void (*mp_dispatch_fn)(void *data); -struct mp_dispatch_queue; - -struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent); -void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue, - void (*wakeup_fn)(void *wakeup_ctx), - void *wakeup_ctx); -void mp_dispatch_enqueue(struct mp_dispatch_queue *queue, - mp_dispatch_fn fn, void *fn_data); -void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue, - mp_dispatch_fn fn, void *fn_data); -void mp_dispatch_run(struct mp_dispatch_queue *queue, - mp_dispatch_fn fn, void *fn_data); -void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout); -void mp_dispatch_suspend(struct mp_dispatch_queue *queue); -void mp_dispatch_resume(struct mp_dispatch_queue *queue); -void mp_dispatch_lock(struct mp_dispatch_queue *queue); -void mp_dispatch_unlock(struct mp_dispatch_queue *queue); - #endif diff --git a/player/client.c b/player/client.c index afcff83f9f..083785b0ee 100644 --- a/player/client.c +++ b/player/client.c @@ -23,6 +23,7 @@ #include "common/msg.h" #include "common/msg_control.h" #include "input/input.h" +#include "misc/dispatch.h" #include "options/m_config.h" #include "options/m_option.h" #include "options/m_property.h" diff --git a/player/main.c b/player/main.c index 6217c6af64..1cec6a0324 100644 --- a/player/main.c +++ b/player/main.c @@ -27,11 +27,11 @@ #include "config.h" #include "talloc.h" +#include "misc/dispatch.h" #include "osdep/io.h" #include "osdep/priority.h" #include "osdep/terminal.h" #include "osdep/timer.h" -#include "osdep/threads.h" #include "common/av_log.h" #include "common/codecs.h" diff --git a/player/playloop.c b/player/playloop.c index 204a19241f..d182d090c4 100644 --- a/player/playloop.c +++ b/player/playloop.c @@ -33,8 +33,8 @@ #include "common/playlist.h" #include "input/input.h" +#include "misc/dispatch.h" #include "osdep/terminal.h" -#include "osdep/threads.h" #include "osdep/timer.h" #include "audio/mixer.h" diff --git a/wscript_build.py b/wscript_build.py index 62328c4bef..d9f359842c 100644 --- a/wscript_build.py +++ b/wscript_build.py @@ -190,8 +190,9 @@ def build(ctx): ( "input/lirc.c", "lirc" ), ## Misc - ( "misc/ring.c" ), ( "misc/charset_conv.c" ), + ( "misc/dispatch.c" ), + ( "misc/ring.c" ), ## Options ( "options/m_config.c" ), |