diff options
Diffstat (limited to 'player')
-rw-r--r-- | player/client.c | 101 | ||||
-rw-r--r-- | player/client.h | 1 | ||||
-rw-r--r-- | player/command.c | 170 | ||||
-rw-r--r-- | player/command.h | 24 | ||||
-rw-r--r-- | player/core.h | 4 | ||||
-rw-r--r-- | player/main.c | 2 | ||||
-rw-r--r-- | player/playloop.c | 13 |
7 files changed, 271 insertions, 44 deletions
diff --git a/player/client.c b/player/client.c index 02a8db2911..21bb069413 100644 --- a/player/client.c +++ b/player/client.c @@ -32,6 +32,7 @@ #include "misc/ctype.h" #include "misc/dispatch.h" #include "misc/rendezvous.h" +#include "misc/thread_tools.h" #include "options/m_config.h" #include "options/m_option.h" #include "options/m_property.h" @@ -508,6 +509,15 @@ void mp_shutdown_clients(struct MPContext *mpctx) pthread_mutex_unlock(&clients->lock); } +bool mp_is_shutting_down(struct MPContext *mpctx) +{ + struct mp_client_api *clients = mpctx->clients; + pthread_mutex_lock(&clients->lock); + bool res = clients->shutting_down; + pthread_mutex_unlock(&clients->lock); + return res; +} + static void *core_thread(void *p) { struct MPContext *mpctx = p; @@ -1014,29 +1024,30 @@ static int run_async(mpv_handle *ctx, void (*fn)(void *fn_data), void *fn_data) talloc_free(fn_data); return err; } - mp_dispatch_enqueue_autofree(ctx->mpctx->dispatch, fn, fn_data); + mp_dispatch_enqueue(ctx->mpctx->dispatch, fn, fn_data); return 0; } struct cmd_request { struct MPContext *mpctx; struct mp_cmd *cmd; - struct mpv_node *res; int status; - struct mpv_handle *reply_ctx; - uint64_t userdata; + struct mpv_node *res; + struct mp_waiter completion; }; -static void cmd_fn(void *data) +static void cmd_complete(struct mp_cmd_ctx *cmd) { - struct cmd_request *req = data; - int r = run_command(req->mpctx, req->cmd, req->res); - req->status = r >= 0 ? 0 : MPV_ERROR_COMMAND; - talloc_free(req->cmd); - if (req->reply_ctx) { - status_reply(req->reply_ctx, MPV_EVENT_COMMAND_REPLY, - req->userdata, req->status); + struct cmd_request *req = cmd->on_completion_priv; + + req->status = cmd->success ? 0 : MPV_ERROR_COMMAND; + if (req->res) { + *req->res = cmd->result; + cmd->result = (mpv_node){0}; } + + // Unblock the waiting thread (especially for async commands). + mp_waiter_wakeup(&req->completion, 0); } static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res) @@ -1055,8 +1066,22 @@ static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res .mpctx = ctx->mpctx, .cmd = cmd, .res = res, + .completion = MP_WAITER_INITIALIZER, }; - run_locked(ctx, cmd_fn, &req); + + bool async = cmd->flags & MP_ASYNC_CMD; + + lock_core(ctx); + if (async) { + run_command(ctx->mpctx, req.cmd, NULL, NULL); + } else { + run_command(ctx->mpctx, req.cmd, cmd_complete, &req); + } + unlock_core(ctx); + + if (!async) + mp_waiter_wait(&req.completion); + return req.status; } @@ -1080,7 +1105,41 @@ int mpv_command_string(mpv_handle *ctx, const char *args) mp_input_parse_cmd(ctx->mpctx->input, bstr0((char*)args), ctx->name), NULL); } -static int run_cmd_async(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd) +struct async_cmd_request { + struct MPContext *mpctx; + struct mp_cmd *cmd; + int status; + struct mpv_handle *reply_ctx; + uint64_t userdata; +}; + +static void async_cmd_complete(struct mp_cmd_ctx *cmd) +{ + struct async_cmd_request *req = cmd->on_completion_priv; + + req->status = cmd->success ? 0 : MPV_ERROR_COMMAND; + + // Async command invocation - send a reply message. + status_reply(req->reply_ctx, MPV_EVENT_COMMAND_REPLY, + req->userdata, req->status); + + talloc_free(req); +} + +static void async_cmd_fn(void *data) +{ + struct async_cmd_request *req = data; + + struct mp_cmd *cmd = req->cmd; + ta_xset_parent(cmd, NULL); + req->cmd = NULL; + + // This will synchronously or asynchronously call cmd_complete (depending + // on the command). + run_command(req->mpctx, cmd, async_cmd_complete, req); +} + +static int run_async_cmd(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd) { if (!ctx->mpctx->initialized) return MPV_ERROR_UNINITIALIZED; @@ -1089,24 +1148,24 @@ static int run_cmd_async(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd) cmd->sender = ctx->name; - struct cmd_request *req = talloc_ptrtype(NULL, req); - *req = (struct cmd_request){ + struct async_cmd_request *req = talloc_ptrtype(NULL, req); + *req = (struct async_cmd_request){ .mpctx = ctx->mpctx, - .cmd = cmd, + .cmd = talloc_steal(req, cmd), .reply_ctx = ctx, .userdata = ud, }; - return run_async(ctx, cmd_fn, req); + return run_async(ctx, async_cmd_fn, req); } int mpv_command_async(mpv_handle *ctx, uint64_t ud, const char **args) { - return run_cmd_async(ctx, ud, mp_input_parse_cmd_strv(ctx->log, args)); + return run_async_cmd(ctx, ud, mp_input_parse_cmd_strv(ctx->log, args)); } int mpv_command_node_async(mpv_handle *ctx, uint64_t ud, mpv_node *args) { - return run_cmd_async(ctx, ud, mp_input_parse_cmd_node(ctx->log, args)); + return run_async_cmd(ctx, ud, mp_input_parse_cmd_node(ctx->log, args)); } static int translate_property_error(int errc) @@ -1155,6 +1214,7 @@ static void setproperty_fn(void *arg) if (req->reply_ctx) { status_reply(req->reply_ctx, MPV_EVENT_SET_PROPERTY_REPLY, req->userdata, req->status); + talloc_free(req); } } @@ -1310,6 +1370,7 @@ static void getproperty_fn(void *arg) .error = req->status, }; send_reply(req->reply_ctx, req->userdata, &reply); + talloc_free(req); } } diff --git a/player/client.h b/player/client.h index 2512315d42..7426e94372 100644 --- a/player/client.h +++ b/player/client.h @@ -20,6 +20,7 @@ struct mpv_global; void mp_clients_init(struct MPContext *mpctx); void mp_clients_destroy(struct MPContext *mpctx); void mp_shutdown_clients(struct MPContext *mpctx); +bool mp_is_shutting_down(struct MPContext *mpctx); bool mp_clients_all_initialized(struct MPContext *mpctx); bool mp_client_exists(struct MPContext *mpctx, const char *client_name); diff --git a/player/command.c b/player/command.c index 3fe39642a6..4600e65236 100644 --- a/player/command.c +++ b/player/command.c @@ -60,7 +60,9 @@ #include "video/out/bitmap_packer.h" #include "options/path.h" #include "screenshot.h" +#include "misc/dispatch.h" #include "misc/node.h" +#include "misc/thread_pool.h" #include "osdep/io.h" #include "osdep/subprocess.h" @@ -4825,27 +4827,147 @@ static void cmd_cycle_values(void *p) change_property_cmd(cmd, name, M_PROPERTY_SET_STRING, cmd->args[current].v.s); } +struct cmd_list_ctx { + struct MPContext *mpctx; + + // actual list command + struct mp_cmd_ctx *parent; + + bool current_valid; + pthread_t current; + bool completed_recursive; + + // list of sub commands yet to run + struct mp_cmd **sub; + int num_sub; +}; + +static void continue_cmd_list(struct cmd_list_ctx *list); + +static void on_cmd_list_sub_completion(struct mp_cmd_ctx *cmd) +{ + struct cmd_list_ctx *list = cmd->on_completion_priv; + + if (list->current_valid && pthread_equal(list->current, pthread_self())) { + list->completed_recursive = true; + } else { + continue_cmd_list(list); + } +} + +static void continue_cmd_list(struct cmd_list_ctx *list) +{ + while (list->parent->args[0].v.p) { + struct mp_cmd *sub = list->parent->args[0].v.p; + list->parent->args[0].v.p = sub->queue_next; + + ta_xset_parent(sub, NULL); + + if (sub->flags & MP_ASYNC_CMD) { + // We run it "detached" (fire & forget) + run_command(list->mpctx, sub, NULL, NULL); + } else { + // Run the next command once this one completes. + + list->completed_recursive = false; + list->current_valid = true; + list->current = pthread_self(); + + run_command(list->mpctx, sub, on_cmd_list_sub_completion, list); + + list->current_valid = false; + + // run_command() either recursively calls the completion function, + // or lets the command continue run in the background. If it was + // completed recursively, we can just continue our loop. Otherwise + // the completion handler will invoke this loop again elsewhere. + // We could unconditionally call continue_cmd_list() in the handler + // instead, but then stack depth would grow with list length. + if (!list->completed_recursive) + return; + } + } + + mp_cmd_ctx_complete(list->parent); + talloc_free(list); +} + static void cmd_list(void *p) { struct mp_cmd_ctx *cmd = p; - for (struct mp_cmd *sub = cmd->cmd->args[0].v.p; sub; sub = sub->queue_next) - run_command(cmd->mpctx, sub, NULL); + cmd->completed = false; + + struct cmd_list_ctx *list = talloc_zero(NULL, struct cmd_list_ctx); + list->mpctx = cmd->mpctx; + list->parent = p; + + continue_cmd_list(list); +} + +const struct mp_cmd_def mp_cmd_list = { "list", cmd_list, .exec_async = true }; + +// Signal that the command is complete now. This also deallocates cmd. +// You must call this function in a state where the core is locked for the +// current thread (e.g. from the main thread, or from within mp_dispatch_lock()). +// Completion means the command is finished, even if it errored or never ran. +// Keep in mind that calling this can execute further user command that can +// change arbitrary state (due to cmd_list). +void mp_cmd_ctx_complete(struct mp_cmd_ctx *cmd) +{ + cmd->completed = true; + if (!cmd->success) + mpv_free_node_contents(&cmd->result); + if (cmd->on_completion) + cmd->on_completion(cmd); + mpv_free_node_contents(&cmd->result); + talloc_free(cmd->cmd); + talloc_free(cmd); } -const struct mp_cmd_def mp_cmd_list = { "list", cmd_list }; +static void run_command_on_worker_thread(void *p) +{ + struct mp_cmd_ctx *ctx = p; + struct MPContext *mpctx = ctx->mpctx; + + mp_core_lock(mpctx); + + bool exec_async = ctx->cmd->def->exec_async; + ctx->cmd->def->handler(ctx); + if (!exec_async) + mp_cmd_ctx_complete(ctx); -int run_command(struct MPContext *mpctx, struct mp_cmd *cmd, struct mpv_node *res) + mpctx->outstanding_async -= 1; + if (!mpctx->outstanding_async && mp_is_shutting_down(mpctx)) + mp_wakeup_core(mpctx); + + mp_core_unlock(mpctx); +} + +// Run the given command. Upon command completion, on_completion is called. This +// can happen within the function, or for async commands, some time after the +// function returns (the caller is supposed to be able to handle both cases). In +// both cases, the callback will be called while the core is locked (i.e. you +// can access the core freely). +// on_completion_priv is copied to mp_cmd_ctx.on_completion_priv and can be +// accessed from the completion callback. +// The completion callback is invoked exactly once. If it's NULL, it's ignored. +// Ownership of cmd goes to the caller. +void run_command(struct MPContext *mpctx, struct mp_cmd *cmd, + void (*on_completion)(struct mp_cmd_ctx *cmd), + void *on_completion_priv) { - struct mpv_node dummy_node = {0}; - struct mp_cmd_ctx *ctx = &(struct mp_cmd_ctx){ + struct mp_cmd_ctx *ctx = talloc(NULL, struct mp_cmd_ctx); + *ctx = (struct mp_cmd_ctx){ .mpctx = mpctx, .cmd = cmd, .args = cmd->args, .num_args = cmd->nargs, .priv = cmd->def->priv, .success = true, - .result = res ? res : &dummy_node, + .completed = true, + .on_completion = on_completion, + .on_completion_priv = on_completion_priv, }; struct MPOpts *opts = mpctx->opts; @@ -4863,22 +4985,32 @@ int run_command(struct MPContext *mpctx, struct mp_cmd *cmd, struct mpv_node *re for (int n = 0; n < cmd->nargs; n++) { if (cmd->args[n].type->type == CONF_TYPE_STRING) { char *s = mp_property_expand_string(mpctx, cmd->args[n].v.s); - if (!s) - return -1; + if (!s) { + ctx->success = false; + mp_cmd_ctx_complete(ctx); + return; + } talloc_free(cmd->args[n].v.s); cmd->args[n].v.s = s; } } } - cmd->def->handler(ctx); - - if (!ctx->success) - mpv_free_node_contents(ctx->result); - - mpv_free_node_contents(&dummy_node); - - return ctx->success ? 0 : -1; + if (cmd->def->spawn_thread) { + mpctx->outstanding_async += 1; // prevent that core disappears + if (!mp_thread_pool_queue(mpctx->thread_pool, + run_command_on_worker_thread, ctx)) + { + mpctx->outstanding_async -= 1; + ctx->success = false; + mp_cmd_ctx_complete(ctx); + } + } else { + bool exec_async = cmd->def->exec_async; + cmd->def->handler(ctx); + if (!exec_async) + mp_cmd_ctx_complete(ctx); + } } static void cmd_seek(void *p) @@ -5199,7 +5331,7 @@ static void cmd_expand_text(void *p) struct mp_cmd_ctx *cmd = p; struct MPContext *mpctx = cmd->mpctx; - *cmd->result = (mpv_node){ + cmd->result = (mpv_node){ .format = MPV_FORMAT_STRING, .u.string = mp_property_expand_string(mpctx, cmd->args[0].v.s) }; @@ -5513,7 +5645,7 @@ static void cmd_screenshot_raw(void *p) { struct mp_cmd_ctx *cmd = p; struct MPContext *mpctx = cmd->mpctx; - struct mpv_node *res = cmd->result; + struct mpv_node *res = &cmd->result; struct mp_image *img = screenshot_get_rgb(mpctx, cmd->args[0].v.i); if (!img) { diff --git a/player/command.h b/player/command.h index b6ccffe80f..9e385dbc16 100644 --- a/player/command.h +++ b/player/command.h @@ -20,6 +20,8 @@ #include <stdbool.h> +#include "libmpv/client.h" + struct MPContext; struct mp_cmd; struct mp_log; @@ -43,12 +45,28 @@ struct mp_cmd_ctx { bool bar_osd; // OSD bar requested bool seek_msg_osd; // same as above, but for seek commands bool seek_bar_osd; - // Return values + // Return values (to be set by command implementation, read by the + // completion callback). bool success; // true by default - struct mpv_node *result; + struct mpv_node result; + // Command handlers can set this to false if returning from the command + // handler does not complete the command. It stops the common command code + // from signaling the completion automatically, and you can call + // mp_cmd_ctx_complete() to invoke on_completion() properly (including all + // the bookkeeping). + /// (Note that in no case you can call mp_cmd_ctx_complete() from within + // the command handler, because it frees the mp_cmd_ctx.) + bool completed; // true by default + // This is managed by the common command code. For rules about how and where + // this is called see run_command() comments. + void (*on_completion)(struct mp_cmd_ctx *cmd); + void *on_completion_priv; // for free use by on_completion callback }; -int run_command(struct MPContext *mpctx, struct mp_cmd *cmd, struct mpv_node *res); +void run_command(struct MPContext *mpctx, struct mp_cmd *cmd, + void (*on_completion)(struct mp_cmd_ctx *cmd), + void *on_completion_priv); +void mp_cmd_ctx_complete(struct mp_cmd_ctx *cmd); char *mp_property_expand_string(struct MPContext *mpctx, const char *str); char *mp_property_expand_escaped_string(struct MPContext *mpctx, const char *str); void property_print_help(struct MPContext *mpctx); diff --git a/player/core.h b/player/core.h index 71c39dcaa5..0840a7836d 100644 --- a/player/core.h +++ b/player/core.h @@ -243,6 +243,8 @@ typedef struct MPContext { // mp_dispatch_lock must be called to change it. int64_t outstanding_async; + struct mp_thread_pool *thread_pool; // for coarse I/O, often during loading + struct mp_log *statusline; struct osd_state *osd; char *term_osd_text; @@ -551,6 +553,8 @@ void mp_wait_events(struct MPContext *mpctx); void mp_set_timeout(struct MPContext *mpctx, double sleeptime); void mp_wakeup_core(struct MPContext *mpctx); void mp_wakeup_core_cb(void *ctx); +void mp_core_lock(struct MPContext *mpctx); +void mp_core_unlock(struct MPContext *mpctx); void mp_process_input(struct MPContext *mpctx); double get_relative_time(struct MPContext *mpctx); void reset_playback_state(struct MPContext *mpctx); diff --git a/player/main.c b/player/main.c index f56191a297..bc14bbce1c 100644 --- a/player/main.c +++ b/player/main.c @@ -28,6 +28,7 @@ #include "mpv_talloc.h" #include "misc/dispatch.h" +#include "misc/thread_pool.h" #include "osdep/io.h" #include "osdep/terminal.h" #include "osdep/timer.h" @@ -279,6 +280,7 @@ struct MPContext *mp_create(void) .playlist = talloc_struct(mpctx, struct playlist, {0}), .dispatch = mp_dispatch_create(mpctx), .playback_abort = mp_cancel_new(mpctx), + .thread_pool = mp_thread_pool_create(mpctx, 0, 1, 30), }; pthread_mutex_init(&mpctx->lock, NULL); diff --git a/player/playloop.c b/player/playloop.c index f5c1fde0ef..26f9f12d82 100644 --- a/player/playloop.c +++ b/player/playloop.c @@ -92,6 +92,16 @@ void mp_wakeup_core_cb(void *ctx) mp_wakeup_core(mpctx); } +void mp_core_lock(struct MPContext *mpctx) +{ + mp_dispatch_lock(mpctx->dispatch); +} + +void mp_core_unlock(struct MPContext *mpctx) +{ + mp_dispatch_unlock(mpctx->dispatch); +} + // Process any queued input, whether it's user input, or requests from client // API threads. This also resets the "wakeup" flag used with mp_wait_events(). void mp_process_input(struct MPContext *mpctx) @@ -100,8 +110,7 @@ void mp_process_input(struct MPContext *mpctx) mp_cmd_t *cmd = mp_input_read_cmd(mpctx->input); if (!cmd) break; - run_command(mpctx, cmd, NULL); - mp_cmd_free(cmd); + run_command(mpctx, cmd, NULL, NULL); } mp_set_timeout(mpctx, mp_input_get_delay(mpctx->input)); } |