summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2019-11-06 21:36:02 +0100
committerwm4 <wm4@nowhere>2019-11-06 21:36:02 +0100
commitf37f4de8496556afaa024e39e2efb433eb1680d4 (patch)
tree1ade8205598a3142d1fcbe5f32f461266ce81b76
parentabb089431d0467c3609207d0b27359ef08a6c16d (diff)
downloadmpv-f37f4de8496556afaa024e39e2efb433eb1680d4.tar.bz2
mpv-f37f4de8496556afaa024e39e2efb433eb1680d4.tar.xz
stream: turn into a ring buffer, make size configurable
In some corner cases (see #6802), it can be beneficial to use a larger stream buffer size. Use this as an argument to rewrite everything for no reason. Turn stream.c itself into a ring buffer, with configurable size. The latter would have been easily achievable with minimal changes, and the ring buffer is the hard part. There is no reason to have a ring buffer at all, except possibly if FFmpeg doesn't fix their awful mp4 demuxer, and some subtle issues with demux_mkv.c wanting to seek back by small offsets (the latter was handled with small stream_peek() calls, which are unneeded now). In addition, this turns small forward seeks into reads (where data is simply skipped). Before this commit, only stream_skip() did this (which also means that stream_skip() simply calls stream_seek() now). Replace all stream_peek() calls with something else (usually stream_read_peek()). The function was a problem, because it returned a pointer to the internal buffer, which is now a ring buffer with wrapping. The new function just copies the data into a buffer, and in some cases requires callers to dynamically allocate memory. (The most common case, demux_lavf.c, required a separate buffer allocation anyway due to FFmpeg "idiosyncrasies".) This is the bulk of the demuxer_* changes. I'm not happy with this. There still isn't a good reason why there should be a ring buffer, which is complex, and most of the time just wastes half of the available memory. Maybe another rewrite soon. It also contains bugs; you're an alpha tester now.
-rw-r--r--DOCS/man/options.rst26
-rw-r--r--demux/demux_cue.c5
-rw-r--r--demux/demux_disc.c2
-rw-r--r--demux/demux_edl.c4
-rw-r--r--demux/demux_lavf.c7
-rw-r--r--demux/demux_libarchive.c8
-rw-r--r--demux/demux_mkv.c10
-rw-r--r--demux/demux_playlist.c18
-rw-r--r--options/options.c1
-rw-r--r--options/options.h2
-rw-r--r--stream/stream.c364
-rw-r--r--stream/stream.h46
-rw-r--r--wscript2
13 files changed, 317 insertions, 178 deletions
diff --git a/DOCS/man/options.rst b/DOCS/man/options.rst
index 8dcbe1bcd8..3ae3264969 100644
--- a/DOCS/man/options.rst
+++ b/DOCS/man/options.rst
@@ -4196,6 +4196,32 @@ Cache
Currently, this is used for ``--cache-on-disk`` only.
+``--stream-buffer-size=<bytesize>``
+ Size of the low level stream byte buffer (default: 4KB). This is used as a
+ buffer between the demuxer and low level I/O (e.g. sockets). Generally, this
+ can be very small; its main purpose is similar to that of the internal
+ buffer a ``FILE`` in the C standard library has.
+
+ Half of the buffer is always used for guaranteed seek back, which is
+ important for unseekable input.
+
+ There are known cases where setting a large buffer can help performance:
+
+ 1. mp4 files. libavformat may trigger many small seeks in both
+ directions, depending on how the file was muxed.
+
+ 2. Certain network filesystems, which do not have a cache, and where
+ small reads can be inefficient.
+
+ In other cases, setting this to a large value can reduce performance.
+
+ Usually, read accesses are at half the buffer size, but it may happen that
+ accesses are done alternating with smaller and larger sizes (this is due to
+ the internal ring buffer wrap-around).
+
+ See ``--list-options`` for defaults and value range. ``<bytesize>`` options
+ accept suffixes such as ``KiB`` and ``MiB``.
+
Network
-------
diff --git a/demux/demux_cue.c b/demux/demux_cue.c
index 59628c74b4..35874972b9 100644
--- a/demux/demux_cue.c
+++ b/demux/demux_cue.c
@@ -265,8 +265,9 @@ static int try_open_file(struct demuxer *demuxer, enum demux_check check)
struct stream *s = demuxer->stream;
if (check >= DEMUX_CHECK_UNSAFE) {
- bstr d = stream_peek(s, PROBE_SIZE);
- if (d.len < 1 || !mp_probe_cue(d))
+ char probe[PROBE_SIZE];
+ int len = stream_read_peek(s, probe, sizeof(probe));
+ if (len < 1 || !mp_probe_cue((bstr){probe, len}))
return -1;
}
struct priv *p = talloc_zero(demuxer, struct priv);
diff --git a/demux/demux_disc.c b/demux/demux_disc.c
index 13e73caa39..919360d074 100644
--- a/demux/demux_disc.c
+++ b/demux/demux_disc.c
@@ -321,7 +321,7 @@ static int d_open(demuxer_t *demuxer, enum demux_check check)
// Initialize the playback time. We need to read _some_ data to get the
// correct stream-layer time (at least with libdvdnav).
- stream_peek(demuxer->stream, 1);
+ stream_read_peek(demuxer->stream, &(char){0}, 1);
reset_pts(demuxer);
p->slave = demux_open_url("-", &params, demuxer->cancel, demuxer->global);
diff --git a/demux/demux_edl.c b/demux/demux_edl.c
index 62e6c50070..aa650a383f 100644
--- a/demux/demux_edl.c
+++ b/demux/demux_edl.c
@@ -459,7 +459,9 @@ static int try_open_file(struct demuxer *demuxer, enum demux_check check)
return 0;
}
if (check >= DEMUX_CHECK_UNSAFE) {
- if (!bstr_equals0(stream_peek(s, strlen(HEADER)), HEADER))
+ char header[sizeof(HEADER) - 1];
+ int len = stream_read_peek(s, header, sizeof(header));
+ if (len != strlen(HEADER) || memcmp(header, HEADER, len) != 0)
return -1;
}
p->data = stream_read_complete(s, demuxer, 1000000);
diff --git a/demux/demux_lavf.c b/demux/demux_lavf.c
index 670b611375..9ac6495f15 100644
--- a/demux/demux_lavf.c
+++ b/demux/demux_lavf.c
@@ -458,11 +458,10 @@ static int lavf_check_file(demuxer_t *demuxer, enum demux_check check)
} else {
int nsize = av_clip(avpd.buf_size * 2, INITIAL_PROBE_SIZE,
PROBE_BUF_SIZE);
- bstr buf = stream_peek(s, nsize);
- if (buf.len <= avpd.buf_size)
+ nsize = stream_read_peek(s, avpd.buf, nsize);
+ if (nsize <= avpd.buf_size)
final_probe = true;
- memcpy(avpd.buf, buf.start, buf.len);
- avpd.buf_size = buf.len;
+ avpd.buf_size = nsize;
priv->avif = av_probe_input_format2(&avpd, avpd.buf_size > 0, &score);
}
diff --git a/demux/demux_libarchive.c b/demux/demux_libarchive.c
index 3540082179..bb2ca24862 100644
--- a/demux/demux_libarchive.c
+++ b/demux/demux_libarchive.c
@@ -43,15 +43,17 @@ static int open_file(struct demuxer *demuxer, enum demux_check check)
probe_size *= 100;
}
- bstr probe = stream_peek(demuxer->stream, probe_size);
- if (probe.len == 0)
+ void *probe = ta_alloc_size(NULL, probe_size);
+ if (!probe)
return -1;
+ int probe_got = stream_read_peek(demuxer->stream, probe, probe_size);
struct stream *probe_stream =
- stream_memory_open(demuxer->global, probe.start, probe.len);
+ stream_memory_open(demuxer->global, probe, probe_got);
struct mp_archive *mpa = mp_archive_new(mp_null_log, probe_stream, flags);
bool ok = !!mpa;
free_stream(probe_stream);
mp_archive_free(mpa);
+ ta_free(probe);
if (!ok)
return -1;
diff --git a/demux/demux_mkv.c b/demux/demux_mkv.c
index 5eaa7bbb07..db1ccff13a 100644
--- a/demux/demux_mkv.c
+++ b/demux/demux_mkv.c
@@ -1999,13 +1999,9 @@ static int demux_mkv_open(demuxer_t *demuxer, enum demux_check check)
if (demuxer->params)
mkv_d->probably_webm_dash_init = demuxer->params->init_fragment.len > 0;
- bstr start = stream_peek(s, 4);
- uint32_t start_id = 0;
- for (int n = 0; n < start.len; n++)
- start_id = (start_id << 8) | start.start[n];
- if (start_id != EBML_ID_EBML)
+ // Make sure you can seek back after read_ebml_header() if no EBML ID.
+ if (stream_read_peek(s, &(char[4]){0}, 4) != 4)
return -1;
-
if (!read_ebml_header(demuxer))
return -1;
MP_DBG(demuxer, "Found the head...\n");
@@ -2027,7 +2023,6 @@ static int demux_mkv_open(demuxer_t *demuxer, enum demux_check check)
while (1) {
start_pos = stream_tell(s);
- stream_peek(s, 4); // make sure we can always seek back
uint32_t id = ebml_read_id(s);
if (s->eof) {
if (!mkv_d->probably_webm_dash_init)
@@ -2836,7 +2831,6 @@ static int read_next_block_into_queue(demuxer_t *demuxer)
find_next_cluster:
mkv_d->cluster_end = 0;
for (;;) {
- stream_peek(s, 4); // guarantee we can undo ebml_read_id() below
mkv_d->cluster_start = stream_tell(s);
uint32_t id = ebml_read_id(s);
if (id == MATROSKA_ID_CLUSTER)
diff --git a/demux/demux_playlist.c b/demux/demux_playlist.c
index 95c89a18e2..a7650b2b69 100644
--- a/demux/demux_playlist.c
+++ b/demux/demux_playlist.c
@@ -92,12 +92,13 @@ static int read_characters(stream_t *s, uint8_t *dst, int dstsize, int utf16)
}
return cur - dst;
} else {
- bstr buf = stream_peek_buffer(s);
- uint8_t *end = memchr(buf.start, '\n', buf.len);
- int len = end ? end - buf.start + 1 : buf.len;
+ uint8_t buf[1024];
+ int buf_len = stream_read_peek(s, buf, sizeof(buf));
+ uint8_t *end = memchr(buf, '\n', buf_len);
+ int len = end ? end - buf + 1 : buf_len;
if (len > dstsize)
return -1; // line too long
- memcpy(dst, buf.start, len);
+ memcpy(dst, buf, len);
stream_skip(s, len);
return len;
}
@@ -178,7 +179,9 @@ static int parse_m3u(struct pl_parser *p)
// Last resort: if the file extension is m3u, it might be headerless.
if (p->check_level == DEMUX_CHECK_UNSAFE) {
char *ext = mp_splitext(p->real_stream->url, NULL);
- bstr data = stream_peek(p->real_stream, PROBE_SIZE);
+ char probe[PROBE_SIZE];
+ int len = stream_read_peek(p->real_stream, probe, sizeof(probe));
+ bstr data = {probe, len};
if (ext && data.len > 10 && maybe_text(data)) {
const char *exts[] = {"m3u", "m3u8", NULL};
for (int n = 0; exts[n]; n++) {
@@ -437,8 +440,9 @@ static int open_file(struct demuxer *demuxer, enum demux_check check)
p->real_stream = demuxer->stream;
p->add_base = true;
- bstr probe_buf = stream_peek(demuxer->stream, PROBE_SIZE);
- p->s = stream_memory_open(demuxer->global, probe_buf.start, probe_buf.len);
+ char probe[PROBE_SIZE];
+ int probe_len = stream_read_peek(p->real_stream, probe, sizeof(probe));
+ p->s = stream_memory_open(demuxer->global, probe, probe_len);
p->s->mime_type = demuxer->stream->mime_type;
p->utf16 = stream_skip_bom(p->s);
p->force = force;
diff --git a/options/options.c b/options/options.c
index 4fbf9d6401..79ae3ca5ab 100644
--- a/options/options.c
+++ b/options/options.c
@@ -711,6 +711,7 @@ const m_option_t mp_opts[] = {
OPT_SUBSTRUCT("", vo, vo_sub_opts, 0),
OPT_SUBSTRUCT("", demux_opts, demux_conf, 0),
OPT_SUBSTRUCT("", demux_cache_opts, demux_cache_conf, 0),
+ OPT_SUBSTRUCT("", stream_opts, stream_conf, 0),
OPT_SUBSTRUCT("", gl_video_opts, gl_video_conf, 0),
OPT_SUBSTRUCT("", spirv_opts, spirv_conf, 0),
diff --git a/options/options.h b/options/options.h
index 441d06b285..7af86ab9fb 100644
--- a/options/options.h
+++ b/options/options.h
@@ -307,6 +307,7 @@ typedef struct MPOpts {
struct demux_opts *demux_opts;
struct demux_cache_opts *demux_cache_opts;
+ struct stream_opts *stream_opts;
struct vd_lavc_params *vd_lavc_params;
struct ad_lavc_params *ad_lavc_params;
@@ -360,5 +361,6 @@ extern const struct m_sub_options mp_subtitle_sub_opts;
extern const struct m_sub_options mp_osd_render_sub_opts;
extern const struct m_sub_options filter_conf;
extern const struct m_sub_options resample_conf;
+extern const struct m_sub_options stream_conf;
#endif
diff --git a/stream/stream.c b/stream/stream.c
index 063c033bd4..143c1a28e5 100644
--- a/stream/stream.c
+++ b/stream/stream.c
@@ -33,6 +33,7 @@
#include "misc/bstr.h"
#include "misc/thread_tools.h"
#include "common/msg.h"
+#include "options/m_config.h"
#include "options/options.h"
#include "options/path.h"
#include "osdep/timer.h"
@@ -94,7 +95,23 @@ static const stream_info_t *const stream_list[] = {
NULL
};
-static bool stream_seek_unbuffered(stream_t *s, int64_t newpos);
+struct stream_opts {
+ int64_t buffer_size;
+};
+
+#define OPT_BASE_STRUCT struct stream_opts
+
+const struct m_sub_options stream_conf = {
+ .opts = (const struct m_option[]){
+ OPT_BYTE_SIZE("stream-buffer-size", buffer_size, 0,
+ STREAM_FIXED_BUFFER_SIZE, 512 * 1024 * 1024),
+ {0}
+ },
+ .size = sizeof(struct stream_opts),
+ .defaults = &(const struct stream_opts){
+ .buffer_size = STREAM_FIXED_BUFFER_SIZE,
+ },
+};
// return -1 if not hex char
static int hex2dec(char c)
@@ -181,35 +198,91 @@ static const char *match_proto(const char *url, const char *proto)
return NULL;
}
-// Resize the current stream buffer, or do nothing if the size is adequate.
-// Caller must ensure the used buffer is not less than the new buffer size.
-// Calling this with 0 ensures it uses the default buffer size.
-static void stream_resize_buffer(struct stream *s, int new)
+// Read len bytes from the start position, and wrap around as needed. Limit the
+// actually read data to the size of the buffer. Return amount of copied bytes.
+// len: max bytes to copy to dst
+// pos: index into s->buffer[], e.g. s->buf_start is byte 0
+// returns: bytes copied to dst (limited by len and available buffered data)
+static int ring_copy(struct stream *s, void *dst, int len, int pos)
{
- new = MPMAX(new, STREAM_BUFFER_SIZE);
+ assert(len >= 0);
- if (new == s->buffer_alloc)
- return;
+ if (pos < s->buf_start || pos > s->buf_end)
+ return 0;
- int buffer_used = s->buf_len - s->buf_pos;
- assert(buffer_used <= new);
+ int copied = 0;
+ len = MPMIN(len, s->buf_end - pos);
- void *nbuf = s->buffer_inline;
- if (new > STREAM_BUFFER_SIZE)
- nbuf = ta_alloc_size(s, new);
+ if (len && pos <= s->buffer_mask) {
+ int copy = MPMIN(len, s->buffer_mask + 1 - pos);
+ memcpy(dst, &s->buffer[pos], copy);
+ copied += copy;
+ len -= copy;
+ pos += copy;
+ }
- if (nbuf) {
- if (s->buffer)
- memmove(nbuf, &s->buffer[s->buf_pos], buffer_used);
- s->buf_pos = 0;
- s->buf_len = buffer_used;
+ if (len) {
+ memcpy((char *)dst + copied, &s->buffer[pos & s->buffer_mask], len);
+ copied += len;
+ }
- if (s->buffer != s->buffer_inline)
- ta_free(s->buffer);
+ return copied;
+}
- s->buffer = nbuf;
- s->buffer_alloc = new;
+// Resize the current stream buffer. Uses a larger size if needed to keep data.
+// Does nothing if the size is adequate. Calling this with 0 ensures it uses the
+// default buffer size if possible.
+// The caller must check whether enough data was really allocated.
+// Returns false if buffer allocation failed.
+static bool stream_resize_buffer(struct stream *s, uint32_t new)
+{
+ // Keep all valid buffer.
+ int old_used_len = s->buf_end - s->buf_start;
+ int old_pos = s->buf_cur - s->buf_start;
+ new = MPMAX(new, old_used_len);
+
+ new = MPMAX(new, s->requested_buffer_size);
+
+ // This much is always required.
+ new = MPMAX(new, STREAM_FIXED_BUFFER_SIZE);
+
+ new = mp_round_next_power_of_2(new);
+ if (!new || new > INT_MAX / 8)
+ return false;
+
+ if (new == s->buffer_mask + 1)
+ return true;
+
+ MP_DBG(s, "resize stream to %d bytes\n", new);
+
+ uint8_t *nbuf = s->buffer_inline;
+ if (new > STREAM_FIXED_BUFFER_SIZE) {
+ nbuf = ta_alloc_size(s, new);
+ } else {
+ static_assert(MP_IS_POWER_OF_2(STREAM_FIXED_BUFFER_SIZE), "");
+ assert(new == STREAM_FIXED_BUFFER_SIZE);
}
+ assert(nbuf != s->buffer);
+
+ if (!nbuf)
+ return false; // oom; tolerate it, caller needs to check if required
+
+ int new_len = 0;
+ if (s->buffer)
+ new_len = ring_copy(s, nbuf, new, s->buf_start);
+ assert(new_len == old_used_len);
+ assert(old_pos <= old_used_len);
+ s->buf_start = 0;
+ s->buf_cur = old_pos;
+ s->buf_end = new_len;
+
+ if (s->buffer != s->buffer_inline)
+ ta_free(s->buffer);
+
+ s->buffer = nbuf;
+ s->buffer_mask = new - 1;
+
+ return true;
}
static int stream_create_instance(const stream_info_t *sinfo,
@@ -236,6 +309,9 @@ static int stream_create_instance(const stream_info_t *sinfo,
if (!path)
return STREAM_NO_MATCH;
+ struct stream_opts *opts =
+ mp_get_config_group(NULL, args->global, &stream_conf);
+
stream_t *s = talloc_zero(NULL, stream_t);
s->global = args->global;
if (flags & STREAM_SILENT) {
@@ -249,11 +325,14 @@ static int stream_create_instance(const stream_info_t *sinfo,
s->path = talloc_strdup(s, path);
s->is_network = sinfo->is_network;
s->mode = flags & (STREAM_READ | STREAM_WRITE);
+ s->requested_buffer_size = opts->buffer_size;
int opt;
mp_read_option_raw(s->global, "access-references", &m_option_type_flag, &opt);
s->access_references = opt;
+ talloc_free(opts);
+
MP_VERBOSE(s, "Opening %s\n", url);
if (strlen(url) > INT_MAX / 8) {
@@ -282,8 +361,10 @@ static int stream_create_instance(const stream_info_t *sinfo,
if (!s->read_chunk)
s->read_chunk = 4 * STREAM_BUFFER_SIZE;
- stream_resize_buffer(s, 0);
- MP_HANDLE_OOM(s->buffer);
+ if (!stream_resize_buffer(s, 0)) {
+ free_stream(s);
+ return STREAM_ERROR;
+ }
assert(s->seekable == !!s->seek);
@@ -391,78 +472,99 @@ static int stream_read_unbuffered(stream_t *s, void *buf, int len)
return res;
}
-// Ask for having "total" bytes ready to read in the stream buffer. This can do
-// a partial read if requested, so it can actually read less.
+// Ask for having at most "forward" bytes ready to read in the buffer.
// To read everything, you may have to call this in a loop.
-// total: desired amount of bytes in buffer
-// allow_short: if true, attempt at most once to read more if needed
-// returns: actual bytes in buffer (can be smaller or larger than total)
-static int stream_extend_buffer(struct stream *s, int total, bool allow_short)
-{
- assert(total >= 0);
-
- if (s->buf_len - s->buf_pos < total) {
- // Move to front to guarantee we really can read up to max size.
- s->buf_len = s->buf_len - s->buf_pos;
- memmove(s->buffer, &s->buffer[s->buf_pos], s->buf_len);
- s->buf_pos = 0;
-
- // Read ahead by about as much as stream_fill_buffer() would, to avoid
- // that many small stream_peek() calls will read the buffer at these
- // quantities.
- total = MPMAX(total, STREAM_BUFFER_SIZE);
-
- // Allocate more if the buffer is too small. Also, if the buffer is
- // larger than needed, resize it to smaller. This assumes stream_peek()
- // calls are rare or done with small sizes.
- stream_resize_buffer(s, total);
-
- // Read less if allocation above failed.
- total = MPMIN(total, s->buffer_alloc);
-
- // Fill rest of the buffer. Can be partial.
- while (total > s->buf_len) {
- int read = stream_read_unbuffered(s, &s->buffer[s->buf_len],
- total - s->buf_len);
- s->buf_len += read;
- if (allow_short || !read)
- break;
- }
+// forward: desired amount of bytes in buffer after s->buf_cur
+// returns: progress (false on EOF or memory allocation failure)
+static bool stream_read_more(struct stream *s, int forward)
+{
+ assert(forward >= 0);
+
+ int forward_avail = s->buf_end - s->buf_cur;
+ if (forward_avail >= forward)
+ return true;
+
+ // Avoid that many small reads will lead to many low-level read calls.
+ forward = MPMAX(forward, s->requested_buffer_size / 2);
+
+ // Keep guaranteed seek-back.
+ int buf_old = MPMIN(s->buf_cur - s->buf_start, s->requested_buffer_size / 2);
- if (s->buf_len)
- s->eof = 0;
+ if (!stream_resize_buffer(s, buf_old + forward))
+ return false;
+
+ int buf_alloc = s->buffer_mask + 1;
+
+ assert(s->buf_start <= s->buf_cur);
+ assert(s->buf_cur <= s->buf_end);
+ assert(s->buf_cur < buf_alloc * 2);
+ assert(s->buf_end < buf_alloc * 2);
+ assert(s->buf_start < buf_alloc);
+
+ // Note: read as much as possible, even if forward is much smaller. Do
+ // this because the stream buffer is supposed to set an approx. minimum
+ // read size on it.
+ int read = buf_alloc - buf_old - forward_avail; // free buffer past end
+
+ int pos = s->buf_end & s->buffer_mask;
+ read = MPMIN(read, buf_alloc - pos);
+
+ // Note: if wrap-around happens, we need to make two calls. This may
+ // affect latency (e.g. waiting for new data on a socket), so do only
+ // 1 read call always.
+ read = stream_read_unbuffered(s, &s->buffer[pos], read);
+
+ s->buf_end += read;
+
+ // May have overwritten old data.
+ if (s->buf_end - s->buf_start >= buf_alloc) {
+ assert(s->buf_end >= buf_alloc);
+
+ s->buf_start = s->buf_end - buf_alloc;
+
+ assert(s->buf_start <= s->buf_cur);
+ assert(s->buf_cur <= s->buf_end);
+
+ if (s->buf_start >= buf_alloc) {
+ s->buf_start -= buf_alloc;
+ s->buf_cur -= buf_alloc;
+ s->buf_end -= buf_alloc;
+ }
}
- return s->buf_len - s->buf_pos;
-}
+ // Must not have overwritten guaranteed old data.
+ assert(s->buf_cur - s->buf_start >= buf_old);
-int stream_fill_buffer(stream_t *s)
-{
- return stream_extend_buffer(s, STREAM_BUFFER_SIZE, true);
+ if (s->buf_cur < s->buf_end)
+ s->eof = 0;
+
+ return !!read;
}
// Read between 1..buf_size bytes of data, return how much data has been read.
// Return 0 on EOF, error, or if buf_size was 0.
int stream_read_partial(stream_t *s, char *buf, int buf_size)
{
- assert(s->buf_pos <= s->buf_len);
+ assert(s->buf_cur <= s->buf_end);
assert(buf_size >= 0);
- if (s->buf_pos == s->buf_len && buf_size > 0) {
- s->buf_pos = s->buf_len = 0;
- stream_resize_buffer(s, 0);
- // Do a direct read
- // Also, small reads will be more efficient with buffering & copying
- if (buf_size >= STREAM_BUFFER_SIZE)
+ if (s->buf_cur == s->buf_end && buf_size > 0) {
+ if (buf_size > (s->buffer_mask + 1) / 2) {
+ // Direct read if the buffer is too small anyway.
+ stream_drop_buffers(s);
return stream_read_unbuffered(s, buf, buf_size);
- if (!stream_fill_buffer(s))
- return 0;
+ }
+ stream_read_more(s, 1);
}
- int len = MPMIN(buf_size, s->buf_len - s->buf_pos);
- memcpy(buf, &s->buffer[s->buf_pos], len);
- s->buf_pos += len;
- if (len > 0)
- s->eof = 0;
- return len;
+ int res = ring_copy(s, buf, buf_size, s->buf_cur);
+ s->buf_cur += res;
+ return res;
+}
+
+// Slow version of stream_read_char(); called by it if the buffer is empty.
+int stream_read_char_fallback(stream_t *s)
+{
+ uint8_t c;
+ return stream_read_partial(s, &c, 1) ? c : -256;
}
int stream_read(stream_t *s, char *mem, int total)
@@ -476,33 +578,18 @@ int stream_read(stream_t *s, char *mem, int total)
len -= read;
}
total -= len;
- if (total > 0)
- s->eof = 0;
return total;
}
-// Read ahead at most len bytes without changing the read position. Return a
-// pointer to the internal buffer, starting from the current read position.
-// Reading ahead may require memory allocation. If allocation fails, read ahead
-// is silently limited to the last successful allocation.
-// The returned buffer becomes invalid on the next stream call, and you must
-// not write to it.
-struct bstr stream_peek(stream_t *s, int len)
-{
- assert(len >= 0);
-
- int avail = stream_extend_buffer(s, len, false);
- return (bstr){.start = &s->buffer[s->buf_pos], .len = MPMIN(len, avail)};
-}
-
-// Peek the current buffer. This will return at least 1 byte, unless EOF was
-// reached. If data is returned, the length is essentially random.
-struct bstr stream_peek_buffer(stream_t *s)
+// Like stream_read(), but do not advance the current position. This may resize
+// the buffer to satisfy the read request.
+int stream_read_peek(stream_t *s, void* buf, int buf_size)
{
- if (s->buf_len - s->buf_pos < 1)
- stream_fill_buffer(s);
- return (bstr){.start = &s->buffer[s->buf_pos],
- .len = s->buf_len - s->buf_pos};
+ while (s->buf_end - s->buf_cur < buf_size) {
+ if (!stream_read_more(s, buf_size))
+ break;
+ }
+ return ring_copy(s, buf, buf_size, s->buf_cur);
}
int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
@@ -527,14 +614,14 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
static bool stream_skip_read(struct stream *s, int64_t len)
{
while (len > 0) {
- unsigned int left = s->buf_len - s->buf_pos;
+ unsigned int left = s->buf_end - s->buf_cur;
if (!left) {
- if (!stream_fill_buffer(s))
+ if (!stream_read_more(s, 1))
return false;
continue;
}
unsigned skip = MPMIN(len, left);
- s->buf_pos += skip;
+ s->buf_cur += skip;
len -= skip;
}
return true;
@@ -546,7 +633,7 @@ static bool stream_skip_read(struct stream *s, int64_t len)
void stream_drop_buffers(stream_t *s)
{
s->pos = stream_tell(s);
- s->buf_pos = s->buf_len = 0;
+ s->buf_start = s->buf_cur = s->buf_end = 0;
s->eof = 0;
stream_resize_buffer(s, 0);
}
@@ -555,6 +642,9 @@ void stream_drop_buffers(stream_t *s)
static bool stream_seek_unbuffered(stream_t *s, int64_t newpos)
{
if (newpos != s->pos) {
+ MP_VERBOSE(s, "stream level seek from %" PRId64 " to %" PRId64 "\n",
+ s->pos, newpos);
+
if (newpos > s->pos && !s->seekable) {
MP_ERR(s, "Cannot seek forward in this stream\n");
return false;
@@ -577,7 +667,8 @@ static bool stream_seek_unbuffered(stream_t *s, int64_t newpos)
bool stream_seek(stream_t *s, int64_t pos)
{
- MP_TRACE(s, "seek to %lld\n", (long long)pos);
+ MP_TRACE(s, "seek request from %" PRId64 " to %" PRId64 "\n",
+ stream_tell(s), pos);
s->eof = 0; // eof should be set only on read; seeking always clears it
@@ -586,14 +677,12 @@ bool stream_seek(stream_t *s, int64_t pos)
pos = 0;
}
- if (pos == stream_tell(s))
- return true;
-
- if (pos < s->pos) {
- int64_t x = pos - (s->pos - (int)s->buf_len);
- if (x >= 0) {
- s->buf_pos = x;
- assert(s->buf_pos <= s->buf_len);
+ if (pos <= s->pos) {
+ int64_t x = pos - (s->pos - (int)s->buf_end);
+ if (x >= (int)s->buf_start) {
+ s->buf_cur = x;
+ assert(s->buf_cur >= s->buf_start);
+ assert(s->buf_cur <= s->buf_end);
return true;
}
}
@@ -601,35 +690,24 @@ bool stream_seek(stream_t *s, int64_t pos)
if (s->mode == STREAM_WRITE)
return s->seekable && s->seek(s, pos);
- int64_t newpos = pos;
-
- MP_TRACE(s, "Seek from %" PRId64 " to %" PRId64
- " (with offset %d)\n", s->pos, pos, (int)(pos - newpos));
-
- if (pos >= s->pos && !s->seekable && s->fast_skip) {
- // skipping is handled by generic code below
- } else if (!stream_seek_unbuffered(s, newpos)) {
- return false;
+ // Skip data instead of performing a seek in some cases.
+ if (pos >= s->pos &&
+ ((!s->seekable && s->fast_skip) ||
+ pos - s->pos <= s->requested_buffer_size))
+ {
+ return stream_skip_read(s, pos - stream_tell(s));
}
- return stream_skip_read(s, pos - stream_tell(s));
+ return stream_seek_unbuffered(s, pos);
}
bool stream_skip(stream_t *s, int64_t len)
{
- int64_t target = stream_tell(s) + len;
- if (len < 0)
- return stream_seek(s, target);
- if (len > 2 * STREAM_BUFFER_SIZE && s->seekable) {
- // Seek to 1 byte before target - this is the only way to distinguish
- // skip-to-EOF and skip-past-EOF in general. Successful seeking means
- // absolutely nothing, so test by doing a real read of the last byte.
- if (!stream_seek(s, target - 1))
- return false;
- stream_read_char(s);
- return !stream_eof(s) && stream_tell(s) == target;
- }
- return stream_skip_read(s, len);
+ if (!stream_seek(s, stream_tell(s) + len))
+ return false;
+ // Make sure s->eof is set correctly, even if a seek happened.
+ stream_read_more(s, 1);
+ return true;
}
int stream_control(stream_t *s, int cmd, void *arg)
@@ -661,7 +739,9 @@ static const char *const bom[3] = {"\xEF\xBB\xBF", "\xFF\xFE", "\xFE\xFF"};
// Return utf16 argument for stream_read_line
int stream_skip_bom(struct stream *s)
{
- bstr data = stream_peek(s, 4);
+ char buf[4];
+ int len = stream_read_peek(s, buf, sizeof(buf));
+ bstr data = {buf, len};
for (int n = 0; n < 3; n++) {
if (bstr_startswith0(data, bom[n])) {
stream_skip(s, strlen(bom[n]));
diff --git a/stream/stream.h b/stream/stream.h
index 66f151962a..7437e1c86b 100644
--- a/stream/stream.h
+++ b/stream/stream.h
@@ -28,7 +28,11 @@
#include "misc/bstr.h"
+// Minimum guaranteed buffer and seek-back size. For any read of at most this
+// size, it's guaranteed that you can seek back by at most this size again.
#define STREAM_BUFFER_SIZE 2048
+// (Half of this is typically reserved for seeking back.)
+#define STREAM_FIXED_BUFFER_SIZE (STREAM_BUFFER_SIZE * 2)
// stream->mode
#define STREAM_READ 0
@@ -119,7 +123,6 @@ typedef struct stream {
void (*close)(struct stream *s);
int read_chunk; // maximum amount of data to read at once to limit latency
- unsigned int buf_pos, buf_len;
int64_t pos;
int eof;
int mode; //STREAM_READ or STREAM_WRITE
@@ -145,20 +148,46 @@ typedef struct stream {
// added to this. The user can reset this as needed.
uint64_t total_unbuffered_read_bytes;
+ // Buffer size requested by user; s->buffer may have a different size
+ int requested_buffer_size;
+
+ // This is a ring buffer. It is reset only on seeks (or when buffers are
+ // dropped). Otherwise old contents always stay valid.
+ // The valid buffer is from buf_start to buf_end; buf_end can be larger
+ // than the buffer size (requires wrap around). buf_cur is a value in the
+ // range [buf_start, buf_end].
+ // When reading more data from the stream, buf_start is advanced as old
+ // data is overwritten with new data.
+ // Example:
+ // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+ // +===========================+---------------------------+
+ // + 05 06 07 08 | 01 02 03 04 + 05 06 07 08 | 01 02 03 04 +
+ // +===========================+---------------------------+
+ // ^ buf_start (4) | |
+ // | ^ buf_end (12 % 8 => 4)
+ // ^ buf_cur (9 % 8 => 1)
+ // Here, the entire 8 byte buffer is filled, i.e. buf_end - buf_start = 8.
+ // buffer_mask == 7, so (x & buffer_mask) == (x % buffer_size)
+ unsigned int buf_start; // index of oldest byte in buffer (is <= buffer_mask)
+ unsigned int buf_cur; // current read pos (can be > buffer_mask)
+ unsigned int buf_end; // end position (can be > buffer_mask)
+
+ unsigned int buffer_mask; // buffer_size-1, where buffer_size == 2**n
uint8_t *buffer;
- int buffer_alloc;
- uint8_t buffer_inline[STREAM_BUFFER_SIZE];
+