diff options
Diffstat (limited to 'stream/stream.c')
-rw-r--r-- | stream/stream.c | 292 |
1 files changed, 187 insertions, 105 deletions
diff --git a/stream/stream.c b/stream/stream.c index 9de53aba52..3281e50fe6 100644 --- a/stream/stream.c +++ b/stream/stream.c @@ -52,10 +52,10 @@ #include "core/m_option.h" #include "core/m_struct.h" -#include "cache2.h" +// Includes additional padding in case sizes get rounded up by sector size. +#define TOTAL_BUFFER_SIZE (STREAM_MAX_BUFFER_SIZE + STREAM_MAX_SECTOR_SIZE) /// We keep these 2 for the gui atm, but they will be removed. -int vcd_track = 0; char *cdrom_device = NULL; char *dvd_device = NULL; int dvd_title = 0; @@ -137,7 +137,8 @@ static const stream_info_t *const auto_open_streams[] = { NULL }; -static stream_t *new_stream(int fd, int type); +static stream_t *new_stream(void); +static int stream_seek_unbuffered(stream_t *s, int64_t newpos); static stream_t *open_stream_plugin(const stream_info_t *sinfo, const char *filename, @@ -166,7 +167,7 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo, } } } - s = new_stream(-2, -2); + s = new_stream(); s->opts = options; s->url = strdup(filename); s->flags |= mode; @@ -187,6 +188,9 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo, return NULL; } + if (!s->read_chunk) + s->read_chunk = 4 * (s->sector_size ? s->sector_size : STREAM_BUFFER_SIZE); + if (s->streaming && !s->cache_size) { // Set default cache size to use if user does not specify it. s->cache_size = 320; @@ -194,13 +198,15 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo, if (s->type <= -2) mp_msg(MSGT_OPEN, MSGL_WARN, "Warning streams need a type !!!!\n"); - if (s->flags & MP_STREAM_SEEK && !s->seek) + if (!s->seek) s->flags &= ~MP_STREAM_SEEK; if (s->seek && !(s->flags & MP_STREAM_SEEK)) s->flags |= MP_STREAM_SEEK; s->mode = mode; + s->uncached_type = s->type; + mp_msg(MSGT_OPEN, MSGL_V, "STREAM: [%s] %s\n", sinfo->name, filename); mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Description: %s\n", sinfo->info); mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Author: %s\n", sinfo->author); @@ -283,8 +289,6 @@ stream_t *open_output_stream(const char *filename, struct MPOpts *options) return open_stream_full(filename, STREAM_WRITE, options, NULL); } -//=================== STREAMER ========================= - static int stream_reconnect(stream_t *s) { #define MAX_RECONNECT_RETRIES 5 @@ -294,13 +298,14 @@ static int stream_reconnect(stream_t *s) int64_t pos = s->pos; for (int retry = 0; retry < MAX_RECONNECT_RETRIES; retry++) { mp_msg(MSGT_STREAM, MSGL_WARN, - "Connection lost! Attempting to reconnect...\n"); + "Connection lost! Attempting to reconnect (%d)...\n", retry + 1); - if (retry) - mp_sleep_us(RECONNECT_SLEEP_MS * 1000); + if (stream_check_interrupt(retry ? RECONNECT_SLEEP_MS : 0)) + return 0; s->eof = 1; - stream_reset(s); + s->pos = 0; + s->buf_pos = s->buf_len = 0; // Some streams (internal http.c) don't support STREAM_CTRL_RECONNECT, // but do it when trying to seek. @@ -309,7 +314,7 @@ static int stream_reconnect(stream_t *s) continue; } - if (stream_seek_internal(s, pos) < 0 && s->pos == pos) + if (stream_seek_unbuffered(s, pos) < 0 && s->pos == pos) return 1; } return 0; @@ -335,10 +340,10 @@ void stream_set_capture_file(stream_t *s, const char *filename) } } -void stream_capture_write(stream_t *s) +static void stream_capture_write(stream_t *s, void *buf, size_t len) { - if (s->capture_file) { - if (fwrite(s->buffer, s->buf_len, 1, s->capture_file) < 1) { + if (s->capture_file && len > 0) { + if (fwrite(buf, len, 1, s->capture_file) < 1) { mp_tmsg(MSGT_GLOBAL, MSGL_ERR, "Error writing capture file: %s\n", strerror(errno)); stream_set_capture_file(s, NULL); @@ -346,9 +351,14 @@ void stream_capture_write(stream_t *s) } } -int stream_read_internal(stream_t *s, void *buf, int len) +// Read function bypassing the local stream buffer. This will not write into +// s->buffer, but into buf[0..len] instead. +// Returns < 0 on error, 0 on EOF, and length of bytes read on success. +// Partial reads are possible, even if EOF is not reached. +static int stream_read_unbuffered(stream_t *s, void *buf, int len) { int orig_len = len; + s->buf_pos = s->buf_len = 0; // we will retry even if we already reached EOF previously. switch (s->type) { case STREAMTYPE_STREAM: @@ -380,28 +390,64 @@ int stream_read_internal(stream_t *s, void *buf, int len) goto eof_out; // make sure EOF is set to ensure no endless loops s->eof = 1; - return stream_read_internal(s, buf, orig_len); + return stream_read_unbuffered(s, buf, orig_len); eof_out: s->eof = 1; return 0; } // When reading succeeded we are obviously not at eof. - // This e.g. avoids issues with eof getting stuck when lavf seeks in MPEG-TS s->eof = 0; s->pos += len; + stream_capture_write(s, buf, len); return len; } +// This works like stdio's ungetc(), but for more than one byte. Rewind the +// file position by buffer_size, and make all future reads/buffer fills read +// from the given buffer, until the buffer is exhausted or a seek outside of +// the buffer happens. +// You can unread at most STREAM_MAX_BUFFER_SIZE bytes. +void stream_unread_buffer(stream_t *s, void *buffer, size_t buffer_size) +{ + assert(stream_tell(s) >= buffer_size); // can't unread to before file start + assert(buffer_size <= STREAM_MAX_BUFFER_SIZE); + // Need to include the remaining buffer to ensure no data is lost. + int remainder = s->buf_len - s->buf_pos; + // Successive buffer unreading might trigger this. + assert(buffer_size + remainder <= TOTAL_BUFFER_SIZE); + memmove(&s->buffer[buffer_size], &s->buffer[s->buf_pos], remainder); + memcpy(s->buffer, buffer, buffer_size); + s->buf_pos = 0; + s->buf_len = buffer_size + remainder; +} + int stream_fill_buffer(stream_t *s) { - int len = stream_read_internal(s, s->buffer, STREAM_BUFFER_SIZE); - if (len <= 0) - return 0; + int len = stream_read_unbuffered(s, s->buffer, STREAM_BUFFER_SIZE); s->buf_pos = 0; - s->buf_len = len; -// printf("[%d]",len);fflush(stdout); - stream_capture_write(s); + s->buf_len = len < 0 ? 0 : len; + return s->buf_len; +} + +// Read between 1..buf_size bytes of data, return how much data has been read. +// Return <= 0 on EOF, error, of if buf_size was 0. +int stream_read_partial(stream_t *s, char *buf, int buf_size) +{ + assert(s->buf_pos <= s->buf_len); + assert(buf_size >= 0); + if (s->buf_pos == s->buf_len && buf_size > 0) { + s->buf_pos = s->buf_len = 0; + // Do a direct read, but only if there's no sector alignment requirement + // Also, small reads will be more efficient with buffering & copying + if (!s->sector_size && buf_size >= STREAM_BUFFER_SIZE) + return stream_read_unbuffered(s, buf, buf_size); + if (!stream_fill_buffer(s)) + return 0; + } + int len = FFMIN(buf_size, s->buf_len - s->buf_pos); + memcpy(buf, &s->buffer[s->buf_pos], len); + s->buf_pos += len; return len; } @@ -409,24 +455,13 @@ int stream_read(stream_t *s, char *mem, int total) { int len = total; while (len > 0) { - int x; - x = s->buf_len - s->buf_pos; - if (x == 0) { - if (!cache_stream_fill_buffer(s)) - return total - len; // EOF - x = s->buf_len - s->buf_pos; - } - if (s->buf_pos > s->buf_len) - mp_msg(MSGT_DEMUX, MSGL_WARN, - "stream_read: WARNING! s->buf_pos>s->buf_len\n"); - if (x > len) - x = len; - memcpy(mem, &s->buffer[s->buf_pos], x); - s->buf_pos += x; - mem += x; - len -= x; + int read = stream_read_partial(s, mem, len); + if (read <= 0) + break; // EOF + mem += read; + len -= read; } - return total; + return total - len; } int stream_write_buffer(stream_t *s, unsigned char *buf, int len) @@ -442,17 +477,17 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len) return rd; } -int stream_seek_internal(stream_t *s, int64_t newpos) +// Seek function bypassing the local stream buffer. +static int stream_seek_unbuffered(stream_t *s, int64_t newpos) { if (newpos == 0 || newpos != s->pos) { switch (s->type) { case STREAMTYPE_STREAM: - //s->pos=newpos; // real seek // Some streaming protocol allow to seek backward and forward // A function call that return -1 can tell that the protocol // doesn't support seeking. #ifdef CONFIG_NETWORKING - if (s->seek) { // new stream seek is much cleaner than streaming_ctrl one + if (s->seek) { if (!s->seek(s, newpos)) { mp_tmsg(MSGT_STREAM, MSGL_ERR, "Seek failed\n"); return 0; @@ -486,21 +521,18 @@ int stream_seek_internal(stream_t *s, int64_t newpos) return 0; } } -// putchar('.');fflush(stdout); -//} else { -// putchar('%');fflush(stdout); } + s->eof = 0; // EOF reset when seek succeeds. return -1; } -int stream_seek_long(stream_t *s, int64_t pos) +// Unlike stream_seek, does not try to seek within local buffer. +// Unlike stream_seek_unbuffered(), it still fills the local buffer. +static int stream_seek_long(stream_t *s, int64_t pos) { - int res; - int64_t newpos = 0; - -// if( mp_msg_test(MSGT_STREAM,MSGL_DBG3) ) printf("seek_long to 0x%X\n",(unsigned int)pos); - + int64_t oldpos = s->pos; s->buf_pos = s->buf_len = 0; + s->eof = 0; if (s->mode == STREAM_WRITE) { if (!s->seek || !s->seek(s, pos)) @@ -508,41 +540,38 @@ int stream_seek_long(stream_t *s, int64_t pos) return 1; } + int64_t newpos = pos; if (s->sector_size) newpos = (pos / s->sector_size) * s->sector_size; - else - newpos = pos & (~((int64_t)STREAM_BUFFER_SIZE - 1)); - - if (mp_msg_test(MSGT_STREAM, MSGL_DBG3)) { - mp_msg( - MSGT_STREAM, MSGL_DBG3, - "s->pos=%" PRIX64 " newpos=%" PRIX64 " new_bufpos=%" PRIX64 - " buflen=%X \n", - (int64_t)s->pos, (int64_t)newpos, (int64_t)pos, s->buf_len); - } + + mp_msg(MSGT_STREAM, MSGL_DBG3, "s->pos=%" PRIX64 " newpos=%" PRIX64 + " new_bufpos=%" PRIX64 " buflen=%X \n", + (int64_t)s->pos, (int64_t)newpos, (int64_t)pos, s->buf_len); + pos -= newpos; - res = stream_seek_internal(s, newpos); - if (res >= 0) - return res; + if (stream_seek_unbuffered(s, newpos) >= 0) { + s->pos = oldpos; + return 0; + } while (s->pos < newpos) { if (stream_fill_buffer(s) <= 0) - break; // EOF + break; // EOF } - s->eof = 0; // EOF reset when seek succeeds. while (stream_fill_buffer(s) > 0) { if (pos <= s->buf_len) { s->buf_pos = pos; // byte position in sector + s->eof = 0; return 1; } pos -= s->buf_len; } -// Fill failed, but seek still is a success. - s->pos += pos; + // Fill failed, but seek still is a success (partially). s->buf_pos = 0; s->buf_len = 0; + s->eof = 0; // eof should be set only on read mp_msg(MSGT_STREAM, MSGL_V, "stream_seek: Seek to/past EOF: no buffer preloaded.\n"); @@ -555,62 +584,55 @@ int stream_seek(stream_t *s, int64_t pos) mp_dbg(MSGT_DEMUX, MSGL_DBG3, "seek to 0x%llX\n", (long long)pos); if (pos < 0) { - mp_msg(MSGT_DEMUX, MSGL_ERR, - "Invalid seek to negative position %llx!\n", + mp_msg(MSGT_DEMUX, MSGL_ERR, "Invalid seek to negative position %llx!\n", (long long)pos); pos = 0; } if (pos < s->pos) { - int64_t x = pos - (s->pos - s->buf_len); + int64_t x = pos - (s->pos - (int)s->buf_len); if (x >= 0) { s->buf_pos = x; s->eof = 0; -// putchar('*');fflush(stdout); return 1; } } - return cache_stream_seek_long(s, pos); + return stream_seek_long(s, pos); } int stream_skip(stream_t *s, int64_t len) { - if (len < 0 || - (len > 2 * STREAM_BUFFER_SIZE && (s->flags & MP_STREAM_SEEK_FW))) { - // negative or big skip! - return stream_seek(s, stream_tell(s) + len); + int64_t target = stream_tell(s) + len; + if (len < 0) + return stream_seek(s, target); + if (len > 2 * STREAM_BUFFER_SIZE && (s->flags & MP_STREAM_SEEK_FW)) { + // Seek to 1 byte before target - this is the only way to distinguish + // skip-to-EOF and skip-past-EOF in general. Successful seeking means + // absolutely nothing, so test by doing a real read of the last byte. + int r = stream_seek(s, target - 1); + if (r) { + stream_read_char(s); + return !stream_eof(s) && stream_tell(s) == target; + } + return r; } while (len > 0) { int x = s->buf_len - s->buf_pos; if (x == 0) { - if (!cache_stream_fill_buffer(s)) - return 0; // EOF + if (!stream_fill_buffer(s)) + return 0; // EOF x = s->buf_len - s->buf_pos; } if (x > len) x = len; - //memcpy(mem,&s->buf[s->buf_pos],x); s->buf_pos += x; len -= x; } return 1; } -void stream_reset(stream_t *s) -{ - if (s->eof) { - s->pos = 0; - s->buf_pos = s->buf_len = 0; - s->eof = 0; - } -} - int stream_control(stream_t *s, int cmd, void *arg) { -#ifdef CONFIG_STREAM_CACHE - if (s->cache_pid) - return cache_do_control(s, cmd, arg); -#endif if (!s->control) return STREAM_UNSUPPORTED; return s->control(s, cmd, arg); @@ -625,9 +647,10 @@ void stream_update_size(stream_t *s) } } -static stream_t *new_stream(int fd, int type) +static stream_t *new_stream(void) { - stream_t *s = talloc_zero(NULL, stream_t); + stream_t *s = talloc_size(NULL, sizeof(stream_t) + TOTAL_BUFFER_SIZE); + memset(s, 0, sizeof(stream_t)); #if HAVE_WINSOCK2_H { @@ -637,18 +660,13 @@ static stream_t *new_stream(int fd, int type) } #endif - s->fd = fd; - s->type = type; - stream_reset(s); + s->fd = -2; + s->type = -2; return s; } void free_stream(stream_t *s) { -// printf("\n*** free_stream() called ***\n"); -#ifdef CONFIG_STREAM_CACHE - cache_uninit(s); -#endif stream_set_capture_file(s, NULL); if (s->close) @@ -667,6 +685,8 @@ void free_stream(stream_t *s) WSACleanup(); // there might be a better place for this (-> later) #endif free(s->url); + if (s->uncached_stream) + free_stream(s->uncached_stream); talloc_free(s); } @@ -686,6 +706,68 @@ int stream_check_interrupt(int time) return stream_check_interrupt_cb(stream_check_interrupt_ctx, time); } +int stream_enable_cache_percent(stream_t **stream, int64_t stream_cache_size, + float stream_cache_min_percent, + float stream_cache_seek_min_percent) +{ + + if (stream_cache_size == -1) + stream_cache_size = (*stream)->cache_size; + + stream_cache_size = stream_cache_size * 1024; // input is in KiB + + return stream_enable_cache(stream, stream_cache_size, + stream_cache_size * + (stream_cache_min_percent / 100.0), + stream_cache_size * + (stream_cache_seek_min_percent / 100.0)); +} + +/** + * \return 1 on success, 0 if the function was interrupted and -1 on error, or + * if the cache is disabled + */ +int stream_enable_cache(stream_t **stream, int64_t size, int64_t min, + int64_t seek_limit) +{ + stream_t *orig = *stream; + + if (orig->mode != STREAM_READ) + return 1; + + // Can't handle a loaded buffer. + orig->buf_len = orig->buf_pos = 0; + + stream_t *cache = new_stream(); + cache->type = STREAMTYPE_CACHE; + cache->uncached_type = orig->type; + cache->uncached_stream = orig; + cache->flags |= MP_STREAM_SEEK; + cache->mode = STREAM_READ; + cache->read_chunk = 4 * STREAM_BUFFER_SIZE; + + cache->url = strdup(orig->url); + cache->mime_type = talloc_strdup(cache, orig->mime_type); + cache->lavf_type = orig->lavf_type; + cache->opts = orig->opts; + cache->start_pos = orig->start_pos; + cache->end_pos = orig->end_pos; + + int res = -1; + +#ifdef CONFIG_STREAM_CACHE + res = stream_cache_init(cache, orig, size, min, seek_limit); +#endif + + if (res <= 0) { + cache->uncached_stream = NULL; // don't free original stream + free_stream(cache); + } else { + *stream = cache; + } + return res; +} + /** * Helper function to read 16 bits little-endian and advance pointer */ @@ -794,7 +876,7 @@ unsigned char *stream_read_line(stream_t *s, unsigned char *mem, int max, len = s->buf_len - s->buf_pos; // try to fill the buffer if (len <= 0 && - (!cache_stream_fill_buffer(s) || + (!stream_fill_buffer(s) || (len = s->buf_len - s->buf_pos) <= 0)) break; end = find_newline(s->buffer + s->buf_pos, len, utf16); |