Diffstat (limited to 'misc')
-rw-r--r--  misc/json.c           59
-rw-r--r--  misc/linked_list.h   107
-rw-r--r--  misc/node.c           65
-rw-r--r--  misc/node.h            3
-rw-r--r--  misc/thread_pool.c   200
-rw-r--r--  misc/thread_pool.h    28
-rw-r--r--  misc/thread_tools.c  269
-rw-r--r--  misc/thread_tools.h   82
8 files changed, 752 insertions(+), 61 deletions(-)
diff --git a/misc/json.c b/misc/json.c
index 4797fde4d0..d1b2afddb6 100644
--- a/misc/json.c
+++ b/misc/json.c
@@ -22,9 +22,14 @@
* doesn't verify what's passed to strtod(), and also prefers parsing numbers
 * as integers with strtoll() if possible).
*
- * Does not support extensions like unquoted string literals.
+ * It has some non-standard extensions which shouldn't conflict with JSON:
+ * - a list or object item can have a trailing ","
+ * - object syntax accepts "=" in addition to ":"
+ * - object keys can be unquoted, if they start with a character in [A-Za-z_]
+ * and contain only characters in [A-Za-z0-9_]
+ * - byte escapes with "\xAB" are allowed (with AB being a 2-digit hex number)
*
- * Also see: http://tools.ietf.org/html/rfc4627
+ * Also see: http://tools.ietf.org/html/rfc8259
*
* JSON writer:
*
@@ -34,9 +39,6 @@
* to deal with somehow: either by using byte-strings for JSON, or by running
* a "fixup" pass on the input data. The latter could for example change
* invalid UTF-8 sequences to replacement characters.
- *
- * Currently, will insert \u literals for characters 0-31, '"', '\', and write
- * everything else literally.
*/
#include <stdlib.h>
@@ -48,6 +50,7 @@
#include "common/common.h"
#include "misc/bstr.h"
+#include "misc/ctype.h"
#include "json.h"
@@ -75,6 +78,24 @@ void json_skip_whitespace(char **src)
eat_ws(src);
}
+static int read_id(void *ta_parent, struct mpv_node *dst, char **src)
+{
+ char *start = *src;
+ if (!mp_isalpha(**src) && **src != '_')
+ return -1;
+ while (mp_isalnum(**src) || **src == '_')
+ *src += 1;
+ if (**src == ' ') {
+ **src = '\0'; // we're allowed to mutate it => can avoid the strndup
+ *src += 1;
+ } else {
+ start = talloc_strndup(ta_parent, start, *src - start);
+ }
+ dst->format = MPV_FORMAT_STRING;
+ dst->u.string = start;
+ return 0;
+}
+
static int read_str(void *ta_parent, struct mpv_node *dst, char **src)
{
if (!eat_c(src, '"'))
@@ -125,12 +146,18 @@ static int read_sub(void *ta_parent, struct mpv_node *dst, char **src,
if (list->num > 0 && !eat_c(src, ','))
return -1; // missing ','
eat_ws(src);
+ // non-standard extension: allow a trailing ","
+ if (eat_c(src, term))
+ break;
if (is_obj) {
struct mpv_node keynode;
- if (read_str(list, &keynode, src) < 0)
+ // non-standard extension: allow unquoted strings as keys
+ if (read_id(list, &keynode, src) < 0 &&
+ read_str(list, &keynode, src) < 0)
return -1; // key is not a string
eat_ws(src);
- if (!eat_c(src, ':'))
+ // non-standard extension: allow "=" instead of ":"
+ if (!eat_c(src, ':') && !eat_c(src, '='))
return -1; // ':' missing
eat_ws(src);
MP_TARRAY_GROW(list, list->keys, list->num);
@@ -218,6 +245,14 @@ int json_parse(void *ta_parent, struct mpv_node *dst, char **src, int max_depth)
#define APPEND(b, s) bstr_xappend(NULL, (b), bstr0(s))
+static const char special_escape[] = {
+ ['\b'] = 'b',
+ ['\f'] = 'f',
+ ['\n'] = 'n',
+ ['\r'] = 'r',
+ ['\t'] = 't',
+};
+
static void write_json_str(bstr *b, unsigned char *str)
{
APPEND(b, "\"");
@@ -228,7 +263,15 @@ static void write_json_str(bstr *b, unsigned char *str)
if (!cur[0])
break;
bstr_xappend(NULL, b, (bstr){str, cur - str});
- bstr_xappend_asprintf(NULL, b, "\\u%04x", (unsigned char)cur[0]);
+ if (cur[0] == '\"') {
+ bstr_xappend(NULL, b, (bstr){"\\\"", 2});
+ } else if (cur[0] == '\\') {
+ bstr_xappend(NULL, b, (bstr){"\\\\", 2});
+ } else if (cur[0] < sizeof(special_escape) && special_escape[cur[0]]) {
+ bstr_xappend_asprintf(NULL, b, "\\%c", special_escape[cur[0]]);
+ } else {
+ bstr_xappend_asprintf(NULL, b, "\\u%04x", (unsigned char)cur[0]);
+ }
str = cur + 1;
}
APPEND(b, str);
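
A minimal sketch of exercising the extended parser syntax, assuming talloc and
stdio are available; the buffer must be writable, since read_id() may terminate
unquoted keys in place, and the depth limit of 32 is an arbitrary choice here:

    char text[] = "{ width = 1280, height: 720, }"; // unquoted keys, "=", trailing ","
    char *s = text;
    struct mpv_node node;
    void *tmp = talloc_new(NULL);
    if (json_parse(tmp, &node, &s, 32) >= 0 && node.format == MPV_FORMAT_NODE_MAP)
        printf("parsed %d entries\n", node.u.list->num); // -> "parsed 2 entries"
    talloc_free(tmp); // releases everything json_parse() allocated
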
diff --git a/misc/linked_list.h b/misc/linked_list.h
new file mode 100644
index 0000000000..b43b227d90
--- /dev/null
+++ b/misc/linked_list.h
@@ -0,0 +1,107 @@
+#pragma once
+
+#include <stddef.h>
+
+/*
+ * Doubly linked list macros. All of these require that each list item is a
+ * struct that contains a field, which in turn is a struct with prev/next fields:
+ *
+ * struct example_item {
+ * struct {
+ * struct example_item *prev, *next;
+ * } mylist;
+ * };
+ *
+ * And a struct somewhere that represents the "list" and has head/tail fields:
+ *
+ * struct {
+ * struct example_item *head, *tail;
+ * } mylist_var;
+ *
+ * Then you can e.g. insert elements like this:
+ *
+ * struct example_item item;
+ * LL_APPEND(mylist, &mylist_var, &item);
+ *
+ * The first macro argument is always the name of the field in the item that
+ * contains the prev/next pointers, in this case struct example_item.mylist.
+ * This was done so that a single item can be in multiple lists.
+ *
+ * The list is started/terminated with NULL. Nothing ever points _to_ the
+ * list head, so the list head memory location can be safely moved.
+ *
+ * General rules are:
+ * - list head is initialized by setting head/tail to NULL
+ * - list items do not need to be initialized before inserting them
+ * - next/prev fields of list items are not cleared when they are removed
+ * - there's no way to know whether an item is in the list or not (unless
+ * you clear prev/next on init/removal, _and_ check whether items with
+ * prev/next==NULL are referenced by head/tail)
+ */
+
+// Insert item at the end of the list (list->tail == item).
+// Undefined behavior if item is already in the list.
+#define LL_APPEND(field, list, item) do { \
+ (item)->field.prev = (list)->tail; \
+ (item)->field.next = NULL; \
+ LL_RELINK_(field, list, item) \
+} while (0)
+
+// Insert item enew after eprev (i.e. eprev->next == enew). If eprev is NULL,
+// then insert it as head (list->head == enew).
+// Undefined behavior if enew is already in the list, or eprev isn't.
+#define LL_INSERT_AFTER(field, list, eprev, enew) do { \
+ (enew)->field.prev = (eprev); \
+ (enew)->field.next = (eprev) ? (eprev)->field.next : (list)->head; \
+ LL_RELINK_(field, list, enew) \
+} while (0)
+
+// Insert item at the start of the list (list->head == item).
+// Undefined behavior if item is already in the list.
+#define LL_PREPEND(field, list, item) do { \
+ (item)->field.prev = NULL; \
+ (item)->field.next = (list)->head; \
+ LL_RELINK_(field, list, item) \
+} while (0)
+
+// Insert item enew before enext (i.e. enew->next == enext). If enext is NULL,
+// then insert it as tail (list->tail == enew).
+// Undefined behavior if enew is already in the list, or enext isn't.
+#define LL_INSERT_BEFORE(field, list, enext, enew) do { \
+ (enew)->field.prev = (enext) ? (enext)->field.prev : (list)->tail; \
+ (enew)->field.next = (enext); \
+ LL_RELINK_(field, list, enew) \
+} while (0)
+
+// Remove the item from the list.
+// Undefined behavior if item is not in the list.
+#define LL_REMOVE(field, list, item) do { \
+ if ((item)->field.prev) { \
+ (item)->field.prev->field.next = (item)->field.next; \
+ } else { \
+ (list)->head = (item)->field.next; \
+ } \
+ if ((item)->field.next) { \
+ (item)->field.next->field.prev = (item)->field.prev; \
+ } else { \
+ (list)->tail = (item)->field.prev; \
+ } \
+} while (0)
+
+// Remove all items from the list.
+#define LL_CLEAR(field, list) do { \
+ (list)->head = (list)->tail = NULL; \
+} while (0)
+
+// Internal helper.
+#define LL_RELINK_(field, list, item) \
+ if ((item)->field.prev) { \
+ (item)->field.prev->field.next = (item); \
+ } else { \
+ (list)->head = (item); \
+ } \
+ if ((item)->field.next) { \
+ (item)->field.next->field.prev = (item); \
+ } else { \
+ (list)->tail = (item); \
+ }
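
A usage sketch with the header's own example types (the names q, a, b, c are
illustrative), showing how the field-name argument selects which embedded
prev/next pair the macros operate on:

    struct example_item {
        int v;
        struct { struct example_item *prev, *next; } mylist;
    };
    struct { struct example_item *head, *tail; } q = {0}; // empty: head == tail == NULL
    struct example_item a = {1}, b = {2}, c = {3};
    LL_APPEND(mylist, &q, &a);           // q: a
    LL_PREPEND(mylist, &q, &b);          // q: b, a
    LL_INSERT_AFTER(mylist, &q, &b, &c); // q: b, c, a
    LL_REMOVE(mylist, &q, &c);           // q: b, a (c's prev/next stay stale)
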
diff --git a/misc/node.c b/misc/node.c
index b7bf06d9c1..9b45291a5f 100644
--- a/misc/node.c
+++ b/misc/node.c
@@ -81,3 +81,68 @@ void node_map_add_flag(struct mpv_node *dst, const char *key, bool v)
{
node_map_add(dst, key, MPV_FORMAT_FLAG)->u.flag = v;
}
+
+mpv_node *node_map_get(mpv_node *src, const char *key)
+{
+ if (src->format != MPV_FORMAT_NODE_MAP)
+ return NULL;
+
+ for (int i = 0; i < src->u.list->num; i++) {
+ if (strcmp(key, src->u.list->keys[i]) == 0)
+ return &src->u.list->values[i];
+ }
+
+ return NULL;
+}
+
+// Note: for MPV_FORMAT_NODE_MAP, this (incorrectly) takes the order into
+// account, instead of treating it as a set.
+bool equal_mpv_value(const void *a, const void *b, mpv_format format)
+{
+ switch (format) {
+ case MPV_FORMAT_NONE:
+ return true;
+ case MPV_FORMAT_STRING:
+ case MPV_FORMAT_OSD_STRING:
+ return strcmp(*(char **)a, *(char **)b) == 0;
+ case MPV_FORMAT_FLAG:
+ return *(int *)a == *(int *)b;
+ case MPV_FORMAT_INT64:
+ return *(int64_t *)a == *(int64_t *)b;
+ case MPV_FORMAT_DOUBLE:
+ return *(double *)a == *(double *)b;
+ case MPV_FORMAT_NODE:
+ return equal_mpv_node(a, b);
+ case MPV_FORMAT_BYTE_ARRAY: {
+ const struct mpv_byte_array *a_r = a, *b_r = b;
+ if (a_r->size != b_r->size)
+ return false;
+ return memcmp(a_r->data, b_r->data, a_r->size) == 0;
+ }
+ case MPV_FORMAT_NODE_ARRAY:
+ case MPV_FORMAT_NODE_MAP:
+ {
+ mpv_node_list *l_a = *(mpv_node_list **)a, *l_b = *(mpv_node_list **)b;
+ if (l_a->num != l_b->num)
+ return false;
+ for (int n = 0; n < l_a->num; n++) {
+ if (format == MPV_FORMAT_NODE_MAP) {
+ if (strcmp(l_a->keys[n], l_b->keys[n]) != 0)
+ return false;
+ }
+ if (!equal_mpv_node(&l_a->values[n], &l_b->values[n]))
+ return false;
+ }
+ return true;
+ }
+ }
+ abort(); // supposed to be able to handle all defined types
+}
+
+// Remarks see equal_mpv_value().
+bool equal_mpv_node(const struct mpv_node *a, const struct mpv_node *b)
+{
+ if (a->format != b->format)
+ return false;
+ return equal_mpv_value(&a->u, &b->u, a->format);
+}
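
A short sketch of the comparison semantics, assuming the node_init() and
node_map_add_*() helpers from the unshown part of node.h; note the remark
above: identical maps in a different key order compare unequal:

    struct mpv_node a = {0}, b = {0};
    node_init(&a, MPV_FORMAT_NODE_MAP, NULL);
    node_init(&b, MPV_FORMAT_NODE_MAP, NULL);
    node_map_add_int64(&a, "x", 1);
    node_map_add_int64(&a, "y", 2);
    node_map_add_int64(&b, "y", 2); // same pairs, swapped order
    node_map_add_int64(&b, "x", 1);
    bool eq = equal_mpv_node(&a, &b);    // false: map order is compared
    mpv_node *x = node_map_get(&a, "x"); // entry for "x", or NULL if missing
    // (talloc cleanup omitted in this sketch)
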
diff --git a/misc/node.h b/misc/node.h
index a1bdab0ae1..419f3fc505 100644
--- a/misc/node.h
+++ b/misc/node.h
@@ -10,5 +10,8 @@ void node_map_add_string(struct mpv_node *dst, const char *key, const char *val)
void node_map_add_int64(struct mpv_node *dst, const char *key, int64_t v);
void node_map_add_double(struct mpv_node *dst, const char *key, double v);
void node_map_add_flag(struct mpv_node *dst, const char *key, bool v);
+mpv_node *node_map_get(mpv_node *src, const char *key);
+bool equal_mpv_value(const void *a, const void *b, mpv_format format);
+bool equal_mpv_node(const struct mpv_node *a, const struct mpv_node *b);
#endif
diff --git a/misc/thread_pool.c b/misc/thread_pool.c
index dddfad6734..217c990c19 100644
--- a/misc/thread_pool.c
+++ b/misc/thread_pool.c
@@ -1,40 +1,51 @@
-/*
- * This file is part of mpv.
+/* Copyright (C) 2018 the mpv developers
*
- * mpv is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
*
- * mpv is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with mpv. If not, see <http://www.gnu.org/licenses/>.
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <pthread.h>
#include "common/common.h"
+#include "osdep/threads.h"
+#include "osdep/timer.h"
#include "thread_pool.h"
+// Threads destroy themselves after this many seconds, if there's no new work
+// and the thread count is above the configured minimum.
+#define DESTROY_TIMEOUT 10
+
struct work {
void (*fn)(void *ctx);
void *fn_ctx;
};
struct mp_thread_pool {
- pthread_t *threads;
- int num_threads;
+ int min_threads, max_threads;
pthread_mutex_t lock;
pthread_cond_t wakeup;
// --- the following fields are protected by lock
+
+ pthread_t *threads;
+ int num_threads;
+
+ // Number of threads which have taken up work and are still processing it.
+ int busy_threads;
+
bool terminate;
+
struct work *work;
int num_work;
};
@@ -43,25 +54,61 @@ static void *worker_thread(void *arg)
{
struct mp_thread_pool *pool = arg;
+ mpthread_set_name("worker");
+
pthread_mutex_lock(&pool->lock);
- while (1) {
- while (!pool->num_work && !pool->terminate)
- pthread_cond_wait(&pool->wakeup, &pool->lock);
- if (!pool->num_work && pool->terminate)
- break;
+ struct timespec ts = {0};
+ bool got_timeout = false;
+ while (1) {
+ struct work work = {0};
+ if (pool->num_work > 0) {
+ work = pool->work[pool->num_work - 1];
+ pool->num_work -= 1;
+ }
- assert(pool->num_work > 0);
- struct work work = pool->work[pool->num_work - 1];
- pool->num_work -= 1;
+ if (!work.fn) {
+ if (got_timeout || pool->terminate)
+ break;
+
+ if (pool->num_threads > pool->min_threads) {
+ if (!ts.tv_sec && !ts.tv_nsec)
+ ts = mp_rel_time_to_timespec(DESTROY_TIMEOUT);
+ if (pthread_cond_timedwait(&pool->wakeup, &pool->lock, &ts))
+ got_timeout = pool->num_threads > pool->min_threads;
+ } else {
+ pthread_cond_wait(&pool->wakeup, &pool->lock);
+ }
+ continue;
+ }
+ pool->busy_threads += 1;
pthread_mutex_unlock(&pool->lock);
+
work.fn(work.fn_ctx);
+
pthread_mutex_lock(&pool->lock);
+ pool->busy_threads -= 1;
+
+ ts = (struct timespec){0};
+ got_timeout = false;
+ }
+
+ // If no termination signal was given, it must mean we died because of a
+ // timeout, and nobody is waiting for us. We have to remove ourselves.
+ if (!pool->terminate) {
+ for (int n = 0; n < pool->num_threads; n++) {
+ if (pthread_equal(pool->threads[n], pthread_self())) {
+ pthread_detach(pthread_self());
+ MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n);
+ pthread_mutex_unlock(&pool->lock);
+ return NULL;
+ }
+ }
+ assert(0);
}
- assert(pool->num_work == 0);
- pthread_mutex_unlock(&pool->lock);
+ pthread_mutex_unlock(&pool->lock);
return NULL;
}
@@ -69,27 +116,46 @@ static void thread_pool_dtor(void *ctx)
{
struct mp_thread_pool *pool = ctx;
+
pthread_mutex_lock(&pool->lock);
+
pool->terminate = true;
pthread_cond_broadcast(&pool->wakeup);
+
+ pthread_t *threads = pool->threads;
+ int num_threads = pool->num_threads;
+
+ pool->threads = NULL;
+ pool->num_threads = 0;
+
pthread_mutex_unlock(&pool->lock);
- for (int n = 0; n < pool->num_threads; n++)
- pthread_join(pool->threads[n], NULL);
+ for (int n = 0; n < num_threads; n++)
+ pthread_join(threads[n], NULL);
assert(pool->num_work == 0);
+ assert(pool->num_threads == 0);
pthread_cond_destroy(&pool->wakeup);
pthread_mutex_destroy(&pool->lock);
}
-// Create a thread pool with the given number of worker threads. This can return
-// NULL if the worker threads could not be created. The thread pool can be
-// destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent).
-// If there are still work items on freeing, it will block until all work items
-// are done, and the threads terminate.
-struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads)
+static bool add_thread(struct mp_thread_pool *pool)
{
- assert(threads > 0);
+ pthread_t thread;
+
+ if (pthread_create(&thread, NULL, worker_thread, pool) != 0)
+ return false;
+
+ MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
+ return true;
+}
+
+struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads,
+ int min_threads, int max_threads)
+{
+ assert(min_threads >= 0);
+ assert(init_threads <= min_threads);
+ assert(max_threads > 0 && max_threads >= min_threads);
struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool);
talloc_set_destructor(pool, thread_pool_dtor);
@@ -97,29 +163,61 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads)
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->wakeup, NULL);
- for (int n = 0; n < threads; n++) {
- pthread_t thread;
- if (pthread_create(&thread, NULL, worker_thread, pool)) {
- talloc_free(pool);
- return NULL;
- }
- MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
- }
+ pool->min_threads = min_threads;
+ pool->max_threads = max_threads;
+
+ pthread_mutex_lock(&pool->lock);
+ for (int n = 0; n < init_threads; n++)
+ add_thread(pool);
+ bool ok = pool->num_threads >= init_threads;
+ pthread_mutex_unlock(&pool->lock);
+
+ if (!ok)
+ TA_FREEP(&pool);
return pool;
}
-// Queue a function to be run on a worker thread: fn(fn_ctx)
-// If no worker thread is currently available, it's appended to a list in memory
-// with unbounded size. This function always returns immediately.
-// Concurrent queue calls are allowed, as long as it does not overlap with
-// pool destruction.
-void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
- void *fn_ctx)
+static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+ void *fn_ctx, bool allow_queue)
{
+ bool ok = true;
+
+ assert(fn);
+
pthread_mutex_lock(&pool->lock);
struct work work = {fn, fn_ctx};
- MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
- pthread_cond_signal(&pool->wakeup);
+
+ // If there are not enough threads to process all at once, but we can
+ // create a new thread, then do so. If work is queued quickly, it can
+ // happen that not all available threads have picked up work yet (up to
+ // num_threads - busy_threads threads), which has to be accounted for.
+ if (pool->busy_threads + pool->num_work + 1 > pool->num_threads &&
+ pool->num_threads < pool->max_threads)
+ {
+ if (!add_thread(pool)) {
+ // If we can queue it, it'll get done as long as there is 1 thread.
+ ok = allow_queue && pool->num_threads > 0;
+ }
+ }
+
+ if (ok) {
+ MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
+ pthread_cond_signal(&pool->wakeup);
+ }
+
pthread_mutex_unlock(&pool->lock);
+ return ok;
+}
+
+bool mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+ void *fn_ctx)
+{
+ return thread_pool_add(pool, fn, fn_ctx, true);
+}
+
+bool mp_thread_pool_run(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+ void *fn_ctx)
+{
+ return thread_pool_add(pool, fn, fn_ctx, false);
}
diff --git a/misc/thread_pool.h b/misc/thread_pool.h
index c7af7b2b57..14954da58f 100644
--- a/misc/thread_pool.h
+++ b/misc/thread_pool.h
@@ -3,8 +3,32 @@
struct mp_thread_pool;
-struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads);
-void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+// Create a thread pool with the given number of worker threads. This can return
+// NULL if the worker threads could not be created. The thread pool can be
+// destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent).
+// If there are still work items on freeing, it will block until all work items
+// are done, and the threads terminate.
+// init_threads is the number of threads created in this function (creation
+// fails if they cannot all be started). min_threads must be >= init_threads;
+// if it is greater, the remaining threads are created on demand, but, once
+// created, are never destroyed.
+// If init_threads > 0, then mp_thread_pool_queue() can never fail.
+// If init_threads == 0, mp_thread_pool_create() itself can never fail.
+struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads,
+ int min_threads, int max_threads);
+
+// Queue a function to be run on a worker thread: fn(fn_ctx)
+// If no worker thread is currently available, it's appended to a list in memory
+// with unbounded size. This function always returns immediately.
+// Concurrent queue calls are allowed, as long as it does not overlap with
+// pool destruction.
+// This function is explicitly thread-safe.
+// Cannot fail if thread pool was created with at least 1 thread.
+bool mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
void *fn_ctx);
+// Like mp_thread_pool_queue(), but queue the item (and succeed) only if a
+// thread can be reserved for it (i.e. minimal wait time instead of unbounded).
+bool mp_thread_pool_run(struct mp_thread_pool *pool, void (*fn)(void *ctx),
+ void *fn_ctx);
+
#endif
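
A sketch of the intended call pattern, assuming a trivial hypothetical work_fn;
with init_threads >= 1 the queue call cannot fail, while mp_thread_pool_run()
may be declined under load:

    static void work_fn(void *ctx)
    {
        // ... expensive work on ctx ...
    }

    struct mp_thread_pool *pool = mp_thread_pool_create(NULL, 1, 1, 8);
    if (pool) {
        mp_thread_pool_queue(pool, work_fn, NULL); // cannot fail: 1 thread exists
        if (!mp_thread_pool_run(pool, work_fn, NULL))
            work_fn(NULL);  // no thread could be reserved; run inline instead
        talloc_free(pool);  // blocks until all accepted work has finished
    }
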
diff --git a/misc/thread_tools.c b/misc/thread_tools.c
new file mode 100644
index 0000000000..91b774eb93
--- /dev/null
+++ b/misc/thread_tools.c
@@ -0,0 +1,269 @@
+/* Copyright (C) 2018 the mpv developers
+ *
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <assert.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <errno.h>
+
+#ifdef __MINGW32__
+#include <windows.h>
+#else
+#include <poll.h>
+#endif
+
+#include "common/common.h"
+#include "misc/linked_list.h"
+#include "osdep/atomic.h"
+#include "osdep/io.h"
+#include "osdep/timer.h"
+
+#include "thread_tools.h"
+
+uintptr_t mp_waiter_wait(struct mp_waiter *waiter)
+{
+ pthread_mutex_lock(&waiter->lock);
+ while (!waiter->done)
+ pthread_cond_wait(&waiter->wakeup, &waiter->lock);
+ pthread_mutex_unlock(&waiter->lock);
+
+ uintptr_t ret = waiter->value;
+
+ // We document that after mp_waiter_wait() the waiter object becomes
+ // invalid. (It strictly returns only after mp_waiter_wakeup() has returned,
+ // and the object is "single-shot".) So destroy it here.
+
+ // Normally, we expect that the system uses futexes, in which case the
+ // following functions will do nearly nothing. This is true for Windows
+ // and Linux. But some lesser OSes still might allocate kernel objects
+ // when initializing mutexes, so destroy them here.
+ pthread_mutex_destroy(&waiter->lock);
+ pthread_cond_destroy(&waiter->wakeup);
+
+ memset(waiter, 0xCA, sizeof(*waiter)); // for debugging
+
+ return ret;
+}
+
+void mp_waiter_wakeup(struct mp_waiter *waiter, uintptr_t value)
+{
+ pthread_mutex_lock(&waiter->lock);
+ assert(!waiter->done);
+ waiter->done = true;
+ waiter->value = value;
+ pthread_cond_signal(&waiter->wakeup);
+ pthread_mutex_unlock(&waiter->lock);
+}
+
+bool mp_waiter_poll(struct mp_waiter *waiter)
+{
+ pthread_mutex_lock(&waiter->lock);
+ bool r = waiter->done;
+ pthread_mutex_unlock(&waiter->lock);
+ return r;
+}
+
+struct mp_cancel {
+ pthread_mutex_t lock;
+ pthread_cond_t wakeup;
+
+ // Semaphore state and "mirrors".
+ atomic_bool triggered;
+ void (*cb)(void *ctx);
+ void *cb_ctx;
+ int wakeup_pipe[2];
+ void *win32_event; // actually HANDLE
+
+ // Slave list. These are automatically notified as well.
+ struct {
+ struct mp_cancel *head, *tail;
+ } slaves;
+
+ // For slaves. Synchronization is managed by parent.lock!
+ struct mp_cancel *parent;
+ struct {
+ struct mp_cancel *next, *prev;
+ } siblings;
+};
+
+static void cancel_destroy(void *p)
+{
+ struct mp_cancel *c = p;
+
+ assert(!c->slaves.head); // API user error
+
+ mp_cancel_set_parent(c, NULL);
+
+ if (c->wakeup_pipe[0] >= 0) {
+ close(c->wakeup_pipe[0]);
+ close(c->wakeup_pipe[1]);
+ }
+
+#ifdef __MINGW32__
+ if (c->win32_event)
+ CloseHandle(c->win32_event);
+#endif
+
+ pthread_mutex_destroy(&c->lock);
+ pthread_cond_destroy(&c->wakeup);
+}
+
+struct mp_cancel *mp_cancel_new(void *talloc_ctx)
+{
+ struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
+ talloc_set_destructor(c, cancel_destroy);
+ *c = (struct mp_cancel){
+ .triggered = ATOMIC_VAR_INIT(false),
+ .wakeup_pipe = {-1, -1},
+ };
+ pthread_mutex_init(&c->lock, NULL);
+ pthread_cond_init(&c->wakeup, NULL);
+ return c;
+}
+
+static void trigger_locked(struct mp_cancel *c)
+{
+ atomic_store(&c->triggered, true);
+
+ pthread_cond_broadcast(&c->wakeup); // condition bound to c->triggered
+
+ if (c->cb)
+ c->cb(c->cb_ctx);
+
+ for (struct mp_cancel *sub = c->slaves.head; sub; sub = sub->siblings.next)
+ mp_cancel_trigger(sub);
+
+ if (c->wakeup_pipe[1] >= 0)
+ (void)write(c->wakeup_pipe[1], &(char){0}, 1);
+
+#ifdef __MINGW32__
+ if (c->win32_event)
+ SetEvent(c->win32_event);
+#endif
+}
+
+void mp_cancel_trigger(struct mp_cancel *c)
+{
+ pthread_mutex_lock(&c->lock);
+ trigger_locked(c);
+ pthread_mutex_unlock(&c->lock);
+}
+
+void mp_cancel_reset(struct mp_cancel *c)
+{
+ pthread_mutex_lock(&c->lock);
+
+ atomic_store(&c->triggered, false);
+
+ if (c->wakeup_pipe[0] >= 0) {
+ // Flush it fully.
+ while (1) {
+ int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256);
+ if (r <= 0 && !(r < 0 && errno == EINTR))
+ break;
+ }
+ }
+
+#ifdef __MINGW32__
+ if (c->win32_event)
+ ResetEvent(c->win32_event);
+#endif
+
+ pthread_mutex_unlock(&c->lock);
+}
+
+bool mp_cancel_test(struct mp_cancel *c)
+{
+ return c ? atomic_load_explicit(&c->triggered, memory_order_relaxed) : false;
+}
+
+bool mp_cancel_wait(struct mp_cancel *c, double timeout)
+{
+ struct timespec ts = mp_rel_time_to_timespec(timeout);
+ pthread_mutex_lock(&c->lock);
+ while (!mp_cancel_test(c)) {
+ if (pthread_cond_timedwait(&c->wakeup, &c->lock, &ts))
+ break;
+ }
+ pthread_mutex_unlock(&c->lock);
+
+ return mp_cancel_test(c);
+}
+
+// If a new notification mechanism was added, and the mp_cancel state was
+// already triggered, make sure the newly added mechanism is also triggered.
+static void retrigger_locked(struct mp_cancel *c)
+{
+ if (mp_cancel_test(c))
+ trigger_locked(c);
+}
+
+void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx)
+{
+ pthread_mutex_lock(&c->lock);
+ c->cb = cb;
+ c->cb_ctx = ctx;
+ retrigger_locked(c);
+ pthread_mutex_unlock(&c->lock);
+}
+
+void mp_cancel_set_parent(struct mp_cancel *slave, struct mp_cancel *parent)
+{
+    // We can access slave->parent without synchronization, because:
+ // - concurrent mp_cancel_set_parent() calls to slave are not allowed
+ // - slave->parent needs to stay valid as long as the slave exists
+ if (slave->parent == parent)
+ return;
+ if (slave->parent) {
+ pthread_mutex_lock(&slave->parent->lock);
+ LL_REMOVE(siblings, &slave->parent->slaves, slave);
+ pthread_mutex_unlock(&slave->parent->lock);
+ }
+ slave->parent = parent;
+ if (slave->parent) {
+ pthread_mutex_lock(&slave->parent->lock);
+ LL_APPEND(siblings, &slave->parent->slaves, slave);
+ retrigger_locked(slave->parent);
+ pthread_mutex_unlock(&slave->parent->lock);
+ }
+}
+
+int mp_cancel_get_fd(struct mp_cancel *c)
+{
+ pthread_mutex_lock(&c->lock);
+ if (c->wakeup_pipe[0] < 0) {
+ mp_make_wakeup_pipe(c->wakeup_pipe);
+ retrigger_locked(c);
+ }
+ pthread_mutex_unlock(&c->lock);
+
+ return c->wakeup_pipe[0];
+}
+
+#ifdef __MINGW32__
+void *mp_cancel_get_event(struct mp_cancel *c)
+{
+ pthread_mutex_lock(&c->lock);
+ if (!c->win32_event) {
+ c->win32_event = CreateEventW(NULL, TRUE, FALSE, NULL);
+ retrigger_locked(c);
+ }
+ pthread_mutex_unlock(&c->lock);
+
+ return c->win32_event;
+}
+#endif
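
A hedged sketch of typical mp_cancel wiring, with error handling omitted: a
"slave" inherits triggering from its parent, and the fd can be fed to poll():

    struct mp_cancel *root = mp_cancel_new(NULL);
    struct mp_cancel *sub = mp_cancel_new(NULL);
    mp_cancel_set_parent(sub, root);   // triggering root now triggers sub too

    struct pollfd pfd = { .fd = mp_cancel_get_fd(sub), .events = POLLIN };

    mp_cancel_trigger(root);           // cascades to sub; pfd becomes readable
    bool hit = mp_cancel_test(sub);    // true
    bool w = mp_cancel_wait(sub, 0.1); // returns true immediately

    mp_cancel_set_parent(sub, NULL);   // a parent must not be freed while
    talloc_free(sub);                  // slaves are still attached
    talloc_free(root);
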
diff --git a/misc/thread_tools.h b/misc/thread_tools.h
new file mode 100644
index 0000000000..89d84ce0b6
--- /dev/null
+++ b/misc/thread_tools.h
@@ -0,0 +1,82 @@
+#pragma once
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <pthread.h>
+
+// This is basically a single-shot semaphore, intended as a light-weight
+// solution for making one thread wait for another.
+struct mp_waiter {
+ // All fields are considered private. Use MP_WAITER_INITIALIZER to init.
+ pthread_mutex_t lock;
+ pthread_cond_t wakeup;
+ bool done;
+ uintptr_t value;
+};
+
+// Initialize a mp_waiter object for use with mp_waiter_*().
+#define MP_WAITER_INITIALIZER { \
+ .lock = PTHREAD_MUTEX_INITIALIZER, \
+ .wakeup = PTHREAD_COND_INITIALIZER, \
+ }
+
+// Block until some other thread calls mp_waiter_wakeup(). The function returns
+// the value argument of that wakeup call. After this, the waiter object must
+// not be used anymore. Although you can reinit it with MP_WAITER_INITIALIZER
+// (then you must make sure nothing calls mp_waiter_wakeup() before this).
+uintptr_t mp_waiter_wait(struct mp_waiter *waiter);
+
+// Unblock the thread waiting with mp_waiter_wait(), and make it return the
+// given value.
+void mp_waiter_wakeup(struct mp_waiter *waiter, uintptr_t value);
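
A minimal waiter sketch under this API, with hypothetical responder/ask names:
one thread blocks, another hands back a value via plain pthreads:

    static void *responder(void *arg)
    {
        mp_waiter_wakeup(arg, 42); // single-shot: exactly one wakeup is allowed
        return NULL;
    }

    static uintptr_t ask(void)
    {
        struct mp_waiter w = MP_WAITER_INITIALIZER;
        pthread_t t;
        pthread_create(&t, NULL, responder, &w);
        uintptr_t v = mp_waiter_wait(&w); // blocks; w is invalid afterwards
        pthread_join(t, NULL);
        return v; // 42
    }
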