diff options
Diffstat (limited to 'stream')
-rw-r--r-- | stream/cache.c | 354 | ||||
-rw-r--r-- | stream/cache2.h | 27 | ||||
-rw-r--r-- | stream/stream.c | 113 | ||||
-rw-r--r-- | stream/stream.h | 44 | ||||
-rw-r--r-- | stream/stream_radio.c | 13 |
5 files changed, 220 insertions, 331 deletions
diff --git a/stream/cache.c b/stream/cache.c index 9038fc6cf6..1e97771075 100644 --- a/stream/cache.c +++ b/stream/cache.c @@ -34,8 +34,10 @@ #include <string.h> #include <signal.h> #include <sys/types.h> +#include <sys/wait.h> #include <unistd.h> #include <errno.h> +#include <assert.h> #include <libavutil/common.h> @@ -43,27 +45,17 @@ #include "osdep/shmem.h" #include "osdep/timer.h" -#if defined(__MINGW32__) -#include <windows.h> -static void ThreadProc(void *s); -#elif defined(PTHREAD_CACHE) -#include <pthread.h> -static void *ThreadProc(void *s); -#else -#include <sys/wait.h> -#define FORKED_CACHE 1 -#endif -#ifndef FORKED_CACHE -#define FORKED_CACHE 0 -#endif #include "core/mp_msg.h" #include "stream.h" -#include "cache2.h" #include "core/mp_common.h" +// Note: (cache_vars_t*)(cache->priv)->cache == cache typedef struct { + stream_t *cache; // wrapper stream, used by demuxer etc. + stream_t *stream; // "real" stream, used to read from the source media + unsigned int cache_pid; // constats: unsigned char *buffer; // base pointer of the allocated buffer memory int64_t buffer_size; // size of the allocated buffer memory @@ -71,9 +63,7 @@ typedef struct { int64_t back_size; // we should keep back_size amount of old bytes for backward seek int64_t fill_limit; // we should fill buffer only if space>=fill_limit int64_t seek_limit; // keep filling cache if distance is less that seek limit -#if FORKED_CACHE pid_t ppid; // parent PID to detect killed parent -#endif // filler's pointers: int eof; int64_t min_filepos; // buffer contain only a part of the file, from min-max pos @@ -85,7 +75,6 @@ typedef struct { // int seek_lock; // 1 if we will seek/reset buffer, 2 if we are ready for cmd // int fifo_flag; // 1 if we should use FIFO to notice cache about buffer reads. // callback - stream_t *stream; volatile int control; volatile uint64_t control_uint_arg; volatile double control_double_arg; @@ -98,19 +87,20 @@ typedef struct { volatile int idle; } cache_vars_t; -static void cache_wakeup(stream_t *s) +static void cache_wakeup(stream_t *stream) { -#if FORKED_CACHE + cache_vars_t *s = stream->priv; // signal process to wake up immediately kill(s->cache_pid, SIGUSR1); -#endif } +// Runs in the forked process static void cache_flush(cache_vars_t *s) { s->offset = s->min_filepos = s->max_filepos = s->read_filepos; // drop cache content :( } +// Runs in the main process static int cache_read(cache_vars_t *s, unsigned char *buf, int size) { int total = 0; @@ -179,6 +169,7 @@ static int cache_read(cache_vars_t *s, unsigned char *buf, int size) return total; } +// Runs in the forked process static int cache_fill(cache_vars_t *s) { int64_t back, back2, newb, space, len, pos; @@ -198,7 +189,7 @@ static int cache_fill(cache_vars_t *s) cache_flush(s); if (s->stream->eof) stream_reset(s->stream); - stream_seek_internal(s->stream, read); + stream_seek_unbuffered(s->stream, read); mp_msg(MSGT_CACHE, MSGL_DBG2, "Seek done. new pos: 0x%" PRIX64 " \n", (int64_t)stream_tell(s->stream)); } @@ -259,12 +250,12 @@ static int cache_fill(cache_vars_t *s) if (wraparound_copy) { int to_copy; - len = stream_read_internal(s->stream, s->stream->buffer, space); + len = stream_read_unbuffered(s->stream, s->stream->buffer, space); to_copy = FFMIN(len, s->buffer_size - pos); memcpy(s->buffer + pos, s->stream->buffer, to_copy); memcpy(s->buffer, s->stream->buffer + to_copy, len - to_copy); } else - len = stream_read_internal(s->stream, &s->buffer[pos], space); + len = stream_read_unbuffered(s->stream, &s->buffer[pos], space); s->eof = !len; s->max_filepos += len; @@ -275,6 +266,7 @@ static int cache_fill(cache_vars_t *s) } +// Runs in the forked process static int cache_execute_control(cache_vars_t *s) { double double_res; @@ -309,14 +301,12 @@ static int cache_execute_control(cache_vars_t *s) s->stream_start_time = pos; else s->stream_start_time = MP_NOPTS_VALUE; -#if FORKED_CACHE // if parent PID changed, main process was killed -> exit if (s->ppid != getppid()) { mp_msg(MSGT_CACHE, MSGL_WARN, "Parent process disappeared, exiting cache process.\n"); return 0; } -#endif last = mp_time_sec(); } if (s->control == -1) @@ -379,20 +369,12 @@ static int cache_execute_control(cache_vars_t *s) static void *shared_alloc(int64_t size) { -#if FORKED_CACHE return shmem_alloc(size); -#else - return malloc(size); -#endif } static void shared_free(void *ptr, int64_t size) { -#if FORKED_CACHE shmem_free(ptr, size); -#else - free(ptr); -#endif } static cache_vars_t *cache_init(int64_t size, int sector) @@ -418,34 +400,27 @@ static cache_vars_t *cache_init(int64_t size, int sector) s->fill_limit = 8 * sector; s->back_size = s->buffer_size / 2; -#if FORKED_CACHE s->ppid = getpid(); -#endif return s; } -void cache_uninit(stream_t *s) +static void cache_uninit(stream_t *s) { - cache_vars_t *c = s->cache_data; - if (s->cache_pid) { -#if !FORKED_CACHE - cache_do_control(s, -2, NULL); -#else - kill(s->cache_pid, SIGKILL); - waitpid(s->cache_pid, NULL, 0); -#endif - s->cache_pid = 0; + cache_vars_t *c = s->priv; + if (c->cache_pid) { + kill(c->cache_pid, SIGKILL); + waitpid(c->cache_pid, NULL, 0); + c->cache_pid = 0; } if (!c) return; shared_free(c->buffer, c->buffer_size); c->buffer = NULL; c->stream = NULL; - shared_free(s->cache_data, sizeof(cache_vars_t)); - s->cache_data = NULL; + shared_free(c, sizeof(cache_vars_t)); + s->priv = NULL; } -#if FORKED_CACHE static void exit_sighandler(int x) { // close stream @@ -455,7 +430,6 @@ static void exit_sighandler(int x) static void dummy_sighandler(int x) { } -#endif /** * Main loop of the cache process or thread. @@ -463,31 +437,25 @@ static void dummy_sighandler(int x) static void cache_mainloop(cache_vars_t *s) { int sleep_count = 0; -#if FORKED_CACHE struct sigaction sa = { .sa_handler = SIG_IGN }; sigaction(SIGUSR1, &sa, NULL); -#endif do { if (!cache_fill(s)) { s->idle = 1; -#if FORKED_CACHE // Let signal wake us up, we cannot leave this // enabled since we do not handle EINTR in most places. // This might need extra code to work on BSD. sa.sa_handler = dummy_sighandler; sigaction(SIGUSR1, &sa, NULL); -#endif if (sleep_count < INITIAL_FILL_USLEEP_COUNT) { sleep_count++; mp_sleep_us(INITIAL_FILL_USLEEP_TIME); } else mp_sleep_us(FILL_USLEEP_TIME); // idle -#if FORKED_CACHE sa.sa_handler = SIG_IGN; sigaction(SIGUSR1, &sa, NULL); -#endif } else { sleep_count = 0; s->idle = 0; @@ -495,180 +463,24 @@ static void cache_mainloop(cache_vars_t *s) } while (cache_execute_control(s)); } -int stream_enable_cache_percent(stream_t *stream, int64_t stream_cache_size, - float stream_cache_min_percent, - float stream_cache_seek_min_percent) -{ - return stream_enable_cache(stream, stream_cache_size * 1024, - stream_cache_size * 1024 * - (stream_cache_min_percent / 100.0), - stream_cache_size * 1024 * - (stream_cache_seek_min_percent / 100.0)); -} - -/** - * \return 1 on success, 0 if the function was interrupted and -1 on error - */ -int stream_enable_cache(stream_t *stream, int64_t size, int64_t min, - int64_t seek_limit) -{ - if (size < 0) - size = stream->cache_size * 1024; - if (!size) - return 1; - mp_tmsg(MSGT_NETWORK, MSGL_INFO, "Cache size set to %" PRId64 " KiB\n", - size / 1024); - - int ss = stream->sector_size ? stream->sector_size : STREAM_BUFFER_SIZE; - int res = -1; - cache_vars_t *s; - - if (size > SIZE_MAX) { - mp_msg(MSGT_CACHE, MSGL_FATAL, - "Cache size larger than max. allocation size\n"); - return -1; - } - - s = cache_init(size, ss); - if (s == NULL) - return -1; - stream->cache_data = s; - s->stream = stream; // callback - s->seek_limit = seek_limit; - - - //make sure that we won't wait from cache_fill - //more data than it is allowed to fill - if (s->seek_limit > s->buffer_size - s->fill_limit) - s->seek_limit = s->buffer_size - s->fill_limit; - if (min > s->buffer_size - s->fill_limit) - min = s->buffer_size - s->fill_limit; - // to make sure we wait for the cache process/thread to be active - // before continuing - if (min <= 0) - min = 1; - -#if FORKED_CACHE - if ((stream->cache_pid = fork())) { - if ((pid_t)stream->cache_pid == -1) - stream->cache_pid = 0; -#else - { - stream_t *stream2 = malloc(sizeof(stream_t)); - memcpy(stream2, s->stream, sizeof(stream_t)); - s->stream = stream2; -#if defined(__MINGW32__) - stream->cache_pid = _beginthread(ThreadProc, 0, s); -#else - { - pthread_t tid; - pthread_create(&tid, NULL, ThreadProc, s); - stream->cache_pid = 1; - } -#endif -#endif - if (!stream->cache_pid) { - mp_msg(MSGT_CACHE, MSGL_ERR, - "Starting cache process/thread failed: %s.\n", - strerror(errno)); - goto err_out; - } - // wait until cache is filled at least prefill_init % - mp_msg(MSGT_CACHE, MSGL_V, "CACHE_PRE_INIT: %" PRId64 " [%" PRId64 "] " - "%" PRId64 " pre:%" PRId64 " eof:%d \n", - s->min_filepos, s->read_filepos, s->max_filepos, min, s->eof); - while (s->read_filepos < s->min_filepos || - s->max_filepos - s->read_filepos < min) - { - mp_tmsg(MSGT_CACHE, MSGL_STATUS, "\rCache fill: %5.2f%% " - "(%" PRId64 " bytes) ", - 100.0 * (float)(s->max_filepos - s->read_filepos) / - (float)(s->buffer_size), - s->max_filepos - s->read_filepos); - if (s->eof) - break; // file is smaller than prefill size - if (stream_check_interrupt(PREFILL_SLEEP_TIME)) { - res = 0; - goto err_out; - } - } - mp_msg(MSGT_CACHE, MSGL_STATUS, "\n"); - stream->cached = true; - return 1; // parent exits - -err_out: - cache_uninit(stream); - return res; - } - -#if FORKED_CACHE - signal(SIGTERM, exit_sighandler); // kill - cache_mainloop(s); - // make sure forked code never leaves this function - exit(0); -#endif -} - -#if !FORKED_CACHE -#if defined(__MINGW32__) -static void ThreadProc(void *s) -{ - cache_mainloop(s); - _endthread(); -} -#else -static void *ThreadProc(void *s) +static int cache_fill_buffer(struct stream *stream, char *buffer, int max_len) { - cache_mainloop(s); - return NULL; -} -#endif -#endif - -int cache_stream_fill_buffer(stream_t *s) -{ - int len; - int sector_size; - if (!s->cache_pid) - return stream_fill_buffer(s); + cache_vars_t *c = stream->priv; + assert(c->cache_pid); - if (s->pos != ((cache_vars_t *)s->cache_data)->read_filepos) + if (stream->pos != c->read_filepos) mp_msg(MSGT_CACHE, MSGL_ERR, "!!! read_filepos differs!!! report this bug...\n"); - sector_size = ((cache_vars_t *)s->cache_data)->sector_size; - if (sector_size > STREAM_MAX_SECTOR_SIZE) { - mp_msg(MSGT_CACHE, MSGL_ERR, "Sector size %i larger than maximum %i\n", - sector_size, - STREAM_MAX_SECTOR_SIZE); - sector_size = STREAM_MAX_SECTOR_SIZE; - } - - len = cache_read(s->cache_data, s->buffer, sector_size); - //printf("cache_stream_fill_buffer->read -> %d\n",len); - - if (len <= 0) { - s->eof = 1; - s->buf_pos = s->buf_len = 0; - return 0; - } - s->eof = 0; - s->buf_pos = 0; - s->buf_len = len; - s->pos += len; -// printf("[%d]",len);fflush(stdout); - stream_capture_write(s); - return len; + return cache_read(c, buffer, max_len); } -int cache_stream_seek_long(stream_t *stream, int64_t pos) +static int cache_seek(stream_t *stream, int64_t pos) { - cache_vars_t *s; + cache_vars_t *s = stream->priv; int64_t newpos; - if (!stream->cache_pid) - return stream_seek_long(stream, pos); + assert(s->cache_pid); - s = stream->cache_data; // s->seek_lock=1; mp_msg(MSGT_CACHE, MSGL_DBG2, "CACHE2_SEEK: 0x%" PRIX64 " <= 0x%" PRIX64 @@ -680,29 +492,14 @@ int cache_stream_seek_long(stream_t *stream, int64_t pos) stream->pos = s->read_filepos = newpos; s->eof = 0; // !!!!!!! cache_wakeup(stream); - - cache_stream_fill_buffer(stream); - - pos -= newpos; - if (pos >= 0 && pos <= stream->buf_len) { - stream->buf_pos = pos; // byte position in sector - return 1; - } - -// stream->buf_pos=stream->buf_len=0; -// return 1; - - mp_msg(MSGT_CACHE, MSGL_V, - "cache_stream_seek: WARNING! Can't seek to 0x%" PRIX64 " !\n", - pos + newpos); - return 0; + return 1; } -int cache_do_control(stream_t *stream, int cmd, void *arg) +static int cache_control(stream_t *stream, int cmd, void *arg) { int sleep_count = 0; int pos_change = 0; - cache_vars_t *s = stream->cache_data; + cache_vars_t *s = stream->priv; switch (cmd) { case STREAM_CTRL_GET_CACHE_SIZE: *(int64_t *)arg = s->buffer_size; @@ -812,3 +609,86 @@ int cache_do_control(stream_t *stream, int cmd, void *arg) } return s->control_res; } + +// return 1 on success, 0 if the function was interrupted and -1 on error, or +// if the cache is disabled +int stream_cache_init(stream_t *cache, stream_t *stream, int64_t size, + int64_t min, int64_t seek_limit) +{ + if (size < 0) + size = stream->cache_size * 1024; + if (!size) + return -1; + mp_tmsg(MSGT_NETWORK, MSGL_INFO, "Cache size set to %" PRId64 " KiB\n", + size / 1024); + + int ss = stream->sector_size ? stream->sector_size : STREAM_BUFFER_SIZE; + cache_vars_t *s; + + if (size > SIZE_MAX) { + mp_msg(MSGT_CACHE, MSGL_FATAL, + "Cache size larger than max. allocation size\n"); + return -1; + } + + s = cache_init(size, ss); + if (s == NULL) + return -1; + cache->priv = s; + s->cache = cache; + s->stream = stream; // callback + s->seek_limit = seek_limit; + + cache->seek = cache_seek; + cache->fill_buffer = cache_fill_buffer; + cache->control = cache_control; + cache->close = cache_uninit; + + //make sure that we won't wait from cache_fill + //more data than it is allowed to fill + if (s->seek_limit > s->buffer_size - s->fill_limit) + s->seek_limit = s->buffer_size - s->fill_limit; + if (min > s->buffer_size - s->fill_limit) + min = s->buffer_size - s->fill_limit; + // to make sure we wait for the cache process/thread to be active + // before continuing + if (min <= 0) + min = 1; + + pid_t child_pid = fork(); + if (child_pid) { + if (child_pid == (pid_t)-1) + child_pid = 0; + if (!child_pid) { + mp_msg(MSGT_CACHE, MSGL_ERR, + "Starting cache process/thread failed: %s.\n", + strerror(errno)); + return -1; + } + s->cache_pid = child_pid; + // wait until cache is filled at least prefill_init % + mp_msg(MSGT_CACHE, MSGL_V, "CACHE_PRE_INIT: %" PRId64 " [%" PRId64 "] " + "%" PRId64 " pre:%" PRId64 " eof:%d \n", + s->min_filepos, s->read_filepos, s->max_filepos, min, s->eof); + while (s->read_filepos < s->min_filepos || + s->max_filepos - s->read_filepos < min) + { + mp_tmsg(MSGT_CACHE, MSGL_STATUS, "\rCache fill: %5.2f%% " + "(%" PRId64 " bytes) ", + 100.0 * (float)(s->max_filepos - s->read_filepos) / + (float)(s->buffer_size), + s->max_filepos - s->read_filepos); + if (s->eof) + break; // file is smaller than prefill size + if (stream_check_interrupt(PREFILL_SLEEP_TIME)) + return 0; + } + mp_msg(MSGT_CACHE, MSGL_STATUS, "\n"); + return 1; // parent exits + } + + signal(SIGTERM, exit_sighandler); // kill + cache_mainloop(s); + // make sure forked code never leaves this function + exit(0); +} diff --git a/stream/cache2.h b/stream/cache2.h deleted file mode 100644 index 330558333e..0000000000 --- a/stream/cache2.h +++ /dev/null @@ -1,27 +0,0 @@ -/* - * This file is part of MPlayer. - * - * MPlayer is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * MPlayer is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with MPlayer; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#ifndef MPLAYER_CACHE2_H -#define MPLAYER_CACHE2_H - -#include "stream.h" - -void cache_uninit(stream_t *s); -int cache_do_control(stream_t *stream, int cmd, void *arg); - -#endif /* MPLAYER_CACHE2_H */ diff --git a/stream/stream.c b/stream/stream.c index 4f12e57856..3bfa418c90 100644 --- a/stream/stream.c +++ b/stream/stream.c @@ -52,8 +52,6 @@ #include "core/m_option.h" #include "core/m_struct.h" -#include "cache2.h" - /// We keep these 2 for the gui atm, but they will be removed. char *cdrom_device = NULL; char *dvd_device = NULL; @@ -136,7 +134,7 @@ static const stream_info_t *const auto_open_streams[] = { NULL }; -static stream_t *new_stream(int fd, int type); +static stream_t *new_stream(void); static stream_t *open_stream_plugin(const stream_info_t *sinfo, const char *filename, @@ -165,7 +163,7 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo, } } } - s = new_stream(-2, -2); + s = new_stream(); s->opts = options; s->url = strdup(filename); s->flags |= mode; @@ -200,6 +198,8 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo, s->mode = mode; + s->uncached_type = s->type; + mp_msg(MSGT_OPEN, MSGL_V, "STREAM: [%s] %s\n", sinfo->name, filename); mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Description: %s\n", sinfo->info); mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Author: %s\n", sinfo->author); @@ -308,7 +308,7 @@ static int stream_reconnect(stream_t *s) continue; } - if (stream_seek_internal(s, pos) < 0 && s->pos == pos) + if (stream_seek_unbuffered(s, pos) < 0 && s->pos == pos) return 1; } return 0; @@ -345,7 +345,11 @@ void stream_capture_write(stream_t *s) } } -int stream_read_internal(stream_t *s, void *buf, int len) +// Read function bypassing the local stream buffer. This will not write into +// s->buffer, but into buf[0..len] instead. +// Returns < 0 on error, 0 on EOF, and length of bytes read on success. +// Partial reads are possible, even if EOF is not reached. +int stream_read_unbuffered(stream_t *s, void *buf, int len) { int orig_len = len; // we will retry even if we already reached EOF previously. @@ -379,7 +383,7 @@ int stream_read_internal(stream_t *s, void *buf, int len) goto eof_out; // make sure EOF is set to ensure no endless loops s->eof = 1; - return stream_read_internal(s, buf, orig_len); + return stream_read_unbuffered(s, buf, orig_len); eof_out: s->eof = 1; @@ -394,7 +398,7 @@ eof_out: int stream_fill_buffer(stream_t *s) { - int len = stream_read_internal(s, s->buffer, STREAM_BUFFER_SIZE); + int len = stream_read_unbuffered(s, s->buffer, STREAM_BUFFER_SIZE); if (len <= 0) return 0; s->buf_pos = 0; @@ -411,7 +415,7 @@ int stream_read(stream_t *s, char *mem, int total) int x; x = s->buf_len - s->buf_pos; if (x == 0) { - if (!cache_stream_fill_buffer(s)) + if (!stream_fill_buffer(s)) return total - len; // EOF x = s->buf_len - s->buf_pos; } @@ -441,7 +445,8 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len) return rd; } -int stream_seek_internal(stream_t *s, int64_t newpos) +// Seek function bypassing the local stream buffer. +int stream_seek_unbuffered(stream_t *s, int64_t newpos) { if (newpos == 0 || newpos != s->pos) { switch (s->type) { @@ -492,7 +497,9 @@ int stream_seek_internal(stream_t *s, int64_t newpos) return -1; } -int stream_seek_long(stream_t *s, int64_t pos) +// Unlike stream_seek, does not try to seek within local buffer. +// Unlike stream_seek_unbuffered(), it still fills the local buffer. +static int stream_seek_long(stream_t *s, int64_t pos) { int res; int64_t newpos = 0; @@ -517,7 +524,7 @@ int stream_seek_long(stream_t *s, int64_t pos) (int64_t)s->pos, (int64_t)newpos, (int64_t)pos, s->buf_len); pos -= newpos; - res = stream_seek_internal(s, newpos); + res = stream_seek_unbuffered(s, newpos); if (res >= 0) return res; @@ -565,7 +572,7 @@ int stream_seek(stream_t *s, int64_t pos) } } - return cache_stream_seek_long(s, pos); + return stream_seek_long(s, pos); } int stream_skip(stream_t *s, int64_t len) @@ -578,7 +585,7 @@ int stream_skip(stream_t *s, int64_t len) while (len > 0) { int x = s->buf_len - s->buf_pos; if (x == 0) { - if (!cache_stream_fill_buffer(s)) + if (!stream_fill_buffer(s)) return 0; // EOF x = s->buf_len - s->buf_pos; } @@ -602,10 +609,6 @@ void stream_reset(stream_t *s) int stream_control(stream_t *s, int cmd, void *arg) { -#ifdef CONFIG_STREAM_CACHE - if (s->cache_pid) - return cache_do_control(s, cmd, arg); -#endif if (!s->control) return STREAM_UNSUPPORTED; return s->control(s, cmd, arg); @@ -620,7 +623,7 @@ void stream_update_size(stream_t *s) } } -static stream_t *new_stream(int fd, int type) +static stream_t *new_stream(void) { stream_t *s = talloc_zero(NULL, stream_t); @@ -632,18 +635,13 @@ static stream_t *new_stream(int fd, int type) } #endif - s->fd = fd; - s->type = type; - stream_reset(s); + s->fd = -2; + s->type = -2; return s; } void free_stream(stream_t *s) { -// printf("\n*** free_stream() called ***\n"); -#ifdef CONFIG_STREAM_CACHE - cache_uninit(s); -#endif stream_set_capture_file(s, NULL); if (s->close) @@ -662,6 +660,8 @@ void free_stream(stream_t *s) WSACleanup(); // there might be a better place for this (-> later) #endif free(s->url); + if (s->uncached_stream) + free_stream(s->uncached_stream); talloc_free(s); } @@ -681,6 +681,65 @@ int stream_check_interrupt(int time) return stream_check_interrupt_cb(stream_check_interrupt_ctx, time); } +int stream_enable_cache_percent(stream_t **stream, int64_t stream_cache_size, + float stream_cache_min_percent, + float stream_cache_seek_min_percent) +{ + return stream_enable_cache(stream, stream_cache_size * 1024, + stream_cache_size * 1024 * + (stream_cache_min_percent / 100.0), + stream_cache_size * 1024 * + (stream_cache_seek_min_percent / 100.0)); +} + +/** + * \return 1 on success, 0 if the function was interrupted and -1 on error, or + * if the cache is disabled + */ +int stream_enable_cache(stream_t **stream, int64_t size, int64_t min, + int64_t seek_limit) +{ + stream_t *orig = *stream; + + if (orig->mode != STREAM_READ) + return 1; + + // Can't handle a loaded buffer. + orig->buf_len = orig->buf_pos = 0; + + stream_t *cache = new_stream(); + cache->type = STREAMTYPE_CACHE; + cache->uncached_type = orig->type; + cache->uncached_stream = orig; + cache->flags |= MP_STREAM_SEEK; + cache->mode = STREAM_READ; + + cache->read_chunk = orig->read_chunk; + cache->url = strdup(orig->url); + cache->mime_type = talloc_strdup(cache, orig->mime_type); + cache->lavf_type = orig->lavf_type; + cache->opts = orig->opts; + cache->sector_size = orig->sector_size; + cache->read_chunk = orig->read_chunk; + cache->cache_size = orig->cache_size; + cache->start_pos = orig->start_pos; + cache->end_pos = orig->end_pos; + + int res = -1; + +#ifdef CONFIG_STREAM_CACHE + res = stream_cache_init(cache, orig, size, min, seek_limit); +#endif + + if (res <= 0) { + cache->uncached_stream = NULL; // don't free original stream + free_stream(cache); + } else { + *stream = cache; + } + return res; +} + /** * Helper function to read 16 bits little-endian and advance pointer */ @@ -789,7 +848,7 @@ unsigned char *stream_read_line(stream_t *s, unsigned char *mem, int max, len = s->buf_len - s->buf_pos; // try to fill the buffer if (len <= 0 && - (!cache_stream_fill_buffer(s) || + (!stream_fill_buffer(s) || (len = s->buf_len - s->buf_pos) <= 0)) break; end = find_newline(s->buffer + s->buf_pos, len, utf16); diff --git a/stream/stream.h b/stream/stream.h index 4ae0e1e6a0..43ed6280bf 100644 --- a/stream/stream.h +++ b/stream/stream.h @@ -53,6 +53,7 @@ #define STREAMTYPE_RADIO 19 #define STREAMTYPE_BLURAY 20 #define STREAMTYPE_AVDEVICE 21 +#define STREAMTYPE_CACHE 22 #define STREAM_BUFFER_SIZE 2048 #define STREAM_MAX_SECTOR_SIZE (8 * 1024) @@ -167,7 +168,8 @@ typedef struct stream { int fd; // file descriptor, see man open(2) int type; // see STREAMTYPE_* - int flags; + int uncached_type; // like (uncached_stream ? uncached_stream->type : type) + int flags; // MP_STREAM_SEEK_* or'ed flags int sector_size; // sector size (seek will be aligned on this size if non 0) int read_chunk; // maximum amount of data to read at once to limit latency (0 for default) unsigned int buf_pos, buf_len; @@ -176,9 +178,6 @@ typedef struct stream { int mode; //STREAM_READ or STREAM_WRITE bool streaming; // known to be a network stream if true int cache_size; // cache size in KB to use if enabled - bool cached; // cache active - unsigned int cache_pid; - void *cache_data; void *priv; // used for DVD, TV, RTSP etc char *url; // strdup() of filename/url char *mime_type; // when HTTP streaming is used @@ -191,6 +190,8 @@ typedef struct stream { FILE *capture_file; char *capture_filename; + + struct stream *uncached_stream; } stream_t; #ifdef CONFIG_NETWORKING @@ -198,36 +199,26 @@ typedef struct stream { #endif int stream_fill_buffer(stream_t *s); -int stream_seek_long(stream_t *s, int64_t pos); void stream_set_capture_file(stream_t *s, const char *filename); void stream_capture_write(stream_t *s); -#ifdef CONFIG_STREAM_CACHE -int stream_enable_cache_percent(stream_t *stream, int64_t stream_cache_size, +int stream_enable_cache_percent(stream_t **stream, int64_t stream_cache_size, float stream_cache_min_percent, float stream_cache_seek_min_percent); -int stream_enable_cache(stream_t *stream, int64_t size, int64_t min, - int64_t prefill); -int cache_stream_fill_buffer(stream_t *s); -int cache_stream_seek_long(stream_t *s, int64_t pos); -#else -// no cache, define wrappers: -#define cache_stream_fill_buffer(x) stream_fill_buffer(x) -#define cache_stream_seek_long(x, y) stream_seek_long(x, y) -#define stream_enable_cache(x, y, z, w) 1 -#define stream_enable_cache_percent(x, y, z, w) 1 -#endif +int stream_enable_cache(stream_t **stream, int64_t size, int64_t min, + int64_t seek_limit); + +// Internal +int stream_cache_init(stream_t *cache, stream_t *stream, int64_t size, + int64_t min, int64_t seek_limit); + int stream_write_buffer(stream_t *s, unsigned char *buf, int len); inline static int stream_read_char(stream_t *s) { return (s->buf_pos < s->buf_len) ? s->buffer[s->buf_pos++] : - (cache_stream_fill_buffer(s) ? s->buffer[s->buf_pos++] : -256); -// if(s->buf_pos<s->buf_len) return s->buffer[s->buf_pos++]; -// stream_fill_buffer(s); -// if(s->buf_pos<s->buf_len) return s->buffer[s->buf_pos++]; -// return 0; // EOF + (stream_fill_buffer(s) ? s->buffer[s->buf_pos++] : -256); } inline static unsigned int stream_read_word(stream_t *s) @@ -333,10 +324,9 @@ void stream_set_interrupt_callback(int (*cb)(struct input_ctx *, int), /// Call the interrupt checking callback if there is one and /// wait for time milliseconds int stream_check_interrupt(int time); -/// Internal read function bypassing the stream buffer -int stream_read_internal(stream_t *s, void *buf, int len); -/// Internal seek function bypassing the stream buffer -int stream_seek_internal(stream_t *s, int64_t newpos); + +int stream_read_unbuffered(stream_t *s, void *buf, int len); +int stream_seek_unbuffered(stream_t *s, int64_t newpos); bool stream_manages_timeline(stream_t *s); diff --git a/stream/stream_radio.c b/stream/stream_radio.c index eea5fdca04..49a3353c2a 100644 --- a/stream/stream_radio.c +++ b/stream/stream_radio.c @@ -923,19 +923,6 @@ static int open_s(stream_t *stream,int mode, void* opts, int* file_format) { return STREAM_ERROR; } -#if defined(CONFIG_RADIO_CAPTURE) && defined(CONFIG_STREAM_CACHE) - if(priv->do_capture){ - //5 second cache - if(!stream_enable_cache(stream,5*priv->audio_in.samplerate*priv->audio_in.channels* - priv->audio_in.bytes_per_sample,2*priv->audio_in.samplerate*priv->audio_in.channels* - priv->audio_in.bytes_per_sample,priv->audio_in.blocksize)) { - mp_tmsg(MSGT_RADIO, MSGL_ERR, "[radio] Call to stream_enable_cache failed: %s\n",strerror(errno)); - close_s(stream); - return STREAM_ERROR; - } - } -#endif - set_volume(priv,priv->radio_param->volume); return STREAM_OK; |