From f37f4de8496556afaa024e39e2efb433eb1680d4 Mon Sep 17 00:00:00 2001 From: wm4 Date: Wed, 6 Nov 2019 21:36:02 +0100 Subject: stream: turn into a ring buffer, make size configurable In some corner cases (see #6802), it can be beneficial to use a larger stream buffer size. Use this as an argument to rewrite everything for no reason. Turn stream.c itself into a ring buffer, with configurable size. The latter would have been easily achievable with minimal changes, and the ring buffer is the hard part. There is no reason to have a ring buffer at all, except possibly if ffmpeg doesn't fix their awful mp4 demuxer, and some subtle issues with demux_mkv.c wanting to seek back by small offsets (the latter was handled with small stream_peek() calls, which are unneeded now). In addition, this turns small forward seeks into reads (where data is simply skipped). Before this commit, only stream_skip() did this (which also means that stream_skip() simply calls stream_seek() now). Replace all stream_peek() calls with something else (usually stream_read_peek()). The function was a problem, because it returned a pointer to the internal buffer, which is now a ring buffer with wrapping. The new function just copies the data into a buffer, and in some cases requires callers to dynamically allocate memory. (The most common case, demux_lavf.c, required a separate buffer allocation anyway due to FFmpeg "idiosyncrasies".) This is the bulk of the demuxer_* changes. I'm not happy with this. There still isn't a good reason why there should be a ring buffer, which is complex, and most of the time just wastes half of the available memory. Maybe another rewrite soon. It also contains bugs; you're an alpha tester now. 
--- stream/stream.c | 364 ++++++++++++++++++++++++++++++++++---------------------- stream/stream.h | 46 +++++-- 2 files changed, 259 insertions(+), 151 deletions(-) (limited to 'stream') diff --git a/stream/stream.c b/stream/stream.c index 063c033bd4..143c1a28e5 100644 --- a/stream/stream.c +++ b/stream/stream.c @@ -33,6 +33,7 @@ #include "misc/bstr.h" #include "misc/thread_tools.h" #include "common/msg.h" +#include "options/m_config.h" #include "options/options.h" #include "options/path.h" #include "osdep/timer.h" @@ -94,7 +95,23 @@ static const stream_info_t *const stream_list[] = { NULL }; -static bool stream_seek_unbuffered(stream_t *s, int64_t newpos); +struct stream_opts { + int64_t buffer_size; +}; + +#define OPT_BASE_STRUCT struct stream_opts + +const struct m_sub_options stream_conf = { + .opts = (const struct m_option[]){ + OPT_BYTE_SIZE("stream-buffer-size", buffer_size, 0, + STREAM_FIXED_BUFFER_SIZE, 512 * 1024 * 1024), + {0} + }, + .size = sizeof(struct stream_opts), + .defaults = &(const struct stream_opts){ + .buffer_size = STREAM_FIXED_BUFFER_SIZE, + }, +}; // return -1 if not hex char static int hex2dec(char c) @@ -181,35 +198,91 @@ static const char *match_proto(const char *url, const char *proto) return NULL; } -// Resize the current stream buffer, or do nothing if the size is adequate. -// Caller must ensure the used buffer is not less than the new buffer size. -// Calling this with 0 ensures it uses the default buffer size. -static void stream_resize_buffer(struct stream *s, int new) +// Read len bytes from the start position, and wrap around as needed. Limit the +// a actually read data to the size of the buffer. Return amount of copied bytes. +// len: max bytes to copy to dst +// pos: index into s->buffer[], e.g. 
s->buf_start is byte 0 +// returns: bytes copied to dst (limited by len and available buffered data) +static int ring_copy(struct stream *s, void *dst, int len, int pos) { - new = MPMAX(new, STREAM_BUFFER_SIZE); + assert(len >= 0); - if (new == s->buffer_alloc) - return; + if (pos < s->buf_start || pos > s->buf_end) + return 0; - int buffer_used = s->buf_len - s->buf_pos; - assert(buffer_used <= new); + int copied = 0; + len = MPMIN(len, s->buf_end - pos); - void *nbuf = s->buffer_inline; - if (new > STREAM_BUFFER_SIZE) - nbuf = ta_alloc_size(s, new); + if (len && pos <= s->buffer_mask) { + int copy = MPMIN(len, s->buffer_mask + 1 - pos); + memcpy(dst, &s->buffer[pos], copy); + copied += copy; + len -= copy; + pos += copy; + } - if (nbuf) { - if (s->buffer) - memmove(nbuf, &s->buffer[s->buf_pos], buffer_used); - s->buf_pos = 0; - s->buf_len = buffer_used; + if (len) { + memcpy((char *)dst + copied, &s->buffer[pos & s->buffer_mask], len); + copied += len; + } - if (s->buffer != s->buffer_inline) - ta_free(s->buffer); + return copied; +} - s->buffer = nbuf; - s->buffer_alloc = new; +// Resize the current stream buffer. Uses a larger size if needed to keep data. +// Does nothing if the size is adequate. Calling this with 0 ensures it uses the +// default buffer size if possible. +// The caller must check whether enough data was really allocated. +// Returns false if buffer allocation failed. +static bool stream_resize_buffer(struct stream *s, uint32_t new) +{ + // Keep all valid buffer. + int old_used_len = s->buf_end - s->buf_start; + int old_pos = s->buf_cur - s->buf_start; + new = MPMAX(new, old_used_len); + + new = MPMAX(new, s->requested_buffer_size); + + // This much is always required. 
+ new = MPMAX(new, STREAM_FIXED_BUFFER_SIZE); + + new = mp_round_next_power_of_2(new); + if (!new || new > INT_MAX / 8) + return false; + + if (new == s->buffer_mask + 1) + return true; + + MP_DBG(s, "resize stream to %d bytes\n", new); + + uint8_t *nbuf = s->buffer_inline; + if (new > STREAM_FIXED_BUFFER_SIZE) { + nbuf = ta_alloc_size(s, new); + } else { + static_assert(MP_IS_POWER_OF_2(STREAM_FIXED_BUFFER_SIZE), ""); + assert(new == STREAM_FIXED_BUFFER_SIZE); } + assert(nbuf != s->buffer); + + if (!nbuf) + return false; // oom; tolerate it, caller needs to check if required + + int new_len = 0; + if (s->buffer) + new_len = ring_copy(s, nbuf, new, s->buf_start); + assert(new_len == old_used_len); + assert(old_pos <= old_used_len); + s->buf_start = 0; + s->buf_cur = old_pos; + s->buf_end = new_len; + + if (s->buffer != s->buffer_inline) + ta_free(s->buffer); + + s->buffer = nbuf; + s->buffer_mask = new - 1; + + return true; } static int stream_create_instance(const stream_info_t *sinfo, @@ -236,6 +309,9 @@ static int stream_create_instance(const stream_info_t *sinfo, if (!path) return STREAM_NO_MATCH; + struct stream_opts *opts = + mp_get_config_group(NULL, args->global, &stream_conf); + stream_t *s = talloc_zero(NULL, stream_t); s->global = args->global; if (flags & STREAM_SILENT) { @@ -249,11 +325,14 @@ static int stream_create_instance(const stream_info_t *sinfo, s->path = talloc_strdup(s, path); s->is_network = sinfo->is_network; s->mode = flags & (STREAM_READ | STREAM_WRITE); + s->requested_buffer_size = opts->buffer_size; int opt; mp_read_option_raw(s->global, "access-references", &m_option_type_flag, &opt); s->access_references = opt; + talloc_free(opts); + MP_VERBOSE(s, "Opening %s\n", url); if (strlen(url) > INT_MAX / 8) { @@ -282,8 +361,10 @@ static int stream_create_instance(const stream_info_t *sinfo, if (!s->read_chunk) s->read_chunk = 4 * STREAM_BUFFER_SIZE; - stream_resize_buffer(s, 0); - MP_HANDLE_OOM(s->buffer); + if (!stream_resize_buffer(s, 0)) { 
+ free_stream(s); + return STREAM_ERROR; + } assert(s->seekable == !!s->seek); @@ -391,78 +472,99 @@ static int stream_read_unbuffered(stream_t *s, void *buf, int len) return res; } -// Ask for having "total" bytes ready to read in the stream buffer. This can do -// a partial read if requested, so it can actually read less. +// Ask for having at most "forward" bytes ready to read in the buffer. // To read everything, you may have to call this in a loop. -// total: desired amount of bytes in buffer -// allow_short: if true, attempt at most once to read more if needed -// returns: actual bytes in buffer (can be smaller or larger than total) -static int stream_extend_buffer(struct stream *s, int total, bool allow_short) -{ - assert(total >= 0); - - if (s->buf_len - s->buf_pos < total) { - // Move to front to guarantee we really can read up to max size. - s->buf_len = s->buf_len - s->buf_pos; - memmove(s->buffer, &s->buffer[s->buf_pos], s->buf_len); - s->buf_pos = 0; - - // Read ahead by about as much as stream_fill_buffer() would, to avoid - // that many small stream_peek() calls will read the buffer at these - // quantities. - total = MPMAX(total, STREAM_BUFFER_SIZE); - - // Allocate more if the buffer is too small. Also, if the buffer is - // larger than needed, resize it to smaller. This assumes stream_peek() - // calls are rare or done with small sizes. - stream_resize_buffer(s, total); - - // Read less if allocation above failed. - total = MPMIN(total, s->buffer_alloc); - - // Fill rest of the buffer. Can be partial. 
- while (total > s->buf_len) { - int read = stream_read_unbuffered(s, &s->buffer[s->buf_len], - total - s->buf_len); - s->buf_len += read; - if (allow_short || !read) - break; - } +// forward: desired amount of bytes in buffer after s->cur_pos +// returns: progress (false on EOF or memory allocation failure) +static bool stream_read_more(struct stream *s, int forward) +{ + assert(forward >= 0); + + int forward_avail = s->buf_end - s->buf_cur; + if (forward_avail >= forward) + return true; + + // Avoid that many small reads will lead to many low-level read calls. + forward = MPMAX(forward, s->requested_buffer_size / 2); + + // Keep guaranteed seek-back. + int buf_old = MPMIN(s->buf_cur - s->buf_start, s->requested_buffer_size / 2); - if (s->buf_len) - s->eof = 0; + if (!stream_resize_buffer(s, buf_old + forward)) + return false; + + int buf_alloc = s->buffer_mask + 1; + + assert(s->buf_start <= s->buf_cur); + assert(s->buf_cur <= s->buf_end); + assert(s->buf_cur < buf_alloc * 2); + assert(s->buf_end < buf_alloc * 2); + assert(s->buf_start < buf_alloc); + + // Note: read as much as possible, even if forward is much smaller. Do + // this because the stream buffer is supposed to set an approx. minimum + // read size on it. + int read = buf_alloc - buf_old - forward_avail; // free buffer past end + + int pos = s->buf_end & s->buffer_mask; + read = MPMIN(read, buf_alloc - pos); + + // Note: if wrap-around happens, we need to make two calls. This may + // affect latency (e.g. waiting for new data on a socket), so do only + // 1 read call always. + read = stream_read_unbuffered(s, &s->buffer[pos], read); + + s->buf_end += read; + + // May have overwritten old data. 
+ if (s->buf_end - s->buf_start >= buf_alloc) { + assert(s->buf_end >= buf_alloc); + + s->buf_start = s->buf_end - buf_alloc; + + assert(s->buf_start <= s->buf_cur); + assert(s->buf_cur <= s->buf_end); + + if (s->buf_start >= buf_alloc) { + s->buf_start -= buf_alloc; + s->buf_cur -= buf_alloc; + s->buf_end -= buf_alloc; + } } - return s->buf_len - s->buf_pos; -} + // Must not have overwritten guaranteed old data. + assert(s->buf_cur - s->buf_start >= buf_old); -int stream_fill_buffer(stream_t *s) -{ - return stream_extend_buffer(s, STREAM_BUFFER_SIZE, true); + if (s->buf_cur < s->buf_end) + s->eof = 0; + + return !!read; } // Read between 1..buf_size bytes of data, return how much data has been read. // Return 0 on EOF, error, or if buf_size was 0. int stream_read_partial(stream_t *s, char *buf, int buf_size) { - assert(s->buf_pos <= s->buf_len); + assert(s->buf_cur <= s->buf_end); assert(buf_size >= 0); - if (s->buf_pos == s->buf_len && buf_size > 0) { - s->buf_pos = s->buf_len = 0; - stream_resize_buffer(s, 0); - // Do a direct read - // Also, small reads will be more efficient with buffering & copying - if (buf_size >= STREAM_BUFFER_SIZE) + if (s->buf_cur == s->buf_end && buf_size > 0) { + if (buf_size > (s->buffer_mask + 1) / 2) { + // Direct read if the buffer is too small anyway. + stream_drop_buffers(s); return stream_read_unbuffered(s, buf, buf_size); - if (!stream_fill_buffer(s)) - return 0; + } + stream_read_more(s, 1); } - int len = MPMIN(buf_size, s->buf_len - s->buf_pos); - memcpy(buf, &s->buffer[s->buf_pos], len); - s->buf_pos += len; - if (len > 0) - s->eof = 0; - return len; + int res = ring_copy(s, buf, buf_size, s->buf_cur); + s->buf_cur += res; + return res; +} + +// Slow version of stream_read_char(); called by it if the buffer is empty. +int stream_read_char_fallback(stream_t *s) +{ + uint8_t c; + return stream_read_partial(s, &c, 1) ? 
c : -256; } int stream_read(stream_t *s, char *mem, int total) @@ -476,33 +578,18 @@ int stream_read(stream_t *s, char *mem, int total) len -= read; } total -= len; - if (total > 0) - s->eof = 0; return total; } -// Read ahead at most len bytes without changing the read position. Return a -// pointer to the internal buffer, starting from the current read position. -// Reading ahead may require memory allocation. If allocation fails, read ahead -// is silently limited to the last successful allocation. -// The returned buffer becomes invalid on the next stream call, and you must -// not write to it. -struct bstr stream_peek(stream_t *s, int len) -{ - assert(len >= 0); - - int avail = stream_extend_buffer(s, len, false); - return (bstr){.start = &s->buffer[s->buf_pos], .len = MPMIN(len, avail)}; -} - -// Peek the current buffer. This will return at least 1 byte, unless EOF was -// reached. If data is returned, the length is essentially random. -struct bstr stream_peek_buffer(stream_t *s) +// Like stream_read(), but do not advance the current position. This may resize +// the buffer to satisfy the read request. 
+int stream_read_peek(stream_t *s, void* buf, int buf_size) { - if (s->buf_len - s->buf_pos < 1) - stream_fill_buffer(s); - return (bstr){.start = &s->buffer[s->buf_pos], - .len = s->buf_len - s->buf_pos}; + while (s->buf_end - s->buf_cur < buf_size) { + if (!stream_read_more(s, buf_size)) + break; + } + return ring_copy(s, buf, buf_size, s->buf_cur); } int stream_write_buffer(stream_t *s, unsigned char *buf, int len) @@ -527,14 +614,14 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len) static bool stream_skip_read(struct stream *s, int64_t len) { while (len > 0) { - unsigned int left = s->buf_len - s->buf_pos; + unsigned int left = s->buf_end - s->buf_cur; if (!left) { - if (!stream_fill_buffer(s)) + if (!stream_read_more(s, 1)) return false; continue; } unsigned skip = MPMIN(len, left); - s->buf_pos += skip; + s->buf_cur += skip; len -= skip; } return true; @@ -546,7 +633,7 @@ static bool stream_skip_read(struct stream *s, int64_t len) void stream_drop_buffers(stream_t *s) { s->pos = stream_tell(s); - s->buf_pos = s->buf_len = 0; + s->buf_start = s->buf_cur = s->buf_end = 0; s->eof = 0; stream_resize_buffer(s, 0); } @@ -555,6 +642,9 @@ void stream_drop_buffers(stream_t *s) static bool stream_seek_unbuffered(stream_t *s, int64_t newpos) { if (newpos != s->pos) { + MP_VERBOSE(s, "stream level seek from %" PRId64 " to %" PRId64 "\n", + s->pos, newpos); + if (newpos > s->pos && !s->seekable) { MP_ERR(s, "Cannot seek forward in this stream\n"); return false; @@ -577,7 +667,8 @@ static bool stream_seek_unbuffered(stream_t *s, int64_t newpos) bool stream_seek(stream_t *s, int64_t pos) { - MP_TRACE(s, "seek to %lld\n", (long long)pos); + MP_TRACE(s, "seek request from %" PRId64 " to %" PRId64 "\n", + stream_tell(s), pos); s->eof = 0; // eof should be set only on read; seeking always clears it @@ -586,14 +677,12 @@ bool stream_seek(stream_t *s, int64_t pos) pos = 0; } - if (pos == stream_tell(s)) - return true; - - if (pos < s->pos) { - int64_t x = pos - 
(s->pos - (int)s->buf_len); - if (x >= 0) { - s->buf_pos = x; - assert(s->buf_pos <= s->buf_len); + if (pos <= s->pos) { + int64_t x = pos - (s->pos - (int)s->buf_end); + if (x >= (int)s->buf_start) { + s->buf_cur = x; + assert(s->buf_cur >= s->buf_start); + assert(s->buf_cur <= s->buf_end); return true; } } @@ -601,35 +690,24 @@ bool stream_seek(stream_t *s, int64_t pos) if (s->mode == STREAM_WRITE) return s->seekable && s->seek(s, pos); - int64_t newpos = pos; - - MP_TRACE(s, "Seek from %" PRId64 " to %" PRId64 - " (with offset %d)\n", s->pos, pos, (int)(pos - newpos)); - - if (pos >= s->pos && !s->seekable && s->fast_skip) { - // skipping is handled by generic code below - } else if (!stream_seek_unbuffered(s, newpos)) { - return false; + // Skip data instead of performing a seek in some cases. + if (pos >= s->pos && + ((!s->seekable && s->fast_skip) || + pos - s->pos <= s->requested_buffer_size)) + { + return stream_skip_read(s, pos - stream_tell(s)); } - return stream_skip_read(s, pos - stream_tell(s)); + return stream_seek_unbuffered(s, pos); } bool stream_skip(stream_t *s, int64_t len) { - int64_t target = stream_tell(s) + len; - if (len < 0) - return stream_seek(s, target); - if (len > 2 * STREAM_BUFFER_SIZE && s->seekable) { - // Seek to 1 byte before target - this is the only way to distinguish - // skip-to-EOF and skip-past-EOF in general. Successful seeking means - // absolutely nothing, so test by doing a real read of the last byte. - if (!stream_seek(s, target - 1)) - return false; - stream_read_char(s); - return !stream_eof(s) && stream_tell(s) == target; - } - return stream_skip_read(s, len); + if (!stream_seek(s, stream_tell(s) + len)) + return false; + // Make sure s->eof is set correctly, even if a seek happened. 
+ stream_read_more(s, 1); + return true; } int stream_control(stream_t *s, int cmd, void *arg) @@ -661,7 +739,9 @@ static const char *const bom[3] = {"\xEF\xBB\xBF", "\xFF\xFE", "\xFE\xFF"}; // Return utf16 argument for stream_read_line int stream_skip_bom(struct stream *s) { - bstr data = stream_peek(s, 4); + char buf[4]; + int len = stream_read_peek(s, buf, sizeof(buf)); + bstr data = {buf, len}; for (int n = 0; n < 3; n++) { if (bstr_startswith0(data, bom[n])) { stream_skip(s, strlen(bom[n])); diff --git a/stream/stream.h b/stream/stream.h index 66f151962a..7437e1c86b 100644 --- a/stream/stream.h +++ b/stream/stream.h @@ -28,7 +28,11 @@ #include "misc/bstr.h" +// Minimum guaranteed buffer and seek-back size. For any reads <= of this size, +// it's guaranteed that you can seek back by <= of this size again. #define STREAM_BUFFER_SIZE 2048 +// (Half of this is typically reserved for seeking back.) +#define STREAM_FIXED_BUFFER_SIZE (STREAM_BUFFER_SIZE * 2) // stream->mode #define STREAM_READ 0 @@ -119,7 +123,6 @@ typedef struct stream { void (*close)(struct stream *s); int read_chunk; // maximum amount of data to read at once to limit latency - unsigned int buf_pos, buf_len; int64_t pos; int eof; int mode; //STREAM_READ or STREAM_WRITE @@ -145,20 +148,46 @@ typedef struct stream { // added to this. The user can reset this as needed. uint64_t total_unbuffered_read_bytes; + // Buffer size requested by user; s->buffer may have a different size + int requested_buffer_size; + + // This is a ring buffer. It is reset only on seeks (or when buffers are + // dropped). Otherwise old contents always stay valid. + // The valid buffer is from buf_start to buf_end; buf_end can be larger + // then the buffer size (requires wrap around). buf_cur is a value in the + // range [buf_start, buf_end]. + // When reading more data from the stream, buf_start is advanced as old + // data is overwritten with new data. 
+ // Example: + // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + // +===========================+---------------------------+ + // + 05 06 07 08 | 01 02 03 04 + 05 06 07 08 | 01 02 03 04 + + // +===========================+---------------------------+ + // ^ buf_start (4) | | + // | ^ buf_end (12 % 8 => 4) + // ^ buf_cur (9 % 8 => 1) + // Here, the entire 8 byte buffer is filled, i.e. buf_end - buf_start = 8. + // buffer_mask == 7, so (x & buffer_mask) == (x % buffer_size) + unsigned int buf_start; // index of oldest byte in buffer (is <= buffer_mask) + unsigned int buf_cur; // current read pos (can be > buffer_mask) + unsigned int buf_end; // end position (can be > buffer_mask) + + unsigned int buffer_mask; // buffer_size-1, where buffer_size == 2**n uint8_t *buffer; - int buffer_alloc; - uint8_t buffer_inline[STREAM_BUFFER_SIZE]; + uint8_t buffer_inline[STREAM_FIXED_BUFFER_SIZE]; } stream_t; -int stream_fill_buffer(stream_t *s); +// Non-inline version with stream_read_char(). +int stream_read_char_fallback(stream_t *s); int stream_write_buffer(stream_t *s, unsigned char *buf, int len); inline static int stream_read_char(stream_t *s) { - return (s->buf_pos < s->buf_len) ? s->buffer[s->buf_pos++] : - (stream_fill_buffer(s) ? s->buffer[s->buf_pos++] : -256); + return s->buf_cur < s->buf_end + ? 
s->buffer[(s->buf_cur++) & s->buffer_mask] + : stream_read_char_fallback(s); } int stream_skip_bom(struct stream *s); @@ -170,15 +199,14 @@ inline static int stream_eof(stream_t *s) inline static int64_t stream_tell(stream_t *s) { - return s->pos + s->buf_pos - s->buf_len; + return s->pos + s->buf_cur - s->buf_end; } bool stream_skip(stream_t *s, int64_t len); bool stream_seek(stream_t *s, int64_t pos); int stream_read(stream_t *s, char *mem, int total); int stream_read_partial(stream_t *s, char *buf, int buf_size); -struct bstr stream_peek(stream_t *s, int len); -struct bstr stream_peek_buffer(stream_t *s); +int stream_read_peek(stream_t *s, void* buf, int buf_size); void stream_drop_buffers(stream_t *s); int64_t stream_get_size(stream_t *s); -- cgit v1.2.3