diff options
Diffstat (limited to 'demux')
-rw-r--r-- | demux/codec_tags.c | 16 | ||||
-rw-r--r-- | demux/demux.c | 310 | ||||
-rw-r--r-- | demux/demux.h | 24 | ||||
-rw-r--r-- | demux/demux_disc.c | 16 | ||||
-rw-r--r-- | demux/demux_lavf.c | 85 | ||||
-rw-r--r-- | demux/demux_mkv_timeline.c | 18 | ||||
-rw-r--r-- | demux/demux_playlist.c | 1 | ||||
-rw-r--r-- | demux/demux_timeline.c | 6 | ||||
-rw-r--r-- | demux/demux_tv.c | 2 | ||||
-rw-r--r-- | demux/timeline.c | 6 |
10 files changed, 371 insertions, 113 deletions
diff --git a/demux/codec_tags.c b/demux/codec_tags.c index 4178d29774..d111c39ae3 100644 --- a/demux/codec_tags.c +++ b/demux/codec_tags.c @@ -50,14 +50,8 @@ static const char *lookup_tag(int type, uint32_t tag) return id == AV_CODEC_ID_NONE ? NULL : mp_codec_from_av_codec_id(id); } -static const unsigned char guid_pcm[16] = - {0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, - 0x80, 0x00, 0x00, 0xaa, 0x00, 0x38, 0x9b, 0x71}; -static const unsigned char guid_float[16] = - {0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, - 0x80, 0x00, 0x00, 0xaa, 0x00, 0x38, 0x9b, 0x71}; -// Corresponds to FF_MEDIASUBTYPE_BASE_GUID (plus 4 bytes of padding). -static const unsigned char guid_ffext[16] = +// Corresponds to WMMEDIASUBTYPE_Base. +static const unsigned char guid_ext_base[16] = {0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x80, 0x00, 0x00, 0xAA, 0x00, 0x38, 0x9B, 0x71}; @@ -74,14 +68,10 @@ static void map_audio_pcm_tag(struct mp_codec_params *c) // WAVEFORMATEXTENSIBLE.SubFormat unsigned char *subformat = c->extradata + 6; - if (memcmp(subformat + 4, guid_ffext + 4, 12) == 0) { + if (memcmp(subformat + 4, guid_ext_base + 4, 12) == 0) { c->codec_tag = AV_RL32(subformat); c->codec = lookup_tag(c->type, c->codec_tag); } - if (memcmp(subformat, guid_pcm, 16) == 0) - c->codec_tag = 0x0; - if (memcmp(subformat, guid_float, 16) == 0) - c->codec_tag = 0x3; // Compressed formats might use this. c->extradata += 22; diff --git a/demux/demux.c b/demux/demux.c index 81d8fa5c2c..45a2ad65b6 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -35,7 +35,10 @@ #include "mpv_talloc.h" #include "common/msg.h" #include "common/global.h" +#include "common/recorder.h" +#include "misc/thread_tools.h" #include "osdep/atomic.h" +#include "osdep/timer.h" #include "osdep/threads.h" #include "stream/stream.h" @@ -86,6 +89,7 @@ const demuxer_desc_t *const demuxer_list[] = { }; struct demux_opts { + int enable_cache; int64_t max_bytes; int64_t max_bytes_bw; double min_secs; @@ -94,6 +98,7 @@ struct demux_opts { int access_references; int seekable_cache; int create_ccs; + char *record_file; }; #define OPT_BASE_STRUCT struct demux_opts @@ -102,6 +107,8 @@ struct demux_opts { const struct m_sub_options demux_conf = { .opts = (const struct m_option[]){ + OPT_CHOICE("cache", enable_cache, 0, + ({"no", 0}, {"auto", -1}, {"yes", 1})), OPT_DOUBLE("demuxer-readahead-secs", min_secs, M_OPT_MIN, .min = 0), // (The MAX_BYTES sizes may not be accurate because the max field is // of double type.) @@ -113,10 +120,12 @@ const struct m_sub_options demux_conf = { OPT_CHOICE("demuxer-seekable-cache", seekable_cache, 0, ({"auto", -1}, {"no", 0}, {"yes", 1})), OPT_FLAG("sub-create-cc-track", create_ccs, 0), + OPT_STRING("stream-record", record_file, 0), {0} }, .size = sizeof(struct demux_opts), .defaults = &(const struct demux_opts){ + .enable_cache = -1, // auto .max_bytes = 150 * 1024 * 1024, .max_bytes_bw = 50 * 1024 * 1024, .min_secs = 1.0, @@ -129,11 +138,15 @@ const struct m_sub_options demux_conf = { struct demux_internal { struct mp_log *log; + struct demux_opts *opts; + // The demuxer runs potentially in another thread, so we keep two demuxer // structs; the real demuxer can access the shadow struct only. struct demuxer *d_thread; // accessed by demuxer impl. (producer) struct demuxer *d_user; // accessed by player (consumer) + bool owns_stream; + // The lock protects the packet queues (struct demux_stream), // and the fields below. pthread_mutex_t lock; @@ -144,6 +157,7 @@ struct demux_internal { bool thread_terminate; bool threading; + bool shutdown_async; void (*wakeup_cb)(void *ctx); void *wakeup_cb_ctx; @@ -212,11 +226,16 @@ struct demux_internal { // Transient state. double duration; // Cached state. - bool force_cache_update; - struct stream_cache_info stream_cache_info; int64_t stream_size; + int64_t last_speed_query; + uint64_t bytes_per_second; + int64_t next_cache_update; // Updated during init only. char *stream_base_filename; + + // -- Access from demuxer thread only + bool enable_recording; + struct mp_recorder *recorder; }; // A continuous range of cached packets for all enabled streams. @@ -513,23 +532,40 @@ static void update_seek_ranges(struct demux_cached_range *range) range->is_bof = true; range->is_eof = true; + double min_start_pts = MP_NOPTS_VALUE; + double max_end_pts = MP_NOPTS_VALUE; + for (int n = 0; n < range->num_streams; n++) { struct demux_queue *queue = range->streams[n]; if (queue->ds->selected && queue->ds->eager) { - range->seek_start = MP_PTS_MAX(range->seek_start, queue->seek_start); - range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end); + if (queue->is_bof) { + min_start_pts = MP_PTS_MIN(min_start_pts, queue->seek_start); + } else { + range->seek_start = + MP_PTS_MAX(range->seek_start, queue->seek_start); + } + + if (queue->is_eof) { + max_end_pts = MP_PTS_MAX(max_end_pts, queue->seek_end); + } else { + range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end); + } range->is_eof &= queue->is_eof; range->is_bof &= queue->is_bof; - if (queue->seek_start >= queue->seek_end) { - range->seek_start = range->seek_end = MP_NOPTS_VALUE; - break; - } + bool empty = queue->is_eof && !queue->head; + if (queue->seek_start >= queue->seek_end && !empty) + goto broken; } } + if (range->is_eof) + range->seek_end = max_end_pts; + if (range->is_bof) + range->seek_start = min_start_pts; + // Sparse stream behavior is not very clearly defined, but usually we don't // want it to restrict the range of other streams, unless // This is incorrect in any of these cases: @@ -557,7 +593,12 @@ static void update_seek_ranges(struct demux_cached_range *range) } if (range->seek_start >= range->seek_end) - range->seek_start = range->seek_end = MP_NOPTS_VALUE; + goto broken; + + return; + +broken: + range->seek_start = range->seek_end = MP_NOPTS_VALUE; } // Remove queue->head from the queue. Does not update in->fw_bytes/in->fw_packs. @@ -925,35 +966,113 @@ int demux_get_num_stream(struct demuxer *demuxer) return r; } -void free_demuxer(demuxer_t *demuxer) +static void demux_shutdown(struct demux_internal *in) { - if (!demuxer) - return; - struct demux_internal *in = demuxer->in; - assert(demuxer == in->d_user); + struct demuxer *demuxer = in->d_user; - demux_stop_thread(demuxer); + if (in->recorder) { + mp_recorder_destroy(in->recorder); + in->recorder = NULL; + } if (demuxer->desc->close) demuxer->desc->close(in->d_thread); + demuxer->priv = NULL; + in->d_thread->priv = NULL; demux_flush(demuxer); assert(in->total_bytes == 0); + if (in->owns_stream) + free_stream(demuxer->stream); + demuxer->stream = NULL; +} + +static void demux_dealloc(struct demux_internal *in) +{ for (int n = 0; n < in->num_streams; n++) talloc_free(in->streams[n]); pthread_mutex_destroy(&in->lock); pthread_cond_destroy(&in->wakeup); - talloc_free(demuxer); + talloc_free(in->d_user); } -void free_demuxer_and_stream(struct demuxer *demuxer) +void demux_free(struct demuxer *demuxer) { if (!demuxer) return; - struct stream *s = demuxer->stream; - free_demuxer(demuxer); - free_stream(s); + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + demux_stop_thread(demuxer); + demux_shutdown(in); + demux_dealloc(in); +} + +// Start closing the demuxer and eventually freeing the demuxer asynchronously. +// You must not access the demuxer once this has been started. Once the demuxer +// is shutdown, the wakeup callback is invoked. Then you need to call +// demux_free_async_finish() to end the operation (it must not be called from +// the wakeup callback). +// This can return NULL. Then the demuxer cannot be free'd asynchronously, and +// you need to call demux_free() instead. +struct demux_free_async_state *demux_free_async(struct demuxer *demuxer) +{ + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + if (!in->threading) + return NULL; + + pthread_mutex_lock(&in->lock); + in->thread_terminate = true; + in->shutdown_async = true; + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); + + return (struct demux_free_async_state *)demuxer->in; // lies +} + +// As long as state is valid, you can call this to request immediate abort. +// Roughly behaves as demux_cancel_and_free(), except you still need to wait +// for the result. +void demux_free_async_force(struct demux_free_async_state *state) +{ + struct demux_internal *in = (struct demux_internal *)state; // reverse lies + + mp_cancel_trigger(in->d_user->cancel); +} + +// Check whether the demuxer is shutdown yet. If not, return false, and you +// need to call this again in the future (preferably after you were notified by +// the wakeup callback). If yes, deallocate all state, and return true (in +// particular, the state ptr becomes invalid, and the wakeup callback will never +// be called again). +bool demux_free_async_finish(struct demux_free_async_state *state) +{ + struct demux_internal *in = (struct demux_internal *)state; // reverse lies + + pthread_mutex_lock(&in->lock); + bool busy = in->shutdown_async; + pthread_mutex_unlock(&in->lock); + + if (busy) + return false; + + demux_stop_thread(in->d_user); + demux_dealloc(in); + return true; +} + +// Like demux_free(), but trigger an abort, which will force the demuxer to +// terminate immediately. If this wasn't opened with demux_open_url(), there is +// some chance this will accidentally abort other things via demuxer->cancel. +void demux_cancel_and_free(struct demuxer *demuxer) +{ + if (!demuxer) + return; + mp_cancel_trigger(demuxer->cancel); + demux_free(demuxer); } // Start the demuxer thread, which reads ahead packets on its own. @@ -1402,6 +1521,33 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) } } + // (should preferable be outside of the lock) + if (in->enable_recording && !in->recorder && + in->opts->record_file && in->opts->record_file[0]) + { + // Later failures shouldn't make it retry and overwrite the previously + // recorded file. + in->enable_recording = false; + + in->recorder = + mp_recorder_create(in->d_thread->global, in->opts->record_file, + in->streams, in->num_streams); + if (!in->recorder) + MP_ERR(in, "Disabling recording.\n"); + } + + if (in->recorder) { + struct mp_recorder_sink *sink = + mp_recorder_get_sink(in->recorder, dp->stream); + if (sink) { + mp_recorder_feed_packet(sink, dp); + } else { + MP_ERR(in, "New stream appeared; stopping recording.\n"); + mp_recorder_destroy(in->recorder); + in->recorder = NULL; + } + } + wakeup_ds(ds); pthread_mutex_unlock(&in->lock); } @@ -1597,9 +1743,6 @@ static void execute_trackswitch(struct demux_internal *in) if (in->d_thread->desc->control) in->d_thread->desc->control(in->d_thread, DEMUXER_CTRL_SWITCHED_TRACKS, 0); - stream_control(in->d_thread->stream, STREAM_CTRL_SET_READAHEAD, - &(int){any_selected}); - pthread_mutex_lock(&in->lock); } @@ -1648,11 +1791,10 @@ static bool thread_work(struct demux_internal *in) if (read_packet(in)) return true; // read_packet unlocked, so recheck conditions } - if (in->force_cache_update) { + if (mp_time_us() >= in->next_cache_update) { pthread_mutex_unlock(&in->lock); update_cache(in); pthread_mutex_lock(&in->lock); - in->force_cache_update = false; return true; } return false; @@ -1663,12 +1805,24 @@ static void *demux_thread(void *pctx) struct demux_internal *in = pctx; mpthread_set_name("demux"); pthread_mutex_lock(&in->lock); + while (!in->thread_terminate) { if (thread_work(in)) continue; pthread_cond_signal(&in->wakeup); - pthread_cond_wait(&in->wakeup, &in->lock); + struct timespec until = mp_time_us_to_timespec(in->next_cache_update); + pthread_cond_timedwait(&in->wakeup, &in->lock, &until); + } + + if (in->shutdown_async) { + pthread_mutex_unlock(&in->lock); + demux_shutdown(in); + pthread_mutex_lock(&in->lock); + in->shutdown_async = false; + if (in->wakeup_cb) + in->wakeup_cb(in->wakeup_cb_ctx); } + pthread_mutex_unlock(&in->lock); return NULL; } @@ -2040,14 +2194,6 @@ static void update_final_metadata(demuxer_t *demuxer) assert(demuxer == demuxer->in->d_user); struct demux_internal *in = demuxer->in; - int num_streams = MPMIN(in->num_streams, demuxer->num_update_stream_tags); - for (int n = 0; n < num_streams; n++) { - struct sh_stream *sh = in->streams[n]; - // (replace them even if unnecessary, simpler and doesn't hurt) - if (sh->ds->tags_reader) - mp_tags_replace(sh->tags, sh->ds->tags_reader->sh); - } - struct mp_packet_tags *tags = in->master_stream ? in->master_stream->tags_reader : NULL; @@ -2172,6 +2318,19 @@ static void fixup_metadata(struct demux_internal *in) } } +// Return whether "heavy" caching on this stream is enabled. By default, this +// corresponds to whether the source stream is considered in the network. The +// only effect should be adjusting display behavior (of cache stats etc.), and +// possibly switching between which set of options influence cache settings. +bool demux_is_network_cached(demuxer_t *demuxer) +{ + struct demux_internal *in = demuxer->in; + bool use_cache = demuxer->stream->streaming; + if (in->opts->enable_cache >= 0) + use_cache = in->opts->enable_cache == 1; + return use_cache; +} + static struct demuxer *open_given_type(struct mpv_global *global, struct mp_log *log, const struct demuxer_desc *desc, @@ -2187,6 +2346,7 @@ static struct demuxer *open_given_type(struct mpv_global *global, *demuxer = (struct demuxer) { .desc = desc, .stream = stream, + .cancel = stream->cancel, .seekable = stream->seekable, .filepos = -1, .global = global, @@ -2197,14 +2357,14 @@ static struct demuxer *open_given_type(struct mpv_global *global, .access_references = opts->access_references, .events = DEMUX_EVENT_ALL, .duration = -1, + .extended_ctrls = stream->extended_ctrls, }; demuxer->seekable = stream->seekable; - if (demuxer->stream->underlying && !demuxer->stream->underlying->seekable) - demuxer->seekable = false; struct demux_internal *in = demuxer->in = talloc_ptrtype(demuxer, in); *in = (struct demux_internal){ .log = demuxer->log, + .opts = opts, .d_thread = talloc(demuxer, struct demuxer), .d_user = demuxer, .min_secs = opts->min_secs, @@ -2214,6 +2374,7 @@ static struct demuxer *open_given_type(struct mpv_global *global, .highest_av_pts = MP_NOPTS_VALUE, .seeking_in_progress = MP_NOPTS_VALUE, .demux_ts = MP_NOPTS_VALUE, + .enable_recording = params && params->stream_record, }; pthread_mutex_init(&in->lock, NULL); pthread_cond_init(&in->wakeup, NULL); @@ -2265,10 +2426,8 @@ static struct demuxer *open_given_type(struct mpv_global *global, fixup_metadata(in); in->events = DEMUX_EVENT_ALL; demux_update(demuxer); - stream_control(demuxer->stream, STREAM_CTRL_SET_READAHEAD, - &(int){params ? params->initial_readahead : false}); int seekable = opts->seekable_cache; - if (demuxer->is_network || stream->caching) { + if (demux_is_network_cached(demuxer)) { in->min_secs = MPMAX(in->min_secs, opts->min_secs_cache); if (seekable < 0) seekable = 1; @@ -2292,7 +2451,7 @@ static struct demuxer *open_given_type(struct mpv_global *global, return demuxer; } - free_demuxer(demuxer); + demux_free(demuxer); return NULL; } @@ -2301,6 +2460,9 @@ static const int d_request[] = {DEMUX_CHECK_REQUEST, -1}; static const int d_force[] = {DEMUX_CHECK_FORCE, -1}; // params can be NULL +// If params->does_not_own_stream==false, this does _not_ free the stream if +// opening fails. But if it succeeds, a later demux_free() call will free the +// stream. struct demuxer *demux_open(struct stream *stream, struct demuxer_params *params, struct mpv_global *global) { @@ -2340,6 +2502,8 @@ struct demuxer *demux_open(struct stream *stream, struct demuxer_params *params, if (demuxer) { talloc_steal(demuxer, log); log = NULL; + demuxer->in->owns_stream = + params ? !params->does_not_own_stream : true; goto done; } } @@ -2353,28 +2517,36 @@ done: // Convenience function: open the stream, enable the cache (according to params // and global opts.), open the demuxer. -// (use free_demuxer_and_stream() to free the underlying stream too) // Also for some reason may close the opened stream if it's not needed. +// demuxer->cancel is not the cancel parameter, but is its own object that will +// be a slave (mp_cancel_set_parent()) to provided cancel object. +// demuxer->cancel is automatically freed. struct demuxer *demux_open_url(const char *url, - struct demuxer_params *params, - struct mp_cancel *cancel, - struct mpv_global *global) + struct demuxer_params *params, + struct mp_cancel *cancel, + struct mpv_global *global) { struct demuxer_params dummy = {0}; if (!params) params = &dummy; + assert(!params->does_not_own_stream); // API user error + struct mp_cancel *priv_cancel = mp_cancel_new(NULL); + if (cancel) + mp_cancel_set_parent(priv_cancel, cancel); struct stream *s = stream_create(url, STREAM_READ | params->stream_flags, - cancel, global); - if (!s) + priv_cancel, global); + if (!s) { + talloc_free(priv_cancel); return NULL; - if (!params->disable_cache) - stream_enable_cache_defaults(&s); + } struct demuxer *d = demux_open(s, params, global); if (d) { + talloc_steal(d->in, priv_cancel); demux_maybe_replace_stream(d); } else { params->demuxer_failed = true; free_stream(s); + talloc_free(priv_cancel); } return d; } @@ -2860,8 +3032,10 @@ static int chapter_compare(const void *p1, const void *p2) static void demuxer_sort_chapters(demuxer_t *demuxer) { - qsort(demuxer->chapters, demuxer->num_chapters, - sizeof(struct demux_chapter), chapter_compare); + if (demuxer->num_chapters) { + qsort(demuxer->chapters, demuxer->num_chapters, + sizeof(struct demux_chapter), chapter_compare); + } } int demuxer_add_chapter(demuxer_t *demuxer, char *name, @@ -2921,15 +3095,16 @@ static void update_cache(struct demux_internal *in) // Don't lock while querying the stream. struct mp_tags *stream_metadata = NULL; - struct stream_cache_info stream_cache_info = {.size = -1}; int64_t stream_size = stream_get_size(stream); stream_control(stream, STREAM_CTRL_GET_METADATA, &stream_metadata); - stream_control(stream, STREAM_CTRL_GET_CACHE_INFO, &stream_cache_info); + + demuxer->total_unbuffered_read_bytes += stream->total_unbuffered_read_bytes; + stream->total_unbuffered_read_bytes = 0; pthread_mutex_lock(&in->lock); + in->stream_size = stream_size; - in->stream_cache_info = stream_cache_info; if (stream_metadata) { for (int n = 0; n < in->num_streams; n++) { struct demux_stream *ds = in->streams[n]->ds; @@ -2938,24 +3113,28 @@ static void update_cache(struct demux_internal *in) } talloc_free(stream_metadata); } + + in->next_cache_update = INT64_MAX; + + int64_t now = mp_time_us(); + int64_t diff = now - in->last_speed_query; + if (diff >= MP_SECOND_US) { + uint64_t bytes = demuxer->total_unbuffered_read_bytes; + demuxer->total_unbuffered_read_bytes = 0; + in->last_speed_query = now; + in->bytes_per_second = bytes / (diff / (double)MP_SECOND_US); + } + // The idea is to update as long as there is "activity". + if (in->bytes_per_second) + in->next_cache_update = now + MP_SECOND_US + 1; + pthread_mutex_unlock(&in->lock); } // must be called locked static int cached_stream_control(struct demux_internal *in, int cmd, void *arg) { - // If the cache is active, wake up the thread to possibly update cache state. - if (in->stream_cache_info.size >= 0) { - in->force_cache_update = true; - pthread_cond_signal(&in->wakeup); - } - switch (cmd) { - case STREAM_CTRL_GET_CACHE_INFO: - if (in->stream_cache_info.size < 0) - return STREAM_UNSUPPORTED; - *(struct stream_cache_info *)arg = in->stream_cache_info; - return STREAM_OK; case STREAM_CTRL_GET_SIZE: if (in->stream_size < 0) return STREAM_UNSUPPORTED; @@ -3005,6 +3184,7 @@ static int cached_demux_control(struct demux_internal *in, int cmd, void *arg) .seeking = in->seeking_in_progress, .low_level_seeks = in->low_level_seeks, .ts_last = in->demux_ts, + .bytes_per_second = in->bytes_per_second, }; bool any_packets = false; for (int n = 0; n < in->num_streams; n++) { @@ -3123,7 +3303,7 @@ int demux_stream_control(demuxer_t *demuxer, int ctrl, void *arg) bool demux_cancel_test(struct demuxer *demuxer) { - return mp_cancel_test(demuxer->stream->cancel); + return mp_cancel_test(demuxer->cancel); } struct demux_chapter *demux_copy_chapter_data(struct demux_chapter *c, int num) diff --git a/demux/demux.h b/demux/demux.h index 0150ce1a1a..1cc740589a 100644 --- a/demux/demux.h +++ b/demux/demux.h @@ -56,6 +56,7 @@ struct demux_ctrl_reader_state { double seeking; // current low level seek target, or NOPTS int low_level_seeks; // number of started low level seeks double ts_last; // approx. timestamp of demuxer position + uint64_t bytes_per_second; // low level statistics // Positions that can be seeked to without incurring the latency of a low // level seek. int num_seek_ranges; @@ -174,12 +175,12 @@ struct demuxer_params { bool *matroska_was_valid; struct timeline *timeline; bool disable_timeline; - bool initial_readahead; bstr init_fragment; bool skip_lavf_probing; + bool does_not_own_stream; // if false, stream is free'd on demux_free() + bool stream_record; // if true, enable stream recording if option is set // -- demux_open_url() only int stream_flags; - bool disable_cache; // result bool demuxer_failed; }; @@ -202,6 +203,7 @@ typedef struct demuxer { bool fully_read; bool is_network; // opened directly from a network stream bool access_references; // allow opening other files/URLs + bool extended_ctrls; // supports some of BD/DVD/DVB/TV controls // Bitmask of DEMUX_EVENT_* int events; @@ -230,8 +232,12 @@ typedef struct demuxer { // internal to demux.c struct demux_internal *in; - struct mp_tags **update_stream_tags; - int num_update_stream_tags; + + // Triggered when ending demuxing forcefully. Usually bound to the stream too. + struct mp_cancel *cancel; + + // Demuxer thread only. + uint64_t total_unbuffered_read_bytes; // Since the demuxer can run in its own thread, and the stream is not // thread-safe, only the demuxer is allowed to access the stream directly. @@ -245,8 +251,13 @@ typedef struct { int aid, vid, sid; //audio, video and subtitle id } demux_program_t; -void free_demuxer(struct demuxer *demuxer); -void free_demuxer_and_stream(struct demuxer *demuxer); +void demux_free(struct demuxer *demuxer); +void demux_cancel_and_free(struct demuxer *demuxer); + +struct demux_free_async_state; +struct demux_free_async_state *demux_free_async(struct demuxer *demuxer); +void demux_free_async_force(struct demux_free_async_state *state); +bool demux_free_async_finish(struct demux_free_async_state *state); void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp); void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp); @@ -307,6 +318,7 @@ void demux_metadata_changed(demuxer_t *demuxer); void demux_update(demuxer_t *demuxer); void demux_disable_cache(demuxer_t *demuxer); +bool demux_is_network_cached(demuxer_t *demuxer); struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d, enum stream_type t, int id); diff --git a/demux/demux_disc.c b/demux/demux_disc.c index 6ab17e69c8..e5c63cea17 100644 --- a/demux/demux_disc.c +++ b/demux/demux_disc.c @@ -285,15 +285,15 @@ static int d_open(demuxer_t *demuxer, enum demux_check check) if (check != DEMUX_CHECK_FORCE) return -1; - struct demuxer_params params = {.force_format = "+lavf"}; + struct demuxer_params params = { + .force_format = "+lavf", + .does_not_own_stream = true, + }; struct stream *cur = demuxer->stream; const char *sname = ""; - while (cur) { - if (cur->info) - sname = cur->info->name; - cur = cur->underlying; // down the caching chain - } + if (cur->info) + sname = cur->info->name; p->is_cdda = strcmp(sname, "cdda") == 0; p->is_dvd = strcmp(sname, "dvd") == 0 || @@ -342,13 +342,15 @@ static int d_open(demuxer_t *demuxer, enum demux_check check) if (stream_control(demuxer->stream, STREAM_CTRL_GET_TIME_LENGTH, &len) >= 1) demuxer->duration = len; + demuxer->extended_ctrls = true; + return 0; } static void d_close(demuxer_t *demuxer) { struct priv *p = demuxer->priv; - free_demuxer(p->slave); + demux_free(p->slave); } static int d_control(demuxer_t *demuxer, int cmd, void *arg) diff --git a/demux/demux_lavf.c b/demux/demux_lavf.c index 4802278af5..8ec856d0c8 100644 --- a/demux/demux_lavf.c +++ b/demux/demux_lavf.c @@ -43,6 +43,7 @@ #include "common/av_common.h" #include "misc/bstr.h" #include "misc/charset_conv.h" +#include "misc/thread_tools.h" #include "stream/stream.h" #include "demux.h" @@ -191,6 +192,11 @@ static const struct format_hack format_hacks[] = { {0} }; +struct nested_stream { + AVIOContext *id; + int64_t last_bytes; +}; + typedef struct lavf_priv { struct stream *stream; bool own_stream; @@ -210,8 +216,29 @@ typedef struct lavf_priv { struct demux_lavf_opts *opts; double mf_fps; + + // Proxying nested streams. + struct nested_stream *nested; + int num_nested; + int (*default_io_open)(struct AVFormatContext *s, AVIOContext **pb, + const char *url, int flags, AVDictionary **options); + void (*default_io_close)(struct AVFormatContext *s, AVIOContext *pb); } lavf_priv_t; +static void update_read_stats(struct demuxer *demuxer) +{ + lavf_priv_t *priv = demuxer->priv; + + for (int n = 0; n < priv->num_nested; n++) { + struct nested_stream *nest = &priv->nested[n]; + + int64_t cur = nest->id->bytes_read; + int64_t new = cur - nest->last_bytes; + nest->last_bytes = cur; + demuxer->total_unbuffered_read_bytes += new; + } +} + // At least mp4 has name="mov,mp4,m4a,3gp,3g2,mj2", so we split the name // on "," in general. static bool matches_avinputformat_name(struct lavf_priv *priv, @@ -792,8 +819,7 @@ static void update_metadata(demuxer_t *demuxer) static int interrupt_cb(void *ctx) { struct demuxer *demuxer = ctx; - lavf_priv_t *priv = demuxer->priv; - return mp_cancel_test(priv->stream->cancel); + return mp_cancel_test(demuxer->cancel); } static int block_io_open(struct AVFormatContext *s, AVIOContext **pb, @@ -804,6 +830,38 @@ static int block_io_open(struct AVFormatContext *s, AVIOContext **pb, return AVERROR(EACCES); } +static int nested_io_open(struct AVFormatContext *s, AVIOContext **pb, + const char *url, int flags, AVDictionary **options) +{ + struct demuxer *demuxer = s->opaque; + lavf_priv_t *priv = demuxer->priv; + + int r = priv->default_io_open(s, pb, url, flags, options); + if (r >= 0) { + struct nested_stream nest = { + .id = *pb, + }; + MP_TARRAY_APPEND(priv, priv->nested, priv->num_nested, nest); + } + return r; +} + +static void nested_io_close(struct AVFormatContext *s, AVIOContext *pb) +{ + struct demuxer *demuxer = s->opaque; + lavf_priv_t *priv = demuxer->priv; + + for (int n = 0; n < priv->num_nested; n++) { + if (priv->nested[n].id == pb) { + MP_TARRAY_REMOVE_AT(priv->nested, priv->num_nested, n); + break; + } + } + + + priv->default_io_close(s, pb); +} + static int demux_open_lavf(demuxer_t *demuxer, enum demux_check check) { AVFormatContext *avfc; @@ -898,8 +956,14 @@ static int demux_open_lavf(demuxer_t *demuxer, enum demux_check check) }; avfc->opaque = demuxer; - if (!demuxer->access_references) + if (demuxer->access_references) { + priv->default_io_open = avfc->io_open; + priv->default_io_close = avfc->io_close; + avfc->io_open = nested_io_open; + avfc->io_close = nested_io_close; + } else { avfc->io_open = block_io_open; + } mp_set_avdict(&dopts, lavfdopts->avopts); @@ -1001,6 +1065,7 @@ static int demux_lavf_fill_buffer(demuxer_t *demux) AVPacket *pkt = &(AVPacket){0}; int r = av_read_frame(priv->avfc, pkt); + update_read_stats(demux); if (r < 0) { av_packet_unref(pkt); if (r == AVERROR(EAGAIN)) @@ -1091,6 +1156,8 @@ static void demux_seek_lavf(demuxer_t *demuxer, double seek_pts, int flags) av_strerror(r, buf, sizeof(buf)); MP_VERBOSE(demuxer, "Seek failed (%s)\n", buf); } + + update_read_stats(demuxer); } static int demux_lavf_control(demuxer_t *demuxer, int cmd, void *arg) @@ -1187,7 +1254,19 @@ static void demux_close_lavf(demuxer_t *demuxer) { lavf_priv_t *priv = demuxer->priv; if (priv) { + // This will be a dangling pointer; but see below. + AVIOContext *leaking = priv->avfc ? priv->avfc->pb : NULL; avformat_close_input(&priv->avfc); + // The ffmpeg garbage breaks its own API yet again: hls.c will call + // io_open on the main playlist, but never calls io_close. This happens + // to work out for us (since we don't really use custom I/O), but it's + // still weird. Compensate. + if (priv->num_nested == 1 && priv->nested[0].id == leaking) + priv->num_nested = 0; + if (priv->num_nested) {< |