summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2019-12-19 11:11:51 +0100
committerwm4 <wm4@nowhere>2019-12-19 11:11:51 +0100
commit2c5cb2c53f38580696af4085bb257f0112e92a42 (patch)
treeb0e82776c032818fcee418179d427d894af8ba1f
parentf719b638402ca6b26759f23785f2dd46d31ee085 (diff)
downloadmpv-2c5cb2c53f38580696af4085bb257f0112e92a42.tar.bz2
mpv-2c5cb2c53f38580696af4085bb257f0112e92a42.tar.xz
client API: rewrite property observation (again)
I intend to rewrite this code approximately every 2 months. Last time, I did this in commit d66eb93e5d4 (and 065c307e8e7 and b2006eeb74f). It was intended to remove the roundabout synchronous thread "ping pong" when observing properties. At first, the original async. code was replaced with some nice mostly synchronous code. But then an async. code path had to be added for vo_libmpv, and finally the sync. code was dropped because it broke in other obscure cases (like the Objective-C Cocoa backend). Try again. This time, update properties entirely on the main thread. Updates get batched out on every playloop iteration. (At first I wanted it to make it every time the player goes to sleep, but that might starve API clients if the playloop get saturated.) One nice thing is that clients only get woken up once all changed events have been sent, which might reduce overhead. While this sounds simple, it's not. The main problem is that reading properties must not block the client API, i.e. no client API locks can be held while reading the property. Maybe eventually we can avoid this requirement, but currently it's just a fact. This means we have to iterate over all clients and then over all properties (of each client), all while releasing all locks when updating a property. Solve this by rechecking on each iteration whether the list changed, and if so, aborting the iteration and redo it "next time". High risk change, expect bugs such as crashes and missing property updates.
-rw-r--r--player/client.c329
-rw-r--r--player/client.h1
-rw-r--r--player/playloop.c2
3 files changed, 195 insertions, 137 deletions
diff --git a/player/client.c b/player/client.c
index 44ec7fad8c..0d1a4e6d53 100644
--- a/player/client.c
+++ b/player/client.c
@@ -73,6 +73,10 @@ struct mp_client_api {
bool shutting_down; // do not allow new clients
bool have_terminator; // a client took over the role of destroying the core
bool terminate_core_thread; // make libmpv core thread exit
+ // This is incremented whenever the clients[] array above changes. This is
+ // used to safely unlock mp_client_api.lock while iterating the list of
+ // clients.
+ uint64_t clients_list_change_ts;
struct mp_custom_protocol *custom_protocols;
int num_custom_protocols;
@@ -82,6 +86,7 @@ struct mp_client_api {
};
struct observe_property {
+ // -- immutable
struct mpv_handle *owner;
char *name;
int id; // ==mp_get_property_id(name)
@@ -89,15 +94,14 @@ struct observe_property {
int64_t reply_id;
mpv_format format;
const struct m_option *type;
- bool changed; // property change should be signaled to user
- bool dead; // property unobserved while retrieving value
+ // -- protected by owner->lock
+ size_t refcount;
+ uint64_t change_ts; // logical timestamp incremented on each change
+ uint64_t value_ts; // logical timestamp for value contents
bool value_valid;
union m_option_value value;
- uint64_t async_change_ts; // logical timestamp incremented on each change
- uint64_t async_value_ts; // logical timestamp for async_value contents
- bool async_updating; // if true, updating async_value_ts to change_ts
- bool async_value_valid;
- union m_option_value async_value;
+ uint64_t value_ret_ts; // logical timestamp of value returned to user
+ union m_option_value value_ret;
};
struct mpv_handle {
@@ -110,6 +114,7 @@ struct mpv_handle {
// -- not thread-safe
struct mpv_event *cur_event;
struct mpv_event_property cur_property_event;
+ struct observe_property *cur_property;
pthread_mutex_t lock;
@@ -134,11 +139,19 @@ struct mpv_handle {
int reserved_events; // number of entries reserved for replies
size_t async_counter; // pending other async events
bool choked; // recovering from queue overflow
+ bool destroying; // pending destruction; no API accesses allowed
struct observe_property **properties;
int num_properties;
- int lowest_changed; // attempt at making change processing incremental
+ bool has_pending_properties; // (maybe) new property events (producer side)
+ bool new_property_events; // new property events (consumer side)
+ int cur_property_index; // round-robin for property events (consumer side)
uint64_t property_event_masks; // or-ed together event masks of all properties
+ // This is incremented whenever the properties[] array above changes. This
+ // is used to safely unlock mpv_handle.lock while reading a property. If
+ // the counter didn't change between unlock and relock, then it will assume
+ // the array did not change.
+ uint64_t properties_change_ts;
bool fuzzy_initialized; // see scripting.c wait_loaded()
bool is_weak; // can not keep core alive on its own
@@ -150,6 +163,18 @@ static bool gen_log_message_event(struct mpv_handle *ctx);
static bool gen_property_change_event(struct mpv_handle *ctx);
static void notify_property_events(struct mpv_handle *ctx, uint64_t event_mask);
+// Must be called with prop->owner->lock held.
+static void prop_unref(struct observe_property *prop)
+{
+ if (!prop)
+ return;
+
+ assert(prop->refcount > 0);
+ prop->refcount -= 1;
+ if (!prop->refcount)
+ talloc_free(prop);
+}
+
void mp_clients_init(struct MPContext *mpctx)
{
mpctx->clients = talloc_ptrtype(NULL, mpctx->clients);
@@ -256,6 +281,7 @@ struct mpv_handle *mp_new_client(struct mp_client_api *clients, const char *name
snprintf(client->name, sizeof(client->name), "%s", nname);
+ clients->clients_list_change_ts += 1;
MP_TARRAY_APPEND(clients, clients->clients, clients->num_clients, client);
if (clients->num_clients == 1 && !clients->mpctx->is_cli)
@@ -410,6 +436,20 @@ static void mp_destroy_client(mpv_handle *ctx, bool terminate)
if (terminate)
mpv_command(ctx, (const char*[]){"quit", NULL});
+ pthread_mutex_lock(&ctx->lock);
+
+ ctx->destroying = true;
+
+ for (int n = 0; n < ctx->num_properties; n++)
+ prop_unref(ctx->properties[n]);
+ ctx->num_properties = 0;
+ ctx->properties_change_ts += 1;
+
+ prop_unref(ctx->cur_property);
+ ctx->cur_property = NULL;
+
+ pthread_mutex_unlock(&ctx->lock);
+
abort_async(mpctx, ctx, 0, 0);
// reserved_events equals the number of asynchronous requests that weren't
@@ -424,6 +464,7 @@ static void mp_destroy_client(mpv_handle *ctx, bool terminate)
for (int n = 0; n < clients->num_clients; n++) {
if (clients->clients[n] == ctx) {
+ clients->clients_list_change_ts += 1;
MP_TARRAY_REMOVE_AT(clients->clients, clients->num_clients, n);
while (ctx->num_events) {
talloc_free(ctx->events[ctx->first_event].data);
@@ -1415,11 +1456,11 @@ static void property_free(void *p)
{
struct observe_property *prop = p;
- assert(!prop->async_updating);
+ assert(prop->refcount == 0);
if (prop->type) {
m_option_free(prop->type, &prop->value);
- m_option_free(prop->type, &prop->async_value);
+ m_option_free(prop->type, &prop->value_ret);
}
}
@@ -1434,6 +1475,7 @@ int mpv_observe_property(mpv_handle *ctx, uint64_t userdata,
return MPV_ERROR_PROPERTY_FORMAT;
pthread_mutex_lock(&ctx->lock);
+ assert(!ctx->destroying);
struct observe_property *prop = talloc_ptrtype(ctx, prop);
talloc_set_destructor(prop, property_free);
*prop = (struct observe_property){
@@ -1444,35 +1486,33 @@ int mpv_observe_property(mpv_handle *ctx, uint64_t userdata,
.reply_id = userdata,
.format = format,
.type = type,
- .changed = true,
- .async_change_ts = 1,
+ .change_ts = 1, // force initial event
+ .refcount = 1,
};
+ ctx->properties_change_ts += 1;
MP_TARRAY_APPEND(ctx, ctx->properties, ctx->num_properties, prop);
ctx->property_event_masks |= prop->event_mask;
- ctx->lowest_changed = 0;
+ ctx->new_property_events = true;
+ ctx->cur_property_index = 0;
+ ctx->has_pending_properties = true;
pthread_mutex_unlock(&ctx->lock);
+ mp_wakeup_core(ctx->mpctx);
return 0;
}
-static void mark_property_changed(struct mpv_handle *client, int index)
-{
- struct observe_property *prop = client->properties[index];
- prop->changed = true;
- prop->async_change_ts += 1;
- client->lowest_changed = MPMIN(client->lowest_changed, index);
-}
-
int mpv_unobserve_property(mpv_handle *ctx, uint64_t userdata)
{
pthread_mutex_lock(&ctx->lock);
int count = 0;
- for (int n = 0; n < ctx->num_properties; n++) {
+ for (int n = ctx->num_properties - 1; n >= 0; n--) {
struct observe_property *prop = ctx->properties[n];
// Perform actual removal of the property lazily to avoid creating
// dangling pointers and such.
- if (prop->reply_id == userdata && !prop->dead) {
- mark_property_changed(ctx, n);
- prop->dead = true;
+ if (prop->reply_id == userdata) {
+ prop_unref(prop);
+ ctx->properties_change_ts += 1;
+ MP_TARRAY_REMOVE_AT(ctx->properties, ctx->num_properties, n);
+ ctx->cur_property_index = 0;
count++;
}
}
@@ -1485,6 +1525,7 @@ void mp_client_property_change(struct MPContext *mpctx, const char *name)
{
struct mp_client_api *clients = mpctx->clients;
int id = mp_get_property_id(mpctx, name);
+ bool any_pending = false;
pthread_mutex_lock(&clients->lock);
@@ -1492,15 +1533,23 @@ void mp_client_property_change(struct MPContext *mpctx, const char *name)
struct mpv_handle *client = clients->clients[n];
pthread_mutex_lock(&client->lock);
for (int i = 0; i < client->num_properties; i++) {
- if (client->properties[i]->id == id)
- mark_property_changed(client, i);
+ if (client->properties[i]->id == id) {
+ client->properties[i]->change_ts += 1;
+ client->has_pending_properties = true;
+ any_pending = true;
+ }
}
- if (client->lowest_changed < client->num_properties)
- wakeup_client(client);
pthread_mutex_unlock(&client->lock);
}
pthread_mutex_unlock(&clients->lock);
+
+ // If we're inside mp_dispatch_queue_process(), this will cause the playloop
+ // to be re-run (to get mp_client_send_property_changes() called). If we're
+ // inside the normal playloop, this does nothing, but the latter function
+ // will be called at the end of the playloop anyway.
+ if (any_pending)
+ mp_dispatch_adjust_timeout(mpctx->dispatch, 0);
}
// Mark properties as changed in reaction to specific events.
@@ -1508,94 +1557,122 @@ void mp_client_property_change(struct MPContext *mpctx, const char *name)
static void notify_property_events(struct mpv_handle *ctx, uint64_t event_mask)
{
for (int i = 0; i < ctx->num_properties; i++) {
- if (ctx->properties[i]->event_mask & event_mask)
- mark_property_changed(ctx, i);
+ if (ctx->properties[i]->event_mask & event_mask) {
+ ctx->properties[i]->change_ts += 1;
+ ctx->has_pending_properties = true;
+ }
}
- if (ctx->lowest_changed < ctx->num_properties)
- wakeup_client(ctx);
+ // Note: assume this function is called from the playloop only (at least
+ // if called with events that trigger property changes).
}
-static void update_prop_async(void *p)
+// Call with ctx->lock held (only). May temporarily drop the lock.
+static void send_client_property_changes(struct mpv_handle *ctx)
{
- struct observe_property *prop = p;
- struct mpv_handle *ctx = prop->owner;
+ uint64_t cur_ts = ctx->properties_change_ts;
- union m_option_value val = {0};
- bool val_valid = false;
- uint64_t value_ts;
+ ctx->has_pending_properties = false;
- pthread_mutex_lock(&ctx->lock);
- value_ts = prop->async_change_ts;
- assert(prop->async_updating);
- pthread_mutex_unlock(&ctx->lock);
-
- struct getproperty_request req = {
- .mpctx = ctx->mpctx,
- .name = prop->name,
- .format = prop->format,
- .data = &val,
- };
- getproperty_fn(&req);
- val_valid = req.status >= 0;
+ for (int n = 0; n < ctx->num_properties; n++) {
+ struct observe_property *prop = ctx->properties[n];
- pthread_mutex_lock(&ctx->lock);
+ if (prop->value_ts == prop->change_ts)
+ continue;
- assert(prop->async_updating);
+ bool changed = false;
+ if (prop->format) {
+ const struct m_option *type = prop->type;
+ union m_option_value val = {0};
+ struct getproperty_request req = {
+ .mpctx = ctx->mpctx,
+ .name = prop->name,
+ .format = prop->format,
+ .data = &val,
+ };
- // Move to prop->async_value
- m_option_free(prop->type, &prop->async_value);
- memcpy(&prop->async_value, &val, prop->type->type->size);
- prop->async_value_valid = val_valid;
+ // Temporarily unlock and read the property. The very important
+ // thing is that property getters can do whatever they want, _and_
+ // that they may wait on the client API user thread (if vo_libmpv
+ // or similar things are involved).
+ prop->refcount += 1; // keep prop alive (esp. prop->name)
+ ctx->async_counter += 1; // keep ctx alive
+ pthread_mutex_unlock(&ctx->lock);
+ getproperty_fn(&req);
+ pthread_mutex_lock(&ctx->lock);
+ ctx->async_counter -= 1;
+ prop_unref(prop);
+
+ // Set of observed properties was changed or something similar
+ // => start over, retry next time.
+ if (cur_ts != ctx->properties_change_ts || ctx->destroying) {
+ m_option_free(type, &val);
+ mp_wakeup_core(ctx->mpctx);
+ ctx->has_pending_properties = true;
+ break;
+ }
+ assert(prop->refcount > 0);
+
+ bool val_valid = req.status >= 0;
+ changed = prop->value_valid != val_valid;
+ if (prop->value_valid && val_valid)
+ changed = !equal_mpv_value(&prop->value, &val, prop->format);
+ if (prop->value_ts == 0)
+ changed = true; // initial event
+
+ prop->value_valid = val_valid;
+ if (changed && val_valid) {
+ // move val to prop->value
+ m_option_free(type, &prop->value);
+ memcpy(&prop->value, &val, type->type->size);
+ memset(&val, 0, type->type->size);
+ }
- prop->async_value_ts = value_ts;
- prop->async_updating = false;
+ m_option_free(prop->type, &val);
+ } else {
+ changed = true;
+ }
- // Cause it to re-check the property.
- prop->changed = true;
- ctx->lowest_changed = 0;
+ if (changed) {
+ ctx->new_property_events = true;
+ } else if (prop->value_ret_ts == prop->value_ts) {
+ prop->value_ret_ts = prop->change_ts; // no change => no event
+ }
- ctx->async_counter -= 1;
- wakeup_client(ctx);
+ prop->value_ts = prop->change_ts;
+ }
- pthread_mutex_unlock(&ctx->lock);
+ if (ctx->destroying || ctx->new_property_events)
+ wakeup_client(ctx);
}
-static bool update_prop(struct mpv_handle *ctx, struct observe_property *prop)
+void mp_client_send_property_changes(struct MPContext *mpctx)
{
- if (!prop->type)
- return true;
+ struct mp_client_api *clients = mpctx->clients;
- if (prop->async_change_ts > prop->async_value_ts) {
- if (!prop->async_updating) {
- prop->async_updating = true;
- ctx->async_counter += 1;
- mp_dispatch_enqueue(ctx->mpctx->dispatch, update_prop_async, prop);
- }
- return false; // re-update later when the changed value comes in
- }
+ pthread_mutex_lock(&clients->lock);
+ uint64_t cur_ts = clients->clients_list_change_ts;
+
+ for (int n = 0; n < clients->num_clients; n++) {
+ struct mpv_handle *ctx = clients->clients[n];
- union m_option_value val = {0};
- bool val_valid = prop->async_value_valid;
- m_option_copy(prop->type, &val, &prop->async_value);
-
- bool changed = prop->value_valid != val_valid;
- if (prop->value_valid && val_valid)
- changed = !equal_mpv_value(&prop->value, &val, prop->format);
-
- if (changed) {
- prop->value_valid = val_valid;
- if (val_valid) {
- // move val to prop->value
- m_option_free(prop->type, &prop->value);
- memcpy(&prop->value, &val, prop->type->type->size);
- val_valid = false;
+ pthread_mutex_lock(&ctx->lock);
+ if (!ctx->has_pending_properties) {
+ pthread_mutex_unlock(&ctx->lock);
+ continue;
+ }
+ // Keep ctx->lock locked (unlock order does not matter).
+ pthread_mutex_unlock(&clients->lock);
+ send_client_property_changes(ctx);
+ pthread_mutex_unlock(&ctx->lock);
+ pthread_mutex_lock(&clients->lock);
+ if (cur_ts != clients->clients_list_change_ts) {
+ // List changed; need to start over. Do it in the next iteration.
+ mp_wakeup_core(mpctx);
+ break;
}
}
- if (val_valid)
- m_option_free(prop->type, &val);
-
- return changed;
+ pthread_mutex_unlock(&clients->lock);
}
// Set ctx->cur_event to a generated property change event, if there is any
@@ -1605,62 +1682,40 @@ static bool gen_property_change_event(struct mpv_handle *ctx)
if (!ctx->mpctx->initialized)
return false;
- *ctx->cur_event = (struct mpv_event){
- .event_id = MPV_EVENT_NONE,
- };
+ while (1) {
+ if (ctx->cur_property_index >= ctx->num_properties) {
+ if (!ctx->new_property_events || !ctx->num_properties)
+ break;
+ ctx->new_property_events = false;
+ ctx->cur_property_index = 0;
+ }
- bool need_gc = false;
- int start = ctx->lowest_changed;
- ctx->lowest_changed = ctx->num_properties;
- for (int n = start; n < ctx->num_properties; n++) {
- struct observe_property *prop = ctx->properties[n];
- if (prop->changed && n < ctx->lowest_changed)
- ctx->lowest_changed = n;
+ struct observe_property *prop = ctx->properties[ctx->cur_property_index++];
- bool updated = false;
- if (prop->changed && !prop->dead) {
- prop->changed = false;
- updated = update_prop(ctx, prop);
- }
+ if (prop->value_ret_ts != prop->value_ts) {
+ prop->value_ret_ts = prop->value_ts;
+ prop_unref(ctx->cur_property);
+ ctx->cur_property = prop;
+ prop->refcount += 1;
+
+ if (prop->value_valid)
+ m_option_copy(prop->type, &prop->value_ret, &prop->value);
- if (prop->dead) {
- need_gc = true;
- } else if (updated) {
ctx->cur_property_event = (struct mpv_event_property){
.name = prop->name,
.format = prop->value_valid ? prop->format : 0,
- .data = prop->value_valid ? &prop->value : NULL,
+ .data = prop->value_valid ? &prop->value_ret : NULL,
};
*ctx->cur_event = (struct mpv_event){
.event_id = MPV_EVENT_PROPERTY_CHANGE,
.reply_userdata = prop->reply_id,
.data = &ctx->cur_property_event,
};
- break;
- }
- }
-
- if (need_gc) {
- // Remove entries which have the .dead flag set. The point of doing this
- // here is to ensure that this does not conflict with update_prop(),
- // and that a previously returned mpv_event struct pointing to removed
- // property entries does not result in dangling pointers.
- ctx->property_event_masks = 0;
- ctx->lowest_changed = 0;
- for (int n = ctx->num_properties - 1; n >= 0; n--) {
- struct observe_property *prop = ctx->properties[n];
- if (prop->dead) {
- if (!prop->async_updating) {
- MP_TARRAY_REMOVE_AT(ctx->properties, ctx->num_properties, n);
- talloc_free(prop);
- }
- } else {
- ctx->property_event_masks |= prop->event_mask;
- }
+ return true;
}
}
- return !!ctx->cur_event->event_id;
+ return false;
}
int mpv_hook_add(mpv_handle *ctx, uint64_t reply_userdata,
diff --git a/player/client.h b/player/client.h
index 186238396d..3408ff17f0 100644
--- a/player/client.h
+++ b/player/client.h
@@ -30,6 +30,7 @@ int mp_client_send_event(struct MPContext *mpctx, const char *client_name,
int mp_client_send_event_dup(struct MPContext *mpctx, const char *client_name,
int event, void *data);
void mp_client_property_change(struct MPContext *mpctx, const char *name);
+void mp_client_send_property_changes(struct MPContext *mpctx);
struct mpv_handle *mp_new_client(struct mp_client_api *clients, const char *name);
void mp_client_set_weak(struct mpv_handle *ctx);
diff --git a/player/playloop.c b/player/playloop.c
index 3049763661..4ae1864cb9 100644
--- a/player/playloop.c
+++ b/player/playloop.c
@@ -54,6 +54,8 @@
// mp_wait_events() was called.
void mp_wait_events(struct MPContext *mpctx)
{
+ mp_client_send_property_changes(mpctx);
+
bool sleeping = mpctx->sleeptime > 0;
if (sleeping)
MP_STATS(mpctx, "start sleep");