summaryrefslogtreecommitdiffstats
path: root/stream
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2014-09-13 14:23:08 +0200
committerwm4 <wm4@nowhere>2014-09-13 16:09:51 +0200
commit2e91d44e2031c299de0d879e2656024d701c27da (patch)
treefdbe4903705bff3d6a21c42f08eb7e4b31e5cf07 /stream
parent2dd819705dcbeb4a20c0d6123082f9662cfa2ae4 (diff)
downloadmpv-2e91d44e2031c299de0d879e2656024d701c27da.tar.bz2
mpv-2e91d44e2031c299de0d879e2656024d701c27da.tar.xz
stream: redo playback abort handling
This mechanism originates from MPlayer's way of dealing with blocking network, but it's still useful. On opening and closing, mpv waits for network synchronously, and also some obscure commands and use-cases can lead to such blocking. In these situations, the stream is asynchronously forced to stop by "interrupting" it. The old design interrupting I/O was a bit broken: polling with a callback, instead of actively interrupting it. Change the direction of this. There is no callback anymore, and the player calls mp_cancel_trigger() to force the stream to return. libavformat (via stream_lavf.c) has the old broken design, and fixing it would require fixing libavformat, which won't happen so quickly. So we have to keep that part. But everything above the stream layer is prepared for a better design, and more sophisticated methods than mp_cancel_test() could be easily introduced. There's still one problem: commands are still run in the central playback loop, which we assume can block on I/O in the worst case. That's not a problem yet, because we simply mark some commands as being able to stop playback of the current file ("quit" etc.), so input.c could abort playback as soon as such a command is queued. But there are also commands abort playback only conditionally, and the logic for that is in the playback core and thus "unreachable". For example, "playlist_next" aborts playback only if there's a next file. We don't want it to always abort playback. As a quite ugly hack, abort playback only if at least 2 abort commands are queued - this pretty much happens only if the core is frozen and doesn't react to input.
Diffstat (limited to 'stream')
-rw-r--r--stream/cache.c4
-rw-r--r--stream/rar.c5
-rw-r--r--stream/rar.h1
-rw-r--r--stream/stream.c59
-rw-r--r--stream/stream.h12
-rw-r--r--stream/stream_lavf.c2
-rw-r--r--stream/stream_rar.c5
7 files changed, 62 insertions, 26 deletions
diff --git a/stream/cache.c b/stream/cache.c
index 81408e098b..a5ac1c2995 100644
--- a/stream/cache.c
+++ b/stream/cache.c
@@ -138,7 +138,7 @@ static int64_t mp_clipi64(int64_t val, int64_t min, int64_t max)
// Returns CACHE_INTERRUPTED if the caller is supposed to abort.
static int cache_wakeup_and_wait(struct priv *s, double *retry_time)
{
- if (stream_check_interrupt(s->cache))
+ if (mp_cancel_test(s->cache->cancel))
return CACHE_INTERRUPTED;
double start = mp_time_sec();
@@ -652,7 +652,7 @@ int stream_cache_init(stream_t *cache, stream_t *stream,
if (min < 1)
return 1;
for (;;) {
- if (stream_check_interrupt(cache))
+ if (mp_cancel_test(cache->cancel))
return -1;
int64_t fill;
int idle;
diff --git a/stream/rar.c b/stream/rar.c
index 31211c777b..12d3b0bfd7 100644
--- a/stream/rar.c
+++ b/stream/rar.c
@@ -386,7 +386,8 @@ int RarParse(struct stream *s, int *count, rar_file_t ***file)
if (!volume_mrl)
goto done;
- vol = stream_create(volume_mrl, STREAM_READ | STREAM_NO_FILTERS, s->global);
+ vol = stream_create(volume_mrl, STREAM_READ | STREAM_NO_FILTERS,
+ s->cancel, s->global);
if (!vol)
goto done;
@@ -423,7 +424,7 @@ int RarSeek(rar_file_t *file, uint64_t position)
free_stream(file->s);
file->s = stream_create(file->current_chunk->mrl,
STREAM_READ | STREAM_NO_FILTERS,
- file->global);
+ file->cancel, file->global);
}
return file->s ? stream_seek(file->s, offset) : 0;
}
diff --git a/stream/rar.h b/stream/rar.h
index c9003b8a1b..f7f80a8177 100644
--- a/stream/rar.h
+++ b/stream/rar.h
@@ -45,6 +45,7 @@ typedef struct {
// When actually reading the data
struct mpv_global *global;
+ struct mp_cancel *cancel;
uint64_t i_pos;
stream_t *s;
rar_file_chunk_t *current_chunk;
diff --git a/stream/stream.c b/stream/stream.c
index e2f71ae58f..a8162ffcb3 100644
--- a/stream/stream.c
+++ b/stream/stream.c
@@ -32,6 +32,7 @@
#include <libavutil/common.h>
#include "osdep/mpbswap.h"
+#include "osdep/atomics.h"
#include "talloc.h"
@@ -252,8 +253,8 @@ static const char *match_proto(const char *url, const char *proto)
}
static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
- const char *url, int flags, struct mpv_global *global,
- struct stream **ret)
+ const char *url, int flags, struct mp_cancel *c,
+ struct mpv_global *global, struct stream **ret)
{
if (sinfo->stream_filter != !!underlying)
return STREAM_NO_MATCH;
@@ -281,6 +282,7 @@ static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
s->log = mp_log_new(s, global->log, sinfo->name);
s->info = sinfo;
s->opts = global->opts;
+ s->cancel = c;
s->global = global;
s->url = talloc_strdup(s, url);
s->path = talloc_strdup(s, path);
@@ -335,7 +337,8 @@ static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
return STREAM_OK;
}
-struct stream *stream_create(const char *url, int flags, struct mpv_global *global)
+struct stream *stream_create(const char *url, int flags,
+ struct mp_cancel *c, struct mpv_global *global)
{
struct mp_log *log = mp_log_new(NULL, global->log, "!stream");
struct stream *s = NULL;
@@ -344,7 +347,7 @@ struct stream *stream_create(const char *url, int flags, struct mpv_global *glob
// Open stream proper
bool unsafe = false;
for (int i = 0; stream_list[i]; i++) {
- int r = open_internal(stream_list[i], NULL, url, flags, global, &s);
+ int r = open_internal(stream_list[i], NULL, url, flags, c, global, &s);
if (r == STREAM_OK)
break;
if (r == STREAM_NO_MATCH || r == STREAM_UNSUPPORTED)
@@ -375,7 +378,7 @@ struct stream *stream_create(const char *url, int flags, struct mpv_global *glob
for (;;) {
struct stream *new = NULL;
for (int i = 0; stream_list[i]; i++) {
- int r = open_internal(stream_list[i], s, s->url, flags, global, &new);
+ int r = open_internal(stream_list[i], s, s->url, flags, c, global, &new);
if (r == STREAM_OK)
break;
}
@@ -391,12 +394,12 @@ done:
struct stream *stream_open(const char *filename, struct mpv_global *global)
{
- return stream_create(filename, STREAM_READ, global);
+ return stream_create(filename, STREAM_READ, NULL, global);
}
stream_t *open_output_stream(const char *filename, struct mpv_global *global)
{
- return stream_create(filename, STREAM_WRITE, global);
+ return stream_create(filename, STREAM_WRITE, NULL, global);
}
static int stream_reconnect(stream_t *s)
@@ -407,7 +410,7 @@ static int stream_reconnect(stream_t *s)
return 0;
if (!s->seekable)
return 0;
- if (stream_check_interrupt(s))
+ if (mp_cancel_test(s->cancel))
return 0;
int64_t pos = s->pos;
int sleep_ms = 5;
@@ -419,7 +422,7 @@ static int stream_reconnect(stream_t *s)
sleep_ms = MPMIN(sleep_ms * 2, RECONNECT_SLEEP_MAX_MS);
}
- if (stream_check_interrupt(s))
+ if (mp_cancel_test(s->cancel))
return 0;
s->eof = 1;
@@ -766,13 +769,6 @@ void free_stream(stream_t *s)
talloc_free(s);
}
-bool stream_check_interrupt(struct stream *s)
-{
- if (!s->global || !s->global->stream_interrupt_cb)
- return false;
- return s->global->stream_interrupt_cb(s->global->stream_interrupt_cb_ctx);
-}
-
stream_t *open_memory_stream(void *data, int len)
{
assert(len >= 0);
@@ -802,6 +798,7 @@ static stream_t *open_cache(stream_t *orig, const char *name)
cache->streaming = orig->streaming,
cache->is_network = orig->is_network;
cache->opts = orig->opts;
+ cache->cancel = orig->cancel;
cache->global = orig->global;
cache->log = mp_log_new(cache, cache->global->log, name);
@@ -989,6 +986,36 @@ struct bstr stream_read_complete(struct stream *s, void *talloc_ctx,
return (struct bstr){buf, total_read};
}
+struct mp_cancel {
+ atomic_bool triggered;
+};
+
+struct mp_cancel *mp_cancel_new(void *talloc_ctx)
+{
+ struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
+ *c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)};
+ return c;
+}
+
+// Request abort.
+void mp_cancel_trigger(struct mp_cancel *c)
+{
+ c->triggered = true;
+}
+
+// Restore original state. (Allows reusing a mp_cancel.)
+void mp_cancel_reset(struct mp_cancel *c)
+{
+ c->triggered = false;
+}
+
+// Return whether the caller should abort.
+// For convenience, c==NULL is allowed.
+bool mp_cancel_test(struct mp_cancel *c)
+{
+ return c ? c->triggered : false;
+}
+
void stream_print_proto_list(struct mp_log *log)
{
int count = 0;
diff --git a/stream/stream.h b/stream/stream.h
index 20c2e03e6a..0f028e330d 100644
--- a/stream/stream.h
+++ b/stream/stream.h
@@ -189,6 +189,8 @@ typedef struct stream {
struct MPOpts *opts;
struct mpv_global *global;
+ struct mp_cancel *cancel; // cancellation notification
+
FILE *capture_file;
char *capture_filename;
@@ -248,16 +250,20 @@ struct bstr stream_read_complete(struct stream *s, void *talloc_ctx,
int max_size);
int stream_control(stream_t *s, int cmd, void *arg);
void free_stream(stream_t *s);
-struct stream *stream_create(const char *url, int flags, struct mpv_global *global);
+struct stream *stream_create(const char *url, int flags,
+ struct mp_cancel *c, struct mpv_global *global);
struct stream *stream_open(const char *filename, struct mpv_global *global);
stream_t *open_output_stream(const char *filename, struct mpv_global *global);
stream_t *open_memory_stream(void *data, int len);
-bool stream_check_interrupt(struct stream *s);
-
void mp_url_unescape_inplace(char *buf);
char *mp_url_escape(void *talloc_ctx, const char *s, const char *ok);
+struct mp_cancel *mp_cancel_new(void *talloc_ctx);
+void mp_cancel_trigger(struct mp_cancel *c);
+bool mp_cancel_test(struct mp_cancel *c);
+void mp_cancel_reset(struct mp_cancel *c);
+
// stream_file.c
char *mp_file_url_to_filename(void *talloc_ctx, bstr url);
diff --git a/stream/stream_lavf.c b/stream/stream_lavf.c
index c994346290..3d1c6e0062 100644
--- a/stream/stream_lavf.c
+++ b/stream/stream_lavf.c
@@ -138,7 +138,7 @@ static int control(stream_t *s, int cmd, void *arg)
static int interrupt_cb(void *ctx)
{
struct stream *stream = ctx;
- return stream_check_interrupt(stream);
+ return mp_cancel_test(stream->cancel);
}
static const char * const prefix[] = { "lavf://", "ffmpeg://" };
diff --git a/stream/stream_rar.c b/stream/stream_rar.c
index ee76d62909..733ea3f36c 100644
--- a/stream/stream_rar.c
+++ b/stream/stream_rar.c
@@ -96,8 +96,8 @@ static int rar_entry_open(stream_t *stream)
*name++ = '\0';
mp_url_unescape_inplace(base);
- struct stream *rar =
- stream_create(base, STREAM_READ | STREAM_NO_FILTERS, stream->global);
+ struct stream *rar = stream_create(base, STREAM_READ | STREAM_NO_FILTERS,
+ stream->cancel, stream->global);
if (!rar)
return STREAM_ERROR;
@@ -126,6 +126,7 @@ static int rar_entry_open(stream_t *stream)
};
file->current_chunk = &dummy;
file->s = rar; // transfer ownership
+ file->cancel = stream->cancel;
file->global = stream->global;
RarSeek(file, 0);