summaryrefslogtreecommitdiffstats
path: root/filters/f_async_queue.h
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2020-02-29 21:13:56 +0100
committerwm4 <wm4@nowhere>2020-02-29 21:52:00 +0100
commit485f335b697069a04516beaa4e3cc118d92789e5 (patch)
tree2e2392f21bb1e928ae7f8447b3ab41d42f060754 /filters/f_async_queue.h
parentf29623786b871807a663ce922d9afbf0d716a387 (diff)
downloadmpv-485f335b697069a04516beaa4e3cc118d92789e5.tar.bz2
mpv-485f335b697069a04516beaa4e3cc118d92789e5.tar.xz
filter: add async queue filter
This is supposed to enable communication between filter graphs on separate threads. Having multiple threads makes only sense if they can run concurrently with each other, which requires such an asynchronous queue as a building block. (Probably.) The basic idea is that you have two independent filters, which can be each part of separate filter graphs, but which communicate into one direction with an explicit queue. This is rather similar to unix pipes. Just like unix pipes, the queue is limited in size, so that still some data flow control enforced, and runaway memory usage is avoided. This implementation is pretty dumb. In theory, you could avoid avoid waking up the filter graphs in quite a lot of situations. For example, you don't need to wake up the consumer filter if there are already frames queued. Also, you could add "watermarks" that set a threshold at which producer or consumer should be woken up to produce/consume more frames (this would generally serve to "batch" multiple frames at once, instead of performing high-frequency wakeups). But this is hard, so the code is dumb. (I just deleted all related code when I still got situations where wakeups were lost.) This is actually salvaged and modified from a much older branch I had lying around. It will be used in the next commit.
Diffstat (limited to 'filters/f_async_queue.h')
-rw-r--r--filters/f_async_queue.h92
1 files changed, 92 insertions, 0 deletions
diff --git a/filters/f_async_queue.h b/filters/f_async_queue.h
new file mode 100644
index 0000000000..6b1ffabe36
--- /dev/null
+++ b/filters/f_async_queue.h
@@ -0,0 +1,92 @@
+#pragma once
+
+#include "filter.h"
+
+// A thread safe queue, which buffers a configurable number of frames like a
+// FIFO. It's part of the filter framework, and intended to provide such a
+// queue between filters. Since a filter graph can't be used from multiple
+// threads without synchronization, this provides 2 filters, which are
+// implicitly connected. (This seemed much saner than having special thread
+// safe mp_pins or such in the filter framework.)
+struct mp_async_queue;
+
+// Create a blank queue. Can be freed with talloc_free(). To use it, you need
+// to create input and output filters with mp_async_queue_create_filter().
+// Note that freeing it will only unref it. (E.g. you can free it once you've
+// created the input and output filters.)
+struct mp_async_queue *mp_async_queue_create(void);
+
+// Clear all queued data and make the queue "inactive". The latter prevents any
+// further communication until mp_async_queue_resume() is called.
+// For correct operation, you also need to call reset on the access filters
+void mp_async_queue_reset(struct mp_async_queue *queue);
+
+// Put the queue into "active" mode. If it wasn't, then the consumer is woken
+// up (and if there is no data in the queue, this will in turn wake up the
+// producer, i.e. start transfers automatically).
+void mp_async_queue_resume(struct mp_async_queue *queue);
+
+// Create a filter to access the queue, and connect it. It's not allowed to
+// connect an already connected end of the queue. The filter can be freed at
+// any time.
+//
+// The queue starts out in "inactive" mode, where the queue does not allow
+// the producer to write any data. You need to call mp_async_queue_resume() to
+// start communication. Actual transfers happen only once the consumer filter
+// has read requests on its mp_pin.
+// Resetting any of the consumer/producer filters calls mp_async_queue_reset().
+// If the producer filter requested a new frame from its filter graph, and the
+// queue is asynchronously set to "inactive", then the requested frame will be
+// silently discarded once it reaches the producer filter.
+//
+// For proper global reset, this order should be preferred:
+// - mp_async_queue_reset()
+// - reset producer and consumer filters on their respective threads (in any
+// order)
+// - do whatever other reset work is required
+// - mp_async_queue_resume()
+//
+// parent: filter graph the filter should be part of (or for standalone use,
+// create one with mp_filter_create_root())
+// dir: MP_PIN_IN for a filter that writes to the queue, MP_PIN_OUT to read
+// queue: queue to attach to (which end of it depends on dir)
+// The returned filter will have exactly 1 pin with the requested dir.
+struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
+ enum mp_pin_dir dir,
+ struct mp_async_queue *queue);
+
+enum mp_async_queue_sample_unit {
+ AQUEUE_UNIT_FRAME = 0, // a frame counts as 1 sample
+ AQUEUE_UNIT_SAMPLES, // number of audio samples (1 for other media types)
+};
+
+// Setting this struct to all-0 is equivalent to defaults.
+struct mp_async_queue_config {
+ // Maximum size of frames buffered. mp_frame_approx_size() is used. May be
+ // overshot by up to 1 full frame. Clamped to [1, SIZE_MAX/2].
+ int64_t max_bytes;
+
+ // Defines what a "sample" is; affects the fields below.
+ enum mp_async_queue_sample_unit sample_unit;
+
+ // Maximum number of frames allowed to be buffered at a time (if
+ // unit!=AQUEUE_UNIT_FRAME, can be overshot by the contents of 1 mp_frame).
+ // 0 is treated as 1.
+ int64_t max_samples;
+
+ // Maximum allowed timestamp difference between 2 frames. This still allows
+ // at least 2 samples. Behavior is unclear on timestamp resets (even if EOF
+ // frames are between them). A value of 0 disables this completely.
+ double max_duration;
+};
+
+// Configure the queue size. By default, the queue size is 1 frame.
+// The wakeup_threshold_* fields can be used to avoid too frequent wakeups by
+// delaying wakeups, and then making the producer to filter multiple frames at
+// once.
+// In all cases, the filters can still read/write if the producer/consumer got
+// woken up by something else.
+// If the current queue contains more frames than the new config allows, the
+// queue will remain over-allocated until these frames have been read.
+void mp_async_queue_set_config(struct mp_async_queue *queue,
+ struct mp_async_queue_config cfg);