diff options
Diffstat (limited to 'filters/filter.c')
-rw-r--r-- | filters/filter.c | 790 |
1 files changed, 790 insertions, 0 deletions
diff --git a/filters/filter.c b/filters/filter.c new file mode 100644 index 0000000000..bbd8a4ff5b --- /dev/null +++ b/filters/filter.c @@ -0,0 +1,790 @@ +#include <pthread.h> + +#include "common/common.h" +#include "common/global.h" +#include "common/msg.h" +#include "video/hwdec.h" + +#include "filter.h" +#include "filter_internal.h" + +// Note about connections: +// They can be confusing, because pins come in pairs, and multiple pins can be +// transitively connected via mp_pin_connect(). To avoid dealing with this, +// mp_pin.conn is used to skip redundant connected pins. +// Consider <1a|1b> a symbol for mp_pin pair #1 and f1 as filter #1. Then: +// f1 <-> <1a|1b> <-> <2a|2b> <-> <3a|3b> <-> f2 +// would be a connection from 1a to 3b. 1a could be a private pin of f1 (e.g. +// mp_filter.ppin[0]), and 1b would be the public pin (e.g. mp_filter.pin[0]). +// A user could have called mp_pin_connect(2a, 1b) mp_pin_connect(3a, 2b) +// (assuming 1b has dir==MP_PIN_OUT). The end result are the following values: +// pin user_conn conn manual_connection within_conn (uses mp_pin.data) +// 1a NULL 3b f1 false no +// 1b 2a NULL NULL true no +// 2a 1b NULL NULL true no +// 2b 3a NULL NULL true no +// 3a 2b NULL NULL true no +// 3b NULL 1a f2 false yes +// The minimal case of f1 <-> <1a|1b> <-> f2 (1b dir=out) would be: +// 1a NULL 1b f1 false no +// 1b NULL 1a f2 false yes +// In both cases, only the final output pin uses mp_pin.data/data_requested. +struct mp_pin { + const char *name; + enum mp_pin_dir dir; + struct mp_pin *other; // paired mp_pin representing other end + struct mp_filter *owner; + + struct mp_pin *user_conn; // as set by mp_pin_connect() + struct mp_pin *conn; // transitive, actual end of the connection + + // Set if the pin is considered connected, but has no user_conn. pin + // state changes are handled by the given filter. (Defaults to the root + // filter if the pin is for the user of a filter graph.) + // As an invariant, conn and manual_connection are both either set or unset. + struct mp_filter *manual_connection; + + // Set if the pin is indirect part of a connection chain, but not one of + // the end pins. Basically it's a redundant in-between pin. You never access + // these with the pin data flow functions, because only the end pins matter. + // This flag is for checking and enforcing this. + bool within_conn; + + // This is used for the final output mp_pin in connections only. + bool data_requested; // true if out wants new data + struct mp_frame data; // possibly buffered frame (MP_FRAME_NONE if + // empty, usually only temporary) +}; + +// Root filters create this, all other filters reference it. +struct filter_runner { + struct mpv_global *global; + + void (*wakeup_cb)(void *ctx); + void *wakeup_ctx; + + struct mp_filter *root_filter; + + // If we're currently running the filter graph (for avoiding recursion). + bool filtering; + + // Set of filters which need process() to be called. A filter is in this + // array iff mp_filter_internal.pending==true. + struct mp_filter **pending; + int num_pending; + + // Any outside pins have changed state. + bool external_pending; + + // For async notifications only. We don't bother making this fine grained + // across filters. + pthread_mutex_t async_lock; + + // Wakeup is pending. Protected by async_lock. + bool async_wakeup_sent; + + // Similar to pending[]. Uses mp_filter_internal.async_pending. Protected + // by async_lock. + struct mp_filter **async_pending; + int num_async_pending; +}; + +struct mp_filter_internal { + const struct mp_filter_info *info; + + struct mp_filter *parent; + struct filter_runner *runner; + + struct mp_filter **children; + int num_children; + + struct mp_filter *error_handler; + + char *name; + + bool pending; + bool async_pending; + bool failed; +}; + +static void add_pending(struct mp_filter *f) +{ + struct filter_runner *r = f->in->runner; + + if (f->in->pending) + return; + + // This should probably really be some sort of priority queue, but for now + // something naive and dumb does the job too. + f->in->pending = true; + MP_TARRAY_APPEND(r, r->pending, r->num_pending, f); +} + +// Called when new work needs to be done on a pin belonging to the filter: +// - new data was requested +// - new data has been queued +// - or just an connect/disconnect/async notification happened +// This means the process function for this filter has to be called next. +static void update_filter(struct mp_filter *src, struct mp_filter *f) +{ + assert(f); + struct filter_runner *r = f->in->runner; + + // Make sure the filter knows it has to make progress. + if (src->in->runner != r) { + // Connected to a different graph. The user has to drive those manually, + // and we simplify tell the user via the mp_filter_run() return value. + r->external_pending = true; + } else if (!f->in->pending) { + add_pending(f); + + if (!r->filtering) { + // Likely the "outer" API user used an external manually connected + // pin, so do recursive filtering (as a not strictly necessary + // feature which makes outside I/O with filters easier). + // Also don't lose the pending state, which the user may or may not + // care about. + // Note that we must avoid calling this from within filtering, + // because that would make the process() functions recursive and + // reentrant (and hard to reason about). + r->external_pending |= mp_filter_run(r->root_filter); + } + + // Need to tell user that something changed. + if (f == r->root_filter) + r->external_pending = true; + } +} + +void mp_filter_internal_mark_progress(struct mp_filter *f) +{ + struct filter_runner *r = f->in->runner; + assert(r->filtering); // only call from f's process() + add_pending(f); +} + +// Basically copy the async notifications to the sync ones. Done so that the +// sync notifications don't need any locking. +static void flush_async_notifications(struct filter_runner *r, bool queue) +{ + pthread_mutex_lock(&r->async_lock); + for (int n = 0; n < r->num_async_pending; n++) { + struct mp_filter *f = r->async_pending[n]; + if (queue) + add_pending(f); + f->in->async_pending = false; + } + r->num_async_pending = 0; + r->async_wakeup_sent = false; + pthread_mutex_unlock(&r->async_lock); +} + +bool mp_filter_run(struct mp_filter *filter) +{ + struct filter_runner *r = filter->in->runner; + + r->filtering = true; + + flush_async_notifications(r, true); + + while (r->num_pending) { + struct mp_filter *next = r->pending[r->num_pending - 1]; + r->num_pending -= 1; + next->in->pending = false; + + if (next->in->info->process) + next->in->info->process(next); + } + + r->filtering = false; + + bool externals = r->external_pending; + r->external_pending = false; + return externals; +} + +bool mp_pin_can_transfer_data(struct mp_pin *dst, struct mp_pin *src) +{ + return mp_pin_in_needs_data(dst) && mp_pin_out_request_data(src); +} + +bool mp_pin_transfer_data(struct mp_pin *dst, struct mp_pin *src) +{ + if (!mp_pin_can_transfer_data(dst, src)) + return false; + mp_pin_in_write(dst, mp_pin_out_read(src)); + return true; +} + +bool mp_pin_in_needs_data(struct mp_pin *p) +{ + assert(p->dir == MP_PIN_IN); + assert(!p->within_conn); + return p->conn && p->conn->manual_connection && p->conn->data_requested; +} + +bool mp_pin_in_write(struct mp_pin *p, struct mp_frame frame) +{ + if (!mp_pin_in_needs_data(p) || frame.type == MP_FRAME_NONE) { + if (frame.type) + MP_ERR(p->owner, "losing frame on %s\n", p->name); + mp_frame_unref(&frame); + return false; + } + assert(p->conn->data.type == MP_FRAME_NONE); + p->conn->data = frame; + p->conn->data_requested = false; + update_filter(p->owner, p->conn->manual_connection); + return true; +} + +bool mp_pin_out_has_data(struct mp_pin *p) +{ + assert(p->dir == MP_PIN_OUT); + assert(!p->within_conn); + return p->conn && p->conn->manual_connection && p->data.type != MP_FRAME_NONE; +} + +bool mp_pin_out_request_data(struct mp_pin *p) +{ + if (mp_pin_out_has_data(p)) + return true; + if (p->conn && p->conn->manual_connection && !p->data_requested) { + p->data_requested = true; + update_filter(p->owner, p->conn->manual_connection); + } + return mp_pin_out_has_data(p); +} + +struct mp_frame mp_pin_out_read(struct mp_pin *p) +{ + if (!mp_pin_out_request_data(p)) + return MP_NO_FRAME; + struct mp_frame res = p->data; + p->data = MP_NO_FRAME; + return res; +} + +void mp_pin_out_unread(struct mp_pin *p, struct mp_frame frame) +{ + assert(p->dir == MP_PIN_OUT); + assert(!p->within_conn); + assert(p->conn && p->conn->manual_connection); + // Unread is allowed strictly only if you didn't do anything else with + // the pin since the time you read it. + assert(!mp_pin_out_has_data(p)); + assert(!p->data_requested); + p->data = frame; +} + +void mp_pin_out_repeat_eof(struct mp_pin *p) +{ + mp_pin_out_unread(p, MP_EOF_FRAME); +} + +// Follow mp_pin pairs/connection into the "other" direction of the pin, until +// the last pin is found. (In the simplest case, this is just p->other.) E.g.: +// <1a|1b> <-> <2a|2b> <-> <3a|3b> +// find_connected_end(2b)==1a +// find_connected_end(1b)==1a +// find_connected_end(1a)==3b +static struct mp_pin *find_connected_end(struct mp_pin *p) +{ + while (1) { + struct mp_pin *other = p->other; + if (!other->user_conn) + return other; + p = other->user_conn; + } + assert(0); +} + +// With p being part of a connection, create the pin_connection and set all +// state flags. +static void init_connection(struct mp_pin *p) +{ + if (p->dir == MP_PIN_IN) + p = p->other; + + struct mp_pin *in = find_connected_end(p); + struct mp_pin *out = find_connected_end(p->other); + + // These are the "outer" pins by definition, they have no user connections. + assert(!in->user_conn); + assert(!out->user_conn); + + // Logicaly, the ends are always manual connections. A pin chain without + // manual connections at the ends is still disconnected (or if this + // attempted to extend an existing connection, becomes dangling and gets + // disconnected). + if (!in->manual_connection && !out->manual_connection) + return; + + assert(in->dir == MP_PIN_IN); + assert(out->dir == MP_PIN_OUT); + + struct mp_pin *cur = in; + while (cur) { + assert(!cur->within_conn && !cur->other->within_conn); + assert(!cur->conn && !cur->other->conn); + assert(!cur->data_requested); // unused for in pins + assert(!cur->data.type); // unused for in pins + assert(!cur->other->data_requested); // unset for unconnected out pins + assert(!cur->other->data.type); // unset for unconnected out pins + cur->within_conn = cur->other->within_conn = true; + cur = cur->other->user_conn; + } + + in->conn = out; + in->within_conn = false; + out->conn = in; + out->within_conn = false; + + // Scheduling so far will be messed up. + add_pending(in->manual_connection); + add_pending(out->manual_connection); +} + +void mp_pin_connect(struct mp_pin *dst, struct mp_pin *src) +{ + assert(src->dir == MP_PIN_OUT); + assert(dst->dir == MP_PIN_IN); + + if (dst->user_conn == src) { + assert(src->user_conn == dst); + return; + } + + mp_pin_disconnect(src); + mp_pin_disconnect(dst); + + src->user_conn = dst; + dst->user_conn = src; + + init_connection(src); +} + +void mp_pin_set_manual_connection(struct mp_pin *p, bool connected) +{ + mp_pin_set_manual_connection_for(p, connected ? p->owner->in->parent : NULL); +} + +void mp_pin_set_manual_connection_for(struct mp_pin *p, struct mp_filter *f) +{ + if (p->manual_connection == f) + return; + if (p->within_conn) + mp_pin_disconnect(p); + p->manual_connection = f; + init_connection(p); +} + +struct mp_filter *mp_pin_get_manual_connection(struct mp_pin *p) +{ + return p->manual_connection; +} + +static void deinit_connection(struct mp_pin *p) +{ + if (p->dir == MP_PIN_OUT) + p = p->other; + + p = find_connected_end(p); + + while (p) { + p->conn = p->other->conn = NULL; + p->within_conn = p->other->within_conn = false; + assert(!p->other->data_requested); // unused for in pins + assert(!p->other->data.type); // unused for in pins + p->data_requested = false; + if (p->data.type) + MP_WARN(p->owner, "dropping frame due to pin disconnect\n"); + if (p->data_requested) + MP_WARN(p->owner, "dropping request due to pin disconnect\n"); + mp_frame_unref(&p->data); + p = p->other->user_conn; + } +} + +void mp_pin_disconnect(struct mp_pin *p) +{ + if (!mp_pin_is_connected(p)) + return; + + p->manual_connection = NULL; + + struct mp_pin *conn = p->user_conn; + if (conn) { + p->user_conn = NULL; + conn->user_conn = NULL; + deinit_connection(conn); + } + + deinit_connection(p); +} + +bool mp_pin_is_connected(struct mp_pin *p) +{ + return p->user_conn || p->manual_connection; +} + +const char *mp_pin_get_name(struct mp_pin *p) +{ + return p->name; +} + +enum mp_pin_dir mp_pin_get_dir(struct mp_pin *p) +{ + return p->dir; +} + +const char *mp_filter_get_name(struct mp_filter *f) +{ + return f->in->name; +} + +void mp_filter_set_name(struct mp_filter *f, const char *name) +{ + talloc_free(f->in->name); + f->in->name = talloc_strdup(f, name); +} + +struct mp_pin *mp_filter_get_named_pin(struct mp_filter *f, const char *name) +{ + for (int n = 0; n < f->num_pins; n++) { + if (name && strcmp(f->pins[n]->name, name) == 0) + return f->pins[n]; + } + return NULL; +} + +void mp_filter_set_error_handler(struct mp_filter *f, struct mp_filter *handler) +{ + f->in->error_handler = handler; +} + +void mp_filter_internal_mark_failed(struct mp_filter *f) +{ + while (f) { + f->in->failed = true; + if (f->in->error_handler) { + add_pending(f->in->error_handler); + break; + } + f = f->in->parent; + } +} + +bool mp_filter_has_failed(struct mp_filter *filter) +{ + bool failed = filter->in->failed; + filter->in->failed = false; + return failed; +} + +static void reset_pin(struct mp_pin *p) +{ + if (!p->conn || p->dir != MP_PIN_OUT) { + assert(!p->data.type); + assert(!p->data_requested); + } + mp_frame_unref(&p->data); + p->data_requested = false; +} + +void mp_filter_reset(struct mp_filter *filter) +{ + for (int n = 0; n < filter->in->num_children; n++) + mp_filter_reset(filter->in->children[n]); + + for (int n = 0; n < filter->num_pins; n++) { + struct mp_pin *p = filter->ppins[n]; + reset_pin(p); + reset_pin(p->other); + } + + if (filter->in->info->reset) + filter->in->info->reset(filter); +} + +struct mp_pin *mp_filter_add_pin(struct mp_filter *f, enum mp_pin_dir dir, + const char *name) +{ + assert(dir == MP_PIN_IN || dir == MP_PIN_OUT); + assert(name && name[0]); + assert(!mp_filter_get_named_pin(f, name)); + + // "Public" pin + struct mp_pin *p = talloc_ptrtype(NULL, p); + *p = (struct mp_pin){ + .name = talloc_strdup(p, name), + .dir = dir, + .owner = f, + .manual_connection = f->in->parent, + }; + + // "Private" paired pin + p->other = talloc_ptrtype(NULL, p); + *p->other = (struct mp_pin){ + .name = p->name, + .dir = p->dir == MP_PIN_IN ? MP_PIN_OUT : MP_PIN_IN, + .owner = f, + .other = p, + .manual_connection = f, + }; + + MP_TARRAY_GROW(f, f->pins, f->num_pins); + MP_TARRAY_GROW(f, f->ppins, f->num_pins); + f->pins[f->num_pins] = p; + f->ppins[f->num_pins] = p->other; + f->num_pins += 1; + + init_connection(p); + + return p->other; +} + +void mp_filter_remove_pin(struct mp_filter *f, struct mp_pin *p) +{ + if (!p) + return; + + assert(p->owner == f); + mp_pin_disconnect(p); + mp_pin_disconnect(p->other); + + int index = -1; + for (int n = 0; n < f->num_pins; n++) { + if (f->ppins[n] == p) { + index = n; + break; + } + } + assert(index >= 0); + + talloc_free(f->pins[index]); + talloc_free(f->ppins[index]); + + int count = f->num_pins; + MP_TARRAY_REMOVE_AT(f->pins, count, index); + count = f->num_pins; + MP_TARRAY_REMOVE_AT(f->ppins, count, index); + f->num_pins -= 1; +} + +bool mp_filter_command(struct mp_filter *f, struct mp_filter_command *cmd) +{ + return f->in->info->command ? f->in->info->command(f, cmd) : false; +} + +struct mp_stream_info *mp_filter_find_stream_info(struct mp_filter *f) +{ + while (f) { + if (f->stream_info) + return f->stream_info; + f = f->in->parent; + } + return NULL; +} + +struct AVBufferRef *mp_filter_load_hwdec_device(struct mp_filter *f, int avtype) +{ + struct mp_stream_info *info = mp_filter_find_stream_info(f); + if (!info || !info->hwdec_devs) + return NULL; + + hwdec_devices_request_all(info->hwdec_devs); + + return hwdec_devices_get_lavc(info->hwdec_devs, avtype); +} + +static void filter_wakeup(struct mp_filter *f, bool mark_only) +{ + struct filter_runner *r = f->in->runner; + pthread_mutex_lock(&r->async_lock); + if (!f->in->async_pending) { + f->in->async_pending = true; + // (not using a talloc parent for thread safety reasons) + MP_TARRAY_APPEND(NULL, r->async_pending, r->num_async_pending, f); + if (!mark_only && !r->async_wakeup_sent) { + if (r->wakeup_cb) + r->wakeup_cb(r->wakeup_ctx); + r->async_wakeup_sent = true; + } + } + pthread_mutex_unlock(&r->async_lock); +} + +void mp_filter_wakeup(struct mp_filter *f) +{ + filter_wakeup(f, false); +} + +void mp_filter_mark_async_progress(struct mp_filter *f) +{ + filter_wakeup(f, true); +} + +void mp_filter_free_children(struct mp_filter *f) +{ + while(f->in->num_children) + talloc_free(f->in->children[0]); +} + +static void filter_destructor(void *p) +{ + struct mp_filter *f = p; + struct filter_runner *r = f->in->runner; + + if (f->in->info->destroy) + f->in->info->destroy(f); + + // For convenience, free child filters. + mp_filter_free_children(f); + + while (f->num_pins) + mp_filter_remove_pin(f, f->ppins[0]); + + // Just make sure the filter is not still in the async notifications set. + // There will be no more new notifications at this point (due to destroy()). + flush_async_notifications(r, false); + + for (int n = 0; n < r->num_pending; n++) { + if (r->pending[n] == f) { + MP_TARRAY_REMOVE_AT(r->pending, r->num_pending, n); + break; + } + } + + if (f->in->parent) { + struct mp_filter_internal *p_in = f->in->parent->in; + for (int n = 0; n < p_in->num_children; n++) { + if (p_in->children[n] == f) { + MP_TARRAY_REMOVE_AT(p_in->children, p_in->num_children, n); + break; + } + } + } + + if (r->root_filter == f) { + assert(!f->in->parent); + pthread_mutex_destroy(&r->async_lock); + talloc_free(r->async_pending); + talloc_free(r); + } +} + + +struct mp_filter *mp_filter_create_with_params(struct mp_filter_params *params) +{ + struct mp_filter *f = talloc(NULL, struct mp_filter); + talloc_set_destructor(f, filter_destructor); + *f = (struct mp_filter){ + .priv = params->info->priv_size ? + talloc_zero_size(f, params->info->priv_size) : NULL, + .global = params->global, + .in = talloc(f, struct mp_filter_internal), + }; + *f->in = (struct mp_filter_internal){ + .info = params->info, + .parent = params->parent, + .runner = params->parent ? params->parent->in->runner : NULL, + }; + + if (!f->in->runner) { + assert(params->global); + + f->in->runner = talloc(NULL, struct filter_runner); + *f->in->runner = (struct filter_runner){ + .global = params->global, + .root_filter = f, + }; + pthread_mutex_init(&f->in->runner->async_lock, NULL); + } + + if (!f->global) + f->global = f->in->runner->global; + + if (f->in->parent) { + struct mp_filter_internal *parent = f->in->parent->in; + MP_TARRAY_APPEND(parent, parent->children, parent->num_children, f); + } + + f->log = mp_log_new(f, f->global->log, params->info->name); + + if (f->in->info->init) { + if (!f->in->info->init(f, params)) { + talloc_free(f); + return NULL; + } + } + + return f; +} + +struct mp_filter *mp_filter_create(struct mp_filter *parent, + const struct mp_filter_info *info) +{ + assert(parent); + assert(info); + struct mp_filter_params params = { + .info = info, + .parent = parent, + }; + return mp_filter_create_with_params(¶ms); +} + +// (the root filter is just a dummy filter - nothing special about it, except +// that it has no parent, and serves as manual connection for "external" pins) +static const struct mp_filter_info filter_root = { + .name = "root", +}; + +struct mp_filter *mp_filter_create_root(struct mpv_global *global) +{ + struct mp_filter_params params = { + .info = &filter_root, + .global = global, + }; + return mp_filter_create_with_params(¶ms); +} + +void mp_filter_root_set_wakeup_cb(struct mp_filter *root, + void (*wakeup_cb)(void *ctx), void *ctx) +{ + struct filter_runner *r = root->in->runner; + pthread_mutex_lock(&r->async_lock); + r->wakeup_cb = wakeup_cb; + r->wakeup_ctx = ctx; + pthread_mutex_unlock(&r->async_lock); +} + +static const char *filt_name(struct mp_filter *f) +{ + return f ? f->in->info->name : "-"; +} + +static void dump_pin_state(struct mp_filter *f, struct mp_pin *pin) +{ + MP_WARN(f, " [%p] %s %s c=%s[%p] f=%s[%p] m=%s[%p] %s %s %s\n", + pin, pin->name, pin->dir == MP_PIN_IN ? "->" : "<-", + pin->user_conn ? filt_name(pin->user_conn->owner) : "-", pin->user_conn, + pin->conn ? filt_name(pin->conn->owner) : "-", pin->conn, + filt_name(pin->manual_connection), pin->manual_connection, + pin->within_conn ? "(within)" : "", + pin->data_requested ? "(request)" : "", + mp_frame_type_str(pin->data.type)); +} + +void mp_filter_dump_states(struct mp_filter *f) +{ + MP_WARN(f, "%s[%p] (%s[%p])\n", filt_name(f), f, + filt_name(f->in->parent), f->in->parent); + for (int n = 0; n < f->num_pins; n++) { + dump_pin_state(f, f->pins[n]); + dump_pin_state(f, f->ppins[n]); + } + + for (int n = 0; n < f->in->num_children; n++) + mp_filter_dump_states(f->in->children[n]); +} |