summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2017-11-04 23:39:04 +0100
committerwm4 <wm4@nowhere>2017-11-04 23:45:21 +0100
commit8aa1db3b175d29201f55648849b4f2ddc15f8ea1 (patch)
tree27bac642cb10759769c31cfe5a378d42b1dc2165
parent10d0963d851fad338d5d4457172fb20e76bc88aa (diff)
downloadmpv-8aa1db3b175d29201f55648849b4f2ddc15f8ea1.tar.bz2
mpv-8aa1db3b175d29201f55648849b4f2ddc15f8ea1.tar.xz
demux: refactoring in preparation for multiple seek range support
This adds a bunch of stuff (mostly unused or redundant) as preparation for supporting multiple seek ranges. Actual support is probably still far away. One change that messes deeper with the actual code is that we account for total buffered bytes instead of just the backwards bytes now. This way, prune_old_packets() doesn't have to iterate over all seek ranges to determine whether something needs pruning.
-rw-r--r--demux/demux.c297
1 files changed, 214 insertions, 83 deletions
diff --git a/demux/demux.c b/demux/demux.c
index 155789559d..e38f704cfb 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -175,6 +175,17 @@ struct demux_internal {
void (*run_fn)(void *); // if non-NULL, function queued to be run on
void *run_fn_arg; // the thread as run_fn(run_fn_arg)
+ // (sorted by least recent use: index 0 is least recently used)
+ struct demux_cached_range **ranges;
+ int num_ranges;
+
+ size_t total_bytes; // total sum of packet data buffered
+ size_t fw_bytes; // sum of forward packet data in current_range
+
+ // Range from which decoder is reading, and to which demuxer is appending.
+ // This is never NULL.
+ struct demux_cached_range *current_range;
+
// Cached state.
bool force_cache_update;
struct mp_tags *stream_metadata;
@@ -184,10 +195,33 @@ struct demux_internal {
char *stream_base_filename;
};
+// A continuous range of cached packets for all enabled streams.
+// (One demux_queue for each known stream.)
+struct demux_cached_range {
+ // streams[] is indexed by demux_stream->index
+ struct demux_queue **streams;
+ int num_streams;
+
+ // Computed from the stream queue's values.
+ double seek_start, seek_end;
+};
+
+// A continuous list of cached packets for a single stream/range. There is one
+// for each stream and range.
+struct demux_queue {
+ struct demux_stream *ds;
+
+ struct demux_packet *head;
+ struct demux_packet *tail;
+
+ double seek_start, seek_end;
+};
+
struct demux_stream {
struct demux_internal *in;
- struct sh_stream *sh;
- enum stream_type type;
+ struct sh_stream *sh; // ds->sh->ds == ds
+ enum stream_type type; // equals to sh->type
+ int index; // equals to sh->index
// --- all fields are protected by in->lock
// demuxer state
@@ -195,33 +229,33 @@ struct demux_stream {
bool active; // try to keep at least 1 packet queued
// if false, this stream is disabled, or passively
// read (like subtitles)
- bool eof; // end of demuxed stream? (true if all buffer empty)
+ bool eof; // end of demuxed stream? (true if all buffers empty)
bool need_refresh; // enabled mid-stream
bool refreshing;
bool correct_dts; // packet DTS is strictly monotonically increasing
bool correct_pos; // packet pos is strictly monotonically increasing
- size_t fw_packs; // number of packets in buffer (forward)
- size_t fw_bytes; // total bytes of packets in buffer (forward)
- size_t bw_bytes; // same as fw_bytes, but for back buffer
- int64_t last_pos;
- double last_dts;
+ int64_t last_pos; // for determining correct_pos
+ double last_dts; // for determining correct_dts
+ bool global_correct_dts;// all observed so far
+ bool global_correct_pos;
double last_ts; // timestamp of the last packet added to queue
- double back_pts; // smallest timestamp on the start of the back buffer
- double end_pts; // highest seekable timestamp
- struct demux_packet *queue_head; // start of the full queue
- struct demux_packet *queue_tail; // end of the full queue
struct demux_packet *next_prune_target; // cached value for faster pruning
// for incrementally determining seek PTS range
bool keyframe_seen;
double keyframe_pts, keyframe_end_pts;
struct demux_packet *keyframe_latest;
+ // current queue - used both for reading and demuxing (this is never NULL)
+ struct demux_queue *queue;
+
// reader (decoder) state (bitrate calculations are part of it because we
// want to return the bitrate closest to the "current position")
double base_ts; // timestamp of the last packet returned to decoder
double last_br_ts; // timestamp of last packet bitrate was calculated
size_t last_br_bytes; // summed packet sizes since last bitrate calculation
double bitrate;
+ size_t fw_packs; // number of packets in buffer (forward)
+ size_t fw_bytes; // total bytes of packets in buffer (forward)
struct demux_packet *reader_head; // points at current decoder position
bool skip_to_keyframe;
bool attached_picture_added;
@@ -245,33 +279,112 @@ static void update_cache(struct demux_internal *in);
static int cached_demux_control(struct demux_internal *in, int cmd, void *arg);
static void clear_demux_state(struct demux_internal *in);
+#if 0
+// very expensive check for redundant cached queue state
+static void check_queue_consistency(struct demux_internal *in)
+{
+ size_t total_bytes = 0;
+ size_t total_fw_bytes = 0;
+
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+
+ assert(range->num_streams == in->num_streams);
+
+ for (int i = 0; i < range->num_streams; i++) {
+ struct demux_queue *queue = range->streams[i];
+
+ size_t fw_bytes = 0;
+ size_t fw_packs = 0;
+ bool is_forward = false;
+ for (struct demux_packet *dp = queue->head; dp; dp = dp->next) {
+ is_forward |= dp == queue->ds->reader_head;
+
+ size_t bytes = demux_packet_estimate_total_size(dp);
+ total_bytes += bytes;
+ if (is_forward) {
+ fw_bytes += bytes;
+ fw_packs += 1;
+ assert(range == in->current_range);
+ assert(queue->ds->queue == queue);
+ }
+
+ if (!dp->next)
+ assert(queue->tail == dp);
+ }
+ if (!queue->head)
+ assert(!queue->tail);
+
+ total_fw_bytes += fw_bytes;
+
+ if (range == in->current_range) {
+ assert(queue->ds->fw_bytes == fw_bytes);
+ assert(queue->ds->fw_packs == fw_packs);
+ } else {
+ assert(fw_bytes == 0 && fw_packs == 0);
+ }
+ }
+ }
+
+ assert(in->total_bytes == total_bytes);
+ assert(in->fw_bytes == total_fw_bytes);
+}
+#endif
+
+static void update_seek_ranges(struct demux_stream *ds)
+{
+ struct demux_cached_range *range = ds->in->current_range;
+
+ range->seek_start = range->seek_end = MP_NOPTS_VALUE;
+
+ for (int n = 0; n < range->num_streams; n++) {
+ struct demux_queue *queue = range->streams[n];
+ if (queue->ds->active) {
+ range->seek_start = MP_PTS_MAX(range->seek_start, queue->seek_start);
+ range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end);
+
+ if (range->seek_start == MP_NOPTS_VALUE ||
+ range->seek_end == MP_NOPTS_VALUE)
+ {
+ range->seek_start = range->seek_end = MP_NOPTS_VALUE;
+ break;
+ }
+ }
+ }
+
+ if (range->seek_start >= range->seek_end)
+ range->seek_start = range->seek_end = MP_NOPTS_VALUE;
+}
+
static void ds_clear_reader_state(struct demux_stream *ds)
{
+ ds->in->fw_bytes -= ds->fw_bytes;
+
ds->reader_head = NULL;
ds->base_ts = ds->last_br_ts = MP_NOPTS_VALUE;
ds->last_br_bytes = 0;
ds->bitrate = -1;
ds->skip_to_keyframe = false;
ds->attached_picture_added = false;
+ ds->fw_bytes = 0;
+ ds->fw_packs = 0;
}
static void ds_clear_demux_state(struct demux_stream *ds)
{
ds_clear_reader_state(ds);
- demux_packet_t *dp = ds->queue_head;
+ demux_packet_t *dp = ds->queue->head;
while (dp) {
demux_packet_t *dn = dp->next;
+ ds->in->total_bytes -= demux_packet_estimate_total_size(dp);
free_demux_packet(dp);
dp = dn;
}
- ds->queue_head = ds->queue_tail = NULL;
+ ds->queue->head = ds->queue->tail = NULL;
ds->next_prune_target = NULL;
ds->keyframe_latest = NULL;
- ds->fw_packs = 0;
- ds->fw_bytes = 0;
- ds->bw_bytes = 0;
ds->eof = false;
ds->active = false;
ds->refreshing = false;
@@ -279,9 +392,11 @@ static void ds_clear_demux_state(struct demux_stream *ds)
ds->correct_dts = ds->correct_pos = true;
ds->last_pos = -1;
ds->last_ts = ds->last_dts = MP_NOPTS_VALUE;
- ds->back_pts = ds->end_pts = MP_NOPTS_VALUE;
+ ds->queue->seek_start = ds->queue->seek_end = MP_NOPTS_VALUE;
ds->keyframe_seen = false;
ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE;
+
+ update_seek_ranges(ds);
}
void demux_set_ts_offset(struct demuxer *demuxer, double offset)
@@ -318,18 +433,22 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
{
assert(!sh->ds); // must not be added yet
+ sh->index = in->num_streams;
+
sh->ds = talloc(sh, struct demux_stream);
*sh->ds = (struct demux_stream) {
.in = in,
.sh = sh,
.type = sh->type,
+ .index = sh->index,
.selected = in->autoselect,
+ .global_correct_dts = true,
+ .global_correct_pos = true,
};
if (!sh->codec->codec)
sh->codec->codec = "";
- sh->index = in->num_streams;
if (sh->ff_index < 0)
sh->ff_index = sh->index;
if (sh->demuxer_id < 0) {
@@ -341,6 +460,21 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
}
MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh);
+ assert(in->streams[sh->index] == sh);
+
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+ struct demux_queue *queue = talloc_ptrtype(range, queue);
+ *queue = (struct demux_queue){
+ .ds = sh->ds,
+ .seek_start = MP_NOPTS_VALUE,
+ .seek_end = MP_NOPTS_VALUE,
+ };
+ MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue);
+ assert(range->streams[sh->ds->index] == queue);
+ }
+
+ sh->ds->queue = in->current_range->streams[sh->ds->index];
in->events |= DEMUX_EVENT_STREAMS;
if (in->wakeup_cb)
@@ -589,10 +723,11 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds,
if (!dp || dp->keyframe) {
if (ds->keyframe_latest) {
ds->keyframe_latest->kf_seek_pts = ds->keyframe_pts;
- if (ds->back_pts == MP_NOPTS_VALUE)
- ds->back_pts = ds->keyframe_pts;
+ if (ds->queue->seek_start == MP_NOPTS_VALUE)
+ ds->queue->seek_start = ds->keyframe_pts;
if (ds->keyframe_end_pts != MP_NOPTS_VALUE)
- ds->end_pts = ds->keyframe_end_pts;
+ ds->queue->seek_end = ds->keyframe_end_pts;
+ update_seek_ranges(ds);
}
ds->keyframe_latest = dp;
ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE;
@@ -644,6 +779,8 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
ds->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > ds->last_dts;
ds->last_pos = dp->pos;
ds->last_dts = dp->dts;
+ ds->global_correct_pos &= ds->correct_pos;
+ ds->global_correct_dts &= ds->correct_dts;
dp->stream = stream->index;
dp->next = NULL;
@@ -656,20 +793,20 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
}
size_t bytes = demux_packet_estimate_total_size(dp);
+ ds->in->total_bytes += bytes;
if (ds->reader_head) {
ds->fw_packs++;
ds->fw_bytes += bytes;
- } else {
- ds->bw_bytes += bytes;
+ in->fw_bytes += bytes;
}
- if (ds->queue_tail) {
+ if (ds->queue->tail) {
// next packet in stream
- ds->queue_tail->next = dp;
- ds->queue_tail = dp;
+ ds->queue->tail->next = dp;
+ ds->queue->tail = dp;
} else {
// first packet in stream
- ds->queue_head = ds->queue_tail = dp;
+ ds->queue->head = ds->queue->tail = dp;
}
adjust_seek_range_on_packet(ds, dp);
@@ -714,19 +851,17 @@ static bool read_packet(struct demux_internal *in)
// the minimum, or if a stream explicitly needs new packets. Also includes
// safe-guards against packet queue overflow.
bool active = false, read_more = false, prefetch_more = false;
- size_t bytes = 0;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
active |= ds->active;
read_more |= (ds->active && !ds->reader_head) || ds->refreshing;
- bytes += ds->fw_bytes;
if (ds->active && ds->last_ts != MP_NOPTS_VALUE && in->min_secs > 0 &&
ds->base_ts != MP_NOPTS_VALUE && ds->last_ts >= ds->base_ts)
prefetch_more |= ds->last_ts - ds->base_ts < in->min_secs;
}
MP_DBG(in, "bytes=%zd, active=%d, read_more=%d prefetch_more=%d\n",
- bytes, active, read_more, prefetch_more);
- if (bytes >= in->max_bytes) {
+ in->fw_bytes, active, read_more, prefetch_more);
+ if (in->fw_bytes >= in->max_bytes) {
if (!read_more)
return false;
if (!in->warned_queue_overflow) {
@@ -737,7 +872,7 @@ static bool read_packet(struct demux_internal *in)
if (ds->selected) {
MP_WARN(in, " %s/%d: %zd packets, %zd bytes\n",
stream_type_name(ds->type), n,
- ds->fw_packs, ds->fw_bytes);
+ ds->fw_packs, ds->fw_bytes);
}
}
}
@@ -803,9 +938,7 @@ static bool read_packet(struct demux_internal *in)
static void prune_old_packets(struct demux_internal *in)
{
- size_t buffered = 0;
- for (int n = 0; n < in->num_streams; n++)
- buffered += in->streams[n]->ds->bw_bytes;
+ size_t buffered = in->total_bytes - in->fw_bytes;
MP_TRACE(in, "total backbuffer = %zd\n", buffered);
@@ -820,8 +953,8 @@ static void prune_old_packets(struct demux_internal *in)
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
- if (ds->queue_head && ds->queue_head != ds->reader_head) {
- struct demux_packet *dp = ds->queue_head;
+ if (ds->queue->head && ds->queue->head != ds->reader_head) {
+ struct demux_packet *dp = ds->queue->head;
double ts = dp->kf_seek_pts;
// Note: in obscure cases, packets might have no timestamps set,
// in which case we still need to prune _something_.
@@ -847,35 +980,37 @@ static void prune_old_packets(struct demux_internal *in)
// that many keyframe ranges without keyframes exist (audio packets)
// makes this much harder.
if (in->seekable_cache && !ds->next_prune_target) {
- // (Has to be _after_ queue_head to drop at least 1 packet.)
- struct demux_packet *prev = ds->queue_head;
- ds->back_pts = MP_NOPTS_VALUE;
- ds->next_prune_target = ds->queue_tail; // (prune all if none found)
+ // (Has to be _after_ queue->head to drop at least 1 packet.)
+ struct demux_packet *prev = ds->queue->head;
+ ds->queue->seek_start = MP_NOPTS_VALUE;
+ ds->next_prune_target = ds->queue->tail; // (prune all if none found)
while (prev->next) {
struct demux_packet *dp = prev->next;
// Note that the next back_pts might be above the lowest buffered
// packet, but it will still be only viable lowest seek target.
if (dp->keyframe && dp->kf_seek_pts != MP_NOPTS_VALUE) {
- ds->back_pts = dp->kf_seek_pts;
+ ds->queue->seek_start = dp->kf_seek_pts;
ds->next_prune_target = prev;
break;
}
prev = prev->next;
}
+
+ update_seek_ranges(ds);
}
bool done = false;
- while (!done && ds->queue_head && ds->queue_head != ds->reader_head) {
- struct demux_packet *dp = ds->queue_head;
+ while (!done && ds->queue->head && ds->queue->head != ds->reader_head) {
+ struct demux_packet *dp = ds->queue->head;
size_t bytes = demux_packet_estimate_total_size(dp);
buffered -= bytes;
MP_TRACE(in, "dropping backbuffer packet size %zd from stream %d\n",
bytes, ds->sh->index);
- ds->queue_head = dp->next;
- if (!ds->queue_head)
- ds->queue_tail = NULL;
+ ds->queue->head = dp->next;
+ if (!ds->queue->head)
+ ds->queue->tail = NULL;
if (ds->next_prune_target == dp) {
ds->next_prune_target = NULL;
done = true;
@@ -883,7 +1018,7 @@ static void prune_old_packets(struct demux_internal *in)
if (ds->keyframe_latest == dp)
ds->keyframe_latest = NULL;
talloc_free(dp);
- ds->bw_bytes -= bytes;
+ in->total_bytes -= bytes;
}
}
}
@@ -986,7 +1121,7 @@ static struct demux_packet *dequeue_packet(struct demux_stream *ds)
ds->fw_packs--;
size_t bytes = demux_packet_estimate_total_size(pkt);
ds->fw_bytes -= bytes;
- ds->bw_bytes += bytes;
+ ds->in->fw_bytes -= bytes;
// The returned packet is mutated etc. and will be owned by the user.
pkt = demux_copy_packet(pkt);
@@ -1495,6 +1630,13 @@ static struct demuxer *open_given_type(struct mpv_global *global,
pthread_mutex_init(&in->lock, NULL);
pthread_cond_init(&in->wakeup, NULL);
+ in->current_range = talloc_ptrtype(in, in->current_range),
+ *in->current_range = (struct demux_cached_range){
+ .seek_start = MP_NOPTS_VALUE,
+ .seek_end = MP_NOPTS_VALUE,
+ };
+ MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, in->current_range);
+
*in->d_thread = *demuxer;
*in->d_buffer = *demuxer;
@@ -1650,6 +1792,7 @@ static void clear_reader_state(struct demux_internal *in)
ds_clear_reader_state(in->streams[n]->ds);
in->warned_queue_overflow = false;
in->d_user->filepos = -1; // implicitly synchronized
+ assert(in->fw_bytes == 0);
}
static void clear_demux_state(struct demux_internal *in)
@@ -1660,6 +1803,7 @@ static void clear_demux_state(struct demux_internal *in)
in->eof = false;
in->last_eof = false;
in->idle = true;
+ assert(in->total_bytes == 0);
}
// clear the packet queues
@@ -1674,20 +1818,10 @@ static void recompute_buffers(struct demux_stream *ds)
{
ds->fw_packs = 0;
ds->fw_bytes = 0;
- ds->bw_bytes = 0;
- bool in_backbuffer = true;
- for (struct demux_packet *dp = ds->queue_head; dp; dp = dp->next) {
- if (dp == ds->reader_head)
- in_backbuffer = false;
-
- size_t bytes = demux_packet_estimate_total_size(dp);
- if (in_backbuffer) {
- ds->bw_bytes += bytes;
- } else {
- ds->fw_packs++;
- ds->fw_bytes += bytes;
- }
+ for (struct demux_packet *dp = ds->reader_head; dp; dp = dp->next) {
+ ds->fw_bytes += demux_packet_estimate_total_size(dp);
+ ds->fw_packs++;
}
}
@@ -1696,7 +1830,7 @@ static struct demux_packet *find_seek_target(struct demux_stream *ds,
{
struct demux_packet *target = NULL;
double target_diff = MP_NOPTS_VALUE;
- for (struct demux_packet *dp = ds->queue_head; dp; dp = dp->next) {
+ for (struct demux_packet *dp = ds->queue->head; dp; dp = dp->next) {
double range_pts = dp->kf_seek_pts;
if (!dp->keyframe || range_pts == MP_NOPTS_VALUE)
continue;
@@ -1783,10 +1917,12 @@ static bool try_seek_cache(struct demux_internal *in, double pts, int flags)
struct demux_packet *target = find_seek_target(ds, pts, flags);
ds->reader_head = target;
ds->skip_to_keyframe = !target;
- recompute_buffers(ds);
if (ds->reader_head)
ds->base_ts = PTS_OR_DEF(ds->reader_head->pts, ds->reader_head->dts);
+ recompute_buffers(ds);
+ in->fw_bytes += ds->fw_bytes;
+
MP_VERBOSE(in, "seeking stream %d (%s) to ",
n, stream_type_name(ds->type));
@@ -2033,41 +2169,36 @@ static int cached_demux_control(struct demux_internal *in, int cmd, void *arg)
.ts_duration = -1,
};
bool any_packets = false;
- bool seek_ok = in->seekable_cache && !in->seeking;
- double seek_min = MP_NOPTS_VALUE;
- double seek_max = MP_NOPTS_VALUE;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
- if (ds->active && !(!ds->queue_head && ds->eof) &&
+ if (ds->active && !(!ds->queue->head && ds->eof) &&
!ds->ignore_eof && ds->type != STREAM_SUB)
{
r->underrun |= !ds->reader_head && !ds->eof;
r->ts_reader = MP_PTS_MAX(r->ts_reader, ds->base_ts);
r->ts_end = MP_PTS_MAX(r->ts_end, ds->last_ts);
- seek_min = MP_PTS_MAX(seek_min, ds->back_pts);
- seek_max = MP_PTS_MIN(seek_max, ds->end_pts);
- if (ds->back_pts == MP_NOPTS_VALUE ||
- ds->end_pts == MP_NOPTS_VALUE)
- seek_ok = false;
- any_packets |= !!ds->queue_head;
+ any_packets |= !!ds->queue->head;
}
}
r->idle = (in->idle && !r->underrun) || r->eof;
r->underrun &= !r->idle;
- seek_min = MP_ADD_PTS(seek_min, in->ts_offset);
- seek_max = MP_ADD_PTS(seek_max, in->ts_offset);
r->ts_reader = MP_ADD_PTS(r->ts_reader, in->ts_offset);
r->ts_end = MP_ADD_PTS(r->ts_end, in->ts_offset);
if (r->ts_reader != MP_NOPTS_VALUE && r->ts_reader <= r->ts_end)
r->ts_duration = r->ts_end - r->ts_reader;
if (in->seeking || !any_packets)
r->ts_duration = 0;
- if (seek_ok && seek_min != MP_NOPTS_VALUE && seek_max > seek_min) {
- r->num_seek_ranges = 1;
- r->seek_ranges[0] = (struct demux_seek_range){
- .start = seek_min,
- .end = seek_max,
- };
+ if (!in->seeking) {
+ for (int n = 0; n < in->num_ranges; n++) {
+ struct demux_cached_range *range = in->ranges[n];
+ if (range->seek_start != MP_NOPTS_VALUE && n < MAX_SEEK_RANGES) {
+ r->seek_ranges[r->num_seek_ranges++] =
+ (struct demux_seek_range){
+ .start = MP_ADD_PTS(range->seek_start, in->ts_offset),
+ .end = MP_ADD_PTS(range->seek_end, in->ts_offset),
+ };
+ }
+ }
}
return CONTROL_OK;
}