summaryrefslogtreecommitdiffstats
path: root/misc/thread_pool.c
diff options
context:
space:
mode:
Diffstat (limited to 'misc/thread_pool.c')
-rw-r--r--misc/thread_pool.c78
1 files changed, 39 insertions, 39 deletions
diff --git a/misc/thread_pool.c b/misc/thread_pool.c
index 217c990c19..e20d9d07e6 100644
--- a/misc/thread_pool.c
+++ b/misc/thread_pool.c
@@ -13,8 +13,6 @@
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
-#include <pthread.h>
-
#include "common/common.h"
#include "osdep/threads.h"
#include "osdep/timer.h"
@@ -33,12 +31,12 @@ struct work {
struct mp_thread_pool {
int min_threads, max_threads;
- pthread_mutex_t lock;
- pthread_cond_t wakeup;
+ mp_mutex lock;
+ mp_cond wakeup;
// --- the following fields are protected by lock
- pthread_t *threads;
+ mp_thread *threads;
int num_threads;
// Number of threads which have taken up work and are still processing it.
@@ -50,15 +48,15 @@ struct mp_thread_pool {
int num_work;
};
-static void *worker_thread(void *arg)
+static MP_THREAD_VOID worker_thread(void *arg)
{
struct mp_thread_pool *pool = arg;
- mpthread_set_name("worker");
+ mp_thread_set_name("worker");
- pthread_mutex_lock(&pool->lock);
+ mp_mutex_lock(&pool->lock);
- struct timespec ts = {0};
+ int64_t destroy_deadline = 0;
bool got_timeout = false;
while (1) {
struct work work = {0};
@@ -72,25 +70,25 @@ static void *worker_thread(void *arg)
break;
if (pool->num_threads > pool->min_threads) {
- if (!ts.tv_sec && !ts.tv_nsec)
- ts = mp_rel_time_to_timespec(DESTROY_TIMEOUT);
- if (pthread_cond_timedwait(&pool->wakeup, &pool->lock, &ts))
+ if (!destroy_deadline)
+ destroy_deadline = mp_time_ns() + MP_TIME_S_TO_NS(DESTROY_TIMEOUT);
+ if (mp_cond_timedwait_until(&pool->wakeup, &pool->lock, destroy_deadline))
got_timeout = pool->num_threads > pool->min_threads;
} else {
- pthread_cond_wait(&pool->wakeup, &pool->lock);
+ mp_cond_wait(&pool->wakeup, &pool->lock);
}
continue;
}
pool->busy_threads += 1;
- pthread_mutex_unlock(&pool->lock);
+ mp_mutex_unlock(&pool->lock);
work.fn(work.fn_ctx);
- pthread_mutex_lock(&pool->lock);
+ mp_mutex_lock(&pool->lock);
pool->busy_threads -= 1;
- ts = (struct timespec){0};
+ destroy_deadline = 0;
got_timeout = false;
}
@@ -98,18 +96,20 @@ static void *worker_thread(void *arg)
// timeout, and nobody is waiting for us. We have to remove ourselves.
if (!pool->terminate) {
for (int n = 0; n < pool->num_threads; n++) {
- if (pthread_equal(pool->threads[n], pthread_self())) {
- pthread_detach(pthread_self());
+ if (mp_thread_id_equal(mp_thread_get_id(pool->threads[n]),
+ mp_thread_current_id()))
+ {
+ mp_thread_detach(pool->threads[n]);
MP_TARRAY_REMOVE_AT(pool->threads, pool->num_threads, n);
- pthread_mutex_unlock(&pool->lock);
- return NULL;
+ mp_mutex_unlock(&pool->lock);
+ MP_THREAD_RETURN();
}
}
- assert(0);
+ MP_ASSERT_UNREACHABLE();
}
- pthread_mutex_unlock(&pool->lock);
- return NULL;
+ mp_mutex_unlock(&pool->lock);
+ MP_THREAD_RETURN();
}
static void thread_pool_dtor(void *ctx)
@@ -117,33 +117,33 @@ static void thread_pool_dtor(void *ctx)
struct mp_thread_pool *pool = ctx;
- pthread_mutex_lock(&pool->lock);
+ mp_mutex_lock(&pool->lock);
pool->terminate = true;
- pthread_cond_broadcast(&pool->wakeup);
+ mp_cond_broadcast(&pool->wakeup);
- pthread_t *threads = pool->threads;
+ mp_thread *threads = pool->threads;
int num_threads = pool->num_threads;
pool->threads = NULL;
pool->num_threads = 0;
- pthread_mutex_unlock(&pool->lock);
+ mp_mutex_unlock(&pool->lock);
for (int n = 0; n < num_threads; n++)
- pthread_join(threads[n], NULL);
+ mp_thread_join(threads[n]);
assert(pool->num_work == 0);
assert(pool->num_threads == 0);
- pthread_cond_destroy(&pool->wakeup);
- pthread_mutex_destroy(&pool->lock);
+ mp_cond_destroy(&pool->wakeup);
+ mp_mutex_destroy(&pool->lock);
}
static bool add_thread(struct mp_thread_pool *pool)
{
- pthread_t thread;
+ mp_thread thread;
- if (pthread_create(&thread, NULL, worker_thread, pool) != 0)
+ if (mp_thread_create(&thread, worker_thread, pool) != 0)
return false;
MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
@@ -160,17 +160,17 @@ struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int init_threads,
struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool);
talloc_set_destructor(pool, thread_pool_dtor);
- pthread_mutex_init(&pool->lock, NULL);
- pthread_cond_init(&pool->wakeup, NULL);
+ mp_mutex_init(&pool->lock);
+ mp_cond_init(&pool->wakeup);
pool->min_threads = min_threads;
pool->max_threads = max_threads;
- pthread_mutex_lock(&pool->lock);
+ mp_mutex_lock(&pool->lock);
for (int n = 0; n < init_threads; n++)
add_thread(pool);
bool ok = pool->num_threads >= init_threads;
- pthread_mutex_unlock(&pool->lock);
+ mp_mutex_unlock(&pool->lock);
if (!ok)
TA_FREEP(&pool);
@@ -185,7 +185,7 @@ static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx),
assert(fn);
- pthread_mutex_lock(&pool->lock);
+ mp_mutex_lock(&pool->lock);
struct work work = {fn, fn_ctx};
// If there are not enough threads to process all at once, but we can
@@ -203,10 +203,10 @@ static bool thread_pool_add(struct mp_thread_pool *pool, void (*fn)(void *ctx),
if (ok) {
MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
- pthread_cond_signal(&pool->wakeup);
+ mp_cond_signal(&pool->wakeup);
}
- pthread_mutex_unlock(&pool->lock);
+ mp_mutex_unlock(&pool->lock);
return ok;
}