From 3d0eb4c26c134f0c153f28272423a347b7385cda Mon Sep 17 00:00:00 2001 From: wm4 Date: Fri, 28 Aug 2020 20:08:32 +0200 Subject: f_async_queue: add various helper functions Shouldn't change the behavior if not used. Will probably be used in a later commit. --- filters/f_async_queue.c | 72 ++++++++++++++++++++++++++++++++++++++++++++++++- filters/f_async_queue.h | 35 +++++++++++++++++++++++- 2 files changed, 105 insertions(+), 2 deletions(-) diff --git a/filters/f_async_queue.c b/filters/f_async_queue.c index 84f38e9460..5ee32be5fa 100644 --- a/filters/f_async_queue.c +++ b/filters/f_async_queue.c @@ -155,6 +155,24 @@ void mp_async_queue_reset(struct mp_async_queue *queue) reset_queue(queue->q); } +bool mp_async_queue_is_active(struct mp_async_queue *queue) +{ + struct async_queue *q = queue->q; + pthread_mutex_lock(&q->lock); + bool res = q->active; + pthread_mutex_unlock(&q->lock); + return res; +} + +bool mp_async_queue_is_full(struct mp_async_queue *queue) +{ + struct async_queue *q = queue->q; + pthread_mutex_lock(&q->lock); + bool res = is_full(q); + pthread_mutex_unlock(&q->lock); + return res; +} + void mp_async_queue_resume(struct mp_async_queue *queue) { struct async_queue *q = queue->q; @@ -169,8 +187,44 @@ void mp_async_queue_resume(struct mp_async_queue *queue) pthread_mutex_unlock(&q->lock); } +void mp_async_queue_resume_reading(struct mp_async_queue *queue) +{ + struct async_queue *q = queue->q; + + pthread_mutex_lock(&q->lock); + if (!q->active || !q->reading) { + q->active = true; + q->reading = true; + // Possibly start producer/consumer. + for (int n = 0; n < 2; n++) { + if (q->conn[n]) + mp_filter_wakeup(q->conn[n]); + } + } + pthread_mutex_unlock(&q->lock); +} + +int64_t mp_async_queue_get_samples(struct mp_async_queue *queue) +{ + struct async_queue *q = queue->q; + pthread_mutex_lock(&q->lock); + int64_t res = q->samples_size; + pthread_mutex_unlock(&q->lock); + return res; +} + +int mp_async_queue_get_frames(struct mp_async_queue *queue) +{ + struct async_queue *q = queue->q; + pthread_mutex_lock(&q->lock); + int res = q->num_frames; + pthread_mutex_unlock(&q->lock); + return res; +} + struct priv { struct async_queue *q; + struct mp_filter *notify; }; static void destroy(struct mp_filter *f) @@ -212,9 +266,14 @@ static void process_in(struct mp_filter *f) // Notify reader that we have new frames. if (q->conn[1]) mp_filter_wakeup(q->conn[1]); - if (!is_full(q)) + bool full = is_full(q); + if (!full) mp_pin_out_request_data_next(f->ppins[0]); + if (p->notify && full) + mp_filter_wakeup(p->notify); } + if (p->notify && !q->num_frames) + mp_filter_wakeup(p->notify); pthread_mutex_unlock(&q->lock); } @@ -275,6 +334,17 @@ static const struct mp_filter_info info_out = { .process = process_out, }; +void mp_async_queue_set_notifier(struct mp_filter *f, struct mp_filter *notify) +{ + assert(mp_filter_get_info(f) == &info_in); + struct priv *p = f->priv; + if (p->notify != notify) { + p->notify = notify; + if (notify) + mp_filter_wakeup(notify); + } +} + struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent, enum mp_pin_dir dir, struct mp_async_queue *queue) diff --git a/filters/f_async_queue.h b/filters/f_async_queue.h index 50678faec1..e05b43a43b 100644 --- a/filters/f_async_queue.h +++ b/filters/f_async_queue.h @@ -28,6 +28,28 @@ void mp_async_queue_reset(struct mp_async_queue *queue); // fill up. void mp_async_queue_resume(struct mp_async_queue *queue); +// Like mp_async_queue_resume(), but also allows the producer writing to the +// queue, even if the consumer will request any data yet. +void mp_async_queue_resume_reading(struct mp_async_queue *queue); + +// Returns true if out of mp_async_queue_reset()/mp_async_queue_resume(), the +// latter was most recently called. +bool mp_async_queue_is_active(struct mp_async_queue *queue); + +// Returns true if the queue reached its configured size, the input filter +// accepts no further frames. Always returns false if not active (then it does +// not accept any input at all). +bool mp_async_queue_is_full(struct mp_async_queue *queue); + +// Get the total of samples buffered within the queue itself. This doesn't count +// samples buffered in the access filters. mp_async_queue_config.sample_unit is +// used to define what "1 sample" means. +int64_t mp_async_queue_get_samples(struct mp_async_queue *queue); + +// Get the total number of frames buffered within the queue itself. Frames +// buffered in the access filters are not included. +int mp_async_queue_get_frames(struct mp_async_queue *queue); + // Create a filter to access the queue, and connect it. It's not allowed to // connect an already connected end of the queue. The filter can be freed at // any time. @@ -44,7 +66,7 @@ void mp_async_queue_resume(struct mp_async_queue *queue); // queue state is the API user's responsibility. Note that resetting an input // filter (dir==MP_PIN_IN) while the queue is active and in "reading" state // (the output filter requested data at any point before the last -// mp_async_queue_reset() was called), the +// mp_async_queue_reset(), or mp_async_queue_resume_reading() was called), the // filter will immediately request data after the reset. // // For proper global reset, this order should be preferred: @@ -63,6 +85,17 @@ struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent, enum mp_pin_dir dir, struct mp_async_queue *queue); +// Set a filter that should be woken up with mp_filter_wakeup() in the following +// situations: +// - mp_async_queue_is_full() changes to true (at least for a short moment) +// - mp_async_queue_get_frames() changes to 0 (at least until new data is fed) +// This is a workaround for the filter design, which does not allow you to write +// to the queue in a "sequential" way (write, then check condition). +// Calling this again on the same filter removes the previous notify filter. +// f: must be a filter returned by mp_async_queue_create_filter(, MP_PIN_IN,) +// notify: filter to be woken up +void mp_async_queue_set_notifier(struct mp_filter *f, struct mp_filter *notify); + enum mp_async_queue_sample_unit { AQUEUE_UNIT_FRAME = 0, // a frame counts as 1 sample AQUEUE_UNIT_SAMPLES, // number of audio samples (1 for other media types, -- cgit v1.2.3