diff options
Diffstat (limited to 'stream/stream.c')
-rw-r--r-- | stream/stream.c | 364 |
1 files changed, 222 insertions, 142 deletions
diff --git a/stream/stream.c b/stream/stream.c index 063c033bd4..143c1a28e5 100644 --- a/stream/stream.c +++ b/stream/stream.c @@ -33,6 +33,7 @@ #include "misc/bstr.h" #include "misc/thread_tools.h" #include "common/msg.h" +#include "options/m_config.h" #include "options/options.h" #include "options/path.h" #include "osdep/timer.h" @@ -94,7 +95,23 @@ static const stream_info_t *const stream_list[] = { NULL }; -static bool stream_seek_unbuffered(stream_t *s, int64_t newpos); +struct stream_opts { + int64_t buffer_size; +}; + +#define OPT_BASE_STRUCT struct stream_opts + +const struct m_sub_options stream_conf = { + .opts = (const struct m_option[]){ + OPT_BYTE_SIZE("stream-buffer-size", buffer_size, 0, + STREAM_FIXED_BUFFER_SIZE, 512 * 1024 * 1024), + {0} + }, + .size = sizeof(struct stream_opts), + .defaults = &(const struct stream_opts){ + .buffer_size = STREAM_FIXED_BUFFER_SIZE, + }, +}; // return -1 if not hex char static int hex2dec(char c) @@ -181,35 +198,91 @@ static const char *match_proto(const char *url, const char *proto) return NULL; } -// Resize the current stream buffer, or do nothing if the size is adequate. -// Caller must ensure the used buffer is not less than the new buffer size. -// Calling this with 0 ensures it uses the default buffer size. -static void stream_resize_buffer(struct stream *s, int new) +// Read len bytes from the start position, and wrap around as needed. Limit the +// a actually read data to the size of the buffer. Return amount of copied bytes. +// len: max bytes to copy to dst +// pos: index into s->buffer[], e.g. s->buf_start is byte 0 +// returns: bytes copied to dst (limited by len and available buffered data) +static int ring_copy(struct stream *s, void *dst, int len, int pos) { - new = MPMAX(new, STREAM_BUFFER_SIZE); + assert(len >= 0); - if (new == s->buffer_alloc) - return; + if (pos < s->buf_start || pos > s->buf_end) + return 0; - int buffer_used = s->buf_len - s->buf_pos; - assert(buffer_used <= new); + int copied = 0; + len = MPMIN(len, s->buf_end - pos); - void *nbuf = s->buffer_inline; - if (new > STREAM_BUFFER_SIZE) - nbuf = ta_alloc_size(s, new); + if (len && pos <= s->buffer_mask) { + int copy = MPMIN(len, s->buffer_mask + 1 - pos); + memcpy(dst, &s->buffer[pos], copy); + copied += copy; + len -= copy; + pos += copy; + } - if (nbuf) { - if (s->buffer) - memmove(nbuf, &s->buffer[s->buf_pos], buffer_used); - s->buf_pos = 0; - s->buf_len = buffer_used; + if (len) { + memcpy((char *)dst + copied, &s->buffer[pos & s->buffer_mask], len); + copied += len; + } - if (s->buffer != s->buffer_inline) - ta_free(s->buffer); + return copied; +} - s->buffer = nbuf; - s->buffer_alloc = new; +// Resize the current stream buffer. Uses a larger size if needed to keep data. +// Does nothing if the size is adequate. Calling this with 0 ensures it uses the +// default buffer size if possible. +// The caller must check whether enough data was really allocated. +// Returns false if buffer allocation failed. +static bool stream_resize_buffer(struct stream *s, uint32_t new) +{ + // Keep all valid buffer. + int old_used_len = s->buf_end - s->buf_start; + int old_pos = s->buf_cur - s->buf_start; + new = MPMAX(new, old_used_len); + + new = MPMAX(new, s->requested_buffer_size); + + // This much is always required. + new = MPMAX(new, STREAM_FIXED_BUFFER_SIZE); + + new = mp_round_next_power_of_2(new); + if (!new || new > INT_MAX / 8) + return false; + + if (new == s->buffer_mask + 1) + return true; + + MP_DBG(s, "resize stream to %d bytes\n", new); + + uint8_t *nbuf = s->buffer_inline; + if (new > STREAM_FIXED_BUFFER_SIZE) { + nbuf = ta_alloc_size(s, new); + } else { + static_assert(MP_IS_POWER_OF_2(STREAM_FIXED_BUFFER_SIZE), ""); + assert(new == STREAM_FIXED_BUFFER_SIZE); } + assert(nbuf != s->buffer); + + if (!nbuf) + return false; // oom; tolerate it, caller needs to check if required + + int new_len = 0; + if (s->buffer) + new_len = ring_copy(s, nbuf, new, s->buf_start); + assert(new_len == old_used_len); + assert(old_pos <= old_used_len); + s->buf_start = 0; + s->buf_cur = old_pos; + s->buf_end = new_len; + + if (s->buffer != s->buffer_inline) + ta_free(s->buffer); + + s->buffer = nbuf; + s->buffer_mask = new - 1; + + return true; } static int stream_create_instance(const stream_info_t *sinfo, @@ -236,6 +309,9 @@ static int stream_create_instance(const stream_info_t *sinfo, if (!path) return STREAM_NO_MATCH; + struct stream_opts *opts = + mp_get_config_group(NULL, args->global, &stream_conf); + stream_t *s = talloc_zero(NULL, stream_t); s->global = args->global; if (flags & STREAM_SILENT) { @@ -249,11 +325,14 @@ static int stream_create_instance(const stream_info_t *sinfo, s->path = talloc_strdup(s, path); s->is_network = sinfo->is_network; s->mode = flags & (STREAM_READ | STREAM_WRITE); + s->requested_buffer_size = opts->buffer_size; int opt; mp_read_option_raw(s->global, "access-references", &m_option_type_flag, &opt); s->access_references = opt; + talloc_free(opts); + MP_VERBOSE(s, "Opening %s\n", url); if (strlen(url) > INT_MAX / 8) { @@ -282,8 +361,10 @@ static int stream_create_instance(const stream_info_t *sinfo, if (!s->read_chunk) s->read_chunk = 4 * STREAM_BUFFER_SIZE; - stream_resize_buffer(s, 0); - MP_HANDLE_OOM(s->buffer); + if (!stream_resize_buffer(s, 0)) { + free_stream(s); + return STREAM_ERROR; + } assert(s->seekable == !!s->seek); @@ -391,78 +472,99 @@ static int stream_read_unbuffered(stream_t *s, void *buf, int len) return res; } -// Ask for having "total" bytes ready to read in the stream buffer. This can do -// a partial read if requested, so it can actually read less. +// Ask for having at most "forward" bytes ready to read in the buffer. // To read everything, you may have to call this in a loop. -// total: desired amount of bytes in buffer -// allow_short: if true, attempt at most once to read more if needed -// returns: actual bytes in buffer (can be smaller or larger than total) -static int stream_extend_buffer(struct stream *s, int total, bool allow_short) -{ - assert(total >= 0); - - if (s->buf_len - s->buf_pos < total) { - // Move to front to guarantee we really can read up to max size. - s->buf_len = s->buf_len - s->buf_pos; - memmove(s->buffer, &s->buffer[s->buf_pos], s->buf_len); - s->buf_pos = 0; - - // Read ahead by about as much as stream_fill_buffer() would, to avoid - // that many small stream_peek() calls will read the buffer at these - // quantities. - total = MPMAX(total, STREAM_BUFFER_SIZE); - - // Allocate more if the buffer is too small. Also, if the buffer is - // larger than needed, resize it to smaller. This assumes stream_peek() - // calls are rare or done with small sizes. - stream_resize_buffer(s, total); - - // Read less if allocation above failed. - total = MPMIN(total, s->buffer_alloc); - - // Fill rest of the buffer. Can be partial. - while (total > s->buf_len) { - int read = stream_read_unbuffered(s, &s->buffer[s->buf_len], - total - s->buf_len); - s->buf_len += read; - if (allow_short || !read) - break; - } +// forward: desired amount of bytes in buffer after s->cur_pos +// returns: progress (false on EOF or memory allocation failure) +static bool stream_read_more(struct stream *s, int forward) +{ + assert(forward >= 0); + + int forward_avail = s->buf_end - s->buf_cur; + if (forward_avail >= forward) + return true; + + // Avoid that many small reads will lead to many low-level read calls. + forward = MPMAX(forward, s->requested_buffer_size / 2); + + // Keep guaranteed seek-back. + int buf_old = MPMIN(s->buf_cur - s->buf_start, s->requested_buffer_size / 2); - if (s->buf_len) - s->eof = 0; + if (!stream_resize_buffer(s, buf_old + forward)) + return false; + + int buf_alloc = s->buffer_mask + 1; + + assert(s->buf_start <= s->buf_cur); + assert(s->buf_cur <= s->buf_end); + assert(s->buf_cur < buf_alloc * 2); + assert(s->buf_end < buf_alloc * 2); + assert(s->buf_start < buf_alloc); + + // Note: read as much as possible, even if forward is much smaller. Do + // this because the stream buffer is supposed to set an approx. minimum + // read size on it. + int read = buf_alloc - buf_old - forward_avail; // free buffer past end + + int pos = s->buf_end & s->buffer_mask; + read = MPMIN(read, buf_alloc - pos); + + // Note: if wrap-around happens, we need to make two calls. This may + // affect latency (e.g. waiting for new data on a socket), so do only + // 1 read call always. + read = stream_read_unbuffered(s, &s->buffer[pos], read); + + s->buf_end += read; + + // May have overwritten old data. + if (s->buf_end - s->buf_start >= buf_alloc) { + assert(s->buf_end >= buf_alloc); + + s->buf_start = s->buf_end - buf_alloc; + + assert(s->buf_start <= s->buf_cur); + assert(s->buf_cur <= s->buf_end); + + if (s->buf_start >= buf_alloc) { + s->buf_start -= buf_alloc; + s->buf_cur -= buf_alloc; + s->buf_end -= buf_alloc; + } } - return s->buf_len - s->buf_pos; -} + // Must not have overwritten guaranteed old data. + assert(s->buf_cur - s->buf_start >= buf_old); -int stream_fill_buffer(stream_t *s) -{ - return stream_extend_buffer(s, STREAM_BUFFER_SIZE, true); + if (s->buf_cur < s->buf_end) + s->eof = 0; + + return !!read; } // Read between 1..buf_size bytes of data, return how much data has been read. // Return 0 on EOF, error, or if buf_size was 0. int stream_read_partial(stream_t *s, char *buf, int buf_size) { - assert(s->buf_pos <= s->buf_len); + assert(s->buf_cur <= s->buf_end); assert(buf_size >= 0); - if (s->buf_pos == s->buf_len && buf_size > 0) { - s->buf_pos = s->buf_len = 0; - stream_resize_buffer(s, 0); - // Do a direct read - // Also, small reads will be more efficient with buffering & copying - if (buf_size >= STREAM_BUFFER_SIZE) + if (s->buf_cur == s->buf_end && buf_size > 0) { + if (buf_size > (s->buffer_mask + 1) / 2) { + // Direct read if the buffer is too small anyway. + stream_drop_buffers(s); return stream_read_unbuffered(s, buf, buf_size); - if (!stream_fill_buffer(s)) - return 0; + } + stream_read_more(s, 1); } - int len = MPMIN(buf_size, s->buf_len - s->buf_pos); - memcpy(buf, &s->buffer[s->buf_pos], len); - s->buf_pos += len; - if (len > 0) - s->eof = 0; - return len; + int res = ring_copy(s, buf, buf_size, s->buf_cur); + s->buf_cur += res; + return res; +} + +// Slow version of stream_read_char(); called by it if the buffer is empty. +int stream_read_char_fallback(stream_t *s) +{ + uint8_t c; + return stream_read_partial(s, &c, 1) ? c : -256; } int stream_read(stream_t *s, char *mem, int total) @@ -476,33 +578,18 @@ int stream_read(stream_t *s, char *mem, int total) len -= read; } total -= len; - if (total > 0) - s->eof = 0; return total; } -// Read ahead at most len bytes without changing the read position. Return a -// pointer to the internal buffer, starting from the current read position. -// Reading ahead may require memory allocation. If allocation fails, read ahead -// is silently limited to the last successful allocation. -// The returned buffer becomes invalid on the next stream call, and you must -// not write to it. -struct bstr stream_peek(stream_t *s, int len) -{ - assert(len >= 0); - - int avail = stream_extend_buffer(s, len, false); - return (bstr){.start = &s->buffer[s->buf_pos], .len = MPMIN(len, avail)}; -} - -// Peek the current buffer. This will return at least 1 byte, unless EOF was -// reached. If data is returned, the length is essentially random. -struct bstr stream_peek_buffer(stream_t *s) +// Like stream_read(), but do not advance the current position. This may resize +// the buffer to satisfy the read request. +int stream_read_peek(stream_t *s, void* buf, int buf_size) { - if (s->buf_len - s->buf_pos < 1) - stream_fill_buffer(s); - return (bstr){.start = &s->buffer[s->buf_pos], - .len = s->buf_len - s->buf_pos}; + while (s->buf_end - s->buf_cur < buf_size) { + if (!stream_read_more(s, buf_size)) + break; + } + return ring_copy(s, buf, buf_size, s->buf_cur); } int stream_write_buffer(stream_t *s, unsigned char *buf, int len) @@ -527,14 +614,14 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len) static bool stream_skip_read(struct stream *s, int64_t len) { while (len > 0) { - unsigned int left = s->buf_len - s->buf_pos; + unsigned int left = s->buf_end - s->buf_cur; if (!left) { - if (!stream_fill_buffer(s)) + if (!stream_read_more(s, 1)) return false; continue; } unsigned skip = MPMIN(len, left); - s->buf_pos += skip; + s->buf_cur += skip; len -= skip; } return true; @@ -546,7 +633,7 @@ static bool stream_skip_read(struct stream *s, int64_t len) void stream_drop_buffers(stream_t *s) { s->pos = stream_tell(s); - s->buf_pos = s->buf_len = 0; + s->buf_start = s->buf_cur = s->buf_end = 0; s->eof = 0; stream_resize_buffer(s, 0); } @@ -555,6 +642,9 @@ void stream_drop_buffers(stream_t *s) static bool stream_seek_unbuffered(stream_t *s, int64_t newpos) { if (newpos != s->pos) { + MP_VERBOSE(s, "stream level seek from %" PRId64 " to %" PRId64 "\n", + s->pos, newpos); + if (newpos > s->pos && !s->seekable) { MP_ERR(s, "Cannot seek forward in this stream\n"); return false; @@ -577,7 +667,8 @@ static bool stream_seek_unbuffered(stream_t *s, int64_t newpos) bool stream_seek(stream_t *s, int64_t pos) { - MP_TRACE(s, "seek to %lld\n", (long long)pos); + MP_TRACE(s, "seek request from %" PRId64 " to %" PRId64 "\n", + stream_tell(s), pos); s->eof = 0; // eof should be set only on read; seeking always clears it @@ -586,14 +677,12 @@ bool stream_seek(stream_t *s, int64_t pos) pos = 0; } - if (pos == stream_tell(s)) - return true; - - if (pos < s->pos) { - int64_t x = pos - (s->pos - (int)s->buf_len); - if (x >= 0) { - s->buf_pos = x; - assert(s->buf_pos <= s->buf_len); + if (pos <= s->pos) { + int64_t x = pos - (s->pos - (int)s->buf_end); + if (x >= (int)s->buf_start) { + s->buf_cur = x; + assert(s->buf_cur >= s->buf_start); + assert(s->buf_cur <= s->buf_end); return true; } } @@ -601,35 +690,24 @@ bool stream_seek(stream_t *s, int64_t pos) if (s->mode == STREAM_WRITE) return s->seekable && s->seek(s, pos); - int64_t newpos = pos; - - MP_TRACE(s, "Seek from %" PRId64 " to %" PRId64 - " (with offset %d)\n", s->pos, pos, (int)(pos - newpos)); - - if (pos >= s->pos && !s->seekable && s->fast_skip) { - // skipping is handled by generic code below - } else if (!stream_seek_unbuffered(s, newpos)) { - return false; + // Skip data instead of performing a seek in some cases. + if (pos >= s->pos && + ((!s->seekable && s->fast_skip) || + pos - s->pos <= s->requested_buffer_size)) + { + return stream_skip_read(s, pos - stream_tell(s)); } - return stream_skip_read(s, pos - stream_tell(s)); + return stream_seek_unbuffered(s, pos); } bool stream_skip(stream_t *s, int64_t len) { - int64_t target = stream_tell(s) + len; - if (len < 0) - return stream_seek(s, target); - if (len > 2 * STREAM_BUFFER_SIZE && s->seekable) { - // Seek to 1 byte before target - this is the only way to distinguish - // skip-to-EOF and skip-past-EOF in general. Successful seeking means - // absolutely nothing, so test by doing a real read of the last byte. - if (!stream_seek(s, target - 1)) - return false; - stream_read_char(s); - return !stream_eof(s) && stream_tell(s) == target; - } - return stream_skip_read(s, len); + if (!stream_seek(s, stream_tell(s) + len)) + return false; + // Make sure s->eof is set correctly, even if a seek happened. + stream_read_more(s, 1); + return true; } int stream_control(stream_t *s, int cmd, void *arg) @@ -661,7 +739,9 @@ static const char *const bom[3] = {"\xEF\xBB\xBF", "\xFF\xFE", "\xFE\xFF"}; // Return utf16 argument for stream_read_line int stream_skip_bom(struct stream *s) { - bstr data = stream_peek(s, 4); + char buf[4]; + int len = stream_read_peek(s, buf, sizeof(buf)); + bstr data = {buf, len}; for (int n = 0; n < 3; n++) { if (bstr_startswith0(data, bom[n])) { stream_skip(s, strlen(bom[n])); |