summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwm4 <wm4@nowhere>2020-02-08 01:02:08 +0100
committerwm4 <wm4@nowhere>2020-02-08 01:02:08 +0100
commitdf82b5a63f16b45e3a765f86264f1ff875887250 (patch)
treec1ca288ec0f7d68f937bcaef2030088c79d8cbfd
parentaf021a289169ae215ab8443634ff26fd1c79ad36 (diff)
downloadmpv-df82b5a63f16b45e3a765f86264f1ff875887250.tar.bz2
mpv-df82b5a63f16b45e3a765f86264f1ff875887250.tar.xz
absurd buffering test
-rw-r--r--stream/stream_file.c98
1 files changed, 95 insertions, 3 deletions
diff --git a/stream/stream_file.c b/stream/stream_file.c
index a79ef0e913..b2c4d1d833 100644
--- a/stream/stream_file.c
+++ b/stream/stream_file.c
@@ -26,6 +26,8 @@
#include <unistd.h>
#include <errno.h>
+#include <pthread.h>
+
#ifndef __MINGW32__
#include <poll.h>
#endif
@@ -35,6 +37,7 @@
#include "common/common.h"
#include "common/msg.h"
#include "misc/thread_tools.h"
+#include "misc/ring.h"
#include "stream.h"
#include "options/m_option.h"
#include "options/path.h"
@@ -66,24 +69,66 @@ struct priv {
bool appending;
int64_t orig_size;
struct mp_cancel *cancel;
+ struct mp_ring *ring;
+ pthread_t thread;
+ pthread_mutex_t lock;
+ pthread_cond_t wakeup;
+ bool read_ok, in_io, eof, terminate;
+ int64_t size;
};
// Total timeout = RETRY_TIMEOUT * MAX_RETRIES
#define RETRY_TIMEOUT 0.2
#define MAX_RETRIES 10
+static void stop_io(stream_t *s)
+{
+ struct priv *p = s->priv;
+
+ pthread_mutex_lock(&p->lock);
+ p->read_ok = false;
+ while (p->in_io)
+ pthread_cond_wait(&p->wakeup, &p->lock);
+ pthread_mutex_unlock(&p->lock);
+}
+
static int64_t get_size(stream_t *s)
{
struct priv *p = s->priv;
- off_t size = lseek(p->fd, 0, SEEK_END);
- lseek(p->fd, s->pos, SEEK_SET);
- return size == (off_t)-1 ? -1 : size;
+ if (!p->size) {
+ stop_io(s);
+ off_t size = lseek(p->fd, 0, SEEK_END);
+ lseek(p->fd, s->pos, SEEK_SET);
+ p->size = size == (off_t)-1 ? -1 : size;
+ }
+ return p->size;
}
static int fill_buffer(stream_t *s, void *buffer, int max_len)
{
struct priv *p = s->priv;
+ int res = 0;
+ pthread_mutex_lock(&p->lock);
+ while (1) {
+ int r = mp_ring_buffered(p->ring);
+ if (r || p->eof) {
+ res = mp_ring_read(p->ring, buffer, max_len);
+ break;
+ }
+ p->read_ok = true;
+ pthread_cond_broadcast(&p->wakeup);
+ pthread_cond_wait(&p->wakeup, &p->lock);
+ }
+ pthread_mutex_unlock(&p->lock);
+
+ return res;
+}
+
+static int real_fill_buffer(stream_t *s, void *buffer, int max_len)
+{
+ struct priv *p = s->priv;
+
#ifndef __MINGW32__
if (p->use_poll) {
int c = mp_cancel_get_fd(p->cancel);
@@ -120,21 +165,61 @@ static int fill_buffer(stream_t *s, void *buffer, int max_len)
return 0;
}
+static void *read_thread(void *arg)
+{
+ stream_t *s = arg;
+ struct priv *p = s->priv;
+
+ pthread_mutex_lock(&p->lock);
+ while (!p->terminate) {
+ int a = mp_ring_available(p->ring);
+ if (p->read_ok && a >= 64 * 1024) {
+ p->in_io = true;
+ pthread_mutex_unlock(&p->lock);
+ uint8_t buf[64 * 1024];
+ int r = real_fill_buffer(s, buf, sizeof(buf));
+ r = MPMAX(r, 0);
+ pthread_mutex_lock(&p->lock);
+ p->in_io = false;
+ mp_ring_write(p->ring, buf, r);
+ p->eof = !r;
+ pthread_cond_broadcast(&p->wakeup);
+ } else {
+ pthread_cond_wait(&p->wakeup, &p->lock);
+ }
+ }
+ pthread_mutex_unlock(&p->lock);
+
+ return NULL;
+}
+
static int write_buffer(stream_t *s, void *buffer, int len)
{
struct priv *p = s->priv;
+ stop_io(s);
return write(p->fd, buffer, len);
}
static int seek(stream_t *s, int64_t newpos)
{
struct priv *p = s->priv;
+ stop_io(s);
+ pthread_mutex_lock(&p->lock);
+ mp_ring_reset(p->ring);
+ p->eof = false;
+ pthread_mutex_unlock(&p->lock);
return lseek(p->fd, newpos, SEEK_SET) != (off_t)-1;
}
static void s_close(stream_t *s)
{
struct priv *p = s->priv;
+ stop_io(s);
+ pthread_mutex_lock(&p->lock);
+ p->terminate = true;
+ pthread_cond_broadcast(&p->wakeup);
+ pthread_mutex_unlock(&p->lock);
+ pthread_join(p->thread, NULL);
if (p->close)
close(p->fd);
}
@@ -340,6 +425,13 @@ static int open_f(stream_t *stream)
if (stream->cancel)
mp_cancel_set_parent(p->cancel, stream->cancel);
+ pthread_mutex_init(&p->lock, NULL);
+ pthread_cond_init(&p->wakeup, NULL);
+ p->ring = mp_ring_new(stream, 2 * 64 * 1024);
+ int r = pthread_create(&p->thread, NULL, read_thread, stream);
+ if (r)
+ return -1;
+
return STREAM_OK;
}