diff options
Diffstat (limited to 'demux/demux.c')
-rw-r--r-- | demux/demux.c | 308 |
1 files changed, 201 insertions, 107 deletions
diff --git a/demux/demux.c b/demux/demux.c index 22b1a47ab1..afe8d2b51f 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -34,6 +34,7 @@ #include "mpv_talloc.h" #include "common/msg.h" #include "common/global.h" +#include "osdep/atomic.h" #include "osdep/threads.h" #include "stream/stream.h" @@ -125,13 +126,10 @@ struct demux_internal { // The demuxer runs potentially in another thread, so we keep two demuxer // structs; the real demuxer can access the shadow struct only. - // Since demuxer and user threads both don't use locks, a third demuxer - // struct d_buffer is used to copy data between them in a synchronized way. struct demuxer *d_thread; // accessed by demuxer impl. (producer) struct demuxer *d_user; // accessed by player (consumer) - struct demuxer *d_buffer; // protected by lock; used to sync d_user/thread - // The lock protects the packet queues (struct demux_stream), d_buffer, + // The lock protects the packet queues (struct demux_stream), // and the fields below. pthread_mutex_t lock; pthread_cond_t wakeup; @@ -147,6 +145,13 @@ struct demux_internal { struct sh_stream **streams; int num_streams; + // If non-NULL, a _selected_ stream which is used for global (timed) + // metadata. It will be an arbitrary stream that is hopefully not sparse + // (i.e. not a subtitle stream). This is needed because due to variable + // interleaving multiple streams won't agree whether timed metadata is in + // effect yet at the same time position. + struct demux_stream *master_stream; + int events; bool warned_queue_overflow; @@ -199,9 +204,10 @@ struct demux_internal { bool blocked; + // Transient state. + double duration; // Cached state. bool force_cache_update; - struct mp_tags *stream_metadata; struct stream_cache_info stream_cache_info; int64_t stream_size; // Updated during init only. @@ -305,6 +311,19 @@ struct demux_stream { // for closed captions (demuxer_feed_caption) struct sh_stream *cc; bool ignore_eof; // ignore stream in underrun detection + + // timed metadata + struct mp_packet_tags *tags_demux; // demuxer state (last updated metadata) + struct mp_packet_tags *tags_reader; // reader state (last returned packet) + struct mp_packet_tags *tags_init; // global state at start of demuxing +}; + +// "Snapshot" of the tag state. Refcounted to avoid a copy per packet. +struct mp_packet_tags { + mp_atomic_int64 refcount; + struct mp_tags *demux; // demuxer global tags (normal thing) + struct mp_tags *stream; // byte stream tags (ICY crap) + struct mp_tags *sh; // per sh_stream tags (e.g. OGG) }; // Return "a", or if that is NOPTS, return "def". @@ -404,6 +423,56 @@ static void check_queue_consistency(struct demux_internal *in) } #endif +void mp_packet_tags_unref(struct mp_packet_tags *tags) +{ + if (tags) { + if (atomic_fetch_add(&tags->refcount, -1) == 1) { + talloc_free(tags->sh); + talloc_free(tags->demux); + talloc_free(tags->stream); + talloc_free(tags); + } + } +} + +void mp_packet_tags_setref(struct mp_packet_tags **dst, struct mp_packet_tags *src) +{ + if (src) + atomic_fetch_add(&src->refcount, 1); + mp_packet_tags_unref(*dst); + *dst = src; +} + +static struct mp_tags *tags_dup_or_null(struct mp_tags *t) +{ + return t ? mp_tags_dup(NULL, t) : talloc_zero(NULL, struct mp_tags); +} + +// Return a "deep" copy. If tags==NULL, allocate a new one. +static struct mp_packet_tags *mp_packet_tags_copy(struct mp_packet_tags *tags) +{ + struct mp_packet_tags *new = talloc_ptrtype(NULL, new); + *new = (struct mp_packet_tags){ + .refcount = ATOMIC_VAR_INIT(1), + .demux = tags_dup_or_null(tags ? tags->demux : NULL), + .stream = tags_dup_or_null(tags ? tags->stream : NULL), + .sh = tags_dup_or_null(tags ? tags->sh : NULL), + }; + return new; +} + +// Force a copy if refcount != 1. +// (refcount==1 means we're the unambiguous owner.) +// If *tags==NULL, allocate a blank one. +static void mp_packet_tags_make_writable(struct mp_packet_tags **tags) +{ + if (*tags && atomic_load(&(*tags)->refcount) == 1) + return; + struct mp_packet_tags *new = mp_packet_tags_copy(*tags); + mp_packet_tags_unref(*tags); + *tags = new; +} + static void recompute_buffers(struct demux_stream *ds) { ds->fw_packs = 0; @@ -629,16 +698,25 @@ static void update_stream_selection_state(struct demux_internal *in, bool any_av_streams = false; bool any_streams = false; + struct demux_stream *master = NULL; for (int n = 0; n < in->num_streams; n++) { struct demux_stream *s = in->streams[n]->ds; s->eager = s->selected && !s->sh->attached_picture; - if (s->eager) + if (s->eager) { any_av_streams |= s->type != STREAM_SUB; + if (!master || + (master->type == STREAM_VIDEO && s->type == STREAM_AUDIO)) + { + master = s; + } + } any_streams |= s->selected; } + in->master_stream = master; + // Subtitles are only eagerly read if there are no other eagerly read // streams. if (any_av_streams) { @@ -714,6 +792,14 @@ struct sh_stream *demux_alloc_sh_stream(enum stream_type type) return sh; } +static void ds_destroy(void *ptr) +{ + struct demux_stream *ds = ptr; + mp_packet_tags_unref(ds->tags_init); + mp_packet_tags_unref(ds->tags_reader); + mp_packet_tags_unref(ds->tags_demux); +} + // Add a new sh_stream to the demuxer. Note that as soon as the stream has been // added, it must be immutable, and must not be released (this will happen when // the demuxer is destroyed). @@ -734,6 +820,7 @@ static void demux_add_sh_stream_locked(struct demux_internal *in, .global_correct_dts = true, .global_correct_pos = true, }; + talloc_set_destructor(sh->ds, ds_destroy); if (!sh->codec->codec) sh->codec->codec = ""; @@ -758,6 +845,11 @@ static void demux_add_sh_stream_locked(struct demux_internal *in, update_stream_selection_state(in, sh->ds); + mp_packet_tags_make_writable(&sh->ds->tags_init); + mp_tags_replace(sh->ds->tags_init->demux, in->d_thread->metadata); + mp_tags_replace(sh->ds->tags_init->sh, sh->tags); + mp_packet_tags_setref(&sh->ds->tags_reader, sh->ds->tags_init); + in->events |= DEMUX_EVENT_STREAMS; if (in->wakeup_cb) in->wakeup_cb(in->wakeup_cb_ctx); @@ -767,11 +859,19 @@ static void demux_add_sh_stream_locked(struct demux_internal *in, void demux_add_sh_stream(struct demuxer *demuxer, struct sh_stream *sh) { struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_thread); pthread_mutex_lock(&in->lock); demux_add_sh_stream_locked(in, sh); pthread_mutex_unlock(&in->lock); } +static void ds_modify_demux_tags(struct demux_stream *ds) +{ + if (!ds->tags_demux) + mp_packet_tags_setref(&ds->tags_demux, ds->tags_init); + mp_packet_tags_make_writable(&ds->tags_demux); +} + // Update sh->tags (lazily). This must be called by demuxers which update // stream tags after init. (sh->tags can be accessed by the playback thread, // which means the demuxer thread cannot write or read it directly.) @@ -782,21 +882,16 @@ void demux_set_stream_tags(struct demuxer *demuxer, struct sh_stream *sh, { struct demux_internal *in = demuxer->in; assert(demuxer == in->d_thread); + struct demux_stream *ds = sh->ds; + assert(ds); // stream must have been added - if (sh->ds) { - while (demuxer->num_update_stream_tags <= sh->index) { - MP_TARRAY_APPEND(demuxer, demuxer->update_stream_tags, - demuxer->num_update_stream_tags, NULL); - } - talloc_free(demuxer->update_stream_tags[sh->index]); - demuxer->update_stream_tags[sh->index] = talloc_steal(demuxer, tags); + pthread_mutex_lock(&in->lock); - demux_changed(demuxer, DEMUX_EVENT_METADATA); - } else { - // not added yet - talloc_free(sh->tags); - sh->tags = talloc_steal(sh, tags); - } + ds_modify_demux_tags(ds); + mp_tags_replace(ds->tags_demux->sh, tags); + talloc_free(tags); + + pthread_mutex_unlock(&in->lock); } // Return a stream with the given index. Since streams can only be added during @@ -1233,6 +1328,7 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) dp->stream = stream->index; dp->next = NULL; + mp_packet_tags_setref(&dp->metadata, ds->tags_demux); // (keep in mind that even if the reader went out of data, the queue is not // necessarily empty due to the backbuffer) @@ -1289,10 +1385,9 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) double duration = in->highest_av_pts - in->d_thread->start_time; if (duration > in->d_thread->duration) { in->d_thread->duration = duration; - // (Don't wakeup like like demux_changed(), would be too noisy.) - in->d_thread->events |= DEMUX_EVENT_DURATION; - in->d_buffer->duration = duration; - in->d_buffer->events |= DEMUX_EVENT_DURATION; + // (Don't wakeup user thread, would be too noisy.) + in->events |= DEMUX_EVENT_DURATION; + in->duration = duration; } } } @@ -1631,6 +1726,18 @@ static struct demux_packet *dequeue_packet(struct demux_stream *ds) pkt->end = MP_ADD_PTS(pkt->end, ds->in->ts_offset); } + // Apply timed metadata when packet is returned to user. + // (The tags_init thing is a microopt. to not do refcounting for sane files.) + struct mp_packet_tags *metadata = pkt->metadata; + if (!metadata) + metadata = ds->tags_init; + if (metadata != ds->tags_reader) { + mp_packet_tags_setref(&ds->tags_reader, metadata); + ds->in->events |= DEMUX_EVENT_METADATA; + if (ds->in->wakeup_cb) + ds->in->wakeup_cb(ds->in->wakeup_cb_ctx); + } + prune_old_packets(ds->in); return pkt; } @@ -1865,80 +1972,48 @@ static void demux_update_replaygain(demuxer_t *demuxer) } } -// Copy all fields from src to dst, depending on event flags. +// Copy some fields from src to dst (for initialization). static void demux_copy(struct demuxer *dst, struct demuxer *src) { - if (src->events & DEMUX_EVENT_INIT) { - // Note that we do as shallow copies as possible. We expect the data - // that is not-copied (only referenced) to be immutable. - // This implies e.g. that no chapters are added after initialization. - dst->chapters = src->chapters; - dst->num_chapters = src->num_chapters; - dst->editions = src->editions; - dst->num_editions = src->num_editions; - dst->edition = src->edition; - dst->attachments = src->attachments; - dst->num_attachments = src->num_attachments; - dst->matroska_data = src->matroska_data; - dst->playlist = src->playlist; - dst->seekable = src->seekable; - dst->partially_seekable = src->partially_seekable; - dst->filetype = src->filetype; - dst->ts_resets_possible = src->ts_resets_possible; - dst->fully_read = src->fully_read; - dst->start_time = src->start_time; - dst->duration = src->duration; - dst->is_network = src->is_network; - dst->priv = src->priv; - } - - if (src->events & DEMUX_EVENT_METADATA) { - talloc_free(dst->metadata); - dst->metadata = mp_tags_dup(dst, src->metadata); - - if (dst->num_update_stream_tags != src->num_update_stream_tags) { - dst->num_update_stream_tags = src->num_update_stream_tags; - talloc_free(dst->update_stream_tags); - dst->update_stream_tags = - talloc_zero_array(dst, struct mp_tags *, dst->num_update_stream_tags); - } - for (int n = 0; n < dst->num_update_stream_tags; n++) { - talloc_free(dst->update_stream_tags[n]); - dst->update_stream_tags[n] = - talloc_steal(dst->update_stream_tags, src->update_stream_tags[n]); - src->update_stream_tags[n] = NULL; - } - } - - if (src->events & DEMUX_EVENT_DURATION) - dst->duration = src->duration; - - dst->events |= src->events; - src->events = 0; -} - -// This is called by demuxer implementations if certain parameters change -// at runtime. -// events is one of DEMUX_EVENT_* -// The code will copy the fields references by the events to the user-thread. -void demux_changed(demuxer_t *demuxer, int events) + // Note that we do as shallow copies as possible. We expect the data + // that is not-copied (only referenced) to be immutable. + // This implies e.g. that no chapters are added after initialization. + dst->chapters = src->chapters; + dst->num_chapters = src->num_chapters; + dst->editions = src->editions; + dst->num_editions = src->num_editions; + dst->edition = src->edition; + dst->attachments = src->attachments; + dst->num_attachments = src->num_attachments; + dst->matroska_data = src->matroska_data; + dst->playlist = src->playlist; + dst->seekable = src->seekable; + dst->partially_seekable = src->partially_seekable; + dst->filetype = src->filetype; + dst->ts_resets_possible = src->ts_resets_possible; + dst->fully_read = src->fully_read; + dst->start_time = src->start_time; + dst->duration = src->duration; + dst->is_network = src->is_network; + dst->priv = src->priv; + dst->metadata = mp_tags_dup(dst, src->metadata); +} + +// This is called by demuxer implementations if demuxer->metadata changed. +// (It will be propagated to the user as timed metadata.) +void demux_metadata_changed(demuxer_t *demuxer) { assert(demuxer == demuxer->in->d_thread); // call from demuxer impl. only struct demux_internal *in = demuxer->in; - demuxer->events |= events; - - update_cache(in); - pthread_mutex_lock(&in->lock); - if (demuxer->events & DEMUX_EVENT_INIT) - demuxer_sort_chapters(demuxer); - - demux_copy(in->d_buffer, demuxer); + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; + ds_modify_demux_tags(ds); + mp_tags_replace(ds->tags_demux->demux, demuxer->metadata); + } - if (in->wakeup_cb) - in->wakeup_cb(in->wakeup_cb_ctx); pthread_mutex_unlock(&in->lock); } @@ -1950,15 +2025,18 @@ static void update_final_metadata(demuxer_t *demuxer) int num_streams = MPMIN(in->num_streams, demuxer->num_update_stream_tags); for (int n = 0; n < num_streams; n++) { - struct mp_tags *tags = demuxer->update_stream_tags[n]; - demuxer->update_stream_tags[n] = NULL; - if (tags) { - struct sh_stream *sh = in->streams[n]; - talloc_free(sh->tags); - sh->tags = talloc_steal(sh, tags); - } + struct sh_stream *sh = in->streams[n]; + // (replace them even if unnecessary, simpler and doesn't hurt) + if (sh->ds->tags_reader) + mp_tags_replace(sh->tags, sh->ds->tags_reader->sh); } + struct mp_packet_tags *tags = + in->master_stream ? in->master_stream->tags_reader : NULL; + + if (tags) + mp_tags_replace(demuxer->metadata, tags->demux); + // Often for useful audio-only files, which have metadata in the audio track // metadata instead of the main metadata, but can also have cover art // metadata (which libavformat likes to treat as video streams). @@ -1977,8 +2055,8 @@ static void update_final_metadata(demuxer_t *demuxer) if (vstreams == 0 && astreams == 1) mp_tags_merge(demuxer->metadata, in->streams[astream_id]->tags); - if (in->stream_metadata) - mp_tags_merge(demuxer->metadata, in->stream_metadata); + if (tags) + mp_tags_merge(demuxer->metadata, tags->stream); } // Called by the user thread (i.e. player) to update metadata and other things @@ -1992,13 +2070,14 @@ void demux_update(demuxer_t *demuxer) update_cache(in); pthread_mutex_lock(&in->lock); - demux_copy(demuxer, in->d_buffer); demuxer->events |= in->events; in->events = 0; if (demuxer->events & DEMUX_EVENT_METADATA) update_final_metadata(demuxer); if (demuxer->events & (DEMUX_EVENT_METADATA | DEMUX_EVENT_STREAMS)) demux_update_replaygain(demuxer); + if (demuxer->events & DEMUX_EVENT_DURATION) + demuxer->duration = in->duration; pthread_mutex_unlock(&in->lock); } @@ -2043,7 +2122,6 @@ static void demux_maybe_replace_stream(struct demuxer *demuxer) free_stream(demuxer->stream); demuxer->stream = open_memory_stream(NULL, 0); // dummy in->d_thread->stream = demuxer->stream; - in->d_buffer->stream = demuxer->stream; if (demuxer->desc->control) demuxer->desc->control(in->d_thread, DEMUXER_CTRL_REPLACE_STREAM, NULL); @@ -2064,6 +2142,19 @@ static void demux_init_ccs(struct demuxer *demuxer, struct demux_opts *opts) pthread_mutex_unlock(&in->lock); } +// Each stream contains a copy of the global demuxer metadata, but this might +// be outdated if a stream gets added and then metadata does get set during +// early init. +static void fixup_metadata(struct demux_internal *in) +{ + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; + mp_packet_tags_make_writable(&ds->tags_init); + mp_tags_replace(ds->tags_init->demux, in->d_thread->metadata); + mp_packet_tags_setref(&ds->tags_reader, ds->tags_init); + } +} + static struct demuxer *open_given_type(struct mpv_global *global, struct mp_log *log, const struct demuxer_desc *desc, @@ -2098,7 +2189,6 @@ static struct demuxer *open_given_type(struct mpv_global *global, *in = (struct demux_internal){ .log = demuxer->log, .d_thread = talloc(demuxer, struct demuxer), - .d_buffer = talloc(demuxer, struct demuxer), .d_user = demuxer, .min_secs = opts->min_secs, .max_bytes = opts->max_bytes, @@ -2119,11 +2209,8 @@ static struct demuxer *open_given_type(struct mpv_global *global, MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, in->current_range); *in->d_thread = *demuxer; - *in->d_buffer = *demuxer; in->d_thread->metadata = talloc_zero(in->d_thread, struct mp_tags); - in->d_user->metadata = talloc_zero(in->d_user, struct mp_tags); - in->d_buffer->metadata = talloc_zero(in->d_buffer, struct mp_tags); mp_dbg(log, "Trying demuxer: %s (force-level: %s)\n", desc->name, d_level(check)); @@ -2155,7 +2242,11 @@ static struct demuxer *open_given_type(struct mpv_global *global, demux_init_cuesheet(in->d_thread); demux_init_cache(demuxer); demux_init_ccs(demuxer, opts); - demux_changed(in->d_thread, DEMUX_EVENT_ALL); + demux_copy(in->d_user, in->d_thread); + in->duration = in->d_thread->duration; + demuxer_sort_chapters(demuxer); + fixup_metadata(in); + in->events = DEMUX_EVENT_ALL; demux_update(demuxer); stream_control(demuxer->stream, STREAM_CTRL_SET_READAHEAD, &(int){params ? params->initial_readahead : false}); @@ -2823,9 +2914,12 @@ static void update_cache(struct demux_internal *in) in->stream_size = stream_size; in->stream_cache_info = stream_cache_info; if (stream_metadata) { - talloc_free(in->stream_metadata); - in->stream_metadata = talloc_steal(in, stream_metadata); - in->d_buffer->events |= DEMUX_EVENT_METADATA; + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; + ds_modify_demux_tags(ds); + mp_tags_replace(ds->tags_demux->stream, stream_metadata); + } + talloc_free(stream_metadata); } pthread_mutex_unlock(&in->lock); } |