summaryrefslogtreecommitdiffstats
path: root/demux/demux.c
diff options
context:
space:
mode:
Diffstat (limited to 'demux/demux.c')
-rw-r--r--demux/demux.c95
1 files changed, 49 insertions, 46 deletions
diff --git a/demux/demux.c b/demux/demux.c
index f0512e5859..e8f9f0a1ea 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -223,8 +223,6 @@ struct demux_internal {
int num_ranges;
size_t total_bytes; // total sum of packet data buffered
- size_t fw_bytes; // sum of forward packet data in current_range
-
// Range from which decoder is reading, and to which demuxer is appending.
// This is never NULL. This is always ranges[num_ranges - 1].
struct demux_cached_range *current_range;
@@ -278,6 +276,8 @@ struct demux_queue {
struct demux_packet *next_prune_target; // cached value for faster pruning
+ uint64_t tail_cum_pos; // cumulative size including tail packet
+
bool correct_dts; // packet DTS is strictly monotonically increasing
bool correct_pos; // packet pos is strictly monotonically increasing
int64_t last_pos; // for determining correct_pos
@@ -333,7 +333,6 @@ struct demux_stream {
double last_br_ts; // timestamp of last packet bitrate was calculated
size_t last_br_bytes; // summed packet sizes since last bitrate calculation
double bitrate;
- size_t fw_bytes; // total bytes of packets in buffer (forward)
struct demux_packet *reader_head; // points at current decoder position
bool skip_to_keyframe;
bool attached_picture_added;
@@ -392,12 +391,18 @@ static struct demux_packet *advance_reader_head(struct demux_stream *ds);
static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
bool clear_back_state);
+static uint64_t get_foward_buffered_bytes(struct demux_stream *ds)
+{
+ if (!ds->reader_head)
+ return 0;
+ return ds->queue->tail_cum_pos - ds->reader_head->cum_pos;
+}
+
#if 0
// very expensive check for redundant cached queue state
static void check_queue_consistency(struct demux_internal *in)
{
- size_t total_bytes = 0;
- size_t total_fw_bytes = 0;
+ uint64_t total_bytes = 0;
assert(in->current_range && in->num_ranges > 0);
assert(in->current_range == in->ranges[in->num_ranges - 1]);
@@ -418,6 +423,7 @@ static void check_queue_consistency(struct demux_internal *in)
bool kf_found = false;
bool npt_found = false;
int next_index = 0;
+ uint64_t queue_total_bytes = 0;
for (struct demux_packet *dp = queue->head; dp; dp = dp->next) {
is_forward |= dp == queue->ds->reader_head;
kf_found |= dp == queue->keyframe_latest;
@@ -425,6 +431,7 @@ static void check_queue_consistency(struct demux_internal *in)
size_t bytes = demux_packet_estimate_total_size(dp);
total_bytes += bytes;
+ queue_total_bytes += bytes;
if (is_forward) {
fw_bytes += bytes;
assert(range == in->current_range);
@@ -442,20 +449,24 @@ static void check_queue_consistency(struct demux_internal *in)
assert(!queue->tail);
assert(next_index == queue->num_index);
+ uint64_t queue_total_bytes2 = 0;
+ if (queue->head)
+ queue_total_bytes2 = queue->tail_cum_pos - queue->head->cum_pos;
+
+ assert(queue_total_bytes == queue_total_bytes2);
+
// If the queue is currently used...
if (queue->ds->queue == queue) {
// ...reader_head and others must be in the queue.
assert(is_forward == !!queue->ds->reader_head);
assert(kf_found == !!queue->keyframe_latest);
+ uint64_t fw_bytes2 = get_foward_buffered_bytes(queue->ds);
+ assert(fw_bytes == fw_bytes2);
}
assert(npt_found == !!queue->next_prune_target);
- total_fw_bytes += fw_bytes;
-
- if (range == in->current_range) {
- assert(queue->ds->fw_bytes == fw_bytes);
- } else {
+ if (range != in->current_range) {
assert(fw_bytes == 0);
}
@@ -470,7 +481,6 @@ static void check_queue_consistency(struct demux_internal *in)
}
assert(in->total_bytes == total_bytes);
- assert(in->fw_bytes == total_fw_bytes);
}
#endif
@@ -524,15 +534,6 @@ static void mp_packet_tags_make_writable(struct mp_packet_tags **tags)
*tags = new;
}
-static void recompute_buffers(struct demux_stream *ds)
-{
- ds->fw_bytes = 0;
-
- for (struct demux_packet *dp = ds->reader_head; dp; dp = dp->next) {
- ds->fw_bytes += demux_packet_estimate_total_size(dp);
- }
-}
-
// (this doesn't do most required things for a switch, like updating ds->queue)
static void set_current_range(struct demux_internal *in,
struct demux_cached_range *range)
@@ -625,7 +626,7 @@ broken:
range->seek_start = range->seek_end = MP_NOPTS_VALUE;
}
-// Remove queue->head from the queue. Does not update in->fw_bytes.
+// Remove queue->head from the queue.
static void remove_head_packet(struct demux_queue *queue)
{
struct demux_packet *dp = queue->head;
@@ -719,9 +720,7 @@ static void free_empty_cached_ranges(struct demux_internal *in)
static void ds_clear_reader_queue_state(struct demux_stream *ds)
{
- ds->in->fw_bytes -= ds->fw_bytes;
ds->reader_head = NULL;
- ds->fw_bytes = 0;
ds->eof = false;
ds->need_wakeup = true;
}
@@ -1615,8 +1614,6 @@ static void attempt_range_joining(struct demux_internal *in)
// Actually join the ranges. Now that we think it will work, mutate the
// data associated with the current range.
- in->fw_bytes = 0;
-
for (int n = 0; n < in->num_streams; n++) {
struct demux_queue *q1 = in->current_range->streams[n];
struct demux_queue *q2 = next->streams[n];
@@ -1659,8 +1656,13 @@ static void attempt_range_joining(struct demux_internal *in)
ds->reader_head = join_point;
ds->skip_to_keyframe = false;
- recompute_buffers(ds);
- in->fw_bytes += ds->fw_bytes;
+ // Make the cum_pos values in all q2 packets continuous.
+ for (struct demux_packet *dp = join_point; dp; dp = dp->next) {
+ uint64_t next_pos = dp->next ? dp->next->cum_pos : q2->tail_cum_pos;
+ uint64_t size = next_pos - dp->cum_pos;
+ dp->cum_pos = q1->tail_cum_pos;
+ q1->tail_cum_pos += size;
+ }
// For moving demuxer position.
ds->refreshing = ds->selected;
@@ -1803,10 +1805,8 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
size_t bytes = demux_packet_estimate_total_size(dp);
in->total_bytes += bytes;
- if (ds->reader_head) {
- ds->fw_bytes += bytes;
- in->fw_bytes += bytes;
- }
+ dp->cum_pos = queue->tail_cum_pos;
+ queue->tail_cum_pos += bytes;
if (queue->tail) {
// next packet in stream
@@ -1834,9 +1834,10 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
ds->base_ts = queue->last_ts;
const char *num_pkts = queue->head == queue->tail ? "1" : ">1";
+ uint64_t fw_bytes = get_foward_buffered_bytes(ds);
MP_TRACE(in, "append packet to %s: size=%d pts=%f dts=%f pos=%"PRIi64" "
"[num=%s size=%zd]\n", stream_type_name(stream->type),
- dp->len, dp->pts, dp->dts, dp->pos, num_pkts, ds->fw_bytes);
+ dp->len, dp->pts, dp->dts, dp->pos, num_pkts, (size_t)fw_bytes);
adjust_seek_range_on_packet(ds, dp);
@@ -1916,6 +1917,7 @@ static bool read_packet(struct demux_internal *in)
// the minimum, or if a stream explicitly needs new packets. Also includes
// safe-guards against packet queue overflow.
bool read_more = false, prefetch_more = false, refresh_more = false;
+ uint64_t total_fw_bytes = 0;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
if (ds->eager) {
@@ -1929,11 +1931,12 @@ static bool read_packet(struct demux_internal *in)
ds->queue->last_ts >= ds->base_ts &&
!in->back_demuxing)
prefetch_more |= ds->queue->last_ts - ds->base_ts < in->min_secs;
+ total_fw_bytes += get_foward_buffered_bytes(ds);
}
MP_TRACE(in, "bytes=%zd, read_more=%d prefetch_more=%d, refresh_more=%d\n",
- in->fw_bytes, read_more, prefetch_more, refresh_more);
- if (in->fw_bytes >= in->max_bytes) {
+ (size_t)total_fw_bytes, read_more, prefetch_more, refresh_more);
+ if (total_fw_bytes >= in->max_bytes) {
// if we hit the limit just by prefetching, simply stop prefetching
if (!read_more)
return false;
@@ -1947,9 +1950,10 @@ static bool read_packet(struct demux_internal *in)
for (struct demux_packet *dp = ds->reader_head;
dp; dp = dp->next)
num_pkts++;
+ uint64_t fw_bytes = get_foward_buffered_bytes(ds);
MP_WARN(in, " %s/%d: %zd packets, %zd bytes%s%s\n",
stream_type_name(ds->type), n,
- num_pkts, ds->fw_bytes,
+ num_pkts, (size_t)fw_bytes,
ds->eager ? "" : " (lazy)",
ds->refreshing ? " (refreshing)" : "");
}
@@ -2020,7 +2024,15 @@ static void prune_old_packets(struct demux_internal *in)
// prune the oldest packet runs, as long as the total cache amount is too
// big.
size_t max_bytes = in->seekable_cache ? in->max_bytes_bw : 0;
- while (in->total_bytes - in->fw_bytes > max_bytes) {
+ while (1) {
+ uint64_t fw_bytes = 0;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ fw_bytes += get_foward_buffered_bytes(ds);
+ }
+ if (in->total_bytes - fw_bytes <= max_bytes)
+ break;
+
// (Start from least recently used range.)
struct demux_cached_range *range = in->ranges[0];
double earliest_ts = MP_NOPTS_VALUE;
@@ -2197,11 +2209,6 @@ static struct demux_packet *advance_reader_head(struct demux_stream *ds)
ds->reader_head = pkt->next;
- // Update cached packet queue state.
- size_t bytes = demux_packet_estimate_total_size(pkt);
- ds->fw_bytes -= bytes;
- ds->in->fw_bytes -= bytes;
-
ds->last_ret_pos = pkt->pos;
ds->last_ret_dts = pkt->dts;
@@ -2945,7 +2952,6 @@ static void clear_reader_state(struct demux_internal *in,
in->d_user->filepos = -1; // implicitly synchronized
in->blocked = false;
in->need_back_seek = false;
- assert(in->fw_bytes == 0);
}
// clear the packet queues
@@ -3138,9 +3144,6 @@ static void execute_cache_seek(struct demux_internal *in,
if (ds->reader_head)
ds->base_ts = MP_PTS_OR_DEF(ds->reader_head->pts, ds->reader_head->dts);
- recompute_buffers(ds);
- in->fw_bytes += ds->fw_bytes;
-
MP_VERBOSE(in, "seeking stream %d (%s) to ",
n, stream_type_name(ds->type));
@@ -3626,7 +3629,6 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *
.ts_end = MP_NOPTS_VALUE,
.ts_duration = -1,
.total_bytes = in->total_bytes,
- .fw_bytes = in->fw_bytes,
.seeking = in->seeking_in_progress,
.low_level_seeks = in->low_level_seeks,
.ts_last = in->demux_ts,
@@ -3641,6 +3643,7 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *
r->ts_end = MP_PTS_MAX(r->ts_end, ds->queue->last_ts);
any_packets |= !!ds->reader_head;
}
+ r->fw_bytes += get_foward_buffered_bytes(ds);
}
r->idle = (in->idle && !r->underrun) || r->eof;
r->underrun &= !r->idle && in->threading;