diff options
author | wm4 <wm4@nowhere> | 2013-05-24 18:49:09 +0200 |
---|---|---|
committer | wm4 <wm4@nowhere> | 2013-06-16 22:05:09 +0200 |
commit | 7c4202b863752e5903c76231420d9b391a0961e1 (patch) | |
tree | 4d3764b375bed6c60685b3bbd9a1fdeee5d40dcb /stream/cache.c | |
parent | 27b633671f01a0f97518459717d273f45358bfb0 (diff) | |
download | mpv-7c4202b863752e5903c76231420d9b391a0961e1.tar.bz2 mpv-7c4202b863752e5903c76231420d9b391a0961e1.tar.xz |
cache: make the stream cache a proper stream that wraps other streams
Before this commit, the cache was franken-hacked on top of the stream
API. You had to use special functions (like cache_stream_fill_buffer()
instead of stream_fill_buffer()), which would access the stream in a
cached manner.
The whole idea about the previous design was that the cache runs in a
thread or in a forked process, while the cache awa functions made sure
the stream instance looked consistent to the user. If you used the
normal functions instead of the special ones while the cache was
running, you were out of luck.
Make it a bit more reasonable by turning the cache into a stream on its
own. This makes it behave exactly like a normal stream. The stream
callbacks call into the original (uncached) stream to do work. No
special cache functions or redirections are needed. The only different
thing about cache streams is that they are created by special functions,
instead of being part of the auto_open_streams[] array.
To make things simpler, remove the threading implementation, which was
messed into the code. The threading code could perhaps be kept, but I
don't really want to have to worry about this special case. A proper
threaded implementation will be added later.
Remove the cache enabling code from stream_radio.c. Since enabling the
cache involves replacing the old stream with a new one, the code as-is
can't be kept. It would be easily possible to enable the cache by
requesting a cache size (which is also much simpler). But nobody uses
stream_radio.c and I can't even test this thing, and the cache is
probably not really important for it either.
Diffstat (limited to 'stream/cache.c')
-rw-r--r-- | stream/cache.c | 354 |
1 files changed, 117 insertions, 237 deletions
diff --git a/stream/cache.c b/stream/cache.c index 9038fc6cf6..1e97771075 100644 --- a/stream/cache.c +++ b/stream/cache.c @@ -34,8 +34,10 @@ #include <string.h> #include <signal.h> #include <sys/types.h> +#include <sys/wait.h> #include <unistd.h> #include <errno.h> +#include <assert.h> #include <libavutil/common.h> @@ -43,27 +45,17 @@ #include "osdep/shmem.h" #include "osdep/timer.h" -#if defined(__MINGW32__) -#include <windows.h> -static void ThreadProc(void *s); -#elif defined(PTHREAD_CACHE) -#include <pthread.h> -static void *ThreadProc(void *s); -#else -#include <sys/wait.h> -#define FORKED_CACHE 1 -#endif -#ifndef FORKED_CACHE -#define FORKED_CACHE 0 -#endif #include "core/mp_msg.h" #include "stream.h" -#include "cache2.h" #include "core/mp_common.h" +// Note: (cache_vars_t*)(cache->priv)->cache == cache typedef struct { + stream_t *cache; // wrapper stream, used by demuxer etc. + stream_t *stream; // "real" stream, used to read from the source media + unsigned int cache_pid; // constats: unsigned char *buffer; // base pointer of the allocated buffer memory int64_t buffer_size; // size of the allocated buffer memory @@ -71,9 +63,7 @@ typedef struct { int64_t back_size; // we should keep back_size amount of old bytes for backward seek int64_t fill_limit; // we should fill buffer only if space>=fill_limit int64_t seek_limit; // keep filling cache if distance is less that seek limit -#if FORKED_CACHE pid_t ppid; // parent PID to detect killed parent -#endif // filler's pointers: int eof; int64_t min_filepos; // buffer contain only a part of the file, from min-max pos @@ -85,7 +75,6 @@ typedef struct { // int seek_lock; // 1 if we will seek/reset buffer, 2 if we are ready for cmd // int fifo_flag; // 1 if we should use FIFO to notice cache about buffer reads. // callback - stream_t *stream; volatile int control; volatile uint64_t control_uint_arg; volatile double control_double_arg; @@ -98,19 +87,20 @@ typedef struct { volatile int idle; } cache_vars_t; -static void cache_wakeup(stream_t *s) +static void cache_wakeup(stream_t *stream) { -#if FORKED_CACHE + cache_vars_t *s = stream->priv; // signal process to wake up immediately kill(s->cache_pid, SIGUSR1); -#endif } +// Runs in the forked process static void cache_flush(cache_vars_t *s) { s->offset = s->min_filepos = s->max_filepos = s->read_filepos; // drop cache content :( } +// Runs in the main process static int cache_read(cache_vars_t *s, unsigned char *buf, int size) { int total = 0; @@ -179,6 +169,7 @@ static int cache_read(cache_vars_t *s, unsigned char *buf, int size) return total; } +// Runs in the forked process static int cache_fill(cache_vars_t *s) { int64_t back, back2, newb, space, len, pos; @@ -198,7 +189,7 @@ static int cache_fill(cache_vars_t *s) cache_flush(s); if (s->stream->eof) stream_reset(s->stream); - stream_seek_internal(s->stream, read); + stream_seek_unbuffered(s->stream, read); mp_msg(MSGT_CACHE, MSGL_DBG2, "Seek done. new pos: 0x%" PRIX64 " \n", (int64_t)stream_tell(s->stream)); } @@ -259,12 +250,12 @@ static int cache_fill(cache_vars_t *s) if (wraparound_copy) { int to_copy; - len = stream_read_internal(s->stream, s->stream->buffer, space); + len = stream_read_unbuffered(s->stream, s->stream->buffer, space); to_copy = FFMIN(len, s->buffer_size - pos); memcpy(s->buffer + pos, s->stream->buffer, to_copy); memcpy(s->buffer, s->stream->buffer + to_copy, len - to_copy); } else - len = stream_read_internal(s->stream, &s->buffer[pos], space); + len = stream_read_unbuffered(s->stream, &s->buffer[pos], space); s->eof = !len; s->max_filepos += len; @@ -275,6 +266,7 @@ static int cache_fill(cache_vars_t *s) } +// Runs in the forked process static int cache_execute_control(cache_vars_t *s) { double double_res; @@ -309,14 +301,12 @@ static int cache_execute_control(cache_vars_t *s) s->stream_start_time = pos; else s->stream_start_time = MP_NOPTS_VALUE; -#if FORKED_CACHE // if parent PID changed, main process was killed -> exit if (s->ppid != getppid()) { mp_msg(MSGT_CACHE, MSGL_WARN, "Parent process disappeared, exiting cache process.\n"); return 0; } -#endif last = mp_time_sec(); } if (s->control == -1) @@ -379,20 +369,12 @@ static int cache_execute_control(cache_vars_t *s) static void *shared_alloc(int64_t size) { -#if FORKED_CACHE return shmem_alloc(size); -#else - return malloc(size); -#endif } static void shared_free(void *ptr, int64_t size) { -#if FORKED_CACHE shmem_free(ptr, size); -#else - free(ptr); -#endif } static cache_vars_t *cache_init(int64_t size, int sector) @@ -418,34 +400,27 @@ static cache_vars_t *cache_init(int64_t size, int sector) s->fill_limit = 8 * sector; s->back_size = s->buffer_size / 2; -#if FORKED_CACHE s->ppid = getpid(); -#endif return s; } -void cache_uninit(stream_t *s) +static void cache_uninit(stream_t *s) { - cache_vars_t *c = s->cache_data; - if (s->cache_pid) { -#if !FORKED_CACHE - cache_do_control(s, -2, NULL); -#else - kill(s->cache_pid, SIGKILL); - waitpid(s->cache_pid, NULL, 0); -#endif - s->cache_pid = 0; + cache_vars_t *c = s->priv; + if (c->cache_pid) { + kill(c->cache_pid, SIGKILL); + waitpid(c->cache_pid, NULL, 0); + c->cache_pid = 0; } if (!c) return; shared_free(c->buffer, c->buffer_size); c->buffer = NULL; c->stream = NULL; - shared_free(s->cache_data, sizeof(cache_vars_t)); - s->cache_data = NULL; + shared_free(c, sizeof(cache_vars_t)); + s->priv = NULL; } -#if FORKED_CACHE static void exit_sighandler(int x) { // close stream @@ -455,7 +430,6 @@ static void exit_sighandler(int x) static void dummy_sighandler(int x) { } -#endif /** * Main loop of the cache process or thread. @@ -463,31 +437,25 @@ static void dummy_sighandler(int x) static void cache_mainloop(cache_vars_t *s) { int sleep_count = 0; -#if FORKED_CACHE struct sigaction sa = { .sa_handler = SIG_IGN }; sigaction(SIGUSR1, &sa, NULL); -#endif do { if (!cache_fill(s)) { s->idle = 1; -#if FORKED_CACHE // Let signal wake us up, we cannot leave this // enabled since we do not handle EINTR in most places. // This might need extra code to work on BSD. sa.sa_handler = dummy_sighandler; sigaction(SIGUSR1, &sa, NULL); -#endif if (sleep_count < INITIAL_FILL_USLEEP_COUNT) { sleep_count++; mp_sleep_us(INITIAL_FILL_USLEEP_TIME); } else mp_sleep_us(FILL_USLEEP_TIME); // idle -#if FORKED_CACHE sa.sa_handler = SIG_IGN; sigaction(SIGUSR1, &sa, NULL); -#endif } else { sleep_count = 0; s->idle = 0; @@ -495,180 +463,24 @@ static void cache_mainloop(cache_vars_t *s) } while (cache_execute_control(s)); } -int stream_enable_cache_percent(stream_t *stream, int64_t stream_cache_size, - float stream_cache_min_percent, - float stream_cache_seek_min_percent) -{ - return stream_enable_cache(stream, stream_cache_size * 1024, - stream_cache_size * 1024 * - (stream_cache_min_percent / 100.0), - stream_cache_size * 1024 * - (stream_cache_seek_min_percent / 100.0)); -} - -/** - * \return 1 on success, 0 if the function was interrupted and -1 on error - */ -int stream_enable_cache(stream_t *stream, int64_t size, int64_t min, - int64_t seek_limit) -{ - if (size < 0) - size = stream->cache_size * 1024; - if (!size) - return 1; - mp_tmsg(MSGT_NETWORK, MSGL_INFO, "Cache size set to %" PRId64 " KiB\n", - size / 1024); - - int ss = stream->sector_size ? stream->sector_size : STREAM_BUFFER_SIZE; - int res = -1; - cache_vars_t *s; - - if (size > SIZE_MAX) { - mp_msg(MSGT_CACHE, MSGL_FATAL, - "Cache size larger than max. allocation size\n"); - return -1; - } - - s = cache_init(size, ss); - if (s == NULL) - return -1; - stream->cache_data = s; - s->stream = stream; // callback - s->seek_limit = seek_limit; - - - //make sure that we won't wait from cache_fill - //more data than it is allowed to fill - if (s->seek_limit > s->buffer_size - s->fill_limit) - s->seek_limit = s->buffer_size - s->fill_limit; - if (min > s->buffer_size - s->fill_limit) - min = s->buffer_size - s->fill_limit; - // to make sure we wait for the cache process/thread to be active - // before continuing - if (min <= 0) - min = 1; - -#if FORKED_CACHE - if ((stream->cache_pid = fork())) { - if ((pid_t)stream->cache_pid == -1) - stream->cache_pid = 0; -#else - { - stream_t *stream2 = malloc(sizeof(stream_t)); - memcpy(stream2, s->stream, sizeof(stream_t)); - s->stream = stream2; -#if defined(__MINGW32__) - stream->cache_pid = _beginthread(ThreadProc, 0, s); -#else - { - pthread_t tid; - pthread_create(&tid, NULL, ThreadProc, s); - stream->cache_pid = 1; - } -#endif -#endif - if (!stream->cache_pid) { - mp_msg(MSGT_CACHE, MSGL_ERR, - "Starting cache process/thread failed: %s.\n", - strerror(errno)); - goto err_out; - } - // wait until cache is filled at least prefill_init % - mp_msg(MSGT_CACHE, MSGL_V, "CACHE_PRE_INIT: %" PRId64 " [%" PRId64 "] " - "%" PRId64 " pre:%" PRId64 " eof:%d \n", - s->min_filepos, s->read_filepos, s->max_filepos, min, s->eof); - while (s->read_filepos < s->min_filepos || - s->max_filepos - s->read_filepos < min) - { - mp_tmsg(MSGT_CACHE, MSGL_STATUS, "\rCache fill: %5.2f%% " - "(%" PRId64 " bytes) ", - 100.0 * (float)(s->max_filepos - s->read_filepos) / - (float)(s->buffer_size), - s->max_filepos - s->read_filepos); - if (s->eof) - break; // file is smaller than prefill size - if (stream_check_interrupt(PREFILL_SLEEP_TIME)) { - res = 0; - goto err_out; - } - } - mp_msg(MSGT_CACHE, MSGL_STATUS, "\n"); - stream->cached = true; - return 1; // parent exits - -err_out: - cache_uninit(stream); - return res; - } - -#if FORKED_CACHE - signal(SIGTERM, exit_sighandler); // kill - cache_mainloop(s); - // make sure forked code never leaves this function - exit(0); -#endif -} - -#if !FORKED_CACHE -#if defined(__MINGW32__) -static void ThreadProc(void *s) -{ - cache_mainloop(s); - _endthread(); -} -#else -static void *ThreadProc(void *s) +static int cache_fill_buffer(struct stream *stream, char *buffer, int max_len) { - cache_mainloop(s); - return NULL; -} -#endif -#endif - -int cache_stream_fill_buffer(stream_t *s) -{ - int len; - int sector_size; - if (!s->cache_pid) - return stream_fill_buffer(s); + cache_vars_t *c = stream->priv; + assert(c->cache_pid); - if (s->pos != ((cache_vars_t *)s->cache_data)->read_filepos) + if (stream->pos != c->read_filepos) mp_msg(MSGT_CACHE, MSGL_ERR, "!!! read_filepos differs!!! report this bug...\n"); - sector_size = ((cache_vars_t *)s->cache_data)->sector_size; - if (sector_size > STREAM_MAX_SECTOR_SIZE) { - mp_msg(MSGT_CACHE, MSGL_ERR, "Sector size %i larger than maximum %i\n", - sector_size, - STREAM_MAX_SECTOR_SIZE); - sector_size = STREAM_MAX_SECTOR_SIZE; - } - - len = cache_read(s->cache_data, s->buffer, sector_size); - //printf("cache_stream_fill_buffer->read -> %d\n",len); - - if (len <= 0) { - s->eof = 1; - s->buf_pos = s->buf_len = 0; - return 0; - } - s->eof = 0; - s->buf_pos = 0; - s->buf_len = len; - s->pos += len; -// printf("[%d]",len);fflush(stdout); - stream_capture_write(s); - return len; + return cache_read(c, buffer, max_len); } -int cache_stream_seek_long(stream_t *stream, int64_t pos) +static int cache_seek(stream_t *stream, int64_t pos) { - cache_vars_t *s; + cache_vars_t *s = stream->priv; int64_t newpos; - if (!stream->cache_pid) - return stream_seek_long(stream, pos); + assert(s->cache_pid); - s = stream->cache_data; // s->seek_lock=1; mp_msg(MSGT_CACHE, MSGL_DBG2, "CACHE2_SEEK: 0x%" PRIX64 " <= 0x%" PRIX64 @@ -680,29 +492,14 @@ int cache_stream_seek_long(stream_t *stream, int64_t pos) stream->pos = s->read_filepos = newpos; s->eof = 0; // !!!!!!! cache_wakeup(stream); - - cache_stream_fill_buffer(stream); - - pos -= newpos; - if (pos >= 0 && pos <= stream->buf_len) { - stream->buf_pos = pos; // byte position in sector - return 1; - } - -// stream->buf_pos=stream->buf_len=0; -// return 1; - - mp_msg(MSGT_CACHE, MSGL_V, - "cache_stream_seek: WARNING! Can't seek to 0x%" PRIX64 " !\n", - pos + newpos); - return 0; + return 1; } -int cache_do_control(stream_t *stream, int cmd, void *arg) +static int cache_control(stream_t *stream, int cmd, void *arg) { int sleep_count = 0; int pos_change = 0; - cache_vars_t *s = stream->cache_data; + cache_vars_t *s = stream->priv; switch (cmd) { case STREAM_CTRL_GET_CACHE_SIZE: *(int64_t *)arg = s->buffer_size; @@ -812,3 +609,86 @@ int cache_do_control(stream_t *stream, int cmd, void *arg) } return s->control_res; } + +// return 1 on success, 0 if the function was interrupted and -1 on error, or +// if the cache is disabled +int stream_cache_init(stream_t *cache, stream_t *stream, int64_t size, + int64_t min, int64_t seek_limit) +{ + if (size < 0) + size = stream->cache_size * 1024; + if (!size) + return -1; + mp_tmsg(MSGT_NETWORK, MSGL_INFO, "Cache size set to %" PRId64 " KiB\n", + size / 1024); + + int ss = stream->sector_size ? stream->sector_size : STREAM_BUFFER_SIZE; + cache_vars_t *s; + + if (size > SIZE_MAX) { + mp_msg(MSGT_CACHE, MSGL_FATAL, + "Cache size larger than max. allocation size\n"); + return -1; + } + + s = cache_init(size, ss); + if (s == NULL) + return -1; + cache->priv = s; + s->cache = cache; + s->stream = stream; // callback + s->seek_limit = seek_limit; + + cache->seek = cache_seek; + cache->fill_buffer = cache_fill_buffer; + cache->control = cache_control; + cache->close = cache_uninit; + + //make sure that we won't wait from cache_fill + //more data than it is allowed to fill + if (s->seek_limit > s->buffer_size - s->fill_limit) + s->seek_limit = s->buffer_size - s->fill_limit; + if (min > s->buffer_size - s->fill_limit) + min = s->buffer_size - s->fill_limit; + // to make sure we wait for the cache process/thread to be active + // before continuing + if (min <= 0) + min = 1; + + pid_t child_pid = fork(); + if (child_pid) { + if (child_pid == (pid_t)-1) + child_pid = 0; + if (!child_pid) { + mp_msg(MSGT_CACHE, MSGL_ERR, + "Starting cache process/thread failed: %s.\n", + strerror(errno)); + return -1; + } + s->cache_pid = child_pid; + // wait until cache is filled at least prefill_init % + mp_msg(MSGT_CACHE, MSGL_V, "CACHE_PRE_INIT: %" PRId64 " [%" PRId64 "] " + "%" PRId64 " pre:%" PRId64 " eof:%d \n", + s->min_filepos, s->read_filepos, s->max_filepos, min, s->eof); + while (s->read_filepos < s->min_filepos || + s->max_filepos - s->read_filepos < min) + { + mp_tmsg(MSGT_CACHE, MSGL_STATUS, "\rCache fill: %5.2f%% " + "(%" PRId64 " bytes) ", + 100.0 * (float)(s->max_filepos - s->read_filepos) / + (float)(s->buffer_size), + s->max_filepos - s->read_filepos); + if (s->eof) + break; // file is smaller than prefill size + if (stream_check_interrupt(PREFILL_SLEEP_TIME)) + return 0; + } + mp_msg(MSGT_CACHE, MSGL_STATUS, "\n"); + return 1; // parent exits + } + + signal(SIGTERM, exit_sighandler); // kill + cache_mainloop(s); + // make sure forked code never leaves this function + exit(0); +} |