/* * This file is part of MPlayer. * * MPlayer is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * MPlayer is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with MPlayer; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ // Time in seconds the main thread waits for the cache thread. On wakeups, the // code checks for user requested aborts and also prints warnings that the // cache is being slow. #define CACHE_WAIT_TIME 0.5 // The time the cache sleeps in idle mode. This controls how often the cache // retries reading from the stream after EOF has reached (in case the stream is // actually readable again, for example if data has been appended to a file). // Note that if this timeout is too low, the player will waste too much CPU // when player is paused. #define CACHE_IDLE_SLEEP_TIME 1.0 // Time in seconds the cache updates "cached" controls. Note that idle mode // will block the cache from doing this, and this timeout is honored only if // the cache is active. #define CACHE_UPDATE_CONTROLS_TIME 2.0 // Time in seconds the cache prints a new message at all. #define CACHE_NO_SPAM 5.0 #include #include #include #include #include #include #include #include #include #include #include #include "config.h" #include "osdep/timer.h" #include "osdep/threads.h" #include "common/msg.h" #include "stream.h" #include "common/common.h" // Note: (struct priv*)(cache->priv)->cache == cache struct priv { pthread_t cache_thread; bool cache_thread_running; pthread_mutex_t mutex; pthread_cond_t wakeup; // Constants (as long as cache thread is running) // Some of these might actually be changed by a synced cache resize. unsigned char *buffer; // base pointer of the allocated buffer memory int64_t buffer_size; // size of the allocated buffer memory int64_t back_size; // keep back_size amount of old bytes for backward seek int64_t seek_limit; // keep filling cache if distance is less that seek limit struct byte_meta *bm; // additional per-byte metadata bool seekable; // underlying stream is seekable struct mp_log *log; // Owned by the main thread stream_t *cache; // wrapper stream, used by demuxer etc. double last_warn_time; // Owned by the cache thread stream_t *stream; // "real" stream, used to read from the source media // All the following members are shared between the threads. // You must lock the mutex to access them. // Ringbuffer int64_t min_filepos; // range of file that is cached in the buffer int64_t max_filepos; // ... max_filepos being the last read position bool eof; // true if max_filepos = EOF int64_t offset; // buffer[WRAP(s->max_filepos - offset)] corresponds // to the byte at max_filepos (must be wrapped by // buffer_size) bool idle; // cache thread has stopped reading int64_t reads; // number of actual read attempts performed int64_t read_filepos; // client read position (mirrors cache->pos) int control; // requested STREAM_CTRL_... or CACHE_CTRL_... void *control_arg; // temporary for executing STREAM_CTRLs int control_res; bool control_flush; // Cached STREAM_CTRLs double stream_time_length; double stream_start_time; int64_t stream_size; bool stream_manages_timeline; unsigned int stream_num_chapters; int stream_cache_idle; int stream_cache_fill; char **stream_metadata; char *disc_name; }; // Store additional per-byte metadata. Since per-byte would be way too // inefficient, store it only for every BYTE_META_CHUNK_SIZE byte. struct byte_meta { float stream_pts; }; enum { BYTE_META_CHUNK_SIZE = 8 * 1024, CACHE_INTERRUPTED = -1, CACHE_CTRL_NONE = 0, CACHE_CTRL_QUIT = -1, CACHE_CTRL_PING = -2, // we should fill buffer only if space>=FILL_LIMIT FILL_LIMIT = FFMAX(16 * 1024, BYTE_META_CHUNK_SIZE * 2), }; static int64_t mp_clipi64(int64_t val, int64_t min, int64_t max) { val = FFMIN(val, max); val = FFMAX(val, min); return val; } // Used by the main thread to wakeup the cache thread, and to wait for the // cache thread. The cache mutex has to be locked when calling this function. // *retry_time should be set to 0 on the first call. // Returns CACHE_INTERRUPTED if the caller is supposed to abort. static int cache_wakeup_and_wait(struct priv *s, double *retry_time) { if (stream_check_interrupt(0)) return CACHE_INTERRUPTED; double start = mp_time_sec(); if (!s->last_warn_time || start - s->last_warn_time >= CACHE_NO_SPAM) { // Print a "more severe" warning after waiting 1 second and no new data if ((*retry_time) >= 1.0) { MP_ERR(s, "Cache keeps not responding.\n"); s->last_warn_time = start; } else if (*retry_time > 0.1) { MP_WARN(s, "Cache is not responding - slow/stuck network connection?\n"); s->last_warn_time = start; } } pthread_cond_signal(&s->wakeup); mpthread_cond_timed_wait(&s->wakeup, &s->mutex, CACHE_WAIT_TIME); *retry_time += mp_time_sec() - start; return 0; } // Runs in the cache thread static void cache_drop_contents(struct priv *s) { s->offset = s->min_filepos = s->max_filepos = s->read_filepos; s->eof = false; } // Copy at most dst_size from the cache at the given absolute file position pos. // Return number of bytes that could actually be read. // Does not advance the file position, or change anything else. // Can be called from anywhere, as long as the mutex is held. static size_t read_buffer(struct priv *s, unsigned char *dst, size_t dst_size, int64_t pos) { size_t read = 0; while (read < dst_size) { if (pos >= s->max_filepos || pos < s->min_filepos) break; int64_t newb = s->max_filepos - pos; // new bytes in the buffer int64_t bpos = pos - s->offset; // file pos to buffer memory pos if (bpos < 0) { bpos += s->buffer_size; } else if (bpos >= s->buffer_size) { bpos -= s->buffer_size; } if (newb > s->buffer_size - bpos) newb = s->buffer_size - bpos; // handle wrap... newb = MPMIN(newb, dst_size - read); assert(newb >= 0 && read + newb <= dst_size); assert(bpos >= 0 && bpos + newb <= s->buffer_size); memcpy(&dst[read], &s->buffer[bpos], newb); read += newb; pos += newb; } return read; } // Runs in the cache thread. // Returns true if reading was attempted, and the mutex was shortly unlocked. static bool cache_fill(struct priv *s) { int64_t read = s->read_filepos; int len; // drop cache contents only if seeking backward or too much fwd. // This is also done for on-disk files, since it loses the backseek cache. // That in turn can cause major bandwidth increase and performance // issues with e.g. mov or badly interleaved files if (read < s->min_filepos || read > s->max_filepos + s->seek_limit) { MP_VERBOSE(s, "Dropping cache at pos %"PRId64", " "cached range: %"PRId64"-%"PRId64".\n", read, s->min_filepos, s->max_filepos); cache_drop_contents(s); } if (stream_tell(s->stream) != s->max_filepos && s->seekable) { MP_VERBOSE(s, "Seeking underlying stream: %"PRId64" -> %"PRId64"\n", stream_tell(s->stream), s->max_filepos); stream_seek(s->stream, s->max_filepos); } // number of buffer bytes which should be preserved in backwards direction int64_t back = mp_clipi64(read - s->min_filepos, 0, s->back_size); // number of buffer bytes that are valid and can be read int64_t newb = FFMAX(s->max_filepos - read, 0); // max. number of bytes that can be written (starting from max_filepos) int64_t space = s->buffer_size - (newb + back); // offset into the buffer that maps to max_filepos int pos = s->max_filepos - s->offset; if (pos >= s->buffer_size) pos -= s->buffer_size; // wrap-around if (space < FILL_LIMIT) { s->idle = true; s->reads++; // don't stuck main thread return false; } // limit to end of buffer (without wrapping) if (pos + space >= s->buffer_size) space = s->buffer_size - pos; // limit read size (or else would block and read the entire buffer in 1 call) space = FFMIN(space, s->stream->read_chunk); // back+newb+space <= buffer_size int64_t back2 = s->buffer_size - (space + newb); // max back size if (s->min_filepos < (read - back2)) s->min_filepos = read - back2; // The read call might take a long time and block, so drop the lock. pthread_mutex_unlock(&s->mutex); len = stream_read_partial(s->stream, &s->buffer[pos], space); pthread_mutex_lock(&s->mutex); double pts; if (stream_control(s->stream, STREAM_CTRL_GET_CURRENT_TIME, &pts) <= 0) pts = MP_NOPTS_VALUE; for (int64_t b_pos = pos; b_pos < pos + len + BYTE_META_CHUNK_SIZE; b_pos += BYTE_META_CHUNK_SIZE) { s->bm[b_pos / BYTE_META_CHUNK_SIZE] = (struct byte_meta){.stream_pts = pts}; } s->max_filepos += len; if (pos + len == s->buffer_size) s->offset += s->buffer_size; // wrap... s->eof = len <= 0; s->idle = s->eof; s->reads++; if (s->eof) MP_VERBOSE(s, "EOF reached.\n"); pthread_cond_signal(&s->wakeup); return true; } // This is called both during init and at runtime. static int resize_cache(struct priv *s, int64_t size) { int64_t min_size = FILL_LIMIT * 4; int64_t max_size = ((size_t)-1) / 4; int64_t buffer_size = MPMIN(MPMAX(size, min_size), max_size); unsigned char *buffer = malloc(buffer_size); struct byte_meta *bm = calloc(buffer_size / BYTE_META_CHUNK_SIZE + 2, sizeof(struct byte_meta)); if (!buffer || !bm) { free(buffer); free(bm); return STREAM_ERROR; } if (s->buffer) { // Copy & free the old ringbuffer data. // If the buffer is too small, prefer to copy these regions: // 1. Data starting from read_filepos, until cache end size_t read_1 = read_buffer(s, buffer, buffer_size, s->read_filepos); // 2. then data from before read_filepos until cache start // (this one needs to be copied to the end of the ringbuffer) size_t read_2 = 0; if (s->min_filepos < s->read_filepos) { size_t copy_len = buffer_size - read_1; copy_len = MPMIN(copy_len, s->read_filepos - s->min_filepos); assert(copy_len + read_1 <= buffer_size); read_2 = read_buffer(s, buffer + buffer_size - copy_len, copy_len, s->read_filepos - copy_len); // This shouldn't happen, unless copy_len was computed incorrectly. assert(read_2 == copy_len); } // Set it up such that read_1 is at buffer pos 0, and read_2 wraps // around below it, so that it is located at the end of the buffer. s->min_filepos = s->read_filepos - read_2; s->max_filepos = s->read_filepos + read_1; s->offset = s->max_filepos - read_1; } else { cache_drop_contents(s); } free(s->buffer); free(s->bm); s->buffer_size = buffer_size; s->back_size = buffer_size / 2; s->buffer = buffer; s->bm = bm; s->idle = false; s->eof = false; //make sure that we won't wait from cache_fill //more data than it is allowed to fill if (s->seek_limit > s->buffer_size - FILL_LIMIT) s->seek_limit = s->buffer_size - FILL_LIMIT; return STREAM_OK; } static void update_cached_controls(struct priv *s) { unsigned int ui; double d; char **m; char *t; s->stream_time_length = 0; if (stream_control(s->stream, STREAM_CTRL_GET_TIME_LENGTH, &d) == STREAM_OK) s->stream_time_length = d; s->stream_start_time = MP_NOPTS_VALUE; if (stream_control(s->stream, STREAM_CTRL_GET_START_TIME, &d) == STREAM_OK) s->stream_start_time = d; s->stream_manages_timeline = false; if (stream_control(s->stream, STREAM_CTRL_MANAGES_TIMELINE, NULL) == STREAM_OK) s->stream_manages_timeline = true; s->stream_num_chapters = 0; if (stream_control(s->stream, STREAM_CTRL_GET_NUM_CHAPTERS, &ui) == STREAM_OK) s->stream_num_chapters = ui; if (stream_control(s->stream, STREAM_CTRL_GET_METADATA, &m) == STREAM_OK) { talloc_free(s->stream_metadata); s->stream_metadata = talloc_steal(s, m); } if (stream_control(s->stream, STREAM_CTRL_GET_DISC_NAME, &t) == STREAM_OK) { talloc_free(s->disc_name); s->disc_name = talloc_steal(s, t); } stream_update_size(s->stream); s->stream_size = s->stream->end_pos; } // the core might call these every frame, so cache them... static int cache_get_cached_control(stream_t *cache, int cmd, void *arg) { struct priv *s = cache->priv; switch (cmd) { case STREAM_CTRL_GET_CACHE_SIZE: *(int64_t *)arg = s->buffer_size; return STREAM_OK; case STREAM_CTRL_GET_CACHE_FILL: *(int64_t *)arg = s->max_filepos - s->read_filepos; return STREAM_OK; case STREAM_CTRL_GET_CACHE_IDLE: *(int *)arg = s->idle; return STREAM_OK; case STREAM_CTRL_GET_TIME_LENGTH: *(double *)arg = s->stream_time_length; return s->stream_time_length ? STREAM_OK : STREAM_UNSUPPORTED; case STREAM_CTRL_GET_START_TIME: *(double *)arg = s->stream_start_time; return s->stream_start_time != MP_NOPTS_VALUE ? STREAM_OK : STREAM_UNSUPPORTED; case STREAM_CTRL_GET_SIZE: *(int64_t *)arg = s->stream_size; return STREAM_OK; case STREAM_CTRL_MANAGES_TIMELINE: return s->stream_manages_timeline ? STREAM_OK : STREAM_UNSUPPORTED; case STREAM_CTRL_GET_NUM_CHAPTERS: *(unsigned int *)arg = s->stream_num_chapters; return STREAM_OK; case STREAM_CTRL_GET_CURRENT_TIME: { if (s->read_filepos >= s->min_filepos && s->read_filepos <= s->max_filepos && s->min_filepos < s->max_filepos) { int64_t fpos = FFMIN(s->read_filepos, s->max_filepos - 1); int64_t pos = fpos - s->offset; if (pos < 0) pos += s->buffer_size; else if (pos >= s->buffer_size) pos -= s->buffer_size; double pts = s->bm[pos / BYTE_META_CHUNK_SIZE].stream_pts; *(double *)arg = pts; return pts == MP_NOPTS_VALUE ? STREAM_UNSUPPORTED : STREAM_OK; } return STREAM_UNSUPPORTED; } case STREAM_CTRL_GET_METADATA: { if (s->stream_metadata && s->stream_metadata[0]) { char **m = talloc_new(NULL); int num_m = 0; for (int n = 0; s->stream_metadata[n]; n++) { char *t = talloc_strdup(m, s->stream_metadata[n]); MP_TARRAY_APPEND(NULL, m, num_m, t); } MP_TARRAY_APPEND(NULL, m, num_m, NULL); MP_TARRAY_APPEND(NULL, m, num_m, NULL); *(char ***)arg = m; return STREAM_OK; } return STREAM_UNSUPPORTED; } case STREAM_CTRL_GET_DISC_NAME: { if (!s->disc_name) return STREAM_UNSUPPORTED; *(char **)arg = talloc_strdup(NULL, s->disc_name); return STREAM_OK; } case STREAM_CTRL_RESUME_CACHE: s->idle = s->eof = false; pthread_cond_signal(&s->wakeup); return STREAM_OK; } return STREAM_ERROR; } static bool control_needs_flush(int stream_ctrl) { switch (stream_ctrl) { case STREAM_CTRL_SEEK_TO_TIME: case STREAM_CTRL_SET_ANGLE: case STREAM_CTRL_SET_CURRENT_TITLE: return true; } return false; } // Runs in the cache thread static void cache_execute_control(struct priv *s) { uint64_t old_pos = stream_tell(s->stream); s->control_flush = false; switch (s->control) { case STREAM_CTRL_SET_CACHE_SIZE: s->control_res = resize_cache(s, *(int64_t *)s->control_arg); break; default: s->control_res = stream_control(s->stream, s->control, s->control_arg); } bool pos_changed = old_pos != stream_tell(s->stream); bool ok = s->control_res == STREAM_OK; if (pos_changed && !ok) { MP_ERR(s, "STREAM_CTRL changed stream pos but " "returned error, this is not allowed!\n"); } else if (pos_changed || (ok && control_needs_flush(s->control))) { MP_VERBOSE(s, "Dropping cache due to control()\n"); s->read_filepos = stream_tell(s->stream); s->control_flush = true; cache_drop_contents(s); } update_cached_controls(s); s->control = CACHE_CTRL_NONE; pthread_cond_signal(&s->wakeup); } static void *cache_thread(void *arg) { struct priv *s = arg; pthread_mutex_lock(&s->mutex); update_cached_controls(s); double last = mp_time_sec(); while (s->control != CACHE_CTRL_QUIT) { if (mp_time_sec() - last > CACHE_UPDATE_CONTROLS_TIME) { update_cached_controls(s); last = mp_time_sec(); } if (s->control > 0) { cache_execute_control(s); } else { cache_fill(s); } if (s->control == CACHE_CTRL_PING) { pthread_cond_signal(&s->wakeup); s->control = CACHE_CTRL_NONE; } if (s->idle && s->control == CACHE_CTRL_NONE) mpthread_cond_timed_wait(&s->wakeup, &s->mutex, CACHE_IDLE_SLEEP_TIME); } pthread_cond_signal(&s->wakeup); pthread_mutex_unlock(&s->mutex); MP_VERBOSE(s, "Cache exiting...\n"); return NULL; } static int cache_fill_buffer(struct stream *cache, char *buffer, int max_len) { struct priv *s = cache->priv; assert(s->cache_thread_running); pthread_mutex_lock(&s->mutex); if (cache->pos != s->read_filepos) MP_ERR(s, "!!! read_filepos differs !!! report this bug...\n"); int readb = 0; if (max_len > 0) { double retry_time = 0; int64_t retry = s->reads - 1; // try at least 1 read on EOF while (1) { readb = read_buffer(s, buffer, max_len, s->read_filepos); s->read_filepos += readb; if (readb > 0) break; if (s->eof && s->read_filepos >= s->max_filepos && s->reads >= retry) break; if (cache_wakeup_and_wait(s, &retry_time) == CACHE_INTERRUPTED) break; } } // wakeup the cache thread, possibly make it read more data ahead pthread_cond_signal(&s->wakeup); pthread_mutex_unlock(&s->mutex); return readb; } static int cache_seek(stream_t *cache, int64_t pos) { struct priv *s = cache->priv; assert(s->cache_thread_running); int r = 1; pthread_mutex_lock(&s->mutex); MP_DBG(s, "request seek: %" PRId64 " <= to=%" PRId64 " (cur=%" PRId64 ") <= %" PRId64 " \n", s->min_filepos, pos, s->read_filepos, s->max_filepos); if (!s->seekable && pos > s->max_filepos) { MP_ERR(s, "Attempting to seek past cached data in unseekable stream.\n"); r = 0; } else if (!s->seekable && pos < s->min_filepos) { MP_ERR(s, "Attempting to seek before cached data in unseekable stream.\n"); r = 0; } else { cache->pos = s->read_filepos = pos; s->eof = false; // so that cache_read() will actually wait for new data pthread_cond_signal(&s->wakeup); } pthread_mutex_unlock(&s->mutex); return r; } static int cache_control(stream_t *cache, int cmd, void *arg) { struct priv *s = cache->priv; int r = STREAM_ERROR; assert(cmd > 0); pthread_mutex_lock(&s->mutex); r = cache_get_cached_control(cache, cmd, arg); if (r != STREAM_ERROR) goto done; MP_VERBOSE(s, "[cache] blocking for STREAM_CTRL %d\n", cmd); s->control = cmd; s->control_arg = arg; double retry = 0; while (s->control != CACHE_CTRL_NONE) { if (cache_wakeup_and_wait(s, &retry) == CACHE_INTERRUPTED) { s->eof = 1; r = STREAM_UNSUPPORTED; goto done; } } r = s->control_res; if (s->control_flush) { cache->pos = s->read_filepos; cache->eof = 0; cache->buf_pos = cache->buf_len = 0; } done: pthread_mutex_unlock(&s->mutex); return r; } static void cache_uninit(stream_t *cache) { struct priv *s = cache->priv; if (s->cache_thread_running) { MP_VERBOSE(s, "Terminating cache...\n"); pthread_mutex_lock(&s->mutex); s->control = CACHE_CTRL_QUIT; pthread_cond_signal(&s->wakeup); pthread_mutex_unlock(&s->mutex); pthread_join(s->cache_thread, NULL); } pthread_mutex_destroy(&s->mutex); pthread_cond_destroy(&s->wakeup); free(s->buffer); free(s->bm); talloc_free(s); } // return 1 on success, 0 if the function was interrupted and -1 on error, or // if the cache is disabled int stream_cache_init(stream_t *cache, stream_t *stream, int64_t size, int64_t min, int64_t seek_limit) { if (size < 1) return -1; struct priv *s = talloc_zero(NULL, struct priv); s->log = cache->log; s->seek_limit = seek_limit; if (resize_cache(s, size) != STREAM_OK) { MP_ERR(s, "Failed to allocate cache buffer.\n"); talloc_free(s); return -1; } MP_INFO(cache, "Cache size set to %" PRId64 " KiB\n", s->buffer_size / 1024); pthread_mutex_init(&s->mutex, NULL); pthread_cond_init(&s->wakeup, NULL); cache->priv = s; s->cache = cache; s->stream = stream; cache->seek = cache_seek; cache->fill_buffer = cache_fill_buffer; cache->control = cache_control; cache->close = cache_uninit; if (min > s->buffer_size - FILL_LIMIT) min = s->buffer_size - FILL_LIMIT; s->seekable = (stream->flags & MP_STREAM_SEEK) == MP_STREAM_SEEK && stream->end_pos > 0; if (pthread_create(&s->cache_thread, NULL, cache_thread, s) != 0) { MP_ERR(s, "Starting cache process/thread failed: %s.\n", strerror(errno)); return -1; } s->cache_thread_running = true; // wait until cache is filled at least prefill_init % for (;;) { if (stream_check_interrupt(0)) return 0; int64_t fill; int idle; if (stream_control(s->cache, STREAM_CTRL_GET_CACHE_FILL, &fill) < 0) break; if (stream_control(s->cache, STREAM_CTRL_GET_CACHE_IDLE, &idle) < 0) break; MP_INFO(s, "\rCache fill: %5.2f%% " "(%" PRId64 " bytes) ", 100.0 * fill / s->buffer_size, fill); if (fill >= min) break; if (idle) break; // file is smaller than prefill size // Wake up if the cache is done reading some data (or on timeout/abort) pthread_mutex_lock(&s->mutex); s->control = CACHE_CTRL_PING; pthread_cond_signal(&s->wakeup); cache_wakeup_and_wait(s, &(double){0}); pthread_mutex_unlock(&s->mutex); } MP_INFO(s, "\n"); return 1; }