summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2020-08-28 20:08:32 +0200
committerwm4 <wm4@nowhere>2020-08-28 20:08:32 +0200
commit3d0eb4c26c134f0c153f28272423a347b7385cda (patch)
tree833302977b721daa8a612a651e1c7b6e72f6f00a
parent71d118733a45e359c0ec645455fac2005efb2692 (diff)
downloadmpv-3d0eb4c26c134f0c153f28272423a347b7385cda.tar.bz2
mpv-3d0eb4c26c134f0c153f28272423a347b7385cda.tar.xz
f_async_queue: add various helper functions
Shouldn't change the behavior if not used. Will probably be used in a later commit.
-rw-r--r--filters/f_async_queue.c72
-rw-r--r--filters/f_async_queue.h35
2 files changed, 105 insertions, 2 deletions
diff --git a/filters/f_async_queue.c b/filters/f_async_queue.c
index 84f38e9460..5ee32be5fa 100644
--- a/filters/f_async_queue.c
+++ b/filters/f_async_queue.c
@@ -155,6 +155,24 @@ void mp_async_queue_reset(struct mp_async_queue *queue)
reset_queue(queue->q);
}
+bool mp_async_queue_is_active(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+ pthread_mutex_lock(&q->lock);
+ bool res = q->active;
+ pthread_mutex_unlock(&q->lock);
+ return res;
+}
+
+bool mp_async_queue_is_full(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+ pthread_mutex_lock(&q->lock);
+ bool res = is_full(q);
+ pthread_mutex_unlock(&q->lock);
+ return res;
+}
+
void mp_async_queue_resume(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
@@ -169,8 +187,44 @@ void mp_async_queue_resume(struct mp_async_queue *queue)
pthread_mutex_unlock(&q->lock);
}
+void mp_async_queue_resume_reading(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+
+ pthread_mutex_lock(&q->lock);
+ if (!q->active || !q->reading) {
+ q->active = true;
+ q->reading = true;
+ // Possibly start producer/consumer.
+ for (int n = 0; n < 2; n++) {
+ if (q->conn[n])
+ mp_filter_wakeup(q->conn[n]);
+ }
+ }
+ pthread_mutex_unlock(&q->lock);
+}
+
+int64_t mp_async_queue_get_samples(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+ pthread_mutex_lock(&q->lock);
+ int64_t res = q->samples_size;
+ pthread_mutex_unlock(&q->lock);
+ return res;
+}
+
+int mp_async_queue_get_frames(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+ pthread_mutex_lock(&q->lock);
+ int res = q->num_frames;
+ pthread_mutex_unlock(&q->lock);
+ return res;
+}
+
struct priv {
struct async_queue *q;
+ struct mp_filter *notify;
};
static void destroy(struct mp_filter *f)
@@ -212,9 +266,14 @@ static void process_in(struct mp_filter *f)
// Notify reader that we have new frames.
if (q->conn[1])
mp_filter_wakeup(q->conn[1]);
- if (!is_full(q))
+ bool full = is_full(q);
+ if (!full)
mp_pin_out_request_data_next(f->ppins[0]);
+ if (p->notify && full)
+ mp_filter_wakeup(p->notify);
}
+ if (p->notify && !q->num_frames)
+ mp_filter_wakeup(p->notify);
pthread_mutex_unlock(&q->lock);
}
@@ -275,6 +334,17 @@ static const struct mp_filter_info info_out = {
.process = process_out,
};
+void mp_async_queue_set_notifier(struct mp_filter *f, struct mp_filter *notify)
+{
+ assert(mp_filter_get_info(f) == &info_in);
+ struct priv *p = f->priv;
+ if (p->notify != notify) {
+ p->notify = notify;
+ if (notify)
+ mp_filter_wakeup(notify);
+ }
+}
+
struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
enum mp_pin_dir dir,
struct mp_async_queue *queue)
diff --git a/filters/f_async_queue.h b/filters/f_async_queue.h
index 50678faec1..e05b43a43b 100644
--- a/filters/f_async_queue.h
+++ b/filters/f_async_queue.h
@@ -28,6 +28,28 @@ void mp_async_queue_reset(struct mp_async_queue *queue);
// fill up.
void mp_async_queue_resume(struct mp_async_queue *queue);
+// Like mp_async_queue_resume(), but also allows the producer writing to the
+// queue, even if the consumer will request any data yet.
+void mp_async_queue_resume_reading(struct mp_async_queue *queue);
+
+// Returns true if out of mp_async_queue_reset()/mp_async_queue_resume(), the
+// latter was most recently called.
+bool mp_async_queue_is_active(struct mp_async_queue *queue);
+
+// Returns true if the queue reached its configured size, the input filter
+// accepts no further frames. Always returns false if not active (then it does
+// not accept any input at all).
+bool mp_async_queue_is_full(struct mp_async_queue *queue);
+
+// Get the total of samples buffered within the queue itself. This doesn't count
+// samples buffered in the access filters. mp_async_queue_config.sample_unit is
+// used to define what "1 sample" means.
+int64_t mp_async_queue_get_samples(struct mp_async_queue *queue);
+
+// Get the total number of frames buffered within the queue itself. Frames
+// buffered in the access filters are not included.
+int mp_async_queue_get_frames(struct mp_async_queue *queue);
+
// Create a filter to access the queue, and connect it. It's not allowed to
// connect an already connected end of the queue. The filter can be freed at
// any time.
@@ -44,7 +66,7 @@ void mp_async_queue_resume(struct mp_async_queue *queue);
// queue state is the API user's responsibility. Note that resetting an input
// filter (dir==MP_PIN_IN) while the queue is active and in "reading" state
// (the output filter requested data at any point before the last
-// mp_async_queue_reset() was called), the
+// mp_async_queue_reset(), or mp_async_queue_resume_reading() was called), the
// filter will immediately request data after the reset.
//
// For proper global reset, this order should be preferred:
@@ -63,6 +85,17 @@ struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
enum mp_pin_dir dir,
struct mp_async_queue *queue);
+// Set a filter that should be woken up with mp_filter_wakeup() in the following
+// situations:
+// - mp_async_queue_is_full() changes to true (at least for a short moment)
+// - mp_async_queue_get_frames() changes to 0 (at least until new data is fed)
+// This is a workaround for the filter design, which does not allow you to write
+// to the queue in a "sequential" way (write, then check condition).
+// Calling this again on the same filter removes the previous notify filter.
+// f: must be a filter returned by mp_async_queue_create_filter(, MP_PIN_IN,)
+// notify: filter to be woken up
+void mp_async_queue_set_notifier(struct mp_filter *f, struct mp_filter *notify);
+
enum mp_async_queue_sample_unit {
AQUEUE_UNIT_FRAME = 0, // a frame counts as 1 sample
AQUEUE_UNIT_SAMPLES, // number of audio samples (1 for other media types,