summaryrefslogtreecommitdiffstats
path: root/filters/filter.c
diff options
context:
space:
mode:
Diffstat (limited to 'filters/filter.c')
-rw-r--r--filters/filter.c194
1 files changed, 152 insertions, 42 deletions
diff --git a/filters/filter.c b/filters/filter.c
index beec13c210..1d13393194 100644
--- a/filters/filter.c
+++ b/filters/filter.c
@@ -1,9 +1,15 @@
-#include <pthread.h>
+#include <math.h>
+#include <stdatomic.h>
+
+#include <libavutil/hwcontext.h>
#include "common/common.h"
#include "common/global.h"
#include "common/msg.h"
+#include "osdep/threads.h"
+#include "osdep/timer.h"
#include "video/hwdec.h"
+#include "video/img_format.h"
#include "filter.h"
#include "filter_internal.h"
@@ -65,9 +71,15 @@ struct filter_runner {
struct mp_filter *root_filter;
+ double max_run_time;
+ atomic_bool interrupt_flag;
+
// If we're currently running the filter graph (for avoiding recursion).
bool filtering;
+ // If set, recursive filtering was initiated through this pin.
+ struct mp_pin *recursive;
+
// Set of filters which need process() to be called. A filter is in this
// array iff mp_filter_internal.pending==true.
struct mp_filter **pending;
@@ -78,7 +90,7 @@ struct filter_runner {
// For async notifications only. We don't bother making this fine grained
// across filters.
- pthread_mutex_t async_lock;
+ mp_mutex async_lock;
// Wakeup is pending. Protected by async_lock.
bool async_wakeup_sent;
@@ -101,13 +113,13 @@ struct mp_filter_internal {
struct mp_filter *error_handler;
char *name;
+ bool high_priority;
bool pending;
bool async_pending;
bool failed;
};
-
// Called when new work needs to be done on a pin belonging to the filter:
// - new data was requested
// - new data has been queued
@@ -124,21 +136,37 @@ static void add_pending(struct mp_filter *f)
// This should probably really be some sort of priority queue, but for now
// something naive and dumb does the job too.
f->in->pending = true;
- MP_TARRAY_APPEND(r, r->pending, r->num_pending, f);
+ if (f->in->high_priority) {
+ MP_TARRAY_INSERT_AT(r, r->pending, r->num_pending, 0, f);
+ } else {
+ MP_TARRAY_APPEND(r, r->pending, r->num_pending, f);
+ }
+}
+
+static void add_pending_pin(struct mp_pin *p)
+{
+ struct mp_filter *f = p->manual_connection;
+ assert(f);
+
+ if (f->in->pending)
+ return;
+
+ add_pending(f);
// Need to tell user that something changed.
- if (f == r->root_filter)
- r->external_pending = true;
+ if (f == f->in->runner->root_filter && p != f->in->runner->recursive)
+ f->in->runner->external_pending = true;
}
// Possibly enter recursive filtering. This is done as convenience for
// "external" filter users only. (Normal filtering does this iteratively via
-// mp_filter_run() to avoid filter reentrancy issues and deep call stacks.) If
-// the API users uses an external manually connected pin, do recursive filtering
-// as a not strictly necessary feature which makes outside I/O with filters
-// easier.
-static void filter_recursive(struct mp_filter *f)
+// mp_filter_graph_run() to avoid filter reentrancy issues and deep call
+// stacks.) If the API users uses an external manually connected pin, do
+// recursive filtering as a not strictly necessary feature which makes outside
+// I/O with filters easier.
+static void filter_recursive(struct mp_pin *p)
{
+ struct mp_filter *f = p->conn->manual_connection;
assert(f);
struct filter_runner *r = f->in->runner;
@@ -146,9 +174,15 @@ static void filter_recursive(struct mp_filter *f)
if (r->filtering)
return;
+ assert(!r->recursive);
+ r->recursive = p;
+
// Also don't lose the pending state, which the user may or may not
// care about.
- r->external_pending |= mp_filter_run(r->root_filter);
+ r->external_pending |= mp_filter_graph_run(r->root_filter);
+
+ assert(r->recursive == p);
+ r->recursive = NULL;
}
void mp_filter_internal_mark_progress(struct mp_filter *f)
@@ -162,7 +196,7 @@ void mp_filter_internal_mark_progress(struct mp_filter *f)
// sync notifications don't need any locking.
static void flush_async_notifications(struct filter_runner *r)
{
- pthread_mutex_lock(&r->async_lock);
+ mp_mutex_lock(&r->async_lock);
for (int n = 0; n < r->num_async_pending; n++) {
struct mp_filter *f = r->async_pending[n];
add_pending(f);
@@ -170,12 +204,17 @@ static void flush_async_notifications(struct filter_runner *r)
}
r->num_async_pending = 0;
r->async_wakeup_sent = false;
- pthread_mutex_unlock(&r->async_lock);
+ mp_mutex_unlock(&r->async_lock);
}
-bool mp_filter_run(struct mp_filter *filter)
+bool mp_filter_graph_run(struct mp_filter *filter)
{
struct filter_runner *r = filter->in->runner;
+ assert(filter == r->root_filter); // user is supposed to call this on root only
+
+ int64_t end_time = 0;
+ if (isfinite(r->max_run_time))
+ end_time = mp_time_ns_add(mp_time_ns(), MPMAX(r->max_run_time, 0));
// (could happen with separate filter graphs calling each other, for now
// ignore this issue as we don't use such a setup anywhere)
@@ -185,13 +224,45 @@ bool mp_filter_run(struct mp_filter *filter)
flush_async_notifications(r);
- while (r->num_pending) {
- struct mp_filter *next = r->pending[r->num_pending - 1];
- r->num_pending -= 1;
- next->in->pending = false;
+ bool exit_req = false;
+
+ while (1) {
+ if (atomic_exchange_explicit(&r->interrupt_flag, false,
+ memory_order_acq_rel))
+ {
+ mp_mutex_lock(&r->async_lock);
+ if (!r->async_wakeup_sent && r->wakeup_cb)
+ r->wakeup_cb(r->wakeup_ctx);
+ r->async_wakeup_sent = true;
+ mp_mutex_unlock(&r->async_lock);
+ exit_req = true;
+ }
+
+ if (!r->num_pending) {
+ flush_async_notifications(r);
+ if (!r->num_pending)
+ break;
+ }
+
+ struct mp_filter *next = NULL;
+ if (r->pending[0]->in->high_priority) {
+ next = r->pending[0];
+ MP_TARRAY_REMOVE_AT(r->pending, r->num_pending, 0);
+ } else if (!exit_req) {
+ next = r->pending[r->num_pending - 1];
+ r->num_pending -= 1;
+ }
+
+ if (!next)
+ break;
+
+ next->in->pending = false;
if (next->in->info->process)
next->in->info->process(next);
+
+ if (end_time && mp_time_ns() >= end_time)
+ mp_filter_graph_interrupt(r->root_filter);
}
r->filtering = false;
@@ -232,8 +303,8 @@ bool mp_pin_in_write(struct mp_pin *p, struct mp_frame frame)
assert(p->conn->data.type == MP_FRAME_NONE);
p->conn->data = frame;
p->conn->data_requested = false;
- add_pending(p->conn->manual_connection);
- filter_recursive(p->conn->manual_connection);
+ add_pending_pin(p->conn);
+ filter_recursive(p);
return true;
}
@@ -251,9 +322,9 @@ bool mp_pin_out_request_data(struct mp_pin *p)
if (p->conn && p->conn->manual_connection) {
if (!p->data_requested) {
p->data_requested = true;
- add_pending(p->conn->manual_connection);
+ add_pending_pin(p->conn);
}
- filter_recursive(p->conn->manual_connection);
+ filter_recursive(p);
}
return mp_pin_out_has_data(p);
}
@@ -261,7 +332,7 @@ bool mp_pin_out_request_data(struct mp_pin *p)
void mp_pin_out_request_data_next(struct mp_pin *p)
{
if (mp_pin_out_request_data(p))
- add_pending(p->conn->manual_connection);
+ add_pending_pin(p->conn);
}
struct mp_frame mp_pin_out_read(struct mp_pin *p)
@@ -304,13 +375,15 @@ static struct mp_pin *find_connected_end(struct mp_pin *p)
return other;
p = other->user_conn;
}
- assert(0);
+ MP_ASSERT_UNREACHABLE();
}
// With p being part of a connection, create the pin_connection and set all
// state flags.
static void init_connection(struct mp_pin *p)
{
+ struct filter_runner *runner = p->owner->in->runner;
+
if (p->dir == MP_PIN_IN)
p = p->other;
@@ -321,7 +394,13 @@ static void init_connection(struct mp_pin *p)
assert(!in->user_conn);
assert(!out->user_conn);
- // Logicaly, the ends are always manual connections. A pin chain without
+ // This and similar checks enforce the same root filter requirement.
+ if (in->manual_connection)
+ assert(in->manual_connection->in->runner == runner);
+ if (out->manual_connection)
+ assert(out->manual_connection->in->runner == runner);
+
+ // Logically, the ends are always manual connections. A pin chain without
// manual connections at the ends is still disconnected (or if this
// attempted to extend an existing connection, becomes dangling and gets
// disconnected).
@@ -339,6 +418,7 @@ static void init_connection(struct mp_pin *p)
assert(!cur->data.type); // unused for in pins
assert(!cur->other->data_requested); // unset for unconnected out pins
assert(!cur->other->data.type); // unset for unconnected out pins
+ assert(cur->owner->in->runner == runner);
cur->within_conn = cur->other->within_conn = true;
cur = cur->other->user_conn;
}
@@ -451,6 +531,16 @@ const char *mp_filter_get_name(struct mp_filter *f)
return f->in->name;
}
+const struct mp_filter_info *mp_filter_get_info(struct mp_filter *f)
+{
+ return f->in->info;
+}
+
+void mp_filter_set_high_priority(struct mp_filter *f, bool pri)
+{
+ f->in->high_priority = pri;
+}
+
void mp_filter_set_name(struct mp_filter *f, const char *name)
{
talloc_free(f->in->name);
@@ -595,32 +685,36 @@ struct mp_stream_info *mp_filter_find_stream_info(struct mp_filter *f)
return NULL;
}
-struct AVBufferRef *mp_filter_load_hwdec_device(struct mp_filter *f, int avtype)
+struct mp_hwdec_ctx *mp_filter_load_hwdec_device(struct mp_filter *f, int imgfmt)
{
struct mp_stream_info *info = mp_filter_find_stream_info(f);
if (!info || !info->hwdec_devs)
return NULL;
- hwdec_devices_request_all(info->hwdec_devs);
+ struct hwdec_imgfmt_request params = {
+ .imgfmt = imgfmt,
+ .probing = false,
+ };
+ hwdec_devices_request_for_img_fmt(info->hwdec_devs, &params);
- return hwdec_devices_get_lavc(info->hwdec_devs, avtype);
+ return hwdec_devices_get_by_imgfmt(info->hwdec_devs, imgfmt);
}
static void filter_wakeup(struct mp_filter *f, bool mark_only)
{
struct filter_runner *r = f->in->runner;
- pthread_mutex_lock(&r->async_lock);
+ mp_mutex_lock(&r->async_lock);
if (!f->in->async_pending) {
f->in->async_pending = true;
// (not using a talloc parent for thread safety reasons)
MP_TARRAY_APPEND(NULL, r->async_pending, r->num_async_pending, f);
- if (!mark_only && !r->async_wakeup_sent) {
- if (r->wakeup_cb)
- r->wakeup_cb(r->wakeup_ctx);
- r->async_wakeup_sent = true;
- }
}
- pthread_mutex_unlock(&r->async_lock);
+ if (!mark_only && !r->async_wakeup_sent) {
+ if (r->wakeup_cb)
+ r->wakeup_cb(r->wakeup_ctx);
+ r->async_wakeup_sent = true;
+ }
+ mp_mutex_unlock(&r->async_lock);
}
void mp_filter_wakeup(struct mp_filter *f)
@@ -633,6 +727,20 @@ void mp_filter_mark_async_progress(struct mp_filter *f)
filter_wakeup(f, true);
}
+void mp_filter_graph_set_max_run_time(struct mp_filter *f, double seconds)
+{
+ struct filter_runner *r = f->in->runner;
+ assert(f == r->root_filter); // user is supposed to call this on root only
+ r->max_run_time = seconds;
+}
+
+void mp_filter_graph_interrupt(struct mp_filter *f)
+{
+ struct filter_runner *r = f->in->runner;
+ assert(f == r->root_filter); // user is supposed to call this on root only
+ atomic_store(&r->interrupt_flag, true);
+}
+
void mp_filter_free_children(struct mp_filter *f)
{
while(f->in->num_children)
@@ -676,7 +784,7 @@ static void filter_destructor(void *p)
if (r->root_filter == f) {
assert(!f->in->parent);
- pthread_mutex_destroy(&r->async_lock);
+ mp_mutex_destroy(&r->async_lock);
talloc_free(r->async_pending);
talloc_free(r);
}
@@ -706,8 +814,9 @@ struct mp_filter *mp_filter_create_with_params(struct mp_filter_params *params)
*f->in->runner = (struct filter_runner){
.global = params->global,
.root_filter = f,
+ .max_run_time = INFINITY,
};
- pthread_mutex_init(&f->in->runner->async_lock, NULL);
+ mp_mutex_init(&f->in->runner->async_lock);
}
if (!f->global)
@@ -758,14 +867,15 @@ struct mp_filter *mp_filter_create_root(struct mpv_global *global)
return mp_filter_create_with_params(&params);
}
-void mp_filter_root_set_wakeup_cb(struct mp_filter *root,
- void (*wakeup_cb)(void *ctx), void *ctx)
+void mp_filter_graph_set_wakeup_cb(struct mp_filter *root,
+ void (*wakeup_cb)(void *ctx), void *ctx)
{
struct filter_runner *r = root->in->runner;
- pthread_mutex_lock(&r->async_lock);
+ assert(root == r->root_filter); // user is supposed to call this on root only
+ mp_mutex_lock(&r->async_lock);
r->wakeup_cb = wakeup_cb;
r->wakeup_ctx = ctx;
- pthread_mutex_unlock(&r->async_lock);
+ mp_mutex_unlock(&r->async_lock);
}
static const char *filt_name(struct mp_filter *f)