summaryrefslogtreecommitdiffstats
path: root/demux/demux.c
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2019-06-13 19:10:32 +0200
committerwm4 <wm4@nowhere>2019-09-19 20:37:05 +0200
commit17da9071a4a524139e2ef945ed6cde17dd08c99c (patch)
tree66201f387ae33c5f6dbdb283d62066177b0cedf3 /demux/demux.c
parentef507ad50a0933581f39a0bb86dd85fce9d8f7bc (diff)
downloadmpv-17da9071a4a524139e2ef945ed6cde17dd08c99c.tar.bz2
mpv-17da9071a4a524139e2ef945ed6cde17dd08c99c.tar.xz
demux: add a on-disk cache
Somewhat similar to the old --cache-file, except for the demuxer cache. Instead of keeping packet data in memory, it's written to disk and read back when needed. The idea is to reduce main memory usage, while allowing fast seeking in large cached network streams (especially live streams). Keeping the packet metadata on disk would be rather hard (would use mmap or so, or rewrite the entire demux.c packet queue handling), and since it's relatively small, just keep it in memory. Also for simplicity, the disk cache is append-only. If you're watching really long livestreams, and need pruning, you're probably out of luck. This still could be improved by trying to free unused blocks with fallocate(), but since we're writing multiple streams in an interleaved manner, this is slightly hard. Some rather gross ugliness in packet.h: we want to store the file position of the cached data somewhere, but on 32 bit architectures, we don't have any usable 64 bit members for this, just the buf/len fields, which add up to 64 bit - so the shitty union aliases this memory. Error paths untested. Side data (the complicated part of trying to serialize ffmpeg packets) untested. Stream recording had to be adjusted. Some minor details change due to this, but probably nothing important. The change in attempt_range_joining() is because packets in cache have no valid len field. It was a useful check (heuristically finding broken cases), but not a necessary one. Various other approaches were tried. It would be interesting to list them and to mention the pros and cons, but I don't feel like it.
Diffstat (limited to 'demux/demux.c')
-rw-r--r--demux/demux.c109
1 files changed, 76 insertions, 33 deletions
diff --git a/demux/demux.c b/demux/demux.c
index 5fc81ad41e..b9101a3054 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -29,6 +29,7 @@
#include <sys/types.h>
#include <sys/stat.h>
+#include "cache.h"
#include "config.h"
#include "options/m_config.h"
#include "options/m_option.h"
@@ -80,6 +81,7 @@ static const demuxer_desc_t *const demuxer_list[] = {
struct demux_opts {
int enable_cache;
+ int disk_cache;
int64_t max_bytes;
int64_t max_bytes_bw;
double min_secs;
@@ -103,6 +105,7 @@ const struct m_sub_options demux_conf = {
.opts = (const struct m_option[]){
OPT_CHOICE("cache", enable_cache, 0,
({"no", 0}, {"auto", -1}, {"yes", 1})),
+ OPT_FLAG("cache-on-disk", disk_cache, 0),
OPT_DOUBLE("demuxer-readahead-secs", min_secs, M_OPT_MIN, .min = 0),
// (The MAX_BYTES sizes may not be accurate because the max field is
// of double type.)
@@ -179,6 +182,8 @@ struct demux_internal {
int events;
+ struct demux_cache *cache;
+
bool warned_queue_overflow;
bool last_eof; // last actual global EOF status
bool eof; // whether we're in EOF state (reset for retry)
@@ -1039,6 +1044,10 @@ static void demux_shutdown(struct demux_internal *in)
in->current_range = NULL;
free_empty_cached_ranges(in);
+
+ talloc_free(in->cache);
+ in->cache = NULL;
+
if (in->owns_stream)
free_stream(demuxer->stream);
demuxer->stream = NULL;
@@ -1612,9 +1621,10 @@ static void attempt_range_joining(struct demux_internal *in)
// in case pos/dts are not "correct" across the ranges (we
// never actually check that).
if (dp->dts != end->dts || dp->pos != end->pos ||
- dp->pts != end->pts || dp->len != end->len)
+ dp->pts != end->pts)
{
- MP_WARN(in, "stream %d: weird demuxer behavior\n", n);
+ MP_WARN(in,
+ "stream %d: non-repeatable demuxer behavior\n", n);
goto failed;
}
@@ -1822,6 +1832,36 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds,
}
}
+static void record_packet(struct demux_internal *in, struct demux_packet *dp)
+{
+ // (should preferably be outside of the lock)
+ if (in->enable_recording && !in->recorder &&
+ in->opts->record_file && in->opts->record_file[0])
+ {
+ // Later failures shouldn't make it retry and overwrite the previously
+ // recorded file.
+ in->enable_recording = false;
+
+ in->recorder =
+ mp_recorder_create(in->d_thread->global, in->opts->record_file,
+ in->streams, in->num_streams);
+ if (!in->recorder)
+ MP_ERR(in, "Disabling recording.\n");
+ }
+
+ if (in->recorder) {
+ struct mp_recorder_sink *sink =
+ mp_recorder_get_sink(in->recorder, dp->stream);
+ if (sink) {
+ mp_recorder_feed_packet(sink, dp);
+ } else {
+ MP_ERR(in, "New stream appeared; stopping recording.\n");
+ mp_recorder_destroy(in->recorder);
+ in->recorder = NULL;
+ }
+ }
+}
+
static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
{
struct demux_stream *ds = stream ? stream->ds : NULL;
@@ -1864,6 +1904,17 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
return;
}
+ record_packet(in, dp);
+
+ if (in->cache) {
+ int64_t pos = demux_cache_write(in->cache, dp);
+ if (pos >= 0) {
+ demux_packet_unref_contents(dp);
+ dp->is_cached = true;
+ dp->cached_data.pos = pos;
+ }
+ }
+
queue->correct_pos &= dp->pos >= 0 && dp->pos > queue->last_pos;
queue->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > queue->last_dts;
queue->last_pos = dp->pos;
@@ -1940,33 +1991,6 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
if (!ds->reader_head)
return;
- // (should preferably be outside of the lock)
- if (in->enable_recording && !in->recorder &&
- in->opts->record_file && in->opts->record_file[0])
- {
- // Later failures shouldn't make it retry and overwrite the previously
- // recorded file.
- in->enable_recording = false;
-
- in->recorder =
- mp_recorder_create(in->d_thread->global, in->opts->record_file,
- in->streams, in->num_streams);
- if (!in->recorder)
- MP_ERR(in, "Disabling recording.\n");
- }
-
- if (in->recorder) {
- struct mp_recorder_sink *sink =
- mp_recorder_get_sink(in->recorder, dp->stream);
- if (sink) {
- mp_recorder_feed_packet(sink, dp);
- } else {
- MP_ERR(in, "New stream appeared; stopping recording.\n");
- mp_recorder_destroy(in->recorder);
- in->recorder = NULL;
- }
- }
-
back_demux_see_packets(ds);
wakeup_ds(ds);
@@ -2396,11 +2420,21 @@ static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res)
struct demux_packet *pkt = advance_reader_head(ds);
assert(pkt);
- // The returned packet is mutated etc. and will be owned by the user.
- pkt = demux_copy_packet(pkt);
+ if (pkt->is_cached) {
+ assert(in->cache);
+ struct demux_packet *meta = pkt;
+ pkt = demux_cache_read(in->cache, pkt->cached_data.pos);
+ if (pkt) {
+ demux_packet_copy_attribs(pkt, meta);
+ } else {
+ MP_ERR(in, "Failed to retrieve packet from cache.\n");
+ }
+ } else {
+ // The returned packet is mutated etc. and will be owned by the user.
+ pkt = demux_copy_packet(pkt);
+ }
if (!pkt)
- abort();
- pkt->next = NULL;
+ return 0;
if (in->back_demuxing) {
if (pkt->keyframe) {
@@ -3007,11 +3041,19 @@ static struct demuxer *open_given_type(struct mpv_global *global,
timeline_destroy(tl);
}
}
+
if (!(params && params->is_top_level) || sub) {
in->seekable_cache = false;
in->min_secs = 0;
in->max_bytes = 1;
}
+
+ if (in->seekable_cache && opts->disk_cache) {
+ in->cache = demux_cache_create(global, log);
+ if (!in->cache)
+ MP_ERR(in, "Failed to create file cache.\n");
+ }
+
switch_to_fresh_cache_range(in);
demux_update(demuxer, MP_NOPTS_VALUE);
@@ -3803,6 +3845,7 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *
.low_level_seeks = in->low_level_seeks,
.ts_last = in->demux_ts,
.bytes_per_second = in->bytes_per_second,
+ .file_cache_bytes = in->cache ? demux_cache_get_size(in->cache) : -1,
};
bool any_packets = false;
for (int n = 0; n < in->num_streams; n++) {