summaryrefslogtreecommitdiffstats
path: root/stream/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'stream/stream.c')
-rw-r--r--stream/stream.c59
1 files changed, 43 insertions, 16 deletions
diff --git a/stream/stream.c b/stream/stream.c
index e2f71ae58f..a8162ffcb3 100644
--- a/stream/stream.c
+++ b/stream/stream.c
@@ -32,6 +32,7 @@
#include <libavutil/common.h>
#include "osdep/mpbswap.h"
+#include "osdep/atomics.h"
#include "talloc.h"
@@ -252,8 +253,8 @@ static const char *match_proto(const char *url, const char *proto)
}
static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
- const char *url, int flags, struct mpv_global *global,
- struct stream **ret)
+ const char *url, int flags, struct mp_cancel *c,
+ struct mpv_global *global, struct stream **ret)
{
if (sinfo->stream_filter != !!underlying)
return STREAM_NO_MATCH;
@@ -281,6 +282,7 @@ static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
s->log = mp_log_new(s, global->log, sinfo->name);
s->info = sinfo;
s->opts = global->opts;
+ s->cancel = c;
s->global = global;
s->url = talloc_strdup(s, url);
s->path = talloc_strdup(s, path);
@@ -335,7 +337,8 @@ static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
return STREAM_OK;
}
-struct stream *stream_create(const char *url, int flags, struct mpv_global *global)
+struct stream *stream_create(const char *url, int flags,
+ struct mp_cancel *c, struct mpv_global *global)
{
struct mp_log *log = mp_log_new(NULL, global->log, "!stream");
struct stream *s = NULL;
@@ -344,7 +347,7 @@ struct stream *stream_create(const char *url, int flags, struct mpv_global *glob
// Open stream proper
bool unsafe = false;
for (int i = 0; stream_list[i]; i++) {
- int r = open_internal(stream_list[i], NULL, url, flags, global, &s);
+ int r = open_internal(stream_list[i], NULL, url, flags, c, global, &s);
if (r == STREAM_OK)
break;
if (r == STREAM_NO_MATCH || r == STREAM_UNSUPPORTED)
@@ -375,7 +378,7 @@ struct stream *stream_create(const char *url, int flags, struct mpv_global *glob
for (;;) {
struct stream *new = NULL;
for (int i = 0; stream_list[i]; i++) {
- int r = open_internal(stream_list[i], s, s->url, flags, global, &new);
+ int r = open_internal(stream_list[i], s, s->url, flags, c, global, &new);
if (r == STREAM_OK)
break;
}
@@ -391,12 +394,12 @@ done:
struct stream *stream_open(const char *filename, struct mpv_global *global)
{
- return stream_create(filename, STREAM_READ, global);
+ return stream_create(filename, STREAM_READ, NULL, global);
}
stream_t *open_output_stream(const char *filename, struct mpv_global *global)
{
- return stream_create(filename, STREAM_WRITE, global);
+ return stream_create(filename, STREAM_WRITE, NULL, global);
}
static int stream_reconnect(stream_t *s)
@@ -407,7 +410,7 @@ static int stream_reconnect(stream_t *s)
return 0;
if (!s->seekable)
return 0;
- if (stream_check_interrupt(s))
+ if (mp_cancel_test(s->cancel))
return 0;
int64_t pos = s->pos;
int sleep_ms = 5;
@@ -419,7 +422,7 @@ static int stream_reconnect(stream_t *s)
sleep_ms = MPMIN(sleep_ms * 2, RECONNECT_SLEEP_MAX_MS);
}
- if (stream_check_interrupt(s))
+ if (mp_cancel_test(s->cancel))
return 0;
s->eof = 1;
@@ -766,13 +769,6 @@ void free_stream(stream_t *s)
talloc_free(s);
}
-bool stream_check_interrupt(struct stream *s)
-{
- if (!s->global || !s->global->stream_interrupt_cb)
- return false;
- return s->global->stream_interrupt_cb(s->global->stream_interrupt_cb_ctx);
-}
-
stream_t *open_memory_stream(void *data, int len)
{
assert(len >= 0);
@@ -802,6 +798,7 @@ static stream_t *open_cache(stream_t *orig, const char *name)
cache->streaming = orig->streaming,
cache->is_network = orig->is_network;
cache->opts = orig->opts;
+ cache->cancel = orig->cancel;
cache->global = orig->global;
cache->log = mp_log_new(cache, cache->global->log, name);
@@ -989,6 +986,36 @@ struct bstr stream_read_complete(struct stream *s, void *talloc_ctx,
return (struct bstr){buf, total_read};
}
+struct mp_cancel {
+ atomic_bool triggered;
+};
+
+struct mp_cancel *mp_cancel_new(void *talloc_ctx)
+{
+ struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
+ *c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)};
+ return c;
+}
+
+// Request abort.
+void mp_cancel_trigger(struct mp_cancel *c)
+{
+ c->triggered = true;
+}
+
+// Restore original state. (Allows reusing a mp_cancel.)
+void mp_cancel_reset(struct mp_cancel *c)
+{
+ c->triggered = false;
+}
+
+// Return whether the caller should abort.
+// For convenience, c==NULL is allowed.
+bool mp_cancel_test(struct mp_cancel *c)
+{
+ return c ? c->triggered : false;
+}
+
void stream_print_proto_list(struct mp_log *log)
{
int count = 0;