diff options
Diffstat (limited to 'demux/demux.c')
-rw-r--r-- | demux/demux.c | 109 |
1 files changed, 76 insertions, 33 deletions
diff --git a/demux/demux.c b/demux/demux.c index 5fc81ad41e..b9101a3054 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -29,6 +29,7 @@ #include <sys/types.h> #include <sys/stat.h> +#include "cache.h" #include "config.h" #include "options/m_config.h" #include "options/m_option.h" @@ -80,6 +81,7 @@ static const demuxer_desc_t *const demuxer_list[] = { struct demux_opts { int enable_cache; + int disk_cache; int64_t max_bytes; int64_t max_bytes_bw; double min_secs; @@ -103,6 +105,7 @@ const struct m_sub_options demux_conf = { .opts = (const struct m_option[]){ OPT_CHOICE("cache", enable_cache, 0, ({"no", 0}, {"auto", -1}, {"yes", 1})), + OPT_FLAG("cache-on-disk", disk_cache, 0), OPT_DOUBLE("demuxer-readahead-secs", min_secs, M_OPT_MIN, .min = 0), // (The MAX_BYTES sizes may not be accurate because the max field is // of double type.) @@ -179,6 +182,8 @@ struct demux_internal { int events; + struct demux_cache *cache; + bool warned_queue_overflow; bool last_eof; // last actual global EOF status bool eof; // whether we're in EOF state (reset for retry) @@ -1039,6 +1044,10 @@ static void demux_shutdown(struct demux_internal *in) in->current_range = NULL; free_empty_cached_ranges(in); + + talloc_free(in->cache); + in->cache = NULL; + if (in->owns_stream) free_stream(demuxer->stream); demuxer->stream = NULL; @@ -1612,9 +1621,10 @@ static void attempt_range_joining(struct demux_internal *in) // in case pos/dts are not "correct" across the ranges (we // never actually check that). if (dp->dts != end->dts || dp->pos != end->pos || - dp->pts != end->pts || dp->len != end->len) + dp->pts != end->pts) { - MP_WARN(in, "stream %d: weird demuxer behavior\n", n); + MP_WARN(in, + "stream %d: non-repeatable demuxer behavior\n", n); goto failed; } @@ -1822,6 +1832,36 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds, } } +static void record_packet(struct demux_internal *in, struct demux_packet *dp) +{ + // (should preferably be outside of the lock) + if (in->enable_recording && !in->recorder && + in->opts->record_file && in->opts->record_file[0]) + { + // Later failures shouldn't make it retry and overwrite the previously + // recorded file. + in->enable_recording = false; + + in->recorder = + mp_recorder_create(in->d_thread->global, in->opts->record_file, + in->streams, in->num_streams); + if (!in->recorder) + MP_ERR(in, "Disabling recording.\n"); + } + + if (in->recorder) { + struct mp_recorder_sink *sink = + mp_recorder_get_sink(in->recorder, dp->stream); + if (sink) { + mp_recorder_feed_packet(sink, dp); + } else { + MP_ERR(in, "New stream appeared; stopping recording.\n"); + mp_recorder_destroy(in->recorder); + in->recorder = NULL; + } + } +} + static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp) { struct demux_stream *ds = stream ? stream->ds : NULL; @@ -1864,6 +1904,17 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp) return; } + record_packet(in, dp); + + if (in->cache) { + int64_t pos = demux_cache_write(in->cache, dp); + if (pos >= 0) { + demux_packet_unref_contents(dp); + dp->is_cached = true; + dp->cached_data.pos = pos; + } + } + queue->correct_pos &= dp->pos >= 0 && dp->pos > queue->last_pos; queue->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > queue->last_dts; queue->last_pos = dp->pos; @@ -1940,33 +1991,6 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp) if (!ds->reader_head) return; - // (should preferably be outside of the lock) - if (in->enable_recording && !in->recorder && - in->opts->record_file && in->opts->record_file[0]) - { - // Later failures shouldn't make it retry and overwrite the previously - // recorded file. - in->enable_recording = false; - - in->recorder = - mp_recorder_create(in->d_thread->global, in->opts->record_file, - in->streams, in->num_streams); - if (!in->recorder) - MP_ERR(in, "Disabling recording.\n"); - } - - if (in->recorder) { - struct mp_recorder_sink *sink = - mp_recorder_get_sink(in->recorder, dp->stream); - if (sink) { - mp_recorder_feed_packet(sink, dp); - } else { - MP_ERR(in, "New stream appeared; stopping recording.\n"); - mp_recorder_destroy(in->recorder); - in->recorder = NULL; - } - } - back_demux_see_packets(ds); wakeup_ds(ds); @@ -2396,11 +2420,21 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res) struct demux_packet *pkt = advance_reader_head(ds); assert(pkt); - // The returned packet is mutated etc. and will be owned by the user. - pkt = demux_copy_packet(pkt); + if (pkt->is_cached) { + assert(in->cache); + struct demux_packet *meta = pkt; + pkt = demux_cache_read(in->cache, pkt->cached_data.pos); + if (pkt) { + demux_packet_copy_attribs(pkt, meta); + } else { + MP_ERR(in, "Failed to retrieve packet from cache.\n"); + } + } else { + // The returned packet is mutated etc. and will be owned by the user. + pkt = demux_copy_packet(pkt); + } if (!pkt) - abort(); - pkt->next = NULL; + return 0; if (in->back_demuxing) { if (pkt->keyframe) { @@ -3007,11 +3041,19 @@ static struct demuxer *open_given_type(struct mpv_global *global, timeline_destroy(tl); } } + if (!(params && params->is_top_level) || sub) { in->seekable_cache = false; in->min_secs = 0; in->max_bytes = 1; } + + if (in->seekable_cache && opts->disk_cache) { + in->cache = demux_cache_create(global, log); + if (!in->cache) + MP_ERR(in, "Failed to create file cache.\n"); + } + switch_to_fresh_cache_range(in); demux_update(demuxer, MP_NOPTS_VALUE); @@ -3803,6 +3845,7 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state * .low_level_seeks = in->low_level_seeks, .ts_last = in->demux_ts, .bytes_per_second = in->bytes_per_second, + .file_cache_bytes = in->cache ? demux_cache_get_size(in->cache) : -1, }; bool any_packets = false; for (int n = 0; n < in->num_streams; n++) { |