summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2017-11-09 09:53:46 +0100
committerwm4 <wm4@nowhere>2017-11-09 10:23:57 +0100
commit935e406d6398062ea7978d546f4da6ca8e6a216f (patch)
tree0a817601ab9dec43699c3e63e2dd56d0d992575f
parentbd4ec8e4e1e9a18a8c667a0265fe2bb0e324f8bc (diff)
downloadmpv-935e406d6398062ea7978d546f4da6ca8e6a216f.tar.bz2
mpv-935e406d6398062ea7978d546f4da6ca8e6a216f.tar.xz
demux: support multiple seekable cached ranges
Until now, the demuxer cache was limited to a single range. Extend this to multiple range. Should be useful for slow network streams. This commit changes a lot in the internal demuxer cache logic, so there's a lot of room for bugs and regressions. The logic without demuxer cache is mostly untouched, but also involved with the code changes. Or in other words, this commit probably fucks up shit. There are two things which makes multiple cached ranges rather hard: 1. the need to resume the demuxer at the end of a cached range when seeking to it 2. joining two adjacent ranges when the lowe range "grows" into it (and resuming the demuxer at the end of the new joined range) "Resuming" the demuxer means that we perform a low level seek to the end of a cached range, and properly append new packets to it, without adding packets multiple times or creating holes due to missing packets. Since audio and video never line up exactly, there is no clean "cut" possible, at which you could resume the demuxer cleanly (for 1.) or which you could use to detect that two ranges are perfectly adjacent (for 2.). The way how the demuxer interleaves multiple streams is also unpredictable. Typically you will have to expect that it randomly allows one of the streams to be ahead by a bit, and so on. To deal with this, we have heuristics in place to detect when one packet equals or is "behind" a packet that was demuxed earlier. We reuse the refresh seek logic (used to "reread" packets into the demuxer cache when enabling a track), which checks for certain packet invariants. Currently, it observes whether either the raw packet position, or the packet DTS is strictly monotonically increasing. If none of them are true, we discard old ranges when creating a new one. This heavily depends on the file format and the demuxer behavior. For example, not all file formats have DTS, and the packet position can be unset due to libavformat not always setting it (e.g. when parsers are used). At the same time, we must deal with all the complicated state used to track prefetching and seek ranges. In some complicated corner cases, we just give up and discard other seek ranges, even if the previously mentioned packet invariants are fulfilled. To handle joining, we're being particularly dumb, and require a small overlap to be confident that two ranges join perfectly. (This could be done incrementally with as little overlap as 1 packet, but corner cases would eat us: each stream needs to be joined separately, and the cache pruning logic could remove overlapping packets for other streams again.) Another restriction is that switching the cached range will always trigger an asynchronous low level seek to resume demuxing at the new range. Some users might find this annoying. Dealing with interleaved subtitles is not fully handled yet. It will clamp the seekable range to where subtitle packets are.
-rw-r--r--DOCS/man/input.rst6
-rw-r--r--DOCS/man/options.rst7
-rw-r--r--demux/demux.c825
-rw-r--r--demux/demux.h2
4 files changed, 610 insertions, 230 deletions
diff --git a/DOCS/man/input.rst b/DOCS/man/input.rst
index 987e369a15..2b3e2a3795 100644
--- a/DOCS/man/input.rst
+++ b/DOCS/man/input.rst
@@ -1264,9 +1264,9 @@ Property list
Each entry in ``seekable-ranges`` represents a region in the demuxer cache
that can be seeked to. If there are multiple demuxers active, this only
returns information about the "main" demuxer, but might be changed in
- future to return unified information about all demuxers. There is currently
- only at most 1 range. Should the player implement caching for multiple
- ranges, the order of the ranges will be unspecified and arbitrary.
+ future to return unified information about all demuxers. The ranges are in
+ arbitrary order. Often, ranges will overlap for a bit, before being joined.
+ In broken corner cases, ranges may overlap all over the place.
The end of a seek range is usually smaller than the value returned by the
``demuxer-cache-time`` property, because that property returns the guessed
diff --git a/DOCS/man/options.rst b/DOCS/man/options.rst
index 7dd1dc6af9..490b815415 100644
--- a/DOCS/man/options.rst
+++ b/DOCS/man/options.rst
@@ -2879,11 +2879,12 @@ Demuxer
enabled, short seek offsets will not trigger a low level demuxer seek
(which means for example that slow network round trips or FFmpeg seek bugs
can be avoided). If a seek cannot happen within the cached range, a low
- level seek will be triggered. Seeking outside of the cache will always
- discard the full cache.
+ level seek will be triggered. Seeking outside of the cache will start a new
+ cached range, but can discard the old cache range if the demuxer exhibits
+ certain unsupported behavior.
Keep in mind that some events can flush the cache or force a low level
- seek anyway, such as switching tracks, or attmepting to seek before the
+ seek anyway, such as switching tracks, or attempting to seek before the
start or after the end of the file. This option is experimental - thus
disabled, and bugs are to be expected.
diff --git a/demux/demux.c b/demux/demux.c
index 85acce28db..9558605bad 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -157,6 +157,10 @@ struct demux_internal {
int max_bytes_bw;
int seekable_cache;
+ // At least one decoder actually requested data since init or the last seek.
+ // Do this to allow the decoder thread to select streams before starting.
+ bool reading;
+
// Set if we know that we are at the start of the file. This is used to
// avoid a redundant initial seek after enabling streams. We could just
// allow it, but to avoid buggy seeking affecting normal playback, we don't.
@@ -183,7 +187,7 @@ struct demux_internal {
size_t fw_bytes; // sum of forward packet data in current_range
// Range from which decoder is reading, and to which demuxer is appending.
- // This is never NULL.
+ // This is never NULL. This is always ranges[num_ranges - 1].
struct demux_cached_range *current_range;
// Cached state.
@@ -202,18 +206,34 @@ struct demux_cached_range {
struct demux_queue **streams;
int num_streams;
- // Computed from the stream queue's values.
+ // Computed from the stream queue's values. These fields (unlike as with
+ // demux_queue) are always either NOPTS, or fully valid.
double seek_start, seek_end;
};
// A continuous list of cached packets for a single stream/range. There is one
-// for each stream and range.
+// for each stream and range. Also contains some state for use during demuxing
+// (keeping it across seeks makes it easier to resume demuxing).
struct demux_queue {
struct demux_stream *ds;
+ struct demux_cached_range *range;
struct demux_packet *head;
struct demux_packet *tail;
+ struct demux_packet *next_prune_target; // cached value for faster pruning
+
+ bool correct_dts; // packet DTS is strictly monotonically increasing
+ bool correct_pos; // packet pos is strictly monotonically increasing
+ int64_t last_pos; // for determining correct_pos
+ double last_dts; // for determining correct_dts
+ double last_ts; // timestamp of the last packet added to queue
+
+ // for incrementally determining seek PTS range
+ double keyframe_pts, keyframe_end_pts;
+ struct demux_packet *keyframe_latest;
+
+ // incrementally maintained seek range, possibly invalid
double seek_start, seek_end;
};
@@ -226,24 +246,14 @@ struct demux_stream {
// demuxer state
bool selected; // user wants packets from this stream
- bool active; // try to keep at least 1 packet queued
+ bool eager; // try to keep at least 1 packet queued
// if false, this stream is disabled, or passively
// read (like subtitles)
- bool eof; // end of demuxed stream? (true if all buffers empty)
bool need_refresh; // enabled mid-stream
bool refreshing;
- bool correct_dts; // packet DTS is strictly monotonically increasing
- bool correct_pos; // packet pos is strictly monotonically increasing
- int64_t last_pos; // for determining correct_pos
- double last_dts; // for determining correct_dts
+
bool global_correct_dts;// all observed so far
bool global_correct_pos;
- double last_ts; // timestamp of the last packet added to queue
- struct demux_packet *next_prune_target; // cached value for faster pruning
- // for incrementally determining seek PTS range
- bool keyframe_seen;
- double keyframe_pts, keyframe_end_pts;
- struct demux_packet *keyframe_latest;
// current queue - used both for reading and demuxing (this is never NULL)
struct demux_queue *queue;
@@ -256,6 +266,7 @@ struct demux_stream {
double bitrate;
size_t fw_packs; // number of packets in buffer (forward)
size_t fw_bytes; // total bytes of packets in buffer (forward)
+ bool eof; // end of demuxed stream? (true if no more packets)
struct demux_packet *reader_head; // points at current decoder position
bool skip_to_keyframe;
bool attached_picture_added;
@@ -276,8 +287,6 @@ struct demux_stream {
static void demuxer_sort_chapters(demuxer_t *demuxer);
static void *demux_thread(void *pctx);
static void update_cache(struct demux_internal *in);
-static int cached_demux_control(struct demux_internal *in, int cmd, void *arg);
-static void clear_demux_state(struct demux_internal *in);
#if 0
// very expensive check for redundant cached queue state
@@ -286,6 +295,9 @@ static void check_queue_consistency(struct demux_internal *in)
size_t total_bytes = 0;
size_t total_fw_bytes = 0;
+ assert(in->current_range && in->num_ranges > 0);
+ assert(in->current_range == in->ranges[in->num_ranges - 1]);
+
for (int n = 0; n < in->num_ranges; n++) {
struct demux_cached_range *range = in->ranges[n];
@@ -294,11 +306,17 @@ static void check_queue_consistency(struct demux_internal *in)
for (int i = 0; i < range->num_streams; i++) {
struct demux_queue *queue = range->streams[i];
+ assert(queue->range == range);
+
size_t fw_bytes = 0;
size_t fw_packs = 0;
bool is_forward = false;
+ bool kf_found = false;
+ bool npt_found = false;
for (struct demux_packet *dp = queue->head; dp; dp = dp->next) {
is_forward |= dp == queue->ds->reader_head;
+ kf_found |= dp == queue->keyframe_latest;
+ npt_found |= dp == queue->next_prune_target;
size_t bytes = demux_packet_estimate_total_size(dp);
total_bytes += bytes;
@@ -315,6 +333,15 @@ static void check_queue_consistency(struct demux_internal *in)
if (!queue->head)
assert(!queue->tail);
+ // If the queue is currently used...
+ if (queue->ds->queue == queue) {
+ // ...reader_head and others must be in the queue.
+ assert(is_forward == !!queue->ds->reader_head);
+ assert(kf_found == !!queue->keyframe_latest);
+ }
+
+ assert(npt_found == !!queue->next_prune_target);
+
total_fw_bytes += fw_bytes;
if (range == in->current_range) {
@@ -323,6 +350,9 @@ static void check_queue_consistency(struct demux_internal *in)
} else {
assert(fw_bytes == 0 && fw_packs == 0);
}
+
+ if (queue->keyframe_latest)
+ assert(queue->keyframe_latest->keyframe);
}
}
@@ -331,20 +361,46 @@ static void check_queue_consistency(struct demux_internal *in)
}
#endif
-static void update_seek_ranges(struct demux_stream *ds)
+static void recompute_buffers(struct demux_stream *ds)
{
- struct demux_cached_range *range = ds->in->current_range;
+ ds->fw_packs = 0;
+ ds->fw_bytes = 0;
+ for (struct demux_packet *dp = ds->reader_head; dp; dp = dp->next) {
+ ds->fw_bytes += demux_packet_estimate_total_size(dp);
+ ds->fw_packs++;
+ }
+}
+
+// (this doesn't do most required things for a switch, like updating ds->queue)
+static void set_current_range(struct demux_internal *in,
+ struct demux_cached_range *range)
+{
+ in->current_range = range;
+
+ // Move to in->ranges[in->num_ranges-1] (for LRU sorting/invariant)
+ for (int n = 0; n < in->num_ranges; n++) {
+ if (in->ranges[n] == range) {
+ MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
+ break;
+ }
+ }
+ MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, range);
+}
+
+// Refresh range->seek_start/end.
+static void update_seek_ranges(struct demux_cached_range *range)
+{
range->seek_start = range->seek_end = MP_NOPTS_VALUE;
for (int n = 0; n < range->num_streams; n++) {
struct demux_queue *queue = range->streams[n];
- if (queue->ds->active) {
+ if (queue->ds->selected) {
range->seek_start = MP_PTS_MAX(range->seek_start, queue->seek_start);
range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end);
- if (range->seek_start == MP_NOPTS_VALUE ||
- range->seek_end == MP_NOPTS_VALUE)
+ if (queue->seek_start == MP_NOPTS_VALUE ||
+ queue->seek_end == MP_NOPTS_VALUE)
{
range->seek_start = range->seek_end = MP_NOPTS_VALUE;
break;
@@ -356,11 +412,92 @@ static void update_seek_ranges(struct demux_stream *ds)
range->seek_start = range->seek_end = MP_NOPTS_VALUE;
}
+// Remove the packet dp from the queue. prev must be the packet before dp, or
+// NULL if dp is the first packet.
+// This does not update in->fw_bytes/in->fw_packs.
+static void remove_packet(struct demux_queue *queue, struct demux_packet *prev,
+ struct demux_packet *dp)
+{
+ if (prev) {
+ assert(prev->next == dp);
+ } else {
+ assert(queue->head == dp);
+ }
+
+ assert(queue->ds->reader_head != dp);
+ if (queue->next_prune_target == dp)
+ queue->next_prune_target = NULL;
+ if (queue->keyframe_latest == dp)
+ queue->keyframe_latest = NULL;
+
+ queue->ds->in->total_bytes -= demux_packet_estimate_total_size(dp);
+
+ if (prev) {
+ prev->next = dp->next;
+ if (!prev->next)
+ queue->tail = prev;
+ } else {
+ queue->head = dp->next;
+ if (!queue->head)
+ queue->tail = NULL;
+ }
+
+ talloc_free(dp);
+}
+
+static void clear_queue(struct demux_queue *queue)
+{
+ struct demux_stream *ds = queue->ds;
+ struct demux_internal *in = ds->in;
+
+ struct demux_packet *dp = queue->head;
+ while (dp) {
+ struct demux_packet *dn = dp->next;
+ in->total_bytes -= demux_packet_estimate_total_size(dp);
+ assert(ds->reader_head != dp);
+ talloc_free(dp);
+ dp = dn;
+ }
+ queue->head = queue->tail = NULL;
+ queue->next_prune_target = NULL;
+ queue->keyframe_latest = NULL;
+ queue->seek_start = queue->seek_end = MP_NOPTS_VALUE;
+
+ queue->correct_dts = queue->correct_pos = true;
+ queue->last_pos = -1;
+ queue->last_ts = queue->last_dts = MP_NOPTS_VALUE;
+ queue->keyframe_latest = NULL;
+ queue->keyframe_pts = queue->keyframe_end_pts = MP_NOPTS_VALUE;
+}
+
+static void clear_cached_range(struct demux_internal *in,
+ struct demux_cached_range *range)
+{
+ for (int n = 0; n < range->num_streams; n++)
+ clear_queue(range->streams[n]);
+ update_seek_ranges(range);
+}
+
+static void free_empty_cached_ranges(struct demux_internal *in)
+{
+ assert(in->current_range && in->num_ranges > 0);
+ assert(in->current_range == in->ranges[in->num_ranges - 1]);
+
+ for (int n = in->num_ranges - 2; n >= 0; n--) {
+ struct demux_cached_range *range = in->ranges[n];
+ if (range->seek_start == MP_NOPTS_VALUE) {
+ clear_cached_range(in, range);
+ MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
+ }
+ }
+}
+
static void ds_clear_reader_state(struct demux_stream *ds)
{
ds->in->fw_bytes -= ds->fw_bytes;
ds->reader_head = NULL;
+ ds->eof = false;
ds->base_ts = ds->last_br_ts = MP_NOPTS_VALUE;
ds->last_br_bytes = 0;
ds->bitrate = -1;
@@ -370,33 +507,56 @@ static void ds_clear_reader_state(struct demux_stream *ds)
ds->fw_packs = 0;
}
-static void ds_clear_demux_state(struct demux_stream *ds)
+static void update_stream_selection_state(struct demux_internal *in,
+ struct demux_stream *ds,
+ bool selected, bool new)
{
- ds_clear_reader_state(ds);
+ if (ds->selected != selected || new) {
+ ds->selected = selected;
+ ds->eof = false;
+ ds->refreshing = false;
+ ds->need_refresh = false;
- demux_packet_t *dp = ds->queue->head;
- while (dp) {
- demux_packet_t *dn = dp->next;
- ds->in->total_bytes -= demux_packet_estimate_total_size(dp);
- free_demux_packet(dp);
- dp = dn;
+ ds_clear_reader_state(ds);
+
+ // Make sure any stream reselection or addition is reflected in the seek
+ // ranges, and also get rid of data that is not needed anymore (or
+ // rather, which can't be kept consistent).
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+
+ if (!ds->selected)
+ clear_queue(range->streams[ds->index]);
+
+ update_seek_ranges(range);
+ }
+
+ free_empty_cached_ranges(in);
}
- ds->queue->head = ds->queue->tail = NULL;
- ds->next_prune_target = NULL;
- ds->keyframe_latest = NULL;
- ds->eof = false;
- ds->active = false;
- ds->refreshing = false;
- ds->need_refresh = false;
- ds->correct_dts = ds->correct_pos = true;
- ds->last_pos = -1;
- ds->last_ts = ds->last_dts = MP_NOPTS_VALUE;
- ds->queue->seek_start = ds->queue->seek_end = MP_NOPTS_VALUE;
- ds->keyframe_seen = false;
- ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE;
+ // We still have to go over the whole stream list to update ds->eager for
+ // other streams too, because they depend on other stream's selections.
- update_seek_ranges(ds);
+ bool any_av_streams = false;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *s = in->streams[n]->ds;
+
+ s->eager = s->selected && !s->sh->attached_picture;
+ if (s->eager)
+ any_av_streams |= s->type != STREAM_SUB;
+ }
+
+ // Subtitles are only eagerly read if there are no other eagerly read
+ // streams.
+ if (any_av_streams) {
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *s = in->streams[n]->ds;
+
+ if (s->type == STREAM_SUB)
+ s->eager = false;
+ }
+ }
}
void demux_set_ts_offset(struct demuxer *demuxer, double offset)
@@ -407,6 +567,23 @@ void demux_set_ts_offset(struct demuxer *demuxer, double offset)
pthread_mutex_unlock(&in->lock);
}
+static void add_missing_streams(struct demux_internal *in,
+ struct demux_cached_range *range)
+{
+ for (int n = range->num_streams; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ struct demux_queue *queue = talloc_ptrtype(range, queue);
+ *queue = (struct demux_queue){
+ .ds = ds,
+ .range = range,
+ };
+ clear_queue(queue);
+ MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue);
+ assert(range->streams[ds->index] == queue);
+ }
+}
+
// Allocate a new sh_stream of the given type. It either has to be released
// with talloc_free(), or added to a demuxer with demux_add_sh_stream(). You
// cannot add or read packets from the stream before it has been added.
@@ -441,7 +618,6 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
.sh = sh,
.type = sh->type,
.index = sh->index,
- .selected = in->autoselect,
.global_correct_dts = true,
.global_correct_pos = true,
};
@@ -462,20 +638,13 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh);
assert(in->streams[sh->index] == sh);
- for (int n = 0; n < in->num_ranges; n++) {
- struct demux_cached_range *range = in->ranges[n];
- struct demux_queue *queue = talloc_ptrtype(range, queue);
- *queue = (struct demux_queue){
- .ds = sh->ds,
- .seek_start = MP_NOPTS_VALUE,
- .seek_end = MP_NOPTS_VALUE,
- };
- MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue);
- assert(range->streams[sh->ds->index] == queue);
- }
+ for (int n = 0; n < in->num_ranges; n++)
+ add_missing_streams(in, in->ranges[n]);
sh->ds->queue = in->current_range->streams[sh->ds->index];
+ update_stream_selection_state(in, sh->ds, in->autoselect, true);
+
in->events |= DEMUX_EVENT_STREAMS;
if (in->wakeup_cb)
in->wakeup_cb(in->wakeup_cb_ctx);
@@ -553,7 +722,8 @@ void free_demuxer(demuxer_t *demuxer)
if (demuxer->desc->close)
demuxer->desc->close(in->d_thread);
- clear_demux_state(in);
+ demux_flush(demuxer);
+ assert(in->total_bytes == 0);
for (int n = in->num_streams - 1; n >= 0; n--)
talloc_free(in->streams[n]);
@@ -683,7 +853,7 @@ static double get_refresh_seek_pts(struct demux_internal *in)
normal_seek &= ds->need_refresh;
ds->need_refresh = false;
- refresh_possible &= ds->correct_dts || ds->correct_pos;
+ refresh_possible &= ds->queue->correct_dts || ds->queue->correct_pos;
}
if (!needed || start_ts == MP_NOPTS_VALUE || !demux->desc->seek ||
@@ -702,7 +872,7 @@ static double get_refresh_seek_pts(struct demux_internal *in)
struct demux_stream *ds = in->streams[n]->ds;
// Streams which didn't have any packets yet will return all packets,
// other streams return packets only starting from the last position.
- if (ds->last_pos != -1 || ds->last_dts != MP_NOPTS_VALUE)
+ if (ds->queue->last_pos != -1 || ds->queue->last_dts != MP_NOPTS_VALUE)
ds->refreshing |= ds->selected;
}
@@ -710,27 +880,181 @@ static double get_refresh_seek_pts(struct demux_internal *in)
return start_ts - 1.0;
}
+// Check whether the next range in the list is, and if it appears to overlap,
+// try joining it into a single range.
+static void attempt_range_joining(struct demux_internal *in)
+{
+ struct demux_cached_range *next = NULL;
+ double next_dist = INFINITY;
+
+ assert(in->current_range && in->num_ranges > 0);
+ assert(in->current_range == in->ranges[in->num_ranges - 1]);
+
+ for (int n = 0; n < in->num_ranges - 1; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+
+ if (in->current_range->seek_start <= range->seek_start) {
+ // This uses ">" to get some non-0 overlap.
+ double dist = in->current_range->seek_end - range->seek_start;
+ if (dist > 0 && dist < next_dist) {
+ next = range;
+ next_dist = dist;
+ }
+ }
+ }
+
+ if (!next)
+ return;
+
+ MP_VERBOSE(in, "going to join ranges %f-%f + %f-%f\n",
+ in->current_range->seek_start, in->current_range->seek_end,
+ next->seek_start, next->seek_end);
+
+ // Try to find a join point, where packets obviously overlap. (It would be
+ // better and faster to do this incrementally, but probably too complex.)
+ // The current range can overlap arbitrarily with the next one, not only by
+ // by the seek overlap, but for arbitrary packet readahead as well.
+ // We also drop the overlapping packets (if joining fails, we discard the
+ // entire next range anyway, so this does no harm).
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ struct demux_queue *q1 = in->current_range->streams[n];
+ struct demux_queue *q2 = next->streams[n];
+
+ if (!ds->global_correct_pos && !ds->global_correct_dts) {
+ MP_WARN(in, "stream %d: ranges unjoinable\n", n);
+ goto failed;
+ }
+
+ struct demux_packet *end = q1->tail;
+ bool join_point_found = !end; // no packets yet -> joining will work
+ if (end) {
+ while (q2->head) {
+ struct demux_packet *dp = q2->head;
+
+ // Some weird corner-case. We'd have to search the equivalent
+ // packet in q1 to update it correctly. Better just give up.
+ if (dp == q2->keyframe_latest) {
+ MP_WARN(in, "stream %d: not enough keyframes\n", n);
+ goto failed;
+ }
+
+ // (Check for ">" too, to avoid incorrect joining in weird
+ // corner cases, where the next range misses the end packet.)
+ if ((ds->global_correct_dts && dp->dts >= end->dts) ||
+ (ds->global_correct_pos && dp->pos >= end->pos))
+ {
+ // Do some additional checks as a (imperfect) sanity check
+ // in case pos/dts are not "correct" across the ranges (we
+ // never actually check that).
+ if (dp->dts != end->dts || dp->pos != end->pos ||
+ dp->pts != end->pts || dp->len != end->len)
+ {
+ MP_WARN(in, "stream %d: weird demuxer behavior\n", n);
+ goto failed;
+ }
+
+ remove_packet(q2, NULL, dp);
+ join_point_found = true;
+ break;
+ }
+
+ remove_packet(q2, NULL, dp);
+ }
+ }
+
+ // For enabled non-sparse streams, always require an overlap packet.
+ if (ds->eager && !join_point_found) {
+ MP_WARN(in, "stream %d: no joint point found\n", n);
+ goto failed;
+ }
+ }
+
+ // Actually join the ranges. Now that we think it will work, mutate the
+ // data associated with the current range. We actually make the next range
+ // the current range.
+
+ in->fw_bytes = 0;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_queue *q1 = in->current_range->streams[n];
+ struct demux_queue *q2 = next->streams[n];
+
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ if (q1->head) {
+ q1->tail->next = q2->head;
+ q2->head = q1->head;
+ if (!q2->head || !q2->head->next)
+ q2->tail = q2->head;
+ }
+ q2->next_prune_target = q1->next_prune_target;
+ q2->seek_start = q1->seek_start;
+ q2->correct_dts &= q1->correct_dts;
+ q2->correct_pos &= q1->correct_pos;
+
+ q1->head = q1->tail = NULL;
+ q1->next_prune_target = NULL;
+ q1->keyframe_latest = NULL;
+
+ assert(ds->queue == q1);
+ ds->queue = q2;
+
+ recompute_buffers(ds);
+ in->fw_bytes += ds->fw_bytes;
+
+ // For moving demuxer position.
+ ds->refreshing = true;
+ }
+
+ next->seek_start = in->current_range->seek_start;
+
+ // Move demuxing position to after the current range.
+ in->seeking = true;
+ in->seek_flags = SEEK_HR;
+ in->seek_pts = next->seek_end - 1.0;
+
+ struct demux_cached_range *old = in->current_range;
+ set_current_range(in, next);
+ clear_cached_range(in, old);
+
+ MP_VERBOSE(in, "ranges joined!\n");
+
+ next = NULL;
+failed:
+ if (next)
+ clear_cached_range(in, next);
+ free_empty_cached_ranges(in);
+}
+
// Determine seekable range when a packet is added. If dp==NULL, treat it as
// EOF (i.e. closes the current block).
// This has to deal with a number of corner cases, such as demuxers potentially
// starting output at non-keyframes.
+// Can join seek ranges, which messes with in->current_range and all.
static void adjust_seek_range_on_packet(struct demux_stream *ds,
struct demux_packet *dp)
{
+ struct demux_queue *queue = ds->queue;
+ bool attempt_range_join = false;
+
if (!ds->in->seekable_cache)
return;
if (!dp || dp->keyframe) {
- if (ds->keyframe_latest) {
- ds->keyframe_latest->kf_seek_pts = ds->keyframe_pts;
- if (ds->queue->seek_start == MP_NOPTS_VALUE)
- ds->queue->seek_start = ds->keyframe_pts;
- if (ds->keyframe_end_pts != MP_NOPTS_VALUE)
- ds->queue->seek_end = ds->keyframe_end_pts;
- update_seek_ranges(ds);
+ if (queue->keyframe_latest) {
+ queue->keyframe_latest->kf_seek_pts = queue->keyframe_pts;
+ double old_end = queue->range->seek_end;
+ if (queue->seek_start == MP_NOPTS_VALUE)
+ queue->seek_start = queue->keyframe_pts;
+ if (queue->keyframe_end_pts != MP_NOPTS_VALUE)
+ queue->seek_end = queue->keyframe_end_pts;
+ update_seek_ranges(queue->range);
+ attempt_range_join = queue->range->seek_end > old_end;
}
- ds->keyframe_latest = dp;
- ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE;
+ queue->keyframe_latest = dp;
+ queue->keyframe_pts = queue->keyframe_end_pts = MP_NOPTS_VALUE;
}
if (dp) {
@@ -740,9 +1064,12 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds,
if (dp->segmented && (ts < dp->start || ts > dp->end))
ts = MP_NOPTS_VALUE;
- ds->keyframe_pts = MP_PTS_MIN(ds->keyframe_pts, ts);
- ds->keyframe_end_pts = MP_PTS_MAX(ds->keyframe_end_pts, ts);
+ queue->keyframe_pts = MP_PTS_MIN(queue->keyframe_pts, ts);
+ queue->keyframe_end_pts = MP_PTS_MAX(queue->keyframe_end_pts, ts);
}
+
+ if (attempt_range_join)
+ attempt_range_joining(ds->in);
}
void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
@@ -755,17 +1082,20 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
struct demux_internal *in = ds->in;
pthread_mutex_lock(&in->lock);
+ struct demux_queue *queue = ds->queue;
+
bool drop = ds->refreshing;
if (ds->refreshing) {
// Resume reading once the old position was reached (i.e. we start
// returning packets where we left off before the refresh).
// If it's the same position, drop, but continue normally next time.
- if (ds->correct_dts) {
- ds->refreshing = dp->dts < ds->last_dts;
- } else if (ds->correct_pos) {
- ds->refreshing = dp->pos < ds->last_pos;
+ if (queue->correct_dts) {
+ ds->refreshing = dp->dts < queue->last_dts;
+ } else if (queue->correct_pos) {
+ ds->refreshing = dp->pos < queue->last_pos;
} else {
ds->refreshing = false; // should not happen
+ MP_WARN(in, "stream %d: demux refreshing failed\n", ds->index);
}
}
@@ -775,12 +1105,12 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
return;
}
- ds->correct_pos &= dp->pos >= 0 && dp->pos > ds->last_pos;
- ds->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > ds->last_dts;
- ds->last_pos = dp->pos;
- ds->last_dts = dp->dts;
- ds->global_correct_pos &= ds->correct_pos;
- ds->global_correct_dts &= ds->correct_dts;
+ queue->correct_pos &= dp->pos >= 0 && dp->pos > queue->last_pos;
+ queue->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > queue->last_dts;
+ queue->last_pos = dp->pos;
+ queue->last_dts = dp->dts;
+ ds->global_correct_pos &= queue->correct_pos;
+ ds->global_correct_dts &= queue->correct_dts;
dp->stream = stream->index;
dp->next = NULL;
@@ -800,17 +1130,15 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
in->fw_bytes += bytes;
}
- if (ds->queue->tail) {
+ if (queue->tail) {
// next packet in stream
- ds->queue->tail->next = dp;
- ds->queue->tail = dp;
+ queue->tail->next = dp;
+ queue->tail = dp;
} else {
// first packet in stream
- ds->queue->head = ds->queue->tail = dp;
+ queue->head = queue->tail = dp;
}
- adjust_seek_range_on_packet(ds, dp);
-
if (!ds->ignore_eof) {
// obviously not true anymore
ds->eof = false;
@@ -825,15 +1153,17 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
double ts = dp->dts == MP_NOPTS_VALUE ? dp->pts : dp->dts;
if (dp->segmented)
ts = MP_PTS_MIN(ts, dp->end);
- if (ts != MP_NOPTS_VALUE && (ts > ds->last_ts || ts + 10 < ds->last_ts))
- ds->last_ts = ts;
+ if (ts != MP_NOPTS_VALUE && (ts > queue->last_ts || ts + 10 < queue->last_ts))
+ queue->last_ts = ts;
if (ds->base_ts == MP_NOPTS_VALUE)
- ds->base_ts = ds->last_ts;
+ ds->base_ts = queue->last_ts;
MP_DBG(in, "append packet to %s: size=%d pts=%f dts=%f pos=%"PRIi64" "
"[num=%zd size=%zd]\n", stream_type_name(stream->type),
dp->len, dp->pts, dp->dts, dp->pos, ds->fw_packs, ds->fw_bytes);
+ adjust_seek_range_on_packet(ds, dp);
+
// Wake up if this was the first packet after start/possible underrun.
if (ds->in->wakeup_cb && ds->reader_head && !ds->reader_head->next)
ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
@@ -847,20 +1177,23 @@ static bool read_packet(struct demux_internal *in)
in->eof = false;
in->idle = true;
+ if (!in->reading)
+ return false;
+
// Check if we need to read a new packet. We do this if all queues are below
// the minimum, or if a stream explicitly needs new packets. Also includes
// safe-guards against packet queue overflow.
- bool active = false, read_more = false, prefetch_more = false;
+ bool read_more = false, prefetch_more = false;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
- active |= ds->active;
- read_more |= (ds->active && !ds->reader_head) || ds->refreshing;
- if (ds->active && ds->last_ts != MP_NOPTS_VALUE && in->min_secs > 0 &&
- ds->base_ts != MP_NOPTS_VALUE && ds->last_ts >= ds->base_ts)
- prefetch_more |= ds->last_ts - ds->base_ts < in->min_secs;
- }
- MP_DBG(in, "bytes=%zd, active=%d, read_more=%d prefetch_more=%d\n",
- in->fw_bytes, active, read_more, prefetch_more);
+ read_more |= (ds->eager && !ds->reader_head) || ds->refreshing;
+ if (ds->eager && ds->queue->last_ts != MP_NOPTS_VALUE &&
+ in->min_secs > 0 && ds->base_ts != MP_NOPTS_VALUE &&
+ ds->queue->last_ts >= ds->base_ts)
+ prefetch_more |= ds->queue->last_ts - d