From e4fb23ed7de874bb2d05824d7edb84cfd1b21101 Mon Sep 17 00:00:00 2001 From: wm4 Date: Sat, 12 May 2018 18:46:37 +0200 Subject: command: add a way to abort asynchronous commands Many asynchronous commands are potentially long running operations, such as loading something from network or running a foreign process. Obviously it shouldn't just be possible for them to freeze the player if they don't terminate as expected. Also, there will be situations where you want to explicitly stop some of those operations explicitly. So add an infrastructure for this. Commands have to support this explicitly. The next commit uses this to actually add support to a command. --- DOCS/client-api-changes.rst | 1 + input/cmd.h | 4 ++++ libmpv/client.h | 32 ++++++++++++++++++++++++++++ libmpv/mpv.def | 1 + player/client.c | 36 +++++++++++++++++++++++++++++--- player/command.c | 22 +++++++++++++++---- player/command.h | 3 +++ player/core.h | 22 +++++++++++++++++++ player/loadfile.c | 51 +++++++++++++++++++++++++++++++++++++++++++++ player/main.c | 2 ++ player/playloop.c | 2 +- player/screenshot.c | 2 +- 12 files changed, 169 insertions(+), 9 deletions(-) diff --git a/DOCS/client-api-changes.rst b/DOCS/client-api-changes.rst index 75484c390a..73e3048498 100644 --- a/DOCS/client-api-changes.rst +++ b/DOCS/client-api-changes.rst @@ -36,6 +36,7 @@ API changes 1.102 - redo handling of async commands - add mpv_event_command and make it possible to return values from commands issued with mpv_command_async() or mpv_command_node_async() + - add mpv_abort_async_command() 1.101 - add MPV_RENDER_PARAM_ADVANCED_CONTROL and related API - add MPV_RENDER_PARAM_NEXT_FRAME_INFO and related symbols - add MPV_RENDER_PARAM_BLOCK_FOR_TARGET_TIME diff --git a/input/cmd.h b/input/cmd.h index 0e65565252..f6408988ab 100644 --- a/input/cmd.h +++ b/input/cmd.h @@ -54,6 +54,10 @@ struct mp_cmd_def { // unlocked, you have no synchronized access to mpctx, but you can do long // running operations without blocking playback or input handling). bool spawn_thread; + // If this is set, mp_cmd_ctx.abort is set. Set this if handler() can do + // asynchronous abort of the command, and explicitly uses mp_cmd_ctx.abort. + // (Not setting it when it's not needed can save resources.) + bool can_abort; }; enum mp_cmd_flags { diff --git a/libmpv/client.h b/libmpv/client.h index f7f7fada58..32c2d0a6e1 100644 --- a/libmpv/client.h +++ b/libmpv/client.h @@ -1013,6 +1013,38 @@ int mpv_command_async(mpv_handle *ctx, uint64_t reply_userdata, int mpv_command_node_async(mpv_handle *ctx, uint64_t reply_userdata, mpv_node *args); +/** + * Signal to all async requests with the matching ID to abort. This affects + * the following API calls: + * + * mpv_command_async + * mpv_command_node_async + * + * All of these functions take a reply_userdata parameter. This API function + * tells all requests with the matching reply_userdata value to try to return + * as soon as possible. If there are multiple requests with matching ID, it + * aborts all of them. + * + * This API function is mostly asynchronous itself. It will not wait until the + * command is aborted. Instead, the command will terminate as usual, but with + * some work not done. How this is signaled depends on the specific command (for + * example, the "subprocess" command will indicate it by "killed_by_us" set to + * true in the result). How long it takes also depends on the situation. The + * aborting process is completely asynchronous. + * + * Not all commands may support this functionality. In this case, this function + * will have no effect. The same is true if the request using the passed + * reply_userdata has already terminated, has not been started yet, or was + * never in use at all. + * + * You have to be careful of race conditions: the time during which the abort + * request will be effective is _after_ e.g. mpv_command_async() has returned, + * and before the command has signaled completion with MPV_EVENT_COMMAND_REPLY. + * + * @param reply_userdata ID of the request to be aborted (see above) + */ +void mpv_abort_async_command(mpv_handle *ctx, uint64_t reply_userdata); + /** * Set a property to a given value. Properties are essentially variables which * can be queried or set at runtime. For example, writing to the pause property diff --git a/libmpv/mpv.def b/libmpv/mpv.def index b74378c4ae..5b1c2423ff 100644 --- a/libmpv/mpv.def +++ b/libmpv/mpv.def @@ -1,3 +1,4 @@ +mpv_abort_async_command mpv_client_api_version mpv_client_name mpv_command diff --git a/player/client.c b/player/client.c index b6282a1134..31339f47a3 100644 --- a/player/client.c +++ b/player/client.c @@ -1063,9 +1063,14 @@ static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res lock_core(ctx); if (async) { - run_command(ctx->mpctx, req.cmd, NULL, NULL); + run_command(ctx->mpctx, cmd, NULL, NULL, NULL); } else { - run_command(ctx->mpctx, req.cmd, cmd_complete, &req); + struct mp_abort_entry *abort = NULL; + if (cmd->def->can_abort) { + abort = talloc_zero(NULL, struct mp_abort_entry); + abort->client = ctx; + } + run_command(ctx->mpctx, cmd, abort, cmd_complete, &req); } unlock_core(ctx); @@ -1129,9 +1134,17 @@ static void async_cmd_fn(void *data) ta_xset_parent(cmd, NULL); req->cmd = NULL; + struct mp_abort_entry *abort = NULL; + if (cmd->def->can_abort) { + abort = talloc_zero(NULL, struct mp_abort_entry); + abort->client = req->reply_ctx; + abort->client_work_type = MPV_EVENT_COMMAND_REPLY; + abort->client_work_id = req->userdata; + } + // This will synchronously or asynchronously call cmd_complete (depending // on the command). - run_command(req->mpctx, cmd, async_cmd_complete, req); + run_command(req->mpctx, cmd, abort, async_cmd_complete, req); } static int run_async_cmd(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd) @@ -1163,6 +1176,23 @@ int mpv_command_node_async(mpv_handle *ctx, uint64_t ud, mpv_node *args) return run_async_cmd(ctx, ud, mp_input_parse_cmd_node(ctx->log, args)); } +void mpv_abort_async_command(mpv_handle *ctx, uint64_t reply_userdata) +{ + struct MPContext *mpctx = ctx->mpctx; + + pthread_mutex_lock(&mpctx->abort_lock); + for (int n = 0; n < mpctx->num_abort_list; n++) { + struct mp_abort_entry *abort = mpctx->abort_list[n]; + if (abort->client == ctx && + abort->client_work_type == MPV_EVENT_COMMAND_REPLY && + abort->client_work_id == reply_userdata) + { + mp_abort_trigger_locked(mpctx, abort); + } + } + pthread_mutex_unlock(&mpctx->abort_lock); +} + static int translate_property_error(int errc) { switch (errc) { diff --git a/player/command.c b/player/command.c index f2d7eedd5f..cd623c03f4 100644 --- a/player/command.c +++ b/player/command.c @@ -4865,7 +4865,7 @@ static void continue_cmd_list(struct cmd_list_ctx *list) if (sub->flags & MP_ASYNC_CMD) { // We run it "detached" (fire & forget) - run_command(list->mpctx, sub, NULL, NULL); + run_command(list->mpctx, sub, NULL, NULL, NULL); } else { // Run the next command once this one completes. @@ -4873,7 +4873,7 @@ static void continue_cmd_list(struct cmd_list_ctx *list) list->current_valid = true; list->current = pthread_self(); - run_command(list->mpctx, sub, on_cmd_list_sub_completion, list); + run_command(list->mpctx, sub, NULL, on_cmd_list_sub_completion, list); list->current_valid = false; @@ -4920,8 +4920,9 @@ void mp_cmd_ctx_complete(struct mp_cmd_ctx *cmd) mpv_free_node_contents(&cmd->result); if (cmd->on_completion) cmd->on_completion(cmd); + if (cmd->abort) + mp_abort_remove(cmd->mpctx, cmd->abort); mpv_free_node_contents(&cmd->result); - talloc_free(cmd->cmd); talloc_free(cmd); } @@ -4949,27 +4950,40 @@ static void run_command_on_worker_thread(void *p) // function returns (the caller is supposed to be able to handle both cases). In // both cases, the callback will be called while the core is locked (i.e. you // can access the core freely). +// If abort is non-NULL, then the caller creates the abort object. It must have +// been allocated with talloc. run_command() will register/unregister/destroy +// it. Must not be set if cmd->def->can_abort==false. // on_completion_priv is copied to mp_cmd_ctx.on_completion_priv and can be // accessed from the completion callback. // The completion callback is invoked exactly once. If it's NULL, it's ignored. // Ownership of cmd goes to the caller. void run_command(struct MPContext *mpctx, struct mp_cmd *cmd, + struct mp_abort_entry *abort, void (*on_completion)(struct mp_cmd_ctx *cmd), void *on_completion_priv) { struct mp_cmd_ctx *ctx = talloc(NULL, struct mp_cmd_ctx); *ctx = (struct mp_cmd_ctx){ .mpctx = mpctx, - .cmd = cmd, + .cmd = talloc_steal(ctx, cmd), .args = cmd->args, .num_args = cmd->nargs, .priv = cmd->def->priv, + .abort = talloc_steal(ctx, abort), .success = true, .completed = true, .on_completion = on_completion, .on_completion_priv = on_completion_priv, }; + if (!ctx->abort && cmd->def->can_abort) + ctx->abort = talloc_zero(ctx, struct mp_abort_entry); + + assert(cmd->def->can_abort == !!ctx->abort); + + if (ctx->abort) + mp_abort_add(mpctx, ctx->abort); + struct MPOpts *opts = mpctx->opts; ctx->on_osd = cmd->flags & MP_ON_OSD_FLAGS; bool auto_osd = ctx->on_osd == MP_ON_OSD_AUTO; diff --git a/player/command.h b/player/command.h index 9e385dbc16..ae3af82886 100644 --- a/player/command.h +++ b/player/command.h @@ -45,6 +45,8 @@ struct mp_cmd_ctx { bool bar_osd; // OSD bar requested bool seek_msg_osd; // same as above, but for seek commands bool seek_bar_osd; + // If mp_cmd_def.can_abort is set, this will be set. + struct mp_abort_entry *abort; // Return values (to be set by command implementation, read by the // completion callback). bool success; // true by default @@ -64,6 +66,7 @@ struct mp_cmd_ctx { }; void run_command(struct MPContext *mpctx, struct mp_cmd *cmd, + struct mp_abort_entry *abort, void (*on_completion)(struct mp_cmd_ctx *cmd), void *on_completion_priv); void mp_cmd_ctx_complete(struct mp_cmd_ctx *cmd); diff --git a/player/core.h b/player/core.h index 0434c5cb64..a42b1252b8 100644 --- a/player/core.h +++ b/player/core.h @@ -442,6 +442,8 @@ typedef struct MPContext { // --- The following fields are protected by abort_lock struct mp_cancel *demuxer_cancel; // cancel handle for MPContext.demuxer + struct mp_abort_entry **abort_list; + int num_abort_list; // --- Owned by MPContext pthread_t open_thread; @@ -459,6 +461,20 @@ typedef struct MPContext { int open_res_error; } MPContext; +// Contains information about an asynchronous work item, how it can be aborted, +// and when. All fields are protected by MPContext.abort_lock. +struct mp_abort_entry { + // General conditions. + bool coupled_to_playback; // trigger when playback is terminated + // Actual trigger to abort the work. + struct mp_cancel *cancel; + // For client API. + struct mpv_handle *client; // non-NULL if done by a client API user + int client_work_type; // client API type, e.h. MPV_EVENT_COMMAND_REPLY + uint64_t client_work_id; // client API user reply_userdata value + // (only valid if client_work_type set) +}; + // audio.c void reset_audio_state(struct MPContext *mpctx); void reinit_audio_chain(struct MPContext *mpctx); @@ -488,6 +504,12 @@ struct playlist_entry *mp_check_playlist_resume(struct MPContext *mpctx, // loadfile.c void mp_abort_playback_async(struct MPContext *mpctx); +void mp_abort_add(struct MPContext *mpctx, struct mp_abort_entry *abort); +void mp_abort_remove(struct MPContext *mpctx, struct mp_abort_entry *abort); +void mp_abort_recheck_locked(struct MPContext *mpctx, + struct mp_abort_entry *abort); +void mp_abort_trigger_locked(struct MPContext *mpctx, + struct mp_abort_entry *abort); void uninit_player(struct MPContext *mpctx, unsigned int mask); int mp_add_external_file(struct MPContext *mpctx, char *filename, enum stream_type filter); diff --git a/player/loadfile.c b/player/loadfile.c index e1864f3fd5..6f28d2ee38 100644 --- a/player/loadfile.c +++ b/player/loadfile.c @@ -67,11 +67,62 @@ void mp_abort_playback_async(struct MPContext *mpctx) mp_cancel_trigger(mpctx->playback_abort); pthread_mutex_lock(&mpctx->abort_lock); + if (mpctx->demuxer_cancel) mp_cancel_trigger(mpctx->demuxer_cancel); + + for (int n = 0; n < mpctx->num_abort_list; n++) { + struct mp_abort_entry *abort = mpctx->abort_list[n]; + if (abort->coupled_to_playback) + mp_abort_trigger_locked(mpctx, abort); + } + + pthread_mutex_unlock(&mpctx->abort_lock); +} + +// Add it to the global list, and allocate required data structures. +void mp_abort_add(struct MPContext *mpctx, struct mp_abort_entry *abort) +{ + pthread_mutex_lock(&mpctx->abort_lock); + assert(!abort->cancel); + abort->cancel = mp_cancel_new(NULL); + MP_TARRAY_APPEND(NULL, mpctx->abort_list, mpctx->num_abort_list, abort); + mp_abort_recheck_locked(mpctx, abort); pthread_mutex_unlock(&mpctx->abort_lock); } +// Remove Add it to the global list, and free/clear required data structures. +// Does not deallocate the abort value itself. +void mp_abort_remove(struct MPContext *mpctx, struct mp_abort_entry *abort) +{ + pthread_mutex_lock(&mpctx->abort_lock); + for (int n = 0; n < mpctx->num_abort_list; n++) { + if (mpctx->abort_list[n] == abort) { + MP_TARRAY_REMOVE_AT(mpctx->abort_list, mpctx->num_abort_list, n); + TA_FREEP(&abort->cancel); + abort = NULL; // it's not free'd, just clear for the assert below + break; + } + } + assert(!abort); // should have been in the list + pthread_mutex_unlock(&mpctx->abort_lock); +} + +// Verify whether the abort needs to be signaled after changing certain fields +// in abort. +void mp_abort_recheck_locked(struct MPContext *mpctx, + struct mp_abort_entry *abort) +{ + if (abort->coupled_to_playback && mp_cancel_test(mpctx->playback_abort)) + mp_abort_trigger_locked(mpctx, abort); +} + +void mp_abort_trigger_locked(struct MPContext *mpctx, + struct mp_abort_entry *abort) +{ + mp_cancel_trigger(abort->cancel); +} + static void uninit_demuxer(struct MPContext *mpctx) { for (int r = 0; r < NUM_PTRACKS; r++) { diff --git a/player/main.c b/player/main.c index d744c9cf12..c51f93a430 100644 --- a/player/main.c +++ b/player/main.c @@ -189,6 +189,8 @@ void mp_destroy(struct MPContext *mpctx) uninit_libav(mpctx->global); mp_msg_uninit(mpctx->global); + assert(!mpctx->num_abort_list); + talloc_free(mpctx->abort_list); pthread_mutex_destroy(&mpctx->abort_lock); talloc_free(mpctx); } diff --git a/player/playloop.c b/player/playloop.c index 26f9f12d82..a784bfd554 100644 --- a/player/playloop.c +++ b/player/playloop.c @@ -110,7 +110,7 @@ void mp_process_input(struct MPContext *mpctx) mp_cmd_t *cmd = mp_input_read_cmd(mpctx->input); if (!cmd) break; - run_command(mpctx, cmd, NULL, NULL); + run_command(mpctx, cmd, NULL, NULL, NULL); } mp_set_timeout(mpctx, mp_input_get_delay(mpctx->input)); } diff --git a/player/screenshot.c b/player/screenshot.c index e654ce2081..e24ca051f1 100644 --- a/player/screenshot.c +++ b/player/screenshot.c @@ -519,7 +519,7 @@ void screenshot_flip(struct MPContext *mpctx) struct mp_waiter wait = MP_WAITER_INITIALIZER; void *a[] = {mpctx, &wait}; - run_command(mpctx, mp_cmd_clone(ctx->each_frame), screenshot_fin, a); + run_command(mpctx, mp_cmd_clone(ctx->each_frame), NULL, screenshot_fin, a); // Block (in a reentrant way) until he screenshot was written. Otherwise, // we could pile up screenshot requests forever. -- cgit v1.2.3