summaryrefslogtreecommitdiffstats
path: root/demux/demux.c
diff options
context:
space:
mode:
Diffstat (limited to 'demux/demux.c')
-rw-r--r--demux/demux.c109
1 files changed, 76 insertions, 33 deletions
diff --git a/demux/demux.c b/demux/demux.c
index 5fc81ad41e..b9101a3054 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -29,6 +29,7 @@
#include <sys/types.h>
#include <sys/stat.h>
+#include "cache.h"
#include "config.h"
#include "options/m_config.h"
#include "options/m_option.h"
@@ -80,6 +81,7 @@ static const demuxer_desc_t *const demuxer_list[] = {
struct demux_opts {
int enable_cache;
+ int disk_cache;
int64_t max_bytes;
int64_t max_bytes_bw;
double min_secs;
@@ -103,6 +105,7 @@ const struct m_sub_options demux_conf = {
.opts = (const struct m_option[]){
OPT_CHOICE("cache", enable_cache, 0,
({"no", 0}, {"auto", -1}, {"yes", 1})),
+ OPT_FLAG("cache-on-disk", disk_cache, 0),
OPT_DOUBLE("demuxer-readahead-secs", min_secs, M_OPT_MIN, .min = 0),
// (The MAX_BYTES sizes may not be accurate because the max field is
// of double type.)
@@ -179,6 +182,8 @@ struct demux_internal {
int events;
+ struct demux_cache *cache;
+
bool warned_queue_overflow;
bool last_eof; // last actual global EOF status
bool eof; // whether we're in EOF state (reset for retry)
@@ -1039,6 +1044,10 @@ static void demux_shutdown(struct demux_internal *in)
in->current_range = NULL;
free_empty_cached_ranges(in);
+
+ talloc_free(in->cache);
+ in->cache = NULL;
+
if (in->owns_stream)
free_stream(demuxer->stream);
demuxer->stream = NULL;
@@ -1612,9 +1621,10 @@ static void attempt_range_joining(struct demux_internal *in)
// in case pos/dts are not "correct" across the ranges (we
// never actually check that).
if (dp->dts != end->dts || dp->pos != end->pos ||
- dp->pts != end->pts || dp->len != end->len)
+ dp->pts != end->pts)
{
- MP_WARN(in, "stream %d: weird demuxer behavior\n", n);
+ MP_WARN(in,
+ "stream %d: non-repeatable demuxer behavior\n", n);
goto failed;
}
@@ -1822,6 +1832,36 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds,
}
}
+static void record_packet(struct demux_internal *in, struct demux_packet *dp)
+{
+ // (should preferably be outside of the lock)
+ if (in->enable_recording && !in->recorder &&
+ in->opts->record_file && in->opts->record_file[0])
+ {
+ // Later failures shouldn't make it retry and overwrite the previously
+ // recorded file.
+ in->enable_recording = false;
+
+ in->recorder =
+ mp_recorder_create(in->d_thread->global, in->opts->record_file,
+ in->streams, in->num_streams);
+ if (!in->recorder)
+ MP_ERR(in, "Disabling recording.\n");
+ }
+
+ if (in->recorder) {
+ struct mp_recorder_sink *sink =
+ mp_recorder_get_sink(in->recorder, dp->stream);
+ if (sink) {
+ mp_recorder_feed_packet(sink, dp);
+ } else {
+ MP_ERR(in, "New stream appeared; stopping recording.\n");
+ mp_recorder_destroy(in->recorder);
+ in->recorder = NULL;
+ }
+ }
+}
+
static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
{
struct demux_stream *ds = stream ? stream->ds : NULL;
@@ -1864,6 +1904,17 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
return;
}
+ record_packet(in, dp);
+
+ if (in->cache) {
+ int64_t pos = demux_cache_write(in->cache, dp);
+ if (pos >= 0) {
+ demux_packet_unref_contents(dp);
+ dp->is_cached = true;
+ dp->cached_data.pos = pos;
+ }
+ }
+
queue->correct_pos &= dp->pos >= 0 && dp->pos > queue->last_pos;
queue->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > queue->last_dts;
queue->last_pos = dp->pos;
@@ -1940,33 +1991,6 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
if (!ds->reader_head)
return;
- // (should preferably be outside of the lock)
- if (in->enable_recording && !in->recorder &&
- in->opts->record_file && in->opts->record_file[0])
- {
- // Later failures shouldn't make it retry and overwrite the previously
- // recorded file.
- in->enable_recording = false;
-
- in->recorder =
- mp_recorder_create(in->d_thread->global, in->opts->record_file,
- in->streams, in->num_streams);
- if (!in->recorder)
- MP_ERR(in, "Disabling recording.\n");
- }
-
- if (in->recorder) {
- struct mp_recorder_sink *sink =
- mp_recorder_get_sink(in->recorder, dp->stream);
- if (sink) {
- mp_recorder_feed_packet(sink, dp);
- } else {
- MP_ERR(in, "New stream appeared; stopping recording.\n");
- mp_recorder_destroy(in->recorder);
- in->recorder = NULL;
- }
- }
-
back_demux_see_packets(ds);
wakeup_ds(ds);
@@ -2396,11 +2420,21 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res)
struct demux_packet *pkt = advance_reader_head(ds);
assert(pkt);
- // The returned packet is mutated etc. and will be owned by the user.
- pkt = demux_copy_packet(pkt);
+ if (pkt->is_cached) {
+ assert(in->cache);
+ struct demux_packet *meta = pkt;
+ pkt = demux_cache_read(in->cache, pkt->cached_data.pos);
+ if (pkt) {
+ demux_packet_copy_attribs(pkt, meta);
+ } else {
+ MP_ERR(in, "Failed to retrieve packet from cache.\n");
+ }
+ } else {
+ // The returned packet is mutated etc. and will be owned by the user.
+ pkt = demux_copy_packet(pkt);
+ }
if (!pkt)
- abort();
- pkt->next = NULL;
+ return 0;
if (in->back_demuxing) {
if (pkt->keyframe) {
@@ -3007,11 +3041,19 @@ static struct demuxer *open_given_type(struct mpv_global *global,
timeline_destroy(tl);
}
}
+
if (!(params && params->is_top_level) || sub) {
in->seekable_cache = false;
in->min_secs = 0;
in->max_bytes = 1;
}
+
+ if (in->seekable_cache && opts->disk_cache) {
+ in->cache = demux_cache_create(global, log);
+ if (!in->cache)
+ MP_ERR(in, "Failed to create file cache.\n");
+ }
+
switch_to_fresh_cache_range(in);
demux_update(demuxer, MP_NOPTS_VALUE);
@@ -3803,6 +3845,7 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *
.low_level_seeks = in->low_level_seeks,
.ts_last = in->demux_ts,
.bytes_per_second = in->bytes_per_second,
+ .file_cache_bytes = in->cache ? demux_cache_get_size(in->cache) : -1,
};
bool any_packets = false;
for (int n = 0; n < in->num_streams; n++) {