diff options
Diffstat (limited to 'stream/stream.c')
-rw-r--r-- | stream/stream.c | 59 |
1 files changed, 43 insertions, 16 deletions
diff --git a/stream/stream.c b/stream/stream.c index e2f71ae58f..a8162ffcb3 100644 --- a/stream/stream.c +++ b/stream/stream.c @@ -32,6 +32,7 @@ #include <libavutil/common.h> #include "osdep/mpbswap.h" +#include "osdep/atomics.h" #include "talloc.h" @@ -252,8 +253,8 @@ static const char *match_proto(const char *url, const char *proto) } static int open_internal(const stream_info_t *sinfo, struct stream *underlying, - const char *url, int flags, struct mpv_global *global, - struct stream **ret) + const char *url, int flags, struct mp_cancel *c, + struct mpv_global *global, struct stream **ret) { if (sinfo->stream_filter != !!underlying) return STREAM_NO_MATCH; @@ -281,6 +282,7 @@ static int open_internal(const stream_info_t *sinfo, struct stream *underlying, s->log = mp_log_new(s, global->log, sinfo->name); s->info = sinfo; s->opts = global->opts; + s->cancel = c; s->global = global; s->url = talloc_strdup(s, url); s->path = talloc_strdup(s, path); @@ -335,7 +337,8 @@ static int open_internal(const stream_info_t *sinfo, struct stream *underlying, return STREAM_OK; } -struct stream *stream_create(const char *url, int flags, struct mpv_global *global) +struct stream *stream_create(const char *url, int flags, + struct mp_cancel *c, struct mpv_global *global) { struct mp_log *log = mp_log_new(NULL, global->log, "!stream"); struct stream *s = NULL; @@ -344,7 +347,7 @@ struct stream *stream_create(const char *url, int flags, struct mpv_global *glob // Open stream proper bool unsafe = false; for (int i = 0; stream_list[i]; i++) { - int r = open_internal(stream_list[i], NULL, url, flags, global, &s); + int r = open_internal(stream_list[i], NULL, url, flags, c, global, &s); if (r == STREAM_OK) break; if (r == STREAM_NO_MATCH || r == STREAM_UNSUPPORTED) @@ -375,7 +378,7 @@ struct stream *stream_create(const char *url, int flags, struct mpv_global *glob for (;;) { struct stream *new = NULL; for (int i = 0; stream_list[i]; i++) { - int r = open_internal(stream_list[i], s, s->url, flags, global, &new); + int r = open_internal(stream_list[i], s, s->url, flags, c, global, &new); if (r == STREAM_OK) break; } @@ -391,12 +394,12 @@ done: struct stream *stream_open(const char *filename, struct mpv_global *global) { - return stream_create(filename, STREAM_READ, global); + return stream_create(filename, STREAM_READ, NULL, global); } stream_t *open_output_stream(const char *filename, struct mpv_global *global) { - return stream_create(filename, STREAM_WRITE, global); + return stream_create(filename, STREAM_WRITE, NULL, global); } static int stream_reconnect(stream_t *s) @@ -407,7 +410,7 @@ static int stream_reconnect(stream_t *s) return 0; if (!s->seekable) return 0; - if (stream_check_interrupt(s)) + if (mp_cancel_test(s->cancel)) return 0; int64_t pos = s->pos; int sleep_ms = 5; @@ -419,7 +422,7 @@ static int stream_reconnect(stream_t *s) sleep_ms = MPMIN(sleep_ms * 2, RECONNECT_SLEEP_MAX_MS); } - if (stream_check_interrupt(s)) + if (mp_cancel_test(s->cancel)) return 0; s->eof = 1; @@ -766,13 +769,6 @@ void free_stream(stream_t *s) talloc_free(s); } -bool stream_check_interrupt(struct stream *s) -{ - if (!s->global || !s->global->stream_interrupt_cb) - return false; - return s->global->stream_interrupt_cb(s->global->stream_interrupt_cb_ctx); -} - stream_t *open_memory_stream(void *data, int len) { assert(len >= 0); @@ -802,6 +798,7 @@ static stream_t *open_cache(stream_t *orig, const char *name) cache->streaming = orig->streaming, cache->is_network = orig->is_network; cache->opts = orig->opts; + cache->cancel = orig->cancel; cache->global = orig->global; cache->log = mp_log_new(cache, cache->global->log, name); @@ -989,6 +986,36 @@ struct bstr stream_read_complete(struct stream *s, void *talloc_ctx, return (struct bstr){buf, total_read}; } +struct mp_cancel { + atomic_bool triggered; +}; + +struct mp_cancel *mp_cancel_new(void *talloc_ctx) +{ + struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c); + *c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)}; + return c; +} + +// Request abort. +void mp_cancel_trigger(struct mp_cancel *c) +{ + c->triggered = true; +} + +// Restore original state. (Allows reusing a mp_cancel.) +void mp_cancel_reset(struct mp_cancel *c) +{ + c->triggered = false; +} + +// Return whether the caller should abort. +// For convenience, c==NULL is allowed. +bool mp_cancel_test(struct mp_cancel *c) +{ + return c ? c->triggered : false; +} + void stream_print_proto_list(struct mp_log *log) { int count = 0; |