From 67132750a65946a936b0f8fea2099d0a9f7ca889 Mon Sep 17 00:00:00 2001 From: James Ross-Gowan Date: Mon, 17 Nov 2014 16:33:39 +1100 Subject: lua: subprocess: use overlapped I/O on Windows Instead of threads, use overlapped (asynchronous) I/O to read from both stdout and stderr. Like in d0643fa, stdout and stderr could be closed at different times, so a sparse_wait function is added to wrap WaitForMultipleObjects and skip NULL handles. --- player/lua.c | 189 +++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 133 insertions(+), 56 deletions(-) diff --git a/player/lua.c b/player/lua.c index 8eb2184fc5..552c45badf 100644 --- a/player/lua.c +++ b/player/lua.c @@ -1173,12 +1173,7 @@ typedef void (*read_cb)(void *ctx, char *data, size_t size); #ifdef __MINGW32__ #include #include "osdep/io.h" - -struct subprocess_ctx { - HANDLE stderr_read; - void *cb_ctx; - read_cb on_stderr; -}; +#include "osdep/atomics.h" static void write_arg(bstr *cmdline, char *arg) { @@ -1248,38 +1243,99 @@ static wchar_t *write_cmdline(void *ctx, char **argv) return wcmdline; } -static void *stderr_routine(void *arg) +static int create_overlapped_pipe(HANDLE *read, HANDLE *write) { - struct subprocess_ctx *ctx = arg; + static atomic_ulong counter = ATOMIC_VAR_INIT(0); - // Read from stderr until it's closed on process exit - char buf[4096]; - DWORD r; - while (ReadFile(ctx->stderr_read, buf, 4096, &r, NULL)) - ctx->on_stderr(ctx->cb_ctx, buf, r); + // Generate pipe name + unsigned long id = atomic_fetch_add(&counter, 1); + unsigned pid = GetCurrentProcessId(); + wchar_t buf[36]; + swprintf(buf, sizeof(buf), L"\\\\?\\pipe\\mpv-anon-%08x-%08lx", pid, id); - return NULL; + // The function for creating anonymous pipes (CreatePipe) can't create + // overlapped pipes, so instead, use a named pipe with a unique name + *read = CreateNamedPipeW(buf, PIPE_ACCESS_INBOUND | + FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS, + 1, 0, 4096, 0, NULL); + if (!*read) + goto error; + + // Open the write end of the pipe as a synchronous handle + *write = CreateFileW(buf, GENERIC_WRITE, 0, NULL, OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, NULL); + if (*write == INVALID_HANDLE_VALUE) + goto error; + + return 0; +error: + *read = *write = INVALID_HANDLE_VALUE; + return -1; +} + +// Helper method similar to sparse_poll, skips NULL handles +static int sparse_wait(HANDLE *handles, unsigned num_handles) +{ + unsigned w_num_handles = 0; + HANDLE w_handles[num_handles]; + int map[num_handles]; + + for (unsigned i = 0; i < num_handles; i++) { + if (!handles[i]) + continue; + + w_handles[w_num_handles] = handles[i]; + map[w_num_handles] = i; + w_num_handles++; + } + + if (w_num_handles == 0) + return -1; + DWORD i = WaitForMultipleObjects(w_num_handles, w_handles, FALSE, INFINITE); + i -= WAIT_OBJECT_0; + + if (i >= w_num_handles) + return -1; + return map[i]; +} + +// Wrapper for ReadFile that treats ERROR_IO_PENDING as success +static int async_read(HANDLE file, void *buf, unsigned size, OVERLAPPED* ol) +{ + if (!ReadFile(file, buf, size, NULL, ol)) + return (GetLastError() == ERROR_IO_PENDING) ? 0 : -1; + return 0; } static int subprocess(char **args, struct mp_cancel *cancel, void *ctx, read_cb on_stdout, read_cb on_stderr, char **error) { wchar_t *tmp = talloc_new(NULL); - HANDLE stdout_read = NULL, stdout_write = NULL; - HANDLE stderr_read = NULL, stderr_write = NULL; int status = -1; + struct { + HANDLE read; + HANDLE write; + OVERLAPPED ol; + char buf[4096]; + read_cb read_cb; + } pipes[2] = { + { .read_cb = on_stdout }, + { .read_cb = on_stderr }, + }; // If the function exits before CreateProcess, there was an init error *error = "init"; - if (!CreatePipe(&stdout_read, &stdout_write, NULL, 0)) - goto done; - if (!CreatePipe(&stderr_read, &stderr_write, NULL, 0)) - goto done; - if (!SetHandleInformation(stdout_write, HANDLE_FLAG_INHERIT, 1)) - goto done; - if (!SetHandleInformation(stderr_write, HANDLE_FLAG_INHERIT, 1)) - goto done; + for (int i = 0; i < 2; i++) { + pipes[i].ol.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL); + if (!pipes[i].ol.hEvent) + goto done; + if (create_overlapped_pipe(&pipes[i].read, &pipes[i].write)) + goto done; + if (!SetHandleInformation(pipes[i].write, HANDLE_FLAG_INHERIT, 1)) + goto done; + } // Convert the args array to a UTF-16 Windows command-line string wchar_t *cmdline = write_cmdline(tmp, args); @@ -1289,8 +1345,8 @@ static int subprocess(char **args, struct mp_cancel *cancel, void *ctx, .cb = sizeof(si), .dwFlags = STARTF_USESTDHANDLES, .hStdInput = NULL, - .hStdOutput = stdout_write, - .hStdError = stderr_write, + .hStdOutput = pipes[0].write, + .hStdError = pipes[1].write, }; if (!CreateProcessW(NULL, cmdline, NULL, NULL, TRUE, @@ -1303,42 +1359,63 @@ static int subprocess(char **args, struct mp_cancel *cancel, void *ctx, // Init is finished *error = NULL; - // Close our copy of the write end of the pipes - CloseHandle(stdout_write); - stdout_write = NULL; - CloseHandle(stderr_write); - stderr_write = NULL; - - // Create a thread to read stderr output - pthread_t stderr_thread; - struct subprocess_ctx sctx = { - .stderr_read = stderr_read, - .cb_ctx = ctx, - .on_stderr = on_stderr, - }; - if (pthread_create(&stderr_thread, NULL, stderr_routine, &sctx)) - goto done; + // List of handles to watch with sparse_wait + HANDLE handles[] = { pipes[0].ol.hEvent, pipes[1].ol.hEvent, pi.hProcess }; - // Read from stdout until it's closed on process exit - char buf[4096]; - DWORD r; - while (ReadFile(stdout_read, buf, 4096, &r, NULL)) - on_stdout(ctx, buf, r); + for (int i = 0; i < 2; i++) { + // Close our copy of the write end of the pipes + CloseHandle(pipes[i].write); + pipes[i].write = NULL; - pthread_join(stderr_thread, NULL); + // Do the first read operation on each pipe + if (async_read(pipes[i].read, pipes[i].buf, 4096, &pipes[i].ol)) { + CloseHandle(pipes[i].read); + handles[i] = pipes[i].read = NULL; + } + } - // Get process exit code + DWORD r; DWORD exit_code; - WaitForSingleObject(pi.hProcess, INFINITE); - GetExitCodeProcess(pi.hProcess, &exit_code); - status = exit_code; + while (pipes[0].read || pipes[1].read || pi.hProcess) { + int i = sparse_wait(handles, MP_ARRAY_SIZE(handles)); + switch (i) { + case 0: + case 1: + // Complete the read operation on the pipe + if (!GetOverlappedResult(pipes[i].read, &pipes[i].ol, &r, TRUE)) { + CloseHandle(pipes[i].read); + handles[i] = pipes[i].read = NULL; + break; + } + + pipes[i].read_cb(ctx, pipes[i].buf, r); + + // Begin the next read operation on the pipe + if (async_read(pipes[i].read, pipes[i].buf, 4096, &pipes[i].ol)) { + CloseHandle(pipes[i].read); + handles[i] = pipes[i].read = NULL; + } + + break; + case 2: + GetExitCodeProcess(pi.hProcess, &exit_code); + status = exit_code; + + CloseHandle(pi.hProcess); + handles[i] = pi.hProcess = NULL; + break; + default: + goto done; + } + } done: - CloseHandle(stdout_read); - CloseHandle(stdout_write); - CloseHandle(stderr_read); - CloseHandle(stderr_write); - CloseHandle(pi.hProcess); + for (int i = 0; i < 2; i++) { + if (pipes[i].ol.hEvent) CloseHandle(pipes[i].ol.hEvent); + if (pipes[i].read) CloseHandle(pipes[i].read); + if (pipes[i].write) CloseHandle(pipes[i].write); + } + if (pi.hProcess) CloseHandle(pi.hProcess); talloc_free(tmp); return status; } -- cgit v1.2.3