summaryrefslogtreecommitdiffstats
path: root/stream/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'stream/stream.c')
-rw-r--r--stream/stream.c292
1 files changed, 187 insertions, 105 deletions
diff --git a/stream/stream.c b/stream/stream.c
index 9de53aba52..3281e50fe6 100644
--- a/stream/stream.c
+++ b/stream/stream.c
@@ -52,10 +52,10 @@
#include "core/m_option.h"
#include "core/m_struct.h"
-#include "cache2.h"
+// Includes additional padding in case sizes get rounded up by sector size.
+#define TOTAL_BUFFER_SIZE (STREAM_MAX_BUFFER_SIZE + STREAM_MAX_SECTOR_SIZE)
/// We keep these 2 for the gui atm, but they will be removed.
-int vcd_track = 0;
char *cdrom_device = NULL;
char *dvd_device = NULL;
int dvd_title = 0;
@@ -137,7 +137,8 @@ static const stream_info_t *const auto_open_streams[] = {
NULL
};
-static stream_t *new_stream(int fd, int type);
+static stream_t *new_stream(void);
+static int stream_seek_unbuffered(stream_t *s, int64_t newpos);
static stream_t *open_stream_plugin(const stream_info_t *sinfo,
const char *filename,
@@ -166,7 +167,7 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo,
}
}
}
- s = new_stream(-2, -2);
+ s = new_stream();
s->opts = options;
s->url = strdup(filename);
s->flags |= mode;
@@ -187,6 +188,9 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo,
return NULL;
}
+ if (!s->read_chunk)
+ s->read_chunk = 4 * (s->sector_size ? s->sector_size : STREAM_BUFFER_SIZE);
+
if (s->streaming && !s->cache_size) {
// Set default cache size to use if user does not specify it.
s->cache_size = 320;
@@ -194,13 +198,15 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo,
if (s->type <= -2)
mp_msg(MSGT_OPEN, MSGL_WARN, "Warning streams need a type !!!!\n");
- if (s->flags & MP_STREAM_SEEK && !s->seek)
+ if (!s->seek)
s->flags &= ~MP_STREAM_SEEK;
if (s->seek && !(s->flags & MP_STREAM_SEEK))
s->flags |= MP_STREAM_SEEK;
s->mode = mode;
+ s->uncached_type = s->type;
+
mp_msg(MSGT_OPEN, MSGL_V, "STREAM: [%s] %s\n", sinfo->name, filename);
mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Description: %s\n", sinfo->info);
mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Author: %s\n", sinfo->author);
@@ -283,8 +289,6 @@ stream_t *open_output_stream(const char *filename, struct MPOpts *options)
return open_stream_full(filename, STREAM_WRITE, options, NULL);
}
-//=================== STREAMER =========================
-
static int stream_reconnect(stream_t *s)
{
#define MAX_RECONNECT_RETRIES 5
@@ -294,13 +298,14 @@ static int stream_reconnect(stream_t *s)
int64_t pos = s->pos;
for (int retry = 0; retry < MAX_RECONNECT_RETRIES; retry++) {
mp_msg(MSGT_STREAM, MSGL_WARN,
- "Connection lost! Attempting to reconnect...\n");
+ "Connection lost! Attempting to reconnect (%d)...\n", retry + 1);
- if (retry)
- mp_sleep_us(RECONNECT_SLEEP_MS * 1000);
+ if (stream_check_interrupt(retry ? RECONNECT_SLEEP_MS : 0))
+ return 0;
s->eof = 1;
- stream_reset(s);
+ s->pos = 0;
+ s->buf_pos = s->buf_len = 0;
// Some streams (internal http.c) don't support STREAM_CTRL_RECONNECT,
// but do it when trying to seek.
@@ -309,7 +314,7 @@ static int stream_reconnect(stream_t *s)
continue;
}
- if (stream_seek_internal(s, pos) < 0 && s->pos == pos)
+ if (stream_seek_unbuffered(s, pos) < 0 && s->pos == pos)
return 1;
}
return 0;
@@ -335,10 +340,10 @@ void stream_set_capture_file(stream_t *s, const char *filename)
}
}
-void stream_capture_write(stream_t *s)
+static void stream_capture_write(stream_t *s, void *buf, size_t len)
{
- if (s->capture_file) {
- if (fwrite(s->buffer, s->buf_len, 1, s->capture_file) < 1) {
+ if (s->capture_file && len > 0) {
+ if (fwrite(buf, len, 1, s->capture_file) < 1) {
mp_tmsg(MSGT_GLOBAL, MSGL_ERR, "Error writing capture file: %s\n",
strerror(errno));
stream_set_capture_file(s, NULL);
@@ -346,9 +351,14 @@ void stream_capture_write(stream_t *s)
}
}
-int stream_read_internal(stream_t *s, void *buf, int len)
+// Read function bypassing the local stream buffer. This will not write into
+// s->buffer, but into buf[0..len] instead.
+// Returns < 0 on error, 0 on EOF, and length of bytes read on success.
+// Partial reads are possible, even if EOF is not reached.
+static int stream_read_unbuffered(stream_t *s, void *buf, int len)
{
int orig_len = len;
+ s->buf_pos = s->buf_len = 0;
// we will retry even if we already reached EOF previously.
switch (s->type) {
case STREAMTYPE_STREAM:
@@ -380,28 +390,64 @@ int stream_read_internal(stream_t *s, void *buf, int len)
goto eof_out;
// make sure EOF is set to ensure no endless loops
s->eof = 1;
- return stream_read_internal(s, buf, orig_len);
+ return stream_read_unbuffered(s, buf, orig_len);
eof_out:
s->eof = 1;
return 0;
}
// When reading succeeded we are obviously not at eof.
- // This e.g. avoids issues with eof getting stuck when lavf seeks in MPEG-TS
s->eof = 0;
s->pos += len;
+ stream_capture_write(s, buf, len);
return len;
}
+// This works like stdio's ungetc(), but for more than one byte. Rewind the
+// file position by buffer_size, and make all future reads/buffer fills read
+// from the given buffer, until the buffer is exhausted or a seek outside of
+// the buffer happens.
+// You can unread at most STREAM_MAX_BUFFER_SIZE bytes.
+void stream_unread_buffer(stream_t *s, void *buffer, size_t buffer_size)
+{
+ assert(stream_tell(s) >= buffer_size); // can't unread to before file start
+ assert(buffer_size <= STREAM_MAX_BUFFER_SIZE);
+ // Need to include the remaining buffer to ensure no data is lost.
+ int remainder = s->buf_len - s->buf_pos;
+ // Successive buffer unreading might trigger this.
+ assert(buffer_size + remainder <= TOTAL_BUFFER_SIZE);
+ memmove(&s->buffer[buffer_size], &s->buffer[s->buf_pos], remainder);
+ memcpy(s->buffer, buffer, buffer_size);
+ s->buf_pos = 0;
+ s->buf_len = buffer_size + remainder;
+}
+
int stream_fill_buffer(stream_t *s)
{
- int len = stream_read_internal(s, s->buffer, STREAM_BUFFER_SIZE);
- if (len <= 0)
- return 0;
+ int len = stream_read_unbuffered(s, s->buffer, STREAM_BUFFER_SIZE);
s->buf_pos = 0;
- s->buf_len = len;
-// printf("[%d]",len);fflush(stdout);
- stream_capture_write(s);
+ s->buf_len = len < 0 ? 0 : len;
+ return s->buf_len;
+}
+
+// Read between 1..buf_size bytes of data, return how much data has been read.
+// Return <= 0 on EOF, error, of if buf_size was 0.
+int stream_read_partial(stream_t *s, char *buf, int buf_size)
+{
+ assert(s->buf_pos <= s->buf_len);
+ assert(buf_size >= 0);
+ if (s->buf_pos == s->buf_len && buf_size > 0) {
+ s->buf_pos = s->buf_len = 0;
+ // Do a direct read, but only if there's no sector alignment requirement
+ // Also, small reads will be more efficient with buffering & copying
+ if (!s->sector_size && buf_size >= STREAM_BUFFER_SIZE)
+ return stream_read_unbuffered(s, buf, buf_size);
+ if (!stream_fill_buffer(s))
+ return 0;
+ }
+ int len = FFMIN(buf_size, s->buf_len - s->buf_pos);
+ memcpy(buf, &s->buffer[s->buf_pos], len);
+ s->buf_pos += len;
return len;
}
@@ -409,24 +455,13 @@ int stream_read(stream_t *s, char *mem, int total)
{
int len = total;
while (len > 0) {
- int x;
- x = s->buf_len - s->buf_pos;
- if (x == 0) {
- if (!cache_stream_fill_buffer(s))
- return total - len; // EOF
- x = s->buf_len - s->buf_pos;
- }
- if (s->buf_pos > s->buf_len)
- mp_msg(MSGT_DEMUX, MSGL_WARN,
- "stream_read: WARNING! s->buf_pos>s->buf_len\n");
- if (x > len)
- x = len;
- memcpy(mem, &s->buffer[s->buf_pos], x);
- s->buf_pos += x;
- mem += x;
- len -= x;
+ int read = stream_read_partial(s, mem, len);
+ if (read <= 0)
+ break; // EOF
+ mem += read;
+ len -= read;
}
- return total;
+ return total - len;
}
int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
@@ -442,17 +477,17 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
return rd;
}
-int stream_seek_internal(stream_t *s, int64_t newpos)
+// Seek function bypassing the local stream buffer.
+static int stream_seek_unbuffered(stream_t *s, int64_t newpos)
{
if (newpos == 0 || newpos != s->pos) {
switch (s->type) {
case STREAMTYPE_STREAM:
- //s->pos=newpos; // real seek
// Some streaming protocol allow to seek backward and forward
// A function call that return -1 can tell that the protocol
// doesn't support seeking.
#ifdef CONFIG_NETWORKING
- if (s->seek) { // new stream seek is much cleaner than streaming_ctrl one
+ if (s->seek) {
if (!s->seek(s, newpos)) {
mp_tmsg(MSGT_STREAM, MSGL_ERR, "Seek failed\n");
return 0;
@@ -486,21 +521,18 @@ int stream_seek_internal(stream_t *s, int64_t newpos)
return 0;
}
}
-// putchar('.');fflush(stdout);
-//} else {
-// putchar('%');fflush(stdout);
}
+ s->eof = 0; // EOF reset when seek succeeds.
return -1;
}
-int stream_seek_long(stream_t *s, int64_t pos)
+// Unlike stream_seek, does not try to seek within local buffer.
+// Unlike stream_seek_unbuffered(), it still fills the local buffer.
+static int stream_seek_long(stream_t *s, int64_t pos)
{
- int res;
- int64_t newpos = 0;
-
-// if( mp_msg_test(MSGT_STREAM,MSGL_DBG3) ) printf("seek_long to 0x%X\n",(unsigned int)pos);
-
+ int64_t oldpos = s->pos;
s->buf_pos = s->buf_len = 0;
+ s->eof = 0;
if (s->mode == STREAM_WRITE) {
if (!s->seek || !s->seek(s, pos))
@@ -508,41 +540,38 @@ int stream_seek_long(stream_t *s, int64_t pos)
return 1;
}
+ int64_t newpos = pos;
if (s->sector_size)
newpos = (pos / s->sector_size) * s->sector_size;
- else
- newpos = pos & (~((int64_t)STREAM_BUFFER_SIZE - 1));
-
- if (mp_msg_test(MSGT_STREAM, MSGL_DBG3)) {
- mp_msg(
- MSGT_STREAM, MSGL_DBG3,
- "s->pos=%" PRIX64 " newpos=%" PRIX64 " new_bufpos=%" PRIX64
- " buflen=%X \n",
- (int64_t)s->pos, (int64_t)newpos, (int64_t)pos, s->buf_len);
- }
+
+ mp_msg(MSGT_STREAM, MSGL_DBG3, "s->pos=%" PRIX64 " newpos=%" PRIX64
+ " new_bufpos=%" PRIX64 " buflen=%X \n",
+ (int64_t)s->pos, (int64_t)newpos, (int64_t)pos, s->buf_len);
+
pos -= newpos;
- res = stream_seek_internal(s, newpos);
- if (res >= 0)
- return res;
+ if (stream_seek_unbuffered(s, newpos) >= 0) {
+ s->pos = oldpos;
+ return 0;
+ }
while (s->pos < newpos) {
if (stream_fill_buffer(s) <= 0)
- break; // EOF
+ break; // EOF
}
- s->eof = 0; // EOF reset when seek succeeds.
while (stream_fill_buffer(s) > 0) {
if (pos <= s->buf_len) {
s->buf_pos = pos; // byte position in sector
+ s->eof = 0;
return 1;
}
pos -= s->buf_len;
}
-// Fill failed, but seek still is a success.
- s->pos += pos;
+ // Fill failed, but seek still is a success (partially).
s->buf_pos = 0;
s->buf_len = 0;
+ s->eof = 0; // eof should be set only on read
mp_msg(MSGT_STREAM, MSGL_V,
"stream_seek: Seek to/past EOF: no buffer preloaded.\n");
@@ -555,62 +584,55 @@ int stream_seek(stream_t *s, int64_t pos)
mp_dbg(MSGT_DEMUX, MSGL_DBG3, "seek to 0x%llX\n", (long long)pos);
if (pos < 0) {
- mp_msg(MSGT_DEMUX, MSGL_ERR,
- "Invalid seek to negative position %llx!\n",
+ mp_msg(MSGT_DEMUX, MSGL_ERR, "Invalid seek to negative position %llx!\n",
(long long)pos);
pos = 0;
}
if (pos < s->pos) {
- int64_t x = pos - (s->pos - s->buf_len);
+ int64_t x = pos - (s->pos - (int)s->buf_len);
if (x >= 0) {
s->buf_pos = x;
s->eof = 0;
-// putchar('*');fflush(stdout);
return 1;
}
}
- return cache_stream_seek_long(s, pos);
+ return stream_seek_long(s, pos);
}
int stream_skip(stream_t *s, int64_t len)
{
- if (len < 0 ||
- (len > 2 * STREAM_BUFFER_SIZE && (s->flags & MP_STREAM_SEEK_FW))) {
- // negative or big skip!
- return stream_seek(s, stream_tell(s) + len);
+ int64_t target = stream_tell(s) + len;
+ if (len < 0)
+ return stream_seek(s, target);
+ if (len > 2 * STREAM_BUFFER_SIZE && (s->flags & MP_STREAM_SEEK_FW)) {
+ // Seek to 1 byte before target - this is the only way to distinguish
+ // skip-to-EOF and skip-past-EOF in general. Successful seeking means
+ // absolutely nothing, so test by doing a real read of the last byte.
+ int r = stream_seek(s, target - 1);
+ if (r) {
+ stream_read_char(s);
+ return !stream_eof(s) && stream_tell(s) == target;
+ }
+ return r;
}
while (len > 0) {
int x = s->buf_len - s->buf_pos;
if (x == 0) {
- if (!cache_stream_fill_buffer(s))
- return 0; // EOF
+ if (!stream_fill_buffer(s))
+ return 0; // EOF
x = s->buf_len - s->buf_pos;
}
if (x > len)
x = len;
- //memcpy(mem,&s->buf[s->buf_pos],x);
s->buf_pos += x;
len -= x;
}
return 1;
}
-void stream_reset(stream_t *s)
-{
- if (s->eof) {
- s->pos = 0;
- s->buf_pos = s->buf_len = 0;
- s->eof = 0;
- }
-}
-
int stream_control(stream_t *s, int cmd, void *arg)
{
-#ifdef CONFIG_STREAM_CACHE
- if (s->cache_pid)
- return cache_do_control(s, cmd, arg);
-#endif
if (!s->control)
return STREAM_UNSUPPORTED;
return s->control(s, cmd, arg);
@@ -625,9 +647,10 @@ void stream_update_size(stream_t *s)
}
}
-static stream_t *new_stream(int fd, int type)
+static stream_t *new_stream(void)
{
- stream_t *s = talloc_zero(NULL, stream_t);
+ stream_t *s = talloc_size(NULL, sizeof(stream_t) + TOTAL_BUFFER_SIZE);
+ memset(s, 0, sizeof(stream_t));
#if HAVE_WINSOCK2_H
{
@@ -637,18 +660,13 @@ static stream_t *new_stream(int fd, int type)
}
#endif
- s->fd = fd;
- s->type = type;
- stream_reset(s);
+ s->fd = -2;
+ s->type = -2;
return s;
}
void free_stream(stream_t *s)
{
-// printf("\n*** free_stream() called ***\n");
-#ifdef CONFIG_STREAM_CACHE
- cache_uninit(s);
-#endif
stream_set_capture_file(s, NULL);
if (s->close)
@@ -667,6 +685,8 @@ void free_stream(stream_t *s)
WSACleanup(); // there might be a better place for this (-> later)
#endif
free(s->url);
+ if (s->uncached_stream)
+ free_stream(s->uncached_stream);
talloc_free(s);
}
@@ -686,6 +706,68 @@ int stream_check_interrupt(int time)
return stream_check_interrupt_cb(stream_check_interrupt_ctx, time);
}
+int stream_enable_cache_percent(stream_t **stream, int64_t stream_cache_size,
+ float stream_cache_min_percent,
+ float stream_cache_seek_min_percent)
+{
+
+ if (stream_cache_size == -1)
+ stream_cache_size = (*stream)->cache_size;
+
+ stream_cache_size = stream_cache_size * 1024; // input is in KiB
+
+ return stream_enable_cache(stream, stream_cache_size,
+ stream_cache_size *
+ (stream_cache_min_percent / 100.0),
+ stream_cache_size *
+ (stream_cache_seek_min_percent / 100.0));
+}
+
+/**
+ * \return 1 on success, 0 if the function was interrupted and -1 on error, or
+ * if the cache is disabled
+ */
+int stream_enable_cache(stream_t **stream, int64_t size, int64_t min,
+ int64_t seek_limit)
+{
+ stream_t *orig = *stream;
+
+ if (orig->mode != STREAM_READ)
+ return 1;
+
+ // Can't handle a loaded buffer.
+ orig->buf_len = orig->buf_pos = 0;
+
+ stream_t *cache = new_stream();
+ cache->type = STREAMTYPE_CACHE;
+ cache->uncached_type = orig->type;
+ cache->uncached_stream = orig;
+ cache->flags |= MP_STREAM_SEEK;
+ cache->mode = STREAM_READ;
+ cache->read_chunk = 4 * STREAM_BUFFER_SIZE;
+
+ cache->url = strdup(orig->url);
+ cache->mime_type = talloc_strdup(cache, orig->mime_type);
+ cache->lavf_type = orig->lavf_type;
+ cache->opts = orig->opts;
+ cache->start_pos = orig->start_pos;
+ cache->end_pos = orig->end_pos;
+
+ int res = -1;
+
+#ifdef CONFIG_STREAM_CACHE
+ res = stream_cache_init(cache, orig, size, min, seek_limit);
+#endif
+
+ if (res <= 0) {
+ cache->uncached_stream = NULL; // don't free original stream
+ free_stream(cache);
+ } else {
+ *stream = cache;
+ }
+ return res;
+}
+
/**
* Helper function to read 16 bits little-endian and advance pointer
*/
@@ -794,7 +876,7 @@ unsigned char *stream_read_line(stream_t *s, unsigned char *mem, int max,
len = s->buf_len - s->buf_pos;
// try to fill the buffer
if (len <= 0 &&
- (!cache_stream_fill_buffer(s) ||
+ (!stream_fill_buffer(s) ||
(len = s->buf_len - s->buf_pos) <= 0))
break;
end = find_newline(s->buffer + s->buf_pos, len, utf16);