diff options
Diffstat (limited to 'filters/filter.c')
-rw-r--r-- | filters/filter.c | 194 |
1 files changed, 152 insertions, 42 deletions
diff --git a/filters/filter.c b/filters/filter.c index beec13c210..1d13393194 100644 --- a/filters/filter.c +++ b/filters/filter.c @@ -1,9 +1,15 @@ -#include <pthread.h> +#include <math.h> +#include <stdatomic.h> + +#include <libavutil/hwcontext.h> #include "common/common.h" #include "common/global.h" #include "common/msg.h" +#include "osdep/threads.h" +#include "osdep/timer.h" #include "video/hwdec.h" +#include "video/img_format.h" #include "filter.h" #include "filter_internal.h" @@ -65,9 +71,15 @@ struct filter_runner { struct mp_filter *root_filter; + double max_run_time; + atomic_bool interrupt_flag; + // If we're currently running the filter graph (for avoiding recursion). bool filtering; + // If set, recursive filtering was initiated through this pin. + struct mp_pin *recursive; + // Set of filters which need process() to be called. A filter is in this // array iff mp_filter_internal.pending==true. struct mp_filter **pending; @@ -78,7 +90,7 @@ struct filter_runner { // For async notifications only. We don't bother making this fine grained // across filters. - pthread_mutex_t async_lock; + mp_mutex async_lock; // Wakeup is pending. Protected by async_lock. bool async_wakeup_sent; @@ -101,13 +113,13 @@ struct mp_filter_internal { struct mp_filter *error_handler; char *name; + bool high_priority; bool pending; bool async_pending; bool failed; }; - // Called when new work needs to be done on a pin belonging to the filter: // - new data was requested // - new data has been queued @@ -124,21 +136,37 @@ static void add_pending(struct mp_filter *f) // This should probably really be some sort of priority queue, but for now // something naive and dumb does the job too. f->in->pending = true; - MP_TARRAY_APPEND(r, r->pending, r->num_pending, f); + if (f->in->high_priority) { + MP_TARRAY_INSERT_AT(r, r->pending, r->num_pending, 0, f); + } else { + MP_TARRAY_APPEND(r, r->pending, r->num_pending, f); + } +} + +static void add_pending_pin(struct mp_pin *p) +{ + struct mp_filter *f = p->manual_connection; + assert(f); + + if (f->in->pending) + return; + + add_pending(f); // Need to tell user that something changed. - if (f == r->root_filter) - r->external_pending = true; + if (f == f->in->runner->root_filter && p != f->in->runner->recursive) + f->in->runner->external_pending = true; } // Possibly enter recursive filtering. This is done as convenience for // "external" filter users only. (Normal filtering does this iteratively via -// mp_filter_run() to avoid filter reentrancy issues and deep call stacks.) If -// the API users uses an external manually connected pin, do recursive filtering -// as a not strictly necessary feature which makes outside I/O with filters -// easier. -static void filter_recursive(struct mp_filter *f) +// mp_filter_graph_run() to avoid filter reentrancy issues and deep call +// stacks.) If the API users uses an external manually connected pin, do +// recursive filtering as a not strictly necessary feature which makes outside +// I/O with filters easier. +static void filter_recursive(struct mp_pin *p) { + struct mp_filter *f = p->conn->manual_connection; assert(f); struct filter_runner *r = f->in->runner; @@ -146,9 +174,15 @@ static void filter_recursive(struct mp_filter *f) if (r->filtering) return; + assert(!r->recursive); + r->recursive = p; + // Also don't lose the pending state, which the user may or may not // care about. - r->external_pending |= mp_filter_run(r->root_filter); + r->external_pending |= mp_filter_graph_run(r->root_filter); + + assert(r->recursive == p); + r->recursive = NULL; } void mp_filter_internal_mark_progress(struct mp_filter *f) @@ -162,7 +196,7 @@ void mp_filter_internal_mark_progress(struct mp_filter *f) // sync notifications don't need any locking. static void flush_async_notifications(struct filter_runner *r) { - pthread_mutex_lock(&r->async_lock); + mp_mutex_lock(&r->async_lock); for (int n = 0; n < r->num_async_pending; n++) { struct mp_filter *f = r->async_pending[n]; add_pending(f); @@ -170,12 +204,17 @@ static void flush_async_notifications(struct filter_runner *r) } r->num_async_pending = 0; r->async_wakeup_sent = false; - pthread_mutex_unlock(&r->async_lock); + mp_mutex_unlock(&r->async_lock); } -bool mp_filter_run(struct mp_filter *filter) +bool mp_filter_graph_run(struct mp_filter *filter) { struct filter_runner *r = filter->in->runner; + assert(filter == r->root_filter); // user is supposed to call this on root only + + int64_t end_time = 0; + if (isfinite(r->max_run_time)) + end_time = mp_time_ns_add(mp_time_ns(), MPMAX(r->max_run_time, 0)); // (could happen with separate filter graphs calling each other, for now // ignore this issue as we don't use such a setup anywhere) @@ -185,13 +224,45 @@ bool mp_filter_run(struct mp_filter *filter) flush_async_notifications(r); - while (r->num_pending) { - struct mp_filter *next = r->pending[r->num_pending - 1]; - r->num_pending -= 1; - next->in->pending = false; + bool exit_req = false; + + while (1) { + if (atomic_exchange_explicit(&r->interrupt_flag, false, + memory_order_acq_rel)) + { + mp_mutex_lock(&r->async_lock); + if (!r->async_wakeup_sent && r->wakeup_cb) + r->wakeup_cb(r->wakeup_ctx); + r->async_wakeup_sent = true; + mp_mutex_unlock(&r->async_lock); + exit_req = true; + } + + if (!r->num_pending) { + flush_async_notifications(r); + if (!r->num_pending) + break; + } + + struct mp_filter *next = NULL; + if (r->pending[0]->in->high_priority) { + next = r->pending[0]; + MP_TARRAY_REMOVE_AT(r->pending, r->num_pending, 0); + } else if (!exit_req) { + next = r->pending[r->num_pending - 1]; + r->num_pending -= 1; + } + + if (!next) + break; + + next->in->pending = false; if (next->in->info->process) next->in->info->process(next); + + if (end_time && mp_time_ns() >= end_time) + mp_filter_graph_interrupt(r->root_filter); } r->filtering = false; @@ -232,8 +303,8 @@ bool mp_pin_in_write(struct mp_pin *p, struct mp_frame frame) assert(p->conn->data.type == MP_FRAME_NONE); p->conn->data = frame; p->conn->data_requested = false; - add_pending(p->conn->manual_connection); - filter_recursive(p->conn->manual_connection); + add_pending_pin(p->conn); + filter_recursive(p); return true; } @@ -251,9 +322,9 @@ bool mp_pin_out_request_data(struct mp_pin *p) if (p->conn && p->conn->manual_connection) { if (!p->data_requested) { p->data_requested = true; - add_pending(p->conn->manual_connection); + add_pending_pin(p->conn); } - filter_recursive(p->conn->manual_connection); + filter_recursive(p); } return mp_pin_out_has_data(p); } @@ -261,7 +332,7 @@ bool mp_pin_out_request_data(struct mp_pin *p) void mp_pin_out_request_data_next(struct mp_pin *p) { if (mp_pin_out_request_data(p)) - add_pending(p->conn->manual_connection); + add_pending_pin(p->conn); } struct mp_frame mp_pin_out_read(struct mp_pin *p) @@ -304,13 +375,15 @@ static struct mp_pin *find_connected_end(struct mp_pin *p) return other; p = other->user_conn; } - assert(0); + MP_ASSERT_UNREACHABLE(); } // With p being part of a connection, create the pin_connection and set all // state flags. static void init_connection(struct mp_pin *p) { + struct filter_runner *runner = p->owner->in->runner; + if (p->dir == MP_PIN_IN) p = p->other; @@ -321,7 +394,13 @@ static void init_connection(struct mp_pin *p) assert(!in->user_conn); assert(!out->user_conn); - // Logicaly, the ends are always manual connections. A pin chain without + // This and similar checks enforce the same root filter requirement. + if (in->manual_connection) + assert(in->manual_connection->in->runner == runner); + if (out->manual_connection) + assert(out->manual_connection->in->runner == runner); + + // Logically, the ends are always manual connections. A pin chain without // manual connections at the ends is still disconnected (or if this // attempted to extend an existing connection, becomes dangling and gets // disconnected). @@ -339,6 +418,7 @@ static void init_connection(struct mp_pin *p) assert(!cur->data.type); // unused for in pins assert(!cur->other->data_requested); // unset for unconnected out pins assert(!cur->other->data.type); // unset for unconnected out pins + assert(cur->owner->in->runner == runner); cur->within_conn = cur->other->within_conn = true; cur = cur->other->user_conn; } @@ -451,6 +531,16 @@ const char *mp_filter_get_name(struct mp_filter *f) return f->in->name; } +const struct mp_filter_info *mp_filter_get_info(struct mp_filter *f) +{ + return f->in->info; +} + +void mp_filter_set_high_priority(struct mp_filter *f, bool pri) +{ + f->in->high_priority = pri; +} + void mp_filter_set_name(struct mp_filter *f, const char *name) { talloc_free(f->in->name); @@ -595,32 +685,36 @@ struct mp_stream_info *mp_filter_find_stream_info(struct mp_filter *f) return NULL; } -struct AVBufferRef *mp_filter_load_hwdec_device(struct mp_filter *f, int avtype) +struct mp_hwdec_ctx *mp_filter_load_hwdec_device(struct mp_filter *f, int imgfmt) { struct mp_stream_info *info = mp_filter_find_stream_info(f); if (!info || !info->hwdec_devs) return NULL; - hwdec_devices_request_all(info->hwdec_devs); + struct hwdec_imgfmt_request params = { + .imgfmt = imgfmt, + .probing = false, + }; + hwdec_devices_request_for_img_fmt(info->hwdec_devs, ¶ms); - return hwdec_devices_get_lavc(info->hwdec_devs, avtype); + return hwdec_devices_get_by_imgfmt(info->hwdec_devs, imgfmt); } static void filter_wakeup(struct mp_filter *f, bool mark_only) { struct filter_runner *r = f->in->runner; - pthread_mutex_lock(&r->async_lock); + mp_mutex_lock(&r->async_lock); if (!f->in->async_pending) { f->in->async_pending = true; // (not using a talloc parent for thread safety reasons) MP_TARRAY_APPEND(NULL, r->async_pending, r->num_async_pending, f); - if (!mark_only && !r->async_wakeup_sent) { - if (r->wakeup_cb) - r->wakeup_cb(r->wakeup_ctx); - r->async_wakeup_sent = true; - } } - pthread_mutex_unlock(&r->async_lock); + if (!mark_only && !r->async_wakeup_sent) { + if (r->wakeup_cb) + r->wakeup_cb(r->wakeup_ctx); + r->async_wakeup_sent = true; + } + mp_mutex_unlock(&r->async_lock); } void mp_filter_wakeup(struct mp_filter *f) @@ -633,6 +727,20 @@ void mp_filter_mark_async_progress(struct mp_filter *f) filter_wakeup(f, true); } +void mp_filter_graph_set_max_run_time(struct mp_filter *f, double seconds) +{ + struct filter_runner *r = f->in->runner; + assert(f == r->root_filter); // user is supposed to call this on root only + r->max_run_time = seconds; +} + +void mp_filter_graph_interrupt(struct mp_filter *f) +{ + struct filter_runner *r = f->in->runner; + assert(f == r->root_filter); // user is supposed to call this on root only + atomic_store(&r->interrupt_flag, true); +} + void mp_filter_free_children(struct mp_filter *f) { while(f->in->num_children) @@ -676,7 +784,7 @@ static void filter_destructor(void *p) if (r->root_filter == f) { assert(!f->in->parent); - pthread_mutex_destroy(&r->async_lock); + mp_mutex_destroy(&r->async_lock); talloc_free(r->async_pending); talloc_free(r); } @@ -706,8 +814,9 @@ struct mp_filter *mp_filter_create_with_params(struct mp_filter_params *params) *f->in->runner = (struct filter_runner){ .global = params->global, .root_filter = f, + .max_run_time = INFINITY, }; - pthread_mutex_init(&f->in->runner->async_lock, NULL); + mp_mutex_init(&f->in->runner->async_lock); } if (!f->global) @@ -758,14 +867,15 @@ struct mp_filter *mp_filter_create_root(struct mpv_global *global) return mp_filter_create_with_params(¶ms); } -void mp_filter_root_set_wakeup_cb(struct mp_filter *root, - void (*wakeup_cb)(void *ctx), void *ctx) +void mp_filter_graph_set_wakeup_cb(struct mp_filter *root, + void (*wakeup_cb)(void *ctx), void *ctx) { struct filter_runner *r = root->in->runner; - pthread_mutex_lock(&r->async_lock); + assert(root == r->root_filter); // user is supposed to call this on root only + mp_mutex_lock(&r->async_lock); r->wakeup_cb = wakeup_cb; r->wakeup_ctx = ctx; - pthread_mutex_unlock(&r->async_lock); + mp_mutex_unlock(&r->async_lock); } static const char *filt_name(struct mp_filter *f) |