diff options
Diffstat (limited to 'demux/demux.c')
-rw-r--r-- | demux/demux.c | 425 |
1 files changed, 402 insertions, 23 deletions
diff --git a/demux/demux.c b/demux/demux.c index ae6d3a96f6..92121c9b7b 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -88,6 +88,9 @@ struct demux_opts { int seekable_cache; int create_ccs; char *record_file; + int video_back_preroll; + int audio_back_preroll; + double back_seek_size; }; #define OPT_BASE_STRUCT struct demux_opts @@ -110,6 +113,12 @@ const struct m_sub_options demux_conf = { ({"auto", -1}, {"no", 0}, {"yes", 1})), OPT_FLAG("sub-create-cc-track", create_ccs, 0), OPT_STRING("stream-record", record_file, 0), + OPT_CHOICE_OR_INT("video-backward-overlap", video_back_preroll, 0, 0, + 1024, ({"auto", -1})), + OPT_CHOICE_OR_INT("audio-backward-overlap", audio_back_preroll, 0, 0, + 1024, ({"auto", -1})), + OPT_DOUBLE("demuxer-backward-playback-step", back_seek_size, M_OPT_MIN, + .min = 0), {0} }, .size = sizeof(struct demux_opts), @@ -121,6 +130,9 @@ const struct m_sub_options demux_conf = { .min_secs_cache = 10.0 * 60 * 60, .seekable_cache = -1, .access_references = 1, + .video_back_preroll = -1, + .audio_back_preroll = -1, + .back_seek_size = 60, }, }; @@ -183,6 +195,15 @@ struct demux_internal { // file (or if the demuxer was just opened). bool after_seek_to_start; + // Demuxing backwards. Since demuxer implementations don't support this + // directly, it is emulated by seeking backwards for every packet run. Also, + // packets between keyframes are demuxed forwards (you can't decode that + // stuff otherwise), which adds complexity on top of it. + bool back_demuxing; + + // For backward demuxing: back-step seek needs to be triggered. + bool need_back_seek; + bool tracks_switched; // thread needs to inform demuxer of this bool seeking; // there's a seek queued @@ -322,6 +343,27 @@ struct demux_stream { int64_t last_ret_pos; double last_ret_dts; + // Backwards demuxing. + // pos/dts of the previous keyframe packet returned; valid if + // back_range_started or back_restarting are set. + int64_t back_restart_pos; + double back_restart_dts; + bool back_restarting; // searching keyframe before restart pos + // Current PTS lower bound for back demuxing. + double back_seek_pos; + // pos/dts of the packet to resume demuxing from when another stream caused + // a seek backward to get more packets. reader_head will be reset to this + // packet as soon as it's encountered again. + int64_t back_resume_pos; + double back_resume_dts; + bool back_resuming; // resuming mode (above fields are valid/used) + // Set to true if the first packet (keyframe) of a range was returned. + bool back_range_started; + // Number of packets at start of range yet to return. -1 is used for BOF. + int back_range_min; + // Static packet preroll count. + int back_preroll; + // for closed captions (demuxer_feed_caption) struct sh_stream *cc; bool ignore_eof; // ignore stream in underrun detection @@ -352,6 +394,9 @@ static void demuxer_sort_chapters(demuxer_t *demuxer); static void *demux_thread(void *pctx); static void update_cache(struct demux_internal *in); static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp); +static struct demux_packet *advance_reader_head(struct demux_stream *ds); +static bool queue_seek(struct demux_internal *in, double seek_pts, int flags, + bool clear_back_state); #if 0 // very expensive check for redundant cached queue state @@ -693,7 +738,8 @@ static void ds_clear_reader_queue_state(struct demux_stream *ds) ds->need_wakeup = true; } -static void ds_clear_reader_state(struct demux_stream *ds) +static void ds_clear_reader_state(struct demux_stream *ds, + bool clear_back_state) { ds_clear_reader_queue_state(ds); @@ -704,6 +750,18 @@ static void ds_clear_reader_state(struct demux_stream *ds) ds->attached_picture_added = false; ds->last_ret_pos = -1; ds->last_ret_dts = MP_NOPTS_VALUE; + + if (clear_back_state) { + ds->back_restart_pos = -1; + ds->back_restart_dts = MP_NOPTS_VALUE; + ds->back_restarting = false; + ds->back_seek_pos = MP_NOPTS_VALUE; + ds->back_resume_pos = -1; + ds->back_resume_dts = MP_NOPTS_VALUE; + ds->back_resuming = false; + ds->back_range_started = false; + ds->back_range_min = 0; + } } // Call if the observed reader state on this stream somehow changes. The wakeup @@ -728,7 +786,7 @@ static void update_stream_selection_state(struct demux_internal *in, ds->eof = false; ds->refreshing = false; - ds_clear_reader_state(ds); + ds_clear_reader_state(ds, true); // We still have to go over the whole stream list to update ds->eager for // other streams too, because they depend on other stream's selections. @@ -859,6 +917,8 @@ static void demux_add_sh_stream_locked(struct demux_internal *in, }; talloc_set_destructor(sh->ds, ds_destroy); + struct demux_stream *ds = sh->ds; + if (!sh->codec->codec) sh->codec->codec = ""; @@ -887,6 +947,19 @@ static void demux_add_sh_stream_locked(struct demux_internal *in, mp_tags_replace(sh->ds->tags_init->sh, sh->tags); mp_packet_tags_setref(&sh->ds->tags_reader, sh->ds->tags_init); + switch (ds->type) { + case STREAM_AUDIO: + ds->back_preroll = in->opts->audio_back_preroll; + if (ds->back_preroll < 0) + ds->back_preroll = 1; // auto + break; + case STREAM_VIDEO: + ds->back_preroll = in->opts->video_back_preroll; + if (ds->back_preroll < 0) + ds->back_preroll = 0; // auto + break; + } + in->events |= DEMUX_EVENT_STREAMS; if (in->wakeup_cb) in->wakeup_cb(in->wakeup_cb_ctx); @@ -1159,7 +1232,241 @@ void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp) dp->dts = MP_ADD_PTS(dp->dts, -in->ts_offset); add_packet_locked(sh, dp); pthread_mutex_unlock(&in->lock); +} + +static void perform_backward_seek(struct demux_internal *in) +{ + double target = MP_NOPTS_VALUE; + + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; + + if (ds->reader_head && !ds->back_restarting && !ds->back_resuming && + ds->eager) + { + ds->back_resuming = true; + ds->back_resume_pos = ds->reader_head->pos; + ds->back_resume_dts = ds->reader_head->dts; + } + target = MP_PTS_MIN(target, ds->back_seek_pos); + } + + target = PTS_OR_DEF(target, in->d_thread->start_time); + + target -= in->opts->back_seek_size; + + MP_VERBOSE(in, "triggering backward seek to get more packets\n"); + queue_seek(in, target, SEEK_SATAN, false); + in->reading = true; +} + +// Search for a packet to resume demuxing from. +// from_cache: if true, this was called trying to go backwards in the cache; +// if false, this is from a hard seek before the back_restart_pos +// The implementation of this function is quite awkward, because the packet +// queue is a singly linked list without back links, while it needs to search +// backwards. +// This is the core of backward demuxing. +static void find_backward_restart_pos(struct demux_stream *ds, bool from_cache) +{ + struct demux_internal *in = ds->in; + + assert(ds->back_restarting); + + if (!ds->reader_head) + return; // no packets yet + + struct demux_packet *first = ds->reader_head; + struct demux_packet *last = ds->queue->tail; + assert(last); + + if ((ds->global_correct_dts && last->dts < ds->back_restart_dts) || + (ds->global_correct_pos && last->pos < ds->back_restart_pos)) + return; // restart pos not reached yet + + // The target we're searching for is apparently before the start of the queue. + if ((ds->global_correct_dts && first->dts > ds->back_restart_dts) || + (ds->global_correct_pos && first->pos > ds->back_restart_pos)) + { + // If this function was called for trying to backstep within the packet + // cache, the cache probably got pruned past the target (reader_head is + // being moved backwards, so nothing stops it from pruning packets + // before that). Just make the caller seek. + if (from_cache) { + in->need_back_seek = true; + return; + } + + // The demuxer probably seeked to the wrong position, or broke dts/pos + // determinism assumptions? + MP_ERR(in, "Demuxer did not seek correctly.\n"); + return; + } + + // Packet at back_restart_pos. (Note: we don't actually need it, only the + // packet immediately before it. But same effort.) + struct demux_packet *back_restart = NULL; + + for (struct demux_packet *cur = first; cur; cur = cur->next) { + if ((ds->global_correct_dts && cur->dts == ds->back_restart_dts) || + (ds->global_correct_pos && cur->pos == ds->back_restart_pos)) + { + back_restart = cur; + break; + } + } + + if (!back_restart) { + // The packet should have been in the searched range; maybe dts/pos + // determinism assumptions were broken. + MP_ERR(in, "Demuxer not cooperating.\n"); + return; + } + + if (!ds->reader_head->keyframe) + MP_WARN(in, "Queue not starting on keyframe.\n"); + + // Find where to restart demuxing. It's usually the last keyframe packet + // before restart_pos, but might be up to back_preroll packets earlier. + + struct demux_packet *last_keyframe = NULL; + struct demux_packet *last_preroll = NULL; + + // Keep this packet at back_preroll packets before last_keyframe. + struct demux_packet *pre_packet = ds->reader_head; + int pre_packet_offset = ds->back_preroll; + + // (Normally, we'd just iterate backwards, but no back links.) + for (struct demux_packet *cur = ds->reader_head; + cur != back_restart; + cur = cur->next) + { + if (cur->keyframe) { + last_keyframe = cur; + last_preroll = pre_packet; + } + + if (pre_packet_offset) { + pre_packet_offset--; + } else { + pre_packet = pre_packet->next; + } + } + + if (!last_keyframe) { + // Note: assume this holds true. You could think of various reasons why + // this might break. + if (ds->queue->is_bof) { + MP_VERBOSE(in, "BOF for stream %d\n", ds->index); + ds->back_restarting = false; + ds->back_range_started = false; + ds->back_range_min = -1; + ds->need_wakeup = true; + wakeup_ds(ds); + return; + } + goto resume_earlier; + } + + int got_preroll = 0; + for (struct demux_packet *cur = last_preroll; + cur != last_keyframe; + cur = cur->next) + got_preroll++; + + if (got_preroll < ds->back_preroll && !ds->queue->is_bof) + goto resume_earlier; + + // (Round preroll down to last_keyframe in the worst case.) + while (!last_preroll->keyframe) + last_preroll = last_preroll->next; + + // Skip reader_head from previous keyframe to current one. + // Or if preroll is involved, the first preroll packet. + while (ds->reader_head != last_preroll) { + if (!advance_reader_head(ds)) + assert(0); // last_preroll must be in list + } + + ds->back_restarting = false; + ds->back_range_started = false; + ds->back_range_min = got_preroll + 1; + ds->need_wakeup = true; + wakeup_ds(ds); + return; + +resume_earlier: + // If an earlier seek didn't land at an early enough position, we need to + // try to seek even earlier. Usually this will happen with large + // back_preroll values, because the initial back seek does not take them + // into account. We don't really know how much we need to seek, so add some + // random value to the previous seek value. Not ideal. + if (!from_cache && ds->back_seek_pos != MP_NOPTS_VALUE) + ds->back_seek_pos -= 1.0; + in->need_back_seek = true; +} + +// Process that one or multiple packets were added. +static void back_demux_see_packets(struct demux_stream *ds) +{ + struct demux_internal *in = ds->in; + + if (!ds->selected || !in->back_demuxing) + return; + + assert(!(ds->back_resuming && ds->back_restarting)); + + if (!ds->global_correct_dts && !ds->global_correct_pos) { + MP_ERR(in, "Can't demux backward due to demuxer problems.\n"); + return; + } + + while (ds->back_resuming && ds->reader_head) { + struct demux_packet *head = ds->reader_head; + if ((ds->global_correct_dts && head->dts == ds->back_resume_dts) || + (ds->global_correct_pos && head->pos == ds->back_resume_pos)) + { + ds->back_resuming = false; + ds->need_wakeup = true; + wakeup_ds(ds); // probably + break; + } + advance_reader_head(ds); + } + + if (ds->back_restarting) + find_backward_restart_pos(ds, false); +} + +// Resume demuxing from an earlier position for backward playback. May trigger +// a seek. +static void step_backwards(struct demux_stream *ds) +{ + struct demux_internal *in = ds->in; + + assert(in->back_demuxing); + + assert(!ds->back_restarting); + ds->back_restarting = true; + + // Move to start of queue. This is inefficient, because we need to iterate + // the entire fucking packet queue just to update the fw_* stats. But as + // long as we don't have demux_packet.prev links or a complete index, it's + // the thing to do. + // Note: if the buffer forward is much larger than the one backward, it + // would be worth looping until the previous reader_head and decrementing + // fw_packs/fw_bytes - you could skip the full recompute_buffers(). + ds->reader_head = ds->queue->head; + in->fw_bytes -= ds->fw_bytes; + recompute_buffers(ds); + in->fw_bytes += ds->fw_bytes; + + // Exclude weird special-cases (incomplete pruning? broken seeks?) + while (ds->reader_head && !ds->reader_head->keyframe) + advance_reader_head(ds); + + find_backward_restart_pos(ds, true); } // Add the keyframe to the end of the index. Not all packets are actually added. @@ -1353,6 +1660,9 @@ static void attempt_range_joining(struct demux_internal *in) MP_VERBOSE(in, "ranges joined!\n"); + for (int n = 0; n < in->num_streams; n++) + back_demux_see_packets(in->streams[n]->ds); + failed: clear_cached_range(in, next); free_empty_cached_ranges(in); @@ -1560,6 +1870,8 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp) } } + back_demux_see_packets(ds); + wakeup_ds(ds); } @@ -1578,13 +1890,19 @@ static bool read_packet(struct demux_internal *in) bool read_more = false, prefetch_more = false, refresh_more = false; for (int n = 0; n < in->num_streams; n++) { struct demux_stream *ds = in->streams[n]->ds; - read_more |= ds->eager && !ds->reader_head; + if (ds->eager) { + read_more |= !ds->reader_head; + if (in->back_demuxing) + read_more |= ds->back_restarting || ds->back_resuming; + } refresh_more |= ds->refreshing; if (ds->eager && ds->queue->last_ts != MP_NOPTS_VALUE && in->min_secs > 0 && ds->base_ts != MP_NOPTS_VALUE && - ds->queue->last_ts >= ds->base_ts) + ds->queue->last_ts >= ds->base_ts && + !in->back_demuxing) prefetch_more |= ds->queue->last_ts - ds->base_ts < in->min_secs; } + MP_TRACE(in, "bytes=%zd, read_more=%d prefetch_more=%d, refresh_more=%d\n", in->fw_bytes, read_more, prefetch_more, refresh_more); if (in->fw_bytes >= in->max_bytes) { @@ -1604,6 +1922,8 @@ static bool read_packet(struct demux_internal *in) ds->refreshing ? " (refreshing)" : ""); } } + if (in->back_demuxing) + MP_ERR(in, "Backward playback is likely stuck/broken now.\n"); } for (int n = 0; n < in->num_streams; n++) { struct demux_stream *ds = in->streams[n]->ds; @@ -1798,6 +2118,10 @@ static bool thread_work(struct demux_internal *in) execute_trackswitch(in); return true; } + if (in->need_back_seek) { + perform_backward_seek(in); + return true; + } if (in->seeking) { execute_seek(in); return true; @@ -1876,6 +2200,11 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res) if (in->blocked) return 0; + if (ds->back_resuming || ds->back_restarting) { + assert(in->back_demuxing); + return 0; + } + if (ds->sh->attached_picture) { ds->eof = true; if (ds->attached_picture_added) @@ -1895,8 +2224,27 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res) pthread_cond_signal(&in->wakeup); // possibly read more } + bool eof = !ds->reader_head && ds->eof; + + if (in->back_demuxing) { + // Subtitles not supported => EOF. + if (!ds->eager) + return -1; + + // Next keyframe (or EOF) was reached => step back. + if (ds->back_range_started && !ds->back_range_min && + ((ds->reader_head && ds->reader_head->keyframe) || eof)) + { + step_backwards(ds); + if (ds->back_restarting) + return 0; + } + + eof = ds->back_range_min < 0; + } + ds->need_wakeup = !ds->reader_head; - if (!ds->reader_head) { + if (!ds->reader_head || eof) { if (!ds->eager) { // Non-eager streams temporarily return EOF. If they returned 0, // the reader would have to wait for new packets, which does not @@ -1904,7 +2252,7 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res) // streams. return -1; } - return ds->eof ? -1 : 0; + return eof ? -1 : 0; } struct demux_packet *pkt = advance_reader_head(ds); @@ -1916,6 +2264,23 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res) abort(); pkt->next = NULL; + if (ds->in->back_demuxing) { + if (ds->back_range_min) + ds->back_range_min -= 1; + if (ds->back_range_min) { + pkt->back_preroll = true; + } else if (pkt->keyframe) { + // For next backward adjust action. + ds->back_restart_dts = pkt->dts; + ds->back_restart_pos = pkt->pos; + } + if (!ds->back_range_started) { + pkt->back_restart = true; + ds->back_range_started = true; + } + ds->back_seek_pos = MP_PTS_MIN(ds->back_seek_pos, pkt->pts); + } + double ts = PTS_OR_DEF(pkt->dts, pkt->pts); if (ts != MP_NOPTS_VALUE) ds->base_ts = ts; @@ -2544,13 +2909,15 @@ struct demuxer *demux_open_url(const char *url, } // called locked, from user thread only -static void clear_reader_state(struct demux_internal *in) +static void clear_reader_state(struct demux_internal *in, + bool clear_back_state) { for (int n = 0; n < in->num_streams; n++) - ds_clear_reader_state(in->streams[n]->ds); + ds_clear_reader_state(in->streams[n]->ds, clear_back_state); in->warned_queue_overflow = false; in->d_user->filepos = -1; // implicitly synchronized in->blocked = false; + in->need_back_seek = false; assert(in->fw_bytes == 0); } @@ -2561,7 +2928,7 @@ void demux_flush(demuxer_t *demuxer) assert(demuxer == in->d_user); pthread_mutex_lock(&demuxer->in->lock); - clear_reader_state(in); + clear_reader_state(in, true); for (int n = 0; n < in->num_ranges; n++) clear_cached_range(in, in->ranges[n]); free_empty_cached_ranges(in); @@ -2802,12 +3169,20 @@ int demux_seek(demuxer_t *demuxer, double seek_pts, int flags) { struct demux_internal *in = demuxer->in; assert(demuxer == in->d_user); - int res = 0; pthread_mutex_lock(&in->lock); + int res = queue_seek(in, seek_pts, flags, true); + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); + return res; +} + +static bool queue_seek(struct demux_internal *in, double seek_pts, int flags, + bool clear_back_state) +{ if (seek_pts == MP_NOPTS_VALUE) - goto done; + return false; MP_VERBOSE(in, "queuing seek to %f%s\n", seek_pts, in->seeking ? " (cascade)" : ""); @@ -2818,26 +3193,35 @@ int demux_seek(demuxer_t *demuxer, double seek_pts, int flags) bool require_cache = flags & SEEK_CACHED; flags &= ~(unsigned)SEEK_CACHED; + bool set_backwards = flags & SEEK_SATAN; + flags &= ~(unsigned)SEEK_SATAN; + + // For HR seeks, the correct seek rounding direction is forward instead of + // backward. + if (set_backwards && (flags & SEEK_HR)) + flags |= SEEK_FORWARD; + struct demux_cached_range *cache_target = find_cache_seek_target(in, seek_pts, flags); if (!cache_target) { if (require_cache) { - MP_VERBOSE(demuxer, "Cached seek not possible.\n"); - goto done; + MP_VERBOSE(in, "Cached seek not possible.\n"); + return false; } - if (!demuxer->seekable) { - MP_WARN(demuxer, "Cannot seek in this file.\n"); - goto done; + if (!in->d_thread->seekable) { + MP_WARN(in, "Cannot seek in this file.\n"); + return false; } } - clear_reader_state(in); + clear_reader_state(in, clear_back_state); in->eof = false; in->last_eof = false; in->idle = true; in->reading = false; + in->back_demuxing = set_backwards; if (cache_target) { execute_cache_seek(in, cache_target, seek_pts, flags); @@ -2855,12 +3239,7 @@ int demux_seek(demuxer_t *demuxer, double seek_pts, int flags) if (!in->threading && in->seeking) execute_seek(in); - res = 1; - -done: - pthread_cond_signal(&in->wakeup); - pthread_mutex_unlock(&in->lock); - return res; + return true; } struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d, |