summaryrefslogtreecommitdiffstats
path: root/filters
diff options
context:
space:
mode:
authorKacper Michajłow <kasper93@gmail.com>2023-10-21 04:55:41 +0200
committerDudemanguy <random342@airmail.cc>2023-11-05 17:36:17 +0000
commit174df99ffa53f1091589eaa4fa0c16cdd55a9326 (patch)
tree3a60d45615f18beed98a9b08267c28ed7e05dd5f /filters
parent3a8b107f6216b38a151d5ca1e9d4f2727e3418f5 (diff)
downloadmpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.bz2
mpv-174df99ffa53f1091589eaa4fa0c16cdd55a9326.tar.xz
ALL: use new mp_thread abstraction
Diffstat (limited to 'filters')
-rw-r--r--filters/f_async_queue.c60
-rw-r--r--filters/f_decoder_wrapper.c75
-rw-r--r--filters/filter.c24
3 files changed, 79 insertions, 80 deletions
diff --git a/filters/f_async_queue.c b/filters/f_async_queue.c
index d7df39c13f..95db385d7f 100644
--- a/filters/f_async_queue.c
+++ b/filters/f_async_queue.c
@@ -1,10 +1,10 @@
#include <limits.h>
-#include <pthread.h>
#include <stdatomic.h>
#include "audio/aframe.h"
#include "common/common.h"
#include "common/msg.h"
+#include "osdep/threads.h"
#include "f_async_queue.h"
#include "filter_internal.h"
@@ -18,7 +18,7 @@ struct mp_async_queue {
struct async_queue {
_Atomic uint64_t refcount;
- pthread_mutex_t lock;
+ mp_mutex lock;
// -- protected by lock
struct mp_async_queue_config cfg;
@@ -34,7 +34,7 @@ struct async_queue {
static void reset_queue(struct async_queue *q)
{
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
q->active = q->reading = false;
for (int n = 0; n < q->num_frames; n++)
mp_frame_unref(&q->frames[n]);
@@ -46,7 +46,7 @@ static void reset_queue(struct async_queue *q)
if (q->conn[n])
mp_filter_wakeup(q->conn[n]);
}
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
static void unref_queue(struct async_queue *q)
@@ -57,7 +57,7 @@ static void unref_queue(struct async_queue *q)
assert(count >= 0);
if (count == 0) {
reset_queue(q);
- pthread_mutex_destroy(&q->lock);
+ mp_mutex_destroy(&q->lock);
talloc_free(q);
}
}
@@ -75,7 +75,7 @@ struct mp_async_queue *mp_async_queue_create(void)
*r->q = (struct async_queue){
.refcount = 1,
};
- pthread_mutex_init(&r->q->lock, NULL);
+ mp_mutex_init(&r->q->lock);
talloc_set_destructor(r, on_free_queue);
mp_async_queue_set_config(r, (struct mp_async_queue_config){0});
return r;
@@ -142,12 +142,12 @@ void mp_async_queue_set_config(struct mp_async_queue *queue,
cfg.max_samples = MPMAX(cfg.max_samples, 1);
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
bool recompute = q->cfg.sample_unit != cfg.sample_unit;
q->cfg = cfg;
if (recompute)
recompute_sizes(q);
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
void mp_async_queue_reset(struct mp_async_queue *queue)
@@ -158,18 +158,18 @@ void mp_async_queue_reset(struct mp_async_queue *queue)
bool mp_async_queue_is_active(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
bool res = q->active;
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
return res;
}
bool mp_async_queue_is_full(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
bool res = is_full(q);
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
return res;
}
@@ -177,21 +177,21 @@ void mp_async_queue_resume(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
if (!q->active) {
q->active = true;
// Possibly make the consumer request new frames.
if (q->conn[1])
mp_filter_wakeup(q->conn[1]);
}
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
void mp_async_queue_resume_reading(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
if (!q->active || !q->reading) {
q->active = true;
q->reading = true;
@@ -201,24 +201,24 @@ void mp_async_queue_resume_reading(struct mp_async_queue *queue)
mp_filter_wakeup(q->conn[n]);
}
}
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
int64_t mp_async_queue_get_samples(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
int64_t res = q->samples_size;
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
return res;
}
int mp_async_queue_get_frames(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
int res = q->num_frames;
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
return res;
}
@@ -232,12 +232,12 @@ static void destroy(struct mp_filter *f)
struct priv *p = f->priv;
struct async_queue *q = p->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
for (int n = 0; n < 2; n++) {
if (q->conn[n] == f)
q->conn[n] = NULL;
}
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
unref_queue(q);
}
@@ -248,7 +248,7 @@ static void process_in(struct mp_filter *f)
struct async_queue *q = p->q;
assert(q->conn[0] == f);
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
if (!q->reading) {
// mp_async_queue_reset()/reset_queue() is usually called asynchronously,
// so we might have requested a frame earlier, and now can't use it.
@@ -274,7 +274,7 @@ static void process_in(struct mp_filter *f)
}
if (p->notify && !q->num_frames)
mp_filter_wakeup(p->notify);
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
static void process_out(struct mp_filter *f)
@@ -286,7 +286,7 @@ static void process_out(struct mp_filter *f)
if (!mp_pin_in_needs_data(f->ppins[0]))
return;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
if (q->active && !q->reading) {
q->reading = true;
mp_filter_wakeup(q->conn[0]);
@@ -301,7 +301,7 @@ static void process_out(struct mp_filter *f)
if (q->conn[0])
mp_filter_wakeup(q->conn[0]);
}
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
static void reset(struct mp_filter *f)
@@ -309,12 +309,12 @@ static void reset(struct mp_filter *f)
struct priv *p = f->priv;
struct async_queue *q = p->q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
// If the queue is in reading state, it is logical that it should request
// input immediately.
if (mp_pin_get_dir(f->pins[0]) == MP_PIN_IN && q->reading)
mp_filter_wakeup(f);
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
}
// producer
@@ -365,11 +365,11 @@ struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
atomic_fetch_add(&q->refcount, 1);
p->q = q;
- pthread_mutex_lock(&q->lock);
+ mp_mutex_lock(&q->lock);
int slot = is_in ? 0 : 1;
assert(!q->conn[slot]); // fails if already connected on this end
q->conn[slot] = f;
- pthread_mutex_unlock(&q->lock);
+ mp_mutex_unlock(&q->lock);
return f;
}
diff --git a/filters/f_decoder_wrapper.c b/filters/f_decoder_wrapper.c
index ad090ecd7f..6551e3478f 100644
--- a/filters/f_decoder_wrapper.c
+++ b/filters/f_decoder_wrapper.c
@@ -21,7 +21,6 @@
#include <stdbool.h>
#include <math.h>
#include <assert.h>
-#include <pthread.h>
#include <libavutil/buffer.h>
#include <libavutil/common.h>
@@ -219,9 +218,9 @@ struct priv {
struct mp_async_queue *queue; // decoded frame output queue
struct mp_dispatch_queue *dec_dispatch; // non-NULL if decoding thread used
bool dec_thread_lock; // debugging (esp. for no-thread case)
- pthread_t dec_thread;
+ mp_thread dec_thread;
bool dec_thread_valid;
- pthread_mutex_t cache_lock;
+ mp_mutex cache_lock;
// --- Protected by cache_lock.
char *cur_hwdec;
@@ -259,13 +258,13 @@ static int decoder_list_help(struct mp_log *log, const m_option_t *opt,
// thread state. Must run on/locked with decoder thread.
static void update_cached_values(struct priv *p)
{
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
p->cur_hwdec = NULL;
if (p->decoder && p->decoder->control)
p->decoder->control(p->decoder->f, VDCTRL_GET_HWDEC, &p->cur_hwdec);
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
}
// Lock the decoder thread. This may synchronously wait until the decoder thread
@@ -324,11 +323,11 @@ static void decf_reset(struct mp_filter *f)
p->pts = MP_NOPTS_VALUE;
p->last_format = p->fixed_format = (struct mp_image_params){0};
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
p->pts_reset = false;
p->attempt_framedrops = 0;
p->dropped_frames = 0;
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
p->coverart_returned = 0;
@@ -347,9 +346,9 @@ int mp_decoder_wrapper_control(struct mp_decoder_wrapper *d,
struct priv *p = d->f->priv;
int res = CONTROL_UNKNOWN;
if (cmd == VDCTRL_GET_HWDEC) {
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
*(char **)arg = p->cur_hwdec;
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
} else {
thread_lock(p);
if (p->decoder && p->decoder->control)
@@ -415,9 +414,9 @@ static bool reinit_decoder(struct priv *p)
user_list = p->opts->audio_decoders;
fallback = "aac";
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
bool try_spdif = p->try_spdif;
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
if (try_spdif && p->codec->codec) {
struct mp_decoder_list *spdif =
@@ -450,11 +449,11 @@ static bool reinit_decoder(struct priv *p)
p->decoder = driver->create(p->decf, p->codec, sel->decoder);
if (p->decoder) {
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
const char *d = sel->desc && sel->desc[0] ? sel->desc : sel->decoder;
p->decoder_desc = talloc_strdup(p, d);
MP_VERBOSE(p, "Selected codec: %s\n", p->decoder_desc);
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
break;
}
@@ -485,25 +484,25 @@ void mp_decoder_wrapper_get_desc(struct mp_decoder_wrapper *d,
char *buf, size_t buf_size)
{
struct priv *p = d->f->priv;
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
snprintf(buf, buf_size, "%s", p->decoder_desc ? p->decoder_desc : "");
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
}
void mp_decoder_wrapper_set_frame_drops(struct mp_decoder_wrapper *d, int num)
{
struct priv *p = d->f->priv;
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
p->attempt_framedrops = num;
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
}
int mp_decoder_wrapper_get_frames_dropped(struct mp_decoder_wrapper *d)
{
struct priv *p = d->f->priv;
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
int res = p->dropped_frames;
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
return res;
}
@@ -519,25 +518,25 @@ double mp_decoder_wrapper_get_container_fps(struct mp_decoder_wrapper *d)
void mp_decoder_wrapper_set_spdif_flag(struct mp_decoder_wrapper *d, bool spdif)
{
struct priv *p = d->f->priv;
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
p->try_spdif = spdif;
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
}
void mp_decoder_wrapper_set_coverart_flag(struct mp_decoder_wrapper *d, bool c)
{
struct priv *p = d->f->priv;
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
p->attached_picture = c;
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
}
bool mp_decoder_wrapper_get_pts_reset(struct mp_decoder_wrapper *d)
{
struct priv *p = d->f->priv;
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
bool res = p->pts_reset;
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
return res;
}
@@ -800,9 +799,9 @@ static void correct_audio_pts(struct priv *p, struct mp_aframe *aframe)
if (p->pts != MP_NOPTS_VALUE && diff > 0.1) {
MP_WARN(p, "Invalid audio PTS: %f -> %f\n", p->pts, frame_pts);
if (diff >= 5) {
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
p->pts_reset = true;
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
}
}
@@ -902,10 +901,10 @@ static void feed_packet(struct priv *p)
int framedrop_type = 0;
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
if (p->attempt_framedrops)
framedrop_type = 1;
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
if (start_pts != MP_NOPTS_VALUE && packet && p->play_dir > 0 &&
packet->pts < start_pts - .005 && !p->has_broken_packet_pts)
@@ -1003,7 +1002,7 @@ static void read_frame(struct priv *p)
if (!frame.type)
return;
- pthread_mutex_lock(&p->cache_lock);
+ mp_mutex_lock(&p->cache_lock);
if (p->attached_picture && frame.type == MP_FRAME_VIDEO)
p->decoded_coverart = frame;
if (p->attempt_framedrops) {
@@ -1011,7 +1010,7 @@ static void read_frame(struct priv *p)
p->attempt_framedrops = MPMAX(0, p->attempt_framedrops - dropped);
p->dropped_frames += dropped;
}
- pthread_mutex_unlock(&p->cache_lock);
+ mp_mutex_unlock(&p->cache_lock);
if (p->decoded_coverart.type) {
mp_filter_internal_mark_progress(p->decf);
@@ -1098,7 +1097,7 @@ static void decf_process(struct mp_filter *f)
read_frame(p);
}
-static void *dec_thread(void *ptr)
+static MP_THREAD_VOID dec_thread(void *ptr)
{
struct priv *p = ptr;
@@ -1107,7 +1106,7 @@ static void *dec_thread(void *ptr)
case STREAM_VIDEO: t_name = "dec/video"; break;
case STREAM_AUDIO: t_name = "dec/audio"; break;
}
- mpthread_set_name(t_name);
+ mp_thread_set_name(t_name);
while (!p->request_terminate_dec_thread) {
mp_filter_graph_run(p->dec_root_filter);
@@ -1115,7 +1114,7 @@ static void *dec_thread(void *ptr)
mp_dispatch_queue_process(p->dec_dispatch, INFINITY);
}
- return NULL;
+ MP_THREAD_RETURN();
}
static void public_f_reset(struct mp_filter *f)
@@ -1145,7 +1144,7 @@ static void public_f_destroy(struct mp_filter *f)
p->request_terminate_dec_thread = 1;
mp_dispatch_interrupt(p->dec_dispatch);
thread_unlock(p);
- pthread_join(p->dec_thread, NULL);
+ mp_thread_join(p->dec_thread);
p->dec_thread_valid = false;
}
@@ -1153,7 +1152,7 @@ static void public_f_destroy(struct mp_filter *f)
talloc_free(p->dec_root_filter);
talloc_free(p->queue);
- pthread_mutex_destroy(&p->cache_lock);
+ mp_mutex_destroy(&p->cache_lock);
}
static const struct mp_filter_info decf_filter = {
@@ -1194,7 +1193,7 @@ struct mp_decoder_wrapper *mp_decoder_wrapper_create(struct mp_filter *parent,
struct priv *p = public_f->priv;
p->public.f = public_f;
- pthread_mutex_init(&p->cache_lock, NULL);
+ mp_mutex_init(&p->cache_lock);
p->opt_cache = m_config_cache_alloc(p, public_f->global, &dec_wrapper_conf);
p->opts = p->opt_cache->opts;
p->header = src;
@@ -1264,7 +1263,7 @@ struct mp_decoder_wrapper *mp_decoder_wrapper_create(struct mp_filter *parent,
mp_pin_connect(f_out->pins[0], p->decf->pins[0]);
p->dec_thread_valid = true;
- if (pthread_create(&p->dec_thread, NULL, dec_thread, p)) {
+ if (mp_thread_create(&p->dec_thread, dec_thread, p)) {
p->dec_thread_valid = false;
goto error;
}
diff --git a/filters/filter.c b/filters/filter.c
index c386401c9f..1d13393194 100644
--- a/filters/filter.c
+++ b/filters/filter.c
@@ -1,5 +1,4 @@
#include <math.h>
-#include <pthread.h>
#include <stdatomic.h>
#include <libavutil/hwcontext.h>
@@ -7,6 +6,7 @@
#include "common/common.h"
#include "common/global.h"
#include "common/msg.h"
+#include "osdep/threads.h"
#include "osdep/timer.h"
#include "video/hwdec.h"
#include "video/img_format.h"
@@ -90,7 +90,7 @@ struct filter_runner {
// For async notifications only. We don't bother making this fine grained
// across filters.
- pthread_mutex_t async_lock;
+ mp_mutex async_lock;
// Wakeup is pending. Protected by async_lock.
bool async_wakeup_sent;
@@ -196,7 +196,7 @@ void mp_filter_internal_mark_progress(struct mp_filter *f)
// sync notifications don't need any locking.
static void flush_async_notifications(struct filter_runner *r)
{
- pthread_mutex_lock(&r->async_lock);
+ mp_mutex_lock(&r->async_lock);
for (int n = 0; n < r->num_async_pending; n++) {
struct mp_filter *f = r->async_pending[n];
add_pending(f);
@@ -204,7 +204,7 @@ static void flush_async_notifications(struct filter_runner *r)
}
r->num_async_pending = 0;
r->async_wakeup_sent = false;
- pthread_mutex_unlock(&r->async_lock);
+ mp_mutex_unlock(&r->async_lock);
}
bool mp_filter_graph_run(struct mp_filter *filter)
@@ -230,11 +230,11 @@ bool mp_filter_graph_run(struct mp_filter *filter)
if (atomic_exchange_explicit(&r->interrupt_flag, false,
memory_order_acq_rel))
{
- pthread_mutex_lock(&r->async_lock);
+ mp_mutex_lock(&r->async_lock);
if (!r->async_wakeup_sent && r->wakeup_cb)
r->wakeup_cb(r->wakeup_ctx);
r->async_wakeup_sent = true;
- pthread_mutex_unlock(&r->async_lock);
+ mp_mutex_unlock(&r->async_lock);
exit_req = true;
}
@@ -703,7 +703,7 @@ struct mp_hwdec_ctx *mp_filter_load_hwdec_device(struct mp_filter *f, int imgfmt
static void filter_wakeup(struct mp_filter *f, bool mark_only)
{
struct filter_runner *r = f->in->runner;
- pthread_mutex_lock(&r->async_lock);
+ mp_mutex_lock(&r->async_lock);
if (!f->in->async_pending) {
f->in->async_pending = true;
// (not using a talloc parent for thread safety reasons)
@@ -714,7 +714,7 @@ static void filter_wakeup(struct mp_filter *f, bool mark_only)
r->wakeup_cb(r->wakeup_ctx);
r->async_wakeup_sent = true;
}
- pthread_mutex_unlock(&r->async_lock);
+ mp_mutex_unlock(&r->async_lock);
}
void mp_filter_wakeup(struct mp_filter *f)
@@ -784,7 +784,7 @@ static void filter_destructor(void *p)
if (r->root_filter == f) {
assert(!f->in->parent);
- pthread_mutex_destroy(&r->async_lock);
+ mp_mutex_destroy(&r->async_lock);
talloc_free(r->async_pending);
talloc_free(r);
}
@@ -816,7 +816,7 @@ struct mp_filter *mp_filter_create_with_params(struct mp_filter_params *params)
.root_filter = f,
.max_run_time = INFINITY,
};
- pthread_mutex_init(&f->in->runner->async_lock, NULL);
+ mp_mutex_init(&f->in->runner->async_lock);
}
if (!f->global)
@@ -872,10 +872,10 @@ void mp_filter_graph_set_wakeup_cb(struct mp_filter *root,
{
struct filter_runner *r = root->in->runner;
assert(root == r->root_filter); // user is supposed to call this on root only
- pthread_mutex_lock(&r->async_lock);
+ mp_mutex_lock(&r->async_lock);
r->wakeup_cb = wakeup_cb;
r->wakeup_ctx = ctx;
- pthread_mutex_unlock(&r->async_lock);
+ mp_mutex_unlock(&r->async_lock);
}
static const char *filt_name(struct mp_filter *f)