summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2018-01-29 13:51:47 +0100
committerKevin Mitchell <kevmitch@gmail.com>2018-01-30 03:10:27 -0800
commiteaced0ebb03a078394ed01bcbe641b5b7a4312aa (patch)
tree265322debf482f8f8e3416718c0d77b176882dd9
parent1d5991ef306f5a0306f79ba99ae5a919357b18ff (diff)
downloadmpv-eaced0ebb03a078394ed01bcbe641b5b7a4312aa.tar.bz2
mpv-eaced0ebb03a078394ed01bcbe641b5b7a4312aa.tar.xz
demux: add a per stream wakeup callback
This is supposed to help making data flow easier and wakeup handling more efficient. Once that change is done, reading a packet on any stream won't have to wakeup and poll all decoders (which helps reducing the mess even if all decoders are on the same thread). This also improves the accuracy of wakeups by tracking better whether a wakeup is needed.
-rw-r--r--demux/demux.c67
-rw-r--r--demux/demux.h2
2 files changed, 56 insertions, 13 deletions
diff --git a/demux/demux.c b/demux/demux.c
index ed6f2c1b31..64d2a7698a 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -268,6 +268,9 @@ struct demux_stream {
int index; // equals to sh->index
// --- all fields are protected by in->lock
+ void (*wakeup_cb)(void *ctx);
+ void *wakeup_cb_ctx;
+
// demuxer state
bool selected; // user wants packets from this stream
bool eager; // try to keep at least 1 packet queued
@@ -293,6 +296,7 @@ struct demux_stream {
struct demux_packet *reader_head; // points at current decoder position
bool skip_to_keyframe;
bool attached_picture_added;
+ bool need_wakeup; // call wakeup_cb on next reader_head state change
// for refresh seeks: pos/dts of last packet returned to reader
int64_t last_ret_pos;
@@ -580,6 +584,7 @@ static void ds_clear_reader_queue_state(struct demux_stream *ds)
ds->fw_bytes = 0;
ds->fw_packs = 0;
ds->eof = false;
+ ds->need_wakeup = true;
}
static void ds_clear_reader_state(struct demux_stream *ds)
@@ -595,6 +600,22 @@ static void ds_clear_reader_state(struct demux_stream *ds)
ds->last_ret_dts = MP_NOPTS_VALUE;
}
+// Call if the observed reader state on this stream somehow changes. The wakeup
+// is skipped if the reader successfully read a packet, because that means we
+// expect it to come back and ask for more.
+static void wakeup_ds(struct demux_stream *ds)
+{
+ if (ds->need_wakeup) {
+ if (ds->wakeup_cb) {
+ ds->wakeup_cb(ds->wakeup_cb_ctx);
+ } else if (ds->in->wakeup_cb) {
+ ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
+ }
+ ds->need_wakeup = false;
+ pthread_cond_signal(&ds->in->wakeup);
+ }
+}
+
static void update_stream_selection_state(struct demux_internal *in,
struct demux_stream *ds)
{
@@ -646,6 +667,8 @@ static void update_stream_selection_state(struct demux_internal *in,
}
free_empty_cached_ranges(in);
+
+ wakeup_ds(ds);
}
void demux_set_ts_offset(struct demuxer *demuxer, double offset)
@@ -1273,10 +1296,7 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
}
}
- // Wake up if this was the first packet after start/possible underrun.
- if (ds->in->wakeup_cb && ds->reader_head && !ds->reader_head->next)
- ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
- pthread_cond_signal(&in->wakeup);
+ wakeup_ds(ds);
pthread_mutex_unlock(&in->lock);
}
@@ -1325,11 +1345,8 @@ static bool read_packet(struct demux_internal *in)
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
bool eof = !ds->reader_head;
- if (eof && !ds->eof) {
- if (in->wakeup_cb)
- in->wakeup_cb(in->wakeup_cb_ctx);
- pthread_cond_signal(&in->wakeup);
- }
+ if (eof && ds->eof)
+ wakeup_ds(ds);
ds->eof |= eof;
}
return false;
@@ -1365,6 +1382,8 @@ static bool read_packet(struct demux_internal *in)
if (!ds->eof)
adjust_seek_range_on_packet(ds, NULL);
ds->eof = true;
+ if (!in->last_eof && ds->wakeup_cb)
+ wakeup_ds(ds);
}
// If we had EOF previously, then don't wakeup (avoids wakeup loop)
if (!in->last_eof) {
@@ -1629,6 +1648,7 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh)
const char *t = stream_type_name(ds->type);
MP_DBG(in, "reading packet for %s\n", t);
in->eof = false; // force retry
+ ds->need_wakeup = true;
while (ds->selected && !ds->reader_head && !in->blocked) {
in->reading = true;
// Note: the following code marks EOF if it can't continue
@@ -1679,12 +1699,16 @@ int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt)
} else {
r = *out_pkt ? 1 : -1;
}
+ ds->need_wakeup = r != 1;
pthread_mutex_unlock(&ds->in->lock);
- } else if (ds->in->blocked) {
- r = 0;
} else {
- *out_pkt = demux_read_packet(sh);
- r = *out_pkt ? 1 : -1;
+ if (ds->in->blocked) {
+ r = 0;
+ } else {
+ *out_pkt = demux_read_packet(sh);
+ r = *out_pkt ? 1 : -1;
+ }
+ ds->need_wakeup = r != 1;
}
return r;
}
@@ -2515,6 +2539,9 @@ int demux_seek(demuxer_t *demuxer, double seek_pts, int flags)
in->seek_pts = seek_pts;
}
+ for (int n = 0; n < in->num_streams; n++)
+ wakeup_ds(in->streams[n]->ds);
+
if (!in->threading && in->seeking)
execute_seek(in);
@@ -2662,6 +2689,16 @@ bool demux_stream_is_selected(struct sh_stream *stream)
return r;
}
+void demux_set_stream_wakeup_cb(struct sh_stream *sh,
+ void (*cb)(void *ctx), void *ctx)
+{
+ pthread_mutex_lock(&sh->ds->in->lock);
+ sh->ds->wakeup_cb = cb;
+ sh->ds->wakeup_cb_ctx = ctx;
+ sh->ds->need_wakeup = true;
+ pthread_mutex_unlock(&sh->ds->in->lock);
+}
+
int demuxer_add_attachment(demuxer_t *demuxer, char *name, char *type,
void *data, size_t data_size)
{
@@ -2738,6 +2775,10 @@ void demux_block_reading(struct demuxer *demuxer, bool block)
pthread_mutex_lock(&in->lock);
in->blocked = block;
+ for (int n = 0; n < in->num_streams; n++) {
+ in->streams[n]->ds->need_wakeup = true;
+ wakeup_ds(in->streams[n]->ds);
+ }
pthread_cond_signal(&in->wakeup);
pthread_mutex_unlock(&in->lock);
}
diff --git a/demux/demux.h b/demux/demux.h
index be6c336a95..b3088db8c4 100644
--- a/demux/demux.h
+++ b/demux/demux.h
@@ -255,6 +255,8 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh);
int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt);
bool demux_stream_is_selected(struct sh_stream *stream);
bool demux_has_packet(struct sh_stream *sh);
+void demux_set_stream_wakeup_cb(struct sh_stream *sh,
+ void (*cb)(void *ctx), void *ctx);
struct demux_packet *demux_read_any_packet(struct demuxer *demuxer);
struct sh_stream *demux_get_stream(struct demuxer *demuxer, int index);