summaryrefslogtreecommitdiffstats
path: root/player/client.c
diff options
context:
space:
mode:
Diffstat (limited to 'player/client.c')
-rw-r--r--player/client.c262
1 files changed, 163 insertions, 99 deletions
diff --git a/player/client.c b/player/client.c
index 2c21dd727c..03e0cbfeb6 100644
--- a/player/client.c
+++ b/player/client.c
@@ -31,7 +31,9 @@
#include "input/cmd.h"
#include "misc/ctype.h"
#include "misc/dispatch.h"
+#include "misc/node.h"
#include "misc/rendezvous.h"
+#include "misc/thread_tools.h"
#include "options/m_config.h"
#include "options/m_option.h"
#include "options/m_property.h"
@@ -370,6 +372,30 @@ void mpv_wait_async_requests(mpv_handle *ctx)
pthread_mutex_unlock(&ctx->lock);
}
+// Send abort signal to all matching work items.
+// If type==0, destroy all of the matching ctx.
+// If ctx==0, destroy all.
+static void abort_async(struct MPContext *mpctx, mpv_handle *ctx,
+ int type, uint64_t id)
+{
+ pthread_mutex_lock(&mpctx->abort_lock);
+
+ // Destroy all => ensure any newly appearing work is aborted immediately.
+ if (ctx == NULL)
+ mpctx->abort_all = true;
+
+ for (int n = 0; n < mpctx->num_abort_list; n++) {
+ struct mp_abort_entry *abort = mpctx->abort_list[n];
+ if (!ctx || (abort->client == ctx && (!type ||
+ (abort->client_work_type == type && abort->client_work_id == id))))
+ {
+ mp_abort_trigger_locked(mpctx, abort);
+ }
+ }
+
+ pthread_mutex_unlock(&mpctx->abort_lock);
+}
+
static void get_thread(void *ptr)
{
*(pthread_t *)ptr = pthread_self();
@@ -388,6 +414,8 @@ static void mp_destroy_client(mpv_handle *ctx, bool terminate)
if (terminate)
mpv_command(ctx, (const char*[]){"quit", NULL});
+ abort_async(mpctx, ctx, 0, 0);
+
// reserved_events equals the number of asynchronous requests that weren't
// yet replied. In order to avoid that trying to reply to a removed client
// causes a crash, block until all asynchronous requests were served.
@@ -483,32 +511,52 @@ void mpv_terminate_destroy(mpv_handle *ctx)
mp_destroy_client(ctx, true);
}
-static bool can_terminate(struct MPContext *mpctx)
-{
- struct mp_client_api *clients = mpctx->clients;
-
- pthread_mutex_lock(&clients->lock);
- bool ok = clients->num_clients == 0 && mpctx->outstanding_async == 0 &&
- (mpctx->is_cli || clients->terminate_core_thread);
- pthread_mutex_unlock(&clients->lock);
-
- return ok;
-}
-
// Can be called on the core thread only. Idempotent.
+// Also happens to take care of shutting down any async work.
void mp_shutdown_clients(struct MPContext *mpctx)
{
struct mp_client_api *clients = mpctx->clients;
- // Prevent that new clients can appear.
+ // Forcefully abort async work after 2 seconds of waiting.
+ double abort_time = mp_time_sec() + 2;
+
pthread_mutex_lock(&clients->lock);
+
+ // Prevent that new clients can appear.
clients->shutting_down = true;
- pthread_mutex_unlock(&clients->lock);
- while (!can_terminate(mpctx)) {
+ // Wait until we can terminate.
+ while (clients->num_clients || mpctx->outstanding_async ||
+ !(mpctx->is_cli || clients->terminate_core_thread))
+ {
+ pthread_mutex_unlock(&clients->lock);
+
+ double left = abort_time - mp_time_sec();
+ if (left >= 0) {
+ mp_set_timeout(mpctx, left);
+ } else {
+ // Forcefully abort any ongoing async work. This is quite rude and
+ // probably not what everyone wants, so it happens only after a
+ // timeout.
+ abort_async(mpctx, NULL, 0, 0);
+ }
+
mp_client_broadcast_event(mpctx, MPV_EVENT_SHUTDOWN, NULL);
mp_wait_events(mpctx);
+
+ pthread_mutex_lock(&clients->lock);
}
+
+ pthread_mutex_unlock(&clients->lock);
+}
+
+bool mp_is_shutting_down(struct MPContext *mpctx)
+{
+ struct mp_client_api *clients = mpctx->clients;
+ pthread_mutex_lock(&clients->lock);
+ bool res = clients->shutting_down;
+ pthread_mutex_unlock(&clients->lock);
+ return res;
}
static void *core_thread(void *p)
@@ -677,16 +725,6 @@ static void send_reply(struct mpv_handle *ctx, uint64_t userdata,
pthread_mutex_unlock(&ctx->lock);
}
-static void status_reply(struct mpv_handle *ctx, int event,
- uint64_t userdata, int status)
-{
- struct mpv_event reply = {
- .event_id = event,
- .error = status,
- };
- send_reply(ctx, userdata, &reply);
-}
-
// Return whether there's any client listening to this event.
// If false is returned, the core doesn't need to send it.
bool mp_client_event_is_registered(struct MPContext *mpctx, int event)
@@ -905,54 +943,6 @@ static bool conv_node_to_format(void *dst, mpv_format dst_fmt, mpv_node *src)
return false;
}
-// Note: for MPV_FORMAT_NODE_MAP, this (incorrectly) takes the order into
-// account, instead of treating it as set.
-static bool compare_value(void *a, void *b, mpv_format format)
-{
- switch (format) {
- case MPV_FORMAT_NONE:
- return true;
- case MPV_FORMAT_STRING:
- case MPV_FORMAT_OSD_STRING:
- return strcmp(*(char **)a, *(char **)b) == 0;
- case MPV_FORMAT_FLAG:
- return *(int *)a == *(int *)b;
- case MPV_FORMAT_INT64:
- return *(int64_t *)a == *(int64_t *)b;
- case MPV_FORMAT_DOUBLE:
- return *(double *)a == *(double *)b;
- case MPV_FORMAT_NODE: {
- struct mpv_node *a_n = a, *b_n = b;
- if (a_n->format != b_n->format)
- return false;
- return compare_value(&a_n->u, &b_n->u, a_n->format);
- }
- case MPV_FORMAT_BYTE_ARRAY: {
- struct mpv_byte_array *a_r = a, *b_r = b;
- if (a_r->size != b_r->size)
- return false;
- return memcmp(a_r->data, b_r->data, a_r->size) == 0;
- }
- case MPV_FORMAT_NODE_ARRAY:
- case MPV_FORMAT_NODE_MAP:
- {
- mpv_node_list *l_a = *(mpv_node_list **)a, *l_b = *(mpv_node_list **)b;
- if (l_a->num != l_b->num)
- return false;
- for (int n = 0; n < l_a->num; n++) {
- if (!compare_value(&l_a->values[n], &l_b->values[n], MPV_FORMAT_NODE))
- return false;
- if (format == MPV_FORMAT_NODE_MAP) {
- if (strcmp(l_a->keys[n], l_b->keys[n]) != 0)
- return false;
- }
- }
- return true;
- }
- }
- abort();
-}
-
void mpv_free_node_contents(mpv_node *node)
{
static const struct m_option type = { .type = CONF_TYPE_NODE };
@@ -1017,29 +1007,30 @@ static int run_async(mpv_handle *ctx, void (*fn)(void *fn_data), void *fn_data)
talloc_free(fn_data);
return err;
}
- mp_dispatch_enqueue_autofree(ctx->mpctx->dispatch, fn, fn_data);
+ mp_dispatch_enqueue(ctx->mpctx->dispatch, fn, fn_data);
return 0;
}
struct cmd_request {
struct MPContext *mpctx;
struct mp_cmd *cmd;
- struct mpv_node *res;
int status;
- struct mpv_handle *reply_ctx;
- uint64_t userdata;
+ struct mpv_node *res;
+ struct mp_waiter completion;
};
-static void cmd_fn(void *data)
+static void cmd_complete(struct mp_cmd_ctx *cmd)
{
- struct cmd_request *req = data;
- int r = run_command(req->mpctx, req->cmd, req->res);
- req->status = r >= 0 ? 0 : MPV_ERROR_COMMAND;
- talloc_free(req->cmd);
- if (req->reply_ctx) {
- status_reply(req->reply_ctx, MPV_EVENT_COMMAND_REPLY,
- req->userdata, req->status);
+ struct cmd_request *req = cmd->on_completion_priv;
+
+ req->status = cmd->success ? 0 : MPV_ERROR_COMMAND;
+ if (req->res) {
+ *req->res = cmd->result;
+ cmd->result = (mpv_node){0};
}
+
+ // Unblock the waiting thread (especially for async commands).
+ mp_waiter_wakeup(&req->completion, 0);
}
static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res)
@@ -1049,17 +1040,33 @@ static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res
if (!cmd)
return MPV_ERROR_INVALID_PARAMETER;
- if (mp_input_is_abort_cmd(cmd))
- mp_abort_playback_async(ctx->mpctx);
-
cmd->sender = ctx->name;
struct cmd_request req = {
.mpctx = ctx->mpctx,
.cmd = cmd,
.res = res,
+ .completion = MP_WAITER_INITIALIZER,
};
- run_locked(ctx, cmd_fn, &req);
+
+ bool async = cmd->flags & MP_ASYNC_CMD;
+
+ lock_core(ctx);
+ if (async) {
+ run_command(ctx->mpctx, cmd, NULL, NULL, NULL);
+ } else {
+ struct mp_abort_entry *abort = NULL;
+ if (cmd->def->can_abort) {
+ abort = talloc_zero(NULL, struct mp_abort_entry);
+ abort->client = ctx;
+ }
+ run_command(ctx->mpctx, cmd, abort, cmd_complete, &req);
+ }
+ unlock_core(ctx);
+
+ if (!async)
+ mp_waiter_wait(&req.completion);
+
return req.status;
}
@@ -1083,7 +1090,54 @@ int mpv_command_string(mpv_handle *ctx, const char *args)
mp_input_parse_cmd(ctx->mpctx->input, bstr0((char*)args), ctx->name), NULL);
}
-static int run_cmd_async(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd)
+struct async_cmd_request {
+ struct MPContext *mpctx;
+ struct mp_cmd *cmd;
+ struct mpv_handle *reply_ctx;
+ uint64_t userdata;
+};
+
+static void async_cmd_complete(struct mp_cmd_ctx *cmd)
+{
+ struct async_cmd_request *req = cmd->on_completion_priv;
+
+ struct mpv_event_command *data = talloc_zero(NULL, struct mpv_event_command);
+ data->result = cmd->result;
+ cmd->result = (mpv_node){0};
+ talloc_steal(data, node_get_alloc(&data->result));
+
+ struct mpv_event reply = {
+ .event_id = MPV_EVENT_COMMAND_REPLY,
+ .data = data,
+ .error = cmd->success ? 0 : MPV_ERROR_COMMAND,
+ };
+ send_reply(req->reply_ctx, req->userdata, &reply);
+
+ talloc_free(req);
+}
+
+static void async_cmd_fn(void *data)
+{
+ struct async_cmd_request *req = data;
+
+ struct mp_cmd *cmd = req->cmd;
+ ta_xset_parent(cmd, NULL);
+ req->cmd = NULL;
+
+ struct mp_abort_entry *abort = NULL;
+ if (cmd->def->can_abort) {
+ abort = talloc_zero(NULL, struct mp_abort_entry);
+ abort->client = req->reply_ctx;
+ abort->client_work_type = MPV_EVENT_COMMAND_REPLY;
+ abort->client_work_id = req->userdata;
+ }
+
+ // This will synchronously or asynchronously call cmd_complete (depending
+ // on the command).
+ run_command(req->mpctx, cmd, abort, async_cmd_complete, req);
+}
+
+static int run_async_cmd(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd)
{
if (!ctx->mpctx->initialized)
return MPV_ERROR_UNINITIALIZED;
@@ -1092,24 +1146,29 @@ static int run_cmd_async(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd)
cmd->sender = ctx->name;
- struct cmd_request *req = talloc_ptrtype(NULL, req);
- *req = (struct cmd_request){
+ struct async_cmd_request *req = talloc_ptrtype(NULL, req);
+ *req = (struct async_cmd_request){
.mpctx = ctx->mpctx,
- .cmd = cmd,
+ .cmd = talloc_steal(req, cmd),
.reply_ctx = ctx,
.userdata = ud,
};
- return run_async(ctx, cmd_fn, req);
+ return run_async(ctx, async_cmd_fn, req);
}
int mpv_command_async(mpv_handle *ctx, uint64_t ud, const char **args)
{
- return run_cmd_async(ctx, ud, mp_input_parse_cmd_strv(ctx->log, args));
+ return run_async_cmd(ctx, ud, mp_input_parse_cmd_strv(ctx->log, args));
}
int mpv_command_node_async(mpv_handle *ctx, uint64_t ud, mpv_node *args)
{
- return run_cmd_async(ctx, ud, mp_input_parse_cmd_node(ctx->log, args));
+ return run_async_cmd(ctx, ud, mp_input_parse_cmd_node(ctx->log, args));
+}
+
+void mpv_abort_async_command(mpv_handle *ctx, uint64_t reply_userdata)
+{
+ abort_async(ctx->mpctx, ctx, MPV_EVENT_COMMAND_REPLY, reply_userdata);
}
static int translate_property_error(int errc)
@@ -1156,8 +1215,12 @@ static void setproperty_fn(void *arg)
req->status = translate_property_error(err);
if (req->reply_ctx) {
- status_reply(req->reply_ctx, MPV_EVENT_SET_PROPERTY_REPLY,
- req->userdata, req->status);
+ struct mpv_event reply = {
+ .event_id = MPV_EVENT_SET_PROPERTY_REPLY,
+ .error = req->status,
+ };
+ send_reply(req->reply_ctx, req->userdata, &reply);
+ talloc_free(req);
}
}
@@ -1313,6 +1376,7 @@ static void getproperty_fn(void *arg)
.error = req->status,
};
send_reply(req->reply_ctx, req->userdata, &reply);
+ talloc_free(req);
}
}
@@ -1508,7 +1572,7 @@ static void update_prop(void *p)
if (prop->user_value_valid != prop->new_value_valid) {
prop->changed = true;
} else if (prop->user_value_valid && prop->new_value_valid) {
- if (!compare_value(&prop->user_value, &prop->new_value, prop->format))
+ if (!equal_mpv_value(&prop->user_value, &prop->new_value, prop->format))
prop->changed = true;
}
if (prop->dead)