summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--misc/thread_tools.c188
-rw-r--r--misc/thread_tools.h17
2 files changed, 153 insertions, 52 deletions
diff --git a/misc/thread_tools.c b/misc/thread_tools.c
index b11ffd07e4..ecf6bc2381 100644
--- a/misc/thread_tools.c
+++ b/misc/thread_tools.c
@@ -26,8 +26,10 @@
#endif
#include "common/common.h"
+#include "misc/linked_list.h"
#include "osdep/atomic.h"
#include "osdep/io.h"
+#include "osdep/timer.h"
#include "thread_tools.h"
@@ -74,45 +76,118 @@ bool mp_waiter_poll(struct mp_waiter *waiter)
return r;
}
-#ifndef __MINGW32__
struct mp_cancel {
+ pthread_mutex_t lock;
+ pthread_cond_t wakeup;
+
+ // Semaphore state and "mirrors".
atomic_bool triggered;
+ void (*cb)(void *ctx);
+ void *cb_ctx;
int wakeup_pipe[2];
+ void *win32_event; // actually HANDLE
+
+ // Slave list. These are automatically notified as well.
+ struct {
+ struct mp_cancel *head, *tail;
+ } slaves;
+
+ // For slaves. Synchronization is managed by parent.lock!
+ struct mp_cancel *parent;
+ struct {
+ struct mp_cancel *next, *prev;
+ } siblings;
};
static void cancel_destroy(void *p)
{
struct mp_cancel *c = p;
+
+ assert(!c->slaves.head); // API user error
+
+ // We can access c->parent without synchronization, because:
+ // - since c is being destroyed, nobody can explicitly remove it as slave
+ // at the same time
+ // - c->parent needs to stay valid as long as the slave exists
+ if (c->parent)
+ mp_cancel_remove_slave(c->parent, c);
+
if (c->wakeup_pipe[0] >= 0) {
close(c->wakeup_pipe[0]);
close(c->wakeup_pipe[1]);
}
+
+#ifdef __MINGW32__
+ if (c->win32_event)
+ CloseHandle(c->win32_event);
+#endif
+
+ pthread_mutex_destroy(&c->lock);
+ pthread_cond_destroy(&c->wakeup);
}
struct mp_cancel *mp_cancel_new(void *talloc_ctx)
{
struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
talloc_set_destructor(c, cancel_destroy);
- *c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)};
- mp_make_wakeup_pipe(c->wakeup_pipe);
+ *c = (struct mp_cancel){
+ .triggered = ATOMIC_VAR_INIT(false),
+ .wakeup_pipe = {-1, -1},
+ };
+ pthread_mutex_init(&c->lock, NULL);
+ pthread_cond_init(&c->wakeup, NULL);
return c;
}
-void mp_cancel_trigger(struct mp_cancel *c)
+static void trigger_locked(struct mp_cancel *c)
{
atomic_store(&c->triggered, true);
- (void)write(c->wakeup_pipe[1], &(char){0}, 1);
+
+ pthread_cond_broadcast(&c->wakeup); // condition bound to c->triggered
+
+ if (c->cb)
+ c->cb(c->cb_ctx);
+
+ for (struct mp_cancel *sub = c->slaves.head; sub; sub = sub->siblings.next)
+ mp_cancel_trigger(sub);
+
+ if (c->wakeup_pipe[1] >= 0)
+ (void)write(c->wakeup_pipe[1], &(char){0}, 1);
+
+#ifdef __MINGW32__
+ if (c->win32_event)
+ SetEvent(c->win32_event);
+#endif
+}
+
+void mp_cancel_trigger(struct mp_cancel *c)
+{
+ pthread_mutex_lock(&c->lock);
+ trigger_locked(c);
+ pthread_mutex_unlock(&c->lock);
}
void mp_cancel_reset(struct mp_cancel *c)
{
+ pthread_mutex_lock(&c->lock);
+
atomic_store(&c->triggered, false);
- // Flush it fully.
- while (1) {
- int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256);
- if (r <= 0 && !(r < 0 && errno == EINTR))
- break;
+
+ if (c->wakeup_pipe[0] >= 0) {
+ // Flush it fully.
+ while (1) {
+ int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256);
+ if (r <= 0 && !(r < 0 && errno == EINTR))
+ break;
+ }
}
+
+#ifdef __MINGW32__
+ if (c->win32_event)
+ ResetEvent(c->win32_event);
+#endif
+
+ pthread_mutex_unlock(&c->lock);
}
bool mp_cancel_test(struct mp_cancel *c)
@@ -122,69 +197,78 @@ bool mp_cancel_test(struct mp_cancel *c)
bool mp_cancel_wait(struct mp_cancel *c, double timeout)
{
- struct pollfd fd = { .fd = c->wakeup_pipe[0], .events = POLLIN };
- poll(&fd, 1, timeout * 1000);
- return fd.revents & POLLIN;
-}
+ struct timespec ts = mp_rel_time_to_timespec(timeout);
+ pthread_mutex_lock(&c->lock);
+ while (!mp_cancel_test(c)) {
+ if (pthread_cond_timedwait(&c->wakeup, &c->lock, &ts))
+ break;
+ }
+ pthread_mutex_unlock(&c->lock);
-int mp_cancel_get_fd(struct mp_cancel *c)
-{
- return c->wakeup_pipe[0];
+ return mp_cancel_test(c);
}
-#else
-
-struct mp_cancel {
- atomic_bool triggered;
- HANDLE event;
-};
-
-static void cancel_destroy(void *p)
+// If a new notification mechanism was added, and the mp_cancel state was
+// already triggered, make sure the newly added mechanism is also triggered.
+static void retrigger_locked(struct mp_cancel *c)
{
- struct mp_cancel *c = p;
- CloseHandle(c->event);
+ if (mp_cancel_test(c))
+ trigger_locked(c);
}
-struct mp_cancel *mp_cancel_new(void *talloc_ctx)
+void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx)
{
- struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
- talloc_set_destructor(c, cancel_destroy);
- *c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)};
- c->event = CreateEventW(NULL, TRUE, FALSE, NULL);
- return c;
+ pthread_mutex_lock(&c->lock);
+ c->cb = cb;
+ c->cb_ctx = ctx;
+ retrigger_locked(c);
+ pthread_mutex_unlock(&c->lock);
}
-void mp_cancel_trigger(struct mp_cancel *c)
+void mp_cancel_add_slave(struct mp_cancel *c, struct mp_cancel *slave)
{
- atomic_store(&c->triggered, true);
- SetEvent(c->event);
+ pthread_mutex_lock(&c->lock);
+ assert(!slave->parent);
+ slave->parent = c;
+ LL_APPEND(siblings, &c->slaves, slave);
+ retrigger_locked(c);
+ pthread_mutex_unlock(&c->lock);
}
-void mp_cancel_reset(struct mp_cancel *c)
+void mp_cancel_remove_slave(struct mp_cancel *c, struct mp_cancel *slave)
{
- atomic_store(&c->triggered, false);
- ResetEvent(c->event);
+ pthread_mutex_lock(&c->lock);
+ if (slave->parent) {
+ assert(slave->parent == c);
+ slave->parent = NULL;
+ LL_REMOVE(siblings, &c->slaves, slave);
+ }
+ pthread_mutex_unlock(&c->lock);
}
-bool mp_cancel_test(struct mp_cancel *c)
+int mp_cancel_get_fd(struct mp_cancel *c)
{
- return c ? atomic_load_explicit(&c->triggered, memory_order_relaxed) : false;
-}
+ pthread_mutex_lock(&c->lock);
+ if (c->wakeup_pipe[0] < 0) {
+ mp_make_wakeup_pipe(c->wakeup_pipe);
+ retrigger_locked(c);
+ }
+ pthread_mutex_unlock(&c->lock);
-bool mp_cancel_wait(struct mp_cancel *c, double timeout)
-{
- return WaitForSingleObject(c->event, timeout < 0 ? INFINITE : timeout * 1000)
- == WAIT_OBJECT_0;
+
+ return c->wakeup_pipe[0];
}
+#ifdef __MINGW32__
void *mp_cancel_get_event(struct mp_cancel *c)
{
- return c->event;
-}
+ pthread_mutex_lock(&c->lock);
+ if (!c->win32_event) {
+ c->win32_event = CreateEventW(NULL, TRUE, FALSE, NULL);
+ retrigger_locked(c);
+ }
+ pthread_mutex_unlock(&c->lock);
-int mp_cancel_get_fd(struct mp_cancel *c)
-{
- return -1;
+ return c->win32_event;
}
-
#endif
diff --git a/misc/thread_tools.h b/misc/thread_tools.h
index a734ac85b0..2198181e6c 100644
--- a/misc/thread_tools.h
+++ b/misc/thread_tools.h
@@ -59,6 +59,23 @@ bool mp_cancel_wait(struct mp_cancel *c, double timeout);
// Restore original state. (Allows reusing a mp_cancel.)
void mp_cancel_reset(struct mp_cancel *c);
+// Add a callback to invoke when mp_cancel gets triggered. If it's already
+// triggered, call it from mp_cancel_add_cb() directly. May be called multiple
+// times even if the trigger state changes; not called if it resets. In all
+// cases, this may be called with internal locks held (either in mp_cancel, or
+// other locks held by whoever calls mp_cancel_trigger()).
+// There is only one callback. Create a slave mp_cancel to get a private one.
+void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx);
+
+// If c gets triggered, automatically trigger slave. Trying to add a slave more
+// than once or to multiple parents is undefined behavior.
+// The parent mp_cancel must remain valid until the slave is manually removed
+// or destroyed. Destroying a mp_cancel that still has slaves is an error.
+void mp_cancel_add_slave(struct mp_cancel *c, struct mp_cancel *slave);
+
+// Undo mp_cancel_add_slave(). Ignores never added slaves for easier cleanup.
+void mp_cancel_remove_slave(struct mp_cancel *c, struct mp_cancel *slave);
+
// win32 "Event" HANDLE that indicates the current mp_cancel state.
void *mp_cancel_get_event(struct mp_cancel *c);