summaryrefslogtreecommitdiffstats
path: root/player/client.c
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2018-05-06 18:27:18 +0200
committerwm4 <wm4@nowhere>2018-05-24 19:56:34 +0200
commitb440f6dfb3d29651d8dcb7abfeb8ed18e3f2b995 (patch)
treedfca603aba1521d0a867d152291616f7b7b87126 /player/client.c
parenta1ed1f8be09c927994b58399e77e99336ec7f436 (diff)
downloadmpv-b440f6dfb3d29651d8dcb7abfeb8ed18e3f2b995.tar.bz2
mpv-b440f6dfb3d29651d8dcb7abfeb8ed18e3f2b995.tar.xz
command: add infrastructure for async commands
This enables two types of command behavior: 1. Plain async behavior, like "loadfile" not completing until the file is fully loaded. 2. Running parts of the command on worker threads, e.g. for I/O, such as "sub-add" doing network accesses on a thread while the core continues. Both have no implementation yet, and most new code is actually inactive. The plan is to implement a number of useful cases in the following commits. The most tricky part is handling internal keybindings (input.conf) and the multi-command feature (concatenating commands with ";"). It requires a bunch of roundabout code to make it do the expected thing in combination with async commands. There is the question how commands should be handled that come in at a higher rate than what can be handled by the core. Currently, it will simply queue up input.conf commands as long as memory lasts. The client API is limited by the size of the reply queue per client. For commands which require a worker thread, the thread pool is limited to 30 threads, and then will queue up work in memory. The number is completely arbitrary.
Diffstat (limited to 'player/client.c')
-rw-r--r--player/client.c101
1 files changed, 81 insertions, 20 deletions
diff --git a/player/client.c b/player/client.c
index 02a8db2911..21bb069413 100644
--- a/player/client.c
+++ b/player/client.c
@@ -32,6 +32,7 @@
#include "misc/ctype.h"
#include "misc/dispatch.h"
#include "misc/rendezvous.h"
+#include "misc/thread_tools.h"
#include "options/m_config.h"
#include "options/m_option.h"
#include "options/m_property.h"
@@ -508,6 +509,15 @@ void mp_shutdown_clients(struct MPContext *mpctx)
pthread_mutex_unlock(&clients->lock);
}
+bool mp_is_shutting_down(struct MPContext *mpctx)
+{
+ struct mp_client_api *clients = mpctx->clients;
+ pthread_mutex_lock(&clients->lock);
+ bool res = clients->shutting_down;
+ pthread_mutex_unlock(&clients->lock);
+ return res;
+}
+
static void *core_thread(void *p)
{
struct MPContext *mpctx = p;
@@ -1014,29 +1024,30 @@ static int run_async(mpv_handle *ctx, void (*fn)(void *fn_data), void *fn_data)
talloc_free(fn_data);
return err;
}
- mp_dispatch_enqueue_autofree(ctx->mpctx->dispatch, fn, fn_data);
+ mp_dispatch_enqueue(ctx->mpctx->dispatch, fn, fn_data);
return 0;
}
struct cmd_request {
struct MPContext *mpctx;
struct mp_cmd *cmd;
- struct mpv_node *res;
int status;
- struct mpv_handle *reply_ctx;
- uint64_t userdata;
+ struct mpv_node *res;
+ struct mp_waiter completion;
};
-static void cmd_fn(void *data)
+static void cmd_complete(struct mp_cmd_ctx *cmd)
{
- struct cmd_request *req = data;
- int r = run_command(req->mpctx, req->cmd, req->res);
- req->status = r >= 0 ? 0 : MPV_ERROR_COMMAND;
- talloc_free(req->cmd);
- if (req->reply_ctx) {
- status_reply(req->reply_ctx, MPV_EVENT_COMMAND_REPLY,
- req->userdata, req->status);
+ struct cmd_request *req = cmd->on_completion_priv;
+
+ req->status = cmd->success ? 0 : MPV_ERROR_COMMAND;
+ if (req->res) {
+ *req->res = cmd->result;
+ cmd->result = (mpv_node){0};
}
+
+ // Unblock the waiting thread (especially for async commands).
+ mp_waiter_wakeup(&req->completion, 0);
}
static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res)
@@ -1055,8 +1066,22 @@ static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res
.mpctx = ctx->mpctx,
.cmd = cmd,
.res = res,
+ .completion = MP_WAITER_INITIALIZER,
};
- run_locked(ctx, cmd_fn, &req);
+
+ bool async = cmd->flags & MP_ASYNC_CMD;
+
+ lock_core(ctx);
+ if (async) {
+ run_command(ctx->mpctx, req.cmd, NULL, NULL);
+ } else {
+ run_command(ctx->mpctx, req.cmd, cmd_complete, &req);
+ }
+ unlock_core(ctx);
+
+ if (!async)
+ mp_waiter_wait(&req.completion);
+
return req.status;
}
@@ -1080,7 +1105,41 @@ int mpv_command_string(mpv_handle *ctx, const char *args)
mp_input_parse_cmd(ctx->mpctx->input, bstr0((char*)args), ctx->name), NULL);
}
-static int run_cmd_async(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd)
+struct async_cmd_request {
+ struct MPContext *mpctx;
+ struct mp_cmd *cmd;
+ int status;
+ struct mpv_handle *reply_ctx;
+ uint64_t userdata;
+};
+
+static void async_cmd_complete(struct mp_cmd_ctx *cmd)
+{
+ struct async_cmd_request *req = cmd->on_completion_priv;
+
+ req->status = cmd->success ? 0 : MPV_ERROR_COMMAND;
+
+ // Async command invocation - send a reply message.
+ status_reply(req->reply_ctx, MPV_EVENT_COMMAND_REPLY,
+ req->userdata, req->status);
+
+ talloc_free(req);
+}
+
+static void async_cmd_fn(void *data)
+{
+ struct async_cmd_request *req = data;
+
+ struct mp_cmd *cmd = req->cmd;
+ ta_xset_parent(cmd, NULL);
+ req->cmd = NULL;
+
+ // This will synchronously or asynchronously call cmd_complete (depending
+ // on the command).
+ run_command(req->mpctx, cmd, async_cmd_complete, req);
+}
+
+static int run_async_cmd(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd)
{
if (!ctx->mpctx->initialized)
return MPV_ERROR_UNINITIALIZED;
@@ -1089,24 +1148,24 @@ static int run_cmd_async(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd)
cmd->sender = ctx->name;
- struct cmd_request *req = talloc_ptrtype(NULL, req);
- *req = (struct cmd_request){
+ struct async_cmd_request *req = talloc_ptrtype(NULL, req);
+ *req = (struct async_cmd_request){
.mpctx = ctx->mpctx,
- .cmd = cmd,
+ .cmd = talloc_steal(req, cmd),
.reply_ctx = ctx,
.userdata = ud,
};
- return run_async(ctx, cmd_fn, req);
+ return run_async(ctx, async_cmd_fn, req);
}
int mpv_command_async(mpv_handle *ctx, uint64_t ud, const char **args)
{
- return run_cmd_async(ctx, ud, mp_input_parse_cmd_strv(ctx->log, args));
+ return run_async_cmd(ctx, ud, mp_input_parse_cmd_strv(ctx->log, args));
}
int mpv_command_node_async(mpv_handle *ctx, uint64_t ud, mpv_node *args)
{
- return run_cmd_async(ctx, ud, mp_input_parse_cmd_node(ctx->log, args));
+ return run_async_cmd(ctx, ud, mp_input_parse_cmd_node(ctx->log, args));
}
static int translate_property_error(int errc)
@@ -1155,6 +1214,7 @@ static void setproperty_fn(void *arg)
if (req->reply_ctx) {
status_reply(req->reply_ctx, MPV_EVENT_SET_PROPERTY_REPLY,
req->userdata, req->status);
+ talloc_free(req);
}
}
@@ -1310,6 +1370,7 @@ static void getproperty_fn(void *arg)
.error = req->status,
};
send_reply(req->reply_ctx, req->userdata, &reply);
+ talloc_free(req);
}
}