diff options
-rw-r--r-- | DOCS/man/options.rst | 18 | ||||
-rw-r--r-- | demux/demux.c | 710 | ||||
-rw-r--r-- | demux/demux.h | 49 | ||||
-rw-r--r-- | demux/demux_disc.c | 11 | ||||
-rw-r--r-- | demux/demux_lavf.c | 3 | ||||
-rw-r--r-- | demux/stheader.h | 1 | ||||
-rw-r--r-- | options/options.c | 6 | ||||
-rw-r--r-- | options/options.h | 3 | ||||
-rw-r--r-- | player/command.c | 91 | ||||
-rw-r--r-- | player/core.h | 2 | ||||
-rw-r--r-- | player/discnav.c | 31 | ||||
-rw-r--r-- | player/loadfile.c | 64 | ||||
-rw-r--r-- | player/misc.c | 10 | ||||
-rw-r--r-- | player/playloop.c | 24 | ||||
-rw-r--r-- | stream/cache.c | 23 |
15 files changed, 743 insertions, 303 deletions
diff --git a/DOCS/man/options.rst b/DOCS/man/options.rst index b554fecc62..84300eac47 100644 --- a/DOCS/man/options.rst +++ b/DOCS/man/options.rst @@ -778,6 +778,24 @@ OPTIONS ``--demuxer-rawvideo-size=<value>`` Frame size in bytes when using ``--demuxer=rawvideo``. +``--demuxer-thread=<yes|no>`` + Run the demuxer in a separate thread, and let it prefetch a certain amount + of packets (default: yes). + +``--demuxer-readahead-packets=N`` + If ``--demuxer-thread`` is enabled, this controls how much the demuxer + should buffer ahead. If the number of packets in the packet queue exceeds + ``--demuxer-readahead-packets``, or the total number of bytes exceeds + ``--demuxer-readahead-bytes``, the thread stops reading ahead. + + Note that if you set these options near the maximum, you might get a + packet queue overflow warning. + + See ``--list-options`` for defaults and value range. + +``--demuxer-readahead-bytes=N`` + See ``--demuxer-readahead-packets``. + ``--dump-stats=<filename>`` Write certain statistics to the given file. The file is truncated on opening. The file will contain raw samples, each with a timestamp. To diff --git a/demux/demux.c b/demux/demux.c index e716194614..599894ed82 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -21,6 +21,7 @@ #include <string.h> #include <assert.h> #include <unistd.h> +#include <pthread.h> #include <math.h> @@ -79,19 +80,66 @@ const demuxer_desc_t *const demuxer_list[] = { NULL }; +struct demux_internal { + struct mp_log *log; + + // The demuxer runs potentially in another thread, so we keep two demuxer + // structs; the real demuxer can access the shadow struct only. + // Since demuxer and user threads both don't use locks, a third demuxer + // struct d_buffer is used to copy data between them in a synchronized way. + struct demuxer *d_thread; // accessed by demuxer impl. (producer) + struct demuxer *d_user; // accessed by player (consumer) + struct demuxer *d_buffer; // protected by lock; used to sync d_user/thread + + // The lock protects the packet queues (struct demux_stream), d_buffer, + // and some minor fields like thread_paused. + pthread_mutex_t lock; + pthread_cond_t wakeup; + pthread_t thread; + + // -- All the following fields are protected by lock. + + bool thread_paused; + int thread_request_pause; // counter, if >0, make demuxer thread pause + bool thread_terminate; + bool threading; + void (*wakeup_cb)(void *ctx); + void *wakeup_cb_ctx; + + bool warned_queue_overflow; + bool eof; // last global EOF status + bool autoselect; + int min_packs; + int min_bytes; + + // Cached state. + double time_length; + struct mp_tags *stream_metadata; + int64_t stream_size; + int64_t stream_cache_size; + int64_t stream_cache_fill; + int stream_cache_idle; +}; + struct demux_stream { - struct demuxer *demuxer; - int selected; // user wants packets from this stream - int eof; // end of demuxed stream? (true if all buffer empty) - int packs; // number of packets in buffer - int bytes; // total bytes of packets in buffer + struct demux_internal *in; + enum stream_type type; + // all fields are protected by in->lock + bool selected; // user wants packets from this stream + bool active; // try to keep at least 1 packet queued + bool eof; // end of demuxed stream? (true if all buffer empty) + size_t packs; // number of packets in buffer + size_t bytes; // total bytes of packets in buffer struct demux_packet *head; struct demux_packet *tail; }; -void demuxer_sort_chapters(demuxer_t *demuxer); +static void demuxer_sort_chapters(demuxer_t *demuxer); +static void *demux_thread(void *pctx); +static void update_cache(struct demux_internal *in); -static void ds_free_packs(struct demux_stream *ds) +// called locked +static void ds_flush(struct demux_stream *ds) { demux_packet_t *dp = ds->head; while (dp) { @@ -100,13 +148,16 @@ static void ds_free_packs(struct demux_stream *ds) dp = dn; } ds->head = ds->tail = NULL; - ds->packs = 0; // !!!!! + ds->packs = 0; ds->bytes = 0; - ds->eof = 0; + ds->eof = false; + ds->active = false; } struct sh_stream *new_sh_stream(demuxer_t *demuxer, enum stream_type type) { + assert(demuxer == demuxer->in->d_thread); + if (demuxer->num_streams > MAX_SH_STREAMS) { MP_WARN(demuxer, "Too many streams.\n"); return NULL; @@ -121,13 +172,15 @@ struct sh_stream *new_sh_stream(demuxer_t *demuxer, enum stream_type type) struct sh_stream *sh = talloc_ptrtype(demuxer, sh); *sh = (struct sh_stream) { .type = type, - .demuxer = demuxer, .index = demuxer->num_streams, .demuxer_id = demuxer_id, // may be overwritten by demuxer - .ds = talloc_zero(sh, struct demux_stream), + .ds = talloc(sh, struct demux_stream), + }; + *sh->ds = (struct demux_stream) { + .in = demuxer->in, + .type = sh->type, + .selected = demuxer->in->autoselect, }; - sh->ds->demuxer = demuxer; - sh->ds->selected = demuxer->stream_select_default; MP_TARRAY_APPEND(demuxer, demuxer->streams, demuxer->num_streams, sh); switch (sh->type) { case STREAM_VIDEO: sh->video = talloc_zero(demuxer, struct sh_video); break; @@ -142,49 +195,84 @@ void free_demuxer(demuxer_t *demuxer) { if (!demuxer) return; + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + demux_stop_thread(demuxer); + if (demuxer->desc->close) - demuxer->desc->close(demuxer); - // free streams: + demuxer->desc->close(in->d_thread); for (int n = 0; n < demuxer->num_streams; n++) - ds_free_packs(demuxer->streams[n]->ds); + ds_flush(demuxer->streams[n]->ds); + pthread_mutex_destroy(&in->lock); + pthread_cond_destroy(&in->wakeup); talloc_free(demuxer); } -const char *stream_type_name(enum stream_type type) +// Start the demuxer thread, which reads ahead packets on its own. +void demux_start_thread(struct demuxer *demuxer) { - switch (type) { - case STREAM_VIDEO: return "video"; - case STREAM_AUDIO: return "audio"; - case STREAM_SUB: return "sub"; - default: return "unknown"; + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + if (!in->threading) { + in->threading = true; + if (pthread_create(&in->thread, NULL, demux_thread, in)) + in->threading = false; } } -static int count_packs(struct demuxer *demux, enum stream_type type) +void demux_stop_thread(struct demuxer *demuxer) { - int c = 0; - for (int n = 0; n < demux->num_streams; n++) - c += demux->streams[n]->type == type ? demux->streams[n]->ds->packs : 0; - return c; + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + if (in->threading) { + pthread_mutex_lock(&in->lock); + in->thread_terminate = true; + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); + pthread_join(in->thread, NULL); + in->threading = false; + in->thread_terminate = false; + } } -static int count_bytes(struct demuxer *demux, enum stream_type type) +// The demuxer thread will call cb(ctx) if there's a new packet, or EOF is reached. +void demux_set_wakeup_cb(struct demuxer *demuxer, void (*cb)(void *ctx), void *ctx) { - int c = 0; - for (int n = 0; n < demux->num_streams; n++) - c += demux->streams[n]->type == type ? demux->streams[n]->ds->bytes : 0; - return c; + struct demux_internal *in = demuxer->in; + pthread_mutex_lock(&in->lock); + in->wakeup_cb = cb; + in->wakeup_cb_ctx = ctx; + pthread_mutex_unlock(&in->lock); +} + +const char *stream_type_name(enum stream_type type) +{ + switch (type) { + case STREAM_VIDEO: return "video"; + case STREAM_AUDIO: return "audio"; + case STREAM_SUB: return "sub"; + default: return "unknown"; + } } // Returns the same value as demuxer->fill_buffer: 1 ok, 0 EOF/not selected. int demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) { struct demux_stream *ds = stream ? stream->ds : NULL; - if (!dp || !ds || !ds->selected) { + if (!dp || !ds) { + talloc_free(dp); + return 0; + } + struct demux_internal *in = ds->in; + pthread_mutex_lock(&in->lock); + if (!ds->selected) { + pthread_mutex_unlock(&in->lock); talloc_free(dp); return 0; } - struct demuxer *demuxer = ds->demuxer; dp->stream = stream->index; dp->next = NULL; @@ -200,75 +288,131 @@ int demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) ds->head = ds->tail = dp; } // obviously not true anymore - ds->eof = 0; + ds->eof = false; // For video, PTS determination is not trivial, but for other media types // distinguishing PTS and DTS is not useful. if (stream->type != STREAM_VIDEO && dp->pts == MP_NOPTS_VALUE) dp->pts = dp->dts; - if (mp_msg_test(demuxer->log, MSGL_DEBUG)) { - MP_DBG(demuxer, "DEMUX: Append packet to %s, len=%d pts=%5.3f pos=" - "%"PRIi64" [A=%d V=%d S=%d]\n", stream_type_name(stream->type), - dp->len, dp->pts, dp->pos, count_packs(demuxer, STREAM_AUDIO), - count_packs(demuxer, STREAM_VIDEO), count_packs(demuxer, STREAM_SUB)); - } + MP_DBG(in, "append packet to %s: size=%d pts=%f dts=%f pos=%"PRIi64" " + "[num=%zd size=%zd]\n", stream_type_name(stream->type), + dp->len, dp->pts, dp->pts, dp->pos, ds->packs, ds->bytes); + + if (ds->in->wakeup_cb) + ds->in->wakeup_cb(ds->in->wakeup_cb_ctx); + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); return 1; } -static bool demux_check_queue_full(demuxer_t *demux) +// Returns true if there was "progress" (lock was released temporarily). +static bool read_packet(struct demux_internal *in) { - for (int n = 0; n < demux->num_streams; n++) { - struct sh_stream *sh = demux->streams[n]; - if (sh->ds->packs > MAX_PACKS || sh->ds->bytes > MAX_PACK_BYTES) - goto overflow; + in->eof = false; + + // Check if we need to read a new packet. We do this if all queues are below + // the minimum, or if a stream explicitly needs new packets. Also includes + // safe-guards against packet queue overflow. + bool active = false, read_more = false; + size_t packs = 0, bytes = 0; + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + active |= ds->selected; + read_more |= ds->active && !ds->head; + packs += ds->packs; + bytes += ds->bytes; } - return false; - -overflow: - - if (!demux->warned_queue_overflow) { - MP_ERR(demux, "Too many packets in the demuxer " - "packet queue (video: %d packets in %d bytes, audio: %d " - "packets in %d bytes, sub: %d packets in %d bytes).\n", - count_packs(demux, STREAM_VIDEO), count_bytes(demux, STREAM_VIDEO), - count_packs(demux, STREAM_AUDIO), count_bytes(demux, STREAM_AUDIO), - count_packs(demux, STREAM_SUB), count_bytes(demux, STREAM_SUB)); - MP_INFO(demux, "Maybe you are playing a non-" - "interleaved stream/file or the codec failed?\n"); + MP_DBG(in, "packets=%zd, bytes=%zd, active=%d, more=%d\n", + packs, bytes, active, read_more); + if (packs >= MAX_PACKS || bytes >= MAX_PACK_BYTES) { + if (!in->warned_queue_overflow) { + in->warned_queue_overflow = true; + MP_ERR(in, "Too many packets in the demuxer packet queues:\n"); + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + if (ds->selected) { + MP_ERR(in, " %s/%d: %zd packets, %zd bytes\n", + stream_type_name(ds->type), n, ds->packs, ds->bytes); + } + } + } + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + ds->eof |= !ds->head; + } + pthread_cond_signal(&in->wakeup); + return false; + } + if (packs < in->min_packs && bytes < in->min_bytes) + read_more |= active; + + if (!read_more) + return false; + + // Actually read a packet. Drop the lock while doing so, because waiting + // for disk or network I/O can take time. + pthread_mutex_unlock(&in->lock); + struct demuxer *demux = in->d_thread; + bool eof = !demux->desc->fill_buffer || demux->desc->fill_buffer(demux) <= 0; + pthread_mutex_lock(&in->lock); + + update_cache(in); + + in->eof = eof; + if (in->eof) { + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + ds->eof = true; + } + pthread_cond_signal(&in->wakeup); + MP_VERBOSE(in, "EOF reached.\n"); } - demux->warned_queue_overflow = true; return true; } -// return value: -// 0 = EOF or no stream found or invalid type -// 1 = successfully read a packet - -static int demux_fill_buffer(demuxer_t *demux) +// must be called locked; may temporarily unlock +static void ds_get_packets(struct demux_stream *ds) { - return demux->desc->fill_buffer ? demux->desc->fill_buffer(demux) : 0; + const char *t = stream_type_name(ds->type); + struct demux_internal *in = ds->in; + MP_DBG(in, "reading packet for %s\n", t); + in->eof = false; // force retry + ds->eof = false; + while (ds->selected && !ds->head && !ds->eof) { + ds->active = true; + // Note: the following code marks EOF if it can't continue + if (in->threading) { + MP_VERBOSE(in, "waiting for demux thread (%s)\n", t); + pthread_cond_signal(&in->wakeup); + pthread_cond_wait(&in->wakeup, &in->lock); + } else { + read_packet(in); + } + } } -static void ds_get_packets(struct sh_stream *sh) +static void *demux_thread(void *pctx) { - struct demux_stream *ds = sh->ds; - demuxer_t *demux = sh->demuxer; - MP_TRACE(demux, "ds_get_packets (%s) called\n", - stream_type_name(sh->type)); - while (1) { - if (ds->head) - return; - - if (demux_check_queue_full(demux)) - break; - - if (!demux_fill_buffer(demux)) - break; // EOF + struct demux_internal *in = pctx; + pthread_mutex_lock(&in->lock); + while (!in->thread_terminate) { + in->thread_paused = in->thread_request_pause > 0; + if (in->thread_paused) { + pthread_cond_signal(&in->wakeup); + pthread_cond_wait(&in->wakeup, &in->lock); + continue; + } + if (!in->eof) { + if (read_packet(in)) + continue; // read_packet unlocked, so recheck conditions + } + update_cache(in); + pthread_cond_signal(&in->wakeup); + pthread_cond_wait(&in->wakeup, &in->lock); } - MP_VERBOSE(demux, "ds_get_packets: EOF reached (stream: %s)\n", - stream_type_name(sh->type)); - ds->eof = 1; + pthread_mutex_unlock(&in->lock); + return NULL; } // Read a packet from the given stream. The returned packet belongs to the @@ -277,10 +421,12 @@ static void ds_get_packets(struct sh_stream *sh) struct demux_packet *demux_read_packet(struct sh_stream *sh) { struct demux_stream *ds = sh ? sh->ds : NULL; + struct demux_packet *pkt = NULL; if (ds) { - ds_get_packets(sh); - struct demux_packet *pkt = ds->head; - if (pkt) { + pthread_mutex_lock(&ds->in->lock); + ds_get_packets(ds); + if (ds->head) { + pkt = ds->head; ds->head = pkt->next; pkt->next = NULL; if (!ds->head) @@ -288,13 +434,15 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh) ds->bytes -= pkt->len; ds->packs--; + // This implies this function is actually called from "the" user + // thread. if (pkt && pkt->pos >= 0) - sh->demuxer->filepos = pkt->pos; - - return pkt; + ds->in->d_user->filepos = pkt->pos; } + pthread_cond_signal(&ds->in->wakeup); // possibly read more + pthread_mutex_unlock(&ds->in->lock); } - return NULL; + return pkt; } // Return the pts of the next packet that demux_read_packet() would return. @@ -302,37 +450,55 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh) // packets from the queue. double demux_get_next_pts(struct sh_stream *sh) { - if (sh && sh->ds->selected) { - ds_get_packets(sh); + double res = MP_NOPTS_VALUE; + if (sh) { + pthread_mutex_lock(&sh->ds->in->lock); + ds_get_packets(sh->ds); if (sh->ds->head) - return sh->ds->head->pts; + res = sh->ds->head->pts; + pthread_mutex_unlock(&sh->ds->in->lock); } - return MP_NOPTS_VALUE; + return res; } // Return whether a packet is queued. Never blocks, never forces any reads. bool demux_has_packet(struct sh_stream *sh) { - return sh && sh->ds->head; + bool has_packet = false; + if (sh) { + pthread_mutex_lock(&sh->ds->in->lock); + has_packet = sh->ds->head; + pthread_mutex_unlock(&sh->ds->in->lock); + } + return has_packet; } // Return whether EOF was returned with an earlier packet read. bool demux_stream_eof(struct sh_stream *sh) { - return !sh || sh->ds->eof; + bool eof = false; + if (sh) { + pthread_mutex_lock(&sh->ds->in->lock); + eof = sh->ds->eof && !sh->ds->head; + pthread_mutex_unlock(&sh->ds->in->lock); + } + return eof; } // Read and return any packet we find. struct demux_packet *demux_read_any_packet(struct demuxer *demuxer) { + assert(!demuxer->in->threading); // doesn't work with threading for (int retry = 0; retry < 2; retry++) { for (int n = 0; n < demuxer->num_streams; n++) { struct sh_stream *sh = demuxer->streams[n]; - if (sh->ds->head) + if (demux_has_packet(sh)) return demux_read_packet(sh); } // retry after calling this - demux_fill_buffer(demuxer); + pthread_mutex_lock(&demuxer->in->lock); + read_packet(demuxer->in); + pthread_mutex_unlock(&demuxer->in->lock); } return NULL; } @@ -440,6 +606,80 @@ static void demux_export_replaygain(demuxer_t *demuxer) } } +// Copy all fields from src to dst, depending on event flags. +static void demux_copy(struct demuxer *dst, struct demuxer *src) +{ + if (src->events & DEMUX_EVENT_INIT) { + // Note that we do as shallow copies as possible. We expect the date + // that is not-copied (only referenced) to be immutable. + // This implies e.g. that no chapters are added after initialization. + dst->chapters = src->chapters; + dst->num_chapters = src->num_chapters; + dst->editions = src->editions; + dst->num_editions = src->num_editions; + dst->edition = src->edition; + dst->attachments = src->attachments; + dst->num_attachments = src->num_attachments; + dst->matroska_data = src->matroska_data; + dst->file_contents = src->file_contents; + dst->playlist = src->playlist; + dst->seekable = src->seekable; + dst->filetype = src->filetype; + dst->ts_resets_possible = src->ts_resets_possible; + dst->start_time = src->start_time; + } + if (src->events & DEMUX_EVENT_STREAMS) { + // The stream structs themselves are immutable. + for (int n = dst->num_streams; n < src->num_streams; n++) + MP_TARRAY_APPEND(dst, dst->streams, dst->num_streams, src->streams[n]); + } + if (src->events & DEMUX_EVENT_METADATA) { + talloc_free(dst->metadata); + dst->metadata = mp_tags_dup(dst, src->metadata); + } + dst->events |= src->events; + src->events = 0; +} + +// This is called by demuxer implementations if certain parameters change +// at runtime. +// events is one of DEMUX_EVENT_* +// The code will copy the fields references by the events to the user-thread. +void demux_changed(demuxer_t *demuxer, int events) +{ + assert(demuxer == demuxer->in->d_thread); // call from demuxer impl. only + struct demux_internal *in = demuxer->in; + + demuxer->events |= events; + + pthread_mutex_lock(&in->lock); + + update_cache(in); + + if (demuxer->events & DEMUX_EVENT_INIT) + demuxer_sort_chapters(demuxer); + if (demuxer->events & (DEMUX_EVENT_METADATA | DEMUX_EVENT_STREAMS)) + demux_export_replaygain(demuxer); + + demux_copy(in->d_buffer, demuxer); + + pthread_mutex_unlock(&in->lock); +} + +// Called by the user thread (i.e. player) to update metadata and other things +// from the demuxer thread. +void demux_update(demuxer_t *demuxer) +{ + assert(demuxer == demuxer->in->d_user); + struct demux_internal *in = demuxer->in; + + pthread_mutex_lock(&in->lock); + demux_copy(demuxer, in->d_buffer); + if (in->stream_metadata && (demuxer->events & DEMUX_EVENT_METADATA)) + mp_tags_merge(demuxer->metadata, in->stream_metadata); + pthread_mutex_unlock(&in->lock); +} + static struct demuxer *open_given_type(struct mpv_global *global, struct mp_log *log, const struct demuxer_desc *desc, @@ -459,32 +699,50 @@ static struct demuxer *open_given_type(struct mpv_global *global, .log = mp_log_new(demuxer, log, desc->name), .glog = log, .filename = talloc_strdup(demuxer, stream->url), - .metadata = talloc_zero(demuxer, struct mp_tags), - .events = DEMUX_EVENT_METADATA, + .events = DEMUX_EVENT_ALL, + }; + struct demux_internal *in = demuxer->in = talloc_ptrtype(demuxer, in); + *in = (struct demux_internal){ + .log = demuxer->log, + .d_thread = talloc(demuxer, struct demuxer), + .d_buffer = talloc(demuxer, struct demuxer), + .d_user = demuxer, + .min_packs = demuxer->opts->demuxer_min_packs, + .min_bytes = demuxer->opts->demuxer_min_bytes, }; - demuxer->params = params; // temporary during open() + pthread_mutex_init(&in->lock, NULL); + pthread_cond_init(&in->wakeup, NULL); + + *in->d_thread = *demuxer; + *in->d_buffer = *demuxer; + + in->d_thread->metadata = talloc_zero(in->d_thread, struct mp_tags); + in->d_user->metadata = talloc_zero(in->d_user, struct mp_tags); + in->d_buffer->metadata = talloc_zero(in->d_buffer, struct mp_tags); + int64_t start_pos = stream_tell(stream); mp_verbose(log, "Trying demuxer: %s (force-level: %s)\n", desc->name, d_level(check)); - int ret = demuxer->desc->open(demuxer, check); + in->d_thread->params = params; // temporary during open() + + int ret = demuxer->desc->open(in->d_thread, check); if (ret >= 0) { - demuxer->params = NULL; - if (demuxer->filetype) + in->d_thread->params = NULL; + if (in->d_thread->filetype) mp_verbose(log, "Detected file format: %s (%s)\n", - demuxer->filetype, desc->desc); + in->d_thread->filetype, desc->desc); else mp_verbose(log, "Detected file format: %s\n", desc->desc); - demuxer_sort_chapters(demuxer); - demux_info_update(demuxer); - demux_export_replaygain(demuxer); // Pretend we can seek if we can't seek, but there's a cache. - if (!demuxer->seekable && stream->uncached_stream) { + if (!in->d_thread->seekable && stream->uncached_stream) { mp_warn(log, "File is not seekable, but there's a cache: enabling seeking.\n"); - demuxer->seekable = true; + in->d_thread->seekable = true; } + demux_changed(in->d_thread, DEMUX_EVENT_ALL); + demux_update(demuxer); return demuxer; } @@ -552,9 +810,12 @@ done: void demux_flush(demuxer_t *demuxer) { + pthread_mutex_lock(&demuxer->in->lock); for (int n = 0; n < demuxer->num_streams; n++) - ds_free_packs(demuxer->streams[n]->ds); - demuxer->warned_queue_overflow = false; + ds_flush(demuxer->streams[n]->ds); + demuxer->in->warned_queue_overflow = false; + demuxer->in->eof = false; + pthread_mutex_unlock(&demuxer->in->lock); } int demux_seek(demuxer_t *demuxer, float rel_seek_secs, int flags) @@ -567,29 +828,17 @@ int demux_seek(demuxer_t *demuxer, float rel_seek_secs, int flags) if (rel_seek_secs == MP_NOPTS_VALUE && (flags & SEEK_ABSOLUTE)) return 0; + demux_pause(demuxer); + // clear the packet queues demux_flush(demuxer); if (demuxer->desc->seek) - demuxer->desc->seek(demuxer, rel_seek_secs, flags); + demuxer->desc->seek(demuxer->in->d_thread, rel_seek_secs, flags); - return 1; -} + demux_unpause(demuxer); -static int demux_info_print(demuxer_t *demuxer) -{ - struct mp_tags *info = demuxer->metadata; - int n; - - if (!info || !info->num_keys) - return 0; - - mp_info(demuxer->glog, "File tags:\n"); - for (n = 0; n < info->num_keys; n++) { - mp_info(demuxer->glog, " %s: %s\n", info->keys[n], info->values[n]); - } - - return 0; + return 1; } char *demux_info_get(demuxer_t *demuxer, const char *opt) @@ -597,35 +846,6 @@ char *demux_info_get(demuxer_t *demuxer, const char *opt) return mp_tags_get_str(demuxer->metadata, opt); } -bool demux_info_update(struct demuxer *demuxer) -{ - bool r = false; - // Take care of stream metadata as well - struct mp_tags *s_meta = NULL; - if (stream_control(demuxer->stream, STREAM_CTRL_GET_METADATA, &s_meta) > 0) { - talloc_free(demuxer->stream_metadata); - demuxer->stream_metadata = talloc_steal(demuxer, s_meta); - demuxer->events |= DEMUX_EVENT_METADATA; - } - if (demuxer->events & DEMUX_EVENT_METADATA) { - demuxer->events &= ~DEMUX_EVENT_METADATA; - if (demuxer->stream_metadata) - mp_tags_merge(demuxer->metadata, demuxer->stream_metadata); - demux_info_print(demuxer); - r = true; - } - return r; -} - -int demux_control(demuxer_t *demuxer, int cmd, void *arg) -{ - - if (demuxer->desc->control) - return demuxer->desc->control(demuxer, cmd, arg); - - return DEMUXER_CTRL_NOTIMPL; -} - struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d, enum stream_type t, int id) { @@ -653,16 +873,34 @@ void demuxer_select_track(struct demuxer *demuxer, struct sh_stream *stream, bool selected) { // don't flush buffers if stream is already selected / unselected + pthread_mutex_lock(&demuxer->in->lock); + bool update = false; if (stream->ds->selected != selected) { stream->ds->selected = selected; - ds_free_packs(stream->ds); - demux_control(demuxer, DEMUXER_CTRL_SWITCHED_TRACKS, NULL); + stream->ds->active = false; + ds_flush(stream->ds); + update = true; } + pthread_mutex_unlock(&demuxer->in->lock); + if (update) + demux_control(demuxer, DEMUXER_CTRL_SWITCHED_TRACKS, NULL); +} + +void demux_set_stream_autoselect(struct demuxer *demuxer, bool autoselect) +{ + assert(!demuxer->in->threading); // laziness + demuxer->in->autoselect = autoselect; } bool demux_stream_is_selected(struct sh_stream *stream) { - return stream && stream->ds->selected; + if (!stream) + return false; + bool r = false; + pthread_mutex_lock(&stream->ds->in->lock); + r = stream->ds->selected; + pthread_mutex_unlock(&stream->ds->in->lock); + return r; } int demuxer_add_attachment(demuxer_t *demuxer, struct bstr name, @@ -696,7 +934,7 @@ static int chapter_compare(const void *p1, const void *p2) return c1->original_index > c2->original_index ? 1 :-1; // never equal } -void demuxer_sort_chapters(demuxer_t *demuxer) +static void demuxer_sort_chapters(demuxer_t *demuxer) { qsort(demuxer->chapters, demuxer->num_chapters, sizeof(struct demux_chapter), chapter_compare); @@ -725,3 +963,151 @@ double demuxer_get_time_length(struct demuxer *demuxer) return len; return -1; } + +// must be called locked +static void update_cache(struct demux_internal *in) +{ + struct demuxer *demuxer = in->d_thread; + struct stream *stream = demuxer->stream; + + in->time_length = -1; + if (demuxer->desc->control) { + demuxer->desc->control(demuxer, DEMUXER_CTRL_GET_TIME_LENGTH, + &in->time_length); + } + + struct mp_tags *s_meta = NULL; + stream_control(stream, STREAM_CTRL_GET_METADATA, &s_meta); + if (s_meta) { + talloc_free(in->stream_metadata); + in->stream_metadata = talloc_steal(in, s_meta); + in->d_buffer->events |= DEMUX_EVENT_METADATA; + } + + in->stream_size = -1; + stream_control(stream, STREAM_CTRL_GET_SIZE, &in->stream_size); + in->stream_cache_size = -1; + stream_control(stream, STREAM_CTRL_GET_CACHE_SIZE, &in->stream_cache_size); + in->stream_cache_fill = -1; + stream_control(stream, STREAM_CTRL_GET_CACHE_FILL, &in->stream_cache_fill); + in->stream_cache_idle = -1; + stream_control(stream, STREAM_CTRL_GET_CACHE_IDLE, &in->stream_cache_idle); +} + +// must be called locked +static int cached_stream_control(struct demux_internal *in, int cmd, void *arg) +{ + switch (cmd) { + case STREAM_CTRL_GET_CACHE_SIZE: + if (in->stream_cache_size < 0) + return STREAM_UNSUPPORTED; + *(int64_t *)arg = in->stream_cache_size; + return STREAM_OK; + case STREAM_CTRL_GET_CACHE_FILL: + if (in->stream_cache_fill < 0) + return STREAM_UNSUPPORTED; + *(int64_t *)arg = in->stream_cache_fill; + return STREAM_OK; + case STREAM_CTRL_GET_CACHE_IDLE: + if (in->stream_cache_idle < 0) + return STREAM_UNSUPPORTED; + *(int *)arg = in->stream_cache_idle; + return STREAM_OK; + case STREAM_CTRL_GET_SIZE: + if (in->stream_size < 0) + return STREAM_UNSUPPORTED; + *(int64_t *)arg = in->stream_size; + return STREAM_OK; + } + return STREAM_ERROR; +} + +// must be called locked +static int cached_demux_control(struct demux_internal *in, int cmd, void *arg) +{ + switch (cmd) { + case DEMUXER_CTRL_GET_TIME_LENGTH: + if (in->time_length < 0) + return DEMUXER_CTRL_NOTIMPL; + *(double *)arg = in->time_length; + |