summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--filters/f_async_queue.c299
-rw-r--r--filters/f_async_queue.h92
-rw-r--r--filters/filter.h2
-rw-r--r--wscript_build.py1
4 files changed, 393 insertions, 1 deletions
diff --git a/filters/f_async_queue.c b/filters/f_async_queue.c
new file mode 100644
index 0000000000..696649f3d1
--- /dev/null
+++ b/filters/f_async_queue.c
@@ -0,0 +1,299 @@
+#include <limits.h>
+#include <pthread.h>
+
+#include "audio/aframe.h"
+#include "common/common.h"
+#include "common/msg.h"
+#include "osdep/atomic.h"
+
+#include "f_async_queue.h"
+#include "filter_internal.h"
+
+struct mp_async_queue {
+ // This is just a wrapper, so the API user can talloc_free() it, instead of
+ // having to call a special unref function.
+ struct async_queue *q;
+};
+
+struct async_queue {
+ mp_atomic_uint64 refcount;
+
+ pthread_mutex_t lock;
+
+ // -- protected by lock
+ struct mp_async_queue_config cfg;
+ bool active; // queue was resumed; consumer may request frames
+ bool reading; // data flow: reading => consumer has requested frames
+ int64_t samples_size; // queue size in the cfg.sample_unit
+ size_t byte_size; // queue size in bytes (using approx. frame sizes)
+ int num_frames;
+ struct mp_frame *frames;
+ int eof_count; // number of MP_FRAME_EOF in frames[], for draining
+ struct mp_filter *conn[2]; // filters: in (0), out (1)
+};
+
+static void reset_queue(struct async_queue *q)
+{
+ pthread_mutex_lock(&q->lock);
+ q->active = q->reading = false;
+ for (int n = 0; n < q->num_frames; n++)
+ mp_frame_unref(&q->frames[n]);
+ q->num_frames = 0;
+ q->eof_count = 0;
+ q->samples_size = 0;
+ q->byte_size = 0;
+ for (int n = 0; n < 2; n++) {
+ if (q->conn[n])
+ mp_filter_wakeup(q->conn[n]);
+ }
+ pthread_mutex_unlock(&q->lock);
+}
+
+static void unref_queue(struct async_queue *q)
+{
+ if (!q)
+ return;
+ int count = atomic_fetch_add(&q->refcount, -1) - 1;
+ assert(count >= 0);
+ if (count == 0) {
+ reset_queue(q);
+ pthread_mutex_destroy(&q->lock);
+ talloc_free(q);
+ }
+}
+
+static void on_free_queue(void *p)
+{
+ struct mp_async_queue *q = p;
+ unref_queue(q->q);
+}
+
+struct mp_async_queue *mp_async_queue_create(void)
+{
+ struct mp_async_queue *r = talloc_zero(NULL, struct mp_async_queue);
+ r->q = talloc_zero(NULL, struct async_queue);
+ *r->q = (struct async_queue){
+ .refcount = ATOMIC_VAR_INIT(1),
+ };
+ pthread_mutex_init(&r->q->lock, NULL);
+ talloc_set_destructor(r, on_free_queue);
+ mp_async_queue_set_config(r, (struct mp_async_queue_config){0});
+ return r;
+}
+
+static int64_t frame_get_samples(struct async_queue *q, struct mp_frame frame)
+{
+ int64_t res = 1;
+ if (frame.type == MP_FRAME_AUDIO && q->cfg.sample_unit == AQUEUE_UNIT_SAMPLES) {
+ struct mp_aframe *aframe = frame.data;
+ res = mp_aframe_get_size(aframe);
+ }
+ return res;
+}
+
+static bool is_full(struct async_queue *q)
+{
+ if (q->samples_size >= q->cfg.max_samples || q->byte_size >= q->cfg.max_bytes)
+ return true;
+ if (q->num_frames >= 2 && q->cfg.max_duration > 0) {
+ double pts1 = mp_frame_get_pts(q->frames[q->num_frames - 1]);
+ double pts2 = mp_frame_get_pts(q->frames[0]);
+ if (pts1 != MP_NOPTS_VALUE && pts2 != MP_NOPTS_VALUE &&
+ pts2 - pts1 >= q->cfg.max_duration)
+ return true;
+ }
+ return false;
+}
+
+// Add or remove a frame from the accounted queue size.
+// dir==1: add, dir==-1: remove
+static void account_frame(struct async_queue *q, struct mp_frame frame,
+ int dir)
+{
+ assert(dir == 1 || dir == -1);
+
+ q->samples_size += dir * frame_get_samples(q, frame);
+ q->byte_size += dir * mp_frame_approx_size(frame);
+
+ if (frame.type == MP_FRAME_EOF)
+ q->eof_count += dir;
+}
+
+static void recompute_sizes(struct async_queue *q)
+{
+ q->eof_count = 0;
+ q->samples_size = 0;
+ q->byte_size = 0;
+ for (int n = 0; n < q->num_frames; n++)
+ account_frame(q, q->frames[n], 1);
+}
+
+void mp_async_queue_set_config(struct mp_async_queue *queue,
+ struct mp_async_queue_config cfg)
+{
+ struct async_queue *q = queue->q;
+
+ cfg.max_bytes = MPCLAMP(cfg.max_bytes, 1, (size_t)-1 / 2);
+
+ assert(cfg.sample_unit == AQUEUE_UNIT_FRAME ||
+ cfg.sample_unit == AQUEUE_UNIT_SAMPLES);
+
+ cfg.max_samples = MPMAX(cfg.max_samples, 1);
+
+ pthread_mutex_lock(&q->lock);
+ bool recompute = q->cfg.sample_unit != cfg.sample_unit;
+ q->cfg = cfg;
+ if (recompute)
+ recompute_sizes(q);
+ pthread_mutex_unlock(&q->lock);
+}
+
+void mp_async_queue_reset(struct mp_async_queue *queue)
+{
+ reset_queue(queue->q);
+}
+
+void mp_async_queue_resume(struct mp_async_queue *queue)
+{
+ struct async_queue *q = queue->q;
+
+ pthread_mutex_lock(&q->lock);
+ if (!q->active) {
+ q->active = true;
+ // Possibly make the consumer request new frames.
+ if (q->conn[1])
+ mp_filter_wakeup(q->conn[1]);
+ }
+ pthread_mutex_unlock(&q->lock);
+}
+
+struct priv {
+ struct async_queue *q;
+};
+
+static void destroy(struct mp_filter *f)
+{
+ struct priv *p = f->priv;
+ struct async_queue *q = p->q;
+
+ pthread_mutex_lock(&q->lock);
+ for (int n = 0; n < 2; n++) {
+ if (q->conn[n] == f)
+ q->conn[n] = NULL;
+ }
+ pthread_mutex_unlock(&q->lock);
+
+ unref_queue(q);
+}
+
+static void process_in(struct mp_filter *f)
+{
+ struct priv *p = f->priv;
+ struct async_queue *q = p->q;
+ assert(q->conn[0] == f);
+
+ pthread_mutex_lock(&q->lock);
+ if (!q->reading) {
+ // mp_async_queue_reset()/reset_queue() is usually called asynchronously,
+ // so we might have requested a frame earlier, and now can't use it.
+ // Discard it; the expectation is that this is a benign logical race
+ // condition, and the filter graph will be reset anyway.
+ if (mp_pin_out_has_data(f->ppins[0])) {
+ struct mp_frame frame = mp_pin_out_read(f->ppins[0]);
+ mp_frame_unref(&frame);
+ MP_DBG(f, "discarding frame due to async reset\n");
+ }
+ } else if (!is_full(q) && mp_pin_out_request_data(f->ppins[0])) {
+ struct mp_frame frame = mp_pin_out_read(f->ppins[0]);
+ account_frame(q, frame, 1);
+ MP_TARRAY_INSERT_AT(q, q->frames, q->num_frames, 0, frame);
+ // Notify reader that we have new frames.
+ if (q->conn[1])
+ mp_filter_wakeup(q->conn[1]);
+ if (!is_full(q))
+ mp_pin_out_request_data_next(f->ppins[0]);
+ }
+ pthread_mutex_unlock(&q->lock);
+}
+
+static void process_out(struct mp_filter *f)
+{
+ struct priv *p = f->priv;
+ struct async_queue *q = p->q;
+ assert(q->conn[1] == f);
+
+ if (!mp_pin_in_needs_data(f->ppins[0]))
+ return;
+
+ pthread_mutex_lock(&q->lock);
+ if (q->active && !q->reading) {
+ q->reading = true;
+ mp_filter_wakeup(q->conn[0]);
+ }
+ if (q->active && q->num_frames) {
+ struct mp_frame frame = q->frames[q->num_frames - 1];
+ q->num_frames -= 1;
+ account_frame(q, frame, -1);
+ assert(q->samples_size >= 0);
+ mp_pin_in_write(f->ppins[0], frame);
+ // Notify writer that we need new frames.
+ if (q->conn[0])
+ mp_filter_wakeup(q->conn[0]);
+ }
+ pthread_mutex_unlock(&q->lock);
+}
+
+static void reset(struct mp_filter *f)
+{
+ struct priv *p = f->priv;
+ struct async_queue *q = p->q;
+
+ reset_queue(q);
+}
+
+// producer
+static const struct mp_filter_info info_in = {
+ .name = "async_queue_in",
+ .priv_size = sizeof(struct priv),
+ .destroy = destroy,
+ .process = process_in,
+ .reset = reset,
+};
+
+// consumer
+static const struct mp_filter_info info_out = {
+ .name = "async_queue_out",
+ .priv_size = sizeof(struct priv),
+ .destroy = destroy,
+ .process = process_out,
+ .reset = reset,
+};
+
+struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
+ enum mp_pin_dir dir,
+ struct mp_async_queue *queue)
+{
+ bool is_in = dir == MP_PIN_IN;
+ assert(queue);
+
+ struct mp_filter *f = mp_filter_create(parent, is_in ? &info_in : &info_out);
+ if (!f)
+ return NULL;
+
+ struct priv *p = f->priv;
+
+ struct async_queue *q = queue->q;
+
+ mp_filter_add_pin(f, dir, is_in ? "in" : "out");
+
+ atomic_fetch_add(&q->refcount, 1);
+ p->q = q;
+
+ pthread_mutex_lock(&q->lock);
+ int slot = is_in ? 0 : 1;
+ assert(!q->conn[slot]); // fails if already connected on this end
+ q->conn[slot] = f;
+ pthread_mutex_unlock(&q->lock);
+
+ return f;
+}
diff --git a/filters/f_async_queue.h b/filters/f_async_queue.h
new file mode 100644
index 0000000000..6b1ffabe36
--- /dev/null
+++ b/filters/f_async_queue.h
@@ -0,0 +1,92 @@
+#pragma once
+
+#include "filter.h"
+
+// A thread safe queue, which buffers a configurable number of frames like a
+// FIFO. It's part of the filter framework, and intended to provide such a
+// queue between filters. Since a filter graph can't be used from multiple
+// threads without synchronization, this provides 2 filters, which are
+// implicitly connected. (This seemed much saner than having special thread
+// safe mp_pins or such in the filter framework.)
+struct mp_async_queue;
+
+// Create a blank queue. Can be freed with talloc_free(). To use it, you need
+// to create input and output filters with mp_async_queue_create_filter().
+// Note that freeing it will only unref it. (E.g. you can free it once you've
+// created the input and output filters.)
+struct mp_async_queue *mp_async_queue_create(void);
+
+// Clear all queued data and make the queue "inactive". The latter prevents any
+// further communication until mp_async_queue_resume() is called.
+// For correct operation, you also need to call reset on the access filters
+void mp_async_queue_reset(struct mp_async_queue *queue);
+
+// Put the queue into "active" mode. If it wasn't, then the consumer is woken
+// up (and if there is no data in the queue, this will in turn wake up the
+// producer, i.e. start transfers automatically).
+void mp_async_queue_resume(struct mp_async_queue *queue);
+
+// Create a filter to access the queue, and connect it. It's not allowed to
+// connect an already connected end of the queue. The filter can be freed at
+// any time.
+//
+// The queue starts out in "inactive" mode, where the queue does not allow
+// the producer to write any data. You need to call mp_async_queue_resume() to
+// start communication. Actual transfers happen only once the consumer filter
+// has read requests on its mp_pin.
+// Resetting any of the consumer/producer filters calls mp_async_queue_reset().
+// If the producer filter requested a new frame from its filter graph, and the
+// queue is asynchronously set to "inactive", then the requested frame will be
+// silently discarded once it reaches the producer filter.
+//
+// For proper global reset, this order should be preferred:
+// - mp_async_queue_reset()
+// - reset producer and consumer filters on their respective threads (in any
+// order)
+// - do whatever other reset work is required
+// - mp_async_queue_resume()
+//
+// parent: filter graph the filter should be part of (or for standalone use,
+// create one with mp_filter_create_root())
+// dir: MP_PIN_IN for a filter that writes to the queue, MP_PIN_OUT to read
+// queue: queue to attach to (which end of it depends on dir)
+// The returned filter will have exactly 1 pin with the requested dir.
+struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
+ enum mp_pin_dir dir,
+ struct mp_async_queue *queue);
+
+enum mp_async_queue_sample_unit {
+ AQUEUE_UNIT_FRAME = 0, // a frame counts as 1 sample
+ AQUEUE_UNIT_SAMPLES, // number of audio samples (1 for other media types)
+};
+
+// Setting this struct to all-0 is equivalent to defaults.
+struct mp_async_queue_config {
+ // Maximum size of frames buffered. mp_frame_approx_size() is used. May be
+ // overshot by up to 1 full frame. Clamped to [1, SIZE_MAX/2].
+ int64_t max_bytes;
+
+ // Defines what a "sample" is; affects the fields below.
+ enum mp_async_queue_sample_unit sample_unit;
+
+ // Maximum number of frames allowed to be buffered at a time (if
+ // unit!=AQUEUE_UNIT_FRAME, can be overshot by the contents of 1 mp_frame).
+ // 0 is treated as 1.
+ int64_t max_samples;
+
+ // Maximum allowed timestamp difference between 2 frames. This still allows
+ // at least 2 samples. Behavior is unclear on timestamp resets (even if EOF
+ // frames are between them). A value of 0 disables this completely.
+ double max_duration;
+};
+
+// Configure the queue size. By default, the queue size is 1 frame.
+// The wakeup_threshold_* fields can be used to avoid too frequent wakeups by
+// delaying wakeups, and then making the producer to filter multiple frames at
+// once.
+// In all cases, the filters can still read/write if the producer/consumer got
+// woken up by something else.
+// If the current queue contains more frames than the new config allows, the
+// queue will remain over-allocated until these frames have been read.
+void mp_async_queue_set_config(struct mp_async_queue *queue,
+ struct mp_async_queue_config cfg);
diff --git a/filters/filter.h b/filters/filter.h
index 34146af98d..dff1f4e016 100644
--- a/filters/filter.h
+++ b/filters/filter.h
@@ -231,7 +231,7 @@ const char *mp_pin_get_name(struct mp_pin *p);
* graph, and disallowing different root filters ensures these graphs are not
* accidentally connected using non-thread safe mechanisms. Actual threaded
* filter graphs would use several independent graphs connected by asynchronous
- * helpers (such as queues instead of mp_pin connections).
+ * helpers (such as mp_async_queue instead of mp_pin connections).
*
* --- Rules for manual connections:
*
diff --git a/wscript_build.py b/wscript_build.py
index aa41aa06ea..f80fb08a83 100644
--- a/wscript_build.py
+++ b/wscript_build.py
@@ -300,6 +300,7 @@ def build(ctx):
( "demux/packet.c" ),
( "demux/timeline.c" ),
+ ( "filters/f_async_queue.c" ),
( "filters/f_autoconvert.c" ),
( "filters/f_auto_filters.c" ),
( "filters/f_decoder_wrapper.c" ),