From 1f25f60a685d50a178b6d0bc571ca5c2f476a984 Mon Sep 17 00:00:00 2001 From: nakst <> Date: Sun, 22 Aug 2021 21:13:41 +0100 Subject: [PATCH] Rewrite thread_pool.cpp --- src/llvm_backend.cpp | 3 +- src/parser.cpp | 3 +- src/thread_pool.cpp | 209 +++++++++++++++---------------------------- 3 files changed, 73 insertions(+), 142 deletions(-) diff --git a/src/llvm_backend.cpp b/src/llvm_backend.cpp index d50a512c7..1f897fe4c 100644 --- a/src/llvm_backend.cpp +++ b/src/llvm_backend.cpp @@ -1693,8 +1693,7 @@ void lb_generate_code(lbGenerator *gen) { thread_pool_add_task(&lb_thread_pool, lb_llvm_emit_worker_proc, wd); } - thread_pool_start(&lb_thread_pool); - thread_pool_wait_to_process(&lb_thread_pool); + thread_pool_wait(&lb_thread_pool); } else { for_array(j, gen->modules.entries) { lbModule *m = gen->modules.entries[j].value; diff --git a/src/parser.cpp b/src/parser.cpp index 2c29f651a..9d9d6120f 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -5685,8 +5685,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) { } } - thread_pool_start(&parser_thread_pool); - thread_pool_wait_to_process(&parser_thread_pool); + thread_pool_wait(&parser_thread_pool); for (ParseFileError err = ParseFile_None; mpmc_dequeue(&p->file_error_queue, &err); /**/) { if (err != ParseFile_None) { diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index b375d410a..fa8e10049 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -1,168 +1,101 @@ -// worker_queue.cpp +// thread_pool.cpp #define WORKER_TASK_PROC(name) isize name(void *data) typedef WORKER_TASK_PROC(WorkerTaskProc); +#if defined(GB_SYSTEM_WINDOWS) +#define THREAD_POOL_SYNC_FETCH_AND_ADD InterlockedAdd +#else +#define THREAD_POOL_SYNC_FETCH_AND_ADD __sync_fetch_and_add +#endif + struct WorkerTask { + WorkerTask *next_task; WorkerTaskProc *do_work; void *data; - isize result; }; - struct ThreadPool { - BlockingMutex mutex; - Semaphore sem_available; - std::atomic processing_work_count; - std::atomic is_running; - - gbAllocator allocator; - - MPMCQueue tasks; - - Thread *threads; - isize thread_count; - - char worker_prefix[10]; - i32 worker_prefix_len; + volatile i32 outstanding_task_count; + WorkerTask *next_task; + BlockingMutex task_list_mutex; }; void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix = nullptr); void thread_pool_destroy(ThreadPool *pool); -void thread_pool_start(ThreadPool *pool); -void thread_pool_join(ThreadPool *pool); +void thread_pool_wait(ThreadPool *pool); void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data); -THREAD_PROC(worker_thread_internal); +void worker_thread_internal(); + +void thread_pool_thread_entry(ThreadPool *pool) { + while (pool->outstanding_task_count) { + mutex_lock(&pool->task_list_mutex); + + if (pool->next_task) { + WorkerTask *task = pool->next_task; + pool->next_task = task->next_task; + mutex_unlock(&pool->task_list_mutex); + task->do_work(task->data); + THREAD_POOL_SYNC_FETCH_AND_ADD(&pool->outstanding_task_count, -1); + free(task); + } else { + mutex_unlock(&pool->task_list_mutex); + yield(); + } + } +} + +#if defined(GB_SYSTEM_WINDOWS) + DWORD __stdcall thread_pool_thread_entry_platform(void *arg) { + thread_pool_thread_entry((ThreadPool *) arg); + return 0; + } + + void thread_pool_start_thread(ThreadPool *pool) { + CloseHandle(CreateThread(NULL, 0, thread_pool_thread_entry_platform, pool, 0, NULL)); + } +#else + void *thread_pool_thread_entry_platform(void *arg) { + thread_pool_thread_entry((ThreadPool *) arg); + return NULL; + } + + void thread_pool_start_thread(ThreadPool *pool) { + pthread_t handle; + pthread_create(&handle, NULL, thread_pool_thread_entry_platform, pool); + pthread_detach(handle); + } +#endif void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) { - pool->allocator = a; - mpmc_init(&pool->tasks, a, 1024); - pool->thread_count = gb_max(thread_count, 0); - pool->threads = gb_alloc_array(a, Thread, pool->thread_count); - mutex_init(&pool->mutex); - semaphore_init(&pool->sem_available); - pool->is_running = true; + memset(pool, 0, sizeof(ThreadPool)); + mutex_init(&pool->task_list_mutex); + pool->outstanding_task_count = 1; - pool->worker_prefix_len = 0; - if (worker_prefix) { - i32 worker_prefix_len = cast(i32)gb_strlen(worker_prefix); - worker_prefix_len = gb_min(worker_prefix_len, 10); - gb_memmove(pool->worker_prefix, worker_prefix, worker_prefix_len); - pool->worker_prefix_len = worker_prefix_len; - } - - for (isize i = 0; i < pool->thread_count; i++) { - Thread *t = &pool->threads[i]; - thread_init(t); - t->user_index = i; - #if 0 - // TODO(bill): Fix this on Linux as it causes a seg-fault - if (pool->worker_prefix_len > 0) { - char worker_name[16] = {}; - gb_snprintf(worker_name, gb_size_of(worker_name), "%.*s%u", pool->worker_prefix_len, pool->worker_prefix, cast(u16)i); - thread_set_name(t, worker_name); - } - #endif + for (int i = 0; i < thread_count; i++) { + thread_pool_start_thread(pool); } } -void thread_pool_start(ThreadPool *pool) { - for (isize i = 0; i < pool->thread_count; i++) { - Thread *t = &pool->threads[i]; - thread_start(t, worker_thread_internal, pool); - } -} - -void thread_pool_join(ThreadPool *pool) { - pool->is_running.store(false); - - semaphore_post(&pool->sem_available, cast(i32)pool->thread_count); - - yield(); - - for (isize i = 0; i < pool->thread_count; i++) { - Thread *t = &pool->threads[i]; - thread_join(t); - } -} - - void thread_pool_destroy(ThreadPool *pool) { - thread_pool_join(pool); - - semaphore_destroy(&pool->sem_available); - mutex_destroy(&pool->mutex); - gb_free(pool->allocator, pool->threads); - pool->thread_count = 0; - mpmc_destroy(&pool->tasks); + mutex_destroy(&pool->task_list_mutex); } +void thread_pool_wait(ThreadPool *pool) { + THREAD_POOL_SYNC_FETCH_AND_ADD(&pool->outstanding_task_count, -1); -void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { - mutex_lock(&pool->mutex); - - WorkerTask task = {}; - task.do_work = proc; - task.data = data; - - mpmc_enqueue(&pool->tasks, task); - semaphore_post(&pool->sem_available, 1); - mutex_unlock(&pool->mutex); -} - -bool thread_pool_try_and_pop_task(ThreadPool *pool, WorkerTask *task) { - bool got_task = false; - if (mpmc_dequeue(&pool->tasks, task)) { - pool->processing_work_count.fetch_add(1); - got_task = true; - } - return got_task; -} -void thread_pool_do_work(ThreadPool *pool, WorkerTask *task) { - task->result = task->do_work(task->data); - pool->processing_work_count.fetch_sub(1); -} - -void thread_pool_wait_to_process(ThreadPool *pool) { - if (pool->thread_count == 0) { - WorkerTask task = {}; - while (thread_pool_try_and_pop_task(pool, &task)) { - thread_pool_do_work(pool, &task); - } - return; - } - while (pool->tasks.count.load(std::memory_order_relaxed) > 0 || pool->processing_work_count.load() != 0) { - WorkerTask task = {}; - if (thread_pool_try_and_pop_task(pool, &task)) { - thread_pool_do_work(pool, &task); - } - - // Safety-kick - while (pool->tasks.count.load(std::memory_order_relaxed) > 0 && pool->processing_work_count.load() == 0) { - mutex_lock(&pool->mutex); - semaphore_post(&pool->sem_available, cast(i32)pool->tasks.count.load(std::memory_order_relaxed)); - mutex_unlock(&pool->mutex); - } - + while (pool->outstanding_task_count) { yield(); } - - thread_pool_join(pool); } - -THREAD_PROC(worker_thread_internal) { - ThreadPool *pool = cast(ThreadPool *)thread->user_data; - while (pool->is_running.load()) { - semaphore_wait(&pool->sem_available); - - WorkerTask task = {}; - if (thread_pool_try_and_pop_task(pool, &task)) { - thread_pool_do_work(pool, &task); - } - } - // Cascade - semaphore_release(&pool->sem_available); - - return 0; +void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { + WorkerTask *task = (WorkerTask *) calloc(1, sizeof(WorkerTask)); + task->do_work = proc; + task->data = data; + mutex_lock(&pool->task_list_mutex); + task->next_task = pool->next_task; + pool->next_task = task; + THREAD_POOL_SYNC_FETCH_AND_ADD(&pool->outstanding_task_count, 1); + mutex_unlock(&pool->task_list_mutex); }