From c24520b7f3651dbd4cbdffe93a187c3d213ff845 Mon Sep 17 00:00:00 2001 From: wm4 Date: Sat, 19 May 2018 18:19:07 +0200 Subject: demux: add a way to destroy the demuxer asynchronously This will enable the player core to terminate the demuxers in a "nicer" way without having to block on network. If it just used demux_free(), it would either have to block on network, or like currently, essentially kill all I/O forcefully. The API is slightly awkward, because demuxer lifetime is bound to its allocation. On the other hand, changing that would also be awkward, and introduce weird in-between states that would have to be handled in tons of places. Currently unused, to be user later. --- demux/demux.c | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----- demux/demux.h | 5 ++++ 2 files changed, 93 insertions(+), 8 deletions(-) diff --git a/demux/demux.c b/demux/demux.c index 33e3351b9e..c31dfcf809 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -142,6 +142,7 @@ struct demux_internal { bool thread_terminate; bool threading; + bool shutdown_async; void (*wakeup_cb)(void *ctx); void *wakeup_cb_ctx; @@ -945,29 +946,97 @@ int demux_get_num_stream(struct demuxer *demuxer) return r; } -void demux_free(struct demuxer *demuxer) +static void demux_shutdown(struct demux_internal *in) { - if (!demuxer) - return; - struct demux_internal *in = demuxer->in; - assert(demuxer == in->d_user); - - demux_stop_thread(demuxer); + struct demuxer *demuxer = in->d_user; if (demuxer->desc->close) demuxer->desc->close(in->d_thread); + demuxer->priv = NULL; + in->d_thread->priv = NULL; demux_flush(demuxer); assert(in->total_bytes == 0); if (in->owns_stream) free_stream(demuxer->stream); + demuxer->stream = NULL; +} +static void demux_dealloc(struct demux_internal *in) +{ for (int n = 0; n < in->num_streams; n++) talloc_free(in->streams[n]); pthread_mutex_destroy(&in->lock); pthread_cond_destroy(&in->wakeup); - talloc_free(demuxer); + talloc_free(in->d_user); +} + +void demux_free(struct demuxer *demuxer) +{ + if (!demuxer) + return; + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + demux_stop_thread(demuxer); + demux_shutdown(in); + demux_dealloc(in); +} + +// Start closing the demuxer and eventually freeing the demuxer asynchronously. +// You must not access the demuxer once this has been started. Once the demuxer +// is shutdown, the wakeup callback is invoked. Then you need to call +// demux_free_async_finish() to end the operation (it must not be called from +// the wakeup callback). +// This can return NULL. Then the demuxer cannot be free'd asynchronously, and +// you need to call demux_free() instead. +struct demux_free_async_state *demux_free_async(struct demuxer *demuxer) +{ + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + if (!in->threading) + return NULL; + + pthread_mutex_lock(&in->lock); + in->thread_terminate = true; + in->shutdown_async = true; + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); + + return (struct demux_free_async_state *)demuxer->in; // lies +} + +// As long as state is valid, you can call this to request immediate abort. +// Roughly behaves as demux_cancel_and_free(), except you still need to wait +// for the result. +void demux_free_async_force(struct demux_free_async_state *state) +{ + struct demux_internal *in = (struct demux_internal *)state; // reverse lies + + mp_cancel_trigger(in->d_user->cancel); +} + +// Check whether the demuxer is shutdown yet. If not, return false, and you +// need to call this again in the future (preferably after you were notified by +// the wakeup callback). If yes, deallocate all state, and return true (in +// particular, the state ptr becomes invalid, and the wakeup callback will never +// be called again). +bool demux_free_async_finish(struct demux_free_async_state *state) +{ + struct demux_internal *in = (struct demux_internal *)state; // reverse lies + + pthread_mutex_lock(&in->lock); + bool busy = in->shutdown_async; + pthread_mutex_unlock(&in->lock); + + if (busy) + return false; + + demux_stop_thread(in->d_user); + demux_dealloc(in); + return true; } // Like demux_free(), but trigger an abort, which will force the demuxer to @@ -1688,12 +1757,23 @@ static void *demux_thread(void *pctx) struct demux_internal *in = pctx; mpthread_set_name("demux"); pthread_mutex_lock(&in->lock); + while (!in->thread_terminate) { if (thread_work(in)) continue; pthread_cond_signal(&in->wakeup); pthread_cond_wait(&in->wakeup, &in->lock); } + + if (in->shutdown_async) { + pthread_mutex_unlock(&in->lock); + demux_shutdown(in); + pthread_mutex_lock(&in->lock); + in->shutdown_async = false; + if (in->wakeup_cb) + in->wakeup_cb(in->wakeup_cb_ctx); + } + pthread_mutex_unlock(&in->lock); return NULL; } diff --git a/demux/demux.h b/demux/demux.h index f5b203590f..7d2924000a 100644 --- a/demux/demux.h +++ b/demux/demux.h @@ -253,6 +253,11 @@ typedef struct { void demux_free(struct demuxer *demuxer); void demux_cancel_and_free(struct demuxer *demuxer); +struct demux_free_async_state; +struct demux_free_async_state *demux_free_async(struct demuxer *demuxer); +void demux_free_async_force(struct demux_free_async_state *state); +bool demux_free_async_finish(struct demux_free_async_state *state); + void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp); void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp); -- cgit v1.2.3