From f9ba1a3ddf5186cb31c2715e3293eef70575a9ee Mon Sep 17 00:00:00 2001 From: wm4 Date: Wed, 23 Dec 2015 21:44:53 +0100 Subject: demux: remove weird tripple-buffering for the sh_stream list The demuxer infrastructure was originally single-threaded. To make it suitable for multithreading (specifically, demuxing and decoding on separate threads), some sort of tripple-buffering was introduced. There are separate "struct demuxer" allocations. The demuxer thread sets the state on d_thread. If anything changes, the state is copied to d_buffer (the copy is protected by a lock), and the decoder thread is notified. Then the decoder thread copies the state from d_buffer to d_user (again while holding a lock). This avoids the need for locking in the demuxer/decoder code itself (only demux.c needs an internal, "invisible" lock.) Remove the streams/num_streams fields from this tripple-buffering schema. Move them to the internal struct, and protect them with the internal lock. Use accessors for read access outside of demux.c. Other than replacing all field accesses with accessors, this separates allocating and adding sh_streams. This is needed to avoid race conditions. Before this change, this was awkwardly handled by first initializing the sh_stream, and then sending a stream change event. Now the stream is allocated, then initialized, and then declared as immutable and added (at which point it becomes visible to the decoder thread immediately). This change is useful for PR #2626. And eventually, we should probably get entirely of the tripple buffering, and this makes a nice first step. --- demux/demux.c | 185 +++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 118 insertions(+), 67 deletions(-) (limited to 'demux/demux.c') diff --git a/demux/demux.c b/demux/demux.c index 972bf31bae..8eb274647d 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -103,6 +103,11 @@ struct demux_internal { void (*wakeup_cb)(void *ctx); void *wakeup_cb_ctx; + struct sh_stream **streams; + int num_streams; + + int events; + bool warned_queue_overflow; bool last_eof; // last actual global EOF status bool eof; // whether we're in EOF state (reset for retry) @@ -198,42 +203,85 @@ void demux_set_ts_offset(struct demuxer *demuxer, double offset) pthread_mutex_unlock(&in->lock); } -struct sh_stream *new_sh_stream(demuxer_t *demuxer, enum stream_type type) +// Allocate a new sh_stream of the given type. It either has to be released +// with talloc_free(), or added to a demuxer with demux_add_sh_stream(). You +// cannot add or read packets from the stream before it has been added. +struct sh_stream *demux_alloc_sh_stream(enum stream_type type) { - assert(demuxer == demuxer->in->d_thread); - - if (demuxer->num_streams > MAX_SH_STREAMS) { - MP_WARN(demuxer, "Too many streams.\n"); - return NULL; - } - - int demuxer_id = 0; - for (int n = 0; n < demuxer->num_streams; n++) { - if (demuxer->streams[n]->type == type) - demuxer_id++; - } - - struct sh_stream *sh = talloc_ptrtype(demuxer, sh); + struct sh_stream *sh = talloc_ptrtype(NULL, sh); *sh = (struct sh_stream) { .type = type, - .index = demuxer->num_streams, - .ff_index = demuxer->num_streams, - .demuxer_id = demuxer_id, // may be overwritten by demuxer - .ds = talloc(sh, struct demux_stream), + .index = -1, + .ff_index = -1, // may be overwritten by demuxer + .demuxer_id = -1, // ... same }; + switch (sh->type) { + case STREAM_VIDEO: sh->video = talloc_zero(sh, struct sh_video); break; + case STREAM_AUDIO: sh->audio = talloc_zero(sh, struct sh_audio); break; + case STREAM_SUB: sh->sub = talloc_zero(sh, struct sh_sub); break; + } + + return sh; +} + +// Add a new sh_stream to the demuxer. Note that as soon as the stream has been +// added, it must be immutable, and must not be released (this will happen when +// the demuxer is destroyed). +void demux_add_sh_stream(struct demuxer *demuxer, struct sh_stream *sh) +{ + struct demux_internal *in = demuxer->in; + pthread_mutex_lock(&in->lock); + + assert(!sh->ds); // must not be added yet + + sh->ds = talloc(sh, struct demux_stream); *sh->ds = (struct demux_stream) { - .in = demuxer->in, + .in = in, .type = sh->type, - .selected = demuxer->in->autoselect, + .selected = in->autoselect, }; - MP_TARRAY_APPEND(demuxer, demuxer->streams, demuxer->num_streams, sh); - switch (sh->type) { - case STREAM_VIDEO: sh->video = talloc_zero(demuxer, struct sh_video); break; - case STREAM_AUDIO: sh->audio = talloc_zero(demuxer, struct sh_audio); break; - case STREAM_SUB: sh->sub = talloc_zero(demuxer, struct sh_sub); break; + + sh->index = in->num_streams; + if (sh->ff_index < 0) + sh->ff_index = sh->index; + if (sh->demuxer_id < 0) { + sh->demuxer_id = 0; + for (int n = 0; n < in->num_streams; n++) { + if (in->streams[n]->type == sh->type) + sh->demuxer_id += 1; + } } - return sh; + MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh); + + in->events |= DEMUX_EVENT_STREAMS; + if (in->wakeup_cb) + in->wakeup_cb(in->wakeup_cb_ctx); + pthread_mutex_unlock(&in->lock); +} + +// Return a stream with the given index. Since streams can only be added during +// the lifetime of the demuxer, it is guaranteed that an index within the valid +// range [0, demux_get_num_stream()) always returns a valid sh_stream pointer, +// which will be valid until the demuxer is destroyed. +struct sh_stream *demux_get_stream(struct demuxer *demuxer, int index) +{ + struct demux_internal *in = demuxer->in; + pthread_mutex_lock(&in->lock); + assert(index >= 0 && index < in->num_streams); + struct sh_stream *r = in->streams[index]; + pthread_mutex_unlock(&in->lock); + return r; +} + +// See demux_get_stream(). +int demux_get_num_stream(struct demuxer *demuxer) +{ + struct demux_internal *in = demuxer->in; + pthread_mutex_lock(&in->lock); + int r = in->num_streams; + pthread_mutex_unlock(&in->lock); + return r; } void free_demuxer(demuxer_t *demuxer) @@ -247,8 +295,10 @@ void free_demuxer(demuxer_t *demuxer) if (demuxer->desc->close) demuxer->desc->close(in->d_thread); - for (int n = 0; n < demuxer->num_streams; n++) - ds_flush(demuxer->streams[n]->ds); + for (int n = in->num_streams - 1; n >= 0; n--) { + ds_flush(in->streams[n]->ds); + talloc_free(in->streams[n]); + } pthread_mutex_destroy(&in->lock); pthread_cond_destroy(&in->wakeup); talloc_free(demuxer); @@ -388,8 +438,8 @@ static bool read_packet(struct demux_internal *in) // safe-guards against packet queue overflow. bool active = false, read_more = false; size_t packs = 0, bytes = 0; - for (int n = 0; n < in->d_buffer->num_streams; n++) { - struct demux_stream *ds = in->d_buffer->streams[n]->ds; + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; active |= ds->active; read_more |= ds->active && !ds->head; packs += ds->packs; @@ -404,16 +454,16 @@ static bool read_packet(struct demux_internal *in) if (!in->warned_queue_overflow) { in->warned_queue_overflow = true; MP_ERR(in, "Too many packets in the demuxer packet queues:\n"); - for (int n = 0; n < in->d_buffer->num_streams; n++) { - struct demux_stream *ds = in->d_buffer->streams[n]->ds; + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; if (ds->selected) { MP_ERR(in, " %s/%d: %zd packets, %zd bytes\n", stream_type_name(ds->type), n, ds->packs, ds->bytes); } } } - for (int n = 0; n < in->d_buffer->num_streams; n++) { - struct demux_stream *ds = in->d_buffer->streams[n]->ds; + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; ds->eof |= !ds->head; } pthread_cond_signal(&in->wakeup); @@ -433,8 +483,8 @@ static bool read_packet(struct demux_internal *in) pthread_mutex_lock(&in->lock); if (eof) { - for (int n = 0; n < in->d_buffer->num_streams; n++) - in->d_buffer->streams[n]->ds->eof = true; + for (int n = 0; n < in->num_streams; n++) + in->streams[n]->ds->eof = true; // If we had EOF previously, then don't wakeup (avoids wakeup loop) if (!in->last_eof) { if (in->wakeup_cb) @@ -478,8 +528,8 @@ static void start_refreshing(struct demux_internal *in) in->start_refresh_seek = false; double start_ts = MP_NOPTS_VALUE; - for (int n = 0; n < demux->num_streams; n++) { - struct demux_stream *ds = demux->streams[n]->ds; + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; if (ds->type == STREAM_VIDEO || ds->type == STREAM_AUDIO) start_ts = MP_PTS_MIN(start_ts, ds->base_ts); } @@ -488,8 +538,8 @@ static void start_refreshing(struct demux_internal *in) demux->partially_seekable || !demux->allow_refresh_seeks) return; - for (int n = 0; n < demux->num_streams; n++) { - struct demux_stream *ds = demux->streams[n]->ds; + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; // Streams which didn't read any packets yet can return all packets, // or they'd be stuck forever; affects newly selected streams too. if (ds->last_pos != -1) @@ -697,21 +747,22 @@ bool demux_has_packet(struct sh_stream *sh) // Read and return any packet we find. struct demux_packet *demux_read_any_packet(struct demuxer *demuxer) { - assert(!demuxer->in->threading); // doesn't work with threading + struct demux_internal *in = demuxer->in; + assert(!in->threading); // doesn't work with threading bool read_more = true; while (read_more) { - for (int n = 0; n < demuxer->num_streams; n++) { - struct sh_stream *sh = demuxer->streams[n]; + for (int n = 0; n < in->num_streams; n++) { + struct sh_stream *sh = in->streams[n]; sh->ds->active = sh->ds->selected; // force read_packet() to read struct demux_packet *pkt = dequeue_packet(sh->ds); if (pkt) return pkt; } // retry after calling this - pthread_mutex_lock(&demuxer->in->lock); - read_more = read_packet(demuxer->in); - read_more &= !demuxer->in->eof; - pthread_mutex_unlock(&demuxer->in->lock); + pthread_mutex_lock(&in->lock); // lock only because read_packet unlocks + read_more = read_packet(in); + read_more &= !in->eof; + pthread_mutex_unlock(&in->lock); } return NULL; } @@ -795,13 +846,14 @@ static int decode_peak(demuxer_t *demuxer, const char *tag, float *out) static void apply_replaygain(demuxer_t *demuxer, struct replaygain_data *rg) { - for (int n = 0; n < demuxer->num_streams; n++) { - struct sh_stream *sh = demuxer->streams[n]; + struct demux_internal *in = demuxer->in; + for (int n = 0; n < in->num_streams; n++) { + struct sh_stream *sh = in->streams[n]; if (sh->audio && !sh->audio->replaygain_data) { MP_VERBOSE(demuxer, "Replaygain: Track=%f/%f Album=%f/%f\n", rg->track_gain, rg->track_peak, rg->album_gain, rg->album_peak); - sh->audio->replaygain_data = talloc_memdup(demuxer, rg, sizeof(*rg)); + sh->audio->replaygain_data = talloc_memdup(in, rg, sizeof(*rg)); } } } @@ -853,11 +905,6 @@ static void demux_copy(struct demuxer *dst, struct demuxer *src) dst->start_time = src->start_time; dst->priv = src->priv; } - if (src->events & DEMUX_EVENT_STREAMS) { - // The stream structs themselves are immutable. - for (int n = dst->num_streams; n < src->num_streams; n++) - MP_TARRAY_APPEND(dst, dst->streams, dst->num_streams, src->streams[n]); - } if (src->events & DEMUX_EVENT_METADATA) { talloc_free(dst->metadata); dst->metadata = mp_tags_dup(dst, src->metadata); @@ -905,6 +952,8 @@ void demux_update(demuxer_t *demuxer) pthread_mutex_lock(&in->lock); demux_copy(demuxer, in->d_buffer); + demuxer->events |= in->events; + in->events = 0; if (in->stream_metadata && (demuxer->events & DEMUX_EVENT_METADATA)) mp_tags_merge(demuxer->metadata, in->stream_metadata); pthread_mutex_unlock(&in->lock); @@ -1113,8 +1162,8 @@ struct demuxer *demux_open_url(const char *url, static void flush_locked(demuxer_t *demuxer) { - for (int n = 0; n < demuxer->num_streams; n++) - ds_flush(demuxer->streams[n]->ds); + for (int n = 0; n < demuxer->in->num_streams; n++) + ds_flush(demuxer->in->streams[n]->ds); demuxer->in->warned_queue_overflow = false; demuxer->in->eof = false; demuxer->in->last_eof = false; @@ -1192,10 +1241,11 @@ void demux_set_enable_refresh_seeks(struct demuxer *demuxer, bool enabled) struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d, enum stream_type t, int id) { - for (int n = 0; n < d->num_streams; n++) { - struct sh_stream *s = d->streams[n]; + int num = demux_get_num_stream(d); + for (int n = 0; n < num; n++) { + struct sh_stream *s = demux_get_stream(d, n); if (s->type == t && s->demuxer_id == id) - return d->streams[n]; + return s; } return NULL; } @@ -1205,8 +1255,9 @@ void demuxer_switch_track(struct demuxer *demuxer, enum stream_type type, { assert(!stream || stream->type == type); - for (int n = 0; n < demuxer->num_streams; n++) { - struct sh_stream *cur = demuxer->streams[n]; + int num = demux_get_num_stream(demuxer); + for (int n = 0; n < num; n++) { + struct sh_stream *cur = demux_get_stream(demuxer, n); if (cur->type == type) demuxer_select_track(demuxer, cur, cur == stream); } @@ -1408,8 +1459,8 @@ static int cached_demux_control(struct demux_internal *in, int cmd, void *arg) double *rates = arg; for (int n = 0; n < STREAM_TYPE_COUNT; n++) rates[n] = -1; - for (int n = 0; n < in->d_user->num_streams; n++) { - struct demux_stream *ds = in->d_user->streams[n]->ds; + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; if (ds->selected && ds->bitrate >= 0) rates[ds->type] = MPMAX(0, rates[ds->type]) + ds->bitrate; } @@ -1423,8 +1474,8 @@ static int cached_demux_control(struct demux_internal *in, int cmd, void *arg) .ts_duration = -1, }; int num_packets = 0; - for (int n = 0; n < in->d_user->num_streams; n++) { - struct demux_stream *ds = in->d_user->streams[n]->ds; + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; if (ds->active) { r->underrun |= !ds->head && !ds->eof; r->ts_range[0] = MP_PTS_MAX(r->ts_range[0], ds->base_ts); -- cgit v1.2.3