summaryrefslogtreecommitdiffstats
path: root/filters
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2020-02-29 21:40:52 +0100
committerwm4 <wm4@nowhere>2020-02-29 21:52:00 +0100
commita3823ce0e0353fa4ae4b75b0ff2cc17e61969005 (patch)
tree7d2ebbf91040c39fdfe6a7d7c56e0414bc511bee /filters
parent485f335b697069a04516beaa4e3cc118d92789e5 (diff)
downloadmpv-a3823ce0e0353fa4ae4b75b0ff2cc17e61969005.tar.bz2
mpv-a3823ce0e0353fa4ae4b75b0ff2cc17e61969005.tar.xz
player: add optional separate video decoding thread
See manpage additions. This has been a topic in MPlayer/mplayer2/mpv since forever. But since libavcodec multi-threaded decoding was added, I've always considered this pointless. libavcodec requires you to "preload" it with packets, and then you can pretty much avoid blocking on it, if decoding is fast enough. But in some cases, a decoupled decoder thread _might_ help. Users have for example come up with cases where decoding video in a separate process and piping it as raw video to mpv helped. (Or my memory is false, and it was about vapoursynth filtering, who knows.) So let's just see whether this helps with anything. Note that this would have been _much_ easier if libavcodec had an asynchronous (or rather, non-blocking) API. It could probably have easily gained that with a small change to its multi-threading code and a small extension to its API, but I guess not. Unfortunately, this uglifies f_decoder_wrapper quite a lot. Part of this is due to annoying corner cases like legacy frame dropping and hardware decoder state. These could probably be prettified later on. There is also a change in playloop.c: this is because there is a need to coordinate playback resets between demuxer thread, decoder thread, and playback logic. I think this SEEK_BLOCK idea worked out reasonably well. There are still a number of problems. For example, if the demuxer cache is full, the decoder thread will simply block hard until the output queue is full, which interferes with seeking. Could also be improved later. Hardware decoding will probably die in a fire, because it will run out of surfaces quickly. We could reduce the queue to size 1... maybe later. We could update the queue options at runtime easily, but currently I'm not going to bother. I could only have put the lavc wrapper itself on a separate thread. But there is some annoying interaction with EDL and backward playback shit, and also you would have had to loop demuxer packets through the playloop, so this sounded less annoying. The food my mother made for us today was delicious. Because audio uses the same code, also for audio (even if completely pointless). Fixes: #6926
Diffstat (limited to 'filters')
-rw-r--r--filters/f_decoder_wrapper.c379
1 files changed, 325 insertions, 54 deletions
diff --git a/filters/f_decoder_wrapper.c b/filters/f_decoder_wrapper.c
index 23e796a6bc..73d3076294 100644
--- a/filters/f_decoder_wrapper.c
+++ b/filters/f_decoder_wrapper.c
@@ -20,6 +20,7 @@
#include <stdbool.h>
#include <math.h>
#include <assert.h>
+#include <pthread.h>
#include <libavutil/buffer.h>
#include <libavutil/common.h>
@@ -30,6 +31,7 @@
#include "common/msg.h"
#include "options/m_config.h"
#include "osdep/timer.h"
+#include "osdep/threads.h"
#include "demux/demux.h"
#include "demux/packet.h"
@@ -37,6 +39,7 @@
#include "common/codecs.h"
#include "common/global.h"
#include "common/recorder.h"
+#include "misc/dispatch.h"
#include "audio/aframe.h"
#include "video/out/vo.h"
@@ -44,21 +47,65 @@
#include "demux/stheader.h"
+#include "f_async_queue.h"
#include "f_decoder_wrapper.h"
#include "f_demux_in.h"
#include "filter_internal.h"
+struct dec_queue_opts {
+ int use_queue;
+ int64_t max_bytes;
+ int64_t max_samples;
+ double max_duration;
+};
+
+#define OPT_BASE_STRUCT struct dec_queue_opts
+
+static const struct m_option dec_queue_opts_list[] = {
+ OPT_FLAG("enable", use_queue, 0),
+ OPT_DOUBLE("max-secs", max_duration, M_OPT_MIN, .min = 0),
+ OPT_BYTE_SIZE("max-bytes", max_bytes, 0, 0, (size_t)-1),
+ OPT_INT64("max-samples", max_samples, M_OPT_MIN, 0, 0),
+ {0}
+};
+
+const struct m_sub_options vdec_queue_conf = {
+ .opts = dec_queue_opts_list,
+ .size = sizeof(struct dec_queue_opts),
+ .defaults = &(const struct dec_queue_opts){
+ .use_queue = 0,
+ .max_bytes = 512 * 1024 * 1024,
+ .max_samples = 50,
+ .max_duration = 2,
+ },
+};
+
+const struct m_sub_options adec_queue_conf = {
+ .opts = dec_queue_opts_list,
+ .size = sizeof(struct dec_queue_opts),
+ .defaults = &(const struct dec_queue_opts){
+ .use_queue = 0,
+ .max_bytes = 1 * 1024 * 1024,
+ .max_samples = 48000,
+ .max_duration = 1,
+ },
+};
+
struct priv {
- struct mp_filter *f;
struct mp_log *log;
+ struct sh_stream *header;
+
+ // --- The following fields are to be accessed by dec_dispatch (or if that
+ // field is NULL, by the mp_decoder_wrapper user thread).
+ // Use thread_lock() for access outside of the decoder thread.
+
+ bool request_terminate_dec_thread;
+ struct mp_filter *dec_root_filter; // thread root filter; no thread => NULL
+ struct mp_filter *decf; // wrapper filter which drives the decoder
struct m_config_cache *opt_cache;
- struct sh_stream *header;
struct mp_codec_params *codec;
-
struct mp_decoder *decoder;
- char *decoder_desc;
- bool try_spdif;
// Demuxer output.
struct mp_pin *demux;
@@ -104,14 +151,65 @@ struct priv {
struct mp_frame decoded_coverart;
int coverart_returned; // 0: no, 1: coverart frame itself, 2: EOF returned
- int attempt_framedrops; // try dropping this many frames
- int dropped_frames; // total frames _probably_ dropped
- bool pts_reset;
int play_dir;
+ // --- The following fields can be accessed only from the mp_decoder_wrapper
+ // user thread.
struct mp_decoder_wrapper public;
+
+ // --- Specific access depending on threading stuff.
+ struct mp_async_queue *queue; // decoded frame output queue
+ struct mp_dispatch_queue *dec_dispatch; // non-NULL if decoding thread used
+ bool dec_thread_lock; // debugging (esp. for no-thread case)
+ pthread_t dec_thread;
+ bool dec_thread_valid;
+ pthread_mutex_t cache_lock;
+
+ // --- Protected by cache_lock.
+ char *cur_hwdec;
+ char *decoder_desc;
+ bool try_spdif;
+ bool pts_reset;
+ int attempt_framedrops; // try dropping this many frames
+ int dropped_frames; // total frames _probably_ dropped
};
+// Update cached values for main thread which require access to the decoder
+// thread state. Must run on/locked with decoder thread.
+static void update_cached_values(struct priv *p)
+{
+ pthread_mutex_lock(&p->cache_lock);
+
+ p->cur_hwdec = NULL;
+ if (p->decoder && p->decoder->control)
+ p->decoder->control(p->decoder->f, VDCTRL_GET_HWDEC, &p->cur_hwdec);
+
+ pthread_mutex_unlock(&p->cache_lock);
+}
+
+// Lock the decoder thread. This may synchronously wait until the decoder thread
+// is done with its current work item (such as waiting for a frame), and thus
+// may block for a while. (I.e. avoid during normal playback.)
+// If no decoder thread is running, this is a no-op, except for some debug stuff.
+static void thread_lock(struct priv *p)
+{
+ if (p->dec_dispatch)
+ mp_dispatch_lock(p->dec_dispatch);
+
+ assert(!p->dec_thread_lock);
+ p->dec_thread_lock = true;
+}
+
+// Undo thread_lock().
+static void thread_unlock(struct priv *p)
+{
+ assert(p->dec_thread_lock);
+ p->dec_thread_lock = false;
+
+ if (p->dec_dispatch)
+ mp_dispatch_unlock(p->dec_dispatch);
+}
+
// This resets only the decoder. Unlike a full reset(), this doesn't imply a
// seek reset. This distinction exists only when using timeline stuff (EDL and
// ordered chapters). timeline stuff needs to reset the decoder state, but keep
@@ -137,15 +235,19 @@ static void reset_decoder(struct priv *p)
mp_filter_reset(p->decoder->f);
}
-static void reset(struct mp_filter *f)
+static void decf_reset(struct mp_filter *f)
{
struct priv *p = f->priv;
+ assert(p->decf == f);
p->pts = MP_NOPTS_VALUE;
p->last_format = p->fixed_format = (struct mp_image_params){0};
- p->dropped_frames = 0;
- p->attempt_framedrops = 0;
+
+ pthread_mutex_lock(&p->cache_lock);
p->pts_reset = false;
+ p->attempt_framedrops = 0;
+ p->dropped_frames = 0;
+ pthread_mutex_unlock(&p->cache_lock);
p->coverart_returned = 0;
@@ -162,20 +264,33 @@ int mp_decoder_wrapper_control(struct mp_decoder_wrapper *d,
enum dec_ctrl cmd, void *arg)
{
struct priv *p = d->f->priv;
- if (p->decoder && p->decoder->control)
- return p->decoder->control(p->decoder->f, cmd, arg);
- return CONTROL_UNKNOWN;
+ int res = CONTROL_UNKNOWN;
+ if (cmd == VDCTRL_GET_HWDEC) {
+ pthread_mutex_lock(&p->cache_lock);
+ *(char **)arg = p->cur_hwdec;
+ pthread_mutex_unlock(&p->cache_lock);
+ } else {
+ thread_lock(p);
+ if (p->decoder && p->decoder->control)
+ res = p->decoder->control(p->decoder->f, cmd, arg);
+ update_cached_values(p);
+ thread_unlock(p);
+ }
+ return res;
}
-static void destroy(struct mp_filter *f)
+static void decf_destroy(struct mp_filter *f)
{
struct priv *p = f->priv;
+ assert(p->decf == f);
+
if (p->decoder) {
MP_DBG(f, "Uninit decoder.\n");
talloc_free(p->decoder->f);
p->decoder = NULL;
}
- reset(f);
+
+ decf_reset(f);
mp_frame_unref(&p->decoded_coverart);
}
@@ -193,9 +308,8 @@ struct mp_decoder_list *audio_decoder_list(void)
return list;
}
-bool mp_decoder_wrapper_reinit(struct mp_decoder_wrapper *d)
+static bool reinit_decoder(struct priv *p)
{
- struct priv *p = d->f->priv;
struct MPOpts *opts = p->opt_cache->opts;
if (p->decoder)
@@ -222,7 +336,11 @@ bool mp_decoder_wrapper_reinit(struct mp_decoder_wrapper *d)
user_list = opts->audio_decoders;
fallback = "aac";
- if (p->try_spdif && p->codec->codec) {
+ pthread_mutex_lock(&p->cache_lock);
+ bool try_spdif = p->try_spdif;
+ pthread_mutex_unlock(&p->cache_lock);
+
+ if (try_spdif && p->codec->codec) {
struct mp_decoder_list *spdif =
select_spdif_codec(p->codec->codec, opts->audio_spdif);
if (spdif->num_entries) {
@@ -251,11 +369,13 @@ bool mp_decoder_wrapper_reinit(struct mp_decoder_wrapper *d)
struct mp_decoder_entry *sel = &list->entries[n];
MP_VERBOSE(p, "Opening decoder %s\n", sel->decoder);
- p->decoder = driver->create(p->f, p->codec, sel->decoder);
+ p->decoder = driver->create(p->decf, p->codec, sel->decoder);
if (p->decoder) {
+ pthread_mutex_lock(&p->cache_lock);
p->decoder_desc =
talloc_asprintf(p, "%s (%s)", sel->decoder, sel->desc);
MP_VERBOSE(p, "Selected codec: %s\n", p->decoder_desc);
+ pthread_mutex_unlock(&p->cache_lock);
break;
}
@@ -267,51 +387,79 @@ bool mp_decoder_wrapper_reinit(struct mp_decoder_wrapper *d)
p->codec->codec ? p->codec->codec : "<?>");
}
+ update_cached_values(p);
+
talloc_free(list);
return !!p->decoder;
}
+bool mp_decoder_wrapper_reinit(struct mp_decoder_wrapper *d)
+{
+ struct priv *p = d->f->priv;
+ thread_lock(p);
+ bool res = reinit_decoder(p);
+ thread_unlock(p);
+ return res;
+}
+
void mp_decoder_wrapper_get_desc(struct mp_decoder_wrapper *d,
char *buf, size_t buf_size)
{
struct priv *p = d->f->priv;
+ pthread_mutex_lock(&p->cache_lock);
snprintf(buf, buf_size, "%s", p->decoder_desc ? p->decoder_desc : "");
+ pthread_mutex_unlock(&p->cache_lock);
}
void mp_decoder_wrapper_set_frame_drops(struct mp_decoder_wrapper *d, int num)
{
struct priv *p = d->f->priv;
+ pthread_mutex_lock(&p->cache_lock);
p->attempt_framedrops = num;
+ pthread_mutex_unlock(&p->cache_lock);
}
int mp_decoder_wrapper_get_frames_dropped(struct mp_decoder_wrapper *d)
{
struct priv *p = d->f->priv;
- return p->dropped_frames;
+ pthread_mutex_lock(&p->cache_lock);
+ int res = p->dropped_frames;
+ pthread_mutex_unlock(&p->cache_lock);
+ return res;
}
double mp_decoder_wrapper_get_container_fps(struct mp_decoder_wrapper *d)
{
struct priv *p = d->f->priv;
- return p->fps;
+ thread_lock(p);
+ double res = p->fps;
+ thread_unlock(p);
+ return res;
}
void mp_decoder_wrapper_set_spdif_flag(struct mp_decoder_wrapper *d, bool spdif)
{
struct priv *p = d->f->priv;
+ pthread_mutex_lock(&p->cache_lock);
p->try_spdif = spdif;
+ pthread_mutex_unlock(&p->cache_lock);
}
bool mp_decoder_wrapper_get_pts_reset(struct mp_decoder_wrapper *d)
{
struct priv *p = d->f->priv;
- return p->pts_reset;
+ pthread_mutex_lock(&p->cache_lock);
+ bool res = p->pts_reset;
+ pthread_mutex_unlock(&p->cache_lock);
+ return res;
}
void mp_decoder_wrapper_set_play_dir(struct mp_decoder_wrapper *d, int dir)
{
struct priv *p = d->f->priv;
+ thread_lock(p);
p->play_dir = dir;
+ thread_unlock(p);
}
static bool is_valid_peak(float sig_peak)
@@ -545,8 +693,11 @@ static void correct_audio_pts(struct priv *p, struct mp_aframe *aframe)
// than enough.
if (p->pts != MP_NOPTS_VALUE && diff > 0.1) {
MP_WARN(p, "Invalid audio PTS: %f -> %f\n", p->pts, frame_pts);
- if (diff >= 5)
+ if (diff >= 5) {
+ pthread_mutex_lock(&p->cache_lock);
p->pts_reset = true;
+ pthread_mutex_unlock(&p->cache_lock);
+ }
}
// Keep the interpolated timestamp if it doesn't deviate more
@@ -617,7 +768,7 @@ static void feed_packet(struct priv *p)
if (p->packet.type != MP_FRAME_EOF && p->packet.type != MP_FRAME_PACKET) {
MP_ERR(p, "invalid frame type from demuxer\n");
mp_frame_unref(&p->packet);
- mp_filter_internal_mark_failed(p->f);
+ mp_filter_internal_mark_failed(p->decf);
return;
}
}
@@ -645,8 +796,10 @@ static void feed_packet(struct priv *p)
int framedrop_type = 0;
+ pthread_mutex_lock(&p->cache_lock);
if (p->attempt_framedrops)
framedrop_type = 1;
+ pthread_mutex_unlock(&p->cache_lock);
if (start_pts != MP_NOPTS_VALUE && packet && p->play_dir > 0 &&
packet->pts < start_pts - .005 && !p->has_broken_packet_pts)
@@ -655,7 +808,7 @@ static void feed_packet(struct priv *p)
p->decoder->control(p->decoder->f, VDCTRL_SET_FRAMEDROP, &framedrop_type);
}
- if (p->public.recorder_sink)
+ if (!p->dec_dispatch && p->public.recorder_sink)
mp_recorder_feed_packet(p->public.recorder_sink, packet);
double pkt_pts = packet ? packet->pts : MP_NOPTS_VALUE;
@@ -714,7 +867,7 @@ static void enqueue_backward_frame(struct priv *p, struct mp_frame frame)
static void read_frame(struct priv *p)
{
- struct mp_pin *pin = p->f->ppins[0];
+ struct mp_pin *pin = p->decf->ppins[0];
struct mp_frame frame = {0};
if (!p->decoder || !mp_pin_in_needs_data(pin))
@@ -746,22 +899,25 @@ static void read_frame(struct priv *p)
if (p->header->attached_picture && frame.type == MP_FRAME_VIDEO) {
p->decoded_coverart = frame;
- mp_filter_internal_mark_progress(p->f);
+ mp_filter_internal_mark_progress(p->decf);
return;
}
+ pthread_mutex_lock(&p->cache_lock);
if (p->attempt_framedrops) {
int dropped = MPMAX(0, p->packets_without_output - 1);
p->attempt_framedrops = MPMAX(0, p->attempt_framedrops - dropped);
p->dropped_frames += dropped;
}
+ pthread_mutex_unlock(&p->cache_lock);
+
p->packets_without_output = 0;
if (p->preroll_discard && frame.type != MP_FRAME_EOF) {
double ts = mp_frame_get_pts(frame);
if (ts == MP_NOPTS_VALUE) {
mp_frame_unref(&frame);
- mp_filter_internal_mark_progress(p->f);
+ mp_filter_internal_mark_progress(p->decf);
return;
}
p->preroll_discard = false;
@@ -785,7 +941,7 @@ static void read_frame(struct priv *p)
if (p->codec != new_segment->codec) {
p->codec = new_segment->codec;
if (!mp_decoder_wrapper_reinit(&p->public))
- mp_filter_internal_mark_failed(p->f);
+ mp_filter_internal_mark_failed(p->decf);
}
p->start = new_segment->start;
@@ -796,11 +952,11 @@ static void read_frame(struct priv *p)
p->reverse_queue_complete = p->num_reverse_queue > 0;
p->packet = MAKE_FRAME(MP_FRAME_PACKET, new_segment);
- mp_filter_internal_mark_progress(p->f);
+ mp_filter_internal_mark_progress(p->decf);
}
if (!frame.type) {
- mp_filter_internal_mark_progress(p->f); // make it retry
+ mp_filter_internal_mark_progress(p->decf); // make it retry
return;
}
@@ -809,46 +965,116 @@ output_frame:
mp_pin_in_write(pin, frame);
}
-static void process(struct mp_filter *f)
+static void decf_process(struct mp_filter *f)
{
struct priv *p = f->priv;
+ assert(p->decf == f);
+
m_config_cache_update(p->opt_cache);
feed_packet(p);
read_frame(p);
}
-static const struct mp_filter_info decode_wrapper_filter = {
+static void *dec_thread(void *ptr)
+{
+ struct priv *p = ptr;
+
+ char *t_name = "?";
+ switch (p->header->type) {
+ case STREAM_VIDEO: t_name = "vdec"; break;
+ case STREAM_AUDIO: t_name = "adec"; break;
+ }
+ mpthread_set_name(t_name);
+
+ while (!p->request_terminate_dec_thread) {
+ mp_filter_run(p->dec_root_filter);
+ update_cached_values(p);
+ mp_dispatch_queue_process(p->dec_dispatch, INFINITY);
+ }
+
+ return NULL;
+}
+
+static void public_f_reset(struct mp_filter *f)
+{
+ struct priv *p = f->priv;
+ assert(p->public.f == f);
+
+ if (p->queue) {
+ mp_async_queue_reset(p->queue);
+ thread_lock(p);
+ if (p->dec_root_filter)
+ mp_filter_reset(p->dec_root_filter);
+ mp_dispatch_interrupt(p->dec_dispatch);
+ thread_unlock(p);
+ mp_async_queue_resume(p->queue);
+ }
+}
+
+static void public_f_destroy(struct mp_filter *f)
+{
+ struct priv *p = f->priv;
+ assert(p->public.f == f);
+
+ if (p->dec_thread_valid) {
+ assert(p->dec_dispatch);
+ thread_lock(p);
+ p->request_terminate_dec_thread = 1;
+ mp_dispatch_interrupt(p->dec_dispatch);
+ thread_unlock(p);
+ pthread_join(p->dec_thread, NULL);
+ p->dec_thread_valid = false;
+ }
+
+ talloc_free(p->dec_root_filter);
+ talloc_free(p->queue);
+ pthread_mutex_destroy(&p->cache_lock);
+}
+
+static const struct mp_filter_info decf_filter = {
.name = "decode",
+ .process = decf_process,
+ .reset = decf_reset,
+ .destroy = decf_destroy,
+};
+
+static const struct mp_filter_info decode_wrapper_filter = {
+ .name = "decode_wrapper",
.priv_size = sizeof(struct priv),
- .process = process,
- .reset = reset,
- .destroy = destroy,
+ .reset = public_f_reset,
+ .destroy = public_f_destroy,
};
+static void wakeup_dec_thread(void *ptr)
+{
+ struct priv *p = ptr;
+
+ mp_dispatch_interrupt(p->dec_dispatch);
+}
+
struct mp_decoder_wrapper *mp_decoder_wrapper_create(struct mp_filter *parent,
struct sh_stream *src)
{
- struct mp_filter *f = mp_filter_create(parent, &decode_wrapper_filter);
- if (!f)
+ struct mp_filter *public_f = mp_filter_create(parent, &decode_wrapper_filter);
+ if (!public_f)
return NULL;
- struct priv *p = f->priv;
- struct mp_decoder_wrapper *w = &p->public;
- p->opt_cache = m_config_cache_alloc(p, f->global, &mp_opt_root);
- p->log = f->log;
- p->f = f;
+ struct priv *p = public_f->priv;
+ p->public.f = public_f;
+
+ pthread_mutex_init(&p->cache_lock, NULL);
+ p->opt_cache = m_config_cache_alloc(p, public_f->global, &mp_opt_root);
+ struct MPOpts *opts = p->opt_cache->opts;
p->header = src;
p->codec = p->header->codec;
p->play_dir = 1;
- w->f = f;
-
- struct MPOpts *opts = p->opt_cache->opts;
+ mp_filter_add_pin(public_f, MP_PIN_OUT, "out");
- mp_filter_add_pin(f, MP_PIN_OUT, "out");
+ struct dec_queue_opts *queue_opts = NULL;
if (p->header->type == STREAM_VIDEO) {
- p->log = f->log = mp_log_new(f, parent->log, "!vd");
+ p->log = mp_log_new(p, public_f->log, "!vd");
p->fps = src->codec->fps;
@@ -859,20 +1085,65 @@ struct mp_decoder_wrapper *mp_decoder_wrapper_create(struct mp_filter *parent,
MP_INFO(p, "FPS forced to %5.3f.\n", p->fps);
MP_INFO(p, "Use --no-correct-pts to force FPS based timing.\n");
}
+
+ queue_opts = opts->vdec_queue_opts;
} else if (p->header->type == STREAM_AUDIO) {
- p->log = f->log = mp_log_new(f, parent->log, "!ad");
+ p->log = mp_log_new(p, public_f->log, "!ad");
+ queue_opts = opts->adec_queue_opts;
+ } else {
+ goto error;
}
- struct mp_filter *demux = mp_demux_in_create(f, p->header);
+ if (queue_opts && queue_opts->use_queue) {
+ p->queue = mp_async_queue_create();
+ p->dec_dispatch = mp_dispatch_create(p);
+ p->dec_root_filter = mp_filter_create_root(public_f->global);
+ mp_filter_root_set_wakeup_cb(p->dec_root_filter, wakeup_dec_thread, p);
+
+ struct mp_async_queue_config cfg = {
+ .max_bytes = queue_opts->max_bytes,
+ .sample_unit = AQUEUE_UNIT_SAMPLES,
+ .max_samples = queue_opts->max_samples,
+ .max_duration = queue_opts->max_duration,
+ };
+ mp_async_queue_set_config(p->queue, cfg);
+ }
+
+ p->decf = mp_filter_create(p->dec_root_filter ? p->dec_root_filter : public_f,
+ &decf_filter);
+ p->decf->priv = p;
+ p->decf->log = p->log;
+ mp_filter_add_pin(p->decf, MP_PIN_OUT, "out");
+
+ struct mp_filter *demux = mp_demux_in_create(p->decf, p->header);
if (!demux)
goto error;
p->demux = demux->pins[0];
- reset(f);
+ decf_reset(p->decf);
+
+ if (p->queue) {
+ struct mp_filter *f_in =
+ mp_async_queue_create_filter(public_f, MP_PIN_OUT, p->queue);
+ struct mp_filter *f_out =
+ mp_async_queue_create_filter(p->decf, MP_PIN_IN, p->queue);
+ mp_pin_connect(public_f->ppins[0], f_in->pins[0]);
+ mp_pin_connect(f_out->pins[0], p->decf->pins[0]);
+
+ p->dec_thread_valid = true;
+ if (pthread_create(&p->dec_thread, NULL, dec_thread, p)) {
+ p->dec_thread_valid = false;
+ goto error;
+ }
+ } else {
+ mp_pin_connect(public_f->ppins[0], p->decf->pins[0]);
+ }
+
+ public_f_reset(public_f);
- return w;
+ return &p->public;
error:
- talloc_free(f);
+ talloc_free(public_f);
return NULL;
}