summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--demux/demux.c297
1 files changed, 214 insertions, 83 deletions
diff --git a/demux/demux.c b/demux/demux.c
index 155789559d..e38f704cfb 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -175,6 +175,17 @@ struct demux_internal {
void (*run_fn)(void *); // if non-NULL, function queued to be run on
void *run_fn_arg; // the thread as run_fn(run_fn_arg)
+ // (sorted by least recent use: index 0 is least recently used)
+ struct demux_cached_range **ranges;
+ int num_ranges;
+
+ size_t total_bytes; // total sum of packet data buffered
+ size_t fw_bytes; // sum of forward packet data in current_range
+
+ // Range from which decoder is reading, and to which demuxer is appending.
+ // This is never NULL.
+ struct demux_cached_range *current_range;
+
// Cached state.
bool force_cache_update;
struct mp_tags *stream_metadata;
@@ -184,10 +195,33 @@ struct demux_internal {
char *stream_base_filename;
};
+// A continuous range of cached packets for all enabled streams.
+// (One demux_queue for each known stream.)
+struct demux_cached_range {
+ // streams[] is indexed by demux_stream->index
+ struct demux_queue **streams;
+ int num_streams;
+
+ // Computed from the stream queue's values.
+ double seek_start, seek_end;
+};
+
+// A continuous list of cached packets for a single stream/range. There is one
+// for each stream and range.
+struct demux_queue {
+ struct demux_stream *ds;
+
+ struct demux_packet *head;
+ struct demux_packet *tail;
+
+ double seek_start, seek_end;
+};
+
struct demux_stream {
struct demux_internal *in;
- struct sh_stream *sh;
- enum stream_type type;
+ struct sh_stream *sh; // ds->sh->ds == ds
+ enum stream_type type; // equals to sh->type
+ int index; // equals to sh->index
// --- all fields are protected by in->lock
// demuxer state
@@ -195,33 +229,33 @@ struct demux_stream {
bool active; // try to keep at least 1 packet queued
// if false, this stream is disabled, or passively
// read (like subtitles)
- bool eof; // end of demuxed stream? (true if all buffer empty)
+ bool eof; // end of demuxed stream? (true if all buffers empty)
bool need_refresh; // enabled mid-stream
bool refreshing;
bool correct_dts; // packet DTS is strictly monotonically increasing
bool correct_pos; // packet pos is strictly monotonically increasing
- size_t fw_packs; // number of packets in buffer (forward)
- size_t fw_bytes; // total bytes of packets in buffer (forward)
- size_t bw_bytes; // same as fw_bytes, but for back buffer
- int64_t last_pos;
- double last_dts;
+ int64_t last_pos; // for determining correct_pos
+ double last_dts; // for determining correct_dts
+ bool global_correct_dts;// all observed so far
+ bool global_correct_pos;
double last_ts; // timestamp of the last packet added to queue
- double back_pts; // smallest timestamp on the start of the back buffer
- double end_pts; // highest seekable timestamp
- struct demux_packet *queue_head; // start of the full queue
- struct demux_packet *queue_tail; // end of the full queue
struct demux_packet *next_prune_target; // cached value for faster pruning
// for incrementally determining seek PTS range
bool keyframe_seen;
double keyframe_pts, keyframe_end_pts;
struct demux_packet *keyframe_latest;
+ // current queue - used both for reading and demuxing (this is never NULL)
+ struct demux_queue *queue;
+
// reader (decoder) state (bitrate calculations are part of it because we
// want to return the bitrate closest to the "current position")
double base_ts; // timestamp of the last packet returned to decoder
double last_br_ts; // timestamp of last packet bitrate was calculated
size_t last_br_bytes; // summed packet sizes since last bitrate calculation
double bitrate;
+ size_t fw_packs; // number of packets in buffer (forward)
+ size_t fw_bytes; // total bytes of packets in buffer (forward)
struct demux_packet *reader_head; // points at current decoder position
bool skip_to_keyframe;
bool attached_picture_added;
@@ -245,33 +279,112 @@ static void update_cache(struct demux_internal *in);
static int cached_demux_control(struct demux_internal *in, int cmd, void *arg);
static void clear_demux_state(struct demux_internal *in);
+#if 0
+// very expensive check for redundant cached queue state
+static void check_queue_consistency(struct demux_internal *in)
+{
+ size_t total_bytes = 0;
+ size_t total_fw_bytes = 0;
+
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+
+ assert(range->num_streams == in->num_streams);
+
+ for (int i = 0; i < range->num_streams; i++) {
+ struct demux_queue *queue = range->streams[i];
+
+ size_t fw_bytes = 0;
+ size_t fw_packs = 0;
+ bool is_forward = false;
+ for (struct demux_packet *dp = queue->head; dp; dp = dp->next) {
+ is_forward |= dp == queue->ds->reader_head;
+
+ size_t bytes = demux_packet_estimate_total_size(dp);
+ total_bytes += bytes;
+ if (is_forward) {
+ fw_bytes += bytes;
+ fw_packs += 1;
+ assert(range == in->current_range);
+ assert(queue->ds->queue == queue);
+ }
+
+ if (!dp->next)
+ assert(queue->tail == dp);
+ }
+ if (!queue->head)
+ assert(!queue->tail);
+
+ total_fw_bytes += fw_bytes;
+
+ if (range == in->current_range) {
+ assert(queue->ds->fw_bytes == fw_bytes);
+ assert(queue->ds->fw_packs == fw_packs);
+ } else {
+ assert(fw_bytes == 0 && fw_packs == 0);
+ }
+ }
+ }
+
+ assert(in->total_bytes == total_bytes);
+ assert(in->fw_bytes == total_fw_bytes);
+}
+#endif
+
+static void update_seek_ranges(struct demux_stream *ds)
+{
+ struct demux_cached_range *range = ds->in->current_range;
+
+ range->seek_start = range->seek_end = MP_NOPTS_VALUE;
+
+ for (int n = 0; n < range->num_streams; n++) {
+ struct demux_queue *queue = range->streams[n];
+ if (queue->ds->active) {
+ range->seek_start = MP_PTS_MAX(range->seek_start, queue->seek_start);
+ range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end);
+
+ if (range->seek_start == MP_NOPTS_VALUE ||
+ range->seek_end == MP_NOPTS_VALUE)
+ {
+ range->seek_start = range->seek_end = MP_NOPTS_VALUE;
+ break;
+ }
+ }
+ }
+
+ if (range->seek_start >= range->seek_end)
+ range->seek_start = range->seek_end = MP_NOPTS_VALUE;
+}
+
static void ds_clear_reader_state(struct demux_stream *ds)
{
+ ds->in->fw_bytes -= ds->fw_bytes;
+
ds->reader_head = NULL;
ds->base_ts = ds->last_br_ts = MP_NOPTS_VALUE;
ds->last_br_bytes = 0;
ds->bitrate = -1;
ds->skip_to_keyframe = false;
ds->attached_picture_added = false;
+ ds->fw_bytes = 0;
+ ds->fw_packs = 0;
}
static void ds_clear_demux_state(struct demux_stream *ds)
{
ds_clear_reader_state(ds);
- demux_packet_t *dp = ds->queue_head;
+ demux_packet_t *dp = ds->queue->head;
while (dp) {
demux_packet_t *dn = dp->next;
+ ds->in->total_bytes -= demux_packet_estimate_total_size(dp);
free_demux_packet(dp);
dp = dn;
}
- ds->queue_head = ds->queue_tail = NULL;
+ ds->queue->head = ds->queue->tail = NULL;
ds->next_prune_target = NULL;
ds->keyframe_latest = NULL;
- ds->fw_packs = 0;
- ds->fw_bytes = 0;
- ds->bw_bytes = 0;
ds->eof = false;
ds->active = false;
ds->refreshing = false;
@@ -279,9 +392,11 @@ static void ds_clear_demux_state(struct demux_stream *ds)
ds->correct_dts = ds->correct_pos = true;
ds->last_pos = -1;
ds->last_ts = ds->last_dts = MP_NOPTS_VALUE;
- ds->back_pts = ds->end_pts = MP_NOPTS_VALUE;
+ ds->queue->seek_start = ds->queue->seek_end = MP_NOPTS_VALUE;
ds->keyframe_seen = false;
ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE;
+
+ update_seek_ranges(ds);
}
void demux_set_ts_offset(struct demuxer *demuxer, double offset)
@@ -318,18 +433,22 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
{
assert(!sh->ds); // must not be added yet
+ sh->index = in->num_streams;
+
sh->ds = talloc(sh, struct demux_stream);
*sh->ds = (struct demux_stream) {
.in = in,
.sh = sh,
.type = sh->type,
+ .index = sh->index,
.selected = in->autoselect,
+ .global_correct_dts = true,
+ .global_correct_pos = true,
};
if (!sh->codec->codec)
sh->codec->codec = "";
- sh->index = in->num_streams;
if (sh->ff_index < 0)
sh->ff_index = sh->index;
if (sh->demuxer_id < 0) {
@@ -341,6 +460,21 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
}
MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh);
+ assert(in->streams[sh->index] == sh);
+
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+ struct demux_queue *queue = talloc_ptrtype(range, queue);
+ *queue = (struct demux_queue){
+ .ds = sh->ds,
+ .seek_start = MP_NOPTS_VALUE,
+ .seek_end = MP_NOPTS_VALUE,
+ };
+ MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue);
+ assert(range->streams[sh->ds->index] == queue);
+ }
+
+ sh->ds->queue = in->current_range->streams[sh->ds->index];
in->events |= DEMUX_EVENT_STREAMS;
if (in->wakeup_cb)
@@ -589,10 +723,11 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds,
if (!dp || dp->keyframe) {
if (ds->keyframe_latest) {
ds->keyframe_latest->kf_seek_pts = ds->keyframe_pts;
- if (ds->back_pts == MP_NOPTS_VALUE)
- ds->back_pts = ds->keyframe_pts;
+ if (ds->queue->seek_start == MP_NOPTS_VALUE)
+ ds->queue->seek_start = ds->keyframe_pts;
if (ds->keyframe_end_pts != MP_NOPTS_VALUE)
- ds->end_pts = ds->keyframe_end_pts;
+ ds->queue->seek_end = ds->keyframe_end_pts;
+ update_seek_ranges(ds);
}
ds->keyframe_latest = dp;
ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE;
@@ -644,6 +779,8 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
ds->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > ds->last_dts;
ds->last_pos = dp->pos;
ds->last_dts = dp->dts;
+ ds->global_correct_pos &= ds->correct_pos;
+ ds->global_correct_dts &= ds->correct_dts;
dp->stream = stream->index;
dp->next = NULL;
@@ -656,20 +793,20 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
}
size_t bytes = demux_packet_estimate_total_size(dp);
+ ds->in->total_bytes += bytes;
if (ds->reader_head) {
ds->fw_packs++;
ds->fw_bytes += bytes;
- } else {
- ds->bw_bytes += bytes;
+ in->fw_bytes += bytes;
}
- if (ds->queue_tail) {
+ if (ds->queue->tail) {
// next packet in stream
- ds->queue_tail->next = dp;
- ds->queue_tail = dp;
+ ds->queue->tail->next = dp;
+ ds->queue->tail = dp;
} else {
// first packet in stream
- ds->queue_head = ds->queue_tail = dp;
+ ds->queue->head = ds->queue->tail = dp;
}
adjust_seek_range_on_packet(ds, dp);
@@ -714,19 +851,17 @@ static bool read_packet(struct demux_internal *in)
// the minimum, or if a stream explicitly needs new packets. Also includes
// safe-guards against packet queue overflow.
bool active = false, read_more = false, prefetch_more = false;
- size_t bytes = 0;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
active |= ds->active;
read_more |= (ds->active && !ds->reader_head) || ds->refreshing;
- bytes += ds->fw_bytes;
if (ds->active && ds->last_ts != MP_NOPTS_VALUE && in->min_secs > 0 &&
ds->base_ts != MP_NOPTS_VALUE && ds->last_ts >= ds->base_ts)
prefetch_more |= ds->last_ts - ds->base_ts < in->min_secs;
}
MP_DBG(in, "bytes=%zd, active=%d, read_more=%d prefetch_more=%d\n",
- bytes, active, read_more, prefetch_more);
- if (bytes >= in->max_bytes) {
+ in->fw_bytes, active, read_more, prefetch_more);
+ if (in->fw_bytes >= in->max_bytes) {
if (!read_more)
return false;
if (!in->warned_queue_overflow) {
@@ -737,7 +872,7 @@ static bool read_packet(struct demux_internal *in)
if (ds->selected) {
MP_WARN(in, " %s/%d: %zd packets, %zd bytes\n",
stream_type_name(ds->type), n,
- ds->fw_packs, ds->fw_bytes);
+ ds->fw_packs, ds->fw_bytes);
}
}
}
@@ -803,9 +938,7 @@ static bool read_packet(struct demux_internal *in)
static void prune_old_packets(struct demux_internal *in)
{
- size_t buffered = 0;
- for (int n = 0; n < in->num_streams; n++)
- buffered += in->streams[n]->ds->bw_bytes;
+ size_t buffered = in->total_bytes - in->fw_bytes;
MP_TRACE(in, "total backbuffer = %zd\n", buffered);
@@ -820,8 +953,8 @@ static void prune_old_packets(struct demux_internal *in)
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
- if (ds->queue_head && ds->queue_head != ds->reader_head) {
- struct demux_packet *dp = ds->queue_head;
+ if (ds->queue->head && ds->queue->head != ds->reader_head) {
+ struct demux_packet *dp = ds->queue->head;
double ts = dp->kf_seek_pts;
// Note: in obscure cases, packets might have no timestamps set,
// in which case we still need to prune _something_.
@@ -847,35 +980,37 @@ static void prune_old_packets(struct demux_internal *in)
// that many keyframe ranges without keyframes exist (audio packets)
// makes this much harder.
if (in->seekable_cache && !ds->next_prune_target) {
- // (Has to be _after_ queue_head to drop at least 1 packet.)
- struct demux_packet *prev = ds->queue_head;
- ds->back_pts = MP_NOPTS_VALUE;
- ds->next_prune_target = ds->queue_tail; // (prune all if none found)
+ // (Has to be _after_ queue->head to drop at least 1 packet.)
+ struct demux_packet *prev = ds->queue->head;
+ ds->queue->seek_start = MP_NOPTS_VALUE;
+ ds->next_prune_target = ds->queue->tail; // (prune all if none found)
while (prev->next) {
struct demux_packet *dp = prev->next;
// Note that the next back_pts might be above the lowest buffered
// packet, but it will still be only viable lowest seek target.
if (dp->keyframe && dp->kf_seek_pts != MP_NOPTS_VALUE) {
- ds->back_pts = dp->kf_seek_pts;
+ ds->queue->seek_start = dp->kf_seek_pts;
ds->next_prune_target = prev;
break;
}
prev = prev->next;
}
+
+ update_seek_ranges(ds);
}
bool done = false;
- while (!done && ds->queue_head && ds->queue_head != ds->reader_head) {
- struct demux_packet *dp = ds->queue_head;
+ while (!done && ds->queue->head && ds->queue->head != ds->reader_head) {
+ struct demux_packet *dp = ds->queue->head;
size_t bytes = demux_packet_estimate_total_size(dp);
buffered -= bytes;
MP_TRACE(in, "dropping backbuffer packet size %zd from stream %d\n",
bytes, ds->sh->index);
- ds->queue_head = dp->next;
- if (!ds->queue_head)
- ds->queue_tail = NULL;
+ ds->queue->head = dp->next;
+ if (!ds->queue->head)
+ ds->queue->tail = NULL;
if (ds->next_prune_target == dp) {
ds->next_prune_target = NULL;
done = true;
@@ -883,7 +1018,7 @@ static void prune_old_packets(struct demux_internal *in)
if (ds->keyframe_latest == dp)
ds->keyframe_latest = NULL;
talloc_free(dp);
- ds->bw_bytes -= bytes;
+ in->total_bytes -= bytes;
}
}
}
@@ -986,7 +1121,7 @@ static struct demux_packet *dequeue_packet(struct demux_stream *ds)
ds->fw_packs--;
size_t bytes = demux_packet_estimate_total_size(pkt);
ds->fw_bytes -= bytes;
- ds->bw_bytes += bytes;
+ ds->in->fw_bytes -= bytes;
// The returned packet is mutated etc. and will be owned by the user.
pkt = demux_copy_packet(pkt);
@@ -1495,6 +1630,13 @@ static struct demuxer *open_given_type(struct mpv_global *global,
pthread_mutex_init(&in->lock, NULL);
pthread_cond_init(&in->wakeup, NULL);
+ in->current_range = talloc_ptrtype(in, in->current_range),
+ *in->current_range = (struct demux_cached_range){
+ .seek_start = MP_NOPTS_VALUE,
+ .seek_end = MP_NOPTS_VALUE,
+ };
+ MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, in->current_range);
+
*in->d_thread = *demuxer;
*in->d_buffer = *demuxer;
@@ -1650,6 +1792,7 @@ static void clear_reader_state(struct demux_internal *in)
ds_clear_reader_state(in->streams[n]->ds);
in->warned_queue_overflow = false;
in->d_user->filepos = -1; // implicitly synchronized
+ assert(in->fw_bytes == 0);
}
static void clear_demux_state(struct demux_internal *in)
@@ -1660,6 +1803,7 @@ static void clear_demux_state(struct demux_internal *in)
in->eof = false;
in->last_eof = false;
in->idle = true;
+ assert(in->total_bytes == 0);
}
// clear the packet queues
@@ -1674,20 +1818,10 @@ static void recompute_buffers(struct demux_stream *ds)
{
ds->fw_packs = 0;
ds->fw_bytes = 0;
- ds->bw_bytes = 0;
- bool in_backbuffer = true;
- for (struct demux_packet *dp = ds->queue_head; dp; dp = dp->next) {
- if (dp == ds->reader_head)
- in_backbuffer = false;
-
- size_t bytes = demux_packet_estimate_total_size(dp);
- if (in_backbuffer) {
- ds->bw_bytes += bytes;
- } else {
- ds->fw_packs++;
- ds->fw_bytes += bytes;
- }
+ for (struct demux_packet *dp = ds->reader_head; dp; dp = dp->next) {
+ ds->fw_bytes += demux_packet_estimate_total_size(dp);
+ ds->fw_packs++;
}
}
@@ -1696,7 +1830,7 @@ static struct demux_packet *find_seek_target(struct demux_stream *ds,
{
struct demux_packet *target = NULL;
double target_diff = MP_NOPTS_VALUE;
- for (struct demux_packet *dp = ds->queue_head; dp; dp = dp->next) {
+ for (struct demux_packet *dp = ds->queue->head; dp; dp = dp->next) {
double range_pts = dp->kf_seek_pts;
if (!dp->keyframe || range_pts == MP_NOPTS_VALUE)
continue;
@@ -1783,10 +1917,12 @@ static bool try_seek_cache(struct demux_internal *in, double pts, int flags)
struct demux_packet *target = find_seek_target(ds, pts, flags);
ds->reader_head = target;
ds->skip_to_keyframe = !target;
- recompute_buffers(ds);
if (ds->reader_head)
ds->base_ts = PTS_OR_DEF(ds->reader_head->pts, ds->reader_head->dts);
+ recompute_buffers(ds);
+ in->fw_bytes += ds->fw_bytes;
+
MP_VERBOSE(in, "seeking stream %d (%s) to ",
n, stream_type_name(ds->type));
@@ -2033,41 +2169,36 @@ static int cached_demux_control(struct demux_internal *in, int cmd, void *arg)
.ts_duration = -1,
};
bool any_packets = false;
- bool seek_ok = in->seekable_cache && !in->seeking;
- double seek_min = MP_NOPTS_VALUE;
- double seek_max = MP_NOPTS_VALUE;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
- if (ds->active && !(!ds->queue_head && ds->eof) &&
+ if (ds->active && !(!ds->queue->head && ds->eof) &&
!ds->ignore_eof && ds->type != STREAM_SUB)
{
r->underrun |= !ds->reader_head && !ds->eof;
r->ts_reader = MP_PTS_MAX(r->ts_reader, ds->base_ts);
r->ts_end = MP_PTS_MAX(r->ts_end, ds->last_ts);
- seek_min = MP_PTS_MAX(seek_min, ds->back_pts);
- seek_max = MP_PTS_MIN(seek_max, ds->end_pts);
- if (ds->back_pts == MP_NOPTS_VALUE ||
- ds->end_pts == MP_NOPTS_VALUE)
- seek_ok = false;
- any_packets |= !!ds->queue_head;
+ any_packets |= !!ds->queue->head;
}
}
r->idle = (in->idle && !r->underrun) || r->eof;
r->underrun &= !r->idle;
- seek_min = MP_ADD_PTS(seek_min, in->ts_offset);
- seek_max = MP_ADD_PTS(seek_max, in->ts_offset);
r->ts_reader = MP_ADD_PTS(r->ts_reader, in->ts_offset);
r->ts_end = MP_ADD_PTS(r->ts_end, in->ts_offset);
if (r->ts_reader != MP_NOPTS_VALUE && r->ts_reader <= r->ts_end)
r->ts_duration = r->ts_end - r->ts_reader;
if (in->seeking || !any_packets)
r->ts_duration = 0;
- if (seek_ok && seek_min != MP_NOPTS_VALUE && seek_max > seek_min) {
- r->num_seek_ranges = 1;
- r->seek_ranges[0] = (struct demux_seek_range){
- .start = seek_min,
- .end = seek_max,
- };
+ if (!in->seeking) {
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+ if (range->seek_start != MP_NOPTS_VALUE && n < MAX_SEEK_RANGES) {
+ r->seek_ranges[r->num_seek_ranges++] =
+ (struct demux_seek_range){
+ .start = MP_ADD_PTS(range->seek_start, in->ts_offset),
+ .end = MP_ADD_PTS(range->seek_end, in->ts_offset),
+ };
+ }
+ }
}
return CONTROL_OK;
}