diff options
Diffstat (limited to 'stream/cache.c')
-rw-r--r-- | stream/cache.c | 590 |
1 files changed, 590 insertions, 0 deletions
diff --git a/stream/cache.c b/stream/cache.c new file mode 100644 index 0000000000..a3134b7c4a --- /dev/null +++ b/stream/cache.c @@ -0,0 +1,590 @@ +/* + * This file is part of MPlayer. + * + * MPlayer is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MPlayer is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with MPlayer; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +// Time in seconds the main thread waits for the cache thread. On wakeups, the +// code checks for user requested aborts and also prints warnings that the +// cache is being slow. +#define CACHE_WAIT_TIME 0.5 + +// The time the cache sleeps in idle mode. This controls how often the cache +// retries reading from the stream after EOF has reached (in case the stream is +// actually readable again, for example if data has been appended to a file). +// Note that if this timeout is too low, the player will waste too much CPU +// when player is paused. +#define CACHE_IDLE_SLEEP_TIME 1.0 + +// Time in seconds the cache updates "cached" controls. Note that idle mode +// will block the cache from doing this, and this timeout is honored only if +// the cache is active. +#define CACHE_UPDATE_CONTROLS_TIME 0.1 + + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <unistd.h> +#include <errno.h> +#include <assert.h> +#include <pthread.h> + +#include <libavutil/common.h> + +#include "config.h" + +#include "osdep/timer.h" + +#include "core/mp_msg.h" + +#include "stream.h" +#include "core/mp_common.h" + + +// Note: (struct priv*)(cache->priv)->cache == cache +struct priv { + pthread_t cache_thread; + bool cache_thread_running; + pthread_mutex_t mutex; + pthread_cond_t wakeup; + + // Constants (as long as cache thread is running) + unsigned char *buffer; // base pointer of the allocated buffer memory + int64_t buffer_size; // size of the allocated buffer memory + int64_t back_size; // keep back_size amount of old bytes for backward seek + int64_t fill_limit; // we should fill buffer only if space>=fill_limit + int64_t seek_limit; // keep filling cache if distance is less that seek limit + struct byte_meta *bm; // additional per-byte metadata + + // Owned by the main thread + stream_t *cache; // wrapper stream, used by demuxer etc. + + // Owned by the cache thread + stream_t *stream; // "real" stream, used to read from the source media + + // All the following members are shared between the threads. + // You must lock the mutex to access them. + + // Ringbuffer + int64_t min_filepos; // range of file that is cached in the buffer + int64_t max_filepos; // ... max_filepos being the last read position + bool eof; // true if max_filepos = EOF + int64_t offset; // buffer[offset] correponds to max_filepos + + bool idle; // cache thread has stopped reading + + int64_t read_filepos; // client read position (mirrors cache->pos) + int control; // requested STREAM_CTRL_... or CACHE_CTRL_... + void *control_arg; // temporary for executing STREAM_CTRLs + int control_res; + bool control_flush; + + // Cached STREAM_CTRLs + double stream_time_length; + double stream_start_time; + int64_t stream_size; + bool stream_manages_timeline; + int stream_cache_idle; + int stream_cache_fill; +}; + +// Store additional per-byte metadata. Since per-byte would be way too +// inefficient, store it only for every BYTE_META_CHUNK_SIZE byte. +struct byte_meta { + float stream_pts; +}; + +enum { + BYTE_META_CHUNK_SIZE = 16 * 1024, + + CACHE_INTERRUPTED = -1, + + CACHE_CTRL_NONE = 0, + CACHE_CTRL_QUIT = -1, + CACHE_CTRL_PING = -2, +}; + +// pthread_cond_timedwait() with a relative timeout in seconds +static int cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mutex, + double timeout) +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + unsigned long seconds = (int)timeout; + unsigned long nsecs = (timeout - seconds) * 1000000000UL; + if (nsecs + ts.tv_nsec >= 1000000000UL) { + seconds += 1; + nsecs -= 1000000000UL; + } + ts.tv_sec += seconds; + ts.tv_nsec += nsecs; + return pthread_cond_timedwait(cond, mutex, &ts); +} + +// Used by the main thread to wakeup the cache thread, and to wait for the +// cache thread. The cache mutex has to be locked when calling this function. +// *retries should be set to 0 on the first call. +// Returns CACHE_INTERRUPTED if the caller is supposed to abort. +static int cache_wakeup_and_wait(struct priv *s, int *retries) +{ + if (stream_check_interrupt(0)) + return CACHE_INTERRUPTED; + + // Print a "more severe" warning after waiting 1 second and no new data + // (time calculation assumes the number of spurious wakeups is very low) + if ((*retries) * CACHE_WAIT_TIME >= 1.0) { + mp_msg(MSGT_CACHE, MSGL_ERR, "Cache keeps not responding.\n"); + } else if (*retries > 0) { + mp_msg(MSGT_CACHE, MSGL_WARN, + "Cache is not responding - slow/stuck network connection?\n"); + } + (*retries) += 1; + + pthread_cond_signal(&s->wakeup); + cond_timed_wait(&s->wakeup, &s->mutex, CACHE_WAIT_TIME); + + return 0; +} + +// Runs in the cache thread +static void cache_drop_contents(struct priv *s) +{ + s->offset = s->min_filepos = s->max_filepos = s->read_filepos; + s->eof = 0; +} + +// Runs in the main thread +// mutex must be held, but is sometimes temporarily dropped +static int cache_read(struct priv *s, unsigned char *buf, int size) +{ + if (size <= 0) + return 0; + + int retry = 0; + while (s->read_filepos >= s->max_filepos || + s->read_filepos < s->min_filepos) + { + if (s->eof && s->read_filepos >= s->max_filepos) + return 0; + if (cache_wakeup_and_wait(s, &retry) == CACHE_INTERRUPTED) + return 0; + } + + int64_t newb = s->max_filepos - s->read_filepos; // new bytes in the buffer + + int64_t pos = s->read_filepos - s->offset; // file pos to buffer memory pos + if (pos < 0) + pos += s->buffer_size; + else if (pos >= s->buffer_size) + pos -= s->buffer_size; + + if (newb > s->buffer_size - pos) + newb = s->buffer_size - pos; // handle wrap... + + newb = FFMIN(newb, size); + + memcpy(buf, &s->buffer[pos], newb); + + s->read_filepos += newb; + return newb; +} + +// Runs in the cache thread. +// Returns true if reading was attempted, and the mutex was shortly unlocked. +static bool cache_fill(struct priv *s) +{ + int64_t read = s->read_filepos; + int len; + + if (read < s->min_filepos || read > s->max_filepos) { + // seek... + mp_msg(MSGT_CACHE, MSGL_DBG2, + "Out of boundaries... seeking to 0x%" PRIX64 " \n", read); + // drop cache contents only if seeking backward or too much fwd. + // This is also done for on-disk files, since it loses the backseek cache. + // That in turn can cause major bandwidth increase and performance + // issues with e.g. mov or badly interleaved files + if (read < s->min_filepos || read >= s->max_filepos + s->seek_limit) { + mp_msg(MSGT_CACHE, MSGL_V, "Dropping cache at pos %"PRId64", " + "cached range: %"PRId64"-%"PRId64".\n", read, + s->min_filepos, s->max_filepos); + cache_drop_contents(s); + stream_seek(s->stream, read); + } + } + + // number of buffer bytes which should be preserved in backwards direction + int64_t back = av_clip64(read - s->min_filepos, 0, s->back_size); + + // number of buffer bytes that are valid and can be read + int64_t newb = FFMAX(s->max_filepos - read, 0); + + // max. number of bytes that can be written (starting from max_filepos) + int64_t space = s->buffer_size - (newb + back); + + // offset into the buffer that maps to max_filepos + int pos = s->max_filepos - s->offset; + if (pos >= s->buffer_size) + pos -= s->buffer_size; // wrap-around + + if (space < s->fill_limit) { + s->idle = true; + return false; + } + + // limit to end of buffer (without wrapping) + if (pos + space >= s->buffer_size) + space = s->buffer_size - pos; + + // limit read size (or else would block and read the entire buffer in 1 call) + space = FFMIN(space, s->stream->read_chunk); + + // back+newb+space <= buffer_size + int64_t back2 = s->buffer_size - (space + newb); // max back size + if (s->min_filepos < (read - back2)) + s->min_filepos = read - back2; + + // The read call might take a long time and block, so drop the lock. + pthread_mutex_unlock(&s->mutex); + len = stream_read_partial(s->stream, &s->buffer[pos], space); + pthread_mutex_lock(&s->mutex); + + int m1 = pos / BYTE_META_CHUNK_SIZE; + int m2 = (s->buffer_size + pos - 1) % s->buffer_size / BYTE_META_CHUNK_SIZE; + if (m1 != m2) { + double pts; + if (stream_control(s->stream, STREAM_CTRL_GET_CURRENT_TIME, &pts) <= 0) + pts = MP_NOPTS_VALUE; + s->bm[m1] = (struct byte_meta) { + .stream_pts = pts, + }; + } + + s->max_filepos += len; + if (pos + len == s->buffer_size) + s->offset += s->buffer_size; // wrap... + + s->eof = len > 0 ? 0 : 1; + s->idle = s->eof; + + pthread_cond_signal(&s->wakeup); + + return true; +} + +static void update_cached_controls(struct priv *s) +{ + double d; + s->stream_time_length = 0; + if (stream_control(s->stream, STREAM_CTRL_GET_TIME_LENGTH, &d) == STREAM_OK) + s->stream_time_length = d; + s->stream_start_time = MP_NOPTS_VALUE; + if (stream_control(s->stream, STREAM_CTRL_GET_START_TIME, &d) == STREAM_OK) + s->stream_start_time = d; + s->stream_manages_timeline = false; + if (stream_control(s->stream, STREAM_CTRL_MANAGES_TIMELINE, NULL) == STREAM_OK) + s->stream_manages_timeline = true; + stream_update_size(s->stream); + s->stream_size = s->stream->end_pos; +} + +// the core might call these every frame, so cache them... +static int cache_get_cached_control(stream_t *cache, int cmd, void *arg) +{ + struct priv *s = cache->priv; + switch (cmd) { + case STREAM_CTRL_GET_CACHE_SIZE: + *(int64_t *)arg = s->buffer_size; + return STREAM_OK; + case STREAM_CTRL_GET_CACHE_FILL: + *(int64_t *)arg = s->max_filepos - s->read_filepos; + return STREAM_OK; + case STREAM_CTRL_GET_CACHE_IDLE: + *(int *)arg = s->idle; + return STREAM_OK; + case STREAM_CTRL_GET_TIME_LENGTH: + *(double *)arg = s->stream_time_length; + return s->stream_time_length ? STREAM_OK : STREAM_UNSUPPORTED; + case STREAM_CTRL_GET_START_TIME: + *(double *)arg = s->stream_start_time; + return s->stream_start_time != + MP_NOPTS_VALUE ? STREAM_OK : STREAM_UNSUPPORTED; + case STREAM_CTRL_GET_SIZE: + *(int64_t *)arg = s->stream_size; + return STREAM_OK; + case STREAM_CTRL_MANAGES_TIMELINE: + return s->stream_manages_timeline ? STREAM_OK : STREAM_UNSUPPORTED; + case STREAM_CTRL_GET_CURRENT_TIME: { + if (s->read_filepos >= s->min_filepos && + s->read_filepos <= s->max_filepos) + { + int64_t pos = s->read_filepos - s->offset; + if (pos < 0) + pos += s->buffer_size; + else if (pos >= s->buffer_size) + pos -= s->buffer_size; + *(double *)arg = s->bm[pos / BYTE_META_CHUNK_SIZE].stream_pts; + return STREAM_OK; + } + break; + } + } + return STREAM_ERROR; +} + +static bool control_needs_flush(int stream_ctrl) +{ + switch (stream_ctrl) { + case STREAM_CTRL_SEEK_TO_TIME: + case STREAM_CTRL_SEEK_TO_CHAPTER: + case STREAM_CTRL_SET_ANGLE: + return true; + } + return false; +} + +// Runs in the cache thread +static void cache_execute_control(struct priv *s) +{ + uint64_t old_pos = stream_tell(s->stream); + + s->control_res = stream_control(s->stream, s->control, s->control_arg); + s->control_flush = false; + + bool pos_changed = old_pos != stream_tell(s->stream); + bool ok = s->control_res == STREAM_OK; + if (pos_changed && !ok) { + mp_msg(MSGT_STREAM, MSGL_ERR, "STREAM_CTRL changed stream pos but " + "returned error, this is not allowed!\n"); + } else if (pos_changed || (ok && control_needs_flush(s->control))) { + mp_msg(MSGT_CACHE, MSGL_V, "Dropping cache due to control()\n"); + s->read_filepos = stream_tell(s->stream); + s->eof = false; + s->control_flush = true; + cache_drop_contents(s); + } + + s->control = CACHE_CTRL_NONE; + pthread_cond_signal(&s->wakeup); +} + +static void *cache_thread(void *arg) +{ + struct priv *s = arg; + pthread_mutex_lock(&s->mutex); + update_cached_controls(s); + double last = mp_time_sec(); + while (s->control != CACHE_CTRL_QUIT) { + if (mp_time_sec() - last > CACHE_UPDATE_CONTROLS_TIME) { + update_cached_controls(s); + last = mp_time_sec(); + } + if (s->control > 0) { + cache_execute_control(s); + } else { + cache_fill(s); + } + if (s->control == CACHE_CTRL_PING) { + pthread_cond_signal(&s->wakeup); + s->control = CACHE_CTRL_NONE; + } + if (s->idle && s->control == CACHE_CTRL_NONE) + cond_timed_wait(&s->wakeup, &s->mutex, CACHE_IDLE_SLEEP_TIME); + } + pthread_cond_signal(&s->wakeup); + pthread_mutex_unlock(&s->mutex); + mp_msg(MSGT_CACHE, MSGL_V, "Cache exiting...\n"); + return NULL; +} + +static int cache_fill_buffer(struct stream *cache, char *buffer, int max_len) +{ + struct priv *s = cache->priv; + assert(s->cache_thread_running); + + pthread_mutex_lock(&s->mutex); + + if (cache->pos != s->read_filepos) + mp_msg(MSGT_CACHE, MSGL_ERR, + "!!! read_filepos differs !!! report this bug...\n"); + + int t = cache_read(s, buffer, max_len); + // wakeup the cache thread, possibly make it read more data ahead + pthread_cond_signal(&s->wakeup); + pthread_mutex_unlock(&s->mutex); + return t; +} + +static int cache_seek(stream_t *cache, int64_t pos) +{ + struct priv *s = cache->priv; + assert(s->cache_thread_running); + + pthread_mutex_lock(&s->mutex); + + mp_msg(MSGT_CACHE, MSGL_DBG2, "CACHE2_SEEK: 0x%" PRIX64 " <= 0x%" PRIX64 + " (0x%" PRIX64 ") <= 0x%" PRIX64 " \n", + s->min_filepos, pos, s->read_filepos, s->max_filepos); + + cache->pos = s->read_filepos = pos; + s->eof = false; // so that cache_read() will actually wait for new data + pthread_cond_signal(&s->wakeup); + pthread_mutex_unlock(&s->mutex); + + return 1; +} + +static int cache_control(stream_t *cache, int cmd, void *arg) +{ + struct priv *s = cache->priv; + int r = STREAM_ERROR; + + assert(cmd > 0); + + pthread_mutex_lock(&s->mutex); + + r = cache_get_cached_control(cache, cmd, arg); + if (r != STREAM_ERROR) + goto done; + + s->control = cmd; + s->control_arg = arg; + int retry = 0; + while (s->control != CACHE_CTRL_NONE) { + if (cache_wakeup_and_wait(s, &retry) == CACHE_INTERRUPTED) { + s->eof = 1; + r = STREAM_UNSUPPORTED; + goto done; + } + } + r = s->control_res; + if (s->control_flush) { + cache->pos = s->read_filepos; + cache->eof = 0; + cache->buf_pos = cache->buf_len = 0; + } + +done: + pthread_mutex_unlock(&s->mutex); + return r; +} + +static void cache_uninit(stream_t *cache) +{ + struct priv *s = cache->priv; + if (s->cache_thread_running) { + mp_msg(MSGT_CACHE, MSGL_V, "Terminating cache...\n"); + pthread_mutex_lock(&s->mutex); + s->control = CACHE_CTRL_QUIT; + pthread_cond_signal(&s->wakeup); + pthread_mutex_unlock(&s->mutex); + pthread_join(s->cache_thread, NULL); + } + pthread_mutex_destroy(&s->mutex); + pthread_cond_destroy(&s->wakeup); + free(s->buffer); + talloc_free(s); +} + +// return 1 on success, 0 if the function was interrupted and -1 on error, or +// if the cache is disabled +int stream_cache_init(stream_t *cache, stream_t *stream, int64_t size, + int64_t min, int64_t seek_limit) +{ + if (size < 1) + return -1; + + mp_tmsg(MSGT_NETWORK, MSGL_INFO, "Cache size set to %" PRId64 " KiB\n", + size / 1024); + + if (size > SIZE_MAX) { + mp_msg(MSGT_CACHE, MSGL_FATAL, + "Cache size larger than max. allocation size\n"); + return -1; + } + + struct priv *s = talloc_zero(NULL, struct priv); + + //64kb min_size + s->buffer_size = FFMAX(size, 64 * 1024); + s->fill_limit = 16 * 1024; + s->back_size = s->buffer_size / 2; + + s->buffer = malloc(s->buffer_size); + s->bm = malloc((s->buffer_size / BYTE_META_CHUNK_SIZE + 1) * + sizeof(struct byte_meta)); + if (!s->buffer || !s->bm) { + mp_msg(MSGT_CACHE, MSGL_ERR, "Failed to allocate cache buffer.\n"); + free(s->buffer); + free(s->bm); + talloc_free(s); + return -1; + } + + pthread_mutex_init(&s->mutex, NULL); + pthread_cond_init(&s->wakeup, NULL); + + cache->priv = s; + s->cache = cache; + s->stream = stream; + + cache->seek = cache_seek; + cache->fill_buffer = cache_fill_buffer; + cache->control = cache_control; + cache->close = cache_uninit; + + s->seek_limit = seek_limit; + //make sure that we won't wait from cache_fill + //more data than it is allowed to fill + if (s->seek_limit > s->buffer_size - s->fill_limit) + s->seek_limit = s->buffer_size - s->fill_limit; + if (min > s->buffer_size - s->fill_limit) + min = s->buffer_size - s->fill_limit; + + if (pthread_create(&s->cache_thread, NULL, cache_thread, s) != 0) { + mp_msg(MSGT_CACHE, MSGL_ERR, "Starting cache process/thread failed: %s.\n", + strerror(errno)); + return -1; + } + s->cache_thread_running = true; + + // wait until cache is filled at least prefill_init % + for (;;) { + if (stream_check_interrupt(0)) + return 0; + int64_t fill; + int idle; + if (stream_control(s->cache, STREAM_CTRL_GET_CACHE_FILL, &fill) < 0) + break; + if (stream_control(s->cache, STREAM_CTRL_GET_CACHE_IDLE, &idle) < 0) + break; + mp_tmsg(MSGT_CACHE, MSGL_STATUS, "\rCache fill: %5.2f%% " + "(%" PRId64 " bytes) ", 100.0 * fill / s->buffer_size, fill); + if (fill >= min) + break; + if (idle) + break; // file is smaller than prefill size + // Wake up if the cache is done reading some data (or on timeout/abort) + pthread_mutex_lock(&s->mutex); + s->control = CACHE_CTRL_PING; + pthread_cond_signal(&s->wakeup); + cache_wakeup_and_wait(s, &(int){0}); + pthread_mutex_unlock(&s->mutex); + } + mp_msg(MSGT_CACHE, MSGL_STATUS, "\n"); + return 1; +} |