summaryrefslogtreecommitdiffstats
path: root/filters/f_async_queue.c
diff options
context:
space:
mode:
authorKacper Michajłow <kasper93@gmail.com>2023-10-21 04:55:41 +0200
committerDudemanguy <random342@airmail.cc>2023-11-05 17:36:17 +0000
commit174df99ffa53f1091589eaa4fa0c16cdd55a9326 (patch)
tree3a60d45615f18beed98a9b08267c28ed7e05dd5f /filters/f_async_queue.c
parent3a8b107f6216b38a151d5ca1e9d4f2727e3418f5 (diff)
downloadmpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.bz2
mpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.xz
ALL: use new mp_thread abstraction
Diffstat (limited to 'filters/f_async_queue.c')
-rw-r--r--filters/f_async_queue.c60
1 files changed, 30 insertions, 30 deletions
diff --git a/filters/f_async_queue.c b/filters/f_async_queue.c
index d7df39c13f..95db385d7f 100644
--- a/filters/f_async_queue.c
+++ b/filters/f_async_queue.c
@@ -1,10 +1,10 @@
#include <limits.h>
-#include <pthread.h>
#include <stdatomic.h>
#include "audio/aframe.h"
#include "common/common.h"
#include "common/msg.h"
+#include "osdep/threads.h"
#include "f_async_queue.h"
#include "filter_internal.h"
@@ -18,7 +18,7 @@ struct mp_async_queue {
struct async_queue {
_Atomic uint64_t refcount;
- pthread_mutex_t lock;
+ mp_mutex lock;
// -- protected by lock
struct mp_async_queue_config cfg;
@@ -34,7 +34,7 @@ struct async_queue {
static void reset_queue(struct async_queue *q)
{
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
q->active = q->reading = false;
for (int n = 0; n < q->num_frames; n++)
mp_frame_unref(&q->frames[n]);
@@ -46,7 +46,7 @@ static void reset_queue(struct async_queue *q)
if (q->conn[n])
mp_filter_wakeup(q->conn[n]);
}
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
static void unref_queue(struct async_queue *q)
@@ -57,7 +57,7 @@ static void unref_queue(struct async_queue *q)
assert(count >= 0);
if (count == 0) {
reset_queue(q);
- pthread_mutex_destroy(&q->lock);
+ mp_mutex_destroy(&q->lock);
talloc_free(q);
}
}
@@ -75,7 +75,7 @@ struct mp_async_queue *mp_async_queue_create(void)
*r->q = (struct async_queue){
.refcount = 1,
};
- pthread_mutex_init(&r->q->lock, NULL);
+ mp_mutex_init(&r->q->lock);
talloc_set_destructor(r, on_free_queue);
mp_async_queue_set_config(r, (struct mp_async_queue_config){0});
return r;
@@ -142,12 +142,12 @@ void mp_async_queue_set_config(struct mp_async_queue *queue,
cfg.max_samples = MPMAX(cfg.max_samples, 1);
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
bool recompute = q->cfg.sample_unit != cfg.sample_unit;
q->cfg = cfg;
if (recompute)
recompute_sizes(q);
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
void mp_async_queue_reset(struct mp_async_queue *queue)
@@ -158,18 +158,18 @@ void mp_async_queue_reset(struct mp_async_queue *queue)
bool mp_async_queue_is_active(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
bool res = q->active;
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
return res;
}
bool mp_async_queue_is_full(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
bool res = is_full(q);
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
return res;
}
@@ -177,21 +177,21 @@ void mp_async_queue_resume(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
if (!q->active) {
q->active = true;
// Possibly make the consumer request new frames.
if (q->conn[1])
mp_filter_wakeup(q->conn[1]);
}
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
void mp_async_queue_resume_reading(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
if (!q->active || !q->reading) {
q->active = true;
q->reading = true;
@@ -201,24 +201,24 @@ void mp_async_queue_resume_reading(struct mp_async_queue *queue)
mp_filter_wakeup(q->conn[n]);
}
}
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
int64_t mp_async_queue_get_samples(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
int64_t res = q->samples_size;
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
return res;
}
int mp_async_queue_get_frames(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
int res = q->num_frames;
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
return res;
}
@@ -232,12 +232,12 @@ static void destroy(struct mp_filter *f)
struct priv *p = f->priv;
struct async_queue *q = p->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
for (int n = 0; n < 2; n++) {
if (q->conn[n] == f)
q->conn[n] = NULL;
}
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
unref_queue(q);
}
@@ -248,7 +248,7 @@ static void process_in(struct mp_filter *f)
struct async_queue *q = p->q;
assert(q->conn[0] == f);
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
if (!q->reading) {
// mp_async_queue_reset()/reset_queue() is usually called asynchronously,
// so we might have requested a frame earlier, and now can't use it.
@@ -274,7 +274,7 @@ static void process_in(struct mp_filter *f)
}
if (p->notify && !q->num_frames)
mp_filter_wakeup(p->notify);
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
static void process_out(struct mp_filter *f)
@@ -286,7 +286,7 @@ static void process_out(struct mp_filter *f)
if (!mp_pin_in_needs_data(f->ppins[0]))
return;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
if (q->active && !q->reading) {
q->reading = true;
mp_filter_wakeup(q->conn[0]);
@@ -301,7 +301,7 @@ static void process_out(struct mp_filter *f)
if (q->conn[0])
mp_filter_wakeup(q->conn[0]);
}
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
static void reset(struct mp_filter *f)
@@ -309,12 +309,12 @@ static void reset(struct mp_filter *f)
struct priv *p = f->priv;
struct async_queue *q = p->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
// If the queue is in reading state, it is logical that it should request
// input immediately.
if (mp_pin_get_dir(f->pins[0]) == MP_PIN_IN && q->reading)
mp_filter_wakeup(f);
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
// producer
@@ -365,11 +365,11 @@ struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
atomic_fetch_add(&q->refcount, 1);
p->q = q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
int slot = is_in ? 0 : 1;
assert(!q->conn[slot]); // fails if already connected on this end
q->conn[slot] = f;
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
return f;
}