From b9be20b529de8f47e795d9a869299548d8788528 Mon Sep 17 00:00:00 2001 From: wm4 Date: Fri, 7 Sep 2018 15:12:24 +0200 Subject: demux: return packets directly from demuxer instead of using sh_stream Preparation for other potential changes to separate demuxer cache/thread and actual demuxers. Most things are untested, but it seems to work somewhat. --- demux/demux.c | 20 +++++++++++-------- demux/demux.h | 7 +++++-- demux/demux_lavf.c | 21 +++++++++++--------- demux/demux_mf.c | 15 +++++++-------- demux/demux_mkv.c | 52 ++++++++++++++++++++++++++++++++++++-------------- demux/demux_raw.c | 16 +++++++++------- demux/demux_timeline.c | 17 +++++++++-------- demux/packet.h | 2 +- 8 files changed, 93 insertions(+), 57 deletions(-) (limited to 'demux') diff --git a/demux/demux.c b/demux/demux.c index 8a1e9b0b82..daf59ff8f4 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -355,6 +355,7 @@ struct mp_packet_tags { static void demuxer_sort_chapters(demuxer_t *demuxer); static void *demux_thread(void *pctx); static void update_cache(struct demux_internal *in); +static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp); #if 0 // very expensive check for redundant cached queue state @@ -1150,9 +1151,9 @@ void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp) dp->keyframe = true; dp->pts = MP_ADD_PTS(dp->pts, -in->ts_offset); dp->dts = MP_ADD_PTS(dp->dts, -in->ts_offset); + add_packet_locked(sh, dp); pthread_mutex_unlock(&in->lock); - demux_add_packet(sh, dp); } // Add the keyframe to the end of the index. Not all packets are actually added. @@ -1398,15 +1399,14 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds, attempt_range_joining(ds->in); } -void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) +static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp) { struct demux_stream *ds = stream ? stream->ds : NULL; - if (!dp || !dp->len || !ds || demux_cancel_test(ds->in->d_thread)) { + if (!dp->len || demux_cancel_test(ds->in->d_thread)) { talloc_free(dp); return; } struct demux_internal *in = ds->in; - pthread_mutex_lock(&in->lock); in->initial_state = false; @@ -1436,7 +1436,6 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) } if (drop) { - pthread_mutex_unlock(&in->lock); talloc_free(dp); return; } @@ -1542,7 +1541,6 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) } wakeup_ds(ds); - pthread_mutex_unlock(&in->lock); } // Returns true if there was "progress" (lock was released temporarily). @@ -1614,14 +1612,20 @@ static bool read_packet(struct demux_internal *in) pthread_mutex_unlock(&in->lock); struct demuxer *demux = in->d_thread; + struct demux_packet *pkt = NULL; bool eof = true; - if (demux->desc->fill_buffer && !demux_cancel_test(demux)) - eof = demux->desc->fill_buffer(demux) <= 0; + if (demux->desc->read_packet && !demux_cancel_test(demux)) + eof = !demux->desc->read_packet(demux, &pkt); update_cache(in); pthread_mutex_lock(&in->lock); + if (pkt) { + assert(pkt->stream >= 0 && pkt->stream < in->num_streams); + add_packet_locked(in->streams[pkt->stream], pkt); + } + if (!in->seeking) { if (eof) { for (int n = 0; n < in->num_streams; n++) { diff --git a/demux/demux.h b/demux/demux.h index 3a4b7dd331..b8d5cb5ee0 100644 --- a/demux/demux.h +++ b/demux/demux.h @@ -111,7 +111,11 @@ typedef struct demuxer_desc { // Return 0 on success, otherwise -1 int (*open)(struct demuxer *demuxer, enum demux_check check); // The following functions are all optional - int (*fill_buffer)(struct demuxer *demuxer); // 0 on EOF, otherwise 1 + // Try to read a packet. Return false on EOF. If true is returned, the + // demuxer may set *pkt to a new packet (the reference goes to the caller). + // If *pkt is NULL (the value when this function is called), the call + // will be repeated. + bool (*read_packet)(struct demuxer *demuxer, struct demux_packet **pkt); void (*close)(struct demuxer *demuxer); void (*seek)(struct demuxer *demuxer, double rel_seek_secs, int flags); int (*control)(struct demuxer *demuxer, int cmd, void *arg); @@ -252,7 +256,6 @@ struct demux_free_async_state *demux_free_async(struct demuxer *demuxer); void demux_free_async_force(struct demux_free_async_state *state); bool demux_free_async_finish(struct demux_free_async_state *state); -void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp); void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp); struct demux_packet *demux_read_packet(struct sh_stream *sh); diff --git a/demux/demux_lavf.c b/demux/demux_lavf.c index bccc0c53a1..19997a1855 100644 --- a/demux/demux_lavf.c +++ b/demux/demux_lavf.c @@ -1066,7 +1066,8 @@ static int demux_open_lavf(demuxer_t *demuxer, enum demux_check check) return 0; } -static int demux_lavf_fill_buffer(demuxer_t *demux) +static bool demux_lavf_read_packet(struct demuxer *demux, + struct demux_packet **mp_pkt) { lavf_priv_t *priv = demux->priv; @@ -1076,11 +1077,11 @@ static int demux_lavf_fill_buffer(demuxer_t *demux) if (r < 0) { av_packet_unref(pkt); if (r == AVERROR(EAGAIN)) - return 1; + return true; if (r == AVERROR_EOF) - return 0; + return false; MP_WARN(demux, "error reading packet.\n"); - return -1; + return false; } add_new_streams(demux); @@ -1092,13 +1093,13 @@ static int demux_lavf_fill_buffer(demuxer_t *demux) if (!demux_stream_is_selected(stream)) { av_packet_unref(pkt); - return 1; // don't signal EOF if skipping a packet + return true; // don't signal EOF if skipping a packet } struct demux_packet *dp = new_demux_packet_from_avpacket(pkt); if (!dp) { av_packet_unref(pkt); - return 1; + return true; } if (pkt->pts != AV_NOPTS_VALUE) @@ -1117,8 +1118,10 @@ static int demux_lavf_fill_buffer(demuxer_t *demux) if (priv->format_hack.clear_filepos) dp->pos = -1; - demux_add_packet(stream, dp); - return 1; + dp->stream = stream->index; + + *mp_pkt = dp; + return true; } static void demux_seek_lavf(demuxer_t *demuxer, double seek_pts, int flags) @@ -1228,7 +1231,7 @@ static void demux_close_lavf(demuxer_t *demuxer) const demuxer_desc_t demuxer_desc_lavf = { .name = "lavf", .desc = "libavformat", - .fill_buffer = demux_lavf_fill_buffer, + .read_packet = demux_lavf_read_packet, .open = demux_open_lavf, .close = demux_close_lavf, .seek = demux_seek_lavf, diff --git a/demux/demux_mf.c b/demux/demux_mf.c index c4995a66c5..7da07c793a 100644 --- a/demux/demux_mf.c +++ b/demux/demux_mf.c @@ -173,14 +173,12 @@ static void demux_seek_mf(demuxer_t *demuxer, double seek_pts, int flags) mf->curr_frame = newpos; } -// return value: -// 0 = EOF or no stream found -// 1 = successfully read a packet -static int demux_mf_fill_buffer(demuxer_t *demuxer) +static bool demux_mf_read_packet(struct demuxer *demuxer, + struct demux_packet **pkt) { mf_t *mf = demuxer->priv; if (mf->curr_frame >= mf->nr_of_files) - return 0; + return false; struct stream *entry_stream = NULL; if (mf->streams) @@ -201,7 +199,8 @@ static int demux_mf_fill_buffer(demuxer_t *demuxer) memcpy(dp->buffer, data.start, data.len); dp->pts = mf->curr_frame / mf->sh->codec->fps; dp->keyframe = true; - demux_add_packet(mf->sh, dp); + dp->stream = mf->sh->index; + *pkt = dp; } } talloc_free(data.start); @@ -211,7 +210,7 @@ static int demux_mf_fill_buffer(demuxer_t *demuxer) free_stream(stream); mf->curr_frame++; - return 1; + return true; } // map file extension/type to a codec name @@ -350,7 +349,7 @@ static void demux_close_mf(demuxer_t *demuxer) const demuxer_desc_t demuxer_desc_mf = { .name = "mf", .desc = "image files (mf)", - .fill_buffer = demux_mf_fill_buffer, + .read_packet = demux_mf_read_packet, .open = demux_open_mf, .close = demux_close_mf, .seek = demux_seek_mf, diff --git a/demux/demux_mkv.c b/demux/demux_mkv.c index a62057dfa0..4ffe804faf 100644 --- a/demux/demux_mkv.c +++ b/demux/demux_mkv.c @@ -210,6 +210,10 @@ typedef struct mkv_demuxer { // temporary data, and not normally larger than 0 or 1 elements. struct block_info *blocks; int num_blocks; + + // Packets to return. + struct demux_packet **packets; + int num_packets; } mkv_demuxer_t; #define OPT_BASE_STRUCT struct demux_mkv_opts @@ -256,6 +260,17 @@ static void probe_first_timestamp(struct demuxer *demuxer); static int read_next_block_into_queue(demuxer_t *demuxer); static void free_block(struct block_info *block); +static void add_packet(struct demuxer *demuxer, struct sh_stream *stream, + struct demux_packet *pkt) +{ + mkv_demuxer_t *mkv_d = demuxer->priv; + if (!pkt) + return; + + pkt->stream = stream->index; + MP_TARRAY_APPEND(mkv_d, mkv_d->packets, mkv_d->num_packets, pkt); +} + #define AAC_SYNC_EXTENSION_TYPE 0x02b7 static int aac_get_sample_rate_index(uint32_t sample_rate) { @@ -2312,7 +2327,7 @@ static bool handle_realaudio(demuxer_t *demuxer, mkv_track_t *track, track->audio_timestamp[x * apk_usize / w]; dp->pos = orig->pos + x; dp->keyframe = !x; // Mark first packet as keyframe - demux_add_packet(track->stream, dp); + add_packet(demuxer, track->stream, dp); } } @@ -2337,6 +2352,10 @@ static void mkv_seek_reset(demuxer_t *demuxer) free_block(&mkv_d->blocks[n]); mkv_d->num_blocks = 0; + for (int n = 0; n < mkv_d->num_packets; n++) + talloc_free(mkv_d->packets[n]); + mkv_d->num_packets = 0; + mkv_d->skip_to_timecode = INT64_MIN; } @@ -2440,7 +2459,7 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track, if (new) { demux_packet_copy_attribs(new, dp); talloc_free(dp); - demux_add_packet(stream, new); + add_packet(demuxer, stream, new); return; } } @@ -2455,7 +2474,7 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track, memcpy(new->buffer + 8, dp->buffer, dp->len); demux_packet_copy_attribs(new, dp); talloc_free(dp); - demux_add_packet(stream, new); + add_packet(demuxer, stream, new); return; } } @@ -2469,7 +2488,7 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track, } if (!track->parse || !track->av_parser || !track->av_parser_codec) { - demux_add_packet(stream, dp); + add_packet(demuxer, stream, dp); return; } @@ -2503,13 +2522,13 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track, new->dts = track->av_parser->dts == AV_NOPTS_VALUE ? MP_NOPTS_VALUE : track->av_parser->dts / tb; } - demux_add_packet(stream, new); + add_packet(demuxer, stream, new); } pts = dts = AV_NOPTS_VALUE; } if (dp->len) { - demux_add_packet(stream, dp); + add_packet(demuxer, stream, dp); } else { talloc_free(dp); } @@ -2897,19 +2916,26 @@ static int read_next_block(demuxer_t *demuxer, struct block_info *block) return 1; } -static int demux_mkv_fill_buffer(demuxer_t *demuxer) +static bool demux_mkv_read_packet(struct demuxer *demuxer, + struct demux_packet **pkt) { + struct mkv_demuxer *mkv_d = demuxer->priv; + for (;;) { + if (mkv_d->num_packets) { + *pkt = mkv_d->packets[0]; + MP_TARRAY_REMOVE_AT(mkv_d->packets, mkv_d->num_packets, 0); + return true; + } + int res; struct block_info block; res = read_next_block(demuxer, &block); if (res < 0) - return 0; + return false; if (res > 0) { - res = handle_block(demuxer, &block); + handle_block(demuxer, &block); free_block(&block); - if (res > 0) - return 1; } } } @@ -3130,8 +3156,6 @@ static void demux_mkv_seek(demuxer_t *demuxer, double seek_pts, int flags) mkv_d->v_skip_to_keyframe = st_active[STREAM_VIDEO]; mkv_d->a_skip_to_keyframe = st_active[STREAM_AUDIO]; mkv_d->a_skip_preroll = mkv_d->a_skip_to_keyframe; - - demux_mkv_fill_buffer(demuxer); } static void probe_last_timestamp(struct demuxer *demuxer, int64_t start_pos) @@ -3238,7 +3262,7 @@ const demuxer_desc_t demuxer_desc_matroska = { .name = "mkv", .desc = "Matroska", .open = demux_mkv_open, - .fill_buffer = demux_mkv_fill_buffer, + .read_packet = demux_mkv_read_packet, .close = mkv_free, .seek = demux_mkv_seek, .load_timeline = build_ordered_chapter_timeline, diff --git a/demux/demux_raw.c b/demux/demux_raw.c index 90b071dd3d..52d3ee268e 100644 --- a/demux/demux_raw.c +++ b/demux/demux_raw.c @@ -269,17 +269,17 @@ static int demux_rawvideo_open(demuxer_t *demuxer, enum demux_check check) return generic_open(demuxer); } -static int raw_fill_buffer(demuxer_t *demuxer) +static bool raw_read_packet(struct demuxer *demuxer, struct demux_packet **pkt) { struct priv *p = demuxer->priv; if (demuxer->stream->eof) - return 0; + return false; struct demux_packet *dp = new_demux_packet(p->frame_size * p->read_frames); if (!dp) { MP_ERR(demuxer, "Can't read packet.\n"); - return 1; + return true; } dp->pos = stream_tell(demuxer->stream); @@ -287,9 +287,11 @@ static int raw_fill_buffer(demuxer_t *demuxer) int len = stream_read(demuxer->stream, dp->buffer, dp->len); demux_packet_shorten(dp, len); - demux_add_packet(p->sh, dp); - return 1; + dp->stream = p->sh->index; + *pkt = dp; + + return true; } static void raw_seek(demuxer_t *demuxer, double seek_pts, int flags) @@ -312,7 +314,7 @@ const demuxer_desc_t demuxer_desc_rawaudio = { .name = "rawaudio", .desc = "Uncompressed audio", .open = demux_rawaudio_open, - .fill_buffer = raw_fill_buffer, + .read_packet = raw_read_packet, .seek = raw_seek, }; @@ -320,6 +322,6 @@ const demuxer_desc_t demuxer_desc_rawvideo = { .name = "rawvideo", .desc = "Uncompressed video", .open = demux_rawvideo_open, - .fill_buffer = raw_fill_buffer, + .read_packet = raw_read_packet, .seek = raw_seek, }; diff --git a/demux/demux_timeline.c b/demux/demux_timeline.c index 1eb73956c3..c34619a6d2 100644 --- a/demux/demux_timeline.c +++ b/demux/demux_timeline.c @@ -222,7 +222,7 @@ static void d_seek(struct demuxer *demuxer, double seek_pts, int flags) switch_segment(demuxer, new, pts, flags, false); } -static int d_fill_buffer(struct demuxer *demuxer) +static bool d_read_packet(struct demuxer *demuxer, struct demux_packet **out_pkt) { struct priv *p = demuxer->priv; @@ -231,7 +231,7 @@ static int d_fill_buffer(struct demuxer *demuxer) struct segment *seg = p->current; if (!seg || !seg->d) - return 0; + return false; struct demux_packet *pkt = demux_read_any_packet(seg->d); if (!pkt || pkt->pts >= seg->end) @@ -267,9 +267,9 @@ static int d_fill_buffer(struct demuxer *demuxer) } } if (!next) - return 0; + return false; switch_segment(demuxer, next, next->start, 0, true); - return 1; // reader will retry + return true; // reader will retry } if (pkt->stream < 0 || pkt->stream > seg->num_stream_map) @@ -308,12 +308,13 @@ static int d_fill_buffer(struct demuxer *demuxer) } } - demux_add_packet(vs->sh, pkt); - return 1; + pkt->stream = vs->sh->index; + *out_pkt = pkt; + return true; drop: talloc_free(pkt); - return 1; + return true; } static void print_timeline(struct demuxer *demuxer) @@ -447,7 +448,7 @@ static int d_control(struct demuxer *demuxer, int cmd, void *arg) const demuxer_desc_t demuxer_desc_timeline = { .name = "timeline", .desc = "timeline segments", - .fill_buffer = d_fill_buffer, + .read_packet = d_read_packet, .open = d_open, .close = d_close, .seek = d_seek, diff --git a/demux/packet.h b/demux/packet.h index 4d34b3d766..7c5b04720f 100644 --- a/demux/packet.h +++ b/demux/packet.h @@ -33,7 +33,7 @@ typedef struct demux_packet { bool keyframe; int64_t pos; // position in source file byte stream - int stream; // source stream index + int stream; // source stream index (typically sh_stream.index) // segmentation (ordered chapters, EDL) bool segmented; -- cgit v1.2.3