-rw-r--r--  filters/filter.c  42
-rw-r--r--  filters/filter.h  25
2 files changed, 66 insertions(+), 1 deletion(-)
diff --git a/filters/filter.c b/filters/filter.c
index dfe6df953a..a600b7dec4 100644
--- a/filters/filter.c
+++ b/filters/filter.c
@@ -1,8 +1,11 @@
+#include <math.h>
#include <pthread.h>
#include "common/common.h"
#include "common/global.h"
#include "common/msg.h"
+#include "osdep/atomic.h"
+#include "osdep/timer.h"
#include "video/hwdec.h"
#include "filter.h"
@@ -65,6 +68,9 @@ struct filter_runner {
struct mp_filter *root_filter;
+ double max_run_time;
+ atomic_bool interrupt_flag;
+
// If we're currently running the filter graph (for avoiding recursion).
bool filtering;
@@ -177,6 +183,10 @@ bool mp_filter_run(struct mp_filter *filter)
{
struct filter_runner *r = filter->in->runner;
+ int64_t end_time = 0;
+ if (isfinite(r->max_run_time))
+ end_time = mp_add_timeout(mp_time_us(), MPMAX(r->max_run_time, 0));
+
// (could happen with separate filter graphs calling each other, for now
// ignore this issue as we don't use such a setup anywhere)
assert(!r->filtering);
@@ -187,13 +197,30 @@ bool mp_filter_run(struct mp_filter *filter)
// to queue a wakeup again later. So do not call this in the loop.
flush_async_notifications(r);
- while (r->num_pending) {
+ while (1) {
+ if (atomic_exchange_explicit(&r->interrupt_flag, false,
+ memory_order_acq_rel))
+ {
+ pthread_mutex_lock(&r->async_lock);
+ if (!r->async_wakeup_sent && r->wakeup_cb)
+ r->wakeup_cb(r->wakeup_ctx);
+ r->async_wakeup_sent = true;
+ pthread_mutex_unlock(&r->async_lock);
+ break;
+ }
+
+ if (!r->num_pending)
+ break;
+
struct mp_filter *next = r->pending[r->num_pending - 1];
r->num_pending -= 1;
next->in->pending = false;
if (next->in->info->process)
next->in->info->process(next);
+
+ if (end_time && mp_time_us() >= end_time)
+ mp_filter_graph_interrupt(r->root_filter);
}
r->filtering = false;
@@ -644,6 +671,18 @@ void mp_filter_mark_async_progress(struct mp_filter *f)
filter_wakeup(f, true);
}
+void mp_filter_graph_set_max_run_time(struct mp_filter *f, double seconds)
+{
+ struct filter_runner *r = f->in->runner;
+ r->max_run_time = seconds;
+}
+
+void mp_filter_graph_interrupt(struct mp_filter *f)
+{
+ struct filter_runner *r = f->in->runner;
+ atomic_store(&r->interrupt_flag, true);
+}
+
void mp_filter_free_children(struct mp_filter *f)
{
while(f->in->num_children)
@@ -717,6 +756,7 @@ struct mp_filter *mp_filter_create_with_params(struct mp_filter_params *params)
*f->in->runner = (struct filter_runner){
.global = params->global,
.root_filter = f,
+ .max_run_time = INFINITY,
};
pthread_mutex_init(&f->in->runner->async_lock, NULL);
}
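
As a usage illustration (not part of this commit), here is a minimal sketch of a non-blocking driver built on the two new functions. `struct my_driver`, `on_wakeup`, `my_driver_init`, `my_event_loop_iteration`, and the 50ms budget are assumptions made for the example, not names from mpv:

#include "filters/filter.h"
#include "osdep/atomic.h"

struct my_driver {
    struct mp_filter *root;  // from mp_filter_create_root()
    atomic_bool wakeup;      // set by on_wakeup(), cleared before each run
};

// Wakeup callback: per filter.h it must not call back into the filter API,
// so it only records that mp_filter_run() should be called again.
static void on_wakeup(void *p)
{
    struct my_driver *d = p;
    atomic_store(&d->wakeup, true);
}

static void my_driver_init(struct my_driver *d, struct mpv_global *global)
{
    d->root = mp_filter_create_root(global);
    mp_filter_root_set_wakeup_cb(d->root, on_wakeup, d);
    // Bound each mp_filter_run() pass to ~50ms. On timeout, the effect is
    // the same as mp_filter_graph_interrupt(): the run returns after the
    // current filter's process() call and on_wakeup() is triggered, so the
    // next loop iteration resumes where filtering left off.
    mp_filter_graph_set_max_run_time(d->root, 0.05);
    atomic_store(&d->wakeup, true);  // make the first pass run
}

static void my_event_loop_iteration(struct my_driver *d)
{
    if (atomic_exchange(&d->wakeup, false))
        mp_filter_run(d->root);
    // ... handle input, timers, rendering, etc. ...
}
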
diff --git a/filters/filter.h b/filters/filter.h
index dff1f4e016..5ecbb08111 100644
--- a/filters/filter.h
+++ b/filters/filter.h
@@ -409,8 +409,31 @@ struct AVBufferRef *mp_filter_load_hwdec_device(struct mp_filter *f, int avtype)
// Perform filtering. This runs until the filter graph is blocked (due to
// missing external input or unread output). It returns whether any outside
// pins have changed state.
+// Note: this always operates on the filter graph associated with f; f itself
+// is not treated differently from any other filter in the graph.
bool mp_filter_run(struct mp_filter *f);
+// Set the maximum time mp_filter_run() should block. If the maximum time
+// expires, the effect is the same as calling mp_filter_graph_interrupt() while
+// the function is running. See that function for further details.
+// The default is seconds==INFINITY. Values <=0 make it return after 1 iteration.
+void mp_filter_graph_set_max_run_time(struct mp_filter *f, double seconds);
+
+// Interrupt mp_filter_run() asynchronously. This does not stop filtering in a
+// destructive way, but merely suspends it. In practice, this will make
+// mp_filter_run() return after the current filter's process() function has
+// finished. Filtering can be resumed with subsequent mp_filter_run() calls.
+// When mp_filter_run() is interrupted, it will trigger the filter graph wakeup
+// callback, which in turn ensures that the user will call mp_filter_run() again.
+// If it is called while mp_filter_run() is not running, the next
+// mp_filter_run() call is interrupted and no filtering is done for that call.
+// Calling this too often will starve filtering.
+// This does not call the graph wakeup callback directly, which avoids
+// potential reentrancy issues. (But mp_filter_run() will call it in reaction
+// to the interrupt, as described above.)
+// Explicitly thread-safe.
+void mp_filter_graph_interrupt(struct mp_filter *f);
+
// Create a root dummy filter with no inputs or outputs. This fulfills the
// following functions:
// - passing it as parent filter to top-level filters
@@ -428,6 +451,8 @@ struct mp_filter *mp_filter_create_root(struct mpv_global *global);
// user's thread to call mp_filter_run() again.
// The wakeup callback must not recursively call into any filter APIs, or do
// blocking waits on the filter API (deadlocks will happen).
+// A wakeup callback should always set a "wakeup" flag that is reset only when
+// mp_filter_run() is going to be called again with no wait time.
void mp_filter_root_set_wakeup_cb(struct mp_filter *root,
void (*wakeup_cb)(void *ctx), void *ctx);
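
To make the wakeup-flag rule above concrete, a minimal sketch of a blocking driver follows. `struct driver`, `wakeup_cb`, and `run_loop` are hypothetical names for this example, not mpv code:

#include <pthread.h>
#include "filters/filter.h"

// Hypothetical blocking driver: the callback only sets a flag and signals a
// condition variable; the flag is reset immediately before mp_filter_run()
// is called again with no wait time, as the rule above requires.
struct driver {
    struct mp_filter *root;  // from mp_filter_create_root()
    pthread_mutex_t lock;
    pthread_cond_t cond;
    bool wakeup;
};

static void wakeup_cb(void *p)
{
    struct driver *d = p;
    pthread_mutex_lock(&d->lock);
    d->wakeup = true;
    pthread_cond_signal(&d->cond);
    pthread_mutex_unlock(&d->lock);
}

static void run_loop(struct driver *d)
{
    mp_filter_root_set_wakeup_cb(d->root, wakeup_cb, d);
    pthread_mutex_lock(&d->lock);
    d->wakeup = true;  // ensure the first pass runs
    while (1) {  // a real driver would also check an exit condition
        while (!d->wakeup)
            pthread_cond_wait(&d->cond, &d->lock);
        d->wakeup = false;  // reset only now; mp_filter_run() follows at once
        pthread_mutex_unlock(&d->lock);
        mp_filter_run(d->root);
        pthread_mutex_lock(&d->lock);
    }
}

Because mp_filter_graph_interrupt() is explicitly thread-safe, another thread can call it on d->root to make a long mp_filter_run() pass return early; the interrupt triggers wakeup_cb(), so the loop runs the graph again afterwards.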