diff options
author | Kacper Michajłow <kasper93@gmail.com> | 2023-10-21 04:55:41 +0200 |
---|---|---|
committer | Dudemanguy <random342@airmail.cc> | 2023-11-05 17:36:17 +0000 |
commit | 174df99ffa53f1091589eaa4fa0c16cdd55a9326 (patch) | |
tree | 3a60d45615f18beed98a9b08267c28ed7e05dd5f /filters/f_async_queue.c | |
parent | 3a8b107f6216b38a151d5ca1e9d4f2727e3418f5 (diff) | |
download | mpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.bz2 mpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.xz |
ALL: use new mp_thread abstraction
Diffstat (limited to 'filters/f_async_queue.c')
-rw-r--r-- | filters/f_async_queue.c | 60 |
1 file changed, 30 insertions, 30 deletions
diff --git a/filters/f_async_queue.c b/filters/f_async_queue.c index d7df39c13f..95db385d7f 100644 --- a/filters/f_async_queue.c +++ b/filters/f_async_queue.c @@ -1,10 +1,10 @@ #include <limits.h> -#include <pthread.h> #include <stdatomic.h> #include "audio/aframe.h" #include "common/common.h" #include "common/msg.h" +#include "osdep/threads.h" #include "f_async_queue.h" #include "filter_internal.h" @@ -18,7 +18,7 @@ struct mp_async_queue { struct async_queue { _Atomic uint64_t refcount; - pthread_mutex_t lock; + mp_mutex lock; // -- protected by lock struct mp_async_queue_config cfg; @@ -34,7 +34,7 @@ struct async_queue { static void reset_queue(struct async_queue *q) { - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); q->active = q->reading = false; for (int n = 0; n < q->num_frames; n++) mp_frame_unref(&q->frames[n]); @@ -46,7 +46,7 @@ static void reset_queue(struct async_queue *q) if (q->conn[n]) mp_filter_wakeup(q->conn[n]); } - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } static void unref_queue(struct async_queue *q) @@ -57,7 +57,7 @@ static void unref_queue(struct async_queue *q) assert(count >= 0); if (count == 0) { reset_queue(q); - pthread_mutex_destroy(&q->lock); + mp_mutex_destroy(&q->lock); talloc_free(q); } } @@ -75,7 +75,7 @@ struct mp_async_queue *mp_async_queue_create(void) *r->q = (struct async_queue){ .refcount = 1, }; - pthread_mutex_init(&r->q->lock, NULL); + mp_mutex_init(&r->q->lock); talloc_set_destructor(r, on_free_queue); mp_async_queue_set_config(r, (struct mp_async_queue_config){0}); return r; @@ -142,12 +142,12 @@ void mp_async_queue_set_config(struct mp_async_queue *queue, cfg.max_samples = MPMAX(cfg.max_samples, 1); - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); bool recompute = q->cfg.sample_unit != cfg.sample_unit; q->cfg = cfg; if (recompute) recompute_sizes(q); - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } void mp_async_queue_reset(struct mp_async_queue *queue) @@ -158,18 
+158,18 @@ void mp_async_queue_reset(struct mp_async_queue *queue) bool mp_async_queue_is_active(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); bool res = q->active; - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); return res; } bool mp_async_queue_is_full(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); bool res = is_full(q); - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); return res; } @@ -177,21 +177,21 @@ void mp_async_queue_resume(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); if (!q->active) { q->active = true; // Possibly make the consumer request new frames. if (q->conn[1]) mp_filter_wakeup(q->conn[1]); } - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } void mp_async_queue_resume_reading(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); if (!q->active || !q->reading) { q->active = true; q->reading = true; @@ -201,24 +201,24 @@ void mp_async_queue_resume_reading(struct mp_async_queue *queue) mp_filter_wakeup(q->conn[n]); } } - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } int64_t mp_async_queue_get_samples(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); int64_t res = q->samples_size; - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); return res; } int mp_async_queue_get_frames(struct mp_async_queue *queue) { struct async_queue *q = queue->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); int res = q->num_frames; - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); return res; } @@ -232,12 +232,12 @@ static void destroy(struct mp_filter *f) struct priv *p = f->priv; struct async_queue *q 
= p->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); for (int n = 0; n < 2; n++) { if (q->conn[n] == f) q->conn[n] = NULL; } - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); unref_queue(q); } @@ -248,7 +248,7 @@ static void process_in(struct mp_filter *f) struct async_queue *q = p->q; assert(q->conn[0] == f); - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); if (!q->reading) { // mp_async_queue_reset()/reset_queue() is usually called asynchronously, // so we might have requested a frame earlier, and now can't use it. @@ -274,7 +274,7 @@ static void process_in(struct mp_filter *f) } if (p->notify && !q->num_frames) mp_filter_wakeup(p->notify); - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } static void process_out(struct mp_filter *f) @@ -286,7 +286,7 @@ static void process_out(struct mp_filter *f) if (!mp_pin_in_needs_data(f->ppins[0])) return; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); if (q->active && !q->reading) { q->reading = true; mp_filter_wakeup(q->conn[0]); @@ -301,7 +301,7 @@ static void process_out(struct mp_filter *f) if (q->conn[0]) mp_filter_wakeup(q->conn[0]); } - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } static void reset(struct mp_filter *f) @@ -309,12 +309,12 @@ static void reset(struct mp_filter *f) struct priv *p = f->priv; struct async_queue *q = p->q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); // If the queue is in reading state, it is logical that it should request // input immediately. if (mp_pin_get_dir(f->pins[0]) == MP_PIN_IN && q->reading) mp_filter_wakeup(f); - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); } // producer @@ -365,11 +365,11 @@ struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent, atomic_fetch_add(&q->refcount, 1); p->q = q; - pthread_mutex_lock(&q->lock); + mp_mutex_lock(&q->lock); int slot = is_in ? 
0 : 1; assert(!q->conn[slot]); // fails if already connected on this end q->conn[slot] = f; - pthread_mutex_unlock(&q->lock); + mp_mutex_unlock(&q->lock); return f; } |