From f08387c55265a69f2e05c83ca81d71770e642fbc Mon Sep 17 00:00:00 2001 From: wm4 Date: Fri, 17 May 2019 20:53:09 +0200 Subject: demux: remove logic duplication from packet read functions There were 3 packet reading functions: the "old" demux_read_packet() that blocked (leftover from MPlayer times, but was still used until recently by some obscure code), the "new" demux_read_packet_async(), and the special demux_read_any_packet(), that is used by pseudo-demuxers like demux_edl. The first two could be used both in threaded and un-threaded mode. This made 5 cases in total. Some bits of logic was spread across all of them. Unify the logic. A recent commit made demux_read_packet() private, and the code for it in threaded mode disappears. The difference between threaded and un-threaded is minimized. It's possible that this commit causes random regression. Enjoy. --- demux/demux.c | 125 ++++++++++++++++++++++++++-------------------------------- 1 file changed, 57 insertions(+), 68 deletions(-) diff --git a/demux/demux.c b/demux/demux.c index 2fbafa9a15..b033fd504a 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -1855,22 +1855,52 @@ static struct demux_packet *advance_reader_head(struct demux_stream *ds) return pkt; } -static struct demux_packet *dequeue_packet(struct demux_stream *ds) +// Returns: +// < 0: EOF was reached, *res is not set +// == 0: no new packet yet, wait, *res is not set +// > 0: new packet is moved to *res +static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res) { + struct demux_internal *in = ds->in; + + if (!ds->selected) + return -1; + if (in->blocked) + return 0; + if (ds->sh->attached_picture) { ds->eof = true; if (ds->attached_picture_added) - return NULL; + return -1; ds->attached_picture_added = true; struct demux_packet *pkt = demux_copy_packet(ds->sh->attached_picture); if (!pkt) abort(); pkt->stream = ds->sh->index; - return pkt; + *res = pkt; + return 1; } - if (!ds->reader_head || ds->in->blocked) - return NULL; + + if (ds->eager) { + in->reading = true; // enable readahead + in->eof = false; // force retry + pthread_cond_signal(&in->wakeup); // possibly read more + } + + ds->need_wakeup = !ds->reader_head; + if (!ds->reader_head) { + if (!ds->eager) { + // Non-eager streams temporarily return EOF. If they returned 0, + // the reader would have to wait for new packets, which does not + // make sense due to the sparseness and passiveness of non-eager + // streams. + return -1; + } + return ds->eof ? -1 : 0; + } + struct demux_packet *pkt = advance_reader_head(ds); + assert(pkt); // The returned packet is mutated etc. and will be owned by the user. pkt = demux_copy_packet(pkt); @@ -1924,42 +1954,8 @@ static struct demux_packet *dequeue_packet(struct demux_stream *ds) } prune_old_packets(ds->in); - return pkt; -} - -// Read a packet from the given stream. The returned packet belongs to the -// caller, who has to free it with talloc_free(). Might block. Returns NULL -// on EOF. -static struct demux_packet *demux_read_packet(struct sh_stream *sh) -{ - struct demux_stream *ds = sh ? sh->ds : NULL; - if (!ds) - return NULL; - struct demux_internal *in = ds->in; - pthread_mutex_lock(&in->lock); - if (ds->eager) { - const char *t = stream_type_name(ds->type); - MP_DBG(in, "reading packet for %s\n", t); - in->eof = false; // force retry - ds->need_wakeup = true; - while (ds->selected && !ds->reader_head && !in->blocked) { - in->reading = true; - // Note: the following code marks EOF if it can't continue - if (in->threading) { - MP_VERBOSE(in, "waiting for demux thread (%s)\n", t); - pthread_cond_signal(&in->wakeup); - pthread_cond_wait(&in->wakeup, &in->lock); - } else { - thread_work(in); - } - if (ds->eof) - break; - } - } - struct demux_packet *pkt = dequeue_packet(ds); - pthread_cond_signal(&in->wakeup); // possibly read more - pthread_mutex_unlock(&in->lock); - return pkt; + *res = pkt; + return 1; } // Poll the demuxer queue, and if there's a packet, return it. Otherwise, just @@ -1977,32 +1973,21 @@ static struct demux_packet *demux_read_packet(struct sh_stream *sh) int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt) { struct demux_stream *ds = sh ? sh->ds : NULL; - int r = -1; *out_pkt = NULL; if (!ds) - return r; - if (ds->in->threading) { - pthread_mutex_lock(&ds->in->lock); - *out_pkt = dequeue_packet(ds); - if (ds->eager) { - r = *out_pkt ? 1 : (ds->eof ? -1 : 0); - ds->in->reading = true; // enable readahead - ds->in->eof = false; // force retry - pthread_cond_signal(&ds->in->wakeup); // possibly read more - } else { - r = *out_pkt ? 1 : -1; - } - ds->need_wakeup = r != 1; - pthread_mutex_unlock(&ds->in->lock); - } else { - if (ds->in->blocked) { - r = 0; - } else { - *out_pkt = demux_read_packet(sh); - r = *out_pkt ? 1 : -1; - } - ds->need_wakeup = r != 1; + return -1; + struct demux_internal *in = ds->in; + + pthread_mutex_lock(&in->lock); + int r = -1; + while (1) { + r = dequeue_packet(ds, out_pkt); + if (in->threading || in->blocked || r != 0) + break; + // Needs to actually read packets until we got a packet or EOF. + thread_work(in); } + pthread_mutex_unlock(&in->lock); return r; } @@ -2013,16 +1998,20 @@ struct demux_packet *demux_read_any_packet(struct demuxer *demuxer) assert(!in->threading); // doesn't work with threading bool read_more = true; while (read_more && !in->blocked) { + bool all_eof = true; for (int n = 0; n < in->num_streams; n++) { in->reading = true; // force read_packet() to read - struct demux_packet *pkt = dequeue_packet(in->streams[n]->ds); - if (pkt) - return pkt; + struct demux_packet *out_pkt = NULL; + int r = dequeue_packet(in->streams[n]->ds, &out_pkt); + if (r > 0) + return out_pkt; + if (r == 0) + all_eof = false; } // retry after calling this pthread_mutex_lock(&in->lock); // lock only because thread_work unlocks read_more = thread_work(in); - read_more &= !in->eof; + read_more &= !all_eof; pthread_mutex_unlock(&in->lock); } return NULL; -- cgit v1.2.3