From f37f4de8496556afaa024e39e2efb433eb1680d4 Mon Sep 17 00:00:00 2001 From: wm4 Date: Wed, 6 Nov 2019 21:36:02 +0100 Subject: stream: turn into a ring buffer, make size configurable In some corner cases (see #6802), it can be beneficial to use a larger stream buffer size. Use this as argument to rewrite everything for no reason. Turn stream.c itself into a ring buffer, with configurable size. The latter would have been easily achievable with minimal changes, and the ring buffer is the hard part. There is no reason to have a ring buffer at all, except possibly if ffmpeg don't fix their awful mp4 demuxer, and some subtle issues with demux_mkv.c wanting to seek back by small offsets (the latter was handled with small stream_peek() calls, which are unneeded now). In addition, this turns small forward seeks into reads (where data is simply skipped). Before this commit, only stream_skip() did this (which also mean that stream_skip() simply calls stream_seek() now). Replace all stream_peek() calls with something else (usually stream_read_peek()). The function was a problem, because it returned a pointer to the internal buffer, which is now a ring buffer with wrapping. The new function just copies the data into a buffer, and in some cases requires callers to dynamically allocate memory. (The most common case, demux_lavf.c, required a separate buffer allocation anyway due to FFmpeg "idiosyncrasies".) This is the bulk of the demuxer_* changes. I'm not happy with this. There still isn't a good reason why there should be a ring buffer, that is complex, and most of the time just wastes half of the available memory. Maybe another rewrite soon. It also contains bugs; you're an alpha tester now. --- DOCS/man/options.rst | 26 ++++ demux/demux_cue.c | 5 +- demux/demux_disc.c | 2 +- demux/demux_edl.c | 4 +- demux/demux_lavf.c | 7 +- demux/demux_libarchive.c | 8 +- demux/demux_mkv.c | 10 +- demux/demux_playlist.c | 18 ++- options/options.c | 1 + options/options.h | 2 + stream/stream.c | 364 +++++++++++++++++++++++++++++------------------ stream/stream.h | 46 ++++-- wscript | 2 +- 13 files changed, 317 insertions(+), 178 deletions(-) diff --git a/DOCS/man/options.rst b/DOCS/man/options.rst index 8dcbe1bcd8..3ae3264969 100644 --- a/DOCS/man/options.rst +++ b/DOCS/man/options.rst @@ -4196,6 +4196,32 @@ Cache Currently, this is used for ``--cache-on-disk`` only. +``--stream-buffer-size=`` + Size of the low level stream byte buffer (default: 4KB). This is used as + buffer between demuxer and low level I/O (e.g. sockets). Generally, this + can be very small, and the main purpose is similar to the internal buffer + FILE in the C standard library will have. + + Half of the buffer is always used for guaranteed seek back, which is + important for unseekable input. + + There are known cases where this can help performance to set a large buffer: + + 1. mp4 files. libavformat may trigger many small seeks in both + directions, depending on how the file was muxed. + + 2. Certain network filesystems, which do not have a cache, and where + small reads can be inefficient. + + In other cases, setting this to a large value can reduce performance. + + Usually, read accesses are at half the buffer size, but it may happen that + accesses are done alternating with smaller and larger sizes (this is due to + the internal ring buffer wrap-around). + + See ``--list-options`` for defaults and value range. ```` options + accept suffixes such as ``KiB`` and ``MiB``. + Network ------- diff --git a/demux/demux_cue.c b/demux/demux_cue.c index 59628c74b4..35874972b9 100644 --- a/demux/demux_cue.c +++ b/demux/demux_cue.c @@ -265,8 +265,9 @@ static int try_open_file(struct demuxer *demuxer, enum demux_check check) struct stream *s = demuxer->stream; if (check >= DEMUX_CHECK_UNSAFE) { - bstr d = stream_peek(s, PROBE_SIZE); - if (d.len < 1 || !mp_probe_cue(d)) + char probe[PROBE_SIZE]; + int len = stream_read_peek(s, probe, sizeof(probe)); + if (len < 1 || !mp_probe_cue((bstr){probe, len})) return -1; } struct priv *p = talloc_zero(demuxer, struct priv); diff --git a/demux/demux_disc.c b/demux/demux_disc.c index 13e73caa39..919360d074 100644 --- a/demux/demux_disc.c +++ b/demux/demux_disc.c @@ -321,7 +321,7 @@ static int d_open(demuxer_t *demuxer, enum demux_check check) // Initialize the playback time. We need to read _some_ data to get the // correct stream-layer time (at least with libdvdnav). - stream_peek(demuxer->stream, 1); + stream_read_peek(demuxer->stream, &(char){0}, 1); reset_pts(demuxer); p->slave = demux_open_url("-", ¶ms, demuxer->cancel, demuxer->global); diff --git a/demux/demux_edl.c b/demux/demux_edl.c index 62e6c50070..aa650a383f 100644 --- a/demux/demux_edl.c +++ b/demux/demux_edl.c @@ -459,7 +459,9 @@ static int try_open_file(struct demuxer *demuxer, enum demux_check check) return 0; } if (check >= DEMUX_CHECK_UNSAFE) { - if (!bstr_equals0(stream_peek(s, strlen(HEADER)), HEADER)) + char header[sizeof(HEADER) - 1]; + int len = stream_read_peek(s, header, sizeof(header)); + if (len != strlen(HEADER) || memcmp(header, HEADER, len) != 0) return -1; } p->data = stream_read_complete(s, demuxer, 1000000); diff --git a/demux/demux_lavf.c b/demux/demux_lavf.c index 670b611375..9ac6495f15 100644 --- a/demux/demux_lavf.c +++ b/demux/demux_lavf.c @@ -458,11 +458,10 @@ static int lavf_check_file(demuxer_t *demuxer, enum demux_check check) } else { int nsize = av_clip(avpd.buf_size * 2, INITIAL_PROBE_SIZE, PROBE_BUF_SIZE); - bstr buf = stream_peek(s, nsize); - if (buf.len <= avpd.buf_size) + nsize = stream_read_peek(s, avpd.buf, nsize); + if (nsize <= avpd.buf_size) final_probe = true; - memcpy(avpd.buf, buf.start, buf.len); - avpd.buf_size = buf.len; + avpd.buf_size = nsize; priv->avif = av_probe_input_format2(&avpd, avpd.buf_size > 0, &score); } diff --git a/demux/demux_libarchive.c b/demux/demux_libarchive.c index 3540082179..bb2ca24862 100644 --- a/demux/demux_libarchive.c +++ b/demux/demux_libarchive.c @@ -43,15 +43,17 @@ static int open_file(struct demuxer *demuxer, enum demux_check check) probe_size *= 100; } - bstr probe = stream_peek(demuxer->stream, probe_size); - if (probe.len == 0) + void *probe = ta_alloc_size(NULL, probe_size); + if (!probe) return -1; + int probe_got = stream_read_peek(demuxer->stream, probe, probe_size); struct stream *probe_stream = - stream_memory_open(demuxer->global, probe.start, probe.len); + stream_memory_open(demuxer->global, probe, probe_got); struct mp_archive *mpa = mp_archive_new(mp_null_log, probe_stream, flags); bool ok = !!mpa; free_stream(probe_stream); mp_archive_free(mpa); + ta_free(probe); if (!ok) return -1; diff --git a/demux/demux_mkv.c b/demux/demux_mkv.c index 5eaa7bbb07..db1ccff13a 100644 --- a/demux/demux_mkv.c +++ b/demux/demux_mkv.c @@ -1999,13 +1999,9 @@ static int demux_mkv_open(demuxer_t *demuxer, enum demux_check check) if (demuxer->params) mkv_d->probably_webm_dash_init = demuxer->params->init_fragment.len > 0; - bstr start = stream_peek(s, 4); - uint32_t start_id = 0; - for (int n = 0; n < start.len; n++) - start_id = (start_id << 8) | start.start[n]; - if (start_id != EBML_ID_EBML) + // Make sure you can seek back after read_ebml_header() if no EBML ID. + if (stream_read_peek(s, &(char[4]){0}, 4) != 4) return -1; - if (!read_ebml_header(demuxer)) return -1; MP_DBG(demuxer, "Found the head...\n"); @@ -2027,7 +2023,6 @@ static int demux_mkv_open(demuxer_t *demuxer, enum demux_check check) while (1) { start_pos = stream_tell(s); - stream_peek(s, 4); // make sure we can always seek back uint32_t id = ebml_read_id(s); if (s->eof) { if (!mkv_d->probably_webm_dash_init) @@ -2836,7 +2831,6 @@ static int read_next_block_into_queue(demuxer_t *demuxer) find_next_cluster: mkv_d->cluster_end = 0; for (;;) { - stream_peek(s, 4); // guarantee we can undo ebml_read_id() below mkv_d->cluster_start = stream_tell(s); uint32_t id = ebml_read_id(s); if (id == MATROSKA_ID_CLUSTER) diff --git a/demux/demux_playlist.c b/demux/demux_playlist.c index 95c89a18e2..a7650b2b69 100644 --- a/demux/demux_playlist.c +++ b/demux/demux_playlist.c @@ -92,12 +92,13 @@ static int read_characters(stream_t *s, uint8_t *dst, int dstsize, int utf16) } return cur - dst; } else { - bstr buf = stream_peek_buffer(s); - uint8_t *end = memchr(buf.start, '\n', buf.len); - int len = end ? end - buf.start + 1 : buf.len; + uint8_t buf[1024]; + int buf_len = stream_read_peek(s, buf, sizeof(buf)); + uint8_t *end = memchr(buf, '\n', buf_len); + int len = end ? end - buf + 1 : buf_len; if (len > dstsize) return -1; // line too long - memcpy(dst, buf.start, len); + memcpy(dst, buf, len); stream_skip(s, len); return len; } @@ -178,7 +179,9 @@ static int parse_m3u(struct pl_parser *p) // Last resort: if the file extension is m3u, it might be headerless. if (p->check_level == DEMUX_CHECK_UNSAFE) { char *ext = mp_splitext(p->real_stream->url, NULL); - bstr data = stream_peek(p->real_stream, PROBE_SIZE); + char probe[PROBE_SIZE]; + int len = stream_read_peek(p->real_stream, probe, sizeof(probe)); + bstr data = {probe, len}; if (ext && data.len > 10 && maybe_text(data)) { const char *exts[] = {"m3u", "m3u8", NULL}; for (int n = 0; exts[n]; n++) { @@ -437,8 +440,9 @@ static int open_file(struct demuxer *demuxer, enum demux_check check) p->real_stream = demuxer->stream; p->add_base = true; - bstr probe_buf = stream_peek(demuxer->stream, PROBE_SIZE); - p->s = stream_memory_open(demuxer->global, probe_buf.start, probe_buf.len); + char probe[PROBE_SIZE]; + int probe_len = stream_read_peek(p->real_stream, probe, sizeof(probe)); + p->s = stream_memory_open(demuxer->global, probe, probe_len); p->s->mime_type = demuxer->stream->mime_type; p->utf16 = stream_skip_bom(p->s); p->force = force; diff --git a/options/options.c b/options/options.c index 4fbf9d6401..79ae3ca5ab 100644 --- a/options/options.c +++ b/options/options.c @@ -711,6 +711,7 @@ const m_option_t mp_opts[] = { OPT_SUBSTRUCT("", vo, vo_sub_opts, 0), OPT_SUBSTRUCT("", demux_opts, demux_conf, 0), OPT_SUBSTRUCT("", demux_cache_opts, demux_cache_conf, 0), + OPT_SUBSTRUCT("", stream_opts, stream_conf, 0), OPT_SUBSTRUCT("", gl_video_opts, gl_video_conf, 0), OPT_SUBSTRUCT("", spirv_opts, spirv_conf, 0), diff --git a/options/options.h b/options/options.h index 441d06b285..7af86ab9fb 100644 --- a/options/options.h +++ b/options/options.h @@ -307,6 +307,7 @@ typedef struct MPOpts { struct demux_opts *demux_opts; struct demux_cache_opts *demux_cache_opts; + struct stream_opts *stream_opts; struct vd_lavc_params *vd_lavc_params; struct ad_lavc_params *ad_lavc_params; @@ -360,5 +361,6 @@ extern const struct m_sub_options mp_subtitle_sub_opts; extern const struct m_sub_options mp_osd_render_sub_opts; extern const struct m_sub_options filter_conf; extern const struct m_sub_options resample_conf; +extern const struct m_sub_options stream_conf; #endif diff --git a/stream/stream.c b/stream/stream.c index 063c033bd4..143c1a28e5 100644 --- a/stream/stream.c +++ b/stream/stream.c @@ -33,6 +33,7 @@ #include "misc/bstr.h" #include "misc/thread_tools.h" #include "common/msg.h" +#include "options/m_config.h" #include "options/options.h" #include "options/path.h" #include "osdep/timer.h" @@ -94,7 +95,23 @@ static const stream_info_t *const stream_list[] = { NULL }; -static bool stream_seek_unbuffered(stream_t *s, int64_t newpos); +struct stream_opts { + int64_t buffer_size; +}; + +#define OPT_BASE_STRUCT struct stream_opts + +const struct m_sub_options stream_conf = { + .opts = (const struct m_option[]){ + OPT_BYTE_SIZE("stream-buffer-size", buffer_size, 0, + STREAM_FIXED_BUFFER_SIZE, 512 * 1024 * 1024), + {0} + }, + .size = sizeof(struct stream_opts), + .defaults = &(const struct stream_opts){ + .buffer_size = STREAM_FIXED_BUFFER_SIZE, + }, +}; // return -1 if not hex char static int hex2dec(char c) @@ -181,35 +198,91 @@ static const char *match_proto(const char *url, const char *proto) return NULL; } -// Resize the current stream buffer, or do nothing if the size is adequate. -// Caller must ensure the used buffer is not less than the new buffer size. -// Calling this with 0 ensures it uses the default buffer size. -static void stream_resize_buffer(struct stream *s, int new) +// Read len bytes from the start position, and wrap around as needed. Limit the +// a actually read data to the size of the buffer. Return amount of copied bytes. +// len: max bytes to copy to dst +// pos: index into s->buffer[], e.g. s->buf_start is byte 0 +// returns: bytes copied to dst (limited by len and available buffered data) +static int ring_copy(struct stream *s, void *dst, int len, int pos) { - new = MPMAX(new, STREAM_BUFFER_SIZE); + assert(len >= 0); - if (new == s->buffer_alloc) - return; + if (pos < s->buf_start || pos > s->buf_end) + return 0; - int buffer_used = s->buf_len - s->buf_pos; - assert(buffer_used <= new); + int copied = 0; + len = MPMIN(len, s->buf_end - pos); - void *nbuf = s->buffer_inline; - if (new > STREAM_BUFFER_SIZE) - nbuf = ta_alloc_size(s, new); + if (len && pos <= s->buffer_mask) { + int copy = MPMIN(len, s->buffer_mask + 1 - pos); + memcpy(dst, &s->buffer[pos], copy); + copied += copy; + len -= copy; + pos += copy; + } - if (nbuf) { - if (s->buffer) - memmove(nbuf, &s->buffer[s->buf_pos], buffer_used); - s->buf_pos = 0; - s->buf_len = buffer_used; + if (len) { + memcpy((char *)dst + copied, &s->buffer[pos & s->buffer_mask], len); + copied += len; + } - if (s->buffer != s->buffer_inline) - ta_free(s->buffer); + return copied; +} - s->buffer = nbuf; - s->buffer_alloc = new; +// Resize the current stream buffer. Uses a larger size if needed to keep data. +// Does nothing if the size is adequate. Calling this with 0 ensures it uses the +// default buffer size if possible. +// The caller must check whether enough data was really allocated. +// Returns false if buffer allocation failed. +static bool stream_resize_buffer(struct stream *s, uint32_t new) +{ + // Keep all valid buffer. + int old_used_len = s->buf_end - s->buf_start; + int old_pos = s->buf_cur - s->buf_start; + new = MPMAX(new, old_used_len); + + new = MPMAX(new, s->requested_buffer_size); + + // This much is always required. + new = MPMAX(new, STREAM_FIXED_BUFFER_SIZE); + + new = mp_round_next_power_of_2(new); + if (!new || new > INT_MAX / 8) + return false; + + if (new == s->buffer_mask + 1) + return true; + + MP_DBG(s, "resize stream to %d bytes\n", new); + + uint8_t *nbuf = s->buffer_inline; + if (new > STREAM_FIXED_BUFFER_SIZE) { + nbuf = ta_alloc_size(s, new); + } else { + static_assert(MP_IS_POWER_OF_2(STREAM_FIXED_BUFFER_SIZE), ""); + assert(new == STREAM_FIXED_BUFFER_SIZE); } + assert(nbuf != s->buffer); + + if (!nbuf) + return false; // oom; tolerate it, caller needs to check if required + + int new_len = 0; + if (s->buffer) + new_len = ring_copy(s, nbuf, new, s->buf_start); + assert(new_len == old_used_len); + assert(old_pos <= old_used_len); + s->buf_start = 0; + s->buf_cur = old_pos; + s->buf_end = new_len; + + if (s->buffer != s->buffer_inline) + ta_free(s->buffer); + + s->buffer = nbuf; + s->buffer_mask = new - 1; + + return true; } static int stream_create_instance(const stream_info_t *sinfo, @@ -236,6 +309,9 @@ static int stream_create_instance(const stream_info_t *sinfo, if (!path) return STREAM_NO_MATCH; + struct stream_opts *opts = + mp_get_config_group(NULL, args->global, &stream_conf); + stream_t *s = talloc_zero(NULL, stream_t); s->global = args->global; if (flags & STREAM_SILENT) { @@ -249,11 +325,14 @@ static int stream_create_instance(const stream_info_t *sinfo, s->path = talloc_strdup(s, path); s->is_network = sinfo->is_network; s->mode = flags & (STREAM_READ | STREAM_WRITE); + s->requested_buffer_size = opts->buffer_size; int opt; mp_read_option_raw(s->global, "access-references", &m_option_type_flag, &opt); s->access_references = opt; + talloc_free(opts); + MP_VERBOSE(s, "Opening %s\n", url); if (strlen(url) > INT_MAX / 8) { @@ -282,8 +361,10 @@ static int stream_create_instance(const stream_info_t *sinfo, if (!s->read_chunk) s->read_chunk = 4 * STREAM_BUFFER_SIZE; - stream_resize_buffer(s, 0); - MP_HANDLE_OOM(s->buffer); + if (!stream_resize_buffer(s, 0)) { + free_stream(s); + return STREAM_ERROR; + } assert(s->seekable == !!s->seek); @@ -391,78 +472,99 @@ static int stream_read_unbuffered(stream_t *s, void *buf, int len) return res; } -// Ask for having "total" bytes ready to read in the stream buffer. This can do -// a partial read if requested, so it can actually read less. +// Ask for having at most "forward" bytes ready to read in the buffer. // To read everything, you may have to call this in a loop. -// total: desired amount of bytes in buffer -// allow_short: if true, attempt at most once to read more if needed -// returns: actual bytes in buffer (can be smaller or larger than total) -static int stream_extend_buffer(struct stream *s, int total, bool allow_short) -{ - assert(total >= 0); - - if (s->buf_len - s->buf_pos < total) { - // Move to front to guarantee we really can read up to max size. - s->buf_len = s->buf_len - s->buf_pos; - memmove(s->buffer, &s->buffer[s->buf_pos], s->buf_len); - s->buf_pos = 0; - - // Read ahead by about as much as stream_fill_buffer() would, to avoid - // that many small stream_peek() calls will read the buffer at these - // quantities. - total = MPMAX(total, STREAM_BUFFER_SIZE); - - // Allocate more if the buffer is too small. Also, if the buffer is - // larger than needed, resize it to smaller. This assumes stream_peek() - // calls are rare or done with small sizes. - stream_resize_buffer(s, total); - - // Read less if allocation above failed. - total = MPMIN(total, s->buffer_alloc); - - // Fill rest of the buffer. Can be partial. - while (total > s->buf_len) { - int read = stream_read_unbuffered(s, &s->buffer[s->buf_len], - total - s->buf_len); - s->buf_len += read; - if (allow_short || !read) - break; - } +// forward: desired amount of bytes in buffer after s->cur_pos +// returns: progress (false on EOF or memory allocation failure) +static bool stream_read_more(struct stream *s, int forward) +{ + assert(forward >= 0); + + int forward_avail = s->buf_end - s->buf_cur; + if (forward_avail >= forward) + return true; + + // Avoid that many small reads will lead to many low-level read calls. + forward = MPMAX(forward, s->requested_buffer_size / 2); + + // Keep guaranteed seek-back. + int buf_old = MPMIN(s->buf_cur - s->buf_start, s->requested_buffer_size / 2); - if (s->buf_len) - s->eof = 0; + if (!stream_resize_buffer(s, buf_old + forward)) + return false; + + int buf_alloc = s->buffer_mask + 1; + + assert(s->buf_start <= s->buf_cur); + assert(s->buf_cur <= s->buf_end); + assert(s->buf_cur < buf_alloc * 2); + assert(s->buf_end < buf_alloc * 2); + assert(s->buf_start < buf_alloc); + + // Note: read as much as possible, even if forward is much smaller. Do + // this because the stream buffer is supposed to set an approx. minimum + // read size on it. + int read = buf_alloc - buf_old - forward_avail; // free buffer past end + + int pos = s->buf_end & s->buffer_mask; + read = MPMIN(read, buf_alloc - pos); + + // Note: if wrap-around happens, we need to make two calls. This may + // affect latency (e.g. waiting for new data on a socket), so do only + // 1 read call always. + read = stream_read_unbuffered(s, &s->buffer[pos], read); + + s->buf_end += read; + + // May have overwritten old data. + if (s->buf_end - s->buf_start >= buf_alloc) { + assert(s->buf_end >= buf_alloc); + + s->buf_start = s->buf_end - buf_alloc; + + assert(s->buf_start <= s->buf_cur); + assert(s->buf_cur <= s->buf_end); + + if (s->buf_start >= buf_alloc) { + s->buf_start -= buf_alloc; + s->buf_cur -= buf_alloc; + s->buf_end -= buf_alloc; + } } - return s->buf_len - s->buf_pos; -} + // Must not have overwritten guaranteed old data. + assert(s->buf_cur - s->buf_start >= buf_old); -int stream_fill_buffer(stream_t *s) -{ - return stream_extend_buffer(s, STREAM_BUFFER_SIZE, true); + if (s->buf_cur < s->buf_end) + s->eof = 0; + + return !!read; } // Read between 1..buf_size bytes of data, return how much data has been read. // Return 0 on EOF, error, or if buf_size was 0. int stream_read_partial(stream_t *s, char *buf, int buf_size) { - assert(s->buf_pos <= s->buf_len); + assert(s->buf_cur <= s->buf_end); assert(buf_size >= 0); - if (s->buf_pos == s->buf_len && buf_size > 0) { - s->buf_pos = s->buf_len = 0; - stream_resize_buffer(s, 0); - // Do a direct read - // Also, small reads will be more efficient with buffering & copying - if (buf_size >= STREAM_BUFFER_SIZE) + if (s->buf_cur == s->buf_end && buf_size > 0) { + if (buf_size > (s->buffer_mask + 1) / 2) { + // Direct read if the buffer is too small anyway. + stream_drop_buffers(s); return stream_read_unbuffered(s, buf, buf_size); - if (!stream_fill_buffer(s)) - return 0; + } + stream_read_more(s, 1); } - int len = MPMIN(buf_size, s->buf_len - s->buf_pos); - memcpy(buf, &s->buffer[s->buf_pos], len); - s->buf_pos += len; - if (len > 0) - s->eof = 0; - return len; + int res = ring_copy(s, buf, buf_size, s->buf_cur); + s->buf_cur += res; + return res; +} + +// Slow version of stream_read_char(); called by it if the buffer is empty. +int stream_read_char_fallback(stream_t *s) +{ + uint8_t c; + return stream_read_partial(s, &c, 1) ? c : -256; } int stream_read(stream_t *s, char *mem, int total) @@ -476,33 +578,18 @@ int stream_read(stream_t *s, char *mem, int total) len -= read; } total -= len; - if (total > 0) - s->eof = 0; return total; } -// Read ahead at most len bytes without changing the read position. Return a -// pointer to the internal buffer, starting from the current read position. -// Reading ahead may require memory allocation. If allocation fails, read ahead -// is silently limited to the last successful allocation. -// The returned buffer becomes invalid on the next stream call, and you must -// not write to it. -struct bstr stream_peek(stream_t *s, int len) -{ - assert(len >= 0); - - int avail = stream_extend_buffer(s, len, false); - return (bstr){.start = &s->buffer[s->buf_pos], .len = MPMIN(len, avail)}; -} - -// Peek the current buffer. This will return at least 1 byte, unless EOF was -// reached. If data is returned, the length is essentially random. -struct bstr stream_peek_buffer(stream_t *s) +// Like stream_read(), but do not advance the current position. This may resize +// the buffer to satisfy the read request. +int stream_read_peek(stream_t *s, void* buf, int buf_size) { - if (s->buf_len - s->buf_pos < 1) - stream_fill_buffer(s); - return (bstr){.start = &s->buffer[s->buf_pos], - .len = s->buf_len - s->buf_pos}; + while (s->buf_end - s->buf_cur < buf_size) { + if (!stream_read_more(s, buf_size)) + break; + } + return ring_copy(s, buf, buf_size, s->buf_cur); } int stream_write_buffer(stream_t *s, unsigned char *buf, int len) @@ -527,14 +614,14 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len) static bool stream_skip_read(struct stream *s, int64_t len) { while (len > 0) { - unsigned int left = s->buf_len - s->buf_pos; + unsigned int left = s->buf_end - s->buf_cur; if (!left) { - if (!stream_fill_buffer(s)) + if (!stream_read_more(s, 1)) return false; continue; } unsigned skip = MPMIN(len, left); - s->buf_pos += skip; + s->buf_cur += skip; len -= skip; } return true; @@ -546,7 +633,7 @@ static bool stream_skip_read(struct stream *s, int64_t len) void stream_drop_buffers(stream_t *s) { s->pos = stream_tell(s); - s->buf_pos = s->buf_len = 0; + s->buf_start = s->buf_cur = s->buf_end = 0; s->eof = 0; stream_resize_buffer(s, 0); } @@ -555,6 +642,9 @@ void stream_drop_buffers(stream_t *s) static bool stream_seek_unbuffered(stream_t *s, int64_t newpos) { if (newpos != s->pos) { + MP_VERBOSE(s, "stream level seek from %" PRId64 " to %" PRId64 "\n", + s->pos, newpos); + if (newpos > s->pos && !s->seekable) { MP_ERR(s, "Cannot seek forward in this stream\n"); return false; @@ -577,7 +667,8 @@ static bool stream_seek_unbuffered(stream_t *s, int64_t newpos) bool stream_seek(stream_t *s, int64_t pos) { - MP_TRACE(s, "seek to %lld\n", (long long)pos); + MP_TRACE(s, "seek request from %" PRId64 " to %" PRId64 "\n", + stream_tell(s), pos); s->eof = 0; // eof should be set only on read; seeking always clears it @@ -586,14 +677,12 @@ bool stream_seek(stream_t *s, int64_t pos) pos = 0; } - if (pos == stream_tell(s)) - return true; - - if (pos < s->pos) { - int64_t x = pos - (s->pos - (int)s->buf_len); - if (x >= 0) { - s->buf_pos = x; - assert(s->buf_pos <= s->buf_len); + if (pos <= s->pos) { + int64_t x = pos - (s->pos - (int)s->buf_end); + if (x >= (int)s->buf_start) { + s->buf_cur = x; + assert(s->buf_cur >= s->buf_start); + assert(s->buf_cur <= s->buf_end); return true; } } @@ -601,35 +690,24 @@ bool stream_seek(stream_t *s, int64_t pos) if (s->mode == STREAM_WRITE) return s->seekable && s->seek(s, pos); - int64_t newpos = pos; - - MP_TRACE(s, "Seek from %" PRId64 " to %" PRId64 - " (with offset %d)\n", s->pos, pos, (int)(pos - newpos)); - - if (pos >= s->pos && !s->seekable && s->fast_skip) { - // skipping is handled by generic code below - } else if (!stream_seek_unbuffered(s, newpos)) { - return false; + // Skip data instead of performing a seek in some cases. + if (pos >= s->pos && + ((!s->seekable && s->fast_skip) || + pos - s->pos <= s->requested_buffer_size)) + { + return stream_skip_read(s, pos - stream_tell(s)); } - return stream_skip_read(s, pos - stream_tell(s)); + return stream_seek_unbuffered(s, pos); } bool stream_skip(stream_t *s, int64_t len) { - int64_t target = stream_tell(s) + len; - if (len < 0) - return stream_seek(s, target); - if (len > 2 * STREAM_BUFFER_SIZE && s->seekable) { - // Seek to 1 byte before target - this is the only way to distinguish - // skip-to-EOF and skip-past-EOF in general. Successful seeking means - // absolutely nothing, so test by doing a real read of the last byte. - if (!stream_seek(s, target - 1)) - return false; - stream_read_char(s); - return !stream_eof(s) && stream_tell(s) == target; - } - return stream_skip_read(s, len); + if (!stream_seek(s, stream_tell(s) + len)) + return false; + // Make sure s->eof is set correctly, even if a seek happened. + stream_read_more(s, 1); + return true; } int stream_control(stream_t *s, int cmd, void *arg) @@ -661,7 +739,9 @@ static const char *const bom[3] = {"\xEF\xBB\xBF", "\xFF\xFE", "\xFE\xFF"}; // Return utf16 argument for stream_read_line int stream_skip_bom(struct stream *s) { - bstr data = stream_peek(s, 4); + char buf[4]; + int len = stream_read_peek(s, buf, sizeof(buf)); + bstr data = {buf, len}; for (int n = 0; n < 3; n++) { if (bstr_startswith0(data, bom[n])) { stream_skip(s, strlen(bom[n])); diff --git a/stream/stream.h b/stream/stream.h index 66f151962a..7437e1c86b 100644 --- a/stream/stream.h +++ b/stream/stream.h @@ -28,7 +28,11 @@ #include "misc/bstr.h" +// Minimum guaranteed buffer and seek-back size. For any reads <= of this size, +// it's guaranteed that you can seek back by <= of this size again. #define STREAM_BUFFER_SIZE 2048 +// (Half of this is typically reserved for seeking back.) +#define STREAM_FIXED_BUFFER_SIZE (STREAM_BUFFER_SIZE * 2) // stream->mode #define STREAM_READ 0 @@ -119,7 +123,6 @@ typedef struct stream { void (*close)(struct stream *s); int read_chunk; // maximum amount of data to read at once to limit latency - unsigned int buf_pos, buf_len; int64_t pos; int eof; int mode; //STREAM_READ or STREAM_WRITE @@ -145,20 +148,46 @@ typedef struct stream { // added to this. The user can reset this as needed. uint64_t total_unbuffered_read_bytes; + // Buffer size requested by user; s->buffer may have a different size + int requested_buffer_size; + + // This is a ring buffer. It is reset only on seeks (or when buffers are + // dropped). Otherwise old contents always stay valid. + // The valid buffer is from buf_start to buf_end; buf_end can be larger + // then the buffer size (requires wrap around). buf_cur is a value in the + // range [buf_start, buf_end]. + // When reading more data from the stream, buf_start is advanced as old + // data is overwritten with new data. + // Example: + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + // +===========================+---------------------------+ + // + 05 06 07 08 | 01 02 03 04 + 05 06 07 08 | 01 02 03 04 + + // +===========================+---------------------------+ + // ^ buf_start (4) | | + // | ^ buf_end (12 % 8 => 4) + // ^ buf_cur (9 % 8 => 1) + // Here, the entire 8 byte buffer is filled, i.e. buf_end - buf_start = 8. + // buffer_mask == 7, so (x & buffer_mask) == (x % buffer_size) + unsigned int buf_start; // index of oldest byte in buffer (is <= buffer_mask) + unsigned int buf_cur; // current read pos (can be > buffer_mask) + unsigned int buf_end; // end position (can be > buffer_mask) + + unsigned int buffer_mask; // buffer_size-1, where buffer_size == 2**n uint8_t *buffer; - int buffer_alloc; - uint8_t buffer_inline[STREAM_BUFFER_SIZE]; + uint8_t buffer_inline[STREAM_FIXED_BUFFER_SIZE]; } stream_t; -int stream_fill_buffer(stream_t *s); +// Non-inline version with stream_read_char(). +int stream_read_char_fallback(stream_t *s); int stream_write_buffer(stream_t *s, unsigned char *buf, int len); inline static int stream_read_char(stream_t *s) { - return (s->buf_pos < s->buf_len) ? s->buffer[s->buf_pos++] : - (stream_fill_buffer(s) ? s->buffer[s->buf_pos++] : -256); + return s->buf_cur < s->buf_end + ? s->buffer[(s->buf_cur++) & s->buffer_mask] + : stream_read_char_fallback(s); } int stream_skip_bom(struct stream *s); @@ -170,15 +199,14 @@ inline static int stream_eof(stream_t *s) inline static int64_t stream_tell(stream_t *s) { - return s->pos + s->buf_pos - s->buf_len; + return s->pos + s->buf_cur - s->buf_end; } bool stream_skip(stream_t *s, int64_t len); bool stream_seek(stream_t *s, int64_t pos); int stream_read(stream_t *s, char *mem, int total); int stream_read_partial(stream_t *s, char *buf, int buf_size); -struct bstr stream_peek(stream_t *s, int len); -struct bstr stream_peek_buffer(stream_t *s); +int stream_read_peek(stream_t *s, void* buf, int buf_size); void stream_drop_buffers(stream_t *s); int64_t stream_get_size(stream_t *s); diff --git a/wscript b/wscript index 034851f940..0fae0c2941 100644 --- a/wscript +++ b/wscript @@ -215,7 +215,7 @@ main_dependencies = [ 'atomic_int_least64_t test = ATOMIC_VAR_INIT(123);' 'atomic_fetch_add(&test, 1)')) }, { - # C11; technically we still support C99 + # C11; technically we require C11, but aligned_alloc() is not in MinGW 'name': 'aligned_alloc', 'desc': 'C11 aligned_alloc()', 'func': check_statement('stdlib.h', 'aligned_alloc(1, 1)'), -- cgit v1.2.3