summaryrefslogtreecommitdiffstats
path: root/osdep
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2014-02-02 17:06:49 +0100
committerwm4 <wm4@nowhere>2014-02-10 00:04:39 +0100
commit20fbe2fb8c1b3b4f5b08dd529ac1897f24c88a95 (patch)
tree46d609915932125f402061370e17f2f8039244d0 /osdep
parentdd264ebe9d00c8cb22ed4d931d31293ff5b3cece (diff)
downloadmpv-20fbe2fb8c1b3b4f5b08dd529ac1897f24c88a95.tar.bz2
mpv-20fbe2fb8c1b3b4f5b08dd529ac1897f24c88a95.tar.xz
threads: add a dispatch queue thing
Makes working with the (still) single-threaded playback thread easier. Might be reusable for other stuff.
Diffstat (limited to 'osdep')
-rw-r--r--osdep/threads.c229
-rw-r--r--osdep/threads.h21
2 files changed, 250 insertions, 0 deletions
diff --git a/osdep/threads.c b/osdep/threads.c
index dcc3965b1d..0f72599641 100644
--- a/osdep/threads.c
+++ b/osdep/threads.c
@@ -19,7 +19,9 @@
#include <unistd.h>
#include <sys/time.h>
#include <limits.h>
+#include <assert.h>
+#include "common/common.h"
#include "threads.h"
static void get_pthread_time(struct timespec *out_ts)
@@ -78,3 +80,230 @@ int mpthread_mutex_init_recursive(pthread_mutex_t *mutex)
pthread_mutexattr_destroy(&attr);
return r;
}
+
+struct mp_dispatch_queue {
+ struct mp_dispatch_item *head, *tail;
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ int suspend_requested;
+ bool suspended;
+ bool locked;
+ void (*wakeup_fn)(void *wakeup_ctx);
+ void *wakeup_ctx;
+};
+
+struct mp_dispatch_item {
+ mp_dispatch_fn fn;
+ void *fn_data;
+ bool asynchronous;
+ bool completed;
+ struct mp_dispatch_item *next;
+};
+
+static void queue_dtor(void *p)
+{
+ struct mp_dispatch_queue *queue = p;
+ assert(!queue->head);
+ assert(!queue->suspend_requested);
+ assert(!queue->suspended);
+ assert(!queue->locked);
+ pthread_cond_destroy(&queue->cond);
+ pthread_mutex_destroy(&queue->lock);
+}
+
+// A dispatch queue lets other threads runs callbacks in s target thread.
+// The target thread is the thread which created the queue and which calls
+// mp_dispatch_queue_process().
+// Free the dispatch queue with talloc_free(). (It must be empty.)
+struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent)
+{
+ struct mp_dispatch_queue *queue = talloc_ptrtype(talloc_parent, queue);
+ *queue = (struct mp_dispatch_queue){0};
+ talloc_set_destructor(queue, queue_dtor);
+ pthread_mutex_init(&queue->lock, NULL);
+ pthread_cond_init(&queue->cond, NULL);
+ return queue;
+}
+
+// Set a custom function that should be called to guarantee that the target
+// thread wakes up. This is intended for use with code that needs to block
+// on non-pthread primitives, such as e.g. select(). In the case of select(),
+// the wakeup_fn could for example write a byte into a "wakeup" pipe in order
+// to unblock the select(). The wakeup_fn is called from the dispatch queue
+// when there are new dispatch items, and the target thread should then enter
+// mp_dispatch_queue_process() as soon as possible. Note that wakeup_fn is
+// called under no lock, so you might have to do synchronization yourself.
+void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
+ void (*wakeup_fn)(void *wakeup_ctx),
+ void *wakeup_ctx)
+{
+ queue->wakeup_fn = wakeup_fn;
+ queue->wakeup_ctx = wakeup_ctx;
+}
+
+static void mp_dispatch_append(struct mp_dispatch_queue *queue,
+ struct mp_dispatch_item *item)
+{
+ pthread_mutex_lock(&queue->lock);
+ if (queue->tail) {
+ queue->tail->next = item;
+ } else {
+ queue->head = item;
+ }
+ queue->tail = item;
+ // Wake up the main thread; note that other threads might wait on this
+ // condition for reasons, so broadcast the condition.
+ pthread_cond_broadcast(&queue->cond);
+ pthread_mutex_unlock(&queue->lock);
+ if (queue->wakeup_fn)
+ queue->wakeup_fn(queue->wakeup_ctx);
+}
+
+// Let the dispatch item process asynchronously. item->fn will be run in the
+// target thread's context, but note that mp_dispatch_enqueue() will usually
+// return long before that happens. It's up to the user to signal completion
+// of the callback. It's also up to the user to guarantee that the context
+// (fn_data) has correct lifetime, i.e. lives until the callback is run, and
+// is freed after that.
+void mp_dispatch_enqueue(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data)
+{
+ struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
+ *item = (struct mp_dispatch_item){
+ .fn = fn,
+ .fn_data = fn_data,
+ .asynchronous = true,
+ };
+ mp_dispatch_append(queue, item);
+}
+
+// Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data)
+// after the fn callback has been run. (The callback could trivially do that
+// itself, but it makes it easier to implement synchronous and asynchronous
+// requests with the same callback implementation.)
+void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data)
+{
+ struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
+ *item = (struct mp_dispatch_item){
+ .fn = fn,
+ .fn_data = talloc_steal(item, fn_data),
+ .asynchronous = true,
+ };
+ mp_dispatch_append(queue, item);
+}
+
+// Run the dispatch item synchronously. item->fn will be run in the target
+// thread's context, and this function will wait until it's done.
+void mp_dispatch_run(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data)
+{
+ struct mp_dispatch_item item = {
+ .fn = fn,
+ .fn_data = fn_data,
+ };
+ mp_dispatch_append(queue, &item);
+
+ pthread_mutex_lock(&queue->lock);
+ while (!item.completed)
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ pthread_mutex_unlock(&queue->lock);
+}
+
+// Process any outstanding dispatch items in the queue. This also handles
+// suspending or locking the target thread.
+// The timeout specifies the maximum wait time, but the actual time spent in
+// this function can be much higher if the suspending/locking functions are
+// used, or if executing the dispatch items takes time.
+// TODO: implement timeout
+void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
+{
+ pthread_mutex_lock(&queue->lock);
+ queue->suspended = true;
+ // Wake up thread which called mp_dispatch_suspend().
+ pthread_cond_broadcast(&queue->cond);
+ while (queue->head || queue->suspend_requested) {
+ if (queue->head && !queue->locked) {
+ struct mp_dispatch_item *item = queue->head;
+ queue->head = item->next;
+ if (!queue->head)
+ queue->tail = NULL;
+ item->next = NULL;
+ // Unlock, because we want to allow other threads to queue items
+ // while the dispatch item is processed.
+ pthread_mutex_unlock(&queue->lock);
+ item->fn(item->fn_data);
+ pthread_mutex_lock(&queue->lock);
+ if (item->asynchronous) {
+ talloc_free(item);
+ } else {
+ item->completed = true;
+ // Wakeup mp_dispatch_run()
+ pthread_cond_broadcast(&queue->cond);
+ }
+ } else {
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ }
+ }
+ queue->suspended = false;
+ pthread_mutex_unlock(&queue->lock);
+}
+
+// Set the target thread into suspend mode: in this mode, the thread will enter
+// mp_dispatch_queue_process(), process any outstanding dispatch items, and
+// wait for new items when done (instead of exiting the process function).
+// Multiple threads can enter suspend mode at the same time. Suspend mode is
+// not a synchronization mechanism; it merely makes sure the target thread does
+// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock()
+// can be used for exclusive access.
+void mp_dispatch_suspend(struct mp_dispatch_queue *queue)
+{
+ pthread_mutex_lock(&queue->lock);
+ queue->suspend_requested++;
+ while (!queue->suspended)
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ pthread_mutex_unlock(&queue->lock);
+}
+
+// Undo mp_dispatch_suspend().
+void mp_dispatch_resume(struct mp_dispatch_queue *queue)
+{
+ pthread_mutex_lock(&queue->lock);
+ assert(queue->suspend_requested > 0);
+ queue->suspend_requested--;
+ pthread_cond_broadcast(&queue->cond);
+ pthread_mutex_unlock(&queue->lock);
+}
+
+// Grant exclusive access to the target thread's state. While this is active,
+// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
+// a pthread mutex), and no other thread can get dispatch items completed.
+// Other threads can still queue asynchronous dispatch items without waiting,
+// and the mutex behavior applies to this function only.
+void mp_dispatch_lock(struct mp_dispatch_queue *queue)
+{
+ // TODO: acquiring a lock should probably be serialized with other
+ // dispatch items to guarantee minimum fairness.
+ pthread_mutex_lock(&queue->lock);
+ queue->suspend_requested++;
+ while (1) {
+ if (queue->suspended && !queue->locked) {
+ queue->locked = true;
+ break;
+ }
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ }
+ pthread_mutex_unlock(&queue->lock);
+}
+
+// Undo mp_dispatch_lock().
+void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
+{
+ pthread_mutex_lock(&queue->lock);
+ assert(queue->locked);
+ assert(queue->suspend_requested > 0);
+ queue->locked = false;
+ queue->suspend_requested--;
+ pthread_cond_broadcast(&queue->cond);
+ pthread_mutex_unlock(&queue->lock);
+}
diff --git a/osdep/threads.h b/osdep/threads.h
index 3d060009cb..3eb4ce2ffc 100644
--- a/osdep/threads.h
+++ b/osdep/threads.h
@@ -1,6 +1,7 @@
#ifndef MP_OSDEP_THREADS_H_
#define MP_OSDEP_THREADS_H_
+#include <stdbool.h>
#include <pthread.h>
struct timespec mpthread_get_deadline(double timeout);
@@ -10,4 +11,24 @@ int mpthread_cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mutex,
int mpthread_mutex_init_recursive(pthread_mutex_t *mutex);
+
+typedef void (*mp_dispatch_fn)(void *data);
+struct mp_dispatch_queue;
+
+struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent);
+void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
+ void (*wakeup_fn)(void *wakeup_ctx),
+ void *wakeup_ctx);
+void mp_dispatch_enqueue(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data);
+void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data);
+void mp_dispatch_run(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data);
+void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout);
+void mp_dispatch_suspend(struct mp_dispatch_queue *queue);
+void mp_dispatch_resume(struct mp_dispatch_queue *queue);
+void mp_dispatch_lock(struct mp_dispatch_queue *queue);
+void mp_dispatch_unlock(struct mp_dispatch_queue *queue);
+
#endif