summaryrefslogtreecommitdiffstats
path: root/filters/f_async_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'filters/f_async_queue.c')
-rw-r--r--filters/f_async_queue.c72
1 files changed, 71 insertions, 1 deletions
diff --git a/filters/f_async_queue.c b/filters/f_async_queue.c
index 84f38e9460..5ee32be5fa 100644
--- a/filters/f_async_queue.c
+++ b/filters/f_async_queue.c
@@ -155,6 +155,24 @@ void mp_async_queue_reset(struct mp_async_queue *queue)
reset_queue(queue->q);
}
+bool mp_async_queue_is_active(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+ pthread_mutex_lock(&q->lock);
+ bool res = q->active;
+ pthread_mutex_unlock(&q->lock);
+ return res;
+}
+
+bool mp_async_queue_is_full(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+ pthread_mutex_lock(&q->lock);
+ bool res = is_full(q);
+ pthread_mutex_unlock(&q->lock);
+ return res;
+}
+
void mp_async_queue_resume(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
@@ -169,8 +187,44 @@ void mp_async_queue_resume(struct mp_async_queue *queue)
pthread_mutex_unlock(&q->lock);
}
+void mp_async_queue_resume_reading(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+
+ pthread_mutex_lock(&q->lock);
+ if (!q->active || !q->reading) {
+ q->active = true;
+ q->reading = true;
+ // Possibly start producer/consumer.
+ for (int n = 0; n < 2; n++) {
+ if (q->conn[n])
+ mp_filter_wakeup(q->conn[n]);
+ }
+ }
+ pthread_mutex_unlock(&q->lock);
+}
+
+int64_t mp_async_queue_get_samples(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+ pthread_mutex_lock(&q->lock);
+ int64_t res = q->samples_size;
+ pthread_mutex_unlock(&q->lock);
+ return res;
+}
+
+int mp_async_queue_get_frames(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+ pthread_mutex_lock(&q->lock);
+ int res = q->num_frames;
+ pthread_mutex_unlock(&q->lock);
+ return res;
+}
+
struct priv {
struct async_queue *q;
+ struct mp_filter *notify;
};
static void destroy(struct mp_filter *f)
@@ -212,9 +266,14 @@ static void process_in(struct mp_filter *f)
// Notify reader that we have new frames.
if (q->conn[1])
mp_filter_wakeup(q->conn[1]);
- if (!is_full(q))
+ bool full = is_full(q);
+ if (!full)
mp_pin_out_request_data_next(f->ppins[0]);
+ if (p->notify && full)
+ mp_filter_wakeup(p->notify);
}
+ if (p->notify && !q->num_frames)
+ mp_filter_wakeup(p->notify);
pthread_mutex_unlock(&q->lock);
}
@@ -275,6 +334,17 @@ static const struct mp_filter_info info_out = {
.process = process_out,
};
+void mp_async_queue_set_notifier(struct mp_filter *f, struct mp_filter *notify)
+{
+ assert(mp_filter_get_info(f) == &info_in);
+ struct priv *p = f->priv;
+ if (p->notify != notify) {
+ p->notify = notify;
+ if (notify)
+ mp_filter_wakeup(notify);
+ }
+}
+
struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
enum mp_pin_dir dir,
struct mp_async_queue *queue)