diff options
-rw-r--r-- | filters/filter.c | 42 | ||||
-rw-r--r-- | filters/filter.h | 25 |
2 files changed, 66 insertions, 1 deletions
diff --git a/filters/filter.c b/filters/filter.c index dfe6df953a..a600b7dec4 100644 --- a/filters/filter.c +++ b/filters/filter.c @@ -1,8 +1,11 @@ +#include <math.h> #include <pthread.h> #include "common/common.h" #include "common/global.h" #include "common/msg.h" +#include "osdep/atomic.h" +#include "osdep/timer.h" #include "video/hwdec.h" #include "filter.h" @@ -65,6 +68,9 @@ struct filter_runner { struct mp_filter *root_filter; + double max_run_time; + atomic_bool interrupt_flag; + // If we're currently running the filter graph (for avoiding recursion). bool filtering; @@ -177,6 +183,10 @@ bool mp_filter_run(struct mp_filter *filter) { struct filter_runner *r = filter->in->runner; + int64_t end_time = 0; + if (isfinite(r->max_run_time)) + end_time = mp_add_timeout(mp_time_us(), MPMAX(r->max_run_time, 0)); + // (could happen with separate filter graphs calling each other, for now // ignore this issue as we don't use such a setup anywhere) assert(!r->filtering); @@ -187,13 +197,30 @@ bool mp_filter_run(struct mp_filter *filter) // to queue a wakeup again later. So do not call this in the loop. flush_async_notifications(r); - while (r->num_pending) { + while (1) { + if (atomic_exchange_explicit(&r->interrupt_flag, false, + memory_order_acq_rel)) + { + pthread_mutex_lock(&r->async_lock); + if (!r->async_wakeup_sent && r->wakeup_cb) + r->wakeup_cb(r->wakeup_ctx); + r->async_wakeup_sent = true; + pthread_mutex_unlock(&r->async_lock); + break; + } + + if (!r->num_pending) + break; + struct mp_filter *next = r->pending[r->num_pending - 1]; r->num_pending -= 1; next->in->pending = false; if (next->in->info->process) next->in->info->process(next); + + if (end_time && mp_time_us() >= end_time) + mp_filter_graph_interrupt(r->root_filter); } r->filtering = false; @@ -644,6 +671,18 @@ void mp_filter_mark_async_progress(struct mp_filter *f) filter_wakeup(f, true); } +void mp_filter_graph_set_max_run_time(struct mp_filter *f, double seconds) +{ + struct filter_runner *r = f->in->runner; + r->max_run_time = seconds; +} + +void mp_filter_graph_interrupt(struct mp_filter *f) +{ + struct filter_runner *r = f->in->runner; + atomic_store(&r->interrupt_flag, true); +} + void mp_filter_free_children(struct mp_filter *f) { while(f->in->num_children) @@ -717,6 +756,7 @@ struct mp_filter *mp_filter_create_with_params(struct mp_filter_params *params) *f->in->runner = (struct filter_runner){ .global = params->global, .root_filter = f, + .max_run_time = INFINITY, }; pthread_mutex_init(&f->in->runner->async_lock, NULL); } diff --git a/filters/filter.h b/filters/filter.h index dff1f4e016..5ecbb08111 100644 --- a/filters/filter.h +++ b/filters/filter.h @@ -409,8 +409,31 @@ struct AVBufferRef *mp_filter_load_hwdec_device(struct mp_filter *f, int avtype) // Perform filtering. This runs until the filter graph is blocked (due to // missing external input or unread output). It returns whether any outside // pins have changed state. +// Note: this always operates on the filter graph associated with f, f itself +// is not treated differently from any other filters in the graph. bool mp_filter_run(struct mp_filter *f); +// Set the maximum time mp_filter_run() should block. If the maximum time +// expires, the effect is the same as calling mp_filter_graph_interrupt() while +// the function is running. See that function for further details. +// The default is seconds==INFINITY. Values <=0 make it return after 1 iteration. +void mp_filter_graph_set_max_run_time(struct mp_filter *f, double seconds); + +// Interrupt mp_filter_run() asynchronously. This does not stop filtering in a +// destructive way, but merely suspends it. In practice, this will make +// mp_filter_run() return after the current filter's process() function has +// finished. Filtering can be resumed with subsequent mp_filter_run() calls. +// When mp_filter_run() is interrupted, it will trigger the filter graph wakeup +// callback, which in turn ensures that the user will call mp_filter_run() again. +// If it is called if not in mp_filter_run(), the next mp_filter_run() call is +// interrupted and no filtering is done for that call. +// Calling this too often will starve filtering. +// This does not call the graph wakeup callback directly, which will avoid +// potential reentrancy issues. (But mp_filter_run() will call it in reaction to +// it, as described above.) +// Explicitly thread-safe. +void mp_filter_graph_interrupt(struct mp_filter *f); + // Create a root dummy filter with no inputs or outputs. This fulfills the // following functions: // - passing it as parent filter to top-level filters @@ -428,6 +451,8 @@ struct mp_filter *mp_filter_create_root(struct mpv_global *global); // user's thread to call mp_filter_run() again. // The wakeup callback must not recursively call into any filter APIs, or do // blocking waits on the filter API (deadlocks will happen). +// A wakeup callback should always set a "wakeup" flag, that is reset only when +// mp_filter_run() is going to be called again with no wait time. void mp_filter_root_set_wakeup_cb(struct mp_filter *root, void (*wakeup_cb)(void *ctx), void *ctx); |