summaryrefslogtreecommitdiffstats
path: root/player
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2018-05-06 18:27:18 +0200
committerwm4 <wm4@nowhere>2018-05-24 19:56:34 +0200
commitb440f6dfb3d29651d8dcb7abfeb8ed18e3f2b995 (patch)
treedfca603aba1521d0a867d152291616f7b7b87126 /player
parenta1ed1f8be09c927994b58399e77e99336ec7f436 (diff)
downloadmpv-b440f6dfb3d29651d8dcb7abfeb8ed18e3f2b995.tar.bz2
mpv-b440f6dfb3d29651d8dcb7abfeb8ed18e3f2b995.tar.xz
command: add infrastructure for async commands
This enables two types of command behavior: 1. Plain async behavior, like "loadfile" not completing until the file is fully loaded. 2. Running parts of the command on worker threads, e.g. for I/O, such as "sub-add" doing network accesses on a thread while the core continues. Both have no implementation yet, and most new code is actually inactive. The plan is to implement a number of useful cases in the following commits. The most tricky part is handling internal keybindings (input.conf) and the multi-command feature (concatenating commands with ";"). It requires a bunch of roundabout code to make it do the expected thing in combination with async commands. There is the question how commands should be handled that come in at a higher rate than what can be handled by the core. Currently, it will simply queue up input.conf commands as long as memory lasts. The client API is limited by the size of the reply queue per client. For commands which require a worker thread, the thread pool is limited to 30 threads, and then will queue up work in memory. The number is completely arbitrary.
Diffstat (limited to 'player')
-rw-r--r--player/client.c101
-rw-r--r--player/client.h1
-rw-r--r--player/command.c170
-rw-r--r--player/command.h24
-rw-r--r--player/core.h4
-rw-r--r--player/main.c2
-rw-r--r--player/playloop.c13
7 files changed, 271 insertions, 44 deletions
diff --git a/player/client.c b/player/client.c
index 02a8db2911..21bb069413 100644
--- a/player/client.c
+++ b/player/client.c
@@ -32,6 +32,7 @@
#include "misc/ctype.h"
#include "misc/dispatch.h"
#include "misc/rendezvous.h"
+#include "misc/thread_tools.h"
#include "options/m_config.h"
#include "options/m_option.h"
#include "options/m_property.h"
@@ -508,6 +509,15 @@ void mp_shutdown_clients(struct MPContext *mpctx)
pthread_mutex_unlock(&clients->lock);
}
+bool mp_is_shutting_down(struct MPContext *mpctx)
+{
+ struct mp_client_api *clients = mpctx->clients;
+ pthread_mutex_lock(&clients->lock);
+ bool res = clients->shutting_down;
+ pthread_mutex_unlock(&clients->lock);
+ return res;
+}
+
static void *core_thread(void *p)
{
struct MPContext *mpctx = p;
@@ -1014,29 +1024,30 @@ static int run_async(mpv_handle *ctx, void (*fn)(void *fn_data), void *fn_data)
talloc_free(fn_data);
return err;
}
- mp_dispatch_enqueue_autofree(ctx->mpctx->dispatch, fn, fn_data);
+ mp_dispatch_enqueue(ctx->mpctx->dispatch, fn, fn_data);
return 0;
}
struct cmd_request {
struct MPContext *mpctx;
struct mp_cmd *cmd;
- struct mpv_node *res;
int status;
- struct mpv_handle *reply_ctx;
- uint64_t userdata;
+ struct mpv_node *res;
+ struct mp_waiter completion;
};
-static void cmd_fn(void *data)
+static void cmd_complete(struct mp_cmd_ctx *cmd)
{
- struct cmd_request *req = data;
- int r = run_command(req->mpctx, req->cmd, req->res);
- req->status = r >= 0 ? 0 : MPV_ERROR_COMMAND;
- talloc_free(req->cmd);
- if (req->reply_ctx) {
- status_reply(req->reply_ctx, MPV_EVENT_COMMAND_REPLY,
- req->userdata, req->status);
+ struct cmd_request *req = cmd->on_completion_priv;
+
+ req->status = cmd->success ? 0 : MPV_ERROR_COMMAND;
+ if (req->res) {
+ *req->res = cmd->result;
+ cmd->result = (mpv_node){0};
}
+
+ // Unblock the waiting thread (especially for async commands).
+ mp_waiter_wakeup(&req->completion, 0);
}
static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res)
@@ -1055,8 +1066,22 @@ static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res
.mpctx = ctx->mpctx,
.cmd = cmd,
.res = res,
+ .completion = MP_WAITER_INITIALIZER,
};
- run_locked(ctx, cmd_fn, &req);
+
+ bool async = cmd->flags & MP_ASYNC_CMD;
+
+ lock_core(ctx);
+ if (async) {
+ run_command(ctx->mpctx, req.cmd, NULL, NULL);
+ } else {
+ run_command(ctx->mpctx, req.cmd, cmd_complete, &req);
+ }
+ unlock_core(ctx);
+
+ if (!async)
+ mp_waiter_wait(&req.completion);
+
return req.status;
}
@@ -1080,7 +1105,41 @@ int mpv_command_string(mpv_handle *ctx, const char *args)
mp_input_parse_cmd(ctx->mpctx->input, bstr0((char*)args), ctx->name), NULL);
}
-static int run_cmd_async(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd)
+struct async_cmd_request {
+ struct MPContext *mpctx;
+ struct mp_cmd *cmd;
+ int status;
+ struct mpv_handle *reply_ctx;
+ uint64_t userdata;
+};
+
+static void async_cmd_complete(struct mp_cmd_ctx *cmd)
+{
+ struct async_cmd_request *req = cmd->on_completion_priv;
+
+ req->status = cmd->success ? 0 : MPV_ERROR_COMMAND;
+
+ // Async command invocation - send a reply message.
+ status_reply(req->reply_ctx, MPV_EVENT_COMMAND_REPLY,
+ req->userdata, req->status);
+
+ talloc_free(req);
+}
+
+static void async_cmd_fn(void *data)
+{
+ struct async_cmd_request *req = data;
+
+ struct mp_cmd *cmd = req->cmd;
+ ta_xset_parent(cmd, NULL);
+ req->cmd = NULL;
+
+ // This will synchronously or asynchronously call cmd_complete (depending
+ // on the command).
+ run_command(req->mpctx, cmd, async_cmd_complete, req);
+}
+
+static int run_async_cmd(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd)
{
if (!ctx->mpctx->initialized)
return MPV_ERROR_UNINITIALIZED;
@@ -1089,24 +1148,24 @@ static int run_cmd_async(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd)
cmd->sender = ctx->name;
- struct cmd_request *req = talloc_ptrtype(NULL, req);
- *req = (struct cmd_request){
+ struct async_cmd_request *req = talloc_ptrtype(NULL, req);
+ *req = (struct async_cmd_request){
.mpctx = ctx->mpctx,
- .cmd = cmd,
+ .cmd = talloc_steal(req, cmd),
.reply_ctx = ctx,
.userdata = ud,
};
- return run_async(ctx, cmd_fn, req);
+ return run_async(ctx, async_cmd_fn, req);
}
int mpv_command_async(mpv_handle *ctx, uint64_t ud, const char **args)
{
- return run_cmd_async(ctx, ud, mp_input_parse_cmd_strv(ctx->log, args));
+ return run_async_cmd(ctx, ud, mp_input_parse_cmd_strv(ctx->log, args));
}
int mpv_command_node_async(mpv_handle *ctx, uint64_t ud, mpv_node *args)
{
- return run_cmd_async(ctx, ud, mp_input_parse_cmd_node(ctx->log, args));
+ return run_async_cmd(ctx, ud, mp_input_parse_cmd_node(ctx->log, args));
}
static int translate_property_error(int errc)
@@ -1155,6 +1214,7 @@ static void setproperty_fn(void *arg)
if (req->reply_ctx) {
status_reply(req->reply_ctx, MPV_EVENT_SET_PROPERTY_REPLY,
req->userdata, req->status);
+ talloc_free(req);
}
}
@@ -1310,6 +1370,7 @@ static void getproperty_fn(void *arg)
.error = req->status,
};
send_reply(req->reply_ctx, req->userdata, &reply);
+ talloc_free(req);
}
}
diff --git a/player/client.h b/player/client.h
index 2512315d42..7426e94372 100644
--- a/player/client.h
+++ b/player/client.h
@@ -20,6 +20,7 @@ struct mpv_global;
void mp_clients_init(struct MPContext *mpctx);
void mp_clients_destroy(struct MPContext *mpctx);
void mp_shutdown_clients(struct MPContext *mpctx);
+bool mp_is_shutting_down(struct MPContext *mpctx);
bool mp_clients_all_initialized(struct MPContext *mpctx);
bool mp_client_exists(struct MPContext *mpctx, const char *client_name);
diff --git a/player/command.c b/player/command.c
index 3fe39642a6..4600e65236 100644
--- a/player/command.c
+++ b/player/command.c
@@ -60,7 +60,9 @@
#include "video/out/bitmap_packer.h"
#include "options/path.h"
#include "screenshot.h"
+#include "misc/dispatch.h"
#include "misc/node.h"
+#include "misc/thread_pool.h"
#include "osdep/io.h"
#include "osdep/subprocess.h"
@@ -4825,27 +4827,147 @@ static void cmd_cycle_values(void *p)
change_property_cmd(cmd, name, M_PROPERTY_SET_STRING, cmd->args[current].v.s);
}
+struct cmd_list_ctx {
+ struct MPContext *mpctx;
+
+ // actual list command
+ struct mp_cmd_ctx *parent;
+
+ bool current_valid;
+ pthread_t current;
+ bool completed_recursive;
+
+ // list of sub commands yet to run
+ struct mp_cmd **sub;
+ int num_sub;
+};
+
+static void continue_cmd_list(struct cmd_list_ctx *list);
+
+static void on_cmd_list_sub_completion(struct mp_cmd_ctx *cmd)
+{
+ struct cmd_list_ctx *list = cmd->on_completion_priv;
+
+ if (list->current_valid && pthread_equal(list->current, pthread_self())) {
+ list->completed_recursive = true;
+ } else {
+ continue_cmd_list(list);
+ }
+}
+
+static void continue_cmd_list(struct cmd_list_ctx *list)
+{
+ while (list->parent->args[0].v.p) {
+ struct mp_cmd *sub = list->parent->args[0].v.p;
+ list->parent->args[0].v.p = sub->queue_next;
+
+ ta_xset_parent(sub, NULL);
+
+ if (sub->flags & MP_ASYNC_CMD) {
+ // We run it "detached" (fire & forget)
+ run_command(list->mpctx, sub, NULL, NULL);
+ } else {
+ // Run the next command once this one completes.
+
+ list->completed_recursive = false;
+ list->current_valid = true;
+ list->current = pthread_self();
+
+ run_command(list->mpctx, sub, on_cmd_list_sub_completion, list);
+
+ list->current_valid = false;
+
+ // run_command() either recursively calls the completion function,
+ // or lets the command continue run in the background. If it was
+ // completed recursively, we can just continue our loop. Otherwise
+ // the completion handler will invoke this loop again elsewhere.
+ // We could unconditionally call continue_cmd_list() in the handler
+ // instead, but then stack depth would grow with list length.
+ if (!list->completed_recursive)
+ return;
+ }
+ }
+
+ mp_cmd_ctx_complete(list->parent);
+ talloc_free(list);
+}
+
static void cmd_list(void *p)
{
struct mp_cmd_ctx *cmd = p;
- for (struct mp_cmd *sub = cmd->cmd->args[0].v.p; sub; sub = sub->queue_next)
- run_command(cmd->mpctx, sub, NULL);
+ cmd->completed = false;
+
+ struct cmd_list_ctx *list = talloc_zero(NULL, struct cmd_list_ctx);
+ list->mpctx = cmd->mpctx;
+ list->parent = p;
+
+ continue_cmd_list(list);
+}
+
+const struct mp_cmd_def mp_cmd_list = { "list", cmd_list, .exec_async = true };
+
+// Signal that the command is complete now. This also deallocates cmd.
+// You must call this function in a state where the core is locked for the
+// current thread (e.g. from the main thread, or from within mp_dispatch_lock()).
+// Completion means the command is finished, even if it errored or never ran.
+// Keep in mind that calling this can execute further user command that can
+// change arbitrary state (due to cmd_list).
+void mp_cmd_ctx_complete(struct mp_cmd_ctx *cmd)
+{
+ cmd->completed = true;
+ if (!cmd->success)
+ mpv_free_node_contents(&cmd->result);
+ if (cmd->on_completion)
+ cmd->on_completion(cmd);
+ mpv_free_node_contents(&cmd->result);
+ talloc_free(cmd->cmd);
+ talloc_free(cmd);
}
-const struct mp_cmd_def mp_cmd_list = { "list", cmd_list };
+static void run_command_on_worker_thread(void *p)
+{
+ struct mp_cmd_ctx *ctx = p;
+ struct MPContext *mpctx = ctx->mpctx;
+
+ mp_core_lock(mpctx);
+
+ bool exec_async = ctx->cmd->def->exec_async;
+ ctx->cmd->def->handler(ctx);
+ if (!exec_async)
+ mp_cmd_ctx_complete(ctx);
-int run_command(struct MPContext *mpctx, struct mp_cmd *cmd, struct mpv_node *res)
+ mpctx->outstanding_async -= 1;
+ if (!mpctx->outstanding_async && mp_is_shutting_down(mpctx))
+ mp_wakeup_core(mpctx);
+
+ mp_core_unlock(mpctx);
+}
+
+// Run the given command. Upon command completion, on_completion is called. This
+// can happen within the function, or for async commands, some time after the
+// function returns (the caller is supposed to be able to handle both cases). In
+// both cases, the callback will be called while the core is locked (i.e. you
+// can access the core freely).
+// on_completion_priv is copied to mp_cmd_ctx.on_completion_priv and can be
+// accessed from the completion callback.
+// The completion callback is invoked exactly once. If it's NULL, it's ignored.
+// Ownership of cmd goes to the caller.
+void run_command(struct MPContext *mpctx, struct mp_cmd *cmd,
+ void (*on_completion)(struct mp_cmd_ctx *cmd),
+ void *on_completion_priv)
{
- struct mpv_node dummy_node = {0};
- struct mp_cmd_ctx *ctx = &(struct mp_cmd_ctx){
+ struct mp_cmd_ctx *ctx = talloc(NULL, struct mp_cmd_ctx);
+ *ctx = (struct mp_cmd_ctx){
.mpctx = mpctx,
.cmd = cmd,
.args = cmd->args,
.num_args = cmd->nargs,
.priv = cmd->def->priv,
.success = true,
- .result = res ? res : &dummy_node,
+ .completed = true,
+ .on_completion = on_completion,
+ .on_completion_priv = on_completion_priv,
};
struct MPOpts *opts = mpctx->opts;
@@ -4863,22 +4985,32 @@ int run_command(struct MPContext *mpctx, struct mp_cmd *cmd, struct mpv_node *re
for (int n = 0; n < cmd->nargs; n++) {
if (cmd->args[n].type->type == CONF_TYPE_STRING) {
char *s = mp_property_expand_string(mpctx, cmd->args[n].v.s);
- if (!s)
- return -1;
+ if (!s) {
+ ctx->success = false;
+ mp_cmd_ctx_complete(ctx);
+ return;
+ }
talloc_free(cmd->args[n].v.s);
cmd->args[n].v.s = s;
}
}
}
- cmd->def->handler(ctx);
-
- if (!ctx->success)
- mpv_free_node_contents(ctx->result);
-
- mpv_free_node_contents(&dummy_node);
-
- return ctx->success ? 0 : -1;
+ if (cmd->def->spawn_thread) {
+ mpctx->outstanding_async += 1; // prevent that core disappears
+ if (!mp_thread_pool_queue(mpctx->thread_pool,
+ run_command_on_worker_thread, ctx))
+ {
+ mpctx->outstanding_async -= 1;
+ ctx->success = false;
+ mp_cmd_ctx_complete(ctx);
+ }
+ } else {
+ bool exec_async = cmd->def->exec_async;
+ cmd->def->handler(ctx);
+ if (!exec_async)
+ mp_cmd_ctx_complete(ctx);
+ }
}
static void cmd_seek(void *p)
@@ -5199,7 +5331,7 @@ static void cmd_expand_text(void *p)
struct mp_cmd_ctx *cmd = p;
struct MPContext *mpctx = cmd->mpctx;
- *cmd->result = (mpv_node){
+ cmd->result = (mpv_node){
.format = MPV_FORMAT_STRING,
.u.string = mp_property_expand_string(mpctx, cmd->args[0].v.s)
};
@@ -5513,7 +5645,7 @@ static void cmd_screenshot_raw(void *p)
{
struct mp_cmd_ctx *cmd = p;
struct MPContext *mpctx = cmd->mpctx;
- struct mpv_node *res = cmd->result;
+ struct mpv_node *res = &cmd->result;
struct mp_image *img = screenshot_get_rgb(mpctx, cmd->args[0].v.i);
if (!img) {
diff --git a/player/command.h b/player/command.h
index b6ccffe80f..9e385dbc16 100644
--- a/player/command.h
+++ b/player/command.h
@@ -20,6 +20,8 @@
#include <stdbool.h>
+#include "libmpv/client.h"
+
struct MPContext;
struct mp_cmd;
struct mp_log;
@@ -43,12 +45,28 @@ struct mp_cmd_ctx {
bool bar_osd; // OSD bar requested
bool seek_msg_osd; // same as above, but for seek commands
bool seek_bar_osd;
- // Return values
+ // Return values (to be set by command implementation, read by the
+ // completion callback).
bool success; // true by default
- struct mpv_node *result;
+ struct mpv_node result;
+ // Command handlers can set this to false if returning from the command
+ // handler does not complete the command. It stops the common command code
+ // from signaling the completion automatically, and you can call
+ // mp_cmd_ctx_complete() to invoke on_completion() properly (including all
+ // the bookkeeping).
+ /// (Note that in no case you can call mp_cmd_ctx_complete() from within
+ // the command handler, because it frees the mp_cmd_ctx.)
+ bool completed; // true by default
+ // This is managed by the common command code. For rules about how and where
+ // this is called see run_command() comments.
+ void (*on_completion)(struct mp_cmd_ctx *cmd);
+ void *on_completion_priv; // for free use by on_completion callback
};
-int run_command(struct MPContext *mpctx, struct mp_cmd *cmd, struct mpv_node *res);
+void run_command(struct MPContext *mpctx, struct mp_cmd *cmd,
+ void (*on_completion)(struct mp_cmd_ctx *cmd),
+ void *on_completion_priv);
+void mp_cmd_ctx_complete(struct mp_cmd_ctx *cmd);
char *mp_property_expand_string(struct MPContext *mpctx, const char *str);
char *mp_property_expand_escaped_string(struct MPContext *mpctx, const char *str);
void property_print_help(struct MPContext *mpctx);
diff --git a/player/core.h b/player/core.h
index 71c39dcaa5..0840a7836d 100644
--- a/player/core.h
+++ b/player/core.h
@@ -243,6 +243,8 @@ typedef struct MPContext {
// mp_dispatch_lock must be called to change it.
int64_t outstanding_async;
+ struct mp_thread_pool *thread_pool; // for coarse I/O, often during loading
+
struct mp_log *statusline;
struct osd_state *osd;
char *term_osd_text;
@@ -551,6 +553,8 @@ void mp_wait_events(struct MPContext *mpctx);
void mp_set_timeout(struct MPContext *mpctx, double sleeptime);
void mp_wakeup_core(struct MPContext *mpctx);
void mp_wakeup_core_cb(void *ctx);
+void mp_core_lock(struct MPContext *mpctx);
+void mp_core_unlock(struct MPContext *mpctx);
void mp_process_input(struct MPContext *mpctx);
double get_relative_time(struct MPContext *mpctx);
void reset_playback_state(struct MPContext *mpctx);
diff --git a/player/main.c b/player/main.c
index f56191a297..bc14bbce1c 100644
--- a/player/main.c
+++ b/player/main.c
@@ -28,6 +28,7 @@
#include "mpv_talloc.h"
#include "misc/dispatch.h"
+#include "misc/thread_pool.h"
#include "osdep/io.h"
#include "osdep/terminal.h"
#include "osdep/timer.h"
@@ -279,6 +280,7 @@ struct MPContext *mp_create(void)
.playlist = talloc_struct(mpctx, struct playlist, {0}),
.dispatch = mp_dispatch_create(mpctx),
.playback_abort = mp_cancel_new(mpctx),
+ .thread_pool = mp_thread_pool_create(mpctx, 0, 1, 30),
};
pthread_mutex_init(&mpctx->lock, NULL);
diff --git a/player/playloop.c b/player/playloop.c
index f5c1fde0ef..26f9f12d82 100644
--- a/player/playloop.c
+++ b/player/playloop.c
@@ -92,6 +92,16 @@ void mp_wakeup_core_cb(void *ctx)
mp_wakeup_core(mpctx);
}
+void mp_core_lock(struct MPContext *mpctx)
+{
+ mp_dispatch_lock(mpctx->dispatch);
+}
+
+void mp_core_unlock(struct MPContext *mpctx)
+{
+ mp_dispatch_unlock(mpctx->dispatch);
+}
+
// Process any queued input, whether it's user input, or requests from client
// API threads. This also resets the "wakeup" flag used with mp_wait_events().
void mp_process_input(struct MPContext *mpctx)
@@ -100,8 +110,7 @@ void mp_process_input(struct MPContext *mpctx)
mp_cmd_t *cmd = mp_input_read_cmd(mpctx->input);
if (!cmd)
break;
- run_command(mpctx, cmd, NULL);
- mp_cmd_free(cmd);
+ run_command(mpctx, cmd, NULL, NULL);
}
mp_set_timeout(mpctx, mp_input_get_delay(mpctx->input));
}