From 5c4d95d539944dcb6617ce9c70bcedd12552225a Mon Sep 17 00:00:00 2001 From: gingerBill Date: Thu, 19 Aug 2021 15:19:36 +0100 Subject: [PATCH] Move more of `gb.h`'s Synchronization code into common.cpp --- src/checker.cpp | 20 ++++---- src/checker.hpp | 4 +- src/common.cpp | 116 +++++++++++++++++++++++++++++++++++--------- src/thread_pool.cpp | 30 ++++++------ 4 files changed, 120 insertions(+), 50 deletions(-) diff --git a/src/checker.cpp b/src/checker.cpp index e3d80f68a..a207ed78e 100644 --- a/src/checker.cpp +++ b/src/checker.cpp @@ -883,7 +883,7 @@ void init_checker_info(CheckerInfo *i) { mutex_init(&i->identifier_uses_mutex); mutex_init(&i->foreign_mutex); - gb_semaphore_init(&i->collect_semaphore); + semaphore_init(&i->collect_semaphore); #undef TIME_SECTION @@ -976,7 +976,7 @@ void init_checker(Checker *c) { // NOTE(bill): 1 Mi elements should be enough on average mpmc_init(&c->procs_to_check_queue, heap_allocator(), 1<<20); - gb_semaphore_init(&c->procs_to_check_semaphore); + semaphore_init(&c->procs_to_check_semaphore); mpmc_init(&c->global_untyped_queue, a, 1<<20); @@ -991,7 +991,7 @@ void destroy_checker(Checker *c) { destroy_checker_context(&c->builtin_ctx); mpmc_destroy(&c->procs_to_check_queue); - gb_semaphore_destroy(&c->procs_to_check_semaphore); + semaphore_destroy(&c->procs_to_check_semaphore); mpmc_destroy(&c->global_untyped_queue); } @@ -4136,7 +4136,7 @@ void check_with_workers(Checker *c, gbThreadProc *proc, isize total_count) { worker_count = 0; } - gb_semaphore_post(&c->info.collect_semaphore, cast(i32)thread_count); + semaphore_post(&c->info.collect_semaphore, cast(i32)thread_count); if (worker_count == 0) { ThreadProcCheckerSection section_all = {}; @@ -4174,7 +4174,7 @@ void check_with_workers(Checker *c, gbThreadProc *proc, isize total_count) { dummy_main_thread.user_data = thread_data+worker_count; proc(&dummy_main_thread); - gb_semaphore_wait(&c->info.collect_semaphore); + semaphore_wait(&c->info.collect_semaphore); for (isize i = 0; i < worker_count; i++) { gb_thread_destroy(threads+i); @@ -4208,7 +4208,7 @@ GB_THREAD_PROC(thread_proc_collect_entities) { map_destroy(&untyped); - gb_semaphore_release(&c->info.collect_semaphore); + semaphore_release(&c->info.collect_semaphore); return 0; } @@ -4249,7 +4249,7 @@ GB_THREAD_PROC(thread_proc_check_export_entities) { map_destroy(&untyped); - gb_semaphore_release(&c->info.collect_semaphore); + semaphore_release(&c->info.collect_semaphore); return 0; } @@ -4735,7 +4735,7 @@ GB_THREAD_PROC(thread_proc_body) { map_destroy(&untyped); - gb_semaphore_release(&c->procs_to_check_semaphore); + semaphore_release(&c->procs_to_check_semaphore); return 0; } @@ -4795,7 +4795,7 @@ void check_procedure_bodies(Checker *c) { GB_ASSERT(total_queued == original_queue_count); - gb_semaphore_post(&c->procs_to_check_semaphore, cast(i32)thread_count); + semaphore_post(&c->procs_to_check_semaphore, cast(i32)thread_count); gbThread *threads = gb_alloc_array(permanent_allocator(), gbThread, worker_count); for (isize i = 0; i < worker_count; i++) { @@ -4809,7 +4809,7 @@ void check_procedure_bodies(Checker *c) { dummy_main_thread.user_data = thread_data+worker_count; thread_proc_body(&dummy_main_thread); - gb_semaphore_wait(&c->procs_to_check_semaphore); + semaphore_wait(&c->procs_to_check_semaphore); for (isize i = 0; i < worker_count; i++) { gb_thread_destroy(threads+i); diff --git a/src/checker.hpp b/src/checker.hpp index 81b093137..1b2ad0575 100644 --- a/src/checker.hpp +++ b/src/checker.hpp @@ -294,7 +294,7 @@ struct CheckerInfo { // NOTE(bill): If the semantic checker (check_proc_body) is to ever to be multithreaded, // these variables will be of contention - gbSemaphore collect_semaphore; + Semaphore collect_semaphore; UntypedExprInfoMap global_untyped; // NOTE(bill): This needs to be a map and not on the Ast // as it needs to be iterated across afterwards @@ -390,7 +390,7 @@ struct Checker { MPMCQueue procs_with_deferred_to_check; ProcBodyQueue procs_to_check_queue; - gbSemaphore procs_to_check_semaphore; + Semaphore procs_to_check_semaphore; // TODO(bill): Technically MPSC queue MPMCQueue global_untyped_queue; diff --git a/src/common.cpp b/src/common.cpp index 96e29f822..fd6268a82 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -47,6 +47,47 @@ void mutex_unlock(BlockingMutex *m) { ReleaseSRWLockExclusive(&m->srwlock); } + + struct RecursiveMutex { + CRITICAL_SECTION win32_critical_section; + }; + void mutex_init(RecursiveMutex *m) { + InitializeCriticalSection(&m->win32_critical_section); + } + void mutex_destroy(RecursiveMutex *m) { + DeleteCriticalSection(&m->win32_critical_section); + } + void mutex_lock(RecursiveMutex *m) { + EnterCriticalSection(&m->win32_critical_section); + } + bool mutex_try_lock(RecursiveMutex *m) { + return TryEnterCriticalSection(&m->win32_critical_section) != 0; + } + void mutex_unlock(RecursiveMutex *m) { + LeaveCriticalSection(&m->win32_critical_section); + } + + struct Semaphore { + void *win32_handle; + }; + + gb_inline void semaphore_init(Semaphore *s) { + s->win32_handle = CreateSemaphoreA(NULL, 0, I32_MAX, NULL); + } + gb_inline void semaphore_destroy(Semaphore *s) { + CloseHandle(s->win32_handle); + } + gb_inline void semaphore_post(Semaphore *s, i32 count) { + ReleaseSemaphore(s->win32_handle, count, NULL); + } + gb_inline void semaphore_wait(Semaphore *s) { + WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE); + } + + gb_inline void semaphore_release(Semaphore *s) { + semaphore_post(s, 1); + } + #else struct BlockingMutex { pthread_mutex_t pthread_mutex; @@ -66,26 +107,55 @@ void mutex_unlock(BlockingMutex *m) { pthread_mutex_unlock(&m->pthread_mutex); } -#endif -struct RecursiveMutex { - gbMutex mutex; -}; -void mutex_init(RecursiveMutex *m) { - gb_mutex_init(&m->mutex); -} -void mutex_destroy(RecursiveMutex *m) { - gb_mutex_destroy(&m->mutex); -} -void mutex_lock(RecursiveMutex *m) { - gb_mutex_lock(&m->mutex); -} -bool mutex_try_lock(RecursiveMutex *m) { - return !!gb_mutex_try_lock(&m->mutex); -} -void mutex_unlock(RecursiveMutex *m) { - gb_mutex_unlock(&m->mutex); -} + struct RecursiveMutex { + pthread_mutex_t pthread_mutex; + pthread_mutexattr_t pthread_mutexattr; + }; + void mutex_init(RecursiveMutex *m) { + pthread_mutexattr_init(&m->pthread_mutexattr); + pthread_mutexattr_settype(&m->pthread_mutexattr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(&m->pthread_mutex, &m->pthread_mutexattr); + } + void mutex_destroy(RecursiveMutex *m) { + pthread_mutex_destroy(&m->pthread_mutex); + } + void mutex_lock(RecursiveMutex *m) { + pthread_mutex_lock(&m->pthread_mutex); + } + bool mutex_try_lock(RecursiveMutex *m) { + return pthread_mutex_trylock(&m->pthread_mutex) == 0; + } + void mutex_unlock(RecursiveMutex *m) { + pthread_mutex_unlock(&m->pthread_mutex); + } + + #if defined(GB_SYSTEM_OSX) + struct Semaphore { + semaphore_t osx_handle; + }; + + gb_inline void semaphore_init (Semaphore *s) { semaphore_create(mach_task_self(), &s->osx_handle, SYNC_POLICY_FIFO, 0); } + gb_inline void semaphore_destroy(Semaphore *s) { semaphore_destroy(mach_task_self(), s->osx_handle); } + gb_inline void semaphore_post (Semaphore *s, i32 count) { while (count --> 0) semaphore_signal(s->osx_handle); } + gb_inline void semaphore_wait (Semaphore *s) { semaphore_wait(s->osx_handle); } + #elif defined(GB_SYSTEM_UNIX) + struct Semaphore { + sem_t unix_handle; + }; + + gb_inline void semaphore_init (Semaphore *s) { sem_init(&s->unix_handle, 0, 0); } + gb_inline void semaphore_destroy(Semaphore *s) { sem_destroy(&s->unix_handle); } + gb_inline void semaphore_post (Semaphore *s, i32 count) { while (count --> 0) sem_post(&s->unix_handle); } + gb_inline void semaphore_wait (Semaphore *s) { int i; do { i = sem_wait(&s->unix_handle); } while (i == -1 && errno == EINTR); } + #else + #error + #endif + + gb_inline void semaphore_release(Semaphore *s) { + semaphore_post(s, 1); + } +#endif @@ -585,7 +655,7 @@ struct Temp_Allocator { isize curr_offset; gbAllocator backup_allocator; Array leaked_allocations; - gbMutex mutex; + BlockingMutex mutex; }; gb_global Temp_Allocator temporary_allocator_data = {}; @@ -596,7 +666,7 @@ void temp_allocator_init(Temp_Allocator *s, isize size) { s->len = size; s->curr_offset = 0; s->leaked_allocations.allocator = s->backup_allocator; - gb_mutex_init(&s->mutex); + mutex_init(&s->mutex); } void *temp_allocator_alloc(Temp_Allocator *s, isize size, isize alignment) { @@ -639,8 +709,8 @@ GB_ALLOCATOR_PROC(temp_allocator_proc) { Temp_Allocator *s = cast(Temp_Allocator *)allocator_data; GB_ASSERT_NOT_NULL(s); - gb_mutex_lock(&s->mutex); - defer (gb_mutex_unlock(&s->mutex)); + mutex_lock(&s->mutex); + defer (mutex_unlock(&s->mutex)); switch (type) { case gbAllocation_Alloc: diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 35bc5b7d2..be4c3122b 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -11,10 +11,10 @@ struct WorkerTask { struct ThreadPool { - BlockingMutex mutex; - gbSemaphore sem_available; - gbAtomic32 processing_work_count; - bool is_running; + BlockingMutex mutex; + Semaphore sem_available; + std::atomic processing_work_count; + bool is_running; gbAllocator allocator; @@ -40,7 +40,7 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count pool->thread_count = gb_max(thread_count, 0); pool->threads = gb_alloc_array(a, gbThread, pool->thread_count); mutex_init(&pool->mutex); - gb_semaphore_init(&pool->sem_available); + semaphore_init(&pool->sem_available); pool->is_running = true; pool->worker_prefix_len = 0; @@ -76,7 +76,7 @@ void thread_pool_start(ThreadPool *pool) { void thread_pool_join(ThreadPool *pool) { pool->is_running = false; - gb_semaphore_post(&pool->sem_available, cast(i32)pool->thread_count); + semaphore_post(&pool->sem_available, cast(i32)pool->thread_count); gb_yield(); @@ -90,7 +90,7 @@ void thread_pool_join(ThreadPool *pool) { void thread_pool_destroy(ThreadPool *pool) { thread_pool_join(pool); - gb_semaphore_destroy(&pool->sem_available); + semaphore_destroy(&pool->sem_available); mutex_destroy(&pool->mutex); gb_free(pool->allocator, pool->threads); pool->thread_count = 0; @@ -106,21 +106,21 @@ void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { task.data = data; mpmc_enqueue(&pool->tasks, task); - gb_semaphore_post(&pool->sem_available, 1); + semaphore_post(&pool->sem_available, 1); mutex_unlock(&pool->mutex); } bool thread_pool_try_and_pop_task(ThreadPool *pool, WorkerTask *task) { bool got_task = false; if (mpmc_dequeue(&pool->tasks, task)) { - gb_atomic32_fetch_add(&pool->processing_work_count, +1); + pool->processing_work_count.fetch_add(1); got_task = true; } return got_task; } void thread_pool_do_work(ThreadPool *pool, WorkerTask *task) { task->result = task->do_work(task->data); - gb_atomic32_fetch_add(&pool->processing_work_count, -1); + pool->processing_work_count.fetch_sub(1); } void thread_pool_wait_to_process(ThreadPool *pool) { @@ -131,16 +131,16 @@ void thread_pool_wait_to_process(ThreadPool *pool) { } return; } - while (pool->tasks.count.load(std::memory_order_relaxed) > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) { + while (pool->tasks.count.load(std::memory_order_relaxed) > 0 || pool->processing_work_count.load() != 0) { WorkerTask task = {}; if (thread_pool_try_and_pop_task(pool, &task)) { thread_pool_do_work(pool, &task); } // Safety-kick - while (pool->tasks.count.load(std::memory_order_relaxed) > 0 && gb_atomic32_load(&pool->processing_work_count) == 0) { + while (pool->tasks.count.load(std::memory_order_relaxed) > 0 && pool->processing_work_count.load() == 0) { mutex_lock(&pool->mutex); - gb_semaphore_post(&pool->sem_available, cast(i32)pool->tasks.count.load(std::memory_order_relaxed)); + semaphore_post(&pool->sem_available, cast(i32)pool->tasks.count.load(std::memory_order_relaxed)); mutex_unlock(&pool->mutex); } @@ -154,7 +154,7 @@ void thread_pool_wait_to_process(ThreadPool *pool) { GB_THREAD_PROC(worker_thread_internal) { ThreadPool *pool = cast(ThreadPool *)thread->user_data; while (pool->is_running) { - gb_semaphore_wait(&pool->sem_available); + semaphore_wait(&pool->sem_available); WorkerTask task = {}; if (thread_pool_try_and_pop_task(pool, &task)) { @@ -162,7 +162,7 @@ GB_THREAD_PROC(worker_thread_internal) { } } // Cascade - gb_semaphore_release(&pool->sem_available); + semaphore_release(&pool->sem_available); return 0; }