summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2014-09-13 14:23:08 +0200
committerwm4 <wm4@nowhere>2014-09-13 16:09:51 +0200
commit2e91d44e2031c299de0d879e2656024d701c27da (patch)
treefdbe4903705bff3d6a21c42f08eb7e4b31e5cf07
parent2dd819705dcbeb4a20c0d6123082f9662cfa2ae4 (diff)
downloadmpv-2e91d44e2031c299de0d879e2656024d701c27da.tar.bz2
mpv-2e91d44e2031c299de0d879e2656024d701c27da.tar.xz
stream: redo playback abort handling
This mechanism originates from MPlayer's way of dealing with blocking network, but it's still useful. On opening and closing, mpv waits for network synchronously, and also some obscure commands and use-cases can lead to such blocking. In these situations, the stream is asynchronously forced to stop by "interrupting" it. The old design interrupting I/O was a bit broken: polling with a callback, instead of actively interrupting it. Change the direction of this. There is no callback anymore, and the player calls mp_cancel_trigger() to force the stream to return. libavformat (via stream_lavf.c) has the old broken design, and fixing it would require fixing libavformat, which won't happen so quickly. So we have to keep that part. But everything above the stream layer is prepared for a better design, and more sophisticated methods than mp_cancel_test() could be easily introduced. There's still one problem: commands are still run in the central playback loop, which we assume can block on I/O in the worst case. That's not a problem yet, because we simply mark some commands as being able to stop playback of the current file ("quit" etc.), so input.c could abort playback as soon as such a command is queued. But there are also commands abort playback only conditionally, and the logic for that is in the playback core and thus "unreachable". For example, "playlist_next" aborts playback only if there's a next file. We don't want it to always abort playback. As a quite ugly hack, abort playback only if at least 2 abort commands are queued - this pretty much happens only if the core is frozen and doesn't react to input.
-rw-r--r--common/global.h3
-rw-r--r--input/input.c18
-rw-r--r--input/input.h6
-rw-r--r--player/core.h1
-rw-r--r--player/loadfile.c7
-rw-r--r--player/main.c10
-rw-r--r--stream/cache.c4
-rw-r--r--stream/rar.c5
-rw-r--r--stream/rar.h1
-rw-r--r--stream/stream.c59
-rw-r--r--stream/stream.h12
-rw-r--r--stream/stream_lavf.c2
-rw-r--r--stream/stream_rar.c5
13 files changed, 86 insertions, 47 deletions
diff --git a/common/global.h b/common/global.h
index e49169bebb..546c585294 100644
--- a/common/global.h
+++ b/common/global.h
@@ -7,9 +7,6 @@
struct mpv_global {
struct MPOpts *opts;
struct mp_log *log;
-
- int (*stream_interrupt_cb)(void *ctx);
- void *stream_interrupt_cb_ctx;
};
#endif
diff --git a/input/input.c b/input/input.c
index 776bcaf6fb..3e9b40d158 100644
--- a/input/input.c
+++ b/input/input.c
@@ -156,6 +156,8 @@ struct input_ctx {
int num_sources;
struct cmd_queue cmd_queue;
+
+ struct mp_cancel *cancel;
};
static int parse_config(struct input_ctx *ictx, bool builtin, bstr data,
@@ -243,10 +245,10 @@ static int queue_count_cmds(struct cmd_queue *queue)
return res;
}
-static bool queue_has_abort_cmds(struct cmd_queue *queue)
+static bool has_abort_cmds(struct input_ctx *ictx)
{
bool ret = false;
- for (struct mp_cmd *cmd = queue->first; cmd; cmd = cmd->queue_next)
+ for (struct mp_cmd *cmd = ictx->cmd_queue.first; cmd; cmd = cmd->queue_next)
if (mp_input_is_abort_cmd(cmd)) {
ret = true;
break;
@@ -556,8 +558,8 @@ static bool key_updown_ok(enum mp_command_type cmd)
static bool should_drop_cmd(struct input_ctx *ictx, struct mp_cmd *cmd)
{
struct cmd_queue *queue = &ictx->cmd_queue;
- return (queue_count_cmds(queue) >= ictx->key_fifo_size &&
- (!mp_input_is_abort_cmd(cmd) || queue_has_abort_cmds(queue)));
+ return queue_count_cmds(queue) >= ictx->key_fifo_size &&
+ !mp_input_is_abort_cmd(cmd);
}
static struct mp_cmd *resolve_key(struct input_ctx *ictx, int code)
@@ -796,6 +798,9 @@ int mp_input_queue_cmd(struct input_ctx *ictx, mp_cmd_t *cmd)
{
input_lock(ictx);
if (cmd) {
+ // Abort only if there are going to be at least 2 commands in the queue.
+ if (ictx->cancel && mp_input_is_abort_cmd(cmd) && has_abort_cmds(ictx))
+ mp_cancel_trigger(ictx->cancel);
queue_add_tail(&ictx->cmd_queue, cmd);
mp_input_wakeup(ictx);
}
@@ -1304,12 +1309,11 @@ void mp_input_uninit(struct input_ctx *ictx)
talloc_free(ictx);
}
-bool mp_input_check_interrupt(struct input_ctx *ictx)
+void mp_input_set_cancel(struct input_ctx *ictx, struct mp_cancel *cancel)
{
input_lock(ictx);
- bool res = queue_has_abort_cmds(&ictx->cmd_queue);
+ ictx->cancel = cancel;
input_unlock(ictx);
- return res;
}
bool mp_input_use_alt_gr(struct input_ctx *ictx)
diff --git a/input/input.h b/input/input.h
index 56544a99ee..7096ae62cb 100644
--- a/input/input.h
+++ b/input/input.h
@@ -228,8 +228,10 @@ void mp_input_wakeup(struct input_ctx *ictx);
void mp_input_wakeup_nolock(struct input_ctx *ictx);
-// Interruptible usleep: (used by demux)
-bool mp_input_check_interrupt(struct input_ctx *ictx);
+// Used to asynchronously abort playback. Needed because the core still can
+// block on network in some situations.
+struct mp_cancel;
+void mp_input_set_cancel(struct input_ctx *ictx, struct mp_cancel *cancel);
// If this returns true, use Right Alt key as Alt Gr to produce special
// characters. If false, count Right Alt as the modifier Alt key.
diff --git a/player/core.h b/player/core.h
index 5c40e48674..9d9fefee21 100644
--- a/player/core.h
+++ b/player/core.h
@@ -169,6 +169,7 @@ typedef struct MPContext {
struct input_ctx *input;
struct mp_client_api *clients;
struct mp_dispatch_queue *dispatch;
+ struct mp_cancel *playback_abort;
struct mp_log *statusline;
struct osd_state *osd;
diff --git a/player/loadfile.c b/player/loadfile.c
index 4f4db2a376..b8efb92ca5 100644
--- a/player/loadfile.c
+++ b/player/loadfile.c
@@ -1005,6 +1005,8 @@ static void play_current_file(struct MPContext *mpctx)
mp_notify(mpctx, MPV_EVENT_START_FILE, NULL);
+ mp_cancel_reset(mpctx->playback_abort);
+
mpctx->stop_play = 0;
mpctx->filename = NULL;
mpctx->shown_aframes = 0;
@@ -1085,7 +1087,8 @@ static void play_current_file(struct MPContext *mpctx)
int stream_flags = STREAM_READ;
if (!opts->load_unsafe_playlists)
stream_flags |= mpctx->playing->stream_flags;
- mpctx->stream = stream_create(stream_filename, stream_flags, mpctx->global);
+ mpctx->stream = stream_create(stream_filename, stream_flags,
+ mpctx->playback_abort, mpctx->global);
if (!mpctx->stream) { // error...
mp_process_input(mpctx);
goto terminate_playback;
@@ -1307,6 +1310,8 @@ terminate_playback:
if (mpctx->step_frames)
opts->pause = 1;
+ mp_cancel_trigger(mpctx->playback_abort);
+
MP_INFO(mpctx, "\n");
// time to uninit all, except global stuff:
diff --git a/player/main.c b/player/main.c
index 6f9a0fcca8..93f974c107 100644
--- a/player/main.c
+++ b/player/main.c
@@ -308,6 +308,7 @@ struct MPContext *mp_create(void)
.osd_progbar = { .type = -1 },
.playlist = talloc_struct(mpctx, struct playlist, {0}),
.dispatch = mp_dispatch_create(mpctx),
+ .playback_abort = mp_cancel_new(mpctx),
};
mpctx->global = talloc_zero(mpctx, struct mpv_global);
@@ -341,12 +342,6 @@ struct MPContext *mp_create(void)
return mpctx;
}
-static int check_stream_interrupt(void *ctx)
-{
- struct MPContext *mpctx = ctx;
- return mp_input_check_interrupt(mpctx->input);
-}
-
static void wakeup_playloop(void *ctx)
{
struct MPContext *mpctx = ctx;
@@ -383,8 +378,7 @@ int mp_initialize(struct MPContext *mpctx)
}
mpctx->input = mp_input_init(mpctx->global);
- mpctx->global->stream_interrupt_cb = check_stream_interrupt;
- mpctx->global->stream_interrupt_cb_ctx = mpctx;
+ mp_input_set_cancel(mpctx->input, mpctx->playback_abort);
mp_dispatch_set_wakeup_fn(mpctx->dispatch, wakeup_playloop, mpctx);
diff --git a/stream/cache.c b/stream/cache.c
index 81408e098b..a5ac1c2995 100644
--- a/stream/cache.c
+++ b/stream/cache.c
@@ -138,7 +138,7 @@ static int64_t mp_clipi64(int64_t val, int64_t min, int64_t max)
// Returns CACHE_INTERRUPTED if the caller is supposed to abort.
static int cache_wakeup_and_wait(struct priv *s, double *retry_time)
{
- if (stream_check_interrupt(s->cache))
+ if (mp_cancel_test(s->cache->cancel))
return CACHE_INTERRUPTED;
double start = mp_time_sec();
@@ -652,7 +652,7 @@ int stream_cache_init(stream_t *cache, stream_t *stream,
if (min < 1)
return 1;
for (;;) {
- if (stream_check_interrupt(cache))
+ if (mp_cancel_test(cache->cancel))
return -1;
int64_t fill;
int idle;
diff --git a/stream/rar.c b/stream/rar.c
index 31211c777b..12d3b0bfd7 100644
--- a/stream/rar.c
+++ b/stream/rar.c
@@ -386,7 +386,8 @@ int RarParse(struct stream *s, int *count, rar_file_t ***file)
if (!volume_mrl)
goto done;
- vol = stream_create(volume_mrl, STREAM_READ | STREAM_NO_FILTERS, s->global);
+ vol = stream_create(volume_mrl, STREAM_READ | STREAM_NO_FILTERS,
+ s->cancel, s->global);
if (!vol)
goto done;
@@ -423,7 +424,7 @@ int RarSeek(rar_file_t *file, uint64_t position)
free_stream(file->s);
file->s = stream_create(file->current_chunk->mrl,
STREAM_READ | STREAM_NO_FILTERS,
- file->global);
+ file->cancel, file->global);
}
return file->s ? stream_seek(file->s, offset) : 0;
}
diff --git a/stream/rar.h b/stream/rar.h
index c9003b8a1b..f7f80a8177 100644
--- a/stream/rar.h
+++ b/stream/rar.h
@@ -45,6 +45,7 @@ typedef struct {
// When actually reading the data
struct mpv_global *global;
+ struct mp_cancel *cancel;
uint64_t i_pos;
stream_t *s;
rar_file_chunk_t *current_chunk;
diff --git a/stream/stream.c b/stream/stream.c
index e2f71ae58f..a8162ffcb3 100644
--- a/stream/stream.c
+++ b/stream/stream.c
@@ -32,6 +32,7 @@
#include <libavutil/common.h>
#include "osdep/mpbswap.h"
+#include "osdep/atomics.h"
#include "talloc.h"
@@ -252,8 +253,8 @@ static const char *match_proto(const char *url, const char *proto)
}
static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
- const char *url, int flags, struct mpv_global *global,
- struct stream **ret)
+ const char *url, int flags, struct mp_cancel *c,
+ struct mpv_global *global, struct stream **ret)
{
if (sinfo->stream_filter != !!underlying)
return STREAM_NO_MATCH;
@@ -281,6 +282,7 @@ static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
s->log = mp_log_new(s, global->log, sinfo->name);
s->info = sinfo;
s->opts = global->opts;
+ s->cancel = c;
s->global = global;
s->url = talloc_strdup(s, url);
s->path = talloc_strdup(s, path);
@@ -335,7 +337,8 @@ static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
return STREAM_OK;
}
-struct stream *stream_create(const char *url, int flags, struct mpv_global *global)
+struct stream *stream_create(const char *url, int flags,
+ struct mp_cancel *c, struct mpv_global *global)
{
struct mp_log *log = mp_log_new(NULL, global->log, "!stream");
struct stream *s = NULL;
@@ -344,7 +347,7 @@ struct stream *stream_create(const char *url, int flags, struct mpv_global *glob
// Open stream proper
bool unsafe = false;
for (int i = 0; stream_list[i]; i++) {
- int r = open_internal(stream_list[i], NULL, url, flags, global, &s);
+ int r = open_internal(stream_list[i], NULL, url, flags, c, global, &s);
if (r == STREAM_OK)
break;
if (r == STREAM_NO_MATCH || r == STREAM_UNSUPPORTED)
@@ -375,7 +378,7 @@ struct stream *stream_create(const char *url, int flags, struct mpv_global *glob
for (;;) {
struct stream *new = NULL;
for (int i = 0; stream_list[i]; i++) {
- int r = open_internal(stream_list[i], s, s->url, flags, global, &new);
+ int r = open_internal(stream_list[i], s, s->url, flags, c, global, &new);
if (r == STREAM_OK)
break;
}
@@ -391,12 +394,12 @@ done:
struct stream *stream_open(const char *filename, struct mpv_global *global)
{
- return stream_create(filename, STREAM_READ, global);
+ return stream_create(filename, STREAM_READ, NULL, global);
}
stream_t *open_output_stream(const char *filename, struct mpv_global *global)
{
- return stream_create(filename, STREAM_WRITE, global);
+ return stream_create(filename, STREAM_WRITE, NULL, global);
}
static int stream_reconnect(stream_t *s)
@@ -407,7 +410,7 @@ static int stream_reconnect(stream_t *s)
return 0;
if (!s->seekable)
return 0;
- if (stream_check_interrupt(s))
+ if (mp_cancel_test(s->cancel))
return 0;
int64_t pos = s->pos;
int sleep_ms = 5;
@@ -419,7 +422,7 @@ static int stream_reconnect(stream_t *s)
sleep_ms = MPMIN(sleep_ms * 2, RECONNECT_SLEEP_MAX_MS);
}
- if (stream_check_interrupt(s))
+ if (mp_cancel_test(s->cancel))
return 0;
s->eof = 1;
@@ -766,13 +769,6 @@ void free_stream(stream_t *s)
talloc_free(s);
}
-bool stream_check_interrupt(struct stream *s)
-{
- if (!s->global || !s->global->stream_interrupt_cb)
- return false;
- return s->global->stream_interrupt_cb(s->global->stream_interrupt_cb_ctx);
-}
-
stream_t *open_memory_stream(void *data, int len)
{
assert(len >= 0);
@@ -802,6 +798,7 @@ static stream_t *open_cache(stream_t *orig, const char *name)
cache->streaming = orig->streaming,
cache->is_network = orig->is_network;
cache->opts = orig->opts;
+ cache->cancel = orig->cancel;
cache->global = orig->global;
cache->log = mp_log_new(cache, cache->global->log, name);
@@ -989,6 +986,36 @@ struct bstr stream_read_complete(struct stream *s, void *talloc_ctx,
return (struct bstr){buf, total_read};
}
+struct mp_cancel {
+ atomic_bool triggered;
+};
+
+struct mp_cancel *mp_cancel_new(void *talloc_ctx)
+{
+ struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
+ *c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)};
+ return c;
+}
+
+// Request abort.
+void mp_cancel_trigger(struct mp_cancel *c)
+{
+ c->triggered = true;
+}
+
+// Restore original state. (Allows reusing a mp_cancel.)
+void mp_cancel_reset(struct mp_cancel *c)
+{
+ c->triggered = false;
+}
+
+// Return whether the caller should abort.
+// For convenience, c==NULL is allowed.
+bool mp_cancel_test(struct mp_cancel *c)
+{
+ return c ? c->triggered : false;
+}
+
void stream_print_proto_list(struct mp_log *log)
{
int count = 0;
diff --git a/stream/stream.h b/stream/stream.h
index 20c2e03e6a..0f028e330d 100644
--- a/stream/stream.h
+++ b/stream/stream.h
@@ -189,6 +189,8 @@ typedef struct stream {
struct MPOpts *opts;
struct mpv_global *global;
+ struct mp_cancel *cancel; // cancellation notification
+
FILE *capture_file;
char *capture_filename;
@@ -248,16 +250,20 @@ struct bstr stream_read_complete(struct stream *s, void *talloc_ctx,
int max_size);
int stream_control(stream_t *s, int cmd, void *arg);
void free_stream(stream_t *s);
-struct stream *stream_create(const char *url, int flags, struct mpv_global *global);
+struct stream *stream_create(const char *url, int flags,
+ struct mp_cancel *c, struct mpv_global *global);
struct stream *stream_open(const char *filename, struct mpv_global *global);
stream_t *open_output_stream(const char *filename, struct mpv_global *global);
stream_t *open_memory_stream(void *data, int len);
-bool stream_check_interrupt(struct stream *s);
-
void mp_url_unescape_inplace(char *buf);
char *mp_url_escape(void *talloc_ctx, const char *s, const char *ok);
+struct mp_cancel *mp_cancel_new(void *talloc_ctx);
+void mp_cancel_trigger(struct mp_cancel *c);
+bool mp_cancel_test(struct mp_cancel *c);
+void mp_cancel_reset(struct mp_cancel *c);
+
// stream_file.c
char *mp_file_url_to_filename(void *talloc_ctx, bstr url);
diff --git a/stream/stream_lavf.c b/stream/stream_lavf.c
index c994346290..3d1c6e0062 100644
--- a/stream/stream_lavf.c
+++ b/stream/stream_lavf.c
@@ -138,7 +138,7 @@ static int control(stream_t *s, int cmd, void *arg)
static int interrupt_cb(void *ctx)
{
struct stream *stream = ctx;
- return stream_check_interrupt(stream);
+ return mp_cancel_test(stream->cancel);
}
static const char * const prefix[] = { "lavf://", "ffmpeg://" };
diff --git a/stream/stream_rar.c b/stream/stream_rar.c
index ee76d62909..733ea3f36c 100644
--- a/stream/stream_rar.c
+++ b/stream/stream_rar.c
@@ -96,8 +96,8 @@ static int rar_entry_open(stream_t *stream)
*name++ = '\0';
mp_url_unescape_inplace(base);
- struct stream *rar =
- stream_create(base, STREAM_READ | STREAM_NO_FILTERS, stream->global);
+ struct stream *rar = stream_create(base, STREAM_READ | STREAM_NO_FILTERS,
+ stream->cancel, stream->global);
if (!rar)
return STREAM_ERROR;
@@ -126,6 +126,7 @@ static int rar_entry_open(stream_t *stream)
};
file->current_chunk = &dummy;
file->s = rar; // transfer ownership
+ file->cancel = stream->cancel;
file->global = stream->global;
RarSeek(file, 0);