diff options
Diffstat (limited to 'demux/demux.c')
-rw-r--r-- | demux/demux.c | 204 |
1 files changed, 204 insertions, 0 deletions
diff --git a/demux/demux.c b/demux/demux.c index 6c03c71930..19a57a7c32 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -260,6 +260,9 @@ struct demux_internal { bool force_metadata_update; int cached_metadata_index; // speed up repeated lookups + struct mp_recorder *dumper; + int dumper_status; + // -- Access from demuxer thread only bool enable_recording; struct mp_recorder *recorder; @@ -380,6 +383,10 @@ struct demux_stream { bool attached_picture_added; bool need_wakeup; // call wakeup_cb on next reader_head state change + // For demux_internal.dumper. Currently, this is used only temporarily + // during blocking dumping. + struct demux_packet *dump_pos; + // for refresh seeks: pos/dts of last packet returned to reader int64_t last_ret_pos; double last_ret_dts; @@ -430,6 +437,7 @@ static void find_backward_restart_pos(struct demux_stream *ds); static struct demux_packet *find_seek_target(struct demux_queue *queue, double pts, int flags); static void prune_old_packets(struct demux_internal *in); +static void dumper_close(struct demux_internal *in); static uint64_t get_foward_buffered_bytes(struct demux_stream *ds) { @@ -1060,6 +1068,8 @@ static void demux_shutdown(struct demux_internal *in) in->recorder = NULL; } + dumper_close(in); + if (demuxer->desc->close) demuxer->desc->close(in->d_thread); demuxer->priv = NULL; @@ -1892,6 +1902,20 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds, } } +static void write_dump_packet(struct demux_internal *in, struct demux_packet *dp) +{ + assert(in->dumper); + assert(in->dumper_status == CONTROL_TRUE); + + struct mp_recorder_sink *sink = mp_recorder_get_sink(in->dumper, dp->stream); + if (sink) { + mp_recorder_feed_packet(sink, dp); + } else { + MP_ERR(in, "New stream appeared; stopping recording.\n"); + in->dumper_status = CONTROL_ERROR; + } +} + static void record_packet(struct demux_internal *in, struct demux_packet *dp) { // (should preferably be outside of the lock) @@ -1920,6 +1944,9 @@ static void record_packet(struct demux_internal *in, struct demux_packet *dp) in->recorder = NULL; } } + + if (in->dumper_status == CONTROL_OK) + write_dump_packet(in, dp); } static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp) @@ -3970,6 +3997,183 @@ static void update_cache(struct demux_internal *in) pthread_mutex_unlock(&in->lock); } +static void dumper_close(struct demux_internal *in) +{ + if (in->dumper) + mp_recorder_destroy(in->dumper); + in->dumper = NULL; + if (in->dumper_status == CONTROL_TRUE) + in->dumper_status = CONTROL_FALSE; // make abort equal to success +} + +static int range_time_compare(const void *p1, const void *p2) +{ + struct demux_cached_range *r1 = (void *)p1; + struct demux_cached_range *r2 = (void *)p2; + + if (r1->seek_start == r2->seek_start) + return 0; + return r1->seek_start < r2->seek_start ? -1 : 1; +} + +static void dump_cache(struct demux_internal *in, double start, double end) +{ + in->dumper_status = in->dumper ? CONTROL_TRUE : CONTROL_ERROR; + if (!in->dumper) + return; + + // (only in pathological cases there might be more ranges than allowed) + struct demux_cached_range *ranges[MAX_SEEK_RANGES]; + int num_ranges = 0; + for (int n = 0; n < MPMIN(MP_ARRAY_SIZE(ranges), in->num_ranges); n++) + ranges[num_ranges++] = in->ranges[n]; + qsort(ranges, num_ranges, sizeof(ranges[0]), range_time_compare); + + for (int n = 0; n < num_ranges; n++) { + struct demux_cached_range *r = ranges[n]; + if (r->seek_start == MP_NOPTS_VALUE) + continue; + if (r->seek_end <= start) + continue; + if (end != MP_NOPTS_VALUE && r->seek_start >= end) + continue; + + mp_recorder_mark_discontinuity(in->dumper); + + double pts = start; + int flags = 0; + adjust_cache_seek_target(in, r, &pts, &flags); + + for (int i = 0; i < r->num_streams; i++) { + struct demux_queue *q = r->streams[i]; + struct demux_stream *ds = q->ds; + + ds->dump_pos = find_seek_target(q, pts, flags); + } + + // We need to reinterleave the separate streams somehow, which makes + // everything more complex. + while (1) { + struct demux_packet *next = NULL; + double next_dts = MP_NOPTS_VALUE; + + for (int i = 0; i < r->num_streams; i++) { + struct demux_stream *ds = r->streams[i]->ds; + struct demux_packet *dp = ds->dump_pos; + + if (!dp) + continue; + assert(dp->stream == ds->index); + + double pdts = MP_PTS_OR_DEF(dp->dts, dp->pts); + + // Check for stream EOF. Note that we don't try to EOF + // streams at the same point (e.g. video can take longer + // to finish than audio, so the output file will have no + // audio for the last part of the video). Too much effort. + if (pdts != MP_NOPTS_VALUE && end != MP_NOPTS_VALUE && + pdts >= end && dp->keyframe) + { + ds->dump_pos = NULL; + continue; + } + + if (pdts == MP_NOPTS_VALUE || next_dts == MP_NOPTS_VALUE || + pdts < next_dts) + { + next_dts = pdts; + next = dp; + } + } + + if (!next) + break; + + struct demux_stream *ds = in->streams[next->stream]->ds; + ds->dump_pos = next->next; + + struct demux_packet *dp = read_packet_from_cache(in, next); + if (!dp) { + in->dumper_status = CONTROL_ERROR; + break; + } + + write_dump_packet(in, dp); + + talloc_free(dp); + } + + if (in->dumper_status != CONTROL_OK) + break; + } + + // (strictly speaking unnecessary; for clarity) + for (int n = 0; n < in->num_streams; n++) + in->streams[n]->ds->dump_pos = NULL; + + // If dumping (in end==NOPTS mode) doesn't continue at the range that + // was written last, we have a discontinuity. + if (num_ranges && ranges[num_ranges - 1] != in->current_range) + mp_recorder_mark_discontinuity(in->dumper); + + // end=NOPTS means the demuxer output continues to be written to the + // dump file. + if (end != MP_NOPTS_VALUE || in->dumper_status != CONTROL_OK) + dumper_close(in); +} + +// Set the current cache dumping mode. There is only at most 1 dump process +// active, so calling this aborts the previous dumping. Passing file==NULL +// stops dumping. +// This is synchronous with demux_cache_dump_get_status() (i.e. starting or +// aborting is not asynchronous). On status change, the demuxer wakeup callback +// is invoked (except for this call). +// Returns whether dumping was logically started. +bool demux_cache_dump_set(struct demuxer *demuxer, double start, double end, + char *file) +{ + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + bool res = false; + + pthread_mutex_lock(&in->lock); + + start = MP_ADD_PTS(start, -in->ts_offset); + end = MP_ADD_PTS(end, -in->ts_offset); + + dumper_close(in); + + if (file && file[0] && start != MP_NOPTS_VALUE) { + res = true; + + in->dumper = mp_recorder_create(in->d_thread->global, file, + in->streams, in->num_streams); + + // This is not asynchronous and will freeze the shit for a while if the + // user is unlucky. It could be moved to a thread with some effort. + // General idea: iterate over all cache ranges, dump what intersects. + // After that, and if the user requested it, make it dump all newly + // received packets, even if it's awkward (consider the case if the + // current range is not the last range). + dump_cache(in, start, end); + } + + pthread_mutex_unlock(&in->lock); + + return res; +} + +// Returns one of CONTROL_*. CONTROL_TRUE means dumping is in progress. +int demux_cache_dump_get_status(struct demuxer *demuxer) +{ + struct demux_internal *in = demuxer->in; + pthread_mutex_lock(&in->lock); + int status = in->dumper_status; + pthread_mutex_unlock(&in->lock); + return status; +} + // Used by demuxers to report the amount of transferred bytes. This is for // streams which circumvent demuxer->stream (stream statistics are handled by // demux.c itself). |