From 152a099c3a92a3bef1edcc531f2d3273fb92a8ac Mon Sep 17 00:00:00 2001 From: wm4 Date: Fri, 18 Jul 2014 15:08:05 +0200 Subject: demux: add function to read packets asychronously --- demux/demux.c | 64 ++++++++++++++++++++++++++++++++++++++++++++++------------- demux/demux.h | 1 + 2 files changed, 51 insertions(+), 14 deletions(-) (limited to 'demux') diff --git a/demux/demux.c b/demux/demux.c index 909038d783..dc4caf12f0 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -415,6 +415,25 @@ static void *demux_thread(void *pctx) return NULL; } +static struct demux_packet *dequeue_packet(struct demux_stream *ds) +{ + if (!ds->head) + return NULL; + struct demux_packet *pkt = ds->head; + ds->head = pkt->next; + pkt->next = NULL; + if (!ds->head) + ds->tail = NULL; + ds->bytes -= pkt->len; + ds->packs--; + + // This implies this function is actually called from "the" user thread. + if (pkt && pkt->pos >= 0) + ds->in->d_user->filepos = pkt->pos; + + return pkt; +} + // Read a packet from the given stream. The returned packet belongs to the // caller, who has to free it with talloc_free(). Might block. Returns NULL // on EOF. @@ -425,26 +444,43 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh) if (ds) { pthread_mutex_lock(&ds->in->lock); ds_get_packets(ds); - if (ds->head) { - pkt = ds->head; - ds->head = pkt->next; - pkt->next = NULL; - if (!ds->head) - ds->tail = NULL; - ds->bytes -= pkt->len; - ds->packs--; - - // This implies this function is actually called from "the" user - // thread. - if (pkt && pkt->pos >= 0) - ds->in->d_user->filepos = pkt->pos; - } + pkt = dequeue_packet(ds); pthread_cond_signal(&ds->in->wakeup); // possibly read more pthread_mutex_unlock(&ds->in->lock); } return pkt; } +// Poll the demuxer queue, and if there's a packet, return it. Otherwise, just +// make the demuxer thread read packets for this stream, and if there's at +// least one packet, call the wakeup callback. +// Returns: +// < 0: EOF was reached, *out_pkt=NULL +// == 0: no new packet yet, but maybe later, *out_pkt=NULL +// > 0: new packet read, *out_pkt is set +int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt) +{ + struct demux_stream *ds = sh ? sh->ds : NULL; + int r = -1; + *out_pkt = NULL; + if (ds) { + if (ds->in->threading) { + pthread_mutex_lock(&ds->in->lock); + *out_pkt = dequeue_packet(ds); + r = *out_pkt ? 1 : (ds->eof ? -1 : 0); + if (r < 1 && ds->selected) + ds->active = true; + ds->in->eof = false; // force retry + pthread_cond_signal(&ds->in->wakeup); // possibly read more + pthread_mutex_unlock(&ds->in->lock); + } else { + *out_pkt = demux_read_packet(sh); + r = *out_pkt ? 1 : -1; + } + } + return r; +} + // Return the pts of the next packet that demux_read_packet() would return. // Might block. Sometimes used to force a packet read, without removing any // packets from the queue. diff --git a/demux/demux.h b/demux/demux.h index 13180d2ce7..d4d7a741e2 100644 --- a/demux/demux.h +++ b/demux/demux.h @@ -235,6 +235,7 @@ void free_demuxer(struct demuxer *demuxer); int demux_add_packet(struct sh_stream *stream, demux_packet_t *dp); struct demux_packet *demux_read_packet(struct sh_stream *sh); +int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt); bool demux_stream_is_selected(struct sh_stream *stream); double demux_get_next_pts(struct sh_stream *sh); bool demux_has_packet(struct sh_stream *sh); -- cgit v1.2.3