summaryrefslogtreecommitdiffstats
path: root/demux
diff options
context:
space:
mode:
Diffstat (limited to 'demux')
-rw-r--r--demux/demux.c67
-rw-r--r--demux/demux.h2
2 files changed, 56 insertions, 13 deletions
diff --git a/demux/demux.c b/demux/demux.c
index ed6f2c1b31..64d2a7698a 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -268,6 +268,9 @@ struct demux_stream {
int index; // equals to sh->index
// --- all fields are protected by in->lock
+ void (*wakeup_cb)(void *ctx);
+ void *wakeup_cb_ctx;
+
// demuxer state
bool selected; // user wants packets from this stream
bool eager; // try to keep at least 1 packet queued
@@ -293,6 +296,7 @@ struct demux_stream {
struct demux_packet *reader_head; // points at current decoder position
bool skip_to_keyframe;
bool attached_picture_added;
+ bool need_wakeup; // call wakeup_cb on next reader_head state change
// for refresh seeks: pos/dts of last packet returned to reader
int64_t last_ret_pos;
@@ -580,6 +584,7 @@ static void ds_clear_reader_queue_state(struct demux_stream *ds)
ds->fw_bytes = 0;
ds->fw_packs = 0;
ds->eof = false;
+ ds->need_wakeup = true;
}
static void ds_clear_reader_state(struct demux_stream *ds)
@@ -595,6 +600,22 @@ static void ds_clear_reader_state(struct demux_stream *ds)
ds->last_ret_dts = MP_NOPTS_VALUE;
}
+// Call if the observed reader state on this stream somehow changes. The wakeup
+// is skipped if the reader successfully read a packet, because that means we
+// expect it to come back and ask for more.
+static void wakeup_ds(struct demux_stream *ds)
+{
+ if (ds->need_wakeup) {
+ if (ds->wakeup_cb) {
+ ds->wakeup_cb(ds->wakeup_cb_ctx);
+ } else if (ds->in->wakeup_cb) {
+ ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
+ }
+ ds->need_wakeup = false;
+ pthread_cond_signal(&ds->in->wakeup);
+ }
+}
+
static void update_stream_selection_state(struct demux_internal *in,
struct demux_stream *ds)
{
@@ -646,6 +667,8 @@ static void update_stream_selection_state(struct demux_internal *in,
}
free_empty_cached_ranges(in);
+
+ wakeup_ds(ds);
}
void demux_set_ts_offset(struct demuxer *demuxer, double offset)
@@ -1273,10 +1296,7 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
}
}
- // Wake up if this was the first packet after start/possible underrun.
- if (ds->in->wakeup_cb && ds->reader_head && !ds->reader_head->next)
- ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
- pthread_cond_signal(&in->wakeup);
+ wakeup_ds(ds);
pthread_mutex_unlock(&in->lock);
}
@@ -1325,11 +1345,8 @@ static bool read_packet(struct demux_internal *in)
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
bool eof = !ds->reader_head;
- if (eof && !ds->eof) {
- if (in->wakeup_cb)
- in->wakeup_cb(in->wakeup_cb_ctx);
- pthread_cond_signal(&in->wakeup);
- }
+ if (eof && ds->eof)
+ wakeup_ds(ds);
ds->eof |= eof;
}
return false;
@@ -1365,6 +1382,8 @@ static bool read_packet(struct demux_internal *in)
if (!ds->eof)
adjust_seek_range_on_packet(ds, NULL);
ds->eof = true;
+ if (!in->last_eof && ds->wakeup_cb)
+ wakeup_ds(ds);
}
// If we had EOF previously, then don't wakeup (avoids wakeup loop)
if (!in->last_eof) {
@@ -1629,6 +1648,7 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh)
const char *t = stream_type_name(ds->type);
MP_DBG(in, "reading packet for %s\n", t);
in->eof = false; // force retry
+ ds->need_wakeup = true;
while (ds->selected && !ds->reader_head && !in->blocked) {
in->reading = true;
// Note: the following code marks EOF if it can't continue
@@ -1679,12 +1699,16 @@ int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt)
} else {
r = *out_pkt ? 1 : -1;
}
+ ds->need_wakeup = r != 1;
pthread_mutex_unlock(&ds->in->lock);
- } else if (ds->in->blocked) {
- r = 0;
} else {
- *out_pkt = demux_read_packet(sh);
- r = *out_pkt ? 1 : -1;
+ if (ds->in->blocked) {
+ r = 0;
+ } else {
+ *out_pkt = demux_read_packet(sh);
+ r = *out_pkt ? 1 : -1;
+ }
+ ds->need_wakeup = r != 1;
}
return r;
}
@@ -2515,6 +2539,9 @@ int demux_seek(demuxer_t *demuxer, double seek_pts, int flags)
in->seek_pts = seek_pts;
}
+ for (int n = 0; n < in->num_streams; n++)
+ wakeup_ds(in->streams[n]->ds);
+
if (!in->threading && in->seeking)
execute_seek(in);
@@ -2662,6 +2689,16 @@ bool demux_stream_is_selected(struct sh_stream *stream)
return r;
}
+void demux_set_stream_wakeup_cb(struct sh_stream *sh,
+ void (*cb)(void *ctx), void *ctx)
+{
+ pthread_mutex_lock(&sh->ds->in->lock);
+ sh->ds->wakeup_cb = cb;
+ sh->ds->wakeup_cb_ctx = ctx;
+ sh->ds->need_wakeup = true;
+ pthread_mutex_unlock(&sh->ds->in->lock);
+}
+
int demuxer_add_attachment(demuxer_t *demuxer, char *name, char *type,
void *data, size_t data_size)
{
@@ -2738,6 +2775,10 @@ void demux_block_reading(struct demuxer *demuxer, bool block)
pthread_mutex_lock(&in->lock);
in->blocked = block;
+ for (int n = 0; n < in->num_streams; n++) {
+ in->streams[n]->ds->need_wakeup = true;
+ wakeup_ds(in->streams[n]->ds);
+ }
pthread_cond_signal(&in->wakeup);
pthread_mutex_unlock(&in->lock);
}
diff --git a/demux/demux.h b/demux/demux.h
index be6c336a95..b3088db8c4 100644
--- a/demux/demux.h
+++ b/demux/demux.h
@@ -255,6 +255,8 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh);
int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt);
bool demux_stream_is_selected(struct sh_stream *stream);
bool demux_has_packet(struct sh_stream *sh);
+void demux_set_stream_wakeup_cb(struct sh_stream *sh,
+ void (*cb)(void *ctx), void *ctx);
struct demux_packet *demux_read_any_packet(struct demuxer *demuxer);
struct sh_stream *demux_get_stream(struct demuxer *demuxer, int index);