diff options
author | Kacper Michajłow <kasper93@gmail.com> | 2023-10-21 04:55:41 +0200 |
---|---|---|
committer | Dudemanguy <random342@airmail.cc> | 2023-11-05 17:36:17 +0000 |
commit | 174df99ffa53f1091589eaa4fa0c16cdd55a9326 (patch) | |
tree | 3a60d45615f18beed98a9b08267c28ed7e05dd5f /filters | |
parent | 3a8b107f6216b38a151d5ca1e9d4f2727e3418f5 (diff) | |
download | mpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.bz2 mpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.xz |
ALL: use new mp_thread abstraction
Diffstat (limited to 'filters')
-rw-r--r-- | filters/f_async_queue.c | 60 | ||||
-rw-r--r-- | filters/f_decoder_wrapper.c | 75 | ||||
-rw-r--r-- | filters/filter.c | 24 |
3 files changed, 79 insertions, 80 deletions
diff --git a/filters/f_async_queue.c b/filters/f_async_queue.c index d7df39c13f..95db385d7f 100644 --- a/filters/f_async_queue.c +++ b/filters/f_async_queue.c @@ -1,10 +1,10 @@ #include <limits.h> -#include <pthread.h> #include <stdatomic.h> #include "audio/aframe.h" #include "common/common.h" #include "common/msg.h" +#include "osdep/threads.h" #include "f_async_queue.h" #include "filter_internal.h" @@ -18,7 +18,7 @@ struct mp_async_queue { struct async_queue { _Atomic uint64_t refcount; - pthread_mutex_t lock; + mp_mutex lock; // -- protected by lock struct mp_async_queue_config cfg; @@ -34,7 +34,7 @@ struct async_queue { static void reset_queue(struct async_queue *q) { - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); q->active = q->reading = false; for (int n = 0; n < q->num_frames; n++) mp_frame_unref(&q->frames[n]); @@ -46,7 +46,7 @@ static void reset_queue(struct async_queue *q) if (q->conn[n]) mp_filter_wakeup(q->conn[n]); } - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } static void unref_queue(struct async_queue *q) @@ -57,7 +57,7 @@ static void unref_queue(struct async_queue *q) assert(count >= 0); if (count == 0) { reset_queue(q); - pthread_mutex_destroy(&q->lock); + mp_mutex_destroy(&q->lock); talloc_free(q); } } @@ -75,7 +75,7 @@ struct mp_async_queue *mp_async_queue_create(void) *r->q = (struct async_queue){ .refcount = 1, }; - pthread_mutex_init(&r->q->lock, NULL); + mp_mutex_init(&r->q->lock); talloc_set_destructor(r, on_free_queue); mp_async_queue_set_config(r, (struct mp_async_queue_config){0}); return r; @@ -142,12 +142,12 @@ void mp_async_queue_set_config(struct mp_async_queue *queue, cfg.max_samples = MPMAX(cfg.max_samples, 1); - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); bool recompute = q->cfg.sample_unit != cfg.sample_unit; q->cfg = cfg; if (recompute) recompute_sizes(q); - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } void mp_async_queue_reset(struct mp_async_queue *queue) @@ -158,18 +158,18 @@ void mp_async_queue_reset(struct mp_async_queue *queue) bool mp_async_queue_is_active(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); bool res = q->active; - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); return res; } bool mp_async_queue_is_full(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); bool res = is_full(q); - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); return res; } @@ -177,21 +177,21 @@ void mp_async_queue_resume(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); if (!q->active) { q->active = true; // Possibly make the consumer request new frames. if (q->conn[1]) mp_filter_wakeup(q->conn[1]); } - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } void mp_async_queue_resume_reading(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); if (!q->active || !q->reading) { q->active = true; q->reading = true; @@ -201,24 +201,24 @@ void mp_async_queue_resume_reading(struct mp_async_queue *queue) mp_filter_wakeup(q->conn[n]); } } - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } int64_t mp_async_queue_get_samples(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); int64_t res = q->samples_size; - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); return res; } int mp_async_queue_get_frames(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); int res = q->num_frames; - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); return res; } @@ -232,12 +232,12 @@ static void destroy(struct mp_filter *f) struct priv *p = f->priv; struct async_queue *q = p->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); for (int n = 0; n < 2; n++) { if (q->conn[n] == f) q->conn[n] = NULL; } - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); unref_queue(q); } @@ -248,7 +248,7 @@ static void process_in(struct mp_filter *f) struct async_queue *q = p->q; assert(q->conn[0] == f); - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); if (!q->reading) { // mp_async_queue_reset()/reset_queue() is usually called asynchronously, // so we might have requested a frame earlier, and now can't use it. @@ -274,7 +274,7 @@ static void process_in(struct mp_filter *f) } if (p->notify && !q->num_frames) mp_filter_wakeup(p->notify); - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } static void process_out(struct mp_filter *f) @@ -286,7 +286,7 @@ static void process_out(struct mp_filter *f) if (!mp_pin_in_needs_data(f->ppins[0])) return; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); if (q->active && !q->reading) { q->reading = true; mp_filter_wakeup(q->conn[0]); @@ -301,7 +301,7 @@ static void process_out(struct mp_filter *f) if (q->conn[0]) mp_filter_wakeup(q->conn[0]); } - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } static void reset(struct mp_filter *f) @@ -309,12 +309,12 @@ static void reset(struct mp_filter *f) struct priv *p = f->priv; struct async_queue *q = p->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); // If the queue is in reading state, it is logical that it should request // input immediately. if (mp_pin_get_dir(f->pins[0]) == MP_PIN_IN && q->reading) mp_filter_wakeup(f); - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } // producer @@ -365,11 +365,11 @@ struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent, atomic_fetch_add(&q->refcount, 1); p->q = q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); int slot = is_in ? 0 : 1; assert(!q->conn[slot]); // fails if already connected on this end q->conn[slot] = f; - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); return f; } diff --git a/filters/f_decoder_wrapper.c b/filters/f_decoder_wrapper.c index ad090ecd7f..6551e3478f 100644 --- a/filters/f_decoder_wrapper.c +++ b/filters/f_decoder_wrapper.c @@ -21,7 +21,6 @@ #include <stdbool.h> #include <math.h> #include <assert.h> -#include <pthread.h> #include <libavutil/buffer.h> #include <libavutil/common.h> @@ -219,9 +218,9 @@ struct priv { struct mp_async_queue *queue; // decoded frame output queue struct mp_dispatch_queue *dec_dispatch; // non-NULL if decoding thread used bool dec_thread_lock; // debugging (esp. for no-thread case) - pthread_t dec_thread; + mp_thread dec_thread; bool dec_thread_valid; - pthread_mutex_t cache_lock; + mp_mutex cache_lock; // --- Protected by cache_lock. char *cur_hwdec; @@ -259,13 +258,13 @@ static int decoder_list_help(struct mp_log *log, const m_option_t *opt, // thread state. Must run on/locked with decoder thread. static void update_cached_values(struct priv *p) { - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); p->cur_hwdec = NULL; if (p->decoder && p->decoder->control) p->decoder->control(p->decoder->f, VDCTRL_GET_HWDEC, &p->cur_hwdec); - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); } // Lock the decoder thread. This may synchronously wait until the decoder thread @@ -324,11 +323,11 @@ static void decf_reset(struct mp_filter *f) p->pts = MP_NOPTS_VALUE; p->last_format = p->fixed_format = (struct mp_image_params){0}; - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); p->pts_reset = false; p->attempt_framedrops = 0; p->dropped_frames = 0; - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); p->coverart_returned = 0; @@ -347,9 +346,9 @@ int mp_decoder_wrapper_control(struct mp_decoder_wrapper *d, struct priv *p = d->f->priv; int res = CONTROL_UNKNOWN; if (cmd == VDCTRL_GET_HWDEC) { - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); *(char **)arg = p->cur_hwdec; - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); } else { thread_lock(p); if (p->decoder && p->decoder->control) @@ -415,9 +414,9 @@ static bool reinit_decoder(struct priv *p) user_list = p->opts->audio_decoders; fallback = "aac"; - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); bool try_spdif = p->try_spdif; - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); if (try_spdif && p->codec->codec) { struct mp_decoder_list *spdif = @@ -450,11 +449,11 @@ static bool reinit_decoder(struct priv *p) p->decoder = driver->create(p->decf, p->codec, sel->decoder); if (p->decoder) { - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); const char *d = sel->desc && sel->desc[0] ? sel->desc : sel->decoder; p->decoder_desc = talloc_strdup(p, d); MP_VERBOSE(p, "Selected codec: %s\n", p->decoder_desc); - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); break; } @@ -485,25 +484,25 @@ void mp_decoder_wrapper_get_desc(struct mp_decoder_wrapper *d, char *buf, size_t buf_size) { struct priv *p = d->f->priv; - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); snprintf(buf, buf_size, "%s", p->decoder_desc ? p->decoder_desc : ""); - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); } void mp_decoder_wrapper_set_frame_drops(struct mp_decoder_wrapper *d, int num) { struct priv *p = d->f->priv; - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); p->attempt_framedrops = num; - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); } int mp_decoder_wrapper_get_frames_dropped(struct mp_decoder_wrapper *d) { struct priv *p = d->f->priv; - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); int res = p->dropped_frames; - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); return res; } @@ -519,25 +518,25 @@ double mp_decoder_wrapper_get_container_fps(struct mp_decoder_wrapper *d) void mp_decoder_wrapper_set_spdif_flag(struct mp_decoder_wrapper *d, bool spdif) { struct priv *p = d->f->priv; - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); p->try_spdif = spdif; - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); } void mp_decoder_wrapper_set_coverart_flag(struct mp_decoder_wrapper *d, bool c) { struct priv *p = d->f->priv; - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); p->attached_picture = c; - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); } bool mp_decoder_wrapper_get_pts_reset(struct mp_decoder_wrapper *d) { struct priv *p = d->f->priv; - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); bool res = p->pts_reset; - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); return res; } @@ -800,9 +799,9 @@ static void correct_audio_pts(struct priv *p, struct mp_aframe *aframe) if (p->pts != MP_NOPTS_VALUE && diff > 0.1) { MP_WARN(p, "Invalid audio PTS: %f -> %f\n", p->pts, frame_pts); if (diff >= 5) { - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); p->pts_reset = true; - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); } } @@ -902,10 +901,10 @@ static void feed_packet(struct priv *p) int framedrop_type = 0; - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); if (p->attempt_framedrops) framedrop_type = 1; - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); if (start_pts != MP_NOPTS_VALUE && packet && p->play_dir > 0 && packet->pts < start_pts - .005 && !p->has_broken_packet_pts) @@ -1003,7 +1002,7 @@ static void read_frame(struct priv *p) if (!frame.type) return; - pthread_mutex_lock(&p->cache_lock); + mp_mutex_lock(&p->cache_lock); if (p->attached_picture && frame.type == MP_FRAME_VIDEO) p->decoded_coverart = frame; if (p->attempt_framedrops) { @@ -1011,7 +1010,7 @@ static void read_frame(struct priv *p) p->attempt_framedrops = MPMAX(0, p->attempt_framedrops - dropped); p->dropped_frames += dropped; } - pthread_mutex_unlock(&p->cache_lock); + mp_mutex_unlock(&p->cache_lock); if (p->decoded_coverart.type) { mp_filter_internal_mark_progress(p->decf); @@ -1098,7 +1097,7 @@ static void decf_process(struct mp_filter *f) read_frame(p); } -static void *dec_thread(void *ptr) +static MP_THREAD_VOID dec_thread(void *ptr) { struct priv *p = ptr; @@ -1107,7 +1106,7 @@ static void *dec_thread(void *ptr) case STREAM_VIDEO: t_name = "dec/video"; break; case STREAM_AUDIO: t_name = "dec/audio"; break; } - mpthread_set_name(t_name); + mp_thread_set_name(t_name); while (!p->request_terminate_dec_thread) { mp_filter_graph_run(p->dec_root_filter); @@ -1115,7 +1114,7 @@ static void *dec_thread(void *ptr) mp_dispatch_queue_process(p->dec_dispatch, INFINITY); } - return NULL; + MP_THREAD_RETURN(); } static void public_f_reset(struct mp_filter *f) @@ -1145,7 +1144,7 @@ static void public_f_destroy(struct mp_filter *f) p->request_terminate_dec_thread = 1; mp_dispatch_interrupt(p->dec_dispatch); thread_unlock(p); - pthread_join(p->dec_thread, NULL); + mp_thread_join(p->dec_thread); p->dec_thread_valid = false; } @@ -1153,7 +1152,7 @@ static void public_f_destroy(struct mp_filter *f) talloc_free(p->dec_root_filter); talloc_free(p->queue); - pthread_mutex_destroy(&p->cache_lock); + mp_mutex_destroy(&p->cache_lock); } static const struct mp_filter_info decf_filter = { @@ -1194,7 +1193,7 @@ struct mp_decoder_wrapper *mp_decoder_wrapper_create(struct mp_filter *parent, struct priv *p = public_f->priv; p->public.f = public_f; - pthread_mutex_init(&p->cache_lock, NULL); + mp_mutex_init(&p->cache_lock); p->opt_cache = m_config_cache_alloc(p, public_f->global, &dec_wrapper_conf); p->opts = p->opt_cache->opts; p->header = src; @@ -1264,7 +1263,7 @@ struct mp_decoder_wrapper *mp_decoder_wrapper_create(struct mp_filter *parent, mp_pin_connect(f_out->pins[0], p->decf->pins[0]); p->dec_thread_valid = true; - if (pthread_create(&p->dec_thread, NULL, dec_thread, p)) { + if (mp_thread_create(&p->dec_thread, dec_thread, p)) { p->dec_thread_valid = false; goto error; } diff --git a/filters/filter.c b/filters/filter.c index c386401c9f..1d13393194 100644 --- a/filters/filter.c +++ b/filters/filter.c @@ -1,5 +1,4 @@ #include <math.h> -#include <pthread.h> #include <stdatomic.h> #include <libavutil/hwcontext.h> @@ -7,6 +6,7 @@ #include "common/common.h" #include "common/global.h" #include "common/msg.h" +#include "osdep/threads.h" #include "osdep/timer.h" #include "video/hwdec.h" #include "video/img_format.h" @@ -90,7 +90,7 @@ struct filter_runner { // For async notifications only. We don't bother making this fine grained // across filters. - pthread_mutex_t async_lock; + mp_mutex async_lock; // Wakeup is pending. Protected by async_lock. bool async_wakeup_sent; @@ -196,7 +196,7 @@ void mp_filter_internal_mark_progress(struct mp_filter *f) // sync notifications don't need any locking. static void flush_async_notifications(struct filter_runner *r) { - pthread_mutex_lock(&r->async_lock); + mp_mutex_lock(&r->async_lock); for (int n = 0; n < r->num_async_pending; n++) { struct mp_filter *f = r->async_pending[n]; add_pending(f); @@ -204,7 +204,7 @@ static void flush_async_notifications(struct filter_runner *r) } r->num_async_pending = 0; r->async_wakeup_sent = false; - pthread_mutex_unlock(&r->async_lock); + mp_mutex_unlock(&r->async_lock); } bool mp_filter_graph_run(struct mp_filter *filter) @@ -230,11 +230,11 @@ bool mp_filter_graph_run(struct mp_filter *filter) if (atomic_exchange_explicit(&r->interrupt_flag, false, memory_order_acq_rel)) { - pthread_mutex_lock(&r->async_lock); + mp_mutex_lock(&r->async_lock); if (!r->async_wakeup_sent && r->wakeup_cb) r->wakeup_cb(r->wakeup_ctx); r->async_wakeup_sent = true; - pthread_mutex_unlock(&r->async_lock); + mp_mutex_unlock(&r->async_lock); exit_req = true; } @@ -703,7 +703,7 @@ struct mp_hwdec_ctx *mp_filter_load_hwdec_device(struct mp_filter *f, int imgfmt static void filter_wakeup(struct mp_filter *f, bool mark_only) { struct filter_runner *r = f->in->runner; - pthread_mutex_lock(&r->async_lock); + mp_mutex_lock(&r->async_lock); if (!f->in->async_pending) { f->in->async_pending = true; // (not using a talloc parent for thread safety reasons) @@ -714,7 +714,7 @@ static void filter_wakeup(struct mp_filter *f, bool mark_only) r->wakeup_cb(r->wakeup_ctx); r->async_wakeup_sent = true; } - pthread_mutex_unlock(&r->async_lock); + mp_mutex_unlock(&r->async_lock); } void mp_filter_wakeup(struct mp_filter *f) @@ -784,7 +784,7 @@ static void filter_destructor(void *p) if (r->root_filter == f) { assert(!f->in->parent); - pthread_mutex_destroy(&r->async_lock); + mp_mutex_destroy(&r->async_lock); talloc_free(r->async_pending); talloc_free(r); } @@ -816,7 +816,7 @@ struct mp_filter *mp_filter_create_with_params(struct mp_filter_params *params) .root_filter = f, .max_run_time = INFINITY, }; - pthread_mutex_init(&f->in->runner->async_lock, NULL); + mp_mutex_init(&f->in->runner->async_lock); } if (!f->global) @@ -872,10 +872,10 @@ void mp_filter_graph_set_wakeup_cb(struct mp_filter *root, { struct filter_runner *r = root->in->runner; assert(root == r->root_filter); // user is supposed to call this on root only - pthread_mutex_lock(&r->async_lock); + mp_mutex_lock(&r->async_lock); r->wakeup_cb = wakeup_cb; r->wakeup_ctx = ctx; - pthread_mutex_unlock(&r->async_lock); + mp_mutex_unlock(&r->async_lock); } static const char *filt_name(struct mp_filter *f) |