diff options
Diffstat (limited to 'demux/demux.c')
-rw-r--r-- | demux/demux.c | 297 |
1 files changed, 214 insertions, 83 deletions
diff --git a/demux/demux.c b/demux/demux.c index 155789559d..e38f704cfb 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -175,6 +175,17 @@ struct demux_internal { void (*run_fn)(void *); // if non-NULL, function queued to be run on void *run_fn_arg; // the thread as run_fn(run_fn_arg) + // (sorted by least recent use: index 0 is least recently used) + struct demux_cached_range **ranges; + int num_ranges; + + size_t total_bytes; // total sum of packet data buffered + size_t fw_bytes; // sum of forward packet data in current_range + + // Range from which decoder is reading, and to which demuxer is appending. + // This is never NULL. + struct demux_cached_range *current_range; + // Cached state. bool force_cache_update; struct mp_tags *stream_metadata; @@ -184,10 +195,33 @@ struct demux_internal { char *stream_base_filename; }; +// A continuous range of cached packets for all enabled streams. +// (One demux_queue for each known stream.) +struct demux_cached_range { + // streams[] is indexed by demux_stream->index + struct demux_queue **streams; + int num_streams; + + // Computed from the stream queue's values. + double seek_start, seek_end; +}; + +// A continuous list of cached packets for a single stream/range. There is one +// for each stream and range. +struct demux_queue { + struct demux_stream *ds; + + struct demux_packet *head; + struct demux_packet *tail; + + double seek_start, seek_end; +}; + struct demux_stream { struct demux_internal *in; - struct sh_stream *sh; - enum stream_type type; + struct sh_stream *sh; // ds->sh->ds == ds + enum stream_type type; // equals to sh->type + int index; // equals to sh->index // --- all fields are protected by in->lock // demuxer state @@ -195,33 +229,33 @@ struct demux_stream { bool active; // try to keep at least 1 packet queued // if false, this stream is disabled, or passively // read (like subtitles) - bool eof; // end of demuxed stream? (true if all buffer empty) + bool eof; // end of demuxed stream? (true if all buffers empty) bool need_refresh; // enabled mid-stream bool refreshing; bool correct_dts; // packet DTS is strictly monotonically increasing bool correct_pos; // packet pos is strictly monotonically increasing - size_t fw_packs; // number of packets in buffer (forward) - size_t fw_bytes; // total bytes of packets in buffer (forward) - size_t bw_bytes; // same as fw_bytes, but for back buffer - int64_t last_pos; - double last_dts; + int64_t last_pos; // for determining correct_pos + double last_dts; // for determining correct_dts + bool global_correct_dts;// all observed so far + bool global_correct_pos; double last_ts; // timestamp of the last packet added to queue - double back_pts; // smallest timestamp on the start of the back buffer - double end_pts; // highest seekable timestamp - struct demux_packet *queue_head; // start of the full queue - struct demux_packet *queue_tail; // end of the full queue struct demux_packet *next_prune_target; // cached value for faster pruning // for incrementally determining seek PTS range bool keyframe_seen; double keyframe_pts, keyframe_end_pts; struct demux_packet *keyframe_latest; + // current queue - used both for reading and demuxing (this is never NULL) + struct demux_queue *queue; + // reader (decoder) state (bitrate calculations are part of it because we // want to return the bitrate closest to the "current position") double base_ts; // timestamp of the last packet returned to decoder double last_br_ts; // timestamp of last packet bitrate was calculated size_t last_br_bytes; // summed packet sizes since last bitrate calculation double bitrate; + size_t fw_packs; // number of packets in buffer (forward) + size_t fw_bytes; // total bytes of packets in buffer (forward) struct demux_packet *reader_head; // points at current decoder position bool skip_to_keyframe; bool attached_picture_added; @@ -245,33 +279,112 @@ static void update_cache(struct demux_internal *in); static int cached_demux_control(struct demux_internal *in, int cmd, void *arg); static void clear_demux_state(struct demux_internal *in); +#if 0 +// very expensive check for redundant cached queue state +static void check_queue_consistency(struct demux_internal *in) +{ + size_t total_bytes = 0; + size_t total_fw_bytes = 0; + + for (int n = 0; n < in->num_ranges; n++) { + struct demux_cached_range *range = in->ranges[n]; + + assert(range->num_streams == in->num_streams); + + for (int i = 0; i < range->num_streams; i++) { + struct demux_queue *queue = range->streams[i]; + + size_t fw_bytes = 0; + size_t fw_packs = 0; + bool is_forward = false; + for (struct demux_packet *dp = queue->head; dp; dp = dp->next) { + is_forward |= dp == queue->ds->reader_head; + + size_t bytes = demux_packet_estimate_total_size(dp); + total_bytes += bytes; + if (is_forward) { + fw_bytes += bytes; + fw_packs += 1; + assert(range == in->current_range); + assert(queue->ds->queue == queue); + } + + if (!dp->next) + assert(queue->tail == dp); + } + if (!queue->head) + assert(!queue->tail); + + total_fw_bytes += fw_bytes; + + if (range == in->current_range) { + assert(queue->ds->fw_bytes == fw_bytes); + assert(queue->ds->fw_packs == fw_packs); + } else { + assert(fw_bytes == 0 && fw_packs == 0); + } + } + } + + assert(in->total_bytes == total_bytes); + assert(in->fw_bytes == total_fw_bytes); +} +#endif + +static void update_seek_ranges(struct demux_stream *ds) +{ + struct demux_cached_range *range = ds->in->current_range; + + range->seek_start = range->seek_end = MP_NOPTS_VALUE; + + for (int n = 0; n < range->num_streams; n++) { + struct demux_queue *queue = range->streams[n]; + if (queue->ds->active) { + range->seek_start = MP_PTS_MAX(range->seek_start, queue->seek_start); + range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end); + + if (range->seek_start == MP_NOPTS_VALUE || + range->seek_end == MP_NOPTS_VALUE) + { + range->seek_start = range->seek_end = MP_NOPTS_VALUE; + break; + } + } + } + + if (range->seek_start >= range->seek_end) + range->seek_start = range->seek_end = MP_NOPTS_VALUE; +} + static void ds_clear_reader_state(struct demux_stream *ds) { + ds->in->fw_bytes -= ds->fw_bytes; + ds->reader_head = NULL; ds->base_ts = ds->last_br_ts = MP_NOPTS_VALUE; ds->last_br_bytes = 0; ds->bitrate = -1; ds->skip_to_keyframe = false; ds->attached_picture_added = false; + ds->fw_bytes = 0; + ds->fw_packs = 0; } static void ds_clear_demux_state(struct demux_stream *ds) { ds_clear_reader_state(ds); - demux_packet_t *dp = ds->queue_head; + demux_packet_t *dp = ds->queue->head; while (dp) { demux_packet_t *dn = dp->next; + ds->in->total_bytes -= demux_packet_estimate_total_size(dp); free_demux_packet(dp); dp = dn; } - ds->queue_head = ds->queue_tail = NULL; + ds->queue->head = ds->queue->tail = NULL; ds->next_prune_target = NULL; ds->keyframe_latest = NULL; - ds->fw_packs = 0; - ds->fw_bytes = 0; - ds->bw_bytes = 0; ds->eof = false; ds->active = false; ds->refreshing = false; @@ -279,9 +392,11 @@ static void ds_clear_demux_state(struct demux_stream *ds) ds->correct_dts = ds->correct_pos = true; ds->last_pos = -1; ds->last_ts = ds->last_dts = MP_NOPTS_VALUE; - ds->back_pts = ds->end_pts = MP_NOPTS_VALUE; + ds->queue->seek_start = ds->queue->seek_end = MP_NOPTS_VALUE; ds->keyframe_seen = false; ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE; + + update_seek_ranges(ds); } void demux_set_ts_offset(struct demuxer *demuxer, double offset) @@ -318,18 +433,22 @@ static void demux_add_sh_stream_locked(struct demux_internal *in, { assert(!sh->ds); // must not be added yet + sh->index = in->num_streams; + sh->ds = talloc(sh, struct demux_stream); *sh->ds = (struct demux_stream) { .in = in, .sh = sh, .type = sh->type, + .index = sh->index, .selected = in->autoselect, + .global_correct_dts = true, + .global_correct_pos = true, }; if (!sh->codec->codec) sh->codec->codec = ""; - sh->index = in->num_streams; if (sh->ff_index < 0) sh->ff_index = sh->index; if (sh->demuxer_id < 0) { @@ -341,6 +460,21 @@ static void demux_add_sh_stream_locked(struct demux_internal *in, } MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh); + assert(in->streams[sh->index] == sh); + + for (int n = 0; n < in->num_ranges; n++) { + struct demux_cached_range *range = in->ranges[n]; + struct demux_queue *queue = talloc_ptrtype(range, queue); + *queue = (struct demux_queue){ + .ds = sh->ds, + .seek_start = MP_NOPTS_VALUE, + .seek_end = MP_NOPTS_VALUE, + }; + MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue); + assert(range->streams[sh->ds->index] == queue); + } + + sh->ds->queue = in->current_range->streams[sh->ds->index]; in->events |= DEMUX_EVENT_STREAMS; if (in->wakeup_cb) @@ -589,10 +723,11 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds, if (!dp || dp->keyframe) { if (ds->keyframe_latest) { ds->keyframe_latest->kf_seek_pts = ds->keyframe_pts; - if (ds->back_pts == MP_NOPTS_VALUE) - ds->back_pts = ds->keyframe_pts; + if (ds->queue->seek_start == MP_NOPTS_VALUE) + ds->queue->seek_start = ds->keyframe_pts; if (ds->keyframe_end_pts != MP_NOPTS_VALUE) - ds->end_pts = ds->keyframe_end_pts; + ds->queue->seek_end = ds->keyframe_end_pts; + update_seek_ranges(ds); } ds->keyframe_latest = dp; ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE; @@ -644,6 +779,8 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) ds->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > ds->last_dts; ds->last_pos = dp->pos; ds->last_dts = dp->dts; + ds->global_correct_pos &= ds->correct_pos; + ds->global_correct_dts &= ds->correct_dts; dp->stream = stream->index; dp->next = NULL; @@ -656,20 +793,20 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) } size_t bytes = demux_packet_estimate_total_size(dp); + ds->in->total_bytes += bytes; if (ds->reader_head) { ds->fw_packs++; ds->fw_bytes += bytes; - } else { - ds->bw_bytes += bytes; + in->fw_bytes += bytes; } - if (ds->queue_tail) { + if (ds->queue->tail) { // next packet in stream - ds->queue_tail->next = dp; - ds->queue_tail = dp; + ds->queue->tail->next = dp; + ds->queue->tail = dp; } else { // first packet in stream - ds->queue_head = ds->queue_tail = dp; + ds->queue->head = ds->queue->tail = dp; } adjust_seek_range_on_packet(ds, dp); @@ -714,19 +851,17 @@ static bool read_packet(struct demux_internal *in) // the minimum, or if a stream explicitly needs new packets. Also includes // safe-guards against packet queue overflow. bool active = false, read_more = false, prefetch_more = false; - size_t bytes = 0; for (int n = 0; n < in->num_streams; n++) { struct demux_stream *ds = in->streams[n]->ds; active |= ds->active; read_more |= (ds->active && !ds->reader_head) || ds->refreshing; - bytes += ds->fw_bytes; if (ds->active && ds->last_ts != MP_NOPTS_VALUE && in->min_secs > 0 && ds->base_ts != MP_NOPTS_VALUE && ds->last_ts >= ds->base_ts) prefetch_more |= ds->last_ts - ds->base_ts < in->min_secs; } MP_DBG(in, "bytes=%zd, active=%d, read_more=%d prefetch_more=%d\n", - bytes, active, read_more, prefetch_more); - if (bytes >= in->max_bytes) { + in->fw_bytes, active, read_more, prefetch_more); + if (in->fw_bytes >= in->max_bytes) { if (!read_more) return false; if (!in->warned_queue_overflow) { @@ -737,7 +872,7 @@ static bool read_packet(struct demux_internal *in) if (ds->selected) { MP_WARN(in, " %s/%d: %zd packets, %zd bytes\n", stream_type_name(ds->type), n, - ds->fw_packs, ds->fw_bytes); + ds->fw_packs, ds->fw_bytes); } } } @@ -803,9 +938,7 @@ static bool read_packet(struct demux_internal *in) static void prune_old_packets(struct demux_internal *in) { - size_t buffered = 0; - for (int n = 0; n < in->num_streams; n++) - buffered += in->streams[n]->ds->bw_bytes; + size_t buffered = in->total_bytes - in->fw_bytes; MP_TRACE(in, "total backbuffer = %zd\n", buffered); @@ -820,8 +953,8 @@ static void prune_old_packets(struct demux_internal *in) for (int n = 0; n < in->num_streams; n++) { struct demux_stream *ds = in->streams[n]->ds; - if (ds->queue_head && ds->queue_head != ds->reader_head) { - struct demux_packet *dp = ds->queue_head; + if (ds->queue->head && ds->queue->head != ds->reader_head) { + struct demux_packet *dp = ds->queue->head; double ts = dp->kf_seek_pts; // Note: in obscure cases, packets might have no timestamps set, // in which case we still need to prune _something_. @@ -847,35 +980,37 @@ static void prune_old_packets(struct demux_internal *in) // that many keyframe ranges without keyframes exist (audio packets) // makes this much harder. if (in->seekable_cache && !ds->next_prune_target) { - // (Has to be _after_ queue_head to drop at least 1 packet.) - struct demux_packet *prev = ds->queue_head; - ds->back_pts = MP_NOPTS_VALUE; - ds->next_prune_target = ds->queue_tail; // (prune all if none found) + // (Has to be _after_ queue->head to drop at least 1 packet.) + struct demux_packet *prev = ds->queue->head; + ds->queue->seek_start = MP_NOPTS_VALUE; + ds->next_prune_target = ds->queue->tail; // (prune all if none found) while (prev->next) { struct demux_packet *dp = prev->next; // Note that the next back_pts might be above the lowest buffered // packet, but it will still be only viable lowest seek target. if (dp->keyframe && dp->kf_seek_pts != MP_NOPTS_VALUE) { - ds->back_pts = dp->kf_seek_pts; + ds->queue->seek_start = dp->kf_seek_pts; ds->next_prune_target = prev; break; } prev = prev->next; } + + update_seek_ranges(ds); } bool done = false; - while (!done && ds->queue_head && ds->queue_head != ds->reader_head) { - struct demux_packet *dp = ds->queue_head; + while (!done && ds->queue->head && ds->queue->head != ds->reader_head) { + struct demux_packet *dp = ds->queue->head; size_t bytes = demux_packet_estimate_total_size(dp); buffered -= bytes; MP_TRACE(in, "dropping backbuffer packet size %zd from stream %d\n", bytes, ds->sh->index); - ds->queue_head = dp->next; - if (!ds->queue_head) - ds->queue_tail = NULL; + ds->queue->head = dp->next; + if (!ds->queue->head) + ds->queue->tail = NULL; if (ds->next_prune_target == dp) { ds->next_prune_target = NULL; done = true; @@ -883,7 +1018,7 @@ static void prune_old_packets(struct demux_internal *in) if (ds->keyframe_latest == dp) ds->keyframe_latest = NULL; talloc_free(dp); - ds->bw_bytes -= bytes; + in->total_bytes -= bytes; } } } @@ -986,7 +1121,7 @@ static struct demux_packet *dequeue_packet(struct demux_stream *ds) ds->fw_packs--; size_t bytes = demux_packet_estimate_total_size(pkt); ds->fw_bytes -= bytes; - ds->bw_bytes += bytes; + ds->in->fw_bytes -= bytes; // The returned packet is mutated etc. and will be owned by the user. pkt = demux_copy_packet(pkt); @@ -1495,6 +1630,13 @@ static struct demuxer *open_given_type(struct mpv_global *global, pthread_mutex_init(&in->lock, NULL); pthread_cond_init(&in->wakeup, NULL); + in->current_range = talloc_ptrtype(in, in->current_range), + *in->current_range = (struct demux_cached_range){ + .seek_start = MP_NOPTS_VALUE, + .seek_end = MP_NOPTS_VALUE, + }; + MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, in->current_range); + *in->d_thread = *demuxer; *in->d_buffer = *demuxer; @@ -1650,6 +1792,7 @@ static void clear_reader_state(struct demux_internal *in) ds_clear_reader_state(in->streams[n]->ds); in->warned_queue_overflow = false; in->d_user->filepos = -1; // implicitly synchronized + assert(in->fw_bytes == 0); } static void clear_demux_state(struct demux_internal *in) @@ -1660,6 +1803,7 @@ static void clear_demux_state(struct demux_internal *in) in->eof = false; in->last_eof = false; in->idle = true; + assert(in->total_bytes == 0); } // clear the packet queues @@ -1674,20 +1818,10 @@ static void recompute_buffers(struct demux_stream *ds) { ds->fw_packs = 0; ds->fw_bytes = 0; - ds->bw_bytes = 0; - bool in_backbuffer = true; - for (struct demux_packet *dp = ds->queue_head; dp; dp = dp->next) { - if (dp == ds->reader_head) - in_backbuffer = false; - - size_t bytes = demux_packet_estimate_total_size(dp); - if (in_backbuffer) { - ds->bw_bytes += bytes; - } else { - ds->fw_packs++; - ds->fw_bytes += bytes; - } + for (struct demux_packet *dp = ds->reader_head; dp; dp = dp->next) { + ds->fw_bytes += demux_packet_estimate_total_size(dp); + ds->fw_packs++; } } @@ -1696,7 +1830,7 @@ static struct demux_packet *find_seek_target(struct demux_stream *ds, { struct demux_packet *target = NULL; double target_diff = MP_NOPTS_VALUE; - for (struct demux_packet *dp = ds->queue_head; dp; dp = dp->next) { + for (struct demux_packet *dp = ds->queue->head; dp; dp = dp->next) { double range_pts = dp->kf_seek_pts; if (!dp->keyframe || range_pts == MP_NOPTS_VALUE) continue; @@ -1783,10 +1917,12 @@ static bool try_seek_cache(struct demux_internal *in, double pts, int flags) struct demux_packet *target = find_seek_target(ds, pts, flags); ds->reader_head = target; ds->skip_to_keyframe = !target; - recompute_buffers(ds); if (ds->reader_head) ds->base_ts = PTS_OR_DEF(ds->reader_head->pts, ds->reader_head->dts); + recompute_buffers(ds); + in->fw_bytes += ds->fw_bytes; + MP_VERBOSE(in, "seeking stream %d (%s) to ", n, stream_type_name(ds->type)); @@ -2033,41 +2169,36 @@ static int cached_demux_control(struct demux_internal *in, int cmd, void *arg) .ts_duration = -1, }; bool any_packets = false; - bool seek_ok = in->seekable_cache && !in->seeking; - double seek_min = MP_NOPTS_VALUE; - double seek_max = MP_NOPTS_VALUE; for (int n = 0; n < in->num_streams; n++) { struct demux_stream *ds = in->streams[n]->ds; - if (ds->active && !(!ds->queue_head && ds->eof) && + if (ds->active && !(!ds->queue->head && ds->eof) && !ds->ignore_eof && ds->type != STREAM_SUB) { r->underrun |= !ds->reader_head && !ds->eof; r->ts_reader = MP_PTS_MAX(r->ts_reader, ds->base_ts); r->ts_end = MP_PTS_MAX(r->ts_end, ds->last_ts); - seek_min = MP_PTS_MAX(seek_min, ds->back_pts); - seek_max = MP_PTS_MIN(seek_max, ds->end_pts); - if (ds->back_pts == MP_NOPTS_VALUE || - ds->end_pts == MP_NOPTS_VALUE) - seek_ok = false; - any_packets |= !!ds->queue_head; + any_packets |= !!ds->queue->head; } } r->idle = (in->idle && !r->underrun) || r->eof; r->underrun &= !r->idle; - seek_min = MP_ADD_PTS(seek_min, in->ts_offset); - seek_max = MP_ADD_PTS(seek_max, in->ts_offset); r->ts_reader = MP_ADD_PTS(r->ts_reader, in->ts_offset); r->ts_end = MP_ADD_PTS(r->ts_end, in->ts_offset); if (r->ts_reader != MP_NOPTS_VALUE && r->ts_reader <= r->ts_end) r->ts_duration = r->ts_end - r->ts_reader; if (in->seeking || !any_packets) r->ts_duration = 0; - if (seek_ok && seek_min != MP_NOPTS_VALUE && seek_max > seek_min) { - r->num_seek_ranges = 1; - r->seek_ranges[0] = (struct demux_seek_range){ - .start = seek_min, - .end = seek_max, - }; + if (!in->seeking) { + for (int n = 0; n < in->num_ranges; n++) { + struct demux_cached_range *range = in->ranges[n]; + if (range->seek_start != MP_NOPTS_VALUE && n < MAX_SEEK_RANGES) { + r->seek_ranges[r->num_seek_ranges++] = + (struct demux_seek_range){ + .start = MP_ADD_PTS(range->seek_start, in->ts_offset), + .end = MP_ADD_PTS(range->seek_end, in->ts_offset), + }; + } + } } return CONTROL_OK; } |