diff options
Diffstat (limited to 'demux/demux.c')
-rw-r--r-- | demux/demux.c | 67 |
1 files changed, 54 insertions, 13 deletions
diff --git a/demux/demux.c b/demux/demux.c index ed6f2c1b31..64d2a7698a 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -268,6 +268,9 @@ struct demux_stream { int index; // equals to sh->index // --- all fields are protected by in->lock + void (*wakeup_cb)(void *ctx); + void *wakeup_cb_ctx; + // demuxer state bool selected; // user wants packets from this stream bool eager; // try to keep at least 1 packet queued @@ -293,6 +296,7 @@ struct demux_stream { struct demux_packet *reader_head; // points at current decoder position bool skip_to_keyframe; bool attached_picture_added; + bool need_wakeup; // call wakeup_cb on next reader_head state change // for refresh seeks: pos/dts of last packet returned to reader int64_t last_ret_pos; @@ -580,6 +584,7 @@ static void ds_clear_reader_queue_state(struct demux_stream *ds) ds->fw_bytes = 0; ds->fw_packs = 0; ds->eof = false; + ds->need_wakeup = true; } static void ds_clear_reader_state(struct demux_stream *ds) @@ -595,6 +600,22 @@ static void ds_clear_reader_state(struct demux_stream *ds) ds->last_ret_dts = MP_NOPTS_VALUE; } +// Call if the observed reader state on this stream somehow changes. The wakeup +// is skipped if the reader successfully read a packet, because that means we +// expect it to come back and ask for more. +static void wakeup_ds(struct demux_stream *ds) +{ + if (ds->need_wakeup) { + if (ds->wakeup_cb) { + ds->wakeup_cb(ds->wakeup_cb_ctx); + } else if (ds->in->wakeup_cb) { + ds->in->wakeup_cb(ds->in->wakeup_cb_ctx); + } + ds->need_wakeup = false; + pthread_cond_signal(&ds->in->wakeup); + } +} + static void update_stream_selection_state(struct demux_internal *in, struct demux_stream *ds) { @@ -646,6 +667,8 @@ static void update_stream_selection_state(struct demux_internal *in, } free_empty_cached_ranges(in); + + wakeup_ds(ds); } void demux_set_ts_offset(struct demuxer *demuxer, double offset) @@ -1273,10 +1296,7 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) } } - // Wake up if this was the first packet after start/possible underrun. - if (ds->in->wakeup_cb && ds->reader_head && !ds->reader_head->next) - ds->in->wakeup_cb(ds->in->wakeup_cb_ctx); - pthread_cond_signal(&in->wakeup); + wakeup_ds(ds); pthread_mutex_unlock(&in->lock); } @@ -1325,11 +1345,8 @@ static bool read_packet(struct demux_internal *in) for (int n = 0; n < in->num_streams; n++) { struct demux_stream *ds = in->streams[n]->ds; bool eof = !ds->reader_head; - if (eof && !ds->eof) { - if (in->wakeup_cb) - in->wakeup_cb(in->wakeup_cb_ctx); - pthread_cond_signal(&in->wakeup); - } + if (eof && ds->eof) + wakeup_ds(ds); ds->eof |= eof; } return false; @@ -1365,6 +1382,8 @@ static bool read_packet(struct demux_internal *in) if (!ds->eof) adjust_seek_range_on_packet(ds, NULL); ds->eof = true; + if (!in->last_eof && ds->wakeup_cb) + wakeup_ds(ds); } // If we had EOF previously, then don't wakeup (avoids wakeup loop) if (!in->last_eof) { @@ -1629,6 +1648,7 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh) const char *t = stream_type_name(ds->type); MP_DBG(in, "reading packet for %s\n", t); in->eof = false; // force retry + ds->need_wakeup = true; while (ds->selected && !ds->reader_head && !in->blocked) { in->reading = true; // Note: the following code marks EOF if it can't continue @@ -1679,12 +1699,16 @@ int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt) } else { r = *out_pkt ? 1 : -1; } + ds->need_wakeup = r != 1; pthread_mutex_unlock(&ds->in->lock); - } else if (ds->in->blocked) { - r = 0; } else { - *out_pkt = demux_read_packet(sh); - r = *out_pkt ? 1 : -1; + if (ds->in->blocked) { + r = 0; + } else { + *out_pkt = demux_read_packet(sh); + r = *out_pkt ? 1 : -1; + } + ds->need_wakeup = r != 1; } return r; } @@ -2515,6 +2539,9 @@ int demux_seek(demuxer_t *demuxer, double seek_pts, int flags) in->seek_pts = seek_pts; } + for (int n = 0; n < in->num_streams; n++) + wakeup_ds(in->streams[n]->ds); + if (!in->threading && in->seeking) execute_seek(in); @@ -2662,6 +2689,16 @@ bool demux_stream_is_selected(struct sh_stream *stream) return r; } +void demux_set_stream_wakeup_cb(struct sh_stream *sh, + void (*cb)(void *ctx), void *ctx) +{ + pthread_mutex_lock(&sh->ds->in->lock); + sh->ds->wakeup_cb = cb; + sh->ds->wakeup_cb_ctx = ctx; + sh->ds->need_wakeup = true; + pthread_mutex_unlock(&sh->ds->in->lock); +} + int demuxer_add_attachment(demuxer_t *demuxer, char *name, char *type, void *data, size_t data_size) { @@ -2738,6 +2775,10 @@ void demux_block_reading(struct demuxer *demuxer, bool block) pthread_mutex_lock(&in->lock); in->blocked = block; + for (int n = 0; n < in->num_streams; n++) { + in->streams[n]->ds->need_wakeup = true; + wakeup_ds(in->streams[n]->ds); + } pthread_cond_signal(&in->wakeup); pthread_mutex_unlock(&in->lock); } |