From df82b5a63f16b45e3a765f86264f1ff875887250 Mon Sep 17 00:00:00 2001 From: wm4 Date: Sat, 8 Feb 2020 01:02:08 +0100 Subject: absurd buffering test --- stream/stream_file.c | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 95 insertions(+), 3 deletions(-) diff --git a/stream/stream_file.c b/stream/stream_file.c index a79ef0e913..b2c4d1d833 100644 --- a/stream/stream_file.c +++ b/stream/stream_file.c @@ -26,6 +26,8 @@ #include #include +#include + #ifndef __MINGW32__ #include #endif @@ -35,6 +37,7 @@ #include "common/common.h" #include "common/msg.h" #include "misc/thread_tools.h" +#include "misc/ring.h" #include "stream.h" #include "options/m_option.h" #include "options/path.h" @@ -66,21 +69,63 @@ struct priv { bool appending; int64_t orig_size; struct mp_cancel *cancel; + struct mp_ring *ring; + pthread_t thread; + pthread_mutex_t lock; + pthread_cond_t wakeup; + bool read_ok, in_io, eof, terminate; + int64_t size; }; // Total timeout = RETRY_TIMEOUT * MAX_RETRIES #define RETRY_TIMEOUT 0.2 #define MAX_RETRIES 10 +static void stop_io(stream_t *s) +{ + struct priv *p = s->priv; + + pthread_mutex_lock(&p->lock); + p->read_ok = false; + while (p->in_io) + pthread_cond_wait(&p->wakeup, &p->lock); + pthread_mutex_unlock(&p->lock); +} + static int64_t get_size(stream_t *s) { struct priv *p = s->priv; - off_t size = lseek(p->fd, 0, SEEK_END); - lseek(p->fd, s->pos, SEEK_SET); - return size == (off_t)-1 ? -1 : size; + if (!p->size) { + stop_io(s); + off_t size = lseek(p->fd, 0, SEEK_END); + lseek(p->fd, s->pos, SEEK_SET); + p->size = size == (off_t)-1 ? -1 : size; + } + return p->size; } static int fill_buffer(stream_t *s, void *buffer, int max_len) +{ + struct priv *p = s->priv; + + int res = 0; + pthread_mutex_lock(&p->lock); + while (1) { + int r = mp_ring_buffered(p->ring); + if (r || p->eof) { + res = mp_ring_read(p->ring, buffer, max_len); + break; + } + p->read_ok = true; + pthread_cond_broadcast(&p->wakeup); + pthread_cond_wait(&p->wakeup, &p->lock); + } + pthread_mutex_unlock(&p->lock); + + return res; +} + +static int real_fill_buffer(stream_t *s, void *buffer, int max_len) { struct priv *p = s->priv; @@ -120,21 +165,61 @@ static int fill_buffer(stream_t *s, void *buffer, int max_len) return 0; } +static void *read_thread(void *arg) +{ + stream_t *s = arg; + struct priv *p = s->priv; + + pthread_mutex_lock(&p->lock); + while (!p->terminate) { + int a = mp_ring_available(p->ring); + if (p->read_ok && a >= 64 * 1024) { + p->in_io = true; + pthread_mutex_unlock(&p->lock); + uint8_t buf[64 * 1024]; + int r = real_fill_buffer(s, buf, sizeof(buf)); + r = MPMAX(r, 0); + pthread_mutex_lock(&p->lock); + p->in_io = false; + mp_ring_write(p->ring, buf, r); + p->eof = !r; + pthread_cond_broadcast(&p->wakeup); + } else { + pthread_cond_wait(&p->wakeup, &p->lock); + } + } + pthread_mutex_unlock(&p->lock); + + return NULL; +} + static int write_buffer(stream_t *s, void *buffer, int len) { struct priv *p = s->priv; + stop_io(s); return write(p->fd, buffer, len); } static int seek(stream_t *s, int64_t newpos) { struct priv *p = s->priv; + stop_io(s); + pthread_mutex_lock(&p->lock); + mp_ring_reset(p->ring); + p->eof = false; + pthread_mutex_unlock(&p->lock); return lseek(p->fd, newpos, SEEK_SET) != (off_t)-1; } static void s_close(stream_t *s) { struct priv *p = s->priv; + stop_io(s); + pthread_mutex_lock(&p->lock); + p->terminate = true; + pthread_cond_broadcast(&p->wakeup); + pthread_mutex_unlock(&p->lock); + pthread_join(p->thread, NULL); if (p->close) close(p->fd); } @@ -340,6 +425,13 @@ static int open_f(stream_t *stream) if (stream->cancel) mp_cancel_set_parent(p->cancel, stream->cancel); + pthread_mutex_init(&p->lock, NULL); + pthread_cond_init(&p->wakeup, NULL); + p->ring = mp_ring_new(stream, 2 * 64 * 1024); + int r = pthread_create(&p->thread, NULL, read_thread, stream); + if (r) + return -1; + return STREAM_OK; } -- cgit v1.2.3