summaryrefslogtreecommitdiffstats
path: root/demux/demux.c
diff options
context:
space:
mode:
Diffstat (limited to 'demux/demux.c')
-rw-r--r--demux/demux.c308
1 files changed, 201 insertions, 107 deletions
diff --git a/demux/demux.c b/demux/demux.c
index 22b1a47ab1..afe8d2b51f 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -34,6 +34,7 @@
#include "mpv_talloc.h"
#include "common/msg.h"
#include "common/global.h"
+#include "osdep/atomic.h"
#include "osdep/threads.h"
#include "stream/stream.h"
@@ -125,13 +126,10 @@ struct demux_internal {
// The demuxer runs potentially in another thread, so we keep two demuxer
// structs; the real demuxer can access the shadow struct only.
- // Since demuxer and user threads both don't use locks, a third demuxer
- // struct d_buffer is used to copy data between them in a synchronized way.
struct demuxer *d_thread; // accessed by demuxer impl. (producer)
struct demuxer *d_user; // accessed by player (consumer)
- struct demuxer *d_buffer; // protected by lock; used to sync d_user/thread
- // The lock protects the packet queues (struct demux_stream), d_buffer,
+ // The lock protects the packet queues (struct demux_stream),
// and the fields below.
pthread_mutex_t lock;
pthread_cond_t wakeup;
@@ -147,6 +145,13 @@ struct demux_internal {
struct sh_stream **streams;
int num_streams;
+ // If non-NULL, a _selected_ stream which is used for global (timed)
+ // metadata. It will be an arbitrary stream that is hopefully not sparse
+ // (i.e. not a subtitle stream). This is needed because due to variable
+ // interleaving multiple streams won't agree whether timed metadata is in
+ // effect yet at the same time position.
+ struct demux_stream *master_stream;
+
int events;
bool warned_queue_overflow;
@@ -199,9 +204,10 @@ struct demux_internal {
bool blocked;
+ // Transient state.
+ double duration;
// Cached state.
bool force_cache_update;
- struct mp_tags *stream_metadata;
struct stream_cache_info stream_cache_info;
int64_t stream_size;
// Updated during init only.
@@ -305,6 +311,19 @@ struct demux_stream {
// for closed captions (demuxer_feed_caption)
struct sh_stream *cc;
bool ignore_eof; // ignore stream in underrun detection
+
+ // timed metadata
+ struct mp_packet_tags *tags_demux; // demuxer state (last updated metadata)
+ struct mp_packet_tags *tags_reader; // reader state (last returned packet)
+ struct mp_packet_tags *tags_init; // global state at start of demuxing
+};
+
+// "Snapshot" of the tag state. Refcounted to avoid a copy per packet.
+struct mp_packet_tags {
+ mp_atomic_int64 refcount;
+ struct mp_tags *demux; // demuxer global tags (normal thing)
+ struct mp_tags *stream; // byte stream tags (ICY crap)
+ struct mp_tags *sh; // per sh_stream tags (e.g. OGG)
};
// Return "a", or if that is NOPTS, return "def".
@@ -404,6 +423,56 @@ static void check_queue_consistency(struct demux_internal *in)
}
#endif
+void mp_packet_tags_unref(struct mp_packet_tags *tags)
+{
+ if (tags) {
+ if (atomic_fetch_add(&tags->refcount, -1) == 1) {
+ talloc_free(tags->sh);
+ talloc_free(tags->demux);
+ talloc_free(tags->stream);
+ talloc_free(tags);
+ }
+ }
+}
+
+void mp_packet_tags_setref(struct mp_packet_tags **dst, struct mp_packet_tags *src)
+{
+ if (src)
+ atomic_fetch_add(&src->refcount, 1);
+ mp_packet_tags_unref(*dst);
+ *dst = src;
+}
+
+static struct mp_tags *tags_dup_or_null(struct mp_tags *t)
+{
+ return t ? mp_tags_dup(NULL, t) : talloc_zero(NULL, struct mp_tags);
+}
+
+// Return a "deep" copy. If tags==NULL, allocate a new one.
+static struct mp_packet_tags *mp_packet_tags_copy(struct mp_packet_tags *tags)
+{
+ struct mp_packet_tags *new = talloc_ptrtype(NULL, new);
+ *new = (struct mp_packet_tags){
+ .refcount = ATOMIC_VAR_INIT(1),
+ .demux = tags_dup_or_null(tags ? tags->demux : NULL),
+ .stream = tags_dup_or_null(tags ? tags->stream : NULL),
+ .sh = tags_dup_or_null(tags ? tags->sh : NULL),
+ };
+ return new;
+}
+
+// Force a copy if refcount != 1.
+// (refcount==1 means we're the unambiguous owner.)
+// If *tags==NULL, allocate a blank one.
+static void mp_packet_tags_make_writable(struct mp_packet_tags **tags)
+{
+ if (*tags && atomic_load(&(*tags)->refcount) == 1)
+ return;
+ struct mp_packet_tags *new = mp_packet_tags_copy(*tags);
+ mp_packet_tags_unref(*tags);
+ *tags = new;
+}
+
static void recompute_buffers(struct demux_stream *ds)
{
ds->fw_packs = 0;
@@ -629,16 +698,25 @@ static void update_stream_selection_state(struct demux_internal *in,
bool any_av_streams = false;
bool any_streams = false;
+ struct demux_stream *master = NULL;
for (int n = 0; n < in->num_streams; n++) {
struct demux_stream *s = in->streams[n]->ds;
s->eager = s->selected && !s->sh->attached_picture;
- if (s->eager)
+ if (s->eager) {
any_av_streams |= s->type != STREAM_SUB;
+ if (!master ||
+ (master->type == STREAM_VIDEO && s->type == STREAM_AUDIO))
+ {
+ master = s;
+ }
+ }
any_streams |= s->selected;
}
+ in->master_stream = master;
+
// Subtitles are only eagerly read if there are no other eagerly read
// streams.
if (any_av_streams) {
@@ -714,6 +792,14 @@ struct sh_stream *demux_alloc_sh_stream(enum stream_type type)
return sh;
}
+static void ds_destroy(void *ptr)
+{
+ struct demux_stream *ds = ptr;
+ mp_packet_tags_unref(ds->tags_init);
+ mp_packet_tags_unref(ds->tags_reader);
+ mp_packet_tags_unref(ds->tags_demux);
+}
+
// Add a new sh_stream to the demuxer. Note that as soon as the stream has been
// added, it must be immutable, and must not be released (this will happen when
// the demuxer is destroyed).
@@ -734,6 +820,7 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
.global_correct_dts = true,
.global_correct_pos = true,
};
+ talloc_set_destructor(sh->ds, ds_destroy);
if (!sh->codec->codec)
sh->codec->codec = "";
@@ -758,6 +845,11 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
update_stream_selection_state(in, sh->ds);
+ mp_packet_tags_make_writable(&sh->ds->tags_init);
+ mp_tags_replace(sh->ds->tags_init->demux, in->d_thread->metadata);
+ mp_tags_replace(sh->ds->tags_init->sh, sh->tags);
+ mp_packet_tags_setref(&sh->ds->tags_reader, sh->ds->tags_init);
+
in->events |= DEMUX_EVENT_STREAMS;
if (in->wakeup_cb)
in->wakeup_cb(in->wakeup_cb_ctx);
@@ -767,11 +859,19 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
void demux_add_sh_stream(struct demuxer *demuxer, struct sh_stream *sh)
{
struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_thread);
pthread_mutex_lock(&in->lock);
demux_add_sh_stream_locked(in, sh);
pthread_mutex_unlock(&in->lock);
}
+static void ds_modify_demux_tags(struct demux_stream *ds)
+{
+ if (!ds->tags_demux)
+ mp_packet_tags_setref(&ds->tags_demux, ds->tags_init);
+ mp_packet_tags_make_writable(&ds->tags_demux);
+}
+
// Update sh->tags (lazily). This must be called by demuxers which update
// stream tags after init. (sh->tags can be accessed by the playback thread,
// which means the demuxer thread cannot write or read it directly.)
@@ -782,21 +882,16 @@ void demux_set_stream_tags(struct demuxer *demuxer, struct sh_stream *sh,
{
struct demux_internal *in = demuxer->in;
assert(demuxer == in->d_thread);
+ struct demux_stream *ds = sh->ds;
+ assert(ds); // stream must have been added
- if (sh->ds) {
- while (demuxer->num_update_stream_tags <= sh->index) {
- MP_TARRAY_APPEND(demuxer, demuxer->update_stream_tags,
- demuxer->num_update_stream_tags, NULL);
- }
- talloc_free(demuxer->update_stream_tags[sh->index]);
- demuxer->update_stream_tags[sh->index] = talloc_steal(demuxer, tags);
+ pthread_mutex_lock(&in->lock);
- demux_changed(demuxer, DEMUX_EVENT_METADATA);
- } else {
- // not added yet
- talloc_free(sh->tags);
- sh->tags = talloc_steal(sh, tags);
- }
+ ds_modify_demux_tags(ds);
+ mp_tags_replace(ds->tags_demux->sh, tags);
+ talloc_free(tags);
+
+ pthread_mutex_unlock(&in->lock);
}
// Return a stream with the given index. Since streams can only be added during
@@ -1233,6 +1328,7 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
dp->stream = stream->index;
dp->next = NULL;
+ mp_packet_tags_setref(&dp->metadata, ds->tags_demux);
// (keep in mind that even if the reader went out of data, the queue is not
// necessarily empty due to the backbuffer)
@@ -1289,10 +1385,9 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
double duration = in->highest_av_pts - in->d_thread->start_time;
if (duration > in->d_thread->duration) {
in->d_thread->duration = duration;
- // (Don't wakeup like like demux_changed(), would be too noisy.)
- in->d_thread->events |= DEMUX_EVENT_DURATION;
- in->d_buffer->duration = duration;
- in->d_buffer->events |= DEMUX_EVENT_DURATION;
+ // (Don't wakeup user thread, would be too noisy.)
+ in->events |= DEMUX_EVENT_DURATION;
+ in->duration = duration;
}
}
}
@@ -1631,6 +1726,18 @@ static struct demux_packet *dequeue_packet(struct demux_stream *ds)
pkt->end = MP_ADD_PTS(pkt->end, ds->in->ts_offset);
}
+ // Apply timed metadata when packet is returned to user.
+ // (The tags_init thing is a microopt. to not do refcounting for sane files.)
+ struct mp_packet_tags *metadata = pkt->metadata;
+ if (!metadata)
+ metadata = ds->tags_init;
+ if (metadata != ds->tags_reader) {
+ mp_packet_tags_setref(&ds->tags_reader, metadata);
+ ds->in->events |= DEMUX_EVENT_METADATA;
+ if (ds->in->wakeup_cb)
+ ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
+ }
+
prune_old_packets(ds->in);
return pkt;
}
@@ -1865,80 +1972,48 @@ static void demux_update_replaygain(demuxer_t *demuxer)
}
}
-// Copy all fields from src to dst, depending on event flags.
+// Copy some fields from src to dst (for initialization).
static void demux_copy(struct demuxer *dst, struct demuxer *src)
{
- if (src->events & DEMUX_EVENT_INIT) {
- // Note that we do as shallow copies as possible. We expect the data
- // that is not-copied (only referenced) to be immutable.
- // This implies e.g. that no chapters are added after initialization.
- dst->chapters = src->chapters;
- dst->num_chapters = src->num_chapters;
- dst->editions = src->editions;
- dst->num_editions = src->num_editions;
- dst->edition = src->edition;
- dst->attachments = src->attachments;
- dst->num_attachments = src->num_attachments;
- dst->matroska_data = src->matroska_data;
- dst->playlist = src->playlist;
- dst->seekable = src->seekable;
- dst->partially_seekable = src->partially_seekable;
- dst->filetype = src->filetype;
- dst->ts_resets_possible = src->ts_resets_possible;
- dst->fully_read = src->fully_read;
- dst->start_time = src->start_time;
- dst->duration = src->duration;
- dst->is_network = src->is_network;
- dst->priv = src->priv;
- }
-
- if (src->events & DEMUX_EVENT_METADATA) {
- talloc_free(dst->metadata);
- dst->metadata = mp_tags_dup(dst, src->metadata);
-
- if (dst->num_update_stream_tags != src->num_update_stream_tags) {
- dst->num_update_stream_tags = src->num_update_stream_tags;
- talloc_free(dst->update_stream_tags);
- dst->update_stream_tags =
- talloc_zero_array(dst, struct mp_tags *, dst->num_update_stream_tags);
- }
- for (int n = 0; n < dst->num_update_stream_tags; n++) {
- talloc_free(dst->update_stream_tags[n]);
- dst->update_stream_tags[n] =
- talloc_steal(dst->update_stream_tags, src->update_stream_tags[n]);
- src->update_stream_tags[n] = NULL;
- }
- }
-
- if (src->events & DEMUX_EVENT_DURATION)
- dst->duration = src->duration;
-
- dst->events |= src->events;
- src->events = 0;
-}
-
-// This is called by demuxer implementations if certain parameters change
-// at runtime.
-// events is one of DEMUX_EVENT_*
-// The code will copy the fields references by the events to the user-thread.
-void demux_changed(demuxer_t *demuxer, int events)
+ // Note that we do as shallow copies as possible. We expect the data
+ // that is not-copied (only referenced) to be immutable.
+ // This implies e.g. that no chapters are added after initialization.
+ dst->chapters = src->chapters;
+ dst->num_chapters = src->num_chapters;
+ dst->editions = src->editions;
+ dst->num_editions = src->num_editions;
+ dst->edition = src->edition;
+ dst->attachments = src->attachments;
+ dst->num_attachments = src->num_attachments;
+ dst->matroska_data = src->matroska_data;
+ dst->playlist = src->playlist;
+ dst->seekable = src->seekable;
+ dst->partially_seekable = src->partially_seekable;
+ dst->filetype = src->filetype;
+ dst->ts_resets_possible = src->ts_resets_possible;
+ dst->fully_read = src->fully_read;
+ dst->start_time = src->start_time;
+ dst->duration = src->duration;
+ dst->is_network = src->is_network;
+ dst->priv = src->priv;
+ dst->metadata = mp_tags_dup(dst, src->metadata);
+}
+
+// This is called by demuxer implementations if demuxer->metadata changed.
+// (It will be propagated to the user as timed metadata.)
+void demux_metadata_changed(demuxer_t *demuxer)
{
assert(demuxer == demuxer->in->d_thread); // call from demuxer impl. only
struct demux_internal *in = demuxer->in;
- demuxer->events |= events;
-
- update_cache(in);
-
pthread_mutex_lock(&in->lock);
- if (demuxer->events & DEMUX_EVENT_INIT)
- demuxer_sort_chapters(demuxer);
-
- demux_copy(in->d_buffer, demuxer);
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ ds_modify_demux_tags(ds);
+ mp_tags_replace(ds->tags_demux->demux, demuxer->metadata);
+ }
- if (in->wakeup_cb)
- in->wakeup_cb(in->wakeup_cb_ctx);
pthread_mutex_unlock(&in->lock);
}
@@ -1950,15 +2025,18 @@ static void update_final_metadata(demuxer_t *demuxer)
int num_streams = MPMIN(in->num_streams, demuxer->num_update_stream_tags);
for (int n = 0; n < num_streams; n++) {
- struct mp_tags *tags = demuxer->update_stream_tags[n];
- demuxer->update_stream_tags[n] = NULL;
- if (tags) {
- struct sh_stream *sh = in->streams[n];
- talloc_free(sh->tags);
- sh->tags = talloc_steal(sh, tags);
- }
+ struct sh_stream *sh = in->streams[n];
+ // (replace them even if unnecessary, simpler and doesn't hurt)
+ if (sh->ds->tags_reader)
+ mp_tags_replace(sh->tags, sh->ds->tags_reader->sh);
}
+ struct mp_packet_tags *tags =
+ in->master_stream ? in->master_stream->tags_reader : NULL;
+
+ if (tags)
+ mp_tags_replace(demuxer->metadata, tags->demux);
+
// Often for useful audio-only files, which have metadata in the audio track
// metadata instead of the main metadata, but can also have cover art
// metadata (which libavformat likes to treat as video streams).
@@ -1977,8 +2055,8 @@ static void update_final_metadata(demuxer_t *demuxer)
if (vstreams == 0 && astreams == 1)
mp_tags_merge(demuxer->metadata, in->streams[astream_id]->tags);
- if (in->stream_metadata)
- mp_tags_merge(demuxer->metadata, in->stream_metadata);
+ if (tags)
+ mp_tags_merge(demuxer->metadata, tags->stream);
}
// Called by the user thread (i.e. player) to update metadata and other things
@@ -1992,13 +2070,14 @@ void demux_update(demuxer_t *demuxer)
update_cache(in);
pthread_mutex_lock(&in->lock);
- demux_copy(demuxer, in->d_buffer);
demuxer->events |= in->events;
in->events = 0;
if (demuxer->events & DEMUX_EVENT_METADATA)
update_final_metadata(demuxer);
if (demuxer->events & (DEMUX_EVENT_METADATA | DEMUX_EVENT_STREAMS))
demux_update_replaygain(demuxer);
+ if (demuxer->events & DEMUX_EVENT_DURATION)
+ demuxer->duration = in->duration;
pthread_mutex_unlock(&in->lock);
}
@@ -2043,7 +2122,6 @@ static void demux_maybe_replace_stream(struct demuxer *demuxer)
free_stream(demuxer->stream);
demuxer->stream = open_memory_stream(NULL, 0); // dummy
in->d_thread->stream = demuxer->stream;
- in->d_buffer->stream = demuxer->stream;
if (demuxer->desc->control)
demuxer->desc->control(in->d_thread, DEMUXER_CTRL_REPLACE_STREAM, NULL);
@@ -2064,6 +2142,19 @@ static void demux_init_ccs(struct demuxer *demuxer, struct demux_opts *opts)
pthread_mutex_unlock(&in->lock);
}
+// Each stream contains a copy of the global demuxer metadata, but this might
+// be outdated if a stream gets added and then metadata does get set during
+// early init.
+static void fixup_metadata(struct demux_internal *in)
+{
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ mp_packet_tags_make_writable(&ds->tags_init);
+ mp_tags_replace(ds->tags_init->demux, in->d_thread->metadata);
+ mp_packet_tags_setref(&ds->tags_reader, ds->tags_init);
+ }
+}
+
static struct demuxer *open_given_type(struct mpv_global *global,
struct mp_log *log,
const struct demuxer_desc *desc,
@@ -2098,7 +2189,6 @@ static struct demuxer *open_given_type(struct mpv_global *global,
*in = (struct demux_internal){
.log = demuxer->log,
.d_thread = talloc(demuxer, struct demuxer),
- .d_buffer = talloc(demuxer, struct demuxer),
.d_user = demuxer,
.min_secs = opts->min_secs,
.max_bytes = opts->max_bytes,
@@ -2119,11 +2209,8 @@ static struct demuxer *open_given_type(struct mpv_global *global,
MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, in->current_range);
*in->d_thread = *demuxer;
- *in->d_buffer = *demuxer;
in->d_thread->metadata = talloc_zero(in->d_thread, struct mp_tags);
- in->d_user->metadata = talloc_zero(in->d_user, struct mp_tags);
- in->d_buffer->metadata = talloc_zero(in->d_buffer, struct mp_tags);
mp_dbg(log, "Trying demuxer: %s (force-level: %s)\n",
desc->name, d_level(check));
@@ -2155,7 +2242,11 @@ static struct demuxer *open_given_type(struct mpv_global *global,
demux_init_cuesheet(in->d_thread);
demux_init_cache(demuxer);
demux_init_ccs(demuxer, opts);
- demux_changed(in->d_thread, DEMUX_EVENT_ALL);
+ demux_copy(in->d_user, in->d_thread);
+ in->duration = in->d_thread->duration;
+ demuxer_sort_chapters(demuxer);
+ fixup_metadata(in);
+ in->events = DEMUX_EVENT_ALL;
demux_update(demuxer);
stream_control(demuxer->stream, STREAM_CTRL_SET_READAHEAD,
&(int){params ? params->initial_readahead : false});
@@ -2823,9 +2914,12 @@ static void update_cache(struct demux_internal *in)
in->stream_size = stream_size;
in->stream_cache_info = stream_cache_info;
if (stream_metadata) {
- talloc_free(in->stream_metadata);
- in->stream_metadata = talloc_steal(in, stream_metadata);
- in->d_buffer->events |= DEMUX_EVENT_METADATA;
+ for (int n = 0; n < in->num_streams; n++) {
+ struct demux_stream *ds = in->streams[n]->ds;
+ ds_modify_demux_tags(ds);
+ mp_tags_replace(ds->tags_demux->stream, stream_metadata);
+ }
+ talloc_free(stream_metadata);
}
pthread_mutex_unlock(&in->lock);
}