summaryrefslogtreecommitdiffstats
path: root/stream/stream.c
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2013-05-24 18:49:09 +0200
committerwm4 <wm4@nowhere>2013-06-16 22:05:09 +0200
commit7c4202b863752e5903c76231420d9b391a0961e1 (patch)
tree4d3764b375bed6c60685b3bbd9a1fdeee5d40dcb /stream/stream.c
parent27b633671f01a0f97518459717d273f45358bfb0 (diff)
downloadmpv-7c4202b863752e5903c76231420d9b391a0961e1.tar.bz2
mpv-7c4202b863752e5903c76231420d9b391a0961e1.tar.xz
cache: make the stream cache a proper stream that wraps other streams
Before this commit, the cache was franken-hacked on top of the stream API. You had to use special functions (like cache_stream_fill_buffer() instead of stream_fill_buffer()), which would access the stream in a cached manner. The whole idea about the previous design was that the cache runs in a thread or in a forked process, while the cache awa functions made sure the stream instance looked consistent to the user. If you used the normal functions instead of the special ones while the cache was running, you were out of luck. Make it a bit more reasonable by turning the cache into a stream on its own. This makes it behave exactly like a normal stream. The stream callbacks call into the original (uncached) stream to do work. No special cache functions or redirections are needed. The only different thing about cache streams is that they are created by special functions, instead of being part of the auto_open_streams[] array. To make things simpler, remove the threading implementation, which was messed into the code. The threading code could perhaps be kept, but I don't really want to have to worry about this special case. A proper threaded implementation will be added later. Remove the cache enabling code from stream_radio.c. Since enabling the cache involves replacing the old stream with a new one, the code as-is can't be kept. It would be easily possible to enable the cache by requesting a cache size (which is also much simpler). But nobody uses stream_radio.c and I can't even test this thing, and the cache is probably not really important for it either.
Diffstat (limited to 'stream/stream.c')
-rw-r--r--stream/stream.c113
1 files changed, 86 insertions, 27 deletions
diff --git a/stream/stream.c b/stream/stream.c
index 4f12e57856..3bfa418c90 100644
--- a/stream/stream.c
+++ b/stream/stream.c
@@ -52,8 +52,6 @@
#include "core/m_option.h"
#include "core/m_struct.h"
-#include "cache2.h"
-
/// We keep these 2 for the gui atm, but they will be removed.
char *cdrom_device = NULL;
char *dvd_device = NULL;
@@ -136,7 +134,7 @@ static const stream_info_t *const auto_open_streams[] = {
NULL
};
-static stream_t *new_stream(int fd, int type);
+static stream_t *new_stream(void);
static stream_t *open_stream_plugin(const stream_info_t *sinfo,
const char *filename,
@@ -165,7 +163,7 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo,
}
}
}
- s = new_stream(-2, -2);
+ s = new_stream();
s->opts = options;
s->url = strdup(filename);
s->flags |= mode;
@@ -200,6 +198,8 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo,
s->mode = mode;
+ s->uncached_type = s->type;
+
mp_msg(MSGT_OPEN, MSGL_V, "STREAM: [%s] %s\n", sinfo->name, filename);
mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Description: %s\n", sinfo->info);
mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Author: %s\n", sinfo->author);
@@ -308,7 +308,7 @@ static int stream_reconnect(stream_t *s)
continue;
}
- if (stream_seek_internal(s, pos) < 0 && s->pos == pos)
+ if (stream_seek_unbuffered(s, pos) < 0 && s->pos == pos)
return 1;
}
return 0;
@@ -345,7 +345,11 @@ void stream_capture_write(stream_t *s)
}
}
-int stream_read_internal(stream_t *s, void *buf, int len)
+// Read function bypassing the local stream buffer. This will not write into
+// s->buffer, but into buf[0..len] instead.
+// Returns < 0 on error, 0 on EOF, and length of bytes read on success.
+// Partial reads are possible, even if EOF is not reached.
+int stream_read_unbuffered(stream_t *s, void *buf, int len)
{
int orig_len = len;
// we will retry even if we already reached EOF previously.
@@ -379,7 +383,7 @@ int stream_read_internal(stream_t *s, void *buf, int len)
goto eof_out;
// make sure EOF is set to ensure no endless loops
s->eof = 1;
- return stream_read_internal(s, buf, orig_len);
+ return stream_read_unbuffered(s, buf, orig_len);
eof_out:
s->eof = 1;
@@ -394,7 +398,7 @@ eof_out:
int stream_fill_buffer(stream_t *s)
{
- int len = stream_read_internal(s, s->buffer, STREAM_BUFFER_SIZE);
+ int len = stream_read_unbuffered(s, s->buffer, STREAM_BUFFER_SIZE);
if (len <= 0)
return 0;
s->buf_pos = 0;
@@ -411,7 +415,7 @@ int stream_read(stream_t *s, char *mem, int total)
int x;
x = s->buf_len - s->buf_pos;
if (x == 0) {
- if (!cache_stream_fill_buffer(s))
+ if (!stream_fill_buffer(s))
return total - len; // EOF
x = s->buf_len - s->buf_pos;
}
@@ -441,7 +445,8 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
return rd;
}
-int stream_seek_internal(stream_t *s, int64_t newpos)
+// Seek function bypassing the local stream buffer.
+int stream_seek_unbuffered(stream_t *s, int64_t newpos)
{
if (newpos == 0 || newpos != s->pos) {
switch (s->type) {
@@ -492,7 +497,9 @@ int stream_seek_internal(stream_t *s, int64_t newpos)
return -1;
}
-int stream_seek_long(stream_t *s, int64_t pos)
+// Unlike stream_seek, does not try to seek within local buffer.
+// Unlike stream_seek_unbuffered(), it still fills the local buffer.
+static int stream_seek_long(stream_t *s, int64_t pos)
{
int res;
int64_t newpos = 0;
@@ -517,7 +524,7 @@ int stream_seek_long(stream_t *s, int64_t pos)
(int64_t)s->pos, (int64_t)newpos, (int64_t)pos, s->buf_len);
pos -= newpos;
- res = stream_seek_internal(s, newpos);
+ res = stream_seek_unbuffered(s, newpos);
if (res >= 0)
return res;
@@ -565,7 +572,7 @@ int stream_seek(stream_t *s, int64_t pos)
}
}
- return cache_stream_seek_long(s, pos);
+ return stream_seek_long(s, pos);
}
int stream_skip(stream_t *s, int64_t len)
@@ -578,7 +585,7 @@ int stream_skip(stream_t *s, int64_t len)
while (len > 0) {
int x = s->buf_len - s->buf_pos;
if (x == 0) {
- if (!cache_stream_fill_buffer(s))
+ if (!stream_fill_buffer(s))
return 0; // EOF
x = s->buf_len - s->buf_pos;
}
@@ -602,10 +609,6 @@ void stream_reset(stream_t *s)
int stream_control(stream_t *s, int cmd, void *arg)
{
-#ifdef CONFIG_STREAM_CACHE
- if (s->cache_pid)
- return cache_do_control(s, cmd, arg);
-#endif
if (!s->control)
return STREAM_UNSUPPORTED;
return s->control(s, cmd, arg);
@@ -620,7 +623,7 @@ void stream_update_size(stream_t *s)
}
}
-static stream_t *new_stream(int fd, int type)
+static stream_t *new_stream(void)
{
stream_t *s = talloc_zero(NULL, stream_t);
@@ -632,18 +635,13 @@ static stream_t *new_stream(int fd, int type)
}
#endif
- s->fd = fd;
- s->type = type;
- stream_reset(s);
+ s->fd = -2;
+ s->type = -2;
return s;
}
void free_stream(stream_t *s)
{
-// printf("\n*** free_stream() called ***\n");
-#ifdef CONFIG_STREAM_CACHE
- cache_uninit(s);
-#endif
stream_set_capture_file(s, NULL);
if (s->close)
@@ -662,6 +660,8 @@ void free_stream(stream_t *s)
WSACleanup(); // there might be a better place for this (-> later)
#endif
free(s->url);
+ if (s->uncached_stream)
+ free_stream(s->uncached_stream);
talloc_free(s);
}
@@ -681,6 +681,65 @@ int stream_check_interrupt(int time)
return stream_check_interrupt_cb(stream_check_interrupt_ctx, time);
}
+int stream_enable_cache_percent(stream_t **stream, int64_t stream_cache_size,
+ float stream_cache_min_percent,
+ float stream_cache_seek_min_percent)
+{
+ return stream_enable_cache(stream, stream_cache_size * 1024,
+ stream_cache_size * 1024 *
+ (stream_cache_min_percent / 100.0),
+ stream_cache_size * 1024 *
+ (stream_cache_seek_min_percent / 100.0));
+}
+
+/**
+ * \return 1 on success, 0 if the function was interrupted and -1 on error, or
+ * if the cache is disabled
+ */
+int stream_enable_cache(stream_t **stream, int64_t size, int64_t min,
+ int64_t seek_limit)
+{
+ stream_t *orig = *stream;
+
+ if (orig->mode != STREAM_READ)
+ return 1;
+
+ // Can't handle a loaded buffer.
+ orig->buf_len = orig->buf_pos = 0;
+
+ stream_t *cache = new_stream();
+ cache->type = STREAMTYPE_CACHE;
+ cache->uncached_type = orig->type;
+ cache->uncached_stream = orig;
+ cache->flags |= MP_STREAM_SEEK;
+ cache->mode = STREAM_READ;
+
+ cache->read_chunk = orig->read_chunk;
+ cache->url = strdup(orig->url);
+ cache->mime_type = talloc_strdup(cache, orig->mime_type);
+ cache->lavf_type = orig->lavf_type;
+ cache->opts = orig->opts;
+ cache->sector_size = orig->sector_size;
+ cache->read_chunk = orig->read_chunk;
+ cache->cache_size = orig->cache_size;
+ cache->start_pos = orig->start_pos;
+ cache->end_pos = orig->end_pos;
+
+ int res = -1;
+
+#ifdef CONFIG_STREAM_CACHE
+ res = stream_cache_init(cache, orig, size, min, seek_limit);
+#endif
+
+ if (res <= 0) {
+ cache->uncached_stream = NULL; // don't free original stream
+ free_stream(cache);
+ } else {
+ *stream = cache;
+ }
+ return res;
+}
+
/**
* Helper function to read 16 bits little-endian and advance pointer
*/
@@ -789,7 +848,7 @@ unsigned char *stream_read_line(stream_t *s, unsigned char *mem, int max,
len = s->buf_len - s->buf_pos;
// try to fill the buffer
if (len <= 0 &&
- (!cache_stream_fill_buffer(s) ||
+ (!stream_fill_buffer(s) ||
(len = s->buf_len - s->buf_pos) <= 0))
break;
end = find_newline(s->buffer + s->buf_pos, len, utf16);