summaryrefslogtreecommitdiffstats
path: root/demux
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2019-07-07 20:38:22 +0200
committerwm4 <wm4@nowhere>2019-09-19 20:37:05 +0200
commit023b5964b047d83772163195c322936968e7d47a (patch)
tree8236e49d39e364bab04439aded260603bb566b31 /demux
parent226e050b83b1c9946c52ae77b78a63e82a3ae628 (diff)
downloadmpv-023b5964b047d83772163195c322936968e7d47a.tar.bz2
mpv-023b5964b047d83772163195c322936968e7d47a.tar.xz
demux, command: add a third stream recording mechanism
That's right, and it's probably not the end of it. I'll just claim that I have no idea how to create a proper user interface for this, so I'm creating multiple partially-orthogonal, of which some may work better in each of its special use cases. Until now, there was --record-file. You get relatively good control about what is muxed, and it can use the cache. But it sucks that it's bound to playback. If you pause while it's set, muxing stops. If you seek while it's set, the output will be sort-of trashed, and that's by design. Then --stream-record was added. This is a bit better (especially for live streams), but you can't really control well when muxing stops or ends. In particular, it can't use the cache (it just dumps whatever the underlying demuxer returns). Today, the idea is that the user should just be able to select a time range to dump to a file, and it should not affected by the user seeking around in the cache. In addition, the stream may still be running, so there's some need to continue dumping, even if it's redundant to --stream-record. One notable thing is that it uses the async command shit. Not sure whether this is a good idea. Maybe not, but whatever. Also, a user can always use the "async" prefix to pretend it doesn't. Much of this was barely tested (especially the reinterleaving crap), let's just hope it mostly works. I'm sure you can tolerate the one or other crash?
Diffstat (limited to 'demux')
-rw-r--r--demux/demux.c204
-rw-r--r--demux/demux.h4
2 files changed, 208 insertions, 0 deletions
diff --git a/demux/demux.c b/demux/demux.c
index 6c03c71930..19a57a7c32 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -260,6 +260,9 @@ struct demux_internal {
bool force_metadata_update;
int cached_metadata_index; // speed up repeated lookups
+ struct mp_recorder *dumper;
+ int dumper_status;
+
// -- Access from demuxer thread only
bool enable_recording;
struct mp_recorder *recorder;
@@ -380,6 +383,10 @@ struct demux_stream {
bool attached_picture_added;
bool need_wakeup; // call wakeup_cb on next reader_head state change
+ // For demux_internal.dumper. Currently, this is used only temporarily
+ // during blocking dumping.
+ struct demux_packet *dump_pos;
+
// for refresh seeks: pos/dts of last packet returned to reader
int64_t last_ret_pos;
double last_ret_dts;
@@ -430,6 +437,7 @@ static void find_backward_restart_pos(struct demux_stream *ds);
static struct demux_packet *find_seek_target(struct demux_queue *queue,
double pts, int flags);
static void prune_old_packets(struct demux_internal *in);
+static void dumper_close(struct demux_internal *in);
static uint64_t get_foward_buffered_bytes(struct demux_stream *ds)
{
@@ -1060,6 +1068,8 @@ static void demux_shutdown(struct demux_internal *in)
in->recorder = NULL;
}
+ dumper_close(in);
+
if (demuxer->desc->close)
demuxer->desc->close(in->d_thread);
demuxer->priv = NULL;
@@ -1892,6 +1902,20 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds,
}
}
+static void write_dump_packet(struct demux_internal *in, struct demux_packet *dp)
+{
+ assert(in->dumper);
+ assert(in->dumper_status == CONTROL_TRUE);
+
+ struct mp_recorder_sink *sink = mp_recorder_get_sink(in->dumper, dp->stream);
+ if (sink) {
+ mp_recorder_feed_packet(sink, dp);
+ } else {
+ MP_ERR(in, "New stream appeared; stopping recording.\n");
+ in->dumper_status = CONTROL_ERROR;
+ }
+}
+
static void record_packet(struct demux_internal *in, struct demux_packet *dp)
{
// (should preferably be outside of the lock)
@@ -1920,6 +1944,9 @@ static void record_packet(struct demux_internal *in, struct demux_packet *dp)
in->recorder = NULL;
}
}
+
+ if (in->dumper_status == CONTROL_OK)
+ write_dump_packet(in, dp);
}
static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
@@ -3970,6 +3997,183 @@ static void update_cache(struct demux_internal *in)
pthread_mutex_unlock(&in->lock);
}
+static void dumper_close(struct demux_internal *in)
+{
+ if (in->dumper)
+ mp_recorder_destroy(in->dumper);
+ in->dumper = NULL;
+ if (in->dumper_status == CONTROL_TRUE)
+ in->dumper_status = CONTROL_FALSE; // make abort equal to success
+}
+
+static int range_time_compare(const void *p1, const void *p2)
+{
+ struct demux_cached_range *r1 = (void *)p1;
+ struct demux_cached_range *r2 = (void *)p2;
+
+ if (r1->seek_start == r2->seek_start)
+ return 0;
+ return r1->seek_start < r2->seek_start ? -1 : 1;
+}
+
+static void dump_cache(struct demux_internal *in, double start, double end)
+{
+ in->dumper_status = in->dumper ? CONTROL_TRUE : CONTROL_ERROR;
+ if (!in->dumper)
+ return;
+
+ // (only in pathological cases there might be more ranges than allowed)
+ struct demux_cached_range *ranges[MAX_SEEK_RANGES];
+ int num_ranges = 0;
+ for (int n = 0; n < MPMIN(MP_ARRAY_SIZE(ranges), in->num_ranges); n++)
+ ranges[num_ranges++] = in->ranges[n];
+ qsort(ranges, num_ranges, sizeof(ranges[0]), range_time_compare);
+
+ for (int n = 0; n < num_ranges; n++) {
+ struct demux_cached_range *r = ranges[n];
+ if (r->seek_start == MP_NOPTS_VALUE)
+ continue;
+ if (r->seek_end <= start)
+ continue;
+ if (end != MP_NOPTS_VALUE && r->seek_start >= end)
+ continue;
+
+ mp_recorder_mark_discontinuity(in->dumper);
+
+ double pts = start;
+ int flags = 0;
+ adjust_cache_seek_target(in, r, &pts, &flags);
+
+ for (int i = 0; i < r->num_streams; i++) {
+ struct demux_queue *q = r->streams[i];
+ struct demux_stream *ds = q->ds;
+
+ ds->dump_pos = find_seek_target(q, pts, flags);
+ }
+
+ // We need to reinterleave the separate streams somehow, which makes
+ // everything more complex.
+ while (1) {
+ struct demux_packet *next = NULL;
+ double next_dts = MP_NOPTS_VALUE;
+
+ for (int i = 0; i < r->num_streams; i++) {
+ struct demux_stream *ds = r->streams[i]->ds;
+ struct demux_packet *dp = ds->dump_pos;
+
+ if (!dp)
+ continue;
+ assert(dp->stream == ds->index);
+
+ double pdts = MP_PTS_OR_DEF(dp->dts, dp->pts);
+
+ // Check for stream EOF. Note that we don't try to EOF
+ // streams at the same point (e.g. video can take longer
+ // to finish than audio, so the output file will have no
+ // audio for the last part of the video). Too much effort.
+ if (pdts != MP_NOPTS_VALUE && end != MP_NOPTS_VALUE &&
+ pdts >= end && dp->keyframe)
+ {
+ ds->dump_pos = NULL;
+ continue;
+ }
+
+ if (pdts == MP_NOPTS_VALUE || next_dts == MP_NOPTS_VALUE ||
+ pdts < next_dts)
+ {
+ next_dts = pdts;
+ next = dp;
+ }
+ }
+
+ if (!next)
+ break;
+
+ struct demux_stream *ds = in->streams[next->stream]->ds;
+ ds->dump_pos = next->next;
+
+ struct demux_packet *dp = read_packet_from_cache(in, next);
+ if (!dp) {
+ in->dumper_status = CONTROL_ERROR;
+ break;
+ }
+
+ write_dump_packet(in, dp);
+
+ talloc_free(dp);
+ }
+
+ if (in->dumper_status != CONTROL_OK)
+ break;
+ }
+
+ // (strictly speaking unnecessary; for clarity)
+ for (int n = 0; n < in->num_streams; n++)
+ in->streams[n]->ds->dump_pos = NULL;
+
+ // If dumping (in end==NOPTS mode) doesn't continue at the range that
+ // was written last, we have a discontinuity.
+ if (num_ranges && ranges[num_ranges - 1] != in->current_range)
+ mp_recorder_mark_discontinuity(in->dumper);
+
+ // end=NOPTS means the demuxer output continues to be written to the
+ // dump file.
+ if (end != MP_NOPTS_VALUE || in->dumper_status != CONTROL_OK)
+ dumper_close(in);
+}
+
+// Set the current cache dumping mode. There is only at most 1 dump process
+// active, so calling this aborts the previous dumping. Passing file==NULL
+// stops dumping.
+// This is synchronous with demux_cache_dump_get_status() (i.e. starting or
+// aborting is not asynchronous). On status change, the demuxer wakeup callback
+// is invoked (except for this call).
+// Returns whether dumping was logically started.
+bool demux_cache_dump_set(struct demuxer *demuxer, double start, double end,
+ char *file)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ bool res = false;
+
+ pthread_mutex_lock(&in->lock);
+
+ start = MP_ADD_PTS(start, -in->ts_offset);
+ end = MP_ADD_PTS(end, -in->ts_offset);
+
+ dumper_close(in);
+
+ if (file && file[0] && start != MP_NOPTS_VALUE) {
+ res = true;
+
+ in->dumper = mp_recorder_create(in->d_thread->global, file,
+ in->streams, in->num_streams);
+
+ // This is not asynchronous and will freeze the shit for a while if the
+ // user is unlucky. It could be moved to a thread with some effort.
+ // General idea: iterate over all cache ranges, dump what intersects.
+ // After that, and if the user requested it, make it dump all newly
+ // received packets, even if it's awkward (consider the case if the
+ // current range is not the last range).
+ dump_cache(in, start, end);
+ }
+
+ pthread_mutex_unlock(&in->lock);
+
+ return res;
+}
+
+// Returns one of CONTROL_*. CONTROL_TRUE means dumping is in progress.
+int demux_cache_dump_get_status(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ pthread_mutex_lock(&in->lock);
+ int status = in->dumper_status;
+ pthread_mutex_unlock(&in->lock);
+ return status;
+}
+
// Used by demuxers to report the amount of transferred bytes. This is for
// streams which circumvent demuxer->stream (stream statistics are handled by
// demux.c itself).
diff --git a/demux/demux.h b/demux/demux.h
index 14d145704a..3716f28b87 100644
--- a/demux/demux.h
+++ b/demux/demux.h
@@ -292,6 +292,10 @@ void demux_close_stream(struct demuxer *demuxer);
void demux_metadata_changed(demuxer_t *demuxer);
void demux_update(demuxer_t *demuxer, double playback_pts);
+bool demux_cache_dump_set(struct demuxer *demuxer, double start, double end,
+ char *file);
+int demux_cache_dump_get_status(struct demuxer *demuxer);
+
bool demux_is_network_cached(demuxer_t *demuxer);
void demux_report_unbuffered_read_bytes(struct demuxer *demuxer, int64_t new);