diff options
author | wm4 <wm4@nowhere> | 2020-08-28 20:08:32 +0200 |
---|---|---|
committer | wm4 <wm4@nowhere> | 2020-08-28 20:08:32 +0200 |
commit | 3d0eb4c26c134f0c153f28272423a347b7385cda (patch) | |
tree | 833302977b721daa8a612a651e1c7b6e72f6f00a /filters/f_async_queue.c | |
parent | 71d118733a45e359c0ec645455fac2005efb2692 (diff) | |
download | mpv-3d0eb4c26c134f0c153f28272423a347b7385cda.tar.bz2 mpv-3d0eb4c26c134f0c153f28272423a347b7385cda.tar.xz |
f_async_queue: add various helper functions
Shouldn't change the behavior if not used. Will probably be used in a
later commit.
Diffstat (limited to 'filters/f_async_queue.c')
-rw-r--r-- | filters/f_async_queue.c | 72 |
1 files changed, 71 insertions, 1 deletions
diff --git a/filters/f_async_queue.c b/filters/f_async_queue.c index 84f38e9460..5ee32be5fa 100644 --- a/filters/f_async_queue.c +++ b/filters/f_async_queue.c @@ -155,6 +155,24 @@ void mp_async_queue_reset(struct mp_async_queue *queue) reset_queue(queue->q); } +bool mp_async_queue_is_active(struct mp_async_queue *queue) +{ + struct async_queue *q = queue->q; + pthread_mutex_lock(&q->lock); + bool res = q->active; + pthread_mutex_unlock(&q->lock); + return res; +} + +bool mp_async_queue_is_full(struct mp_async_queue *queue) +{ + struct async_queue *q = queue->q; + pthread_mutex_lock(&q->lock); + bool res = is_full(q); + pthread_mutex_unlock(&q->lock); + return res; +} + void mp_async_queue_resume(struct mp_async_queue *queue) { struct async_queue *q = queue->q; @@ -169,8 +187,44 @@ void mp_async_queue_resume(struct mp_async_queue *queue) pthread_mutex_unlock(&q->lock); } +void mp_async_queue_resume_reading(struct mp_async_queue *queue) +{ + struct async_queue *q = queue->q; + + pthread_mutex_lock(&q->lock); + if (!q->active || !q->reading) { + q->active = true; + q->reading = true; + // Possibly start producer/consumer. + for (int n = 0; n < 2; n++) { + if (q->conn[n]) + mp_filter_wakeup(q->conn[n]); + } + } + pthread_mutex_unlock(&q->lock); +} + +int64_t mp_async_queue_get_samples(struct mp_async_queue *queue) +{ + struct async_queue *q = queue->q; + pthread_mutex_lock(&q->lock); + int64_t res = q->samples_size; + pthread_mutex_unlock(&q->lock); + return res; +} + +int mp_async_queue_get_frames(struct mp_async_queue *queue) +{ + struct async_queue *q = queue->q; + pthread_mutex_lock(&q->lock); + int res = q->num_frames; + pthread_mutex_unlock(&q->lock); + return res; +} + struct priv { struct async_queue *q; + struct mp_filter *notify; }; static void destroy(struct mp_filter *f) @@ -212,9 +266,14 @@ static void process_in(struct mp_filter *f) // Notify reader that we have new frames. if (q->conn[1]) mp_filter_wakeup(q->conn[1]); - if (!is_full(q)) + bool full = is_full(q); + if (!full) mp_pin_out_request_data_next(f->ppins[0]); + if (p->notify && full) + mp_filter_wakeup(p->notify); } + if (p->notify && !q->num_frames) + mp_filter_wakeup(p->notify); pthread_mutex_unlock(&q->lock); } @@ -275,6 +334,17 @@ static const struct mp_filter_info info_out = { .process = process_out, }; +void mp_async_queue_set_notifier(struct mp_filter *f, struct mp_filter *notify) +{ + assert(mp_filter_get_info(f) == &info_in); + struct priv *p = f->priv; + if (p->notify != notify) { + p->notify = notify; + if (notify) + mp_filter_wakeup(notify); + } +} + struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent, enum mp_pin_dir dir, struct mp_async_queue *queue) |