diff options
Diffstat (limited to 'stream/cache.c')
-rw-r--r-- | stream/cache.c | 354 |
1 files changed, 117 insertions, 237 deletions
diff --git a/stream/cache.c b/stream/cache.c index 9038fc6cf6..1e97771075 100644 --- a/stream/cache.c +++ b/stream/cache.c @@ -34,8 +34,10 @@ #include <string.h> #include <signal.h> #include <sys/types.h> +#include <sys/wait.h> #include <unistd.h> #include <errno.h> +#include <assert.h> #include <libavutil/common.h> @@ -43,27 +45,17 @@ #include "osdep/shmem.h" #include "osdep/timer.h" -#if defined(__MINGW32__) -#include <windows.h> -static void ThreadProc(void *s); -#elif defined(PTHREAD_CACHE) -#include <pthread.h> -static void *ThreadProc(void *s); -#else -#include <sys/wait.h> -#define FORKED_CACHE 1 -#endif -#ifndef FORKED_CACHE -#define FORKED_CACHE 0 -#endif #include "core/mp_msg.h" #include "stream.h" -#include "cache2.h" #include "core/mp_common.h" +// Note: (cache_vars_t*)(cache->priv)->cache == cache typedef struct { + stream_t *cache; // wrapper stream, used by demuxer etc. + stream_t *stream; // "real" stream, used to read from the source media + unsigned int cache_pid; // constats: unsigned char *buffer; // base pointer of the allocated buffer memory int64_t buffer_size; // size of the allocated buffer memory @@ -71,9 +63,7 @@ typedef struct { int64_t back_size; // we should keep back_size amount of old bytes for backward seek int64_t fill_limit; // we should fill buffer only if space>=fill_limit int64_t seek_limit; // keep filling cache if distance is less that seek limit -#if FORKED_CACHE pid_t ppid; // parent PID to detect killed parent -#endif // filler's pointers: int eof; int64_t min_filepos; // buffer contain only a part of the file, from min-max pos @@ -85,7 +75,6 @@ typedef struct { // int seek_lock; // 1 if we will seek/reset buffer, 2 if we are ready for cmd // int fifo_flag; // 1 if we should use FIFO to notice cache about buffer reads. // callback - stream_t *stream; volatile int control; volatile uint64_t control_uint_arg; volatile double control_double_arg; @@ -98,19 +87,20 @@ typedef struct { volatile int idle; } cache_vars_t; -static void cache_wakeup(stream_t *s) +static void cache_wakeup(stream_t *stream) { -#if FORKED_CACHE + cache_vars_t *s = stream->priv; // signal process to wake up immediately kill(s->cache_pid, SIGUSR1); -#endif } +// Runs in the forked process static void cache_flush(cache_vars_t *s) { s->offset = s->min_filepos = s->max_filepos = s->read_filepos; // drop cache content :( } +// Runs in the main process static int cache_read(cache_vars_t *s, unsigned char *buf, int size) { int total = 0; @@ -179,6 +169,7 @@ static int cache_read(cache_vars_t *s, unsigned char *buf, int size) return total; } +// Runs in the forked process static int cache_fill(cache_vars_t *s) { int64_t back, back2, newb, space, len, pos; @@ -198,7 +189,7 @@ static int cache_fill(cache_vars_t *s) cache_flush(s); if (s->stream->eof) stream_reset(s->stream); - stream_seek_internal(s->stream, read); + stream_seek_unbuffered(s->stream, read); mp_msg(MSGT_CACHE, MSGL_DBG2, "Seek done. new pos: 0x%" PRIX64 " \n", (int64_t)stream_tell(s->stream)); } @@ -259,12 +250,12 @@ static int cache_fill(cache_vars_t *s) if (wraparound_copy) { int to_copy; - len = stream_read_internal(s->stream, s->stream->buffer, space); + len = stream_read_unbuffered(s->stream, s->stream->buffer, space); to_copy = FFMIN(len, s->buffer_size - pos); memcpy(s->buffer + pos, s->stream->buffer, to_copy); memcpy(s->buffer, s->stream->buffer + to_copy, len - to_copy); } else - len = stream_read_internal(s->stream, &s->buffer[pos], space); + len = stream_read_unbuffered(s->stream, &s->buffer[pos], space); s->eof = !len; s->max_filepos += len; @@ -275,6 +266,7 @@ static int cache_fill(cache_vars_t *s) } +// Runs in the forked process static int cache_execute_control(cache_vars_t *s) { double double_res; @@ -309,14 +301,12 @@ static int cache_execute_control(cache_vars_t *s) s->stream_start_time = pos; else s->stream_start_time = MP_NOPTS_VALUE; -#if FORKED_CACHE // if parent PID changed, main process was killed -> exit if (s->ppid != getppid()) { mp_msg(MSGT_CACHE, MSGL_WARN, "Parent process disappeared, exiting cache process.\n"); return 0; } -#endif last = mp_time_sec(); } if (s->control == -1) @@ -379,20 +369,12 @@ static int cache_execute_control(cache_vars_t *s) static void *shared_alloc(int64_t size) { -#if FORKED_CACHE return shmem_alloc(size); -#else - return malloc(size); -#endif } static void shared_free(void *ptr, int64_t size) { -#if FORKED_CACHE shmem_free(ptr, size); -#else - free(ptr); -#endif } static cache_vars_t *cache_init(int64_t size, int sector) @@ -418,34 +400,27 @@ static cache_vars_t *cache_init(int64_t size, int sector) s->fill_limit = 8 * sector; s->back_size = s->buffer_size / 2; -#if FORKED_CACHE s->ppid = getpid(); -#endif return s; } -void cache_uninit(stream_t *s) +static void cache_uninit(stream_t *s) { - cache_vars_t *c = s->cache_data; - if (s->cache_pid) { -#if !FORKED_CACHE - cache_do_control(s, -2, NULL); -#else - kill(s->cache_pid, SIGKILL); - waitpid(s->cache_pid, NULL, 0); -#endif - s->cache_pid = 0; + cache_vars_t *c = s->priv; + if (c->cache_pid) { + kill(c->cache_pid, SIGKILL); + waitpid(c->cache_pid, NULL, 0); + c->cache_pid = 0; } if (!c) return; shared_free(c->buffer, c->buffer_size); c->buffer = NULL; c->stream = NULL; - shared_free(s->cache_data, sizeof(cache_vars_t)); - s->cache_data = NULL; + shared_free(c, sizeof(cache_vars_t)); + s->priv = NULL; } -#if FORKED_CACHE static void exit_sighandler(int x) { // close stream @@ -455,7 +430,6 @@ static void exit_sighandler(int x) static void dummy_sighandler(int x) { } -#endif /** * Main loop of the cache process or thread. @@ -463,31 +437,25 @@ static void dummy_sighandler(int x) static void cache_mainloop(cache_vars_t *s) { int sleep_count = 0; -#if FORKED_CACHE struct sigaction sa = { .sa_handler = SIG_IGN }; sigaction(SIGUSR1, &sa, NULL); -#endif do { if (!cache_fill(s)) { s->idle = 1; -#if FORKED_CACHE // Let signal wake us up, we cannot leave this // enabled since we do not handle EINTR in most places. // This might need extra code to work on BSD. sa.sa_handler = dummy_sighandler; sigaction(SIGUSR1, &sa, NULL); -#endif if (sleep_count < INITIAL_FILL_USLEEP_COUNT) { sleep_count++; mp_sleep_us(INITIAL_FILL_USLEEP_TIME); } else mp_sleep_us(FILL_USLEEP_TIME); // idle -#if FORKED_CACHE sa.sa_handler = SIG_IGN; sigaction(SIGUSR1, &sa, NULL); -#endif } else { sleep_count = 0; s->idle = 0; @@ -495,180 +463,24 @@ static void cache_mainloop(cache_vars_t *s) } while (cache_execute_control(s)); } -int stream_enable_cache_percent(stream_t *stream, int64_t stream_cache_size, - float stream_cache_min_percent, - float stream_cache_seek_min_percent) -{ - return stream_enable_cache(stream, stream_cache_size * 1024, - stream_cache_size * 1024 * - (stream_cache_min_percent / 100.0), - stream_cache_size * 1024 * - (stream_cache_seek_min_percent / 100.0)); -} - -/** - * \return 1 on success, 0 if the function was interrupted and -1 on error - */ -int stream_enable_cache(stream_t *stream, int64_t size, int64_t min, - int64_t seek_limit) -{ - if (size < 0) - size = stream->cache_size * 1024; - if (!size) - return 1; - mp_tmsg(MSGT_NETWORK, MSGL_INFO, "Cache size set to %" PRId64 " KiB\n", - size / 1024); - - int ss = stream->sector_size ? stream->sector_size : STREAM_BUFFER_SIZE; - int res = -1; - cache_vars_t *s; - - if (size > SIZE_MAX) { - mp_msg(MSGT_CACHE, MSGL_FATAL, - "Cache size larger than max. allocation size\n"); - return -1; - } - - s = cache_init(size, ss); - if (s == NULL) - return -1; - stream->cache_data = s; - s->stream = stream; // callback - s->seek_limit = seek_limit; - - - //make sure that we won't wait from cache_fill - //more data than it is allowed to fill - if (s->seek_limit > s->buffer_size - s->fill_limit) - s->seek_limit = s->buffer_size - s->fill_limit; - if (min > s->buffer_size - s->fill_limit) - min = s->buffer_size - s->fill_limit; - // to make sure we wait for the cache process/thread to be active - // before continuing - if (min <= 0) - min = 1; - -#if FORKED_CACHE - if ((stream->cache_pid = fork())) { - if ((pid_t)stream->cache_pid == -1) - stream->cache_pid = 0; -#else - { - stream_t *stream2 = malloc(sizeof(stream_t)); - memcpy(stream2, s->stream, sizeof(stream_t)); - s->stream = stream2; -#if defined(__MINGW32__) - stream->cache_pid = _beginthread(ThreadProc, 0, s); -#else - { - pthread_t tid; - pthread_create(&tid, NULL, ThreadProc, s); - stream->cache_pid = 1; - } -#endif -#endif - if (!stream->cache_pid) { - mp_msg(MSGT_CACHE, MSGL_ERR, - "Starting cache process/thread failed: %s.\n", - strerror(errno)); - goto err_out; - } - // wait until cache is filled at least prefill_init % - mp_msg(MSGT_CACHE, MSGL_V, "CACHE_PRE_INIT: %" PRId64 " [%" PRId64 "] " - "%" PRId64 " pre:%" PRId64 " eof:%d \n", - s->min_filepos, s->read_filepos, s->max_filepos, min, s->eof); - while (s->read_filepos < s->min_filepos || - s->max_filepos - s->read_filepos < min) - { - mp_tmsg(MSGT_CACHE, MSGL_STATUS, "\rCache fill: %5.2f%% " - "(%" PRId64 " bytes) ", - 100.0 * (float)(s->max_filepos - s->read_filepos) / - (float)(s->buffer_size), - s->max_filepos - s->read_filepos); - if (s->eof) - break; // file is smaller than prefill size - if (stream_check_interrupt(PREFILL_SLEEP_TIME)) { - res = 0; - goto err_out; - } - } - mp_msg(MSGT_CACHE, MSGL_STATUS, "\n"); - stream->cached = true; - return 1; // parent exits - -err_out: - cache_uninit(stream); - return res; - } - -#if FORKED_CACHE - signal(SIGTERM, exit_sighandler); // kill - cache_mainloop(s); - // make sure forked code never leaves this function - exit(0); -#endif -} - -#if !FORKED_CACHE -#if defined(__MINGW32__) -static void ThreadProc(void *s) -{ - cache_mainloop(s); - _endthread(); -} -#else -static void *ThreadProc(void *s) +static int cache_fill_buffer(struct stream *stream, char *buffer, int max_len) { - cache_mainloop(s); - return NULL; -} -#endif -#endif - -int cache_stream_fill_buffer(stream_t *s) -{ - int len; - int sector_size; - if (!s->cache_pid) - return stream_fill_buffer(s); + cache_vars_t *c = stream->priv; + assert(c->cache_pid); - if (s->pos != ((cache_vars_t *)s->cache_data)->read_filepos) + if (stream->pos != c->read_filepos) mp_msg(MSGT_CACHE, MSGL_ERR, "!!! read_filepos differs!!! report this bug...\n"); - sector_size = ((cache_vars_t *)s->cache_data)->sector_size; - if (sector_size > STREAM_MAX_SECTOR_SIZE) { - mp_msg(MSGT_CACHE, MSGL_ERR, "Sector size %i larger than maximum %i\n", - sector_size, - STREAM_MAX_SECTOR_SIZE); - sector_size = STREAM_MAX_SECTOR_SIZE; - } - - len = cache_read(s->cache_data, s->buffer, sector_size); - //printf("cache_stream_fill_buffer->read -> %d\n",len); - - if (len <= 0) { - s->eof = 1; - s->buf_pos = s->buf_len = 0; - return 0; - } - s->eof = 0; - s->buf_pos = 0; - s->buf_len = len; - s->pos += len; -// printf("[%d]",len);fflush(stdout); - stream_capture_write(s); - return len; + return cache_read(c, buffer, max_len); } -int cache_stream_seek_long(stream_t *stream, int64_t pos) +static int cache_seek(stream_t *stream, int64_t pos) { - cache_vars_t *s; + cache_vars_t *s = stream->priv; int64_t newpos; - if (!stream->cache_pid) - return stream_seek_long(stream, pos); + assert(s->cache_pid); - s = stream->cache_data; // s->seek_lock=1; mp_msg(MSGT_CACHE, MSGL_DBG2, "CACHE2_SEEK: 0x%" PRIX64 " <= 0x%" PRIX64 @@ -680,29 +492,14 @@ int cache_stream_seek_long(stream_t *stream, int64_t pos) stream->pos = s->read_filepos = newpos; s->eof = 0; // !!!!!!! cache_wakeup(stream); - - cache_stream_fill_buffer(stream); - - pos -= newpos; - if (pos >= 0 && pos <= stream->buf_len) { - stream->buf_pos = pos; // byte position in sector - return 1; - } - -// stream->buf_pos=stream->buf_len=0; -// return 1; - - mp_msg(MSGT_CACHE, MSGL_V, - "cache_stream_seek: WARNING! Can't seek to 0x%" PRIX64 " !\n", - pos + newpos); - return 0; + return 1; } -int cache_do_control(stream_t *stream, int cmd, void *arg) +static int cache_control(stream_t *stream, int cmd, void *arg) { int sleep_count = 0; int pos_change = 0; - cache_vars_t *s = stream->cache_data; + cache_vars_t *s = stream->priv; switch (cmd) { case STREAM_CTRL_GET_CACHE_SIZE: *(int64_t *)arg = s->buffer_size; @@ -812,3 +609,86 @@ int cache_do_control(stream_t *stream, int cmd, void *arg) } return s->control_res; } + +// return 1 on success, 0 if the function was interrupted and -1 on error, or +// if the cache is disabled +int stream_cache_init(stream_t *cache, stream_t *stream, int64_t size, + int64_t min, int64_t seek_limit) +{ + if (size < 0) + size = stream->cache_size * 1024; + if (!size) + return -1; + mp_tmsg(MSGT_NETWORK, MSGL_INFO, "Cache size set to %" PRId64 " KiB\n", + size / 1024); + + int ss = stream->sector_size ? stream->sector_size : STREAM_BUFFER_SIZE; + cache_vars_t *s; + + if (size > SIZE_MAX) { + mp_msg(MSGT_CACHE, MSGL_FATAL, + "Cache size larger than max. allocation size\n"); + return -1; + } + + s = cache_init(size, ss); + if (s == NULL) + return -1; + cache->priv = s; + s->cache = cache; + s->stream = stream; // callback + s->seek_limit = seek_limit; + + cache->seek = cache_seek; + cache->fill_buffer = cache_fill_buffer; + cache->control = cache_control; + cache->close = cache_uninit; + + //make sure that we won't wait from cache_fill + //more data than it is allowed to fill + if (s->seek_limit > s->buffer_size - s->fill_limit) + s->seek_limit = s->buffer_size - s->fill_limit; + if (min > s->buffer_size - s->fill_limit) + min = s->buffer_size - s->fill_limit; + // to make sure we wait for the cache process/thread to be active + // before continuing + if (min <= 0) + min = 1; + + pid_t child_pid = fork(); + if (child_pid) { + if (child_pid == (pid_t)-1) + child_pid = 0; + if (!child_pid) { + mp_msg(MSGT_CACHE, MSGL_ERR, + "Starting cache process/thread failed: %s.\n", + strerror(errno)); + return -1; + } + s->cache_pid = child_pid; + // wait until cache is filled at least prefill_init % + mp_msg(MSGT_CACHE, MSGL_V, "CACHE_PRE_INIT: %" PRId64 " [%" PRId64 "] " + "%" PRId64 " pre:%" PRId64 " eof:%d \n", + s->min_filepos, s->read_filepos, s->max_filepos, min, s->eof); + while (s->read_filepos < s->min_filepos || + s->max_filepos - s->read_filepos < min) + { + mp_tmsg(MSGT_CACHE, MSGL_STATUS, "\rCache fill: %5.2f%% " + "(%" PRId64 " bytes) ", + 100.0 * (float)(s->max_filepos - s->read_filepos) / + (float)(s->buffer_size), + s->max_filepos - s->read_filepos); + if (s->eof) + break; // file is smaller than prefill size + if (stream_check_interrupt(PREFILL_SLEEP_TIME)) + return 0; + } + mp_msg(MSGT_CACHE, MSGL_STATUS, "\n"); + return 1; // parent exits + } + + signal(SIGTERM, exit_sighandler); // kill + cache_mainloop(s); + // make sure forked code never leaves this function + exit(0); +} |