summaryrefslogtreecommitdiffstats
path: root/misc/thread_pool.c
blob: dddfad67349c3298565a1e0c030b7ca8e7654b3d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
 * This file is part of mpv.
 *
 * mpv is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * mpv is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with mpv.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <pthread.h>

#include "common/common.h"

#include "thread_pool.h"

struct work {
    void (*fn)(void *ctx);
    void *fn_ctx;
};

struct mp_thread_pool {
    pthread_t *threads;
    int num_threads;

    pthread_mutex_t lock;
    pthread_cond_t wakeup;

    // --- the following fields are protected by lock
    bool terminate;
    struct work *work;
    int num_work;
};

static void *worker_thread(void *arg)
{
    struct mp_thread_pool *pool = arg;

    pthread_mutex_lock(&pool->lock);
    while (1) {
        while (!pool->num_work && !pool->terminate)
            pthread_cond_wait(&pool->wakeup, &pool->lock);

        if (!pool->num_work && pool->terminate)
            break;

        assert(pool->num_work > 0);
        struct work work = pool->work[pool->num_work - 1];
        pool->num_work -= 1;

        pthread_mutex_unlock(&pool->lock);
        work.fn(work.fn_ctx);
        pthread_mutex_lock(&pool->lock);
    }
    assert(pool->num_work == 0);
    pthread_mutex_unlock(&pool->lock);

    return NULL;
}

static void thread_pool_dtor(void *ctx)
{
    struct mp_thread_pool *pool = ctx;

    pthread_mutex_lock(&pool->lock);
    pool->terminate = true;
    pthread_cond_broadcast(&pool->wakeup);
    pthread_mutex_unlock(&pool->lock);

    for (int n = 0; n < pool->num_threads; n++)
        pthread_join(pool->threads[n], NULL);

    assert(pool->num_work == 0);
    pthread_cond_destroy(&pool->wakeup);
    pthread_mutex_destroy(&pool->lock);
}

// Create a thread pool with the given number of worker threads. This can return
// NULL if the worker threads could not be created. The thread pool can be
// destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent).
// If there are still work items on freeing, it will block until all work items
// are done, and the threads terminate.
struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads)
{
    assert(threads > 0);

    struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool);
    talloc_set_destructor(pool, thread_pool_dtor);

    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->wakeup, NULL);

    for (int n = 0; n < threads; n++) {
        pthread_t thread;
        if (pthread_create(&thread, NULL, worker_thread, pool)) {
            talloc_free(pool);
            return NULL;
        }
        MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
    }

    return pool;
}

// Queue a function to be run on a worker thread: fn(fn_ctx)
// If no worker thread is currently available, it's appended to a list in memory
// with unbounded size. This function always returns immediately.
// Concurrent queue calls are allowed, as long as it does not overlap with
// pool destruction.
void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
                          void *fn_ctx)
{
    pthread_mutex_lock(&pool->lock);
    struct work work = {fn, fn_ctx};
    MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
    pthread_cond_signal(&pool->wakeup);
    pthread_mutex_unlock(&pool->lock);
}