diff options
Diffstat (limited to 'player/client.c')
-rw-r--r-- | player/client.c | 262 |
1 files changed, 163 insertions, 99 deletions
diff --git a/player/client.c b/player/client.c index 2c21dd727c..03e0cbfeb6 100644 --- a/player/client.c +++ b/player/client.c @@ -31,7 +31,9 @@ #include "input/cmd.h" #include "misc/ctype.h" #include "misc/dispatch.h" +#include "misc/node.h" #include "misc/rendezvous.h" +#include "misc/thread_tools.h" #include "options/m_config.h" #include "options/m_option.h" #include "options/m_property.h" @@ -370,6 +372,30 @@ void mpv_wait_async_requests(mpv_handle *ctx) pthread_mutex_unlock(&ctx->lock); } +// Send abort signal to all matching work items. +// If type==0, destroy all of the matching ctx. +// If ctx==0, destroy all. +static void abort_async(struct MPContext *mpctx, mpv_handle *ctx, + int type, uint64_t id) +{ + pthread_mutex_lock(&mpctx->abort_lock); + + // Destroy all => ensure any newly appearing work is aborted immediately. + if (ctx == NULL) + mpctx->abort_all = true; + + for (int n = 0; n < mpctx->num_abort_list; n++) { + struct mp_abort_entry *abort = mpctx->abort_list[n]; + if (!ctx || (abort->client == ctx && (!type || + (abort->client_work_type == type && abort->client_work_id == id)))) + { + mp_abort_trigger_locked(mpctx, abort); + } + } + + pthread_mutex_unlock(&mpctx->abort_lock); +} + static void get_thread(void *ptr) { *(pthread_t *)ptr = pthread_self(); @@ -388,6 +414,8 @@ static void mp_destroy_client(mpv_handle *ctx, bool terminate) if (terminate) mpv_command(ctx, (const char*[]){"quit", NULL}); + abort_async(mpctx, ctx, 0, 0); + // reserved_events equals the number of asynchronous requests that weren't // yet replied. In order to avoid that trying to reply to a removed client // causes a crash, block until all asynchronous requests were served. @@ -483,32 +511,52 @@ void mpv_terminate_destroy(mpv_handle *ctx) mp_destroy_client(ctx, true); } -static bool can_terminate(struct MPContext *mpctx) -{ - struct mp_client_api *clients = mpctx->clients; - - pthread_mutex_lock(&clients->lock); - bool ok = clients->num_clients == 0 && mpctx->outstanding_async == 0 && - (mpctx->is_cli || clients->terminate_core_thread); - pthread_mutex_unlock(&clients->lock); - - return ok; -} - // Can be called on the core thread only. Idempotent. +// Also happens to take care of shutting down any async work. void mp_shutdown_clients(struct MPContext *mpctx) { struct mp_client_api *clients = mpctx->clients; - // Prevent that new clients can appear. + // Forcefully abort async work after 2 seconds of waiting. + double abort_time = mp_time_sec() + 2; + pthread_mutex_lock(&clients->lock); + + // Prevent that new clients can appear. clients->shutting_down = true; - pthread_mutex_unlock(&clients->lock); - while (!can_terminate(mpctx)) { + // Wait until we can terminate. + while (clients->num_clients || mpctx->outstanding_async || + !(mpctx->is_cli || clients->terminate_core_thread)) + { + pthread_mutex_unlock(&clients->lock); + + double left = abort_time - mp_time_sec(); + if (left >= 0) { + mp_set_timeout(mpctx, left); + } else { + // Forcefully abort any ongoing async work. This is quite rude and + // probably not what everyone wants, so it happens only after a + // timeout. + abort_async(mpctx, NULL, 0, 0); + } + mp_client_broadcast_event(mpctx, MPV_EVENT_SHUTDOWN, NULL); mp_wait_events(mpctx); + + pthread_mutex_lock(&clients->lock); } + + pthread_mutex_unlock(&clients->lock); +} + +bool mp_is_shutting_down(struct MPContext *mpctx) +{ + struct mp_client_api *clients = mpctx->clients; + pthread_mutex_lock(&clients->lock); + bool res = clients->shutting_down; + pthread_mutex_unlock(&clients->lock); + return res; } static void *core_thread(void *p) @@ -677,16 +725,6 @@ static void send_reply(struct mpv_handle *ctx, uint64_t userdata, pthread_mutex_unlock(&ctx->lock); } -static void status_reply(struct mpv_handle *ctx, int event, - uint64_t userdata, int status) -{ - struct mpv_event reply = { - .event_id = event, - .error = status, - }; - send_reply(ctx, userdata, &reply); -} - // Return whether there's any client listening to this event. // If false is returned, the core doesn't need to send it. bool mp_client_event_is_registered(struct MPContext *mpctx, int event) @@ -905,54 +943,6 @@ static bool conv_node_to_format(void *dst, mpv_format dst_fmt, mpv_node *src) return false; } -// Note: for MPV_FORMAT_NODE_MAP, this (incorrectly) takes the order into -// account, instead of treating it as set. -static bool compare_value(void *a, void *b, mpv_format format) -{ - switch (format) { - case MPV_FORMAT_NONE: - return true; - case MPV_FORMAT_STRING: - case MPV_FORMAT_OSD_STRING: - return strcmp(*(char **)a, *(char **)b) == 0; - case MPV_FORMAT_FLAG: - return *(int *)a == *(int *)b; - case MPV_FORMAT_INT64: - return *(int64_t *)a == *(int64_t *)b; - case MPV_FORMAT_DOUBLE: - return *(double *)a == *(double *)b; - case MPV_FORMAT_NODE: { - struct mpv_node *a_n = a, *b_n = b; - if (a_n->format != b_n->format) - return false; - return compare_value(&a_n->u, &b_n->u, a_n->format); - } - case MPV_FORMAT_BYTE_ARRAY: { - struct mpv_byte_array *a_r = a, *b_r = b; - if (a_r->size != b_r->size) - return false; - return memcmp(a_r->data, b_r->data, a_r->size) == 0; - } - case MPV_FORMAT_NODE_ARRAY: - case MPV_FORMAT_NODE_MAP: - { - mpv_node_list *l_a = *(mpv_node_list **)a, *l_b = *(mpv_node_list **)b; - if (l_a->num != l_b->num) - return false; - for (int n = 0; n < l_a->num; n++) { - if (!compare_value(&l_a->values[n], &l_b->values[n], MPV_FORMAT_NODE)) - return false; - if (format == MPV_FORMAT_NODE_MAP) { - if (strcmp(l_a->keys[n], l_b->keys[n]) != 0) - return false; - } - } - return true; - } - } - abort(); -} - void mpv_free_node_contents(mpv_node *node) { static const struct m_option type = { .type = CONF_TYPE_NODE }; @@ -1017,29 +1007,30 @@ static int run_async(mpv_handle *ctx, void (*fn)(void *fn_data), void *fn_data) talloc_free(fn_data); return err; } - mp_dispatch_enqueue_autofree(ctx->mpctx->dispatch, fn, fn_data); + mp_dispatch_enqueue(ctx->mpctx->dispatch, fn, fn_data); return 0; } struct cmd_request { struct MPContext *mpctx; struct mp_cmd *cmd; - struct mpv_node *res; int status; - struct mpv_handle *reply_ctx; - uint64_t userdata; + struct mpv_node *res; + struct mp_waiter completion; }; -static void cmd_fn(void *data) +static void cmd_complete(struct mp_cmd_ctx *cmd) { - struct cmd_request *req = data; - int r = run_command(req->mpctx, req->cmd, req->res); - req->status = r >= 0 ? 0 : MPV_ERROR_COMMAND; - talloc_free(req->cmd); - if (req->reply_ctx) { - status_reply(req->reply_ctx, MPV_EVENT_COMMAND_REPLY, - req->userdata, req->status); + struct cmd_request *req = cmd->on_completion_priv; + + req->status = cmd->success ? 0 : MPV_ERROR_COMMAND; + if (req->res) { + *req->res = cmd->result; + cmd->result = (mpv_node){0}; } + + // Unblock the waiting thread (especially for async commands). + mp_waiter_wakeup(&req->completion, 0); } static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res) @@ -1049,17 +1040,33 @@ static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res if (!cmd) return MPV_ERROR_INVALID_PARAMETER; - if (mp_input_is_abort_cmd(cmd)) - mp_abort_playback_async(ctx->mpctx); - cmd->sender = ctx->name; struct cmd_request req = { .mpctx = ctx->mpctx, .cmd = cmd, .res = res, + .completion = MP_WAITER_INITIALIZER, }; - run_locked(ctx, cmd_fn, &req); + + bool async = cmd->flags & MP_ASYNC_CMD; + + lock_core(ctx); + if (async) { + run_command(ctx->mpctx, cmd, NULL, NULL, NULL); + } else { + struct mp_abort_entry *abort = NULL; + if (cmd->def->can_abort) { + abort = talloc_zero(NULL, struct mp_abort_entry); + abort->client = ctx; + } + run_command(ctx->mpctx, cmd, abort, cmd_complete, &req); + } + unlock_core(ctx); + + if (!async) + mp_waiter_wait(&req.completion); + return req.status; } @@ -1083,7 +1090,54 @@ int mpv_command_string(mpv_handle *ctx, const char *args) mp_input_parse_cmd(ctx->mpctx->input, bstr0((char*)args), ctx->name), NULL); } -static int run_cmd_async(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd) +struct async_cmd_request { + struct MPContext *mpctx; + struct mp_cmd *cmd; + struct mpv_handle *reply_ctx; + uint64_t userdata; +}; + +static void async_cmd_complete(struct mp_cmd_ctx *cmd) +{ + struct async_cmd_request *req = cmd->on_completion_priv; + + struct mpv_event_command *data = talloc_zero(NULL, struct mpv_event_command); + data->result = cmd->result; + cmd->result = (mpv_node){0}; + talloc_steal(data, node_get_alloc(&data->result)); + + struct mpv_event reply = { + .event_id = MPV_EVENT_COMMAND_REPLY, + .data = data, + .error = cmd->success ? 0 : MPV_ERROR_COMMAND, + }; + send_reply(req->reply_ctx, req->userdata, &reply); + + talloc_free(req); +} + +static void async_cmd_fn(void *data) +{ + struct async_cmd_request *req = data; + + struct mp_cmd *cmd = req->cmd; + ta_xset_parent(cmd, NULL); + req->cmd = NULL; + + struct mp_abort_entry *abort = NULL; + if (cmd->def->can_abort) { + abort = talloc_zero(NULL, struct mp_abort_entry); + abort->client = req->reply_ctx; + abort->client_work_type = MPV_EVENT_COMMAND_REPLY; + abort->client_work_id = req->userdata; + } + + // This will synchronously or asynchronously call cmd_complete (depending + // on the command). + run_command(req->mpctx, cmd, abort, async_cmd_complete, req); +} + +static int run_async_cmd(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd) { if (!ctx->mpctx->initialized) return MPV_ERROR_UNINITIALIZED; @@ -1092,24 +1146,29 @@ static int run_cmd_async(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd) cmd->sender = ctx->name; - struct cmd_request *req = talloc_ptrtype(NULL, req); - *req = (struct cmd_request){ + struct async_cmd_request *req = talloc_ptrtype(NULL, req); + *req = (struct async_cmd_request){ .mpctx = ctx->mpctx, - .cmd = cmd, + .cmd = talloc_steal(req, cmd), .reply_ctx = ctx, .userdata = ud, }; - return run_async(ctx, cmd_fn, req); + return run_async(ctx, async_cmd_fn, req); } int mpv_command_async(mpv_handle *ctx, uint64_t ud, const char **args) { - return run_cmd_async(ctx, ud, mp_input_parse_cmd_strv(ctx->log, args)); + return run_async_cmd(ctx, ud, mp_input_parse_cmd_strv(ctx->log, args)); } int mpv_command_node_async(mpv_handle *ctx, uint64_t ud, mpv_node *args) { - return run_cmd_async(ctx, ud, mp_input_parse_cmd_node(ctx->log, args)); + return run_async_cmd(ctx, ud, mp_input_parse_cmd_node(ctx->log, args)); +} + +void mpv_abort_async_command(mpv_handle *ctx, uint64_t reply_userdata) +{ + abort_async(ctx->mpctx, ctx, MPV_EVENT_COMMAND_REPLY, reply_userdata); } static int translate_property_error(int errc) @@ -1156,8 +1215,12 @@ static void setproperty_fn(void *arg) req->status = translate_property_error(err); if (req->reply_ctx) { - status_reply(req->reply_ctx, MPV_EVENT_SET_PROPERTY_REPLY, - req->userdata, req->status); + struct mpv_event reply = { + .event_id = MPV_EVENT_SET_PROPERTY_REPLY, + .error = req->status, + }; + send_reply(req->reply_ctx, req->userdata, &reply); + talloc_free(req); } } @@ -1313,6 +1376,7 @@ static void getproperty_fn(void *arg) .error = req->status, }; send_reply(req->reply_ctx, req->userdata, &reply); + talloc_free(req); } } @@ -1508,7 +1572,7 @@ static void update_prop(void *p) if (prop->user_value_valid != prop->new_value_valid) { prop->changed = true; } else if (prop->user_value_valid && prop->new_value_valid) { - if (!compare_value(&prop->user_value, &prop->new_value, prop->format)) + if (!equal_mpv_value(&prop->user_value, &prop->new_value, prop->format)) prop->changed = true; } if (prop->dead) |