summaryrefslogtreecommitdiffstats
path: root/misc
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2014-04-23 20:37:57 +0200
committerwm4 <wm4@nowhere>2014-04-23 21:16:51 +0200
commit2b26517ef705c4043419e2e70dacd7760af1137e (patch)
treedf04daafcbc47dd91a35b9404b4d8bd1e9373609 /misc
parentac1a21e488f1c6fb4963b40e0ea2da3d523c45ba (diff)
downloadmpv-2b26517ef705c4043419e2e70dacd7760af1137e.tar.bz2
mpv-2b26517ef705c4043419e2e70dacd7760af1137e.tar.xz
dispatch: move into its own source file
This was part of osdep/threads.c out of laziness. But it doesn't contain anything OS dependent. Note that the rest of threads.c actually isn't all that OS dependent either (just some minor ifdeffery to work around the lack of clock_gettime() on OSX).
Diffstat (limited to 'misc')
-rw-r--r--misc/dispatch.c251
-rw-r--r--misc/dispatch.h23
2 files changed, 274 insertions, 0 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c
new file mode 100644
index 0000000000..69192e8514
--- /dev/null
+++ b/misc/dispatch.c
@@ -0,0 +1,251 @@
+/*
+ * This file is part of mpv.
+ *
+ * mpv is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * mpv is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with mpv. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdbool.h>
+#include <assert.h>
+
+#include "common/common.h"
+#include "osdep/threads.h"
+
+#include "dispatch.h"
+
+struct mp_dispatch_queue {
+ struct mp_dispatch_item *head, *tail;
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ int suspend_requested;
+ bool suspended;
+ bool locked;
+ void (*wakeup_fn)(void *wakeup_ctx);
+ void *wakeup_ctx;
+};
+
+struct mp_dispatch_item {
+ mp_dispatch_fn fn;
+ void *fn_data;
+ bool asynchronous;
+ bool completed;
+ struct mp_dispatch_item *next;
+};
+
+static void queue_dtor(void *p)
+{
+ struct mp_dispatch_queue *queue = p;
+ assert(!queue->head);
+ assert(!queue->suspend_requested);
+ assert(!queue->suspended);
+ assert(!queue->locked);
+ pthread_cond_destroy(&queue->cond);
+ pthread_mutex_destroy(&queue->lock);
+}
+
+// A dispatch queue lets other threads runs callbacks in a target thread.
+// The target thread is the thread which created the queue and which calls
+// mp_dispatch_queue_process().
+// Free the dispatch queue with talloc_free(). (It must be empty.)
+struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent)
+{
+ struct mp_dispatch_queue *queue = talloc_ptrtype(talloc_parent, queue);
+ *queue = (struct mp_dispatch_queue){0};
+ talloc_set_destructor(queue, queue_dtor);
+ pthread_mutex_init(&queue->lock, NULL);
+ pthread_cond_init(&queue->cond, NULL);
+ return queue;
+}
+
+// Set a custom function that should be called to guarantee that the target
+// thread wakes up. This is intended for use with code that needs to block
+// on non-pthread primitives, such as e.g. select(). In the case of select(),
+// the wakeup_fn could for example write a byte into a "wakeup" pipe in order
+// to unblock the select(). The wakeup_fn is called from the dispatch queue
+// when there are new dispatch items, and the target thread should then enter
+// mp_dispatch_queue_process() as soon as possible. Note that wakeup_fn is
+// called under no lock, so you might have to do synchronization yourself.
+void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
+ void (*wakeup_fn)(void *wakeup_ctx),
+ void *wakeup_ctx)
+{
+ queue->wakeup_fn = wakeup_fn;
+ queue->wakeup_ctx = wakeup_ctx;
+}
+
+static void mp_dispatch_append(struct mp_dispatch_queue *queue,
+ struct mp_dispatch_item *item)
+{
+ pthread_mutex_lock(&queue->lock);
+ if (queue->tail) {
+ queue->tail->next = item;
+ } else {
+ queue->head = item;
+ }
+ queue->tail = item;
+ // Wake up the main thread; note that other threads might wait on this
+ // condition for reasons, so broadcast the condition.
+ pthread_cond_broadcast(&queue->cond);
+ pthread_mutex_unlock(&queue->lock);
+ if (queue->wakeup_fn)
+ queue->wakeup_fn(queue->wakeup_ctx);
+}
+
+// Let the dispatch item process asynchronously. item->fn will be run in the
+// target thread's context, but note that mp_dispatch_enqueue() will usually
+// return long before that happens. It's up to the user to signal completion
+// of the callback. It's also up to the user to guarantee that the context
+// (fn_data) has correct lifetime, i.e. lives until the callback is run, and
+// is freed after that.
+void mp_dispatch_enqueue(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data)
+{
+ struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
+ *item = (struct mp_dispatch_item){
+ .fn = fn,
+ .fn_data = fn_data,
+ .asynchronous = true,
+ };
+ mp_dispatch_append(queue, item);
+}
+
+// Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data)
+// after the fn callback has been run. (The callback could trivially do that
+// itself, but it makes it easier to implement synchronous and asynchronous
+// requests with the same callback implementation.)
+void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data)
+{
+ struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
+ *item = (struct mp_dispatch_item){
+ .fn = fn,
+ .fn_data = talloc_steal(item, fn_data),
+ .asynchronous = true,
+ };
+ mp_dispatch_append(queue, item);
+}
+
+// Run the dispatch item synchronously. item->fn will be run in the target
+// thread's context, and this function will wait until it's done.
+void mp_dispatch_run(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data)
+{
+ struct mp_dispatch_item item = {
+ .fn = fn,
+ .fn_data = fn_data,
+ };
+ mp_dispatch_append(queue, &item);
+
+ pthread_mutex_lock(&queue->lock);
+ while (!item.completed)
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ pthread_mutex_unlock(&queue->lock);
+}
+
+// Process any outstanding dispatch items in the queue. This also handles
+// suspending or locking the target thread.
+// The timeout specifies the maximum wait time, but the actual time spent in
+// this function can be much higher if the suspending/locking functions are
+// used, or if executing the dispatch items takes time.
+// TODO: implement timeout
+void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
+{
+ pthread_mutex_lock(&queue->lock);
+ queue->suspended = true;
+ // Wake up thread which called mp_dispatch_suspend().
+ pthread_cond_broadcast(&queue->cond);
+ while (queue->head || queue->suspend_requested) {
+ if (queue->head && !queue->locked) {
+ struct mp_dispatch_item *item = queue->head;
+ queue->head = item->next;
+ if (!queue->head)
+ queue->tail = NULL;
+ item->next = NULL;
+ // Unlock, because we want to allow other threads to queue items
+ // while the dispatch item is processed.
+ pthread_mutex_unlock(&queue->lock);
+ item->fn(item->fn_data);
+ pthread_mutex_lock(&queue->lock);
+ if (item->asynchronous) {
+ talloc_free(item);
+ } else {
+ item->completed = true;
+ // Wakeup mp_dispatch_run()
+ pthread_cond_broadcast(&queue->cond);
+ }
+ } else {
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ }
+ }
+ queue->suspended = false;
+ pthread_mutex_unlock(&queue->lock);
+}
+
+// Set the target thread into suspend mode: in this mode, the thread will enter
+// mp_dispatch_queue_process(), process any outstanding dispatch items, and
+// wait for new items when done (instead of exiting the process function).
+// Multiple threads can enter suspend mode at the same time. Suspend mode is
+// not a synchronization mechanism; it merely makes sure the target thread does
+// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock()
+// can be used for exclusive access.
+void mp_dispatch_suspend(struct mp_dispatch_queue *queue)
+{
+ pthread_mutex_lock(&queue->lock);
+ queue->suspend_requested++;
+ while (!queue->suspended)
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ pthread_mutex_unlock(&queue->lock);
+}
+
+// Undo mp_dispatch_suspend().
+void mp_dispatch_resume(struct mp_dispatch_queue *queue)
+{
+ pthread_mutex_lock(&queue->lock);
+ assert(queue->suspend_requested > 0);
+ queue->suspend_requested--;
+ pthread_cond_broadcast(&queue->cond);
+ pthread_mutex_unlock(&queue->lock);
+}
+
+// Grant exclusive access to the target thread's state. While this is active,
+// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
+// a pthread mutex), and no other thread can get dispatch items completed.
+// Other threads can still queue asynchronous dispatch items without waiting,
+// and the mutex behavior applies to this function only.
+void mp_dispatch_lock(struct mp_dispatch_queue *queue)
+{
+ // TODO: acquiring a lock should probably be serialized with other
+ // dispatch items to guarantee minimum fairness.
+ pthread_mutex_lock(&queue->lock);
+ queue->suspend_requested++;
+ while (1) {
+ if (queue->suspended && !queue->locked) {
+ queue->locked = true;
+ break;
+ }
+ pthread_cond_wait(&queue->cond, &queue->lock);
+ }
+ pthread_mutex_unlock(&queue->lock);
+}
+
+// Undo mp_dispatch_lock().
+void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
+{
+ pthread_mutex_lock(&queue->lock);
+ assert(queue->locked);
+ assert(queue->suspend_requested > 0);
+ queue->locked = false;
+ queue->suspend_requested--;
+ pthread_cond_broadcast(&queue->cond);
+ pthread_mutex_unlock(&queue->lock);
+}
diff --git a/misc/dispatch.h b/misc/dispatch.h
new file mode 100644
index 0000000000..96a5d7cf4b
--- /dev/null
+++ b/misc/dispatch.h
@@ -0,0 +1,23 @@
+#ifndef MP_DISPATCH_H_
+#define MP_DISPATCH_H_
+
+typedef void (*mp_dispatch_fn)(void *data);
+struct mp_dispatch_queue;
+
+struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent);
+void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
+ void (*wakeup_fn)(void *wakeup_ctx),
+ void *wakeup_ctx);
+void mp_dispatch_enqueue(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data);
+void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data);
+void mp_dispatch_run(struct mp_dispatch_queue *queue,
+ mp_dispatch_fn fn, void *fn_data);
+void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout);
+void mp_dispatch_suspend(struct mp_dispatch_queue *queue);
+void mp_dispatch_resume(struct mp_dispatch_queue *queue);
+void mp_dispatch_lock(struct mp_dispatch_queue *queue);
+void mp_dispatch_unlock(struct mp_dispatch_queue *queue);
+
+#endif