diff options
author | Anton Kindestam <antonki@kth.se> | 2018-12-05 19:02:03 +0100 |
---|---|---|
committer | Anton Kindestam <antonki@kth.se> | 2018-12-05 19:19:24 +0100 |
commit | 8b83c8996686072bc743b112ae5cb3bf93aa33ed (patch) | |
tree | b09ce6a7ff470b05006622f19914b3d39d2f7d9f /misc | |
parent | 5bcac8580df6fc62323136f756a3a6d1e754fe9c (diff) | |
parent | 559a400ac36e75a8d73ba263fd7fa6736df1c2da (diff) | |
download | mpv-8b83c8996686072bc743b112ae5cb3bf93aa33ed.tar.bz2 mpv-8b83c8996686072bc743b112ae5cb3bf93aa33ed.tar.xz |
Merge commit '559a400ac36e75a8d73ba263fd7fa6736df1c2da' into wm4-commits--merge-edition
This bumps libmpv version to 1.103
Diffstat (limited to 'misc')
-rw-r--r-- | misc/json.c | 59 | ||||
-rw-r--r-- | misc/linked_list.h | 107 | ||||
-rw-r--r-- | misc/node.c | 65 | ||||
-rw-r--r-- | misc/node.h | 3 | ||||
-rw-r--r-- | misc/thread_pool.c | 200 | ||||
-rw-r--r-- | misc/thread_pool.h | 28 | ||||
-rw-r--r-- | misc/thread_tools.c | 269 | ||||
-rw-r--r-- | misc/thread_tools.h | 82 |
8 files changed, 752 insertions, 61 deletions
diff --git a/misc/json.c b/misc/json.c index 4797fde4d0..d1b2afddb6 100644 --- a/misc/json.c +++ b/misc/json.c @@ -22,9 +22,14 @@ * doesn't verify what's passed to strtod(), and also prefers parsing numbers * as integers with stroll() if possible). * - * Does not support extensions like unquoted string literals. + * It has some non-standard extensions which shouldn't conflict with JSON: + * - a list or object item can have a trailing "," + * - object syntax accepts "=" in addition of ":" + * - object keys can be unquoted, if they start with a character in [A-Za-z_] + * and contain only characters in [A-Za-z0-9_] + * - byte escapes with "\xAB" are allowed (with AB being a 2 digit hex number) * - * Also see: http://tools.ietf.org/html/rfc4627 + * Also see: http://tools.ietf.org/html/rfc8259 * * JSON writer: * @@ -34,9 +39,6 @@ * to deal with somehow: either by using byte-strings for JSON, or by running * a "fixup" pass on the input data. The latter could for example change * invalid UTF-8 sequences to replacement characters. - * - * Currently, will insert \u literals for characters 0-31, '"', '\', and write - * everything else literally. */ #include <stdlib.h> @@ -48,6 +50,7 @@ #include "common/common.h" #include "misc/bstr.h" +#include "misc/ctype.h" #include "json.h" @@ -75,6 +78,24 @@ void json_skip_whitespace(char **src) eat_ws(src); } +static int read_id(void *ta_parent, struct mpv_node *dst, char **src) +{ + char *start = *src; + if (!mp_isalpha(**src) && **src != '_') + return -1; + while (mp_isalnum(**src) || **src == '_') + *src += 1; + if (**src == ' ') { + **src = '\0'; // we're allowed to mutate it => can avoid the strndup + *src += 1; + } else { + start = talloc_strndup(ta_parent, start, *src - start); + } + dst->format = MPV_FORMAT_STRING; + dst->u.string = start; + return 0; +} + static int read_str(void *ta_parent, struct mpv_node *dst, char **src) { if (!eat_c(src, '"')) @@ -125,12 +146,18 @@ static int read_sub(void *ta_parent, struct mpv_node *dst, char **src, if (list->num > 0 && !eat_c(src, ',')) return -1; // missing ',' eat_ws(src); + // non-standard extension: allow a trailing "," + if (eat_c(src, term)) + break; if (is_obj) { struct mpv_node keynode; - if (read_str(list, &keynode, src) < 0) + // non-standard extension: allow unquoted strings as keys + if (read_id(list, &keynode, src) < 0 && + read_str(list, &keynode, src) < 0) return -1; // key is not a string eat_ws(src); - if (!eat_c(src, ':')) + // non-standard extension: allow "=" instead of ":" + if (!eat_c(src, ':') && !eat_c(src, '=')) return -1; // ':' missing eat_ws(src); MP_TARRAY_GROW(list, list->keys, list->num); @@ -218,6 +245,14 @@ int json_parse(void *ta_parent, struct mpv_node *dst, char **src, int max_depth) #define APPEND(b, s) bstr_xappend(NULL, (b), bstr0(s)) +static const char special_escape[] = { + ['\b'] = 'b', + ['\f'] = 'f', + ['\n'] = 'n', + ['\r'] = 'r', + ['\t'] = 't', +}; + static void write_json_str(bstr *b, unsigned char *str) { APPEND(b, "\""); @@ -228,7 +263,15 @@ static void write_json_str(bstr *b, unsigned char *str) if (!cur[0]) break; bstr_xappend(NULL, b, (bstr){str, cur - str}); - bstr_xappend_asprintf(NULL, b, "\\u%04x", (unsigned char)cur[0]); + if (cur[0] == '\"') { + bstr_xappend(NULL, b, (bstr){"\\\"", 2}); + } else if (cur[0] == '\\') { + bstr_xappend(NULL, b, (bstr){"\\\\", 2}); + } else if (cur[0] < sizeof(special_escape) && special_escape[cur[0]]) { + bstr_xappend_asprintf(NULL, b, "\\%c", special_escape[cur[0]]); + } else { + bstr_xappend_asprintf(NULL, b, "\\u%04x", (unsigned char)cur[0]); + } str = cur + 1; } APPEND(b, str); diff --git a/misc/linked_list.h b/misc/linked_list.h new file mode 100644 index 0000000000..b43b227d90 --- /dev/null +++ b/misc/linked_list.h @@ -0,0 +1,107 @@ +#pragma once + +#include <stddef.h> + +/* + * Doubly linked list macros. All of these require that each list item is a + * struct, that contains a field, that is another struct with prev/next fields: + * + * struct example_item { + * struct { + * struct example_item *prev, *next; + * } mylist; + * }; + * + * And a struct somewhere that represents the "list" and has head/tail fields: + * + * struct { + * struct example_item *head, *tail; + * } mylist_var; + * + * Then you can e.g. insert elements like this: + * + * struct example_item item; + * LL_APPEND(mylist, &mylist_var, &item); + * + * The first macro argument is always the name if the field in the item that + * contains the prev/next pointers, in this case struct example_item.mylist. + * This was done so that a single item can be in multiple lists. + * + * The list is started/terminated with NULL. Nothing ever points _to_ the + * list head, so the list head memory location can be safely moved. + * + * General rules are: + * - list head is initialized by setting head/tail to NULL + * - list items do not need to be initialized before inserting them + * - next/prev fields of list items are not cleared when they are removed + * - there's no way to know whether an item is in the list or not (unless + * you clear prev/next on init/removal, _and_ check whether items with + * prev/next==NULL are referenced by head/tail) + */ + +// Insert item at the end of the list (list->tail == item). +// Undefined behavior if item is already in the list. +#define LL_APPEND(field, list, item) do { \ + (item)->field.prev = (list)->tail; \ + (item)->field.next = NULL; \ + LL_RELINK_(field, list, item) \ +} while (0) + +// Insert item enew after eprev (i.e. eprev->next == enew). If eprev is NULL, +// then insert it as head (list->head == enew). +// Undefined behavior if enew is already in the list, or eprev isn't. +#define LL_INSERT_AFTER(field, list, eprev, enew) do { \ + (enew)->field.prev = (eprev); \ + (enew)->field.next = (eprev) ? (eprev)->field.next : (list)->head; \ + LL_RELINK_(field, list, enew) \ +} while (0) + +// Insert item at the start of the list (list->head == item). +// Undefined behavior if item is already in the list. +#define LL_PREPEND(field, list, item) do { \ + (item)->field.prev = NULL; \ + (item)->field.next = (list)->head; \ + LL_RELINK_(field, list, item) \ +} while (0) + +// Insert item enew before enext (i.e. enew->next == enext). If enext is NULL, +// then insert it as tail (list->tail == enew). +// Undefined behavior if enew is already in the list, or enext isn't. +#define LL_INSERT_BEFORE(field, list, enext, enew) do { \ + (enew)->field.prev = (enext) ? (enext)->field.prev : (list)->tail; \ + (enew)->field.next = (enext); \ + LL_RELINK_(field, list, enew) \ +} while (0) + +// Remove the item from the list. +// Undefined behavior if item is not in the list. +#define LL_REMOVE(field, list, item) do { \ + if ((item)->field.prev) { \ + (item)->field.prev->field.next = (item)->field.next; \ + } else { \ + (list)->head = (item)->field.next; \ + } \ + if ((item)->field.next) { \ + (item)->field.next->field.prev = (item)->field.prev; \ + } else { \ + (list)->tail = (item)->field.prev; \ + } \ +} while (0) + +// Remove all items from the list. +#define LL_CLEAR(field, list) do { \ + (list)->head = (list)->tail = NULL; \ +} while (0) + +// Internal helper. +#define LL_RELINK_(field, list, item) \ + if ((item)->field.prev) { \ + (item)->field.prev->field.next = (item); \ + } else { \ + (list)->head = (item); \ + } \ + if ((item)->field.next) { \ + (item)->field.next->field.prev = (item); \ + } else { \ + (list)->tail = (item); \ + } diff --git a/misc/node.c b/misc/node.c index b7bf06d9c1..9b45291a5f 100644 --- a/misc/node.c +++ b/misc/node.c @@ -81,3 +81,68 @@ void node_map_add_flag(struct mpv_node *dst, const char *key, bool v) { node_map_add(dst, key, MPV_FORMAT_FLAG)->u.flag = v; } + +mpv_node *node_map_get(mpv_node *src, const char *key) +{ + if (src->format != MPV_FORMAT_NODE_MAP) + return NULL; + + for (int i = 0; i < src->u.list->num; i++) { + if (strcmp(key, src->u.list->keys[i]) == 0) + return &src->u.list->values[i]; + } + + return NULL; +} + +// Note: for MPV_FORMAT_NODE_MAP, this (incorrectly) takes the order into +// account, instead of treating it as set. +bool equal_mpv_value(const void *a, const void *b, mpv_format format) +{ + switch (format) { + case MPV_FORMAT_NONE: + return true; + case MPV_FORMAT_STRING: + case MPV_FORMAT_OSD_STRING: + return strcmp(*(char **)a, *(char **)b) == 0; + case MPV_FORMAT_FLAG: + return *(int *)a == *(int *)b; + case MPV_FORMAT_INT64: + return *(int64_t *)a == *(int64_t *)b; + case MPV_FORMAT_DOUBLE: + return *(double *)a == *(double *)b; + case MPV_FORMAT_NODE: + return equal_mpv_node(a, b); + case MPV_FORMAT_BYTE_ARRAY: { + const struct mpv_byte_array *a_r = a, *b_r = b; + if (a_r->size != b_r->size) + return false; + return memcmp(a_r->data, b_r->data, a_r->size) == 0; + } + case MPV_FORMAT_NODE_ARRAY: + case MPV_FORMAT_NODE_MAP: + { + mpv_node_list *l_a = *(mpv_node_list **)a, *l_b = *(mpv_node_list **)b; + if (l_a->num != l_b->num) + return false; + for (int n = 0; n < l_a->num; n++) { + if (format == MPV_FORMAT_NODE_MAP) { + if (strcmp(l_a->keys[n], l_b->keys[n]) != 0) + return false; + } + if (!equal_mpv_node(&l_a->values[n], &l_b->values[n])) + return false; + } + return true; + } + } + abort(); // supposed to be able to handle all defined types +} + +// Remarks see equal_mpv_value(). +bool equal_mpv_node(const struct mpv_node *a, const struct mpv_node *b) +{ + if (a->format != b->format) + return false; + return equal_mpv_value(&a->u, &b->u, a->format); +} diff --git a/misc/node.h b/misc/node.h index a1bdab0ae1..419f3fc505 100644 --- a/misc/node.h +++ b/misc/node.h @@ -10,5 +10,8 @@ void node_map_add_string(struct mpv_node *dst, const char *key, const char *val) void node_map_add_int64(struct mpv_node *dst, const char *key, int64_t v); void node_map_add_double(struct mpv_node *dst, const char *key, double v); void node_map_add_flag(struct mpv_node *dst, const char *key, bool v); +mpv_node *node_map_get(mpv_node *src, const char *key); +bool equal_mpv_value(const void *a, const void *b, mpv_format format); +bool equal_mpv_node(const struct mpv_node *a, const struct mpv_node *b); #endif diff --git a/misc/thread_pool.c b/misc/thread_pool.c index dddfad6734..217c990c19 100644 --- a/misc/thread_pool.c +++ b/misc/thread_pool.c @@ -1,40 +1,51 @@ -/* - * This file is part of mpv. +/* Copyright (C) 2018 the mpv developers * - * mpv is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. * - * mpv is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with mpv. If not, see <http://www.gnu.org/licenses/>. + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ #include <pthread.h> #include "common/common.h" +#include "osdep/threads.h" +#include "osdep/timer.h" #include "thread_pool.h" +// Threads destroy themselves after this many seconds, if there's no new work +// and the thread count is above the configured minimum. +#define DESTROY_TIMEOUT 10 + struct work { void (*fn)(void *ctx); void *fn_ctx; }; struct mp_thread_pool { - pthread_t *threads; - int num_threads; + int min_threads, max_threads; pthread_mutex_t lock; pthread_cond_t wakeup; // --- the following fields are protected by lock + + pthread_t *threads; + int num_threads; + + // Number of threads which have taken up work and are still processing it. + int busy_threads; + bool terminate; + struct work *work; int num_work; }; @@ -43,25 +54,61 @@ static void *worker_thread(void *arg) { struct mp_thread_pool *pool = arg; + mpthread_set_name("worker"); + pthread_mutex_lock(&pool->lock); - while (1) { - while (!pool->num_work && !pool->terminate) - pthread_cond_wait(&pool->wakeup, &pool->lock); - if (!pool->num_work && pool->terminate) - break; + struct timespec ts = {0}; + bool got_timeout = false; + while (1) { + struct work work = {0}; + if (pool->num_work > 0) { + work = pool->work[pool->num_work - 1]; + pool->num_work -= 1; + } - assert(pool->num_work > 0); - struct work work = pool->work[pool->num_work - 1]; - pool->num_work -= 1; + if (!work.fn) { + if (got_timeout || pool->terminate) + break; + + if (pool->num_threads > pool->min_threads) { + if (!ts.tv_sec && !ts.tv_nsec) + ts = mp_rel_time_to_timespec(DESTROY_TIMEOUT); + if (pthread_cond_timedwait(&pool->wakeup, &pool->lock, &ts)) + got_timeout = pool->num_threads > pool->min_threads; + } else { + pthread_cond_wait(&pool->wakeup, &pool->lock); + } + continue; + } + pool->busy_threads += 1; pthread_mutex_unlock(&pool->lock); + work.fn(work.fn_ctx); + pthread_mutex_lock(&pool->lock); + pool->busy_threads -= 1; + + ts = (struct timespec){0}; + got_timeout = false; + } + + // If no termination signal was given, it must mean we died because of a + // timeout, and nobody is waiting for us. We have to remove ourselves. + if (!pool->terminate) { + for (int n = 0; n < pool->num_threads; n++) { + if (pthread_equal(pool->threads[n], pthread_self())) { + pthread_detach(pthread_self()); + MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n); + pthread_mutex_unlock(&pool->lock); + return NULL; + } + } + assert(0); } - assert(pool->num_work == 0); - pthread_mutex_unlock(&pool->lock); + pthread_mutex_unlock(&pool->lock); return NULL; } @@ -69,27 +116,46 @@ static void thread_pool_dtor(void *ctx) { struct mp_thread_pool *pool = ctx; + pthread_mutex_lock(&pool->lock); + pool->terminate = true; pthread_cond_broadcast(&pool->wakeup); + + pthread_t *threads = pool->threads; + int num_threads = pool->num_threads; + + pool->threads = NULL; + pool->num_threads = 0; + pthread_mutex_unlock(&pool->lock); - for (int n = 0; n < pool->num_threads; n++) - pthread_join(pool->threads[n], NULL); + for (int n = 0; n < num_threads; n++) + pthread_join(threads[n], NULL); assert(pool->num_work == 0); + assert(pool->num_threads == 0); pthread_cond_destroy(&pool->wakeup); pthread_mutex_destroy(&pool->lock); } -// Create a thread pool with the given number of worker threads. This can return -// NULL if the worker threads could not be created. The thread pool can be -// destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent). -// If there are still work items on freeing, it will block until all work items -// are done, and the threads terminate. -struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads) +static bool add_thread(struct mp_thread_pool *pool) { - assert(threads > 0); + pthread_t thread; + + if (pthread_create(&thread, NULL, worker_thread, pool) != 0) + return false; + + MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread); + return true; +} + +struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads, + int min_threads, int max_threads) +{ + assert(min_threads >= 0); + assert(init_threads <= min_threads); + assert(max_threads > 0 && max_threads >= min_threads); struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool); talloc_set_destructor(pool, thread_pool_dtor); @@ -97,29 +163,61 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads) pthread_mutex_init(&pool->lock, NULL); pthread_cond_init(&pool->wakeup, NULL); - for (int n = 0; n < threads; n++) { - pthread_t thread; - if (pthread_create(&thread, NULL, worker_thread, pool)) { - talloc_free(pool); - return NULL; - } - MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread); - } + pool->min_threads = min_threads; + pool->max_threads = max_threads; + + pthread_mutex_lock(&pool->lock); + for (int n = 0; n < init_threads; n++) + add_thread(pool); + bool ok = pool->num_threads >= init_threads; + pthread_mutex_unlock(&pool->lock); + + if (!ok) + TA_FREEP(&pool); return pool; } -// Queue a function to be run on a worker thread: fn(fn_ctx) -// If no worker thread is currently available, it's appended to a list in memory -// with unbounded size. This function always returns immediately. -// Concurrent queue calls are allowed, as long as it does not overlap with -// pool destruction. -void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx), - void *fn_ctx) +static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx), + void *fn_ctx, bool allow_queue) { + bool ok = true; + + assert(fn); + pthread_mutex_lock(&pool->lock); struct work work = {fn, fn_ctx}; - MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work); - pthread_cond_signal(&pool->wakeup); + + // If there are not enough threads to process all at once, but we can + // create a new thread, then do so. If work is queued quickly, it can + // happen that not all available threads have picked up work yet (up to + // num_threads - busy_threads threads), which has to be accounted for. + if (pool->busy_threads + pool->num_work + 1 > pool->num_threads && + pool->num_threads < pool->max_threads) + { + if (!add_thread(pool)) { + // If we can queue it, it'll get done as long as there is 1 thread. + ok = allow_queue && pool->num_threads > 0; + } + } + + if (ok) { + MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work); + pthread_cond_signal(&pool->wakeup); + } + pthread_mutex_unlock(&pool->lock); + return ok; +} + +bool mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx), + void *fn_ctx) +{ + return thread_pool_add(pool, fn, fn_ctx, true); +} + +bool mp_thread_pool_run(struct mp_thread_pool *pool, void (*fn)(void *ctx), + void *fn_ctx) +{ + return thread_pool_add(pool, fn, fn_ctx, false); } diff --git a/misc/thread_pool.h b/misc/thread_pool.h index c7af7b2b57..14954da58f 100644 --- a/misc/thread_pool.h +++ b/misc/thread_pool.h @@ -3,8 +3,32 @@ struct mp_thread_pool; -struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads); -void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx), +// Create a thread pool with the given number of worker threads. This can return +// NULL if the worker threads could not be created. The thread pool can be +// destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent). +// If there are still work items on freeing, it will block until all work items +// are done, and the threads terminate. +// init_threads is the number of threads created in this function (and it fails +// if it could not be done). min_threads must be >=, if it's >, then the +// remaining threads will be created on demand, but never destroyed. +// If init_threads > 0, then mp_thread_pool_queue() can never fail. +// If init_threads == 0, mp_thread_pool_create() itself can never fail. +struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads, + int min_threads, int max_threads); + +// Queue a function to be run on a worker thread: fn(fn_ctx) +// If no worker thread is currently available, it's appended to a list in memory +// with unbounded size. This function always returns immediately. +// Concurrent queue calls are allowed, as long as it does not overlap with +// pool destruction. +// This function is explicitly thread-safe. +// Cannot fail if thread pool was created with at least 1 thread. +bool mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx), void *fn_ctx); +// Like mp_thread_pool_queue(), but only queue the item and succeed if a thread +// can be reserved for the item (i.e. minimal wait time instead of unbounded). +bool mp_thread_pool_run(struct mp_thread_pool *pool, void (*fn)(void *ctx), + void *fn_ctx); + #endif diff --git a/misc/thread_tools.c b/misc/thread_tools.c new file mode 100644 index 0000000000..91b774eb93 --- /dev/null +++ b/misc/thread_tools.c @@ -0,0 +1,269 @@ +/* Copyright (C) 2018 the mpv developers + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <assert.h> +#include <string.h> +#include <sys/types.h> +#include <unistd.h> +#include <errno.h> + +#ifdef __MINGW32__ +#include <windows.h> +#else +#include <poll.h> +#endif + +#include "common/common.h" +#include "misc/linked_list.h" +#include "osdep/atomic.h" +#include "osdep/io.h" +#include "osdep/timer.h" + +#include "thread_tools.h" + +uintptr_t mp_waiter_wait(struct mp_waiter *waiter) +{ + pthread_mutex_lock(&waiter->lock); + while (!waiter->done) + pthread_cond_wait(&waiter->wakeup, &waiter->lock); + pthread_mutex_unlock(&waiter->lock); + + uintptr_t ret = waiter->value; + + // We document that after mp_waiter_wait() the waiter object becomes + // invalid. (It strictly returns only after mp_waiter_wakeup() has returned, + // and the object is "single-shot".) So destroy it here. + + // Normally, we expect that the system uses futexes, in which case the + // following functions will do nearly nothing. This is true for Windows + // and Linux. But some lesser OSes still might allocate kernel objects + // when initializing mutexes, so destroy them here. + pthread_mutex_destroy(&waiter->lock); + pthread_cond_destroy(&waiter->wakeup); + + memset(waiter, 0xCA, sizeof(*waiter)); // for debugging + + return ret; +} + +void mp_waiter_wakeup(struct mp_waiter *waiter, uintptr_t value) +{ + pthread_mutex_lock(&waiter->lock); + assert(!waiter->done); + waiter->done = true; + waiter->value = value; + pthread_cond_signal(&waiter->wakeup); + pthread_mutex_unlock(&waiter->lock); +} + +bool mp_waiter_poll(struct mp_waiter *waiter) +{ + pthread_mutex_lock(&waiter->lock); + bool r = waiter->done; + pthread_mutex_unlock(&waiter->lock); + return r; +} + +struct mp_cancel { + pthread_mutex_t lock; + pthread_cond_t wakeup; + + // Semaphore state and "mirrors". + atomic_bool triggered; + void (*cb)(void *ctx); + void *cb_ctx; + int wakeup_pipe[2]; + void *win32_event; // actually HANDLE + + // Slave list. These are automatically notified as well. + struct { + struct mp_cancel *head, *tail; + } slaves; + + // For slaves. Synchronization is managed by parent.lock! + struct mp_cancel *parent; + struct { + struct mp_cancel *next, *prev; + } siblings; +}; + +static void cancel_destroy(void *p) +{ + struct mp_cancel *c = p; + + assert(!c->slaves.head); // API user error + + mp_cancel_set_parent(c, NULL); + + if (c->wakeup_pipe[0] >= 0) { + close(c->wakeup_pipe[0]); + close(c->wakeup_pipe[1]); + } + +#ifdef __MINGW32__ + if (c->win32_event) + CloseHandle(c->win32_event); +#endif + + pthread_mutex_destroy(&c->lock); + pthread_cond_destroy(&c->wakeup); +} + +struct mp_cancel *mp_cancel_new(void *talloc_ctx) +{ + struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c); + talloc_set_destructor(c, cancel_destroy); + *c = (struct mp_cancel){ + .triggered = ATOMIC_VAR_INIT(false), + .wakeup_pipe = {-1, -1}, + }; + pthread_mutex_init(&c->lock, NULL); + pthread_cond_init(&c->wakeup, NULL); + return c; +} + +static void trigger_locked(struct mp_cancel *c) +{ + atomic_store(&c->triggered, true); + + pthread_cond_broadcast(&c->wakeup); // condition bound to c->triggered + + if (c->cb) + c->cb(c->cb_ctx); + + for (struct mp_cancel *sub = c->slaves.head; sub; sub = sub->siblings.next) + mp_cancel_trigger(sub); + + if (c->wakeup_pipe[1] >= 0) + (void)write(c->wakeup_pipe[1], &(char){0}, 1); + +#ifdef __MINGW32__ + if (c->win32_event) + SetEvent(c->win32_event); +#endif +} + +void mp_cancel_trigger(struct mp_cancel *c) +{ + pthread_mutex_lock(&c->lock); + trigger_locked(c); + pthread_mutex_unlock(&c->lock); +} + +void mp_cancel_reset(struct mp_cancel *c) +{ + pthread_mutex_lock(&c->lock); + + atomic_store(&c->triggered, false); + + if (c->wakeup_pipe[0] >= 0) { + // Flush it fully. + while (1) { + int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256); + if (r <= 0 && !(r < 0 && errno == EINTR)) + break; + } + } + +#ifdef __MINGW32__ + if (c->win32_event) + ResetEvent(c->win32_event); +#endif + + pthread_mutex_unlock(&c->lock); +} + +bool mp_cancel_test(struct mp_cancel *c) +{ + return c ? atomic_load_explicit(&c->triggered, memory_order_relaxed) : false; +} + +bool mp_cancel_wait(struct mp_cancel *c, double timeout) +{ + struct timespec ts = mp_rel_time_to_timespec(timeout); + pthread_mutex_lock(&c->lock); + while (!mp_cancel_test(c)) { + if (pthread_cond_timedwait(&c->wakeup, &c->lock, &ts)) + break; + } + pthread_mutex_unlock(&c->lock); + + return mp_cancel_test(c); +} + +// If a new notification mechanism was added, and the mp_cancel state was +// already triggered, make sure the newly added mechanism is also triggered. +static void retrigger_locked(struct mp_cancel *c) +{ + if (mp_cancel_test(c)) + trigger_locked(c); +} + +void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx) +{ + pthread_mutex_lock(&c->lock); + c->cb = cb; + c->cb_ctx = ctx; + retrigger_locked(c); + pthread_mutex_unlock(&c->lock); +} + +void mp_cancel_set_parent(struct mp_cancel *slave, struct mp_cancel *parent) +{ + // We can access c->parent without synchronization, because: + // - concurrent mp_cancel_set_parent() calls to slave are not allowed + // - slave->parent needs to stay valid as long as the slave exists + if (slave->parent == parent) + return; + if (slave->parent) { + pthread_mutex_lock(&slave->parent->lock); + LL_REMOVE(siblings, &slave->parent->slaves, slave); + pthread_mutex_unlock(&slave->parent->lock); + } + slave->parent = parent; + if (slave->parent) { + pthread_mutex_lock(&slave->parent->lock); + LL_APPEND(siblings, &slave->parent->slaves, slave); + retrigger_locked(slave->parent); + pthread_mutex_unlock(&slave->parent->lock); + } +} + +int mp_cancel_get_fd(struct mp_cancel *c) +{ + pthread_mutex_lock(&c->lock); + if (c->wakeup_pipe[0] < 0) { + mp_make_wakeup_pipe(c->wakeup_pipe); + retrigger_locked(c); + } + pthread_mutex_unlock(&c->lock); + + + return c->wakeup_pipe[0]; +} + +#ifdef __MINGW32__ +void *mp_cancel_get_event(struct mp_cancel *c) +{ + pthread_mutex_lock(&c->lock); + if (!c->win32_event) { + c->win32_event = CreateEventW(NULL, TRUE, FALSE, NULL); + retrigger_locked(c); + } + pthread_mutex_unlock(&c->lock); + + return |