diff options
-rw-r--r-- | player/client.c | 329 | ||||
-rw-r--r-- | player/client.h | 1 | ||||
-rw-r--r-- | player/playloop.c | 2 |
3 files changed, 195 insertions, 137 deletions
diff --git a/player/client.c b/player/client.c index 44ec7fad8c..0d1a4e6d53 100644 --- a/player/client.c +++ b/player/client.c @@ -73,6 +73,10 @@ struct mp_client_api { bool shutting_down; // do not allow new clients bool have_terminator; // a client took over the role of destroying the core bool terminate_core_thread; // make libmpv core thread exit + // This is incremented whenever the clients[] array above changes. This is + // used to safely unlock mp_client_api.lock while iterating the list of + // clients. + uint64_t clients_list_change_ts; struct mp_custom_protocol *custom_protocols; int num_custom_protocols; @@ -82,6 +86,7 @@ struct mp_client_api { }; struct observe_property { + // -- immutable struct mpv_handle *owner; char *name; int id; // ==mp_get_property_id(name) @@ -89,15 +94,14 @@ struct observe_property { int64_t reply_id; mpv_format format; const struct m_option *type; - bool changed; // property change should be signaled to user - bool dead; // property unobserved while retrieving value + // -- protected by owner->lock + size_t refcount; + uint64_t change_ts; // logical timestamp incremented on each change + uint64_t value_ts; // logical timestamp for value contents bool value_valid; union m_option_value value; - uint64_t async_change_ts; // logical timestamp incremented on each change - uint64_t async_value_ts; // logical timestamp for async_value contents - bool async_updating; // if true, updating async_value_ts to change_ts - bool async_value_valid; - union m_option_value async_value; + uint64_t value_ret_ts; // logical timestamp of value returned to user + union m_option_value value_ret; }; struct mpv_handle { @@ -110,6 +114,7 @@ struct mpv_handle { // -- not thread-safe struct mpv_event *cur_event; struct mpv_event_property cur_property_event; + struct observe_property *cur_property; pthread_mutex_t lock; @@ -134,11 +139,19 @@ struct mpv_handle { int reserved_events; // number of entries reserved for replies size_t async_counter; // pending other async events bool choked; // recovering from queue overflow + bool destroying; // pending destruction; no API accesses allowed struct observe_property **properties; int num_properties; - int lowest_changed; // attempt at making change processing incremental + bool has_pending_properties; // (maybe) new property events (producer side) + bool new_property_events; // new property events (consumer side) + int cur_property_index; // round-robin for property events (consumer side) uint64_t property_event_masks; // or-ed together event masks of all properties + // This is incremented whenever the properties[] array above changes. This + // is used to safely unlock mpv_handle.lock while reading a property. If + // the counter didn't change between unlock and relock, then it will assume + // the array did not change. + uint64_t properties_change_ts; bool fuzzy_initialized; // see scripting.c wait_loaded() bool is_weak; // can not keep core alive on its own @@ -150,6 +163,18 @@ static bool gen_log_message_event(struct mpv_handle *ctx); static bool gen_property_change_event(struct mpv_handle *ctx); static void notify_property_events(struct mpv_handle *ctx, uint64_t event_mask); +// Must be called with prop->owner->lock held. +static void prop_unref(struct observe_property *prop) +{ + if (!prop) + return; + + assert(prop->refcount > 0); + prop->refcount -= 1; + if (!prop->refcount) + talloc_free(prop); +} + void mp_clients_init(struct MPContext *mpctx) { mpctx->clients = talloc_ptrtype(NULL, mpctx->clients); @@ -256,6 +281,7 @@ struct mpv_handle *mp_new_client(struct mp_client_api *clients, const char *name snprintf(client->name, sizeof(client->name), "%s", nname); + clients->clients_list_change_ts += 1; MP_TARRAY_APPEND(clients, clients->clients, clients->num_clients, client); if (clients->num_clients == 1 && !clients->mpctx->is_cli) @@ -410,6 +436,20 @@ static void mp_destroy_client(mpv_handle *ctx, bool terminate) if (terminate) mpv_command(ctx, (const char*[]){"quit", NULL}); + pthread_mutex_lock(&ctx->lock); + + ctx->destroying = true; + + for (int n = 0; n < ctx->num_properties; n++) + prop_unref(ctx->properties[n]); + ctx->num_properties = 0; + ctx->properties_change_ts += 1; + + prop_unref(ctx->cur_property); + ctx->cur_property = NULL; + + pthread_mutex_unlock(&ctx->lock); + abort_async(mpctx, ctx, 0, 0); // reserved_events equals the number of asynchronous requests that weren't @@ -424,6 +464,7 @@ static void mp_destroy_client(mpv_handle *ctx, bool terminate) for (int n = 0; n < clients->num_clients; n++) { if (clients->clients[n] == ctx) { + clients->clients_list_change_ts += 1; MP_TARRAY_REMOVE_AT(clients->clients, clients->num_clients, n); while (ctx->num_events) { talloc_free(ctx->events[ctx->first_event].data); @@ -1415,11 +1456,11 @@ static void property_free(void *p) { struct observe_property *prop = p; - assert(!prop->async_updating); + assert(prop->refcount == 0); if (prop->type) { m_option_free(prop->type, &prop->value); - m_option_free(prop->type, &prop->async_value); + m_option_free(prop->type, &prop->value_ret); } } @@ -1434,6 +1475,7 @@ int mpv_observe_property(mpv_handle *ctx, uint64_t userdata, return MPV_ERROR_PROPERTY_FORMAT; pthread_mutex_lock(&ctx->lock); + assert(!ctx->destroying); struct observe_property *prop = talloc_ptrtype(ctx, prop); talloc_set_destructor(prop, property_free); *prop = (struct observe_property){ @@ -1444,35 +1486,33 @@ int mpv_observe_property(mpv_handle *ctx, uint64_t userdata, .reply_id = userdata, .format = format, .type = type, - .changed = true, - .async_change_ts = 1, + .change_ts = 1, // force initial event + .refcount = 1, }; + ctx->properties_change_ts += 1; MP_TARRAY_APPEND(ctx, ctx->properties, ctx->num_properties, prop); ctx->property_event_masks |= prop->event_mask; - ctx->lowest_changed = 0; + ctx->new_property_events = true; + ctx->cur_property_index = 0; + ctx->has_pending_properties = true; pthread_mutex_unlock(&ctx->lock); + mp_wakeup_core(ctx->mpctx); return 0; } -static void mark_property_changed(struct mpv_handle *client, int index) -{ - struct observe_property *prop = client->properties[index]; - prop->changed = true; - prop->async_change_ts += 1; - client->lowest_changed = MPMIN(client->lowest_changed, index); -} - int mpv_unobserve_property(mpv_handle *ctx, uint64_t userdata) { pthread_mutex_lock(&ctx->lock); int count = 0; - for (int n = 0; n < ctx->num_properties; n++) { + for (int n = ctx->num_properties - 1; n >= 0; n--) { struct observe_property *prop = ctx->properties[n]; // Perform actual removal of the property lazily to avoid creating // dangling pointers and such. - if (prop->reply_id == userdata && !prop->dead) { - mark_property_changed(ctx, n); - prop->dead = true; + if (prop->reply_id == userdata) { + prop_unref(prop); + ctx->properties_change_ts += 1; + MP_TARRAY_REMOVE_AT(ctx->properties, ctx->num_properties, n); + ctx->cur_property_index = 0; count++; } } @@ -1485,6 +1525,7 @@ void mp_client_property_change(struct MPContext *mpctx, const char *name) { struct mp_client_api *clients = mpctx->clients; int id = mp_get_property_id(mpctx, name); + bool any_pending = false; pthread_mutex_lock(&clients->lock); @@ -1492,15 +1533,23 @@ void mp_client_property_change(struct MPContext *mpctx, const char *name) struct mpv_handle *client = clients->clients[n]; pthread_mutex_lock(&client->lock); for (int i = 0; i < client->num_properties; i++) { - if (client->properties[i]->id == id) - mark_property_changed(client, i); + if (client->properties[i]->id == id) { + client->properties[i]->change_ts += 1; + client->has_pending_properties = true; + any_pending = true; + } } - if (client->lowest_changed < client->num_properties) - wakeup_client(client); pthread_mutex_unlock(&client->lock); } pthread_mutex_unlock(&clients->lock); + + // If we're inside mp_dispatch_queue_process(), this will cause the playloop + // to be re-run (to get mp_client_send_property_changes() called). If we're + // inside the normal playloop, this does nothing, but the latter function + // will be called at the end of the playloop anyway. + if (any_pending) + mp_dispatch_adjust_timeout(mpctx->dispatch, 0); } // Mark properties as changed in reaction to specific events. @@ -1508,94 +1557,122 @@ void mp_client_property_change(struct MPContext *mpctx, const char *name) static void notify_property_events(struct mpv_handle *ctx, uint64_t event_mask) { for (int i = 0; i < ctx->num_properties; i++) { - if (ctx->properties[i]->event_mask & event_mask) - mark_property_changed(ctx, i); + if (ctx->properties[i]->event_mask & event_mask) { + ctx->properties[i]->change_ts += 1; + ctx->has_pending_properties = true; + } } - if (ctx->lowest_changed < ctx->num_properties) - wakeup_client(ctx); + // Note: assume this function is called from the playloop only (at least + // if called with events that trigger property changes). } -static void update_prop_async(void *p) +// Call with ctx->lock held (only). May temporarily drop the lock. +static void send_client_property_changes(struct mpv_handle *ctx) { - struct observe_property *prop = p; - struct mpv_handle *ctx = prop->owner; + uint64_t cur_ts = ctx->properties_change_ts; - union m_option_value val = {0}; - bool val_valid = false; - uint64_t value_ts; + ctx->has_pending_properties = false; - pthread_mutex_lock(&ctx->lock); - value_ts = prop->async_change_ts; - assert(prop->async_updating); - pthread_mutex_unlock(&ctx->lock); - - struct getproperty_request req = { - .mpctx = ctx->mpctx, - .name = prop->name, - .format = prop->format, - .data = &val, - }; - getproperty_fn(&req); - val_valid = req.status >= 0; + for (int n = 0; n < ctx->num_properties; n++) { + struct observe_property *prop = ctx->properties[n]; - pthread_mutex_lock(&ctx->lock); + if (prop->value_ts == prop->change_ts) + continue; - assert(prop->async_updating); + bool changed = false; + if (prop->format) { + const struct m_option *type = prop->type; + union m_option_value val = {0}; + struct getproperty_request req = { + .mpctx = ctx->mpctx, + .name = prop->name, + .format = prop->format, + .data = &val, + }; - // Move to prop->async_value - m_option_free(prop->type, &prop->async_value); - memcpy(&prop->async_value, &val, prop->type->type->size); - prop->async_value_valid = val_valid; + // Temporarily unlock and read the property. The very important + // thing is that property getters can do whatever they want, _and_ + // that they may wait on the client API user thread (if vo_libmpv + // or similar things are involved). + prop->refcount += 1; // keep prop alive (esp. prop->name) + ctx->async_counter += 1; // keep ctx alive + pthread_mutex_unlock(&ctx->lock); + getproperty_fn(&req); + pthread_mutex_lock(&ctx->lock); + ctx->async_counter -= 1; + prop_unref(prop); + + // Set of observed properties was changed or something similar + // => start over, retry next time. + if (cur_ts != ctx->properties_change_ts || ctx->destroying) { + m_option_free(type, &val); + mp_wakeup_core(ctx->mpctx); + ctx->has_pending_properties = true; + break; + } + assert(prop->refcount > 0); + + bool val_valid = req.status >= 0; + changed = prop->value_valid != val_valid; + if (prop->value_valid && val_valid) + changed = !equal_mpv_value(&prop->value, &val, prop->format); + if (prop->value_ts == 0) + changed = true; // initial event + + prop->value_valid = val_valid; + if (changed && val_valid) { + // move val to prop->value + m_option_free(type, &prop->value); + memcpy(&prop->value, &val, type->type->size); + memset(&val, 0, type->type->size); + } - prop->async_value_ts = value_ts; - prop->async_updating = false; + m_option_free(prop->type, &val); + } else { + changed = true; + } - // Cause it to re-check the property. - prop->changed = true; - ctx->lowest_changed = 0; + if (changed) { + ctx->new_property_events = true; + } else if (prop->value_ret_ts == prop->value_ts) { + prop->value_ret_ts = prop->change_ts; // no change => no event + } - ctx->async_counter -= 1; - wakeup_client(ctx); + prop->value_ts = prop->change_ts; + } - pthread_mutex_unlock(&ctx->lock); + if (ctx->destroying || ctx->new_property_events) + wakeup_client(ctx); } -static bool update_prop(struct mpv_handle *ctx, struct observe_property *prop) +void mp_client_send_property_changes(struct MPContext *mpctx) { - if (!prop->type) - return true; + struct mp_client_api *clients = mpctx->clients; - if (prop->async_change_ts > prop->async_value_ts) { - if (!prop->async_updating) { - prop->async_updating = true; - ctx->async_counter += 1; - mp_dispatch_enqueue(ctx->mpctx->dispatch, update_prop_async, prop); - } - return false; // re-update later when the changed value comes in - } + pthread_mutex_lock(&clients->lock); + uint64_t cur_ts = clients->clients_list_change_ts; + + for (int n = 0; n < clients->num_clients; n++) { + struct mpv_handle *ctx = clients->clients[n]; - union m_option_value val = {0}; - bool val_valid = prop->async_value_valid; - m_option_copy(prop->type, &val, &prop->async_value); - - bool changed = prop->value_valid != val_valid; - if (prop->value_valid && val_valid) - changed = !equal_mpv_value(&prop->value, &val, prop->format); - - if (changed) { - prop->value_valid = val_valid; - if (val_valid) { - // move val to prop->value - m_option_free(prop->type, &prop->value); - memcpy(&prop->value, &val, prop->type->type->size); - val_valid = false; + pthread_mutex_lock(&ctx->lock); + if (!ctx->has_pending_properties) { + pthread_mutex_unlock(&ctx->lock); + continue; + } + // Keep ctx->lock locked (unlock order does not matter). + pthread_mutex_unlock(&clients->lock); + send_client_property_changes(ctx); + pthread_mutex_unlock(&ctx->lock); + pthread_mutex_lock(&clients->lock); + if (cur_ts != clients->clients_list_change_ts) { + // List changed; need to start over. Do it in the next iteration. + mp_wakeup_core(mpctx); + break; } } - if (val_valid) - m_option_free(prop->type, &val); - - return changed; + pthread_mutex_unlock(&clients->lock); } // Set ctx->cur_event to a generated property change event, if there is any @@ -1605,62 +1682,40 @@ static bool gen_property_change_event(struct mpv_handle *ctx) if (!ctx->mpctx->initialized) return false; - *ctx->cur_event = (struct mpv_event){ - .event_id = MPV_EVENT_NONE, - }; + while (1) { + if (ctx->cur_property_index >= ctx->num_properties) { + if (!ctx->new_property_events || !ctx->num_properties) + break; + ctx->new_property_events = false; + ctx->cur_property_index = 0; + } - bool need_gc = false; - int start = ctx->lowest_changed; - ctx->lowest_changed = ctx->num_properties; - for (int n = start; n < ctx->num_properties; n++) { - struct observe_property *prop = ctx->properties[n]; - if (prop->changed && n < ctx->lowest_changed) - ctx->lowest_changed = n; + struct observe_property *prop = ctx->properties[ctx->cur_property_index++]; - bool updated = false; - if (prop->changed && !prop->dead) { - prop->changed = false; - updated = update_prop(ctx, prop); - } + if (prop->value_ret_ts != prop->value_ts) { + prop->value_ret_ts = prop->value_ts; + prop_unref(ctx->cur_property); + ctx->cur_property = prop; + prop->refcount += 1; + + if (prop->value_valid) + m_option_copy(prop->type, &prop->value_ret, &prop->value); - if (prop->dead) { - need_gc = true; - } else if (updated) { ctx->cur_property_event = (struct mpv_event_property){ .name = prop->name, .format = prop->value_valid ? prop->format : 0, - .data = prop->value_valid ? &prop->value : NULL, + .data = prop->value_valid ? &prop->value_ret : NULL, }; *ctx->cur_event = (struct mpv_event){ .event_id = MPV_EVENT_PROPERTY_CHANGE, .reply_userdata = prop->reply_id, .data = &ctx->cur_property_event, }; - break; - } - } - - if (need_gc) { - // Remove entries which have the .dead flag set. The point of doing this - // here is to ensure that this does not conflict with update_prop(), - // and that a previously returned mpv_event struct pointing to removed - // property entries does not result in dangling pointers. - ctx->property_event_masks = 0; - ctx->lowest_changed = 0; - for (int n = ctx->num_properties - 1; n >= 0; n--) { - struct observe_property *prop = ctx->properties[n]; - if (prop->dead) { - if (!prop->async_updating) { - MP_TARRAY_REMOVE_AT(ctx->properties, ctx->num_properties, n); - talloc_free(prop); - } - } else { - ctx->property_event_masks |= prop->event_mask; - } + return true; } } - return !!ctx->cur_event->event_id; + return false; } int mpv_hook_add(mpv_handle *ctx, uint64_t reply_userdata, diff --git a/player/client.h b/player/client.h index 186238396d..3408ff17f0 100644 --- a/player/client.h +++ b/player/client.h @@ -30,6 +30,7 @@ int mp_client_send_event(struct MPContext *mpctx, const char *client_name, int mp_client_send_event_dup(struct MPContext *mpctx, const char *client_name, int event, void *data); void mp_client_property_change(struct MPContext *mpctx, const char *name); +void mp_client_send_property_changes(struct MPContext *mpctx); struct mpv_handle *mp_new_client(struct mp_client_api *clients, const char *name); void mp_client_set_weak(struct mpv_handle *ctx); diff --git a/player/playloop.c b/player/playloop.c index 3049763661..4ae1864cb9 100644 --- a/player/playloop.c +++ b/player/playloop.c @@ -54,6 +54,8 @@ // mp_wait_events() was called. void mp_wait_events(struct MPContext *mpctx) { + mp_client_send_property_changes(mpctx); + bool sleeping = mpctx->sleeptime > 0; if (sleeping) MP_STATS(mpctx, "start sleep"); |