diff options
Diffstat (limited to 'demux')
-rw-r--r-- | demux/demux.c | 825 | ||||
-rw-r--r-- | demux/demux.h | 2 |
2 files changed, 603 insertions, 224 deletions
diff --git a/demux/demux.c b/demux/demux.c index 85acce28db..9558605bad 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -157,6 +157,10 @@ struct demux_internal { int max_bytes_bw; int seekable_cache; + // At least one decoder actually requested data since init or the last seek. + // Do this to allow the decoder thread to select streams before starting. + bool reading; + // Set if we know that we are at the start of the file. This is used to // avoid a redundant initial seek after enabling streams. We could just // allow it, but to avoid buggy seeking affecting normal playback, we don't. @@ -183,7 +187,7 @@ struct demux_internal { size_t fw_bytes; // sum of forward packet data in current_range // Range from which decoder is reading, and to which demuxer is appending. - // This is never NULL. + // This is never NULL. This is always ranges[num_ranges - 1]. struct demux_cached_range *current_range; // Cached state. @@ -202,18 +206,34 @@ struct demux_cached_range { struct demux_queue **streams; int num_streams; - // Computed from the stream queue's values. + // Computed from the stream queue's values. These fields (unlike as with + // demux_queue) are always either NOPTS, or fully valid. double seek_start, seek_end; }; // A continuous list of cached packets for a single stream/range. There is one -// for each stream and range. +// for each stream and range. Also contains some state for use during demuxing +// (keeping it across seeks makes it easier to resume demuxing). struct demux_queue { struct demux_stream *ds; + struct demux_cached_range *range; struct demux_packet *head; struct demux_packet *tail; + struct demux_packet *next_prune_target; // cached value for faster pruning + + bool correct_dts; // packet DTS is strictly monotonically increasing + bool correct_pos; // packet pos is strictly monotonically increasing + int64_t last_pos; // for determining correct_pos + double last_dts; // for determining correct_dts + double last_ts; // timestamp of the last packet added to queue + + // for incrementally determining seek PTS range + double keyframe_pts, keyframe_end_pts; + struct demux_packet *keyframe_latest; + + // incrementally maintained seek range, possibly invalid double seek_start, seek_end; }; @@ -226,24 +246,14 @@ struct demux_stream { // demuxer state bool selected; // user wants packets from this stream - bool active; // try to keep at least 1 packet queued + bool eager; // try to keep at least 1 packet queued // if false, this stream is disabled, or passively // read (like subtitles) - bool eof; // end of demuxed stream? (true if all buffers empty) bool need_refresh; // enabled mid-stream bool refreshing; - bool correct_dts; // packet DTS is strictly monotonically increasing - bool correct_pos; // packet pos is strictly monotonically increasing - int64_t last_pos; // for determining correct_pos - double last_dts; // for determining correct_dts + bool global_correct_dts;// all observed so far bool global_correct_pos; - double last_ts; // timestamp of the last packet added to queue - struct demux_packet *next_prune_target; // cached value for faster pruning - // for incrementally determining seek PTS range - bool keyframe_seen; - double keyframe_pts, keyframe_end_pts; - struct demux_packet *keyframe_latest; // current queue - used both for reading and demuxing (this is never NULL) struct demux_queue *queue; @@ -256,6 +266,7 @@ struct demux_stream { double bitrate; size_t fw_packs; // number of packets in buffer (forward) size_t fw_bytes; // total bytes of packets in buffer (forward) + bool eof; // end of demuxed stream? (true if no more packets) struct demux_packet *reader_head; // points at current decoder position bool skip_to_keyframe; bool attached_picture_added; @@ -276,8 +287,6 @@ struct demux_stream { static void demuxer_sort_chapters(demuxer_t *demuxer); static void *demux_thread(void *pctx); static void update_cache(struct demux_internal *in); -static int cached_demux_control(struct demux_internal *in, int cmd, void *arg); -static void clear_demux_state(struct demux_internal *in); #if 0 // very expensive check for redundant cached queue state @@ -286,6 +295,9 @@ static void check_queue_consistency(struct demux_internal *in) size_t total_bytes = 0; size_t total_fw_bytes = 0; + assert(in->current_range && in->num_ranges > 0); + assert(in->current_range == in->ranges[in->num_ranges - 1]); + for (int n = 0; n < in->num_ranges; n++) { struct demux_cached_range *range = in->ranges[n]; @@ -294,11 +306,17 @@ static void check_queue_consistency(struct demux_internal *in) for (int i = 0; i < range->num_streams; i++) { struct demux_queue *queue = range->streams[i]; + assert(queue->range == range); + size_t fw_bytes = 0; size_t fw_packs = 0; bool is_forward = false; + bool kf_found = false; + bool npt_found = false; for (struct demux_packet *dp = queue->head; dp; dp = dp->next) { is_forward |= dp == queue->ds->reader_head; + kf_found |= dp == queue->keyframe_latest; + npt_found |= dp == queue->next_prune_target; size_t bytes = demux_packet_estimate_total_size(dp); total_bytes += bytes; @@ -315,6 +333,15 @@ static void check_queue_consistency(struct demux_internal *in) if (!queue->head) assert(!queue->tail); + // If the queue is currently used... + if (queue->ds->queue == queue) { + // ...reader_head and others must be in the queue. + assert(is_forward == !!queue->ds->reader_head); + assert(kf_found == !!queue->keyframe_latest); + } + + assert(npt_found == !!queue->next_prune_target); + total_fw_bytes += fw_bytes; if (range == in->current_range) { @@ -323,6 +350,9 @@ static void check_queue_consistency(struct demux_internal *in) } else { assert(fw_bytes == 0 && fw_packs == 0); } + + if (queue->keyframe_latest) + assert(queue->keyframe_latest->keyframe); } } @@ -331,20 +361,46 @@ static void check_queue_consistency(struct demux_internal *in) } #endif -static void update_seek_ranges(struct demux_stream *ds) +static void recompute_buffers(struct demux_stream *ds) { - struct demux_cached_range *range = ds->in->current_range; + ds->fw_packs = 0; + ds->fw_bytes = 0; + for (struct demux_packet *dp = ds->reader_head; dp; dp = dp->next) { + ds->fw_bytes += demux_packet_estimate_total_size(dp); + ds->fw_packs++; + } +} + +// (this doesn't do most required things for a switch, like updating ds->queue) +static void set_current_range(struct demux_internal *in, + struct demux_cached_range *range) +{ + in->current_range = range; + + // Move to in->ranges[in->num_ranges-1] (for LRU sorting/invariant) + for (int n = 0; n < in->num_ranges; n++) { + if (in->ranges[n] == range) { + MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n); + break; + } + } + MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, range); +} + +// Refresh range->seek_start/end. +static void update_seek_ranges(struct demux_cached_range *range) +{ range->seek_start = range->seek_end = MP_NOPTS_VALUE; for (int n = 0; n < range->num_streams; n++) { struct demux_queue *queue = range->streams[n]; - if (queue->ds->active) { + if (queue->ds->selected) { range->seek_start = MP_PTS_MAX(range->seek_start, queue->seek_start); range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end); - if (range->seek_start == MP_NOPTS_VALUE || - range->seek_end == MP_NOPTS_VALUE) + if (queue->seek_start == MP_NOPTS_VALUE || + queue->seek_end == MP_NOPTS_VALUE) { range->seek_start = range->seek_end = MP_NOPTS_VALUE; break; @@ -356,11 +412,92 @@ static void update_seek_ranges(struct demux_stream *ds) range->seek_start = range->seek_end = MP_NOPTS_VALUE; } +// Remove the packet dp from the queue. prev must be the packet before dp, or +// NULL if dp is the first packet. +// This does not update in->fw_bytes/in->fw_packs. +static void remove_packet(struct demux_queue *queue, struct demux_packet *prev, + struct demux_packet *dp) +{ + if (prev) { + assert(prev->next == dp); + } else { + assert(queue->head == dp); + } + + assert(queue->ds->reader_head != dp); + if (queue->next_prune_target == dp) + queue->next_prune_target = NULL; + if (queue->keyframe_latest == dp) + queue->keyframe_latest = NULL; + + queue->ds->in->total_bytes -= demux_packet_estimate_total_size(dp); + + if (prev) { + prev->next = dp->next; + if (!prev->next) + queue->tail = prev; + } else { + queue->head = dp->next; + if (!queue->head) + queue->tail = NULL; + } + + talloc_free(dp); +} + +static void clear_queue(struct demux_queue *queue) +{ + struct demux_stream *ds = queue->ds; + struct demux_internal *in = ds->in; + + struct demux_packet *dp = queue->head; + while (dp) { + struct demux_packet *dn = dp->next; + in->total_bytes -= demux_packet_estimate_total_size(dp); + assert(ds->reader_head != dp); + talloc_free(dp); + dp = dn; + } + queue->head = queue->tail = NULL; + queue->next_prune_target = NULL; + queue->keyframe_latest = NULL; + queue->seek_start = queue->seek_end = MP_NOPTS_VALUE; + + queue->correct_dts = queue->correct_pos = true; + queue->last_pos = -1; + queue->last_ts = queue->last_dts = MP_NOPTS_VALUE; + queue->keyframe_latest = NULL; + queue->keyframe_pts = queue->keyframe_end_pts = MP_NOPTS_VALUE; +} + +static void clear_cached_range(struct demux_internal *in, + struct demux_cached_range *range) +{ + for (int n = 0; n < range->num_streams; n++) + clear_queue(range->streams[n]); + update_seek_ranges(range); +} + +static void free_empty_cached_ranges(struct demux_internal *in) +{ + assert(in->current_range && in->num_ranges > 0); + assert(in->current_range == in->ranges[in->num_ranges - 1]); + + for (int n = in->num_ranges - 2; n >= 0; n--) { + struct demux_cached_range *range = in->ranges[n]; + if (range->seek_start == MP_NOPTS_VALUE) { + clear_cached_range(in, range); + MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n); + } + } +} + static void ds_clear_reader_state(struct demux_stream *ds) { ds->in->fw_bytes -= ds->fw_bytes; ds->reader_head = NULL; + ds->eof = false; ds->base_ts = ds->last_br_ts = MP_NOPTS_VALUE; ds->last_br_bytes = 0; ds->bitrate = -1; @@ -370,33 +507,56 @@ static void ds_clear_reader_state(struct demux_stream *ds) ds->fw_packs = 0; } -static void ds_clear_demux_state(struct demux_stream *ds) +static void update_stream_selection_state(struct demux_internal *in, + struct demux_stream *ds, + bool selected, bool new) { - ds_clear_reader_state(ds); + if (ds->selected != selected || new) { + ds->selected = selected; + ds->eof = false; + ds->refreshing = false; + ds->need_refresh = false; - demux_packet_t *dp = ds->queue->head; - while (dp) { - demux_packet_t *dn = dp->next; - ds->in->total_bytes -= demux_packet_estimate_total_size(dp); - free_demux_packet(dp); - dp = dn; + ds_clear_reader_state(ds); + + // Make sure any stream reselection or addition is reflected in the seek + // ranges, and also get rid of data that is not needed anymore (or + // rather, which can't be kept consistent). + for (int n = 0; n < in->num_ranges; n++) { + struct demux_cached_range *range = in->ranges[n]; + + if (!ds->selected) + clear_queue(range->streams[ds->index]); + + update_seek_ranges(range); + } + + free_empty_cached_ranges(in); } - ds->queue->head = ds->queue->tail = NULL; - ds->next_prune_target = NULL; - ds->keyframe_latest = NULL; - ds->eof = false; - ds->active = false; - ds->refreshing = false; - ds->need_refresh = false; - ds->correct_dts = ds->correct_pos = true; - ds->last_pos = -1; - ds->last_ts = ds->last_dts = MP_NOPTS_VALUE; - ds->queue->seek_start = ds->queue->seek_end = MP_NOPTS_VALUE; - ds->keyframe_seen = false; - ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE; + // We still have to go over the whole stream list to update ds->eager for + // other streams too, because they depend on other stream's selections. - update_seek_ranges(ds); + bool any_av_streams = false; + + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *s = in->streams[n]->ds; + + s->eager = s->selected && !s->sh->attached_picture; + if (s->eager) + any_av_streams |= s->type != STREAM_SUB; + } + + // Subtitles are only eagerly read if there are no other eagerly read + // streams. + if (any_av_streams) { + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *s = in->streams[n]->ds; + + if (s->type == STREAM_SUB) + s->eager = false; + } + } } void demux_set_ts_offset(struct demuxer *demuxer, double offset) @@ -407,6 +567,23 @@ void demux_set_ts_offset(struct demuxer *demuxer, double offset) pthread_mutex_unlock(&in->lock); } +static void add_missing_streams(struct demux_internal *in, + struct demux_cached_range *range) +{ + for (int n = range->num_streams; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; + + struct demux_queue *queue = talloc_ptrtype(range, queue); + *queue = (struct demux_queue){ + .ds = ds, + .range = range, + }; + clear_queue(queue); + MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue); + assert(range->streams[ds->index] == queue); + } +} + // Allocate a new sh_stream of the given type. It either has to be released // with talloc_free(), or added to a demuxer with demux_add_sh_stream(). You // cannot add or read packets from the stream before it has been added. @@ -441,7 +618,6 @@ static void demux_add_sh_stream_locked(struct demux_internal *in, .sh = sh, .type = sh->type, .index = sh->index, - .selected = in->autoselect, .global_correct_dts = true, .global_correct_pos = true, }; @@ -462,20 +638,13 @@ static void demux_add_sh_stream_locked(struct demux_internal *in, MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh); assert(in->streams[sh->index] == sh); - for (int n = 0; n < in->num_ranges; n++) { - struct demux_cached_range *range = in->ranges[n]; - struct demux_queue *queue = talloc_ptrtype(range, queue); - *queue = (struct demux_queue){ - .ds = sh->ds, - .seek_start = MP_NOPTS_VALUE, - .seek_end = MP_NOPTS_VALUE, - }; - MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue); - assert(range->streams[sh->ds->index] == queue); - } + for (int n = 0; n < in->num_ranges; n++) + add_missing_streams(in, in->ranges[n]); sh->ds->queue = in->current_range->streams[sh->ds->index]; + update_stream_selection_state(in, sh->ds, in->autoselect, true); + in->events |= DEMUX_EVENT_STREAMS; if (in->wakeup_cb) in->wakeup_cb(in->wakeup_cb_ctx); @@ -553,7 +722,8 @@ void free_demuxer(demuxer_t *demuxer) if (demuxer->desc->close) demuxer->desc->close(in->d_thread); - clear_demux_state(in); + demux_flush(demuxer); + assert(in->total_bytes == 0); for (int n = in->num_streams - 1; n >= 0; n--) talloc_free(in->streams[n]); @@ -683,7 +853,7 @@ static double get_refresh_seek_pts(struct demux_internal *in) normal_seek &= ds->need_refresh; ds->need_refresh = false; - refresh_possible &= ds->correct_dts || ds->correct_pos; + refresh_possible &= ds->queue->correct_dts || ds->queue->correct_pos; } if (!needed || start_ts == MP_NOPTS_VALUE || !demux->desc->seek || @@ -702,7 +872,7 @@ static double get_refresh_seek_pts(struct demux_internal *in) struct demux_stream *ds = in->streams[n]->ds; // Streams which didn't have any packets yet will return all packets, // other streams return packets only starting from the last position. - if (ds->last_pos != -1 || ds->last_dts != MP_NOPTS_VALUE) + if (ds->queue->last_pos != -1 || ds->queue->last_dts != MP_NOPTS_VALUE) ds->refreshing |= ds->selected; } @@ -710,27 +880,181 @@ static double get_refresh_seek_pts(struct demux_internal *in) return start_ts - 1.0; } +// Check whether the next range in the list is, and if it appears to overlap, +// try joining it into a single range. +static void attempt_range_joining(struct demux_internal *in) +{ + struct demux_cached_range *next = NULL; + double next_dist = INFINITY; + + assert(in->current_range && in->num_ranges > 0); + assert(in->current_range == in->ranges[in->num_ranges - 1]); + + for (int n = 0; n < in->num_ranges - 1; n++) { + struct demux_cached_range *range = in->ranges[n]; + + if (in->current_range->seek_start <= range->seek_start) { + // This uses ">" to get some non-0 overlap. + double dist = in->current_range->seek_end - range->seek_start; + if (dist > 0 && dist < next_dist) { + next = range; + next_dist = dist; + } + } + } + + if (!next) + return; + + MP_VERBOSE(in, "going to join ranges %f-%f + %f-%f\n", + in->current_range->seek_start, in->current_range->seek_end, + next->seek_start, next->seek_end); + + // Try to find a join point, where packets obviously overlap. (It would be + // better and faster to do this incrementally, but probably too complex.) + // The current range can overlap arbitrarily with the next one, not only by + // by the seek overlap, but for arbitrary packet readahead as well. + // We also drop the overlapping packets (if joining fails, we discard the + // entire next range anyway, so this does no harm). + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; + + struct demux_queue *q1 = in->current_range->streams[n]; + struct demux_queue *q2 = next->streams[n]; + + if (!ds->global_correct_pos && !ds->global_correct_dts) { + MP_WARN(in, "stream %d: ranges unjoinable\n", n); + goto failed; + } + + struct demux_packet *end = q1->tail; + bool join_point_found = !end; // no packets yet -> joining will work + if (end) { + while (q2->head) { + struct demux_packet *dp = q2->head; + + // Some weird corner-case. We'd have to search the equivalent + // packet in q1 to update it correctly. Better just give up. + if (dp == q2->keyframe_latest) { + MP_WARN(in, "stream %d: not enough keyframes\n", n); + goto failed; + } + + // (Check for ">" too, to avoid incorrect joining in weird + // corner cases, where the next range misses the end packet.) + if ((ds->global_correct_dts && dp->dts >= end->dts) || + (ds->global_correct_pos && dp->pos >= end->pos)) + { + // Do some additional checks as a (imperfect) sanity check + // in case pos/dts are not "correct" across the ranges (we + // never actually check that). + if (dp->dts != end->dts || dp->pos != end->pos || + dp->pts != end->pts || dp->len != end->len) + { + MP_WARN(in, "stream %d: weird demuxer behavior\n", n); + goto failed; + } + + remove_packet(q2, NULL, dp); + join_point_found = true; + break; + } + + remove_packet(q2, NULL, dp); + } + } + + // For enabled non-sparse streams, always require an overlap packet. + if (ds->eager && !join_point_found) { + MP_WARN(in, "stream %d: no joint point found\n", n); + goto failed; + } + } + + // Actually join the ranges. Now that we think it will work, mutate the + // data associated with the current range. We actually make the next range + // the current range. + + in->fw_bytes = 0; + + for (int n = 0; n < in->num_streams; n++) { + struct demux_queue *q1 = in->current_range->streams[n]; + struct demux_queue *q2 = next->streams[n]; + + struct demux_stream *ds = in->streams[n]->ds; + + if (q1->head) { + q1->tail->next = q2->head; + q2->head = q1->head; + if (!q2->head || !q2->head->next) + q2->tail = q2->head; + } + q2->next_prune_target = q1->next_prune_target; + q2->seek_start = q1->seek_start; + q2->correct_dts &= q1->correct_dts; + q2->correct_pos &= q1->correct_pos; + + q1->head = q1->tail = NULL; + q1->next_prune_target = NULL; + q1->keyframe_latest = NULL; + + assert(ds->queue == q1); + ds->queue = q2; + + recompute_buffers(ds); + in->fw_bytes += ds->fw_bytes; + + // For moving demuxer position. + ds->refreshing = true; + } + + next->seek_start = in->current_range->seek_start; + + // Move demuxing position to after the current range. + in->seeking = true; + in->seek_flags = SEEK_HR; + in->seek_pts = next->seek_end - 1.0; + + struct demux_cached_range *old = in->current_range; + set_current_range(in, next); + clear_cached_range(in, old); + + MP_VERBOSE(in, "ranges joined!\n"); + + next = NULL; +failed: + if (next) + clear_cached_range(in, next); + free_empty_cached_ranges(in); +} + // Determine seekable range when a packet is added. If dp==NULL, treat it as // EOF (i.e. closes the current block). // This has to deal with a number of corner cases, such as demuxers potentially // starting output at non-keyframes. +// Can join seek ranges, which messes with in->current_range and all. static void adjust_seek_range_on_packet(struct demux_stream *ds, struct demux_packet *dp) { + struct demux_queue *queue = ds->queue; + bool attempt_range_join = false; + if (!ds->in->seekable_cache) return; if (!dp || dp->keyframe) { - if (ds->keyframe_latest) { - ds->keyframe_latest->kf_seek_pts = ds->keyframe_pts; - if (ds->queue->seek_start == MP_NOPTS_VALUE) - ds->queue->seek_start = ds->keyframe_pts; - if (ds->keyframe_end_pts != MP_NOPTS_VALUE) - ds->queue->seek_end = ds->keyframe_end_pts; - update_seek_ranges(ds); + if (queue->keyframe_latest) { + queue->keyframe_latest->kf_seek_pts = queue->keyframe_pts; + double old_end = queue->range->seek_end; + if (queue->seek_start == MP_NOPTS_VALUE) + queue->seek_start = queue->keyframe_pts; + if (queue->keyframe_end_pts != MP_NOPTS_VALUE) + queue->seek_end = queue->keyframe_end_pts; + update_seek_ranges(queue->range); + attempt_range_join = queue->range->seek_end > old_end; } - ds->keyframe_latest = dp; - ds->keyframe_pts = ds->keyframe_end_pts = MP_NOPTS_VALUE; + queue->keyframe_latest = dp; + queue->keyframe_pts = queue->keyframe_end_pts = MP_NOPTS_VALUE; } if (dp) { @@ -740,9 +1064,12 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds, if (dp->segmented && (ts < dp->start || ts > dp->end)) ts = MP_NOPTS_VALUE; - ds->keyframe_pts = MP_PTS_MIN(ds->keyframe_pts, ts); - ds->keyframe_end_pts = MP_PTS_MAX(ds->keyframe_end_pts, ts); + queue->keyframe_pts = MP_PTS_MIN(queue->keyframe_pts, ts); + queue->keyframe_end_pts = MP_PTS_MAX(queue->keyframe_end_pts, ts); } + + if (attempt_range_join) + attempt_range_joining(ds->in); } void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) @@ -755,17 +1082,20 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) struct demux_internal *in = ds->in; pthread_mutex_lock(&in->lock); + struct demux_queue *queue = ds->queue; + bool drop = ds->refreshing; if (ds->refreshing) { // Resume reading once the old position was reached (i.e. we start // returning packets where we left off before the refresh). // If it's the same position, drop, but continue normally next time. - if (ds->correct_dts) { - ds->refreshing = dp->dts < ds->last_dts; - } else if (ds->correct_pos) { - ds->refreshing = dp->pos < ds->last_pos; + if (queue->correct_dts) { + ds->refreshing = dp->dts < queue->last_dts; + } else if (queue->correct_pos) { + ds->refreshing = dp->pos < queue->last_pos; } else { ds->refreshing = false; // should not happen + MP_WARN(in, "stream %d: demux refreshing failed\n", ds->index); } } @@ -775,12 +1105,12 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) return; } - ds->correct_pos &= dp->pos >= 0 && dp->pos > ds->last_pos; - ds->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > ds->last_dts; - ds->last_pos = dp->pos; - ds->last_dts = dp->dts; - ds->global_correct_pos &= ds->correct_pos; - ds->global_correct_dts &= ds->correct_dts; + queue->correct_pos &= dp->pos >= 0 && dp->pos > queue->last_pos; + queue->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > queue->last_dts; + queue->last_pos = dp->pos; + queue->last_dts = dp->dts; + ds->global_correct_pos &= queue->correct_pos; + ds->global_correct_dts &= queue->correct_dts; dp->stream = stream->index; dp->next = NULL; @@ -800,17 +1130,15 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) in->fw_bytes += bytes; } - if (ds->queue->tail) { + if (queue->tail) { // next packet in stream - ds->queue->tail->next = dp; - ds->queue->tail = dp; + queue->tail->next = dp; + queue->tail = dp; } else { // first packet in stream - ds->queue->head = ds->queue->tail = dp; + queue->head = queue->tail = dp; } - adjust_seek_range_on_packet(ds, dp); - if (!ds->ignore_eof) { // obviously not true anymore ds->eof = false; @@ -825,15 +1153,17 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) double ts = dp->dts == MP_NOPTS_VALUE ? dp->pts : dp->dts; if (dp->segmented) ts = MP_PTS_MIN(ts, dp->end); - if (ts != MP_NOPTS_VALUE && (ts > ds->last_ts || ts + 10 < ds->last_ts)) - ds->last_ts = ts; + if (ts != MP_NOPTS_VALUE && (ts > queue->last_ts || ts + 10 < queue->last_ts)) + queue->last_ts = ts; if (ds->base_ts == MP_NOPTS_VALUE) - ds->base_ts = ds->last_ts; + ds->base_ts = queue->last_ts; MP_DBG(in, "append packet to %s: size=%d pts=%f dts=%f pos=%"PRIi64" " "[num=%zd size=%zd]\n", stream_type_name(stream->type), dp->len, dp->pts, dp->dts, dp->pos, ds->fw_packs, ds->fw_bytes); + adjust_seek_range_on_packet(ds, dp); + // Wake up if this was the first packet after start/possible underrun. if (ds->in->wakeup_cb && ds->reader_head && !ds->reader_head->next) ds->in->wakeup_cb(ds->in->wakeup_cb_ctx); @@ -847,20 +1177,23 @@ static bool read_packet(struct demux_internal *in) in->eof = false; in->idle = true; + if (!in->reading) + return false; + // Check if we need to read a new packet. We do this if all queues are below // the minimum, or if a stream explicitly needs new packets. Also includes // safe-guards against packet queue overflow. - bool active = false, read_more = false, prefetch_more = false; + bool read_more = false, prefetch_more = false; for (int n = 0; n < in->num_streams; n++) { struct demux_stream *ds = in->streams[n]->ds; - active |= ds->active; - read_more |= (ds->active && !ds->reader_head) || ds->refreshing; - if (ds->active && ds->last_ts != MP_NOPTS_VALUE && in->min_secs > 0 && - ds->base_ts != MP_NOPTS_VALUE && ds->last_ts >= ds->base_ts) - prefetch_more |= ds->last_ts - ds->base_ts < in->min_secs; - } - MP_DBG(in, "bytes=%zd, active=%d, read_more=%d prefetch_more=%d\n", - in->fw_bytes, active, read_more, prefetch_more); + read_more |= (ds->eager && !ds->reader_head) || ds->refreshing; + if (ds->eager && ds->queue->last_ts != MP_NOPTS_VALUE && + in->min_secs > 0 && ds->base_ts != MP_NOPTS_VALUE && + ds->queue->last_ts >= ds->base_ts) + prefetch_more |= ds->queue->last_ts - ds->base_ts < in->min_secs; + } + MP_DBG(in, "bytes=%zd, read_more=%d prefetch_more=%d\n", + in->fw_bytes, read_more, prefetch_more); if (in->fw_bytes >= in->max_bytes) { if (!read_more) return false; @@ -870,9 +1203,10 @@ static bool read_packet(struct demux_internal *in) for (int n = 0; n < in->num_streams; n++) { struct demux_stream *ds = in->streams[n]->ds; if (ds->selected) { - MP_WARN(in, " %s/%d: %zd packets, %zd bytes\n", + MP_WARN(in, " %s/%d: %zd packets, %zd bytes%s\n", stream_type_name(ds->type), n, - ds->fw_packs, ds->fw_bytes); + ds->fw_packs, ds->fw_bytes, + ds->eager ? "" : " (lazy)"); } } } @@ -938,19 +1272,24 @@ static bool read_packet(struct demux_internal *in) static void prune_old_packets(struct demux_internal *in) { + assert(in->current_range == in->ranges[in->num_ranges - 1]); + // It's not clear what the ideal way to prune old packets is. For now, we // prune the oldest packet runs, as long as the total cache amount is too // big. size_t max_bytes = in->seekable_cache ? in->max_bytes_bw : 0; while (in->total_bytes - in->fw_bytes > max_bytes) { + // (Start from least recently used range.) + struct demux_cached_range *range = in->ranges[0]; double earliest_ts = MP_NOPTS_VALUE; struct demux_stream *earliest_stream = NULL; - for (int n = 0; n < in->num_streams; n++) { - struct demux_stream *ds = in->streams[n]->ds; + for (int n = 0; n < range->num_streams; n++) { + struct demux_queue *queue = range->streams[n]; + struct demux_stream *ds = queue->ds; - if (ds->queue->head && ds->queue->head != ds->reader_head) { - struct demux_packet *dp = ds->queue->head; + if (queue->head && queue->head != ds->reader_head) { + struct demux_packet *dp = queue->head; double ts = dp->kf_seek_pts; // Note: in obscure cases, packets might have no timestamps set, // in which case we still need to prune _something_. @@ -967,6 +1306,7 @@ static void prune_old_packets(struct demux_internal *in) assert(earliest_stream); // incorrect accounting of buffered sizes? struct demux_stream *ds = earliest_stream; + struct demux_queue *queue = range->streams[ds->index]; // Prune all packets until the next keyframe or reader_head. Keeping // those packets would not help with seeking at all, so we strictly @@ -975,46 +1315,35 @@ static void prune_old_packets(struct demux_internal *in) // which in the worst case could be inside the forward buffer. The fact // that many keyframe ranges without keyframes exist (audio packets) // makes this much harder. - if (in->seekable_cache && !ds->next_prune_target) { + if (in->seekable_cache && !queue->next_prune_target) { // (Has to be _after_ queue->head to drop at least 1 packet.) - struct demux_packet *prev = ds->queue->head; - ds->queue->seek_start = MP_NOPTS_VALUE; - ds->next_prune_target = ds->queue->tail; // (prune all if none found) + struct demux_packet *prev = queue->head; + queue->seek_start = MP_NOPTS_VALUE; + queue->next_prune_target = queue->tail; // (prune all if none found) while (prev->next) { struct demux_packet *dp = prev->next; // Note that the next back_pts might be above the lowest buffered // packet, but it will still be only viable lowest seek target. if (dp->keyframe && dp->kf_seek_pts != MP_NOPTS_VALUE) { - ds->queue->seek_start = dp->kf_seek_pts; - ds->next_prune_target = prev; + queue->seek_start = dp->kf_seek_pts; + queue->next_prune_target = prev; break; } prev = prev->next; } - update_seek_ranges(ds); + update_seek_ranges(range); } bool done = false; - while (!done && ds->queue->head && ds->queue->head != ds->reader_head) { - struct demux_packet *dp = ds->queue->head; - - size_t bytes = demux_packet_estimate_total_size(dp); - MP_TRACE(in, "dropping backbuffer packet size %zd from stream %d\n", - bytes, ds->sh->index); - - ds->queue->head = dp->next; - if (!ds->queue->head) - ds->queue->tail = NULL; - if (ds->next_prune_target == dp) { - ds->next_prune_target = NULL; - done = true; - } - if (ds->keyframe_latest == dp) - ds->keyframe_latest = NULL; - talloc_free(dp); - in->total_bytes -= bytes; + while (!done && queue->head && queue->head != ds->reader_head) { + struct demux_packet *dp = queue->head; |