summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2019-06-01 21:56:24 +0200
committerwm4 <wm4@nowhere>2019-09-19 20:37:05 +0200
commit976ee96e450487e8eb14e7e895d4c39bec14930c (patch)
treee67290d6940dfe150323e196d663f92d3b0ebe78
parente7db262450d499c7f511e3f7b2aed5af6419527d (diff)
downloadmpv-976ee96e450487e8eb14e7e895d4c39bec14930c.tar.bz2
mpv-976ee96e450487e8eb14e7e895d4c39bec14930c.tar.xz
demux: don't loop over all packets to find forward buffered size on seek
The size of all forward buffered packets is used to control maximum buffering. Until now, this size was incrementally adjusted, but had to be recomputed on seeks within the cache. Doing this was actually pretty expensive. It iterates over a linked list of separate memory allocations (which are probably spread all over the heap due to the allocation behavior), and the demux_packet_estimate_total_size() call touches a lot of further memory locations. I guess this affects the cache rather negatively. In an unscientific test, the recompute_buffers() function (which contained this loop) was responsible for roughly half of the time seeking took. Replace this with a way that computes the buffered size between 2 packets in constant times. The demux_packet.cum_pos field contains the summed sizes of all previous packets, so subtracting cum_pos between two packets yields the size of all packets in between. We can do this because we never remove packets from the middle of the queue. We only add packets to the end, or remove packets at the beginning. The tail_cum_pos field is needed because we don't store the end position of a packet, so the last packet's position would be unknown. We could recompute the "estimated" packet size, or store the estimated size in the packet struct, but I just didn't like this. This also removes the cached fw_bytes fields. It's slightly nicer to just recompute them when needed. Maintaining them incrementally was annoying. total_size stays though, since recomputing it isn't that cheap (would need to loop over all ranges every time). I'm always using uint64_t for sizes. This is certainly needed (a stream could easily burn through more than 4GB of data, even if much less of that is cached). The actual cached amount should always fit into size_t, so it's casted to size_t for printfs (yes, I hate the way you specify stdint.h types in printfs, the less I have to use that crap, the better).
-rw-r--r--demux/demux.c95
-rw-r--r--demux/packet.h1
2 files changed, 50 insertions, 46 deletions
diff --git a/demux/demux.c b/demux/demux.c
index f0512e5859..e8f9f0a1ea 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -223,8 +223,6 @@ struct demux_internal {
int num_ranges;
size_t total_bytes; // total sum of packet data buffered
- size_t fw_bytes; // sum of forward packet data in current_range
-
// Range from which decoder is reading, and to which demuxer is appending.
// This is never NULL. This is always ranges[num_ranges - 1].
struct demux_cached_range *current_range;
@@ -278,6 +276,8 @@ struct demux_queue {
struct demux_packet *next_prune_target; // cached value for faster pruning
+ uint64_t tail_cum_pos; // cumulative size including tail packet
+
bool correct_dts; // packet DTS is strictly monotonically increasing
bool correct_pos; // packet pos is strictly monotonically increasing
int64_t last_pos; // for determining correct_pos
@@ -333,7 +333,6 @@ struct demux_stream {
double last_br_ts; // timestamp of last packet bitrate was calculated
size_t last_br_bytes; // summed packet sizes since last bitrate calculation
double bitrate;
- size_t fw_bytes; // total bytes of packets in buffer (forward)
struct demux_packet *reader_head; // points at current decoder position
bool skip_to_keyframe;
bool attached_picture_added;
@@ -392,12 +391,18 @@ static struct demux_packet *advance_reader_head(struct demux_stream *ds);
static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
bool clear_back_state);
+static uint64_t get_foward_buffered_bytes(struct demux_stream *ds)
+{
+ if (!ds->reader_head)
+ return 0;
+ return ds->queue->tail_cum_pos - ds->reader_head->cum_pos;
+}
+
#if 0
// very expensive check for redundant cached queue state
static void check_queue_consistency(struct demux_internal *in)
{
- size_t total_bytes = 0;
- size_t total_fw_bytes = 0;
+ uint64_t total_bytes = 0;
assert(in->current_range && in->num_ranges > 0);
assert(in->current_range == in->ranges[in->num_ranges - 1]);
@@ -418,6 +423,7 @@ static void check_queue_consistency(struct demux_internal *in)
bool kf_found = false;
bool npt_found = false;
int next_index = 0;
+ uint64_t queue_total_bytes = 0;
for (struct demux_packet *dp = queue->head; dp; dp = dp->next) {
is_forward |= dp == queue->ds->reader_head;
kf_found |= dp == queue->keyframe_latest;
@@ -425,6 +431,7 @@ static void check_queue_consistency(struct demux_internal *in)
size_t bytes = demux_packet_estimate_total_size(dp);
total_bytes += bytes;
+ queue_total_bytes += bytes;
if (is_forward) {
fw_bytes += bytes;
assert(range == in->current_range);
@@ -442,20 +449,24 @@ static void check_queue_consistency(struct demux_internal *in)
assert(!queue->tail);
assert(next_index == queue->num_index);
+ uint64_t queue_total_bytes2 = 0;
+ if (queue->head)
+ queue_total_bytes2 = queue->tail_cum_pos - queue->head->cum_pos;
+
+ assert(queue_total_bytes == queue_total_bytes2);
+
// If the queue is currently used...
if (queue->ds->queue == queue) {
// ...reader_head and others must be in the queue.
assert(is_forward == !!queue->ds->reader_head);
assert(kf_found == !!queue->keyframe_latest);
+ uint64_t fw_bytes2 = get_foward_buffered_bytes(queue->ds);
+ assert(fw_bytes == fw_bytes2);
}
assert(npt_found == !!queue->next_prune_target);
- total_fw_bytes += fw_bytes;
-
- if (range == in->current_range) {
- assert(queue->ds->fw_bytes == fw_bytes);
- } else {
+ if (range != in->current_range) {
assert(fw_bytes == 0);
}
@@ -470,7 +481,6 @@ static void check_queue_consistency(struct demux_internal *in)
}
assert(in->total_bytes == total_bytes);
- assert(in->fw_bytes == total_fw_bytes);
}
#endif
@@ -524,15 +534,6 @@ static void mp_packet_tags_make_writable(struct mp_packet_tags **tags)
*tags = new;
}
-static void recompute_buffers(struct demux_stream *ds)
-{
- ds->fw_bytes = 0;
-
- for (struct demux_packet *dp = ds->reader_head; dp; dp = dp->next) {
- ds->fw_bytes += demux_packet_estimate_total_size(dp);
- }
-}
-
// (this doesn't do most required things for a switch, like updating ds->queue)
static void set_current_range(struct demux_internal *in,
struct demux_cached_range *range)
@@ -625,7 +626,7 @@ broken:
range->seek_start = range->seek_end = MP_NOPTS_VALUE;
}
-// Remove queue->head from the queue. Does not update in->fw_bytes.
+// Remove queue->head from the queue.
static void remove_head_packet(struct demux_queue *queue)
{
struct demux_packet *dp = queue->head;
@@ -719,9 +720,7 @@ static void free_empty_cached_ranges(struct demux_internal *in)
static void ds_clear_reader_queue_state(struct demux_stream *ds)
{
- ds->in->fw_bytes -= ds->fw_bytes;
ds->reader_head = NULL;
- ds->fw_bytes = 0;
ds->eof = false;
ds->need_wakeup = true;
}
@@ -1615,8 +1614,6 @@ static void attempt_range_joining(struct demux_internal *in)
// Actually join the ranges. Now that we think it will work, mutate the
// data associated with the current range.
- in->fw_bytes = 0;
-
for (int n = 0; n < in->num_streams; n++) {
struct demux_queue *q1 = in->current_range->streams[n];
struct demux_queue *q2 = next->streams[n];
@@ -1659,8 +1656,13 @@ static void attempt_range_joining(struct demux_internal *in)
ds->reader_head = join_point;
ds->skip_to_keyframe = false;
- recompute_buffers(ds);
- in->fw_bytes += ds->fw_bytes;
+ // Make the cum_pos values in all q2 packets continuous.
+ for (struct demux_packet *dp = join_point; dp; dp = dp->next) {
+ uint64_t next_pos = dp->next ? dp->next->cum_pos : q2->tail_cum_pos;
+ uint64_t size = next_pos - dp->cum_pos;
+ dp->cum_pos = q1->tail_cum_pos;
+ q1->tail_cum_pos += size;
+ }
// For moving demuxer position.
ds->refreshing = ds->selected;
@@ -1803,10 +1805,8 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
size_t bytes = demux_packet_estimate_total_size(dp);
in->total_bytes += bytes;
- if (ds->reader_head) {
- ds->fw_bytes += bytes;
- in->fw_bytes += bytes;
- }
+ dp->cum_pos = queue->tail_cum_pos;
+ queue->tail_cum_pos += bytes;
if (queue->tail) {
// next packet in stream
@@ -1834,9 +1834,10 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
ds->base_ts = queue->last_ts;
const char *num_pkts = queue->head == queue->tail ? "1" : ">1";
+ uint64_t fw_bytes = get_foward_buffered_bytes(ds);
MP_TRACE(in, "append packet to %s: size=%d pts=%f dts=%f pos=%"PRIi64" "
"[num=%s size=%zd]\n", stream_type_name(stream->type),
- dp->len, dp->pts, dp->dts, dp->pos, num_pkts, ds->fw_bytes);
+ dp->len, dp->pts, dp->dts, dp->pos, num_pkts, (size_t)fw_bytes);
adjust_seek_range_on_packet(ds, dp);
@@ -1916,6 +1917,7 @@ static bool read_packet(struct demux_internal *in)
// the minimum, or if a stream explicitly needs new packets. Also includes
// safe-guards against packet queue overflow.
bool read_more = false, prefetch_more = false, refresh_more = false;
+ uint64_t total_fw_bytes = 0;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *ds = in->streams[n]->ds;
if (ds->eager) {
@@ -1929,11 +1931,12 @@ static bool read_packet(struct demux_internal *in)
ds->queue->last_ts >= ds->base_ts &&
!in->back_demuxing)
prefetch_more |= ds->queue->last_ts - ds->base_ts < in->min_secs;
+ total_fw_bytes += get_foward_buffered_bytes(ds);
}
MP_TRACE(in, "bytes=%zd, read_more=%d prefetch_more=%d, refresh_more=%d\n",
- in->fw_bytes, read_more, prefetch_more, refresh_more);
- if (in->fw_bytes >= in->max_bytes) {
+ (size_t)total_fw_bytes, read_more, prefetch_more, refresh_more);
+ if (total_fw_bytes >= in->max_bytes) {
// if we hit the limit just by prefetching, simply stop prefetching
if (!read_more)
return false;
@@ -1947,9 +1950,10 @@ static bool read_packet(struct demux_internal *in)
for (struct demux_packet *dp = ds->reader_head;
dp; dp = dp->next)
num_pkts++;
+ uint64_t fw_bytes = get_foward_buffered_bytes(ds);
MP_WARN(in, " %s/%d: %zd packets, %zd bytes%s%s\n",
stream_type_name(ds->type), n,
- num_pkts, ds->fw_bytes,
+ num_pkts, (size_t)fw_bytes,
ds->eager ? "" : " (lazy)",
ds->refreshing ? " (refreshing)" : "");
}
@@ -2020,7 +2024,15 @@ static void prune_old_packets(struct demux_internal *in)
// prune the oldest packet runs, as long as the total cache amount is too
// big.
size_t max_bytes = in->seekable_cache ? in->max_bytes_bw : 0;
- while (in->total_bytes - in->fw_bytes > max_bytes) {
+ while (1) {
+ uint64_t fw_bytes = 0;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ fw_bytes += get_foward_buffered_bytes(ds);
+ }
+ if (in->total_bytes - fw_bytes <= max_bytes)
+ break;
+
// (Start from least recently used range.)
struct demux_cached_range *range = in->ranges[0];
double earliest_ts = MP_NOPTS_VALUE;
@@ -2197,11 +2209,6 @@ static struct demux_packet *advance_reader_head(struct demux_stream *ds)
ds->reader_head = pkt->next;
- // Update cached packet queue state.
- size_t bytes = demux_packet_estimate_total_size(pkt);
- ds->fw_bytes -= bytes;
- ds->in->fw_bytes -= bytes;
-
ds->last_ret_pos = pkt->pos;
ds->last_ret_dts = pkt->dts;
@@ -2945,7 +2952,6 @@ static void clear_reader_state(struct demux_internal *in,
in->d_user->filepos = -1; // implicitly synchronized
in->blocked = false;
in->need_back_seek = false;
- assert(in->fw_bytes == 0);
}
// clear the packet queues
@@ -3138,9 +3144,6 @@ static void execute_cache_seek(struct demux_internal *in,
if (ds->reader_head)
ds->base_ts = MP_PTS_OR_DEF(ds->reader_head->pts, ds->reader_head->dts);
- recompute_buffers(ds);
- in->fw_bytes += ds->fw_bytes;
-
MP_VERBOSE(in, "seeking stream %d (%s) to ",
n, stream_type_name(ds->type));
@@ -3626,7 +3629,6 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *
.ts_end = MP_NOPTS_VALUE,
.ts_duration = -1,
.total_bytes = in->total_bytes,
- .fw_bytes = in->fw_bytes,
.seeking = in->seeking_in_progress,
.low_level_seeks = in->low_level_seeks,
.ts_last = in->demux_ts,
@@ -3641,6 +3643,7 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *
r->ts_end = MP_PTS_MAX(r->ts_end, ds->queue->last_ts);
any_packets |= !!ds->reader_head;
}
+ r->fw_bytes += get_foward_buffered_bytes(ds);
}
r->idle = (in->idle && !r->underrun) || r->eof;
r->underrun &= !r->idle && in->threading;
diff --git a/demux/packet.h b/demux/packet.h
index 9e28ed52d0..6822bce2c1 100644
--- a/demux/packet.h
+++ b/demux/packet.h
@@ -48,6 +48,7 @@ typedef struct demux_packet {
// private
struct demux_packet *next;
struct AVPacket *avpacket; // keep the buffer allocation and sidedata
+ uint64_t cum_pos; // demux.c internal: cumulative size until _start_ of pkt
double kf_seek_pts; // demux.c internal: seek pts for keyframe range
struct mp_packet_tags *metadata; // timed metadata (demux.c internal)
} demux_packet_t;