summaryrefslogtreecommitdiffstats
path: root/demux/demux.c
diff options
context:
space:
mode:
Diffstat (limited to 'demux/demux.c')
-rw-r--r--demux/demux.c425
1 files changed, 402 insertions, 23 deletions
diff --git a/demux/demux.c b/demux/demux.c
index ae6d3a96f6..92121c9b7b 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -88,6 +88,9 @@ struct demux_opts {
int seekable_cache;
int create_ccs;
char *record_file;
+ int video_back_preroll;
+ int audio_back_preroll;
+ double back_seek_size;
};
#define OPT_BASE_STRUCT struct demux_opts
@@ -110,6 +113,12 @@ const struct m_sub_options demux_conf = {
({"auto", -1}, {"no", 0}, {"yes", 1})),
OPT_FLAG("sub-create-cc-track", create_ccs, 0),
OPT_STRING("stream-record", record_file, 0),
+ OPT_CHOICE_OR_INT("video-backward-overlap", video_back_preroll, 0, 0,
+ 1024, ({"auto", -1})),
+ OPT_CHOICE_OR_INT("audio-backward-overlap", audio_back_preroll, 0, 0,
+ 1024, ({"auto", -1})),
+ OPT_DOUBLE("demuxer-backward-playback-step", back_seek_size, M_OPT_MIN,
+ .min = 0),
{0}
},
.size = sizeof(struct demux_opts),
@@ -121,6 +130,9 @@ const struct m_sub_options demux_conf = {
.min_secs_cache = 10.0 * 60 * 60,
.seekable_cache = -1,
.access_references = 1,
+ .video_back_preroll = -1,
+ .audio_back_preroll = -1,
+ .back_seek_size = 60,
},
};
@@ -183,6 +195,15 @@ struct demux_internal {
// file (or if the demuxer was just opened).
bool after_seek_to_start;
+ // Demuxing backwards. Since demuxer implementations don't support this
+ // directly, it is emulated by seeking backwards for every packet run. Also,
+ // packets between keyframes are demuxed forwards (you can't decode that
+ // stuff otherwise), which adds complexity on top of it.
+ bool back_demuxing;
+
+ // For backward demuxing: back-step seek needs to be triggered.
+ bool need_back_seek;
+
bool tracks_switched; // thread needs to inform demuxer of this
bool seeking; // there's a seek queued
@@ -322,6 +343,27 @@ struct demux_stream {
int64_t last_ret_pos;
double last_ret_dts;
+ // Backwards demuxing.
+ // pos/dts of the previous keyframe packet returned; valid if
+ // back_range_started or back_restarting are set.
+ int64_t back_restart_pos;
+ double back_restart_dts;
+ bool back_restarting; // searching keyframe before restart pos
+ // Current PTS lower bound for back demuxing.
+ double back_seek_pos;
+ // pos/dts of the packet to resume demuxing from when another stream caused
+ // a seek backward to get more packets. reader_head will be reset to this
+ // packet as soon as it's encountered again.
+ int64_t back_resume_pos;
+ double back_resume_dts;
+ bool back_resuming; // resuming mode (above fields are valid/used)
+ // Set to true if the first packet (keyframe) of a range was returned.
+ bool back_range_started;
+ // Number of packets at start of range yet to return. -1 is used for BOF.
+ int back_range_min;
+ // Static packet preroll count.
+ int back_preroll;
+
// for closed captions (demuxer_feed_caption)
struct sh_stream *cc;
bool ignore_eof; // ignore stream in underrun detection
@@ -352,6 +394,9 @@ static void demuxer_sort_chapters(demuxer_t *demuxer);
static void *demux_thread(void *pctx);
static void update_cache(struct demux_internal *in);
static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp);
+static struct demux_packet *advance_reader_head(struct demux_stream *ds);
+static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
+ bool clear_back_state);
#if 0
// very expensive check for redundant cached queue state
@@ -693,7 +738,8 @@ static void ds_clear_reader_queue_state(struct demux_stream *ds)
ds->need_wakeup = true;
}
-static void ds_clear_reader_state(struct demux_stream *ds)
+static void ds_clear_reader_state(struct demux_stream *ds,
+ bool clear_back_state)
{
ds_clear_reader_queue_state(ds);
@@ -704,6 +750,18 @@ static void ds_clear_reader_state(struct demux_stream *ds)
ds->attached_picture_added = false;
ds->last_ret_pos = -1;
ds->last_ret_dts = MP_NOPTS_VALUE;
+
+ if (clear_back_state) {
+ ds->back_restart_pos = -1;
+ ds->back_restart_dts = MP_NOPTS_VALUE;
+ ds->back_restarting = false;
+ ds->back_seek_pos = MP_NOPTS_VALUE;
+ ds->back_resume_pos = -1;
+ ds->back_resume_dts = MP_NOPTS_VALUE;
+ ds->back_resuming = false;
+ ds->back_range_started = false;
+ ds->back_range_min = 0;
+ }
}
// Call if the observed reader state on this stream somehow changes. The wakeup
@@ -728,7 +786,7 @@ static void update_stream_selection_state(struct demux_internal *in,
ds->eof = false;
ds->refreshing = false;
- ds_clear_reader_state(ds);
+ ds_clear_reader_state(ds, true);
// We still have to go over the whole stream list to update ds->eager for
// other streams too, because they depend on other stream's selections.
@@ -859,6 +917,8 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
};
talloc_set_destructor(sh->ds, ds_destroy);
+ struct demux_stream *ds = sh->ds;
+
if (!sh->codec->codec)
sh->codec->codec = "";
@@ -887,6 +947,19 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
mp_tags_replace(sh->ds->tags_init->sh, sh->tags);
mp_packet_tags_setref(&sh->ds->tags_reader, sh->ds->tags_init);
+ switch (ds->type) {
+ case STREAM_AUDIO:
+ ds->back_preroll = in->opts->audio_back_preroll;
+ if (ds->back_preroll < 0)
+ ds->back_preroll = 1; // auto
+ break;
+ case STREAM_VIDEO:
+ ds->back_preroll = in->opts->video_back_preroll;
+ if (ds->back_preroll < 0)
+ ds->back_preroll = 0; // auto
+ break;
+ }
+
in->events |= DEMUX_EVENT_STREAMS;
if (in->wakeup_cb)
in->wakeup_cb(in->wakeup_cb_ctx);
@@ -1159,7 +1232,241 @@ void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp)
dp->dts = MP_ADD_PTS(dp->dts, -in->ts_offset);
add_packet_locked(sh, dp);
pthread_mutex_unlock(&in->lock);
+}
+
+static void perform_backward_seek(struct demux_internal *in)
+{
+ double target = MP_NOPTS_VALUE;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ if (ds->reader_head && !ds->back_restarting && !ds->back_resuming &&
+ ds->eager)
+ {
+ ds->back_resuming = true;
+ ds->back_resume_pos = ds->reader_head->pos;
+ ds->back_resume_dts = ds->reader_head->dts;
+ }
+ target = MP_PTS_MIN(target, ds->back_seek_pos);
+ }
+
+ target = PTS_OR_DEF(target, in->d_thread->start_time);
+
+ target -= in->opts->back_seek_size;
+
+ MP_VERBOSE(in, "triggering backward seek to get more packets\n");
+ queue_seek(in, target, SEEK_SATAN, false);
+ in->reading = true;
+}
+
+// Search for a packet to resume demuxing from.
+// from_cache: if true, this was called trying to go backwards in the cache;
+// if false, this is from a hard seek before the back_restart_pos
+// The implementation of this function is quite awkward, because the packet
+// queue is a singly linked list without back links, while it needs to search
+// backwards.
+// This is the core of backward demuxing.
+static void find_backward_restart_pos(struct demux_stream *ds, bool from_cache)
+{
+ struct demux_internal *in = ds->in;
+
+ assert(ds->back_restarting);
+
+ if (!ds->reader_head)
+ return; // no packets yet
+
+ struct demux_packet *first = ds->reader_head;
+ struct demux_packet *last = ds->queue->tail;
+ assert(last);
+
+ if ((ds->global_correct_dts && last->dts < ds->back_restart_dts) ||
+ (ds->global_correct_pos && last->pos < ds->back_restart_pos))
+ return; // restart pos not reached yet
+
+ // The target we're searching for is apparently before the start of the queue.
+ if ((ds->global_correct_dts && first->dts > ds->back_restart_dts) ||
+ (ds->global_correct_pos && first->pos > ds->back_restart_pos))
+ {
+ // If this function was called for trying to backstep within the packet
+ // cache, the cache probably got pruned past the target (reader_head is
+ // being moved backwards, so nothing stops it from pruning packets
+ // before that). Just make the caller seek.
+ if (from_cache) {
+ in->need_back_seek = true;
+ return;
+ }
+
+ // The demuxer probably seeked to the wrong position, or broke dts/pos
+ // determinism assumptions?
+ MP_ERR(in, "Demuxer did not seek correctly.\n");
+ return;
+ }
+
+ // Packet at back_restart_pos. (Note: we don't actually need it, only the
+ // packet immediately before it. But same effort.)
+ struct demux_packet *back_restart = NULL;
+
+ for (struct demux_packet *cur = first; cur; cur = cur->next) {
+ if ((ds->global_correct_dts && cur->dts == ds->back_restart_dts) ||
+ (ds->global_correct_pos && cur->pos == ds->back_restart_pos))
+ {
+ back_restart = cur;
+ break;
+ }
+ }
+
+ if (!back_restart) {
+ // The packet should have been in the searched range; maybe dts/pos
+ // determinism assumptions were broken.
+ MP_ERR(in, "Demuxer not cooperating.\n");
+ return;
+ }
+
+ if (!ds->reader_head->keyframe)
+ MP_WARN(in, "Queue not starting on keyframe.\n");
+
+ // Find where to restart demuxing. It's usually the last keyframe packet
+ // before restart_pos, but might be up to back_preroll packets earlier.
+
+ struct demux_packet *last_keyframe = NULL;
+ struct demux_packet *last_preroll = NULL;
+
+ // Keep this packet at back_preroll packets before last_keyframe.
+ struct demux_packet *pre_packet = ds->reader_head;
+ int pre_packet_offset = ds->back_preroll;
+
+ // (Normally, we'd just iterate backwards, but no back links.)
+ for (struct demux_packet *cur = ds->reader_head;
+ cur != back_restart;
+ cur = cur->next)
+ {
+ if (cur->keyframe) {
+ last_keyframe = cur;
+ last_preroll = pre_packet;
+ }
+
+ if (pre_packet_offset) {
+ pre_packet_offset--;
+ } else {
+ pre_packet = pre_packet->next;
+ }
+ }
+
+ if (!last_keyframe) {
+ // Note: assume this holds true. You could think of various reasons why
+ // this might break.
+ if (ds->queue->is_bof) {
+ MP_VERBOSE(in, "BOF for stream %d\n", ds->index);
+ ds->back_restarting = false;
+ ds->back_range_started = false;
+ ds->back_range_min = -1;
+ ds->need_wakeup = true;
+ wakeup_ds(ds);
+ return;
+ }
+ goto resume_earlier;
+ }
+
+ int got_preroll = 0;
+ for (struct demux_packet *cur = last_preroll;
+ cur != last_keyframe;
+ cur = cur->next)
+ got_preroll++;
+
+ if (got_preroll < ds->back_preroll && !ds->queue->is_bof)
+ goto resume_earlier;
+
+ // (Round preroll down to last_keyframe in the worst case.)
+ while (!last_preroll->keyframe)
+ last_preroll = last_preroll->next;
+
+ // Skip reader_head from previous keyframe to current one.
+ // Or if preroll is involved, the first preroll packet.
+ while (ds->reader_head != last_preroll) {
+ if (!advance_reader_head(ds))
+ assert(0); // last_preroll must be in list
+ }
+
+ ds->back_restarting = false;
+ ds->back_range_started = false;
+ ds->back_range_min = got_preroll + 1;
+ ds->need_wakeup = true;
+ wakeup_ds(ds);
+ return;
+
+resume_earlier:
+ // If an earlier seek didn't land at an early enough position, we need to
+ // try to seek even earlier. Usually this will happen with large
+ // back_preroll values, because the initial back seek does not take them
+ // into account. We don't really know how much we need to seek, so add some
+ // random value to the previous seek value. Not ideal.
+ if (!from_cache && ds->back_seek_pos != MP_NOPTS_VALUE)
+ ds->back_seek_pos -= 1.0;
+ in->need_back_seek = true;
+}
+
+// Process that one or multiple packets were added.
+static void back_demux_see_packets(struct demux_stream *ds)
+{
+ struct demux_internal *in = ds->in;
+
+ if (!ds->selected || !in->back_demuxing)
+ return;
+
+ assert(!(ds->back_resuming && ds->back_restarting));
+
+ if (!ds->global_correct_dts && !ds->global_correct_pos) {
+ MP_ERR(in, "Can't demux backward due to demuxer problems.\n");
+ return;
+ }
+
+ while (ds->back_resuming && ds->reader_head) {
+ struct demux_packet *head = ds->reader_head;
+ if ((ds->global_correct_dts && head->dts == ds->back_resume_dts) ||
+ (ds->global_correct_pos && head->pos == ds->back_resume_pos))
+ {
+ ds->back_resuming = false;
+ ds->need_wakeup = true;
+ wakeup_ds(ds); // probably
+ break;
+ }
+ advance_reader_head(ds);
+ }
+
+ if (ds->back_restarting)
+ find_backward_restart_pos(ds, false);
+}
+
+// Resume demuxing from an earlier position for backward playback. May trigger
+// a seek.
+static void step_backwards(struct demux_stream *ds)
+{
+ struct demux_internal *in = ds->in;
+
+ assert(in->back_demuxing);
+
+ assert(!ds->back_restarting);
+ ds->back_restarting = true;
+
+ // Move to start of queue. This is inefficient, because we need to iterate
+ // the entire fucking packet queue just to update the fw_* stats. But as
+ // long as we don't have demux_packet.prev links or a complete index, it's
+ // the thing to do.
+ // Note: if the buffer forward is much larger than the one backward, it
+ // would be worth looping until the previous reader_head and decrementing
+ // fw_packs/fw_bytes - you could skip the full recompute_buffers().
+ ds->reader_head = ds->queue->head;
+ in->fw_bytes -= ds->fw_bytes;
+ recompute_buffers(ds);
+ in->fw_bytes += ds->fw_bytes;
+
+ // Exclude weird special-cases (incomplete pruning? broken seeks?)
+ while (ds->reader_head && !ds->reader_head->keyframe)
+ advance_reader_head(ds);
+
+ find_backward_restart_pos(ds, true);
}
// Add the keyframe to the end of the index. Not all packets are actually added.
@@ -1353,6 +1660,9 @@ static void attempt_range_joining(struct demux_internal *in)
MP_VERBOSE(in, "ranges joined!\n");
+ for (int n = 0; n < in->num_streams; n++)
+ back_demux_see_packets(in->streams[n]->ds);
+
failed:
clear_cached_range(in, next);
free_empty_cached_ranges(in);
@@ -1560,6 +1870,8 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
}
}
+ back_demux_see_packets(ds);
+
wakeup_ds(ds);
}
@@ -1578,13 +1890,19 @@ static bool read_packet(struct demux_internal *in)
bool read_more = false, prefetch_more = false, refresh_more = false;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
- read_more |= ds->eager && !ds->reader_head;
+ if (ds->eager) {
+ read_more |= !ds->reader_head;
+ if (in->back_demuxing)
+ read_more |= ds->back_restarting || ds->back_resuming;
+ }
refresh_more |= ds->refreshing;
if (ds->eager && ds->queue->last_ts != MP_NOPTS_VALUE &&
in->min_secs > 0 && ds->base_ts != MP_NOPTS_VALUE &&
- ds->queue->last_ts >= ds->base_ts)
+ ds->queue->last_ts >= ds->base_ts &&
+ !in->back_demuxing)
prefetch_more |= ds->queue->last_ts - ds->base_ts < in->min_secs;
}
+
MP_TRACE(in, "bytes=%zd, read_more=%d prefetch_more=%d, refresh_more=%d\n",
in->fw_bytes, read_more, prefetch_more, refresh_more);
if (in->fw_bytes >= in->max_bytes) {
@@ -1604,6 +1922,8 @@ static bool read_packet(struct demux_internal *in)
ds->refreshing ? " (refreshing)" : "");
}
}
+ if (in->back_demuxing)
+ MP_ERR(in, "Backward playback is likely stuck/broken now.\n");
}
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
@@ -1798,6 +2118,10 @@ static bool thread_work(struct demux_internal *in)
execute_trackswitch(in);
return true;
}
+ if (in->need_back_seek) {
+ perform_backward_seek(in);
+ return true;
+ }
if (in->seeking) {
execute_seek(in);
return true;
@@ -1876,6 +2200,11 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res)
if (in->blocked)
return 0;
+ if (ds->back_resuming || ds->back_restarting) {
+ assert(in->back_demuxing);
+ return 0;
+ }
+
if (ds->sh->attached_picture) {
ds->eof = true;
if (ds->attached_picture_added)
@@ -1895,8 +2224,27 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res)
pthread_cond_signal(&in->wakeup); // possibly read more
}
+ bool eof = !ds->reader_head && ds->eof;
+
+ if (in->back_demuxing) {
+ // Subtitles not supported => EOF.
+ if (!ds->eager)
+ return -1;
+
+ // Next keyframe (or EOF) was reached => step back.
+ if (ds->back_range_started && !ds->back_range_min &&
+ ((ds->reader_head && ds->reader_head->keyframe) || eof))
+ {
+ step_backwards(ds);
+ if (ds->back_restarting)
+ return 0;
+ }
+
+ eof = ds->back_range_min < 0;
+ }
+
ds->need_wakeup = !ds->reader_head;
- if (!ds->reader_head) {
+ if (!ds->reader_head || eof) {
if (!ds->eager) {
// Non-eager streams temporarily return EOF. If they returned 0,
// the reader would have to wait for new packets, which does not
@@ -1904,7 +2252,7 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res)
// streams.
return -1;
}
- return ds->eof ? -1 : 0;
+ return eof ? -1 : 0;
}
struct demux_packet *pkt = advance_reader_head(ds);
@@ -1916,6 +2264,23 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res)
abort();
pkt->next = NULL;
+ if (ds->in->back_demuxing) {
+ if (ds->back_range_min)
+ ds->back_range_min -= 1;
+ if (ds->back_range_min) {
+ pkt->back_preroll = true;
+ } else if (pkt->keyframe) {
+ // For next backward adjust action.
+ ds->back_restart_dts = pkt->dts;
+ ds->back_restart_pos = pkt->pos;
+ }
+ if (!ds->back_range_started) {
+ pkt->back_restart = true;
+ ds->back_range_started = true;
+ }
+ ds->back_seek_pos = MP_PTS_MIN(ds->back_seek_pos, pkt->pts);
+ }
+
double ts = PTS_OR_DEF(pkt->dts, pkt->pts);
if (ts != MP_NOPTS_VALUE)
ds->base_ts = ts;
@@ -2544,13 +2909,15 @@ struct demuxer *demux_open_url(const char *url,
}
// called locked, from user thread only
-static void clear_reader_state(struct demux_internal *in)
+static void clear_reader_state(struct demux_internal *in,
+ bool clear_back_state)
{
for (int n = 0; n < in->num_streams; n++)
- ds_clear_reader_state(in->streams[n]->ds);
+ ds_clear_reader_state(in->streams[n]->ds, clear_back_state);
in->warned_queue_overflow = false;
in->d_user->filepos = -1; // implicitly synchronized
in->blocked = false;
+ in->need_back_seek = false;
assert(in->fw_bytes == 0);
}
@@ -2561,7 +2928,7 @@ void demux_flush(demuxer_t *demuxer)
assert(demuxer == in->d_user);
pthread_mutex_lock(&demuxer->in->lock);
- clear_reader_state(in);
+ clear_reader_state(in, true);
for (int n = 0; n < in->num_ranges; n++)
clear_cached_range(in, in->ranges[n]);
free_empty_cached_ranges(in);
@@ -2802,12 +3169,20 @@ int demux_seek(demuxer_t *demuxer, double seek_pts, int flags)
{
struct demux_internal *in = demuxer->in;
assert(demuxer == in->d_user);
- int res = 0;
pthread_mutex_lock(&in->lock);
+ int res = queue_seek(in, seek_pts, flags, true);
+ pthread_cond_signal(&in->wakeup);
+ pthread_mutex_unlock(&in->lock);
+ return res;
+}
+
+static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
+ bool clear_back_state)
+{
if (seek_pts == MP_NOPTS_VALUE)
- goto done;
+ return false;
MP_VERBOSE(in, "queuing seek to %f%s\n", seek_pts,
in->seeking ? " (cascade)" : "");
@@ -2818,26 +3193,35 @@ int demux_seek(demuxer_t *demuxer, double seek_pts, int flags)
bool require_cache = flags & SEEK_CACHED;
flags &= ~(unsigned)SEEK_CACHED;
+ bool set_backwards = flags & SEEK_SATAN;
+ flags &= ~(unsigned)SEEK_SATAN;
+
+ // For HR seeks, the correct seek rounding direction is forward instead of
+ // backward.
+ if (set_backwards && (flags & SEEK_HR))
+ flags |= SEEK_FORWARD;
+
struct demux_cached_range *cache_target =
find_cache_seek_target(in, seek_pts, flags);
if (!cache_target) {
if (require_cache) {
- MP_VERBOSE(demuxer, "Cached seek not possible.\n");
- goto done;
+ MP_VERBOSE(in, "Cached seek not possible.\n");
+ return false;
}
- if (!demuxer->seekable) {
- MP_WARN(demuxer, "Cannot seek in this file.\n");
- goto done;
+ if (!in->d_thread->seekable) {
+ MP_WARN(in, "Cannot seek in this file.\n");
+ return false;
}
}
- clear_reader_state(in);
+ clear_reader_state(in, clear_back_state);
in->eof = false;
in->last_eof = false;
in->idle = true;
in->reading = false;
+ in->back_demuxing = set_backwards;
if (cache_target) {
execute_cache_seek(in, cache_target, seek_pts, flags);
@@ -2855,12 +3239,7 @@ int demux_seek(demuxer_t *demuxer, double seek_pts, int flags)
if (!in->threading && in->seeking)
execute_seek(in);
- res = 1;
-
-done:
- pthread_cond_signal(&in->wakeup);
- pthread_mutex_unlock(&in->lock);
- return res;
+ return true;
}
struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d,