From 772c8779fa4ed38fcc53c1a7a5c4c93e8a13f05a Mon Sep 17 00:00:00 2001
From: gingerBill
Date: Tue, 3 Sep 2019 22:11:21 +0100
Subject: [PATCH] Clean up thread pool code

---
 src/gb/gb.h         |  15 ++----
 src/parser.cpp      |  11 +++--
 src/thread_pool.cpp | 112 ++++++++++++++++++++++----------------------
 3 files changed, 64 insertions(+), 74 deletions(-)

diff --git a/src/gb/gb.h b/src/gb/gb.h
index 1b2bc5188..60303729f 100644
--- a/src/gb/gb.h
+++ b/src/gb/gb.h
@@ -918,10 +918,7 @@ GB_DEF void gb_lfence (void);
 
 #if defined(GB_SYSTEM_WINDOWS)
-typedef struct gbSemaphore {
-	void *win32_handle;
-	LONG count;
-} gbSemaphore;
+typedef struct gbSemaphore { void *win32_handle;} gbSemaphore;
 #elif defined(GB_SYSTEM_OSX)
 typedef struct gbSemaphore { semaphore_t osx_handle; } gbSemaphore;
 #elif defined(GB_SYSTEM_UNIX)
@@ -4593,21 +4590,15 @@ gb_inline void gb_semaphore_release(gbSemaphore *s) { gb_semaphore_post(s, 1); }
 #if defined(GB_SYSTEM_WINDOWS)
 	gb_inline void gb_semaphore_init(gbSemaphore *s) {
 		s->win32_handle = CreateSemaphoreA(NULL, 0, I32_MAX, NULL);
-		s->count = 0;
 	}
 	gb_inline void gb_semaphore_destroy(gbSemaphore *s) {
 		CloseHandle(s->win32_handle);
 	}
 	gb_inline void gb_semaphore_post(gbSemaphore *s, i32 count) {
-		_InterlockedIncrement(&s->count);
-		if (ReleaseSemaphore(s->win32_handle, count, NULL) == FALSE) {
-			_InterlockedDecrement(&s->count);
-		}
+		ReleaseSemaphore(s->win32_handle, count, NULL);
 	}
 	gb_inline void gb_semaphore_wait(gbSemaphore *s) {
-		if (WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE) == WAIT_OBJECT_0) {
-			_InterlockedDecrement(&s->count);
-		}
+		WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE);
 	}
 
 #elif defined(GB_SYSTEM_OSX)
diff --git a/src/parser.cpp b/src/parser.cpp
index a026e8ecd..b1d9a457f 100644
--- a/src/parser.cpp
+++ b/src/parser.cpp
@@ -4798,7 +4798,8 @@ ParseFileError parse_packages(Parser *p, String init_filename) {
 	GB_ASSERT(init_filename.text[init_filename.len] == 0);
 
 	isize thread_count = gb_max(build_context.thread_count, 1);
-	thread_pool_init(&parser_thread_pool, heap_allocator(), thread_count, "ParserWork");
+	isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
+	thread_pool_init(&parser_thread_pool, heap_allocator(), worker_count, "ParserWork");
 
 	String init_fullpath = path_to_full_path(heap_allocator(), init_filename);
 	if (!path_is_directory(init_fullpath)) {
@@ -4819,12 +4820,12 @@ ParseFileError parse_packages(Parser *p, String init_filename) {
 	p->init_fullpath = init_fullpath;
 
 	thread_pool_start(&parser_thread_pool);
-	thread_pool_kick_and_wait(&parser_thread_pool);
+	thread_pool_wait_to_process(&parser_thread_pool);
 
 	// NOTE(bill): Get the last error and use that
-	for (isize i = parser_thread_pool.thread_count-1; i >= 0; i--) {
-		gbThread *t = &parser_thread_pool.threads[i];
-		ParseFileError err = cast(ParseFileError)t->return_value;
+	for (isize i = parser_thread_pool.task_tail-1; i >= 0; i--) {
+		WorkerTask *task = &parser_thread_pool.tasks[i];
+		ParseFileError err = cast(ParseFileError)task->result;
 		if (err != ParseFile_None) {
 			return err;
 		}
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index 8a32b7aca..2467ba609 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -6,20 +6,21 @@ typedef WORKER_TASK_PROC(WorkerTaskProc);
 struct WorkerTask {
 	WorkerTaskProc *do_work;
 	void *data;
+	isize result;
 };
 
 struct ThreadPool {
-	gbMutex     task_mutex;
 	gbMutex     mutex;
-	gbSemaphore semaphore;
+	gbSemaphore sem_available;
 	gbAtomic32  processing_work_count;
 	bool        is_running;
 
 	gbAllocator allocator;
 
 	WorkerTask *tasks;
-	isize volatile task_count;
+	isize volatile task_head;
+	isize volatile task_tail;
 	isize volatile task_capacity;
 
 	gbThread *threads;
@@ -40,14 +41,14 @@ GB_THREAD_PROC(worker_thread_internal);
 
 void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) {
 	pool->allocator = a;
-	pool->task_count = 0;
+	pool->task_head = 0;
+	pool->task_tail = 0;
 	pool->task_capacity = 1024;
 	pool->tasks = gb_alloc_array(a, WorkerTask, pool->task_capacity);
-	pool->threads = gb_alloc_array(a, gbThread, thread_count);
-	pool->thread_count = thread_count;
-	gb_mutex_init(&pool->task_mutex);
+	pool->thread_count = gb_max(thread_count, 0);
+	pool->threads = gb_alloc_array(a, gbThread, pool->thread_count);
 	gb_mutex_init(&pool->mutex);
-	gb_semaphore_init(&pool->semaphore);
+	gb_semaphore_init(&pool->sem_available);
 	pool->is_running = true;
 
 	pool->worker_prefix_len = 0;
@@ -63,6 +64,7 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count
 		gb_thread_init(t);
 		t->user_index = i;
 	#if 0
+		// TODO(bill): Fix this on Linux as it causes a seg-fault
 		if (pool->worker_prefix_len > 0) {
 			char worker_name[16] = {};
 			gb_snprintf(worker_name, gb_size_of(worker_name), "%.*s%u", pool->worker_prefix_len, pool->worker_prefix, cast(u16)i);
@@ -82,9 +84,9 @@ void thread_pool_start(ThreadPool *pool) {
 void thread_pool_join(ThreadPool *pool) {
 	pool->is_running = false;
 
-	for (isize i = 0; i < pool->thread_count; i++) {
-		gb_semaphore_release(&pool->semaphore);
-	}
+	gb_semaphore_post(&pool->sem_available, cast(i32)pool->thread_count);
+
+	gb_yield();
 
 	for (isize i = 0; i < pool->thread_count; i++) {
 		gbThread *t = &pool->threads[i];
@@ -96,25 +98,24 @@ void thread_pool_join(ThreadPool *pool) {
 
 void thread_pool_destroy(ThreadPool *pool) {
 	thread_pool_join(pool);
-	gb_semaphore_destroy(&pool->semaphore);
+	gb_semaphore_destroy(&pool->sem_available);
 	gb_mutex_destroy(&pool->mutex);
-	gb_mutex_destroy(&pool->task_mutex);
 	gb_free(pool->allocator, pool->threads);
 	pool->thread_count = 0;
 	gb_free(pool->allocator, pool->tasks);
-	pool->task_count = 0;
+	pool->task_head = 0;
+	pool->task_tail = 0;
 	pool->task_capacity = 0;
-
 }
 
 void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
-	gb_mutex_lock(&pool->task_mutex);
+	gb_mutex_lock(&pool->mutex);
 
-	if (pool->task_count == pool->task_capacity) {
+	if (pool->task_tail == pool->task_capacity) {
 		isize new_cap = 2*pool->task_capacity + 8;
 		WorkerTask *new_tasks = gb_alloc_array(pool->allocator, WorkerTask, new_cap);
-		gb_memmove(new_tasks, pool->tasks, pool->task_count*gb_size_of(WorkerTask));
+		gb_memmove(new_tasks, pool->tasks, (pool->task_tail)*gb_size_of(WorkerTask));
 		pool->tasks = new_tasks;
 		pool->task_capacity = new_cap;
 	}
@@ -122,35 +123,42 @@ void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
 	task.do_work = proc;
 	task.data = data;
 
-	pool->tasks[pool->task_count++] = task;
-
-	gb_semaphore_post(&pool->semaphore, 1);
-	gb_mutex_unlock(&pool->task_mutex);
+	pool->tasks[pool->task_tail++] = task;
+	gb_semaphore_post(&pool->sem_available, 1);
+	gb_mutex_unlock(&pool->mutex);
 }
 
-void thread_pool_kick(ThreadPool *pool) {
-	gb_mutex_lock(&pool->task_mutex);
-	if (pool->task_count > 0) {
-		isize count = gb_min(pool->task_count, pool->thread_count);
-		for (isize i = 0; i < count; i++) {
-			gb_semaphore_post(&pool->semaphore, 1);
+bool thread_pool_try_and_pop_task(ThreadPool *pool, WorkerTask *task) {
+	bool got_task = false;
+	if (gb_mutex_try_lock(&pool->mutex)) {
+		if (pool->task_tail > pool->task_head) {
+			gb_atomic32_fetch_add(&pool->processing_work_count, +1);
+			*task = pool->tasks[pool->task_head++];
+			got_task = true;
 		}
+		gb_mutex_unlock(&pool->mutex);
 	}
-	gb_mutex_unlock(&pool->task_mutex);
+	return got_task;
+}
+void thread_pool_do_work(ThreadPool *pool, WorkerTask *task) {
+	task->result = task->do_work(task->data);
+	gb_atomic32_fetch_add(&pool->processing_work_count, -1);
 }
 
-void thread_pool_kick_and_wait(ThreadPool *pool) {
-	thread_pool_kick(pool);
-	isize return_value = 0;
-	while (pool->task_count > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) {
-
-		if (pool->task_count > 0 && gb_atomic32_load(&pool->processing_work_count) == 0) {
-			gb_mutex_lock(&pool->task_mutex);
-			for (isize i = 0; i < pool->task_count; i++) {
-				gb_semaphore_post(&pool->semaphore, 1);
-			}
-			gb_mutex_unlock(&pool->task_mutex);
+void thread_pool_wait_to_process(ThreadPool *pool) {
+	while (pool->task_tail > pool->task_head || gb_atomic32_load(&pool->processing_work_count) != 0) {
+		WorkerTask task = {};
+		if (thread_pool_try_and_pop_task(pool, &task)) {
+			thread_pool_do_work(pool, &task);
 		}
+
+		// Safety-kick
+		if (pool->task_tail > pool->task_head && gb_atomic32_load(&pool->processing_work_count) == 0) {
+			gb_mutex_lock(&pool->mutex);
+			gb_semaphore_post(&pool->sem_available, cast(i32)(pool->task_tail-pool->task_head));
+			gb_mutex_unlock(&pool->mutex);
+		}
 
 		gb_yield();
 	}
@@ -160,27 +168,17 @@ void thread_pool_kick_and_wait(ThreadPool *pool) {
 
 GB_THREAD_PROC(worker_thread_internal) {
 	ThreadPool *pool = cast(ThreadPool *)thread->user_data;
-	thread->return_value = 0;
 	while (pool->is_running) {
-		gb_semaphore_wait(&pool->semaphore);
+		gb_semaphore_wait(&pool->sem_available);
 
 		WorkerTask task = {};
-		bool got_task = false;
-
-		if (gb_mutex_try_lock(&pool->task_mutex)) {
-			if (pool->task_count > 0) {
-				gb_atomic32_fetch_add(&pool->processing_work_count, +1);
-				task = pool->tasks[--pool->task_count];
-				got_task = true;
-			}
-			gb_mutex_unlock(&pool->task_mutex);
-		}
-
-		if (got_task) {
-			thread->return_value = task.do_work(task.data);
-			gb_atomic32_fetch_add(&pool->processing_work_count, -1);
+		if (thread_pool_try_and_pop_task(pool, &task)) {
+			thread_pool_do_work(pool, &task);
 		}
 	}
 
-	return thread->return_value;
+	// Cascade
+	gb_semaphore_release(&pool->sem_available);
+
+	return 0;
 }
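
For reviewers, here is a minimal usage sketch of the reworked API in one place. It is not part of the patch: example_task and example_usage are hypothetical names, and the sketch assumes it is compiled inside the compiler tree where gb.h and thread_pool.cpp are available and where WORKER_TASK_PROC (judging from its use above) declares a task as an isize-returning function taking a void *data parameter.

// Hypothetical worker procedure; the isize it returns is stored in
// WorkerTask.result by thread_pool_do_work.
WORKER_TASK_PROC(example_task) {
	int *value = cast(int *)data; // `data` is whatever was passed to thread_pool_add_task
	*value *= 2;
	return 0;
}

void example_usage(void) {
	ThreadPool pool = {};
	isize thread_count = 4; // total threads that will do work, main thread included
	thread_pool_init(&pool, heap_allocator(), thread_count-1, "ExampleWork");

	int values[16];
	for (isize i = 0; i < gb_count_of(values); i++) {
		values[i] = cast(int)i;
		thread_pool_add_task(&pool, example_task, &values[i]);
	}

	thread_pool_start(&pool);

	// Unlike the removed thread_pool_kick_and_wait, the main thread now pops
	// and runs tasks itself until task_head catches up with task_tail and no
	// worker is mid-task.
	thread_pool_wait_to_process(&pool);

	// Popping only advances task_head, so per-task results survive in
	// tasks[0..task_tail-1]; parse_packages relies on this when scanning
	// for the last ParseFileError.
	for (isize i = 0; i < pool.task_tail; i++) {
		GB_ASSERT(pool.tasks[i].result == 0);
	}

	thread_pool_destroy(&pool);
}

The main thread's participation is also why parse_packages now requests thread_count-1 workers: one worker per logical thread plus a working main thread would oversubscribe by one.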