From c93872cc1371d60863e2dae6c08f556f32dd5a8a Mon Sep 17 00:00:00 2001 From: gingerBill Date: Sun, 1 Sep 2019 22:57:53 +0100 Subject: [PATCH] Thread pool fixes --- build.bat | 13 ++-- src/thread_pool.cpp | 169 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+), 6 deletions(-) create mode 100644 src/thread_pool.cpp diff --git a/build.bat b/build.bat index 6e5450c2b..334102cf1 100644 --- a/build.bat +++ b/build.bat @@ -4,14 +4,14 @@ set exe_name=odin.exe :: Debug = 0, Release = 1 -set release_mode=0 +set release_mode=1 set compiler_flags= -nologo -Oi -TP -fp:precise -Gm- -MP -FC -GS- -EHsc- -GR- if %release_mode% EQU 0 ( rem Debug - set compiler_flags=%compiler_flags% -Od -MDd -Z7 + set compiler_flags=%compiler_flags% -Od -MDd -Zi rem -DDISPLAY_TIMING ) else ( rem Release - set compiler_flags=%compiler_flags% -O2 -MT -Z7 -DNO_ARRAY_BOUNDS_CHECK + set compiler_flags=%compiler_flags% -O2 -MT -Zi -DNO_ARRAY_BOUNDS_CHECK ) set compiler_warnings= ^ @@ -40,9 +40,10 @@ del *.pdb > NUL 2> NUL del *.ilk > NUL 2> NUL -cl %compiler_settings% "src\main.cpp" ^ - /link %linker_settings% -OUT:%exe_name% ^ - && odin run examples/demo/demo.odin +rem cl %compiler_settings% "src\main.cpp" ^ + rem /link %linker_settings% -OUT:%exe_name% ^ + rem && odin build examples/demo/demo.odin -show-timings +odin check examples/demo/demo.odin -show-timings del *.obj > NUL 2> NUL diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp new file mode 100644 index 000000000..67e698e5d --- /dev/null +++ b/src/thread_pool.cpp @@ -0,0 +1,169 @@ +// worker_queue.cpp + +#define WORKER_TASK_PROC(name) isize name(void *data) +typedef WORKER_TASK_PROC(WorkerTaskProc); + +struct WorkerTask { + WorkerTaskProc *do_work; + void *data; +}; + + +struct ThreadPool { + gbMutex task_mutex; + gbMutex mutex; + gbSemaphore semaphore; + gbAtomic32 processing_work_count; + bool is_running; + + Array tasks; + Array threads; + + gbAllocator original_allocator; + + char worker_prefix[10]; + i32 worker_prefix_len; +}; + + +GB_ALLOCATOR_PROC(thread_pool_allocator_proc) { + ThreadPool *pool = cast(ThreadPool *)allocator_data; + return pool->original_allocator.proc(pool->original_allocator.data, type, size, 256, old_memory, old_size, flags); +} + +gbAllocator thread_pool_allocator(ThreadPool *pool) { + gbAllocator a = {thread_pool_allocator_proc, pool}; + return a; +} + +void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix = nullptr); +void thread_pool_destroy(ThreadPool *pool); +void thread_pool_start(ThreadPool *pool); +void thread_pool_join(ThreadPool *pool); +void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data); +void thread_pool_kick(ThreadPool *pool); +void thread_pool_kick_and_wait(ThreadPool *pool); +GB_THREAD_PROC(worker_thread_internal); + +void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) { + pool->original_allocator = a; + gbAllocator tpa = thread_pool_allocator(pool); + pool->tasks = array_make(tpa, 0, 1024); + pool->threads = array_make(tpa, thread_count); + gb_mutex_init(&pool->task_mutex); + gb_mutex_init(&pool->mutex); + gb_semaphore_init(&pool->semaphore); + pool->is_running = true; + + pool->worker_prefix_len = 0; + if (worker_prefix) { + i32 worker_prefix_len = cast(i32)gb_strlen(worker_prefix); + worker_prefix_len = gb_min(worker_prefix_len, 10); + gb_memmove(pool->worker_prefix, worker_prefix, worker_prefix_len); + pool->worker_prefix_len = worker_prefix_len; + } + + for_array(i, pool->threads) { + gbThread *t = &pool->threads[i]; + gb_thread_init(t); + t->user_index = i; + if (pool->worker_prefix_len > 0) { + char worker_name[16] = {}; + gb_snprintf(worker_name, gb_size_of(worker_name), "%.*s%u", pool->worker_prefix_len, pool->worker_prefix, cast(u16)i); + gb_thread_set_name(t, worker_name); + } + } +} + +void thread_pool_start(ThreadPool *pool) { + for_array(i, pool->threads) { + gbThread *t = &pool->threads[i]; + gb_thread_start(t, worker_thread_internal, pool); + } +} + +void thread_pool_join(ThreadPool *pool) { + pool->is_running = false; + + for_array(i, pool->threads) { + gb_semaphore_release(&pool->semaphore); + } + + for_array(i, pool->threads) { + gbThread *t = &pool->threads[i]; + gb_thread_join(t); + } +} + + +void thread_pool_destroy(ThreadPool *pool) { + thread_pool_join(pool); + + gb_semaphore_destroy(&pool->semaphore); + gb_mutex_destroy(&pool->mutex); + gb_mutex_destroy(&pool->task_mutex); + array_free(&pool->threads); + array_free(&pool->tasks); +} + + +void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { + gb_mutex_lock(&pool->task_mutex); + + WorkerTask task = {}; + task.do_work = proc; + task.data = data; + array_add(&pool->tasks, task); + + gb_mutex_unlock(&pool->task_mutex); + + gb_semaphore_post(&pool->semaphore, 1); +} + +void thread_pool_kick(ThreadPool *pool) { + if (pool->tasks.count > 0) { + isize count = gb_min(pool->tasks.count, pool->threads.count); + for (isize i = 0; i < count; i++) { + gb_semaphore_post(&pool->semaphore, 1); + } + } + +} +void thread_pool_kick_and_wait(ThreadPool *pool) { + thread_pool_kick(pool); + + isize return_value = 0; + while (pool->tasks.count > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) { + gb_yield(); + } + + thread_pool_join(pool); +} + + +GB_THREAD_PROC(worker_thread_internal) { + ThreadPool *pool = cast(ThreadPool *)thread->user_data; + thread->return_value = 0; + while (pool->is_running) { + gb_semaphore_wait(&pool->semaphore); + + WorkerTask task = {}; + bool got_task = false; + + if (gb_mutex_try_lock(&pool->task_mutex)) { + if (pool->tasks.count > 0) { + gb_atomic32_fetch_add(&pool->processing_work_count, +1); + task = array_pop(&pool->tasks); + got_task = true; + } + gb_mutex_unlock(&pool->task_mutex); + } + + if (got_task) { + thread->return_value = task.do_work(task.data); + gb_atomic32_fetch_add(&pool->processing_work_count, -1); + } + } + return thread->return_value; +} +