-rw-r--r--  DOCS/man/input.rst    |   6
-rw-r--r--  DOCS/man/options.rst  |   7
-rw-r--r--  demux/demux.c         | 825
-rw-r--r--  demux/demux.h         |   2
4 files changed, 610 insertions, 230 deletions
diff --git a/DOCS/man/input.rst b/DOCS/man/input.rst
index 987e369a15..2b3e2a3795 100644
--- a/DOCS/man/input.rst
+++ b/DOCS/man/input.rst
@@ -1264,9 +1264,9 @@ Property list
Each entry in ``seekable-ranges`` represents a region in the demuxer cache
that can be seeked to. If there are multiple demuxers active, this only
returns information about the "main" demuxer, but might be changed in
- future to return unified information about all demuxers. There is currently
- only at most 1 range. Should the player implement caching for multiple
- ranges, the order of the ranges will be unspecified and arbitrary.
+ future to return unified information about all demuxers. The ranges are in
+ arbitrary order. Often, ranges will overlap for a bit, before being joined.
+ In broken corner cases, ranges may overlap all over the place.
The end of a seek range is usually smaller than the value returned by the
``demuxer-cache-time`` property, because that property returns the guessed
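
For reference (not part of this commit), here is a minimal libmpv sketch that reads the ranges documented above from a client application. It assumes that ``seekable-ranges`` is queried as a sub-entry of the ``demuxer-cache-state`` property and that each entry carries ``start``/``end`` fields; error handling is kept minimal.

    #include <stdio.h>
    #include <string.h>
    #include <mpv/client.h>

    // Print all cached seek ranges of a running mpv_handle.
    static void print_seek_ranges(mpv_handle *ctx)
    {
        mpv_node root;
        if (mpv_get_property(ctx, "demuxer-cache-state", MPV_FORMAT_NODE, &root) < 0)
            return;
        if (root.format == MPV_FORMAT_NODE_MAP) {
            for (int i = 0; i < root.u.list->num; i++) {
                if (strcmp(root.u.list->keys[i], "seekable-ranges") != 0)
                    continue;
                mpv_node *ranges = &root.u.list->values[i];
                if (ranges->format != MPV_FORMAT_NODE_ARRAY)
                    break;
                for (int r = 0; r < ranges->u.list->num; r++) {
                    mpv_node *range = &ranges->u.list->values[r];
                    if (range->format != MPV_FORMAT_NODE_MAP)
                        continue;
                    double start = -1, end = -1;
                    for (int k = 0; k < range->u.list->num; k++) {
                        mpv_node *v = &range->u.list->values[k];
                        if (v->format != MPV_FORMAT_DOUBLE)
                            continue;
                        if (strcmp(range->u.list->keys[k], "start") == 0)
                            start = v->u.double_;
                        else if (strcmp(range->u.list->keys[k], "end") == 0)
                            end = v->u.double_;
                    }
                    // Ranges come in arbitrary order and may overlap slightly.
                    printf("range %d: %.3f .. %.3f\n", r, start, end);
                }
            }
        }
        mpv_free_node_contents(&root);
    }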
diff --git a/DOCS/man/options.rst b/DOCS/man/options.rst
index 7dd1dc6af9..490b815415 100644
--- a/DOCS/man/options.rst
+++ b/DOCS/man/options.rst
@@ -2879,11 +2879,12 @@ Demuxer
enabled, short seek offsets will not trigger a low level demuxer seek
(which means for example that slow network round trips or FFmpeg seek bugs
can be avoided). If a seek cannot happen within the cached range, a low
- level seek will be triggered. Seeking outside of the cache will always
- discard the full cache.
+ level seek will be triggered. Seeking outside of the cache will start a new
+ cached range, but can discard the old cache range if the demuxer exhibits
+ certain unsupported behavior.
Keep in mind that some events can flush the cache or force a low level
- seek anyway, such as switching tracks, or attmepting to seek before the
+ seek anyway, such as switching tracks, or attempting to seek before the
start or after the end of the file. This option is experimental - thus
disabled, and bugs are to be expected.
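
As a side note (not part of this patch), a libmpv client could opt into this experimental behavior before loading a file. The option name ``demuxer-seekable-cache`` is an assumption here, since the hunk above does not show the option name itself:

    #include <mpv/client.h>

    // Hypothetical helper: enable the experimental seekable demuxer cache.
    // The option name "demuxer-seekable-cache" is assumed, not taken from
    // the hunk above. Returns 0 on success, a negative mpv error otherwise.
    static int enable_seekable_cache(mpv_handle *ctx)
    {
        return mpv_set_option_string(ctx, "demuxer-seekable-cache", "yes");
    }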
diff --git a/demux/demux.c b/demux/demux.c
index 85acce28db..9558605bad 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -157,6 +157,10 @@ struct demux_internal {
int max_bytes_bw;
int seekable_cache;
+ // At least one decoder actually requested data since init or the last seek.
+ // Do this to allow the decoder thread to select streams before starting.
+ bool reading;
+
// Set if we know that we are at the start of the file. This is used to
// avoid a redundant initial seek after enabling streams. We could just
// allow it, but to avoid buggy seeking affecting normal playback, we don't.
@@ -183,7 +187,7 @@ struct demux_internal {
size_t fw_bytes; // sum of forward packet data in current_range
// Range from which decoder is reading, and to which demuxer is appending.
- // This is never NULL.
+ // This is never NULL. This is always ranges[num_ranges - 1].
struct demux_cached_range *current_range;
// Cached state.
@@ -202,18 +206,34 @@ struct demux_cached_range {
struct demux_queue **streams;
int num_streams;
- // Computed from the stream queue's values.
+ // Computed from the stream queue's values. These fields (unlike with
+ // demux_queue) are always either NOPTS or fully valid.
double seek_start, seek_end;
};
// A continuous list of cached packets for a single stream/range. There is one
-// for each stream and range.
+// for each stream and range. Also contains some state for use during demuxing
+// (keeping it across seeks makes it easier to resume demuxing).
struct demux_queue {
struct demux_stream *ds;
+ struct demux_cached_range *range;
struct demux_packet *head;
struct demux_packet *tail;
+ struct demux_packet *next_prune_target; // cached value for faster pruning
+
+ bool correct_dts; // packet DTS is strictly monotonically increasing
+ bool correct_pos; // packet pos is strictly monotonically increasing
+ int64_t last_pos; // for determining correct_pos
+ double last_dts; // for determining correct_dts
+ double last_ts; // timestamp of the last packet added to queue
+
+ // for incrementally determining seek PTS range
+ double keyframe_pts, keyframe_end_pts;
+ struct demux_packet *keyframe_latest;
+
+ // incrementally maintained seek range, possibly invalid
double seek_start, seek_end;
};
@@ -226,24 +246,14 @@ struct demux_stream {
// demuxer state
bool selected; // user wants packets from this stream
- bool active; // try to keep at least 1 packet queued
+ bool eager; // try to keep at least 1 packet queued
// if false, this stream is disabled, or passively
// read (like subtitles)
- bool eof; // end of demuxed stream? (true if all buffers empty)
bool need_refresh; // enabled mid-stream
bool refreshing;
- bool correct_dts; // packet DTS is strictly monotonically increasing
- bool correct_pos; // packet pos is strictly monotonically increasing
- int64_t last_pos; // for determining correct_pos
- double last_dts; // for determining correct_dts
+
bool global_correct_dts;// all observed so far
bool global_correct_pos;
- double last_ts; // timestamp of the last packet added to queue
- struct demux_packet *next_prune_target; // cached value for faster pruning
- // for incrementally determining seek PTS range
- bool keyframe_seen;
- double keyframe_pts, keyframe_end_pts;
- struct demux_packet *keyframe_latest;
// current queue - used both for reading and demuxing (this is never NULL)
struct demux_queue *queue;
@@ -256,6 +266,7 @@ struct demux_stream {
double bitrate;
size_t fw_packs; // number of packets in buffer (forward)
size_t fw_bytes; // total bytes of packets in buffer (forward)
+ bool eof; // end of demuxed stream? (true if no more packets)
struct demux_packet *reader_head; // points at current decoder position
bool skip_to_keyframe;
bool attached_picture_added;
@@ -276,8 +287,6 @@ struct demux_stream {
static void demuxer_sort_chapters(demuxer_t *demuxer);
static void *demux_thread(void *pctx);
static void update_cache(struct demux_internal *in);
-static int cached_demux_control(struct demux_internal *in, int cmd, void *arg);
-static void clear_demux_state(struct demux_internal *in);
#if 0
// very expensive check for redundant cached queue state
@@ -286,6 +295,9 @@ static void check_queue_consistency(struct demux_internal *in)
size_t total_bytes = 0;
size_t total_fw_bytes = 0;
+ assert(in->current_range && in->num_ranges > 0);
+ assert(in->current_range == in->ranges[in->num_ranges - 1]);
+
for (int n = 0; n < in->num_ranges; n++) {
struct demux_cached_range *range = in->ranges[n];
@@ -294,11 +306,17 @@ static void check_queue_consistency(struct demux_internal *in)
for (int i = 0; i < range->num_streams; i++) {
struct demux_queue *queue = range->streams[i];
+ assert(queue->range == range);
+
size_t fw_bytes = 0;
size_t fw_packs = 0;
bool is_forward = false;
+ bool kf_found = false;
+ bool npt_found = false;
for (struct demux_packet *dp = queue->head; dp; dp = dp->next) {
is_forward |= dp == queue->ds->reader_head;
+ kf_found |= dp == queue->keyframe_latest;
+ npt_found |= dp == queue->next_prune_target;
size_t bytes = demux_packet_estimate_total_size(dp);
total_bytes += bytes;
@@ -315,6 +333,15 @@ static void check_queue_consistency(struct demux_internal *in)
if (!queue->head)
assert(!queue->tail);
+ // If the queue is currently used...
+ if (queue->ds->queue == queue) {
+ // ...reader_head and others must be in the queue.
+ assert(is_forward == !!queue->ds->reader_head);
+ assert(kf_found == !!queue->keyframe_latest);
+ }
+
+ assert(npt_found == !!queue->next_prune_target);
+
total_fw_bytes += fw_bytes;
if (range == in->current_range) {
@@ -323,6 +350,9 @@ static void check_queue_consistency(struct demux_internal *in)
} else {
assert(fw_bytes == 0 && fw_packs == 0);
}
+
+ if (queue->keyframe_latest)
+ assert(queue->keyframe_latest->keyframe);
}
}
@@ -331,20 +361,46 @@ static void check_queue_consistency(struct demux_internal *in)
}
#endif
-static void update_seek_ranges(struct demux_stream *ds)
+static void recompute_buffers(struct demux_stream *ds)
{
- struct demux_cached_range *range = ds->in->current_range;
+ ds->fw_packs = 0;
+ ds->fw_bytes = 0;
+ for (struct demux_packet *dp = ds->reader_head; dp; dp = dp->next) {
+ ds->fw_bytes += demux_packet_estimate_total_size(dp);
+ ds->fw_packs++;
+ }
+}
+
+// (this doesn't do most required things for a switch, like updating ds->queue)
+static void set_current_range(struct demux_internal *in,
+ struct demux_cached_range *range)
+{
+ in->current_range = range;
+
+ // Move to in->ranges[in->num_ranges-1] (for LRU sorting/invariant)
+ for (int n = 0; n < in->num_ranges; n++) {
+ if (in->ranges[n] == range) {
+ MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
+ break;
+ }
+ }
+ MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, range);
+}
+
+// Refresh range->seek_start/end.
+static void update_seek_ranges(struct demux_cached_range *range)
+{
range->seek_start = range->seek_end = MP_NOPTS_VALUE;
for (int n = 0; n < range->num_streams; n++) {
struct demux_queue *queue = range->streams[n];
- if (queue->ds->active) {
+ if (queue->ds->selected) {
range->seek_start = MP_PTS_MAX(range->seek_start, queue->seek_start);
range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end);
- if (range->seek_start == MP_NOPTS_VALUE ||
- range->seek_end == MP_NOPTS_VALUE)
+ if (queue->seek_start == MP_NOPTS_VALUE ||
+ queue->seek_end == MP_NOPTS_VALUE)
{
range->seek_start = range->seek_end = MP_NOPTS_VALUE;
break;
@@ -356,11 +412,92 @@ static void update_seek_ranges(struct demux_stream *ds)
range->seek_start = range->seek_end = MP_NOPTS_VALUE;
}
+// Remove the packet dp from the queue. prev must be the packet before dp, or
+// NULL if dp is the first packet.
+// This does not update in->fw_bytes/in->fw_packs.
+static void remove_packet(struct demux_queue *queue, struct demux_packet *prev,
+ struct demux_packet *dp)
+{
+ if (prev) {
+ assert(prev->next == dp);
+ } else {
+ assert(queue->head == dp);
+ }
+
+ assert(queue->ds->reader_head != dp);
+ if (queue->next_prune_target == dp)
+ queue->next_prune_target = NULL;
+ if (queue->keyframe_latest == dp)
+ queue->keyframe_latest = NULL;
+
+ queue->ds->in->total_bytes -= demux_packet_estimate_total_size(dp);
+
+ if (prev) {
+ prev->next = dp->next;
+ if (!prev->next)
+ queue->tail = prev;
+ } else {
+ queue->head = dp->next;
+ if (!queue->head)
+ queue->tail = NULL;
+ }
+
+ talloc_free(dp);
+}
+
+static void clear_queue(struct demux_queue *queue)
+{
+ struct demux_stream *ds = queue->ds;
+ struct demux_internal *in = ds->in;
+
+ struct demux_packet *dp = queue->head;
+ while (dp) {
+ struct demux_packet *dn = dp->next;
+ in->total_bytes -= demux_packet_estimate_total_size(dp);
+ assert(ds->reader_head != dp);
+ talloc_free(dp);
+ dp = dn;
+ }
+ queue->head = queue->tail = NULL;
+ queue->next_prune_target = NULL;
+ queue->keyframe_latest = NULL;
+ queue->seek_start = queue->seek_end = MP_NOPTS_VALUE;
+
+ queue->correct_dts = queue->correct_pos = true;
+ queue->last_pos = -1;
+ queue->last_ts = queue->last_dts = MP_NOPTS_VALUE;
+ queue->keyframe_latest = NULL;
+ queue->keyframe_pts = queue->keyframe_end_pts = MP_NOPTS_VALUE;
+}
+
+static void clear_cached_range(struct demux_internal *in,
+ struct demux_cached_range *range)
+{
+ for (int n = 0; n < range->num_streams; n++)
+ clear_queue(range->streams[n]);
+ update_seek_ranges(range);
+}
+
+static void free_empty_cached_ranges(struct demux_internal *in)
+{
+ assert(in->current_range && in->num_ranges > 0);
+ assert(in->current_range == in->ranges[in->num_ranges - 1]);
+
+ for (int n = in->num_ranges - 2; n >= 0; n--) {
+ struct demux_cached_range *range = in->ranges[n];
+ if (range->seek_start == MP_NOPTS_VALUE) {
+ clear_cached_range(in, range);
+ MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
+ }
+ }
+}
+
static void ds_clear_reader_state(struct demux_stream *ds)
{
ds->in->fw_bytes -= ds->fw_bytes;
ds->reader_head = NULL;
+ ds->eof = false;
ds->base_ts = ds->last_br_ts = MP_NOPTS_VALUE;
ds->last_br_bytes = 0;
ds->bitrate = -1;
@@ -370,33 +507,56 @@ static void ds_clear_reader_state(struct demux_stream *ds)
ds->fw_packs = 0;
}
-static void ds_clear_demux_state(struct demux_stream *ds)
+static void update_stream_selection_state(struct demux_internal *in,
+ struct demux_stream *ds,
+ bool selected, bool new)
{
- ds_clear_reader_state(ds);
+ if (ds->selected != selected || new) {
+ ds->selected = selected;
+ ds->eof = false;
+ ds->refreshing = false;
+ ds->need_refresh = false;
- demux_packet_t *dp = ds->queue->head;
- while (dp) {
- demux_packet_t *dn = dp->next;
- ds->in->total_bytes -= demux_packet_estimate_total_size(dp);
- free_demux_packet(dp);
- dp = dn;
+ ds_clear_reader_state(ds);
+
+ // Make sure any stream reselection or addition is reflected in the seek
+ // ranges, and also get rid of data that is not needed anymore (or
+ // rather, which can't be kept consistent).
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+
+ if (!ds->selected)
+ clear_queue(range->streams[ds->index]);
+
+ update_seek_ranges(range);
+ }
+
+ free_empty_cached_ranges(in);
}
- ds->queue->head = ds->queue->tail = NULL;
- ds->next_prune_target = NULL;
- ds->keyframe_latest = NULL;
- ds->eof = false;
- ds->active = false;
- ds->refreshing = false;
- ds->need_refresh = false;
- ds->correct_dts = ds->correct_pos = true;
- ds->last_pos = -1;
- ds->last_ts = ds->last_dts = MP_NOPTS_VALUE;
- ds->queue->seek_start = ds->queue->seek_end = MP_NOPTS_VALUE;
- ds->keyframe_seen = false;
- ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE;
+ // We still have to go over the whole stream list to update ds->eager for
+ // other streams too, because they depend on other streams' selections.
- update_seek_ranges(ds);
+ bool any_av_streams = false;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *s = in->streams[n]->ds;
+
+ s->eager = s->selected && !s->sh->attached_picture;
+ if (s->eager)
+ any_av_streams |= s->type != STREAM_SUB;
+ }
+
+ // Subtitles are only eagerly read if there are no other eagerly read
+ // streams.
+ if (any_av_streams) {
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *s = in->streams[n]->ds;
+
+ if (s->type == STREAM_SUB)
+ s->eager = false;
+ }
+ }
}
void demux_set_ts_offset(struct demuxer *demuxer, double offset)
@@ -407,6 +567,23 @@ void demux_set_ts_offset(struct demuxer *demuxer, double offset)
pthread_mutex_unlock(&in->lock);
}
+static void add_missing_streams(struct demux_internal *in,
+ struct demux_cached_range *range)
+{
+ for (int n = range->num_streams; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ struct demux_queue *queue = talloc_ptrtype(range, queue);
+ *queue = (struct demux_queue){
+ .ds = ds,
+ .range = range,
+ };
+ clear_queue(queue);
+ MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue);
+ assert(range->streams[ds->index] == queue);
+ }
+}
+
// Allocate a new sh_stream of the given type. It either has to be released
// with talloc_free(), or added to a demuxer with demux_add_sh_stream(). You
// cannot add or read packets from the stream before it has been added.
@@ -441,7 +618,6 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
.sh = sh,
.type = sh->type,
.index = sh->index,
- .selected = in->autoselect,
.global_correct_dts = true,
.global_correct_pos = true,
};
@@ -462,20 +638,13 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh);
assert(in->streams[sh->index] == sh);
- for (int n = 0; n < in->num_ranges; n++) {
- struct demux_cached_range *range = in->ranges[n];
- struct demux_queue *queue = talloc_ptrtype(range, queue);
- *queue = (struct demux_queue){
- .ds = sh->ds,
- .seek_start = MP_NOPTS_VALUE,
- .seek_end = MP_NOPTS_VALUE,
- };
- MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue);
- assert(range->streams[sh->ds->index] == queue);
- }
+ for (int n = 0; n < in->num_ranges; n++)
+ add_missing_streams(in, in->ranges[n]);
sh->ds->queue = in->current_range->streams[sh->ds->index];
+ update_stream_selection_state(in, sh->ds, in->autoselect, true);
+
in->events |= DEMUX_EVENT_STREAMS;
if (in->wakeup_cb)
in->wakeup_cb(in->wakeup_cb_ctx);
@@ -553,7 +722,8 @@ void free_demuxer(demuxer_t *demuxer)
if (demuxer->desc->close)
demuxer->desc->close(in->d_thread);
- clear_demux_state(in);
+ demux_flush(demuxer);
+ assert(in->total_bytes == 0);
for (int n = in->num_streams - 1; n >= 0; n--)
talloc_free(in->streams[n]);
@@ -683,7 +853,7 @@ static double get_refresh_seek_pts(struct demux_internal *in)
normal_seek &= ds->need_refresh;
ds->need_refresh = false;
- refresh_possible &= ds->correct_dts || ds->correct_pos;
+ refresh_possible &= ds->queue->correct_dts || ds->queue->correct_pos;
}
if (!needed || start_ts == MP_NOPTS_VALUE || !demux->desc->seek ||
@@ -702,7 +872,7 @@ static double get_refresh_seek_pts(struct demux_internal *in)
struct demux_stream *ds = in->streams[n]->ds;
// Streams which didn't have any packets yet will return all packets,
// other streams return packets only starting from the last position.
- if (ds->last_pos != -1 || ds->last_dts != MP_NOPTS_VALUE)
+ if (ds->queue->last_pos != -1 || ds->queue->last_dts != MP_NOPTS_VALUE)
ds->refreshing |= ds->selected;
}
@@ -710,27 +880,181 @@ static double get_refresh_seek_pts(struct demux_internal *in)
return start_ts - 1.0;
}
+// Check whether another cached range overlaps with the end of the current
+// range, and if one does, try joining it into the current range.
+static void attempt_range_joining(struct demux_internal *in)
+{
+ struct demux_cached_range *next = NULL;
+ double next_dist = INFINITY;
+
+ assert(in->current_range && in->num_ranges > 0);
+ assert(in->current_range == in->ranges[in->num_ranges - 1]);
+
+ for (int n = 0; n < in->num_ranges - 1; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+
+ if (in->current_range->seek_start <= range->seek_start) {
+ // This uses ">" to get some non-0 overlap.
+ double dist = in->current_range->seek_end - range->seek_start;
+ if (dist > 0 && dist < next_dist) {
+ next = range;
+ next_dist = dist;
+ }
+ }
+ }
+
+ if (!next)
+ return;
+
+ MP_VERBOSE(in, "going to join ranges %f-%f + %f-%f\n",
+ in->current_range->seek_start, in->current_range->seek_end,
+ next->seek_start, next->seek_end);
+
+ // Try to find a join point, where packets obviously overlap. (It would be
+ // better and faster to do this incrementally, but probably too complex.)
+ // The current range can overlap arbitrarily with the next one, not only
+ // by the seek overlap, but by arbitrary packet readahead as well.
+ // We also drop the overlapping packets (if joining fails, we discard the
+ // entire next range anyway, so this does no harm).
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ struct demux_queue *q1 = in->current_range->streams[n];
+ struct demux_queue *q2 = next->streams[n];
+
+ if (!ds->global_correct_pos && !ds->global_correct_dts) {
+ MP_WARN(in, "stream %d: ranges unjoinable\n", n);
+ goto failed;
+ }
+
+ struct demux_packet *end = q1->tail;
+ bool join_point_found = !end; // no packets yet -> joining will work
+ if (end) {
+ while (q2->head) {
+ struct demux_packet *dp = q2->head;
+
+ // Some weird corner-case. We'd have to search the equivalent
+ // packet in q1 to update it correctly. Better just give up.
+ if (dp == q2->keyframe_latest) {
+ MP_WARN(in, "stream %d: not enough keyframes\n", n);
+ goto failed;
+ }
+
+ // (Check for ">" too, to avoid incorrect joining in weird
+ // corner cases, where the next range misses the end packet.)
+ if ((ds->global_correct_dts && dp->dts >= end->dts) ||
+ (ds->global_correct_pos && dp->pos >= end->pos))
+ {
+ // Do some additional checks as an (imperfect) sanity check
+ // in case pos/dts are not "correct" across the ranges (we
+ // never actually check that).
+ if (dp->dts != end->dts || dp->pos != end->pos ||
+ dp->pts != end->pts || dp->len != end->len)
+ {
+ MP_WARN(in, "stream %d: weird demuxer behavior\n", n);
+ goto failed;
+ }
+
+ remove_packet(q2, NULL, dp);
+ join_point_found = true;
+ break;
+ }
+
+ remove_packet(q2, NULL, dp);
+ }
+ }
+
+ // For enabled non-sparse streams, always require an overlap packet.
+ if (ds->eager && !join_point_found) {
+ MP_WARN(in, "stream %d: no join point found\n", n);
+ goto failed;
+ }
+ }
+
+ // Actually join the ranges. Now that we think it will work, mutate the
+ // data associated with the current range. We actually make the next range
+ // the current range.
+
+ in->fw_bytes = 0;
+
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_queue *q1 = in->current_range->streams[n];
+ struct demux_queue *q2 = next->streams[n];
+
+ struct demux_stream *ds = in->streams[n]->ds;
+
+ if (q1->head) {
+ q1->tail->next = q2->head;
+ q2->head = q1->head;
+ if (!q2->tail)
+ q2->tail = q1->tail;
+ }
+ q2->next_prune_target = q1->next_prune_target;
+ q2->seek_start = q1->seek_start;
+ q2->correct_dts &= q1->correct_dts;
+ q2->correct_pos &= q1->correct_pos;
+
+ q1->head = q1->tail = NULL;
+ q1->next_prune_target = NULL;
+ q1->keyframe_latest = NULL;
+
+ assert(ds->queue == q1);
+ ds->queue = q2;
+
+ recompute_buffers(ds);
+ in->fw_bytes += ds->fw_bytes;
+
+ // For moving demuxer position.
+ ds->refreshing = true;
+ }
+
+ next->seek_start = in->current_range->seek_start;
+
+ // Move demuxing position to after the current range.
+ in->seeking = true;
+ in->seek_flags = SEEK_HR;
+ in->seek_pts = next->seek_end - 1.0;
+
+ struct demux_cached_range *old = in->current_range;
+ set_current_range(in, next);
+ clear_cached_range(in, old);
+
+ MP_VERBOSE(in, "ranges joined!\n");
+
+ next = NULL;
+failed:
+ if (next)
+ clear_cached_range(in, next);
+ free_empty_cached_ranges(in);
+}
+
// Determine seekable range when a packet is added. If dp==NULL, treat it as
// EOF (i.e. closes the current block).
// This has to deal with a number of corner cases, such as demuxers potentially
// starting output at non-keyframes.
+// Can join seek ranges, which messes with in->current_range and all.
static void adjust_seek_range_on_packet(struct demux_stream *ds,
struct demux_packet *dp)
{
+ struct demux_queue *queue = ds->queue;
+ bool attempt_range_join = false;
+
if (!ds->in->seekable_cache)
return;
if (!dp || dp->keyframe) {
- if (ds->keyframe_latest) {
- ds->keyframe_latest->kf_seek_pts = ds->keyframe_pts;
- if (ds->queue->seek_start == MP_NOPTS_VALUE)
- ds->queue->seek_start = ds->keyframe_pts;
- if (ds->keyframe_end_pts != MP_NOPTS_VALUE)
- ds->queue->seek_end = ds->keyframe_end_pts;
- update_seek_ranges(ds);
+ if (queue->keyframe_latest) {
+ queue->keyframe_latest->kf_seek_pts = queue->keyframe_pts;
+ double old_end = queue->range->seek_end;
+ if (queue->seek_start == MP_NOPTS_VALUE)
+ queue->seek_start = queue->keyframe_pts;
+ if (queue->keyframe_end_pts != MP_NOPTS_VALUE)
+ queue->seek_end = queue->keyframe_end_pts;
+ update_seek_ranges(queue->range);
+ attempt_range_join = queue->range->seek_end > old_end;
}
- ds->keyframe_latest = dp;
- ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE;
+ queue->keyframe_latest = dp;
+ queue->keyframe_pts = queue->keyframe_end_pts = MP_NOPTS_VALUE;
}
if (dp) {
@@ -740,9 +1064,12 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds,
if (dp->segmented && (ts < dp->start || ts > dp->end))
ts = MP_NOPTS_VALUE;
- ds->keyframe_pts = MP_PTS_MIN(ds->keyframe_pts, ts);
- ds->keyframe_end_pts = MP_PTS_MAX(ds->keyframe_end_pts, ts);
+ queue->keyframe_pts = MP_PTS_MIN(queue->keyframe_pts, ts);
+ queue->keyframe_end_pts = MP_PTS_MAX(queue->keyframe_end_pts, ts);
}
+
+ if (attempt_range_join)
+ attempt_range_joining(ds->in);
}
void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
@@ -755,17 +1082,20 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
struct demux_internal *in = ds->in;
pthread_mutex_lock(&in->lock);
+ struct demux_queue *queue = ds->queue;
+
bool drop = ds->refreshing;
if (ds->refreshing) {
// Resume reading once the old position was reached (i.e. we start
// returning packets where we left off before the refresh).
// If it's the same position, drop, but continue normally next time.
- if (ds->correct_dts) {
- ds->refreshing = dp->dts < ds->last_dts;
- } else if (ds->correct_pos) {
- ds->refreshing = dp->pos < ds->last_pos;
+ if (queue->correct_dts) {
+ ds->refreshing = dp->dts < queue->last_dts;
+ } else if (queue->correct_pos) {
+ ds->refreshing = dp->pos < queue->last_pos;
} else {
ds->refreshing = false; // should not happen
+ MP_WARN(in, "stream %d: demux refreshing failed\n", ds->index);
}
}
@@ -775,12 +1105,12 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
return;
}
- ds->correct_pos &= dp->pos >= 0 && dp->pos > ds->last_pos;
- ds->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > ds->last_dts;
- ds->last_pos = dp->pos;
- ds->last_dts = dp->dts;
- ds->global_correct_pos &= ds->correct_pos;
- ds->global_correct_dts &= ds->correct_dts;
+ queue->correct_pos &= dp->pos >= 0 && dp->pos > queue->last_pos;
+ queue->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > queue->last_dts;
+ queue->last_pos = dp->pos;
+ queue->last_dts = dp->dts;
+ ds->global_correct_pos &= queue->correct_pos;
+ ds->global_correct_dts &= queue->correct_dts;
dp->stream = stream->index;
dp->next = NULL;
@@ -800,17 +1130,15 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
in->fw_bytes += bytes;
}
- if (ds->queue->tail) {
+ if (queue->tail) {
// next packet in stream
- ds->queue->tail->next = dp;
- ds->queue->tail = dp;
+ queue->tail->next = dp;
+ queue->tail = dp;
} else {
// first packet in stream
- ds->queue->head = ds->queue->tail = dp;
+ queue->head = queue->tail = dp;
}
- adjust_seek_range_on_packet(ds, dp);
-
if (!ds->ignore_eof) {
// obviously not true anymore
ds->eof = false;
@@ -825,15 +1153,17 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
double ts = dp->dts == MP_NOPTS_VALUE ? dp->pts : dp->dts;
if (dp->segmented)
ts = MP_PTS_MIN(ts, dp->end);
- if (ts != MP_NOPTS_VALUE && (ts > ds->last_ts || ts + 10 < ds->last_ts))
- ds->last_ts = ts;
+ if (ts != MP_NOPTS_VALUE && (ts > queue->last_ts || ts + 10 < queue->last_ts))
+ queue->last_ts = ts;
if (ds->base_ts == MP_NOPTS_VALUE)
- ds->base_ts = ds->last_ts;
+ ds->base_ts = queue->last_ts;
MP_DBG(in, "append packet to %s: size=%d pts=%f dts=%f pos=%"PRIi64" "
"[num=%zd size=%zd]\n", stream_type_name(stream->type),
dp->len, dp->pts, dp->dts, dp->pos, ds->fw_packs, ds->fw_bytes);
+ adjust_seek_range_on_packet(ds, dp);
+
// Wake up if this was the first packet after start/possible underrun.
if (ds->in->wakeup_cb && ds->reader_head && !ds->reader_head->next)
ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
@@ -847,20 +1177,23 @@ static bool read_packet(struct demux_internal *in)
in->eof = false;
in->idle = true;
+ if (!in->reading)
+ return false;
+
// Check if we need to read a new packet. We do this if all queues are below
// the minimum, or if a stream explicitly needs new packets. Also includes
// safe-guards against packet queue overflow.
- bool active = false, read_more = false, prefetch_more = false;
+ bool read_more = false, prefetch_more = false;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
- active |= ds->active;
- read_more |= (ds->active && !ds->reader_head) || ds->refreshing;
- if (ds->active && ds->last_ts != MP_NOPTS_VALUE && in->min_secs > 0 &&
- ds->base_ts != MP_NOPTS_VALUE && ds->last_ts >= ds->base_ts)
- prefetch_more |= ds->last_ts - ds->base_ts < in->min_secs;
- }
- MP_DBG(in, "bytes=%zd, active=%d, read_more=%d prefetch_more=%d\n",
- in->fw_bytes, active, read_more, prefetch_more);
+ read_more |= (ds->eager && !ds->reader_head) || ds->refreshing;
+ if (ds->eager && ds->queue->last_ts != MP_NOPTS_VALUE &&
+ in->min_secs > 0 && ds->base_ts != MP_NOPTS_VALUE &&
+ ds->queue->last_ts >= ds->base_ts)
+ prefetch_more |= ds->queue->last_ts - ds->base_ts < in->min_secs;
+ }
+ MP_DBG(in, "bytes=%zd, read_more=%d prefetch_more=%d\n",
+ in->fw_bytes, read_more, prefetch_more);
if (in->fw_bytes >= in->max_bytes) {
if (!read_more)
return false;
@@ -870,9 +1203,10 @@ static bool read_packet(struct demux_internal *in)
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
if (ds->selected) {
- MP_WARN(in, " %s/%d: %zd packets, %zd bytes\n",
+ MP_WARN(in, " %s/%d: %zd packets, %zd bytes%s\n",
stream_type_name(ds->type), n,
- ds->fw_packs, ds->fw_bytes);
+ ds->fw_packs, ds->fw_bytes,
+ ds->eager ? "" : " (lazy)");
}
}
}
@@ -938,19 +1272,24 @@ static bool read_packet(struct demux_internal *in)
static void prune_old_packets(struct demux_internal *in)
{
+ assert(in->current_range == in->ranges[in->num_ranges - 1]);
+
// It's not clear what the ideal way to prune old packets is. For now, we
// prune the oldest packet runs, as long as the total cache amount is too
// big.
size_t max_bytes = in->seekable_cache ? in->max_bytes_bw : 0;
while (in->total_bytes - in->fw_bytes > max_bytes) {
+ // (Start from least recently used range.)
+ struct demux_cached_range *range = in->ranges[0];
double earliest_ts = MP_NOPTS_VALUE;
struct demux_stream *earliest_stream = NULL;
- for (int n = 0; n < in->num_streams; n++) {
- struct demux_stream *ds = in->streams[n]->ds;
+ for (int n = 0; n < range->num_streams; n++) {
+ struct demux_queue *queue = range->streams[n];
+ struct demux_stream *ds = queue->ds;
- if (ds->queue->head && ds->queue->head != ds->reader_head) {
- struct demux_packet *dp = ds->queue->head;
+ if (queue->head && queue->head != ds->reader_head) {
+ struct demux_packet *dp = queue->head;
double ts = dp->kf_seek_pts;
// Note: in obscure cases, packets might have no timestamps set,
// in which case we still need to prune _something_.
@@ -967,6 +1306,7 @@ static void prune_old_packets(struct demux_internal *in)
assert(earliest_stream); // incorrect accounting of buffered sizes?
struct demux_stream *ds = earliest_stream;
+ struct demux_queue *queue = range->streams[ds->index];
// Prune all packets until the next keyframe or reader_head. Keeping
// those packets would not help with seeking at all, so we strictly
@@ -975,46 +1315,35 @@ static void prune_old_packets(struct demux_internal *in)
// which in the worst case cou