summaryrefslogtreecommitdiffstats
path: root/demux/demux.c
diff options
context:
space:
mode:
Diffstat (limited to 'demux/demux.c')
-rw-r--r--demux/demux.c204
1 files changed, 204 insertions, 0 deletions
diff --git a/demux/demux.c b/demux/demux.c
index 6c03c71930..19a57a7c32 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -260,6 +260,9 @@ struct demux_internal {
bool force_metadata_update;
int cached_metadata_index; // speed up repeated lookups
+ struct mp_recorder *dumper;
+ int dumper_status;
+
// -- Access from demuxer thread only
bool enable_recording;
struct mp_recorder *recorder;
@@ -380,6 +383,10 @@ struct demux_stream {
bool attached_picture_added;
bool need_wakeup; // call wakeup_cb on next reader_head state change
+ // For demux_internal.dumper. Currently, this is used only temporarily
+ // during blocking dumping.
+ struct demux_packet *dump_pos;
+
// for refresh seeks: pos/dts of last packet returned to reader
int64_t last_ret_pos;
double last_ret_dts;
@@ -430,6 +437,7 @@ static void find_backward_restart_pos(struct demux_stream *ds);
static struct demux_packet *find_seek_target(struct demux_queue *queue,
double pts, int flags);
static void prune_old_packets(struct demux_internal *in);
+static void dumper_close(struct demux_internal *in);
static uint64_t get_foward_buffered_bytes(struct demux_stream *ds)
{
@@ -1060,6 +1068,8 @@ static void demux_shutdown(struct demux_internal *in)
in->recorder = NULL;
}
+ dumper_close(in);
+
if (demuxer->desc->close)
demuxer->desc->close(in->d_thread);
demuxer->priv = NULL;
@@ -1892,6 +1902,20 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds,
}
}
+static void write_dump_packet(struct demux_internal *in, struct demux_packet *dp)
+{
+ assert(in->dumper);
+ assert(in->dumper_status == CONTROL_TRUE);
+
+ struct mp_recorder_sink *sink = mp_recorder_get_sink(in->dumper, dp->stream);
+ if (sink) {
+ mp_recorder_feed_packet(sink, dp);
+ } else {
+ MP_ERR(in, "New stream appeared; stopping recording.\n");
+ in->dumper_status = CONTROL_ERROR;
+ }
+}
+
static void record_packet(struct demux_internal *in, struct demux_packet *dp)
{
// (should preferably be outside of the lock)
@@ -1920,6 +1944,9 @@ static void record_packet(struct demux_internal *in, struct demux_packet *dp)
in->recorder = NULL;
}
}
+
+ if (in->dumper_status == CONTROL_OK)
+ write_dump_packet(in, dp);
}
static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
@@ -3970,6 +3997,183 @@ static void update_cache(struct demux_internal *in)
pthread_mutex_unlock(&in->lock);
}
+static void dumper_close(struct demux_internal *in)
+{
+ if (in->dumper)
+ mp_recorder_destroy(in->dumper);
+ in->dumper = NULL;
+ if (in->dumper_status == CONTROL_TRUE)
+ in->dumper_status = CONTROL_FALSE; // make abort equal to success
+}
+
+static int range_time_compare(const void *p1, const void *p2)
+{
+ struct demux_cached_range *r1 = (void *)p1;
+ struct demux_cached_range *r2 = (void *)p2;
+
+ if (r1->seek_start == r2->seek_start)
+ return 0;
+ return r1->seek_start < r2->seek_start ? -1 : 1;
+}
+
+static void dump_cache(struct demux_internal *in, double start, double end)
+{
+ in->dumper_status = in->dumper ? CONTROL_TRUE : CONTROL_ERROR;
+ if (!in->dumper)
+ return;
+
+ // (only in pathological cases there might be more ranges than allowed)
+ struct demux_cached_range *ranges[MAX_SEEK_RANGES];
+ int num_ranges = 0;
+ for (int n = 0; n < MPMIN(MP_ARRAY_SIZE(ranges), in->num_ranges); n++)
+ ranges[num_ranges++] = in->ranges[n];
+ qsort(ranges, num_ranges, sizeof(ranges[0]), range_time_compare);
+
+ for (int n = 0; n < num_ranges; n++) {
+ struct demux_cached_range *r = ranges[n];
+ if (r->seek_start == MP_NOPTS_VALUE)
+ continue;
+ if (r->seek_end <= start)
+ continue;
+ if (end != MP_NOPTS_VALUE && r->seek_start >= end)
+ continue;
+
+ mp_recorder_mark_discontinuity(in->dumper);
+
+ double pts = start;
+ int flags = 0;
+ adjust_cache_seek_target(in, r, &pts, &flags);
+
+ for (int i = 0; i < r->num_streams; i++) {
+ struct demux_queue *q = r->streams[i];
+ struct demux_stream *ds = q->ds;
+
+ ds->dump_pos = find_seek_target(q, pts, flags);
+ }
+
+ // We need to reinterleave the separate streams somehow, which makes
+ // everything more complex.
+ while (1) {
+ struct demux_packet *next = NULL;
+ double next_dts = MP_NOPTS_VALUE;
+
+ for (int i = 0; i < r->num_streams; i++) {
+ struct demux_stream *ds = r->streams[i]->ds;
+ struct demux_packet *dp = ds->dump_pos;
+
+ if (!dp)
+ continue;
+ assert(dp->stream == ds->index);
+
+ double pdts = MP_PTS_OR_DEF(dp->dts, dp->pts);
+
+ // Check for stream EOF. Note that we don't try to EOF
+ // streams at the same point (e.g. video can take longer
+ // to finish than audio, so the output file will have no
+ // audio for the last part of the video). Too much effort.
+ if (pdts != MP_NOPTS_VALUE && end != MP_NOPTS_VALUE &&
+ pdts >= end && dp->keyframe)
+ {
+ ds->dump_pos = NULL;
+ continue;
+ }
+
+ if (pdts == MP_NOPTS_VALUE || next_dts == MP_NOPTS_VALUE ||
+ pdts < next_dts)
+ {
+ next_dts = pdts;
+ next = dp;
+ }
+ }
+
+ if (!next)
+ break;
+
+ struct demux_stream *ds = in->streams[next->stream]->ds;
+ ds->dump_pos = next->next;
+
+ struct demux_packet *dp = read_packet_from_cache(in, next);
+ if (!dp) {
+ in->dumper_status = CONTROL_ERROR;
+ break;
+ }
+
+ write_dump_packet(in, dp);
+
+ talloc_free(dp);
+ }
+
+ if (in->dumper_status != CONTROL_OK)
+ break;
+ }
+
+ // (strictly speaking unnecessary; for clarity)
+ for (int n = 0; n < in->num_streams; n++)
+ in->streams[n]->ds->dump_pos = NULL;
+
+ // If dumping (in end==NOPTS mode) doesn't continue at the range that
+ // was written last, we have a discontinuity.
+ if (num_ranges && ranges[num_ranges - 1] != in->current_range)
+ mp_recorder_mark_discontinuity(in->dumper);
+
+ // end=NOPTS means the demuxer output continues to be written to the
+ // dump file.
+ if (end != MP_NOPTS_VALUE || in->dumper_status != CONTROL_OK)
+ dumper_close(in);
+}
+
+// Set the current cache dumping mode. There is only at most 1 dump process
+// active, so calling this aborts the previous dumping. Passing file==NULL
+// stops dumping.
+// This is synchronous with demux_cache_dump_get_status() (i.e. starting or
+// aborting is not asynchronous). On status change, the demuxer wakeup callback
+// is invoked (except for this call).
+// Returns whether dumping was logically started.
+bool demux_cache_dump_set(struct demuxer *demuxer, double start, double end,
+ char *file)
+{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ bool res = false;
+
+ pthread_mutex_lock(&in->lock);
+
+ start = MP_ADD_PTS(start, -in->ts_offset);
+ end = MP_ADD_PTS(end, -in->ts_offset);
+
+ dumper_close(in);
+
+ if (file && file[0] && start != MP_NOPTS_VALUE) {
+ res = true;
+
+ in->dumper = mp_recorder_create(in->d_thread->global, file,
+ in->streams, in->num_streams);
+
+ // This is not asynchronous and will freeze the shit for a while if the
+ // user is unlucky. It could be moved to a thread with some effort.
+ // General idea: iterate over all cache ranges, dump what intersects.
+ // After that, and if the user requested it, make it dump all newly
+ // received packets, even if it's awkward (consider the case if the
+ // current range is not the last range).
+ dump_cache(in, start, end);
+ }
+
+ pthread_mutex_unlock(&in->lock);
+
+ return res;
+}
+
+// Returns one of CONTROL_*. CONTROL_TRUE means dumping is in progress.
+int demux_cache_dump_get_status(struct demuxer *demuxer)
+{
+ struct demux_internal *in = demuxer->in;
+ pthread_mutex_lock(&in->lock);
+ int status = in->dumper_status;
+ pthread_mutex_unlock(&in->lock);
+ return status;
+}
+
// Used by demuxers to report the amount of transferred bytes. This is for
// streams which circumvent demuxer->stream (stream statistics are handled by
// demux.c itself).