summaryrefslogtreecommitdiffstats
path: root/stream/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'stream/stream.c')
-rw-r--r--stream/stream.c364
1 files changed, 222 insertions, 142 deletions
diff --git a/stream/stream.c b/stream/stream.c
index 063c033bd4..143c1a28e5 100644
--- a/stream/stream.c
+++ b/stream/stream.c
@@ -33,6 +33,7 @@
#include "misc/bstr.h"
#include "misc/thread_tools.h"
#include "common/msg.h"
+#include "options/m_config.h"
#include "options/options.h"
#include "options/path.h"
#include "osdep/timer.h"
@@ -94,7 +95,23 @@ static const stream_info_t *const stream_list[] = {
NULL
};
-static bool stream_seek_unbuffered(stream_t *s, int64_t newpos);
+struct stream_opts {
+ int64_t buffer_size;
+};
+
+#define OPT_BASE_STRUCT struct stream_opts
+
+const struct m_sub_options stream_conf = {
+ .opts = (const struct m_option[]){
+ OPT_BYTE_SIZE("stream-buffer-size", buffer_size, 0,
+ STREAM_FIXED_BUFFER_SIZE, 512 * 1024 * 1024),
+ {0}
+ },
+ .size = sizeof(struct stream_opts),
+ .defaults = &(const struct stream_opts){
+ .buffer_size = STREAM_FIXED_BUFFER_SIZE,
+ },
+};
// return -1 if not hex char
static int hex2dec(char c)
@@ -181,35 +198,91 @@ static const char *match_proto(const char *url, const char *proto)
return NULL;
}
-// Resize the current stream buffer, or do nothing if the size is adequate.
-// Caller must ensure the used buffer is not less than the new buffer size.
-// Calling this with 0 ensures it uses the default buffer size.
-static void stream_resize_buffer(struct stream *s, int new)
+// Read len bytes from the start position, and wrap around as needed. Limit the
+// a actually read data to the size of the buffer. Return amount of copied bytes.
+// len: max bytes to copy to dst
+// pos: index into s->buffer[], e.g. s->buf_start is byte 0
+// returns: bytes copied to dst (limited by len and available buffered data)
+static int ring_copy(struct stream *s, void *dst, int len, int pos)
{
- new = MPMAX(new, STREAM_BUFFER_SIZE);
+ assert(len >= 0);
- if (new == s->buffer_alloc)
- return;
+ if (pos < s->buf_start || pos > s->buf_end)
+ return 0;
- int buffer_used = s->buf_len - s->buf_pos;
- assert(buffer_used <= new);
+ int copied = 0;
+ len = MPMIN(len, s->buf_end - pos);
- void *nbuf = s->buffer_inline;
- if (new > STREAM_BUFFER_SIZE)
- nbuf = ta_alloc_size(s, new);
+ if (len && pos <= s->buffer_mask) {
+ int copy = MPMIN(len, s->buffer_mask + 1 - pos);
+ memcpy(dst, &s->buffer[pos], copy);
+ copied += copy;
+ len -= copy;
+ pos += copy;
+ }
- if (nbuf) {
- if (s->buffer)
- memmove(nbuf, &s->buffer[s->buf_pos], buffer_used);
- s->buf_pos = 0;
- s->buf_len = buffer_used;
+ if (len) {
+ memcpy((char *)dst + copied, &s->buffer[pos & s->buffer_mask], len);
+ copied += len;
+ }
- if (s->buffer != s->buffer_inline)
- ta_free(s->buffer);
+ return copied;
+}
- s->buffer = nbuf;
- s->buffer_alloc = new;
+// Resize the current stream buffer. Uses a larger size if needed to keep data.
+// Does nothing if the size is adequate. Calling this with 0 ensures it uses the
+// default buffer size if possible.
+// The caller must check whether enough data was really allocated.
+// Returns false if buffer allocation failed.
+static bool stream_resize_buffer(struct stream *s, uint32_t new)
+{
+ // Keep all valid buffer.
+ int old_used_len = s->buf_end - s->buf_start;
+ int old_pos = s->buf_cur - s->buf_start;
+ new = MPMAX(new, old_used_len);
+
+ new = MPMAX(new, s->requested_buffer_size);
+
+ // This much is always required.
+ new = MPMAX(new, STREAM_FIXED_BUFFER_SIZE);
+
+ new = mp_round_next_power_of_2(new);
+ if (!new || new > INT_MAX / 8)
+ return false;
+
+ if (new == s->buffer_mask + 1)
+ return true;
+
+ MP_DBG(s, "resize stream to %d bytes\n", new);
+
+ uint8_t *nbuf = s->buffer_inline;
+ if (new > STREAM_FIXED_BUFFER_SIZE) {
+ nbuf = ta_alloc_size(s, new);
+ } else {
+ static_assert(MP_IS_POWER_OF_2(STREAM_FIXED_BUFFER_SIZE), "");
+ assert(new == STREAM_FIXED_BUFFER_SIZE);
}
+ assert(nbuf != s->buffer);
+
+ if (!nbuf)
+ return false; // oom; tolerate it, caller needs to check if required
+
+ int new_len = 0;
+ if (s->buffer)
+ new_len = ring_copy(s, nbuf, new, s->buf_start);
+ assert(new_len == old_used_len);
+ assert(old_pos <= old_used_len);
+ s->buf_start = 0;
+ s->buf_cur = old_pos;
+ s->buf_end = new_len;
+
+ if (s->buffer != s->buffer_inline)
+ ta_free(s->buffer);
+
+ s->buffer = nbuf;
+ s->buffer_mask = new - 1;
+
+ return true;
}
static int stream_create_instance(const stream_info_t *sinfo,
@@ -236,6 +309,9 @@ static int stream_create_instance(const stream_info_t *sinfo,
if (!path)
return STREAM_NO_MATCH;
+ struct stream_opts *opts =
+ mp_get_config_group(NULL, args->global, &stream_conf);
+
stream_t *s = talloc_zero(NULL, stream_t);
s->global = args->global;
if (flags & STREAM_SILENT) {
@@ -249,11 +325,14 @@ static int stream_create_instance(const stream_info_t *sinfo,
s->path = talloc_strdup(s, path);
s->is_network = sinfo->is_network;
s->mode = flags & (STREAM_READ | STREAM_WRITE);
+ s->requested_buffer_size = opts->buffer_size;
int opt;
mp_read_option_raw(s->global, "access-references", &m_option_type_flag, &opt);
s->access_references = opt;
+ talloc_free(opts);
+
MP_VERBOSE(s, "Opening %s\n", url);
if (strlen(url) > INT_MAX / 8) {
@@ -282,8 +361,10 @@ static int stream_create_instance(const stream_info_t *sinfo,
if (!s->read_chunk)
s->read_chunk = 4 * STREAM_BUFFER_SIZE;
- stream_resize_buffer(s, 0);
- MP_HANDLE_OOM(s->buffer);
+ if (!stream_resize_buffer(s, 0)) {
+ free_stream(s);
+ return STREAM_ERROR;
+ }
assert(s->seekable == !!s->seek);
@@ -391,78 +472,99 @@ static int stream_read_unbuffered(stream_t *s, void *buf, int len)
return res;
}
-// Ask for having "total" bytes ready to read in the stream buffer. This can do
-// a partial read if requested, so it can actually read less.
+// Ask for having at most "forward" bytes ready to read in the buffer.
// To read everything, you may have to call this in a loop.
-// total: desired amount of bytes in buffer
-// allow_short: if true, attempt at most once to read more if needed
-// returns: actual bytes in buffer (can be smaller or larger than total)
-static int stream_extend_buffer(struct stream *s, int total, bool allow_short)
-{
- assert(total >= 0);
-
- if (s->buf_len - s->buf_pos < total) {
- // Move to front to guarantee we really can read up to max size.
- s->buf_len = s->buf_len - s->buf_pos;
- memmove(s->buffer, &s->buffer[s->buf_pos], s->buf_len);
- s->buf_pos = 0;
-
- // Read ahead by about as much as stream_fill_buffer() would, to avoid
- // that many small stream_peek() calls will read the buffer at these
- // quantities.
- total = MPMAX(total, STREAM_BUFFER_SIZE);
-
- // Allocate more if the buffer is too small. Also, if the buffer is
- // larger than needed, resize it to smaller. This assumes stream_peek()
- // calls are rare or done with small sizes.
- stream_resize_buffer(s, total);
-
- // Read less if allocation above failed.
- total = MPMIN(total, s->buffer_alloc);
-
- // Fill rest of the buffer. Can be partial.
- while (total > s->buf_len) {
- int read = stream_read_unbuffered(s, &s->buffer[s->buf_len],
- total - s->buf_len);
- s->buf_len += read;
- if (allow_short || !read)
- break;
- }
+// forward: desired amount of bytes in buffer after s->cur_pos
+// returns: progress (false on EOF or memory allocation failure)
+static bool stream_read_more(struct stream *s, int forward)
+{
+ assert(forward >= 0);
+
+ int forward_avail = s->buf_end - s->buf_cur;
+ if (forward_avail >= forward)
+ return true;
+
+ // Avoid that many small reads will lead to many low-level read calls.
+ forward = MPMAX(forward, s->requested_buffer_size / 2);
+
+ // Keep guaranteed seek-back.
+ int buf_old = MPMIN(s->buf_cur - s->buf_start, s->requested_buffer_size / 2);
- if (s->buf_len)
- s->eof = 0;
+ if (!stream_resize_buffer(s, buf_old + forward))
+ return false;
+
+ int buf_alloc = s->buffer_mask + 1;
+
+ assert(s->buf_start <= s->buf_cur);
+ assert(s->buf_cur <= s->buf_end);
+ assert(s->buf_cur < buf_alloc * 2);
+ assert(s->buf_end < buf_alloc * 2);
+ assert(s->buf_start < buf_alloc);
+
+ // Note: read as much as possible, even if forward is much smaller. Do
+ // this because the stream buffer is supposed to set an approx. minimum
+ // read size on it.
+ int read = buf_alloc - buf_old - forward_avail; // free buffer past end
+
+ int pos = s->buf_end & s->buffer_mask;
+ read = MPMIN(read, buf_alloc - pos);
+
+ // Note: if wrap-around happens, we need to make two calls. This may
+ // affect latency (e.g. waiting for new data on a socket), so do only
+ // 1 read call always.
+ read = stream_read_unbuffered(s, &s->buffer[pos], read);
+
+ s->buf_end += read;
+
+ // May have overwritten old data.
+ if (s->buf_end - s->buf_start >= buf_alloc) {
+ assert(s->buf_end >= buf_alloc);
+
+ s->buf_start = s->buf_end - buf_alloc;
+
+ assert(s->buf_start <= s->buf_cur);
+ assert(s->buf_cur <= s->buf_end);
+
+ if (s->buf_start >= buf_alloc) {
+ s->buf_start -= buf_alloc;
+ s->buf_cur -= buf_alloc;
+ s->buf_end -= buf_alloc;
+ }
}
- return s->buf_len - s->buf_pos;
-}
+ // Must not have overwritten guaranteed old data.
+ assert(s->buf_cur - s->buf_start >= buf_old);
-int stream_fill_buffer(stream_t *s)
-{
- return stream_extend_buffer(s, STREAM_BUFFER_SIZE, true);
+ if (s->buf_cur < s->buf_end)
+ s->eof = 0;
+
+ return !!read;
}
// Read between 1..buf_size bytes of data, return how much data has been read.
// Return 0 on EOF, error, or if buf_size was 0.
int stream_read_partial(stream_t *s, char *buf, int buf_size)
{
- assert(s->buf_pos <= s->buf_len);
+ assert(s->buf_cur <= s->buf_end);
assert(buf_size >= 0);
- if (s->buf_pos == s->buf_len && buf_size > 0) {
- s->buf_pos = s->buf_len = 0;
- stream_resize_buffer(s, 0);
- // Do a direct read
- // Also, small reads will be more efficient with buffering & copying
- if (buf_size >= STREAM_BUFFER_SIZE)
+ if (s->buf_cur == s->buf_end && buf_size > 0) {
+ if (buf_size > (s->buffer_mask + 1) / 2) {
+ // Direct read if the buffer is too small anyway.
+ stream_drop_buffers(s);
return stream_read_unbuffered(s, buf, buf_size);
- if (!stream_fill_buffer(s))
- return 0;
+ }
+ stream_read_more(s, 1);
}
- int len = MPMIN(buf_size, s->buf_len - s->buf_pos);
- memcpy(buf, &s->buffer[s->buf_pos], len);
- s->buf_pos += len;
- if (len > 0)
- s->eof = 0;
- return len;
+ int res = ring_copy(s, buf, buf_size, s->buf_cur);
+ s->buf_cur += res;
+ return res;
+}
+
+// Slow version of stream_read_char(); called by it if the buffer is empty.
+int stream_read_char_fallback(stream_t *s)
+{
+ uint8_t c;
+ return stream_read_partial(s, &c, 1) ? c : -256;
}
int stream_read(stream_t *s, char *mem, int total)
@@ -476,33 +578,18 @@ int stream_read(stream_t *s, char *mem, int total)
len -= read;
}
total -= len;
- if (total > 0)
- s->eof = 0;
return total;
}
-// Read ahead at most len bytes without changing the read position. Return a
-// pointer to the internal buffer, starting from the current read position.
-// Reading ahead may require memory allocation. If allocation fails, read ahead
-// is silently limited to the last successful allocation.
-// The returned buffer becomes invalid on the next stream call, and you must
-// not write to it.
-struct bstr stream_peek(stream_t *s, int len)
-{
- assert(len >= 0);
-
- int avail = stream_extend_buffer(s, len, false);
- return (bstr){.start = &s->buffer[s->buf_pos], .len = MPMIN(len, avail)};
-}
-
-// Peek the current buffer. This will return at least 1 byte, unless EOF was
-// reached. If data is returned, the length is essentially random.
-struct bstr stream_peek_buffer(stream_t *s)
+// Like stream_read(), but do not advance the current position. This may resize
+// the buffer to satisfy the read request.
+int stream_read_peek(stream_t *s, void* buf, int buf_size)
{
- if (s->buf_len - s->buf_pos < 1)
- stream_fill_buffer(s);
- return (bstr){.start = &s->buffer[s->buf_pos],
- .len = s->buf_len - s->buf_pos};
+ while (s->buf_end - s->buf_cur < buf_size) {
+ if (!stream_read_more(s, buf_size))
+ break;
+ }
+ return ring_copy(s, buf, buf_size, s->buf_cur);
}
int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
@@ -527,14 +614,14 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
static bool stream_skip_read(struct stream *s, int64_t len)
{
while (len > 0) {
- unsigned int left = s->buf_len - s->buf_pos;
+ unsigned int left = s->buf_end - s->buf_cur;
if (!left) {
- if (!stream_fill_buffer(s))
+ if (!stream_read_more(s, 1))
return false;
continue;
}
unsigned skip = MPMIN(len, left);
- s->buf_pos += skip;
+ s->buf_cur += skip;
len -= skip;
}
return true;
@@ -546,7 +633,7 @@ static bool stream_skip_read(struct stream *s, int64_t len)
void stream_drop_buffers(stream_t *s)
{
s->pos = stream_tell(s);
- s->buf_pos = s->buf_len = 0;
+ s->buf_start = s->buf_cur = s->buf_end = 0;
s->eof = 0;
stream_resize_buffer(s, 0);
}
@@ -555,6 +642,9 @@ void stream_drop_buffers(stream_t *s)
static bool stream_seek_unbuffered(stream_t *s, int64_t newpos)
{
if (newpos != s->pos) {
+ MP_VERBOSE(s, "stream level seek from %" PRId64 " to %" PRId64 "\n",
+ s->pos, newpos);
+
if (newpos > s->pos && !s->seekable) {
MP_ERR(s, "Cannot seek forward in this stream\n");
return false;
@@ -577,7 +667,8 @@ static bool stream_seek_unbuffered(stream_t *s, int64_t newpos)
bool stream_seek(stream_t *s, int64_t pos)
{
- MP_TRACE(s, "seek to %lld\n", (long long)pos);
+ MP_TRACE(s, "seek request from %" PRId64 " to %" PRId64 "\n",
+ stream_tell(s), pos);
s->eof = 0; // eof should be set only on read; seeking always clears it
@@ -586,14 +677,12 @@ bool stream_seek(stream_t *s, int64_t pos)
pos = 0;
}
- if (pos == stream_tell(s))
- return true;
-
- if (pos < s->pos) {
- int64_t x = pos - (s->pos - (int)s->buf_len);
- if (x >= 0) {
- s->buf_pos = x;
- assert(s->buf_pos <= s->buf_len);
+ if (pos <= s->pos) {
+ int64_t x = pos - (s->pos - (int)s->buf_end);
+ if (x >= (int)s->buf_start) {
+ s->buf_cur = x;
+ assert(s->buf_cur >= s->buf_start);
+ assert(s->buf_cur <= s->buf_end);
return true;
}
}
@@ -601,35 +690,24 @@ bool stream_seek(stream_t *s, int64_t pos)
if (s->mode == STREAM_WRITE)
return s->seekable && s->seek(s, pos);
- int64_t newpos = pos;
-
- MP_TRACE(s, "Seek from %" PRId64 " to %" PRId64
- " (with offset %d)\n", s->pos, pos, (int)(pos - newpos));
-
- if (pos >= s->pos && !s->seekable && s->fast_skip) {
- // skipping is handled by generic code below
- } else if (!stream_seek_unbuffered(s, newpos)) {
- return false;
+ // Skip data instead of performing a seek in some cases.
+ if (pos >= s->pos &&
+ ((!s->seekable && s->fast_skip) ||
+ pos - s->pos <= s->requested_buffer_size))
+ {
+ return stream_skip_read(s, pos - stream_tell(s));
}
- return stream_skip_read(s, pos - stream_tell(s));
+ return stream_seek_unbuffered(s, pos);
}
bool stream_skip(stream_t *s, int64_t len)
{
- int64_t target = stream_tell(s) + len;
- if (len < 0)
- return stream_seek(s, target);
- if (len > 2 * STREAM_BUFFER_SIZE && s->seekable) {
- // Seek to 1 byte before target - this is the only way to distinguish
- // skip-to-EOF and skip-past-EOF in general. Successful seeking means
- // absolutely nothing, so test by doing a real read of the last byte.
- if (!stream_seek(s, target - 1))
- return false;
- stream_read_char(s);
- return !stream_eof(s) && stream_tell(s) == target;
- }
- return stream_skip_read(s, len);
+ if (!stream_seek(s, stream_tell(s) + len))
+ return false;
+ // Make sure s->eof is set correctly, even if a seek happened.
+ stream_read_more(s, 1);
+ return true;
}
int stream_control(stream_t *s, int cmd, void *arg)
@@ -661,7 +739,9 @@ static const char *const bom[3] = {"\xEF\xBB\xBF", "\xFF\xFE", "\xFE\xFF"};
// Return utf16 argument for stream_read_line
int stream_skip_bom(struct stream *s)
{
- bstr data = stream_peek(s, 4);
+ char buf[4];
+ int len = stream_read_peek(s, buf, sizeof(buf));
+ bstr data = {buf, len};
for (int n = 0; n < 3; n++) {
if (bstr_startswith0(data, bom[n])) {
stream_skip(s, strlen(bom[n]));