diff --git a/build_odin.sh b/build_odin.sh
index 5ecb7709a..bdf80c534 100755
--- a/build_odin.sh
+++ b/build_odin.sh
@@ -50,7 +50,7 @@ config_darwin() {
 		panic "Requirement: llvm-config must be base version smaller than 15"
 	fi
 
-	LDFLAGS="$LDFLAGS -liconv -ldl"
+	LDFLAGS="$LDFLAGS -liconv -ldl -framework System"
 	CXXFLAGS="$CXXFLAGS $($LLVM_CONFIG --cxxflags --ldflags)"
 	LDFLAGS="$LDFLAGS -lLLVM-C"
 }
diff --git a/src/common.cpp b/src/common.cpp
index 3624446f1..3b6ea59e8 100644
--- a/src/common.cpp
+++ b/src/common.cpp
@@ -31,7 +31,8 @@
 
 gb_internal gbAllocator heap_allocator(void);
 
-#define for_array(index_, array_) for (isize index_ = 0; index_ < (array_).count; index_++)
+#define for_array_off(index_, off_, array_) for (isize index_ = off_; index_ < (array_).count; index_++)
+#define for_array(index_, array_) for_array_off(index_, 0, array_)
 
 gb_internal i32 next_pow2(i32 n);
 gb_internal i64 next_pow2(i64 n);
@@ -908,4 +909,4 @@ gb_internal Slice did_you_mean_results(DidYouMeanAnswers *d)
 
 #if defined(GB_SYSTEM_WINDOWS)
 #pragma warning(pop)
-#endif
\ No newline at end of file
+#endif
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index 6df991d7d..3565ef25a 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -3,164 +3,198 @@
 struct WorkerTask;
 struct ThreadPool;
 
-#define WORKER_TASK_PROC(name) isize name(void *data)
-typedef WORKER_TASK_PROC(WorkerTaskProc);
+gb_thread_local Thread *current_thread;
 
 gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name);
 gb_internal void thread_pool_destroy(ThreadPool *pool);
 gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data);
 gb_internal void thread_pool_wait(ThreadPool *pool);
 
-
-struct WorkerTask {
-	WorkerTask *    next;
-	WorkerTaskProc *do_work;
-	void *          data;
-	ThreadPool *    pool;
-};
-
-
 struct ThreadPool {
 	gbAllocator allocator;
-	BlockingMutex mutex;
-	Condition     task_cond;
-
+
 	Slice<Thread> threads;
-
-	WorkerTask *task_queue;
-
-	std::atomic<isize> ready;
-	std::atomic<bool>  stop;
-
+	std::atomic<bool> running;
+
+	BlockingMutex task_lock;
+	Condition tasks_available;
+
+	Futex tasks_left;
 };
 
-gb_internal THREAD_PROC(thread_pool_thread_proc);
-
 gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) {
+	mutex_init(&pool->task_lock);
+	condition_init(&pool->tasks_available);
+
 	pool->allocator = a;
-	pool->stop = false;
-	mutex_init(&pool->mutex);
-	condition_init(&pool->task_cond);
-
-	slice_init(&pool->threads, a, thread_count);
-	for_array(i, pool->threads) {
+	slice_init(&pool->threads, a, thread_count + 1);
+
+	// setup the main thread
+	thread_init(pool, &pool->threads[0], 0);
+	current_thread = &pool->threads[0];
+
+	for_array_off(i, 1, pool->threads) {
 		Thread *t = &pool->threads[i];
-		thread_init_and_start(t, thread_pool_thread_proc, pool);
+		thread_init_and_start(pool, t, i);
 	}
+
+	pool->running = true;
 }
 
 gb_internal void thread_pool_destroy(ThreadPool *pool) {
-	mutex_lock(&pool->mutex);
-	pool->stop = true;
-	condition_broadcast(&pool->task_cond);
-	mutex_unlock(&pool->mutex);
+	pool->running = false;
 
-	for_array(i, pool->threads) {
+	for_array_off(i, 1, pool->threads) {
 		Thread *t = &pool->threads[i];
+		condition_broadcast(&pool->tasks_available);
 		thread_join_and_destroy(t);
 	}
 
+	for_array(i, pool->threads) {
+		free(pool->threads[i].queue);
+	}
 	gb_free(pool->allocator, pool->threads.data);
-	mutex_destroy(&pool->mutex);
-	condition_destroy(&pool->task_cond);
+	mutex_destroy(&pool->task_lock);
+	condition_destroy(&pool->tasks_available);
 }
 
-gb_internal bool thread_pool_queue_empty(ThreadPool *pool) {
-	return pool->task_queue == nullptr;
+void thread_pool_queue_push(Thread *thread, WorkerTask task) {
+	uint64_t capture;
+	uint64_t new_capture;
+	do {
+		capture = thread->head_and_tail.load();
+
+		uint64_t mask = thread->capacity - 1;
+		uint64_t head = (capture >> 32) & mask;
+		uint64_t tail = ((uint32_t)capture) & mask;
+
+		uint64_t new_head = (head + 1) & mask;
+		if (new_head == tail) {
+			GB_PANIC("Thread Queue Full!\n");
+		}
+
+		// This *must* be done in here, to avoid a potential race condition where we no longer own the slot by the time we're assigning
+		thread->queue[head] = task;
+		new_capture = (new_head << 32) | tail;
+	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture));
+
+	thread->pool->tasks_left.fetch_add(1);
+	condition_broadcast(&thread->pool->tasks_available);
 }
 
-gb_internal WorkerTask *thread_pool_queue_pop(ThreadPool *pool) {
-	GB_ASSERT(pool->task_queue != nullptr);
-	WorkerTask *task = pool->task_queue;
-	pool->task_queue = task->next;
-	return task;
-}
-gb_internal void thread_pool_queue_push(ThreadPool *pool, WorkerTask *task) {
-	GB_ASSERT(task != nullptr);
-	task->next = pool->task_queue;
-	pool->task_queue = task;
+bool thread_pool_queue_pop(Thread *thread, WorkerTask *task) {
+	uint64_t capture;
+	uint64_t new_capture;
+	do {
+		capture = thread->head_and_tail.load();
+
+		uint64_t mask = thread->capacity - 1;
+		uint64_t head = (capture >> 32) & mask;
+		uint64_t tail = ((uint32_t)capture) & mask;
+
+		uint64_t new_tail = (tail + 1) & mask;
+		if (tail == head) {
+			return false;
+		}
+
+		// Making a copy of the task before we increment the tail, avoiding the same potential race condition as above
+		*task = thread->queue[tail];
+
+		new_capture = (head << 32) | new_tail;
+	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture));
+
+	return true;
 }
 
 gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
-	GB_ASSERT(proc != nullptr);
-	WorkerTask *task = gb_alloc_item(permanent_allocator(), WorkerTask);
-	if (task == nullptr) {
-		GB_PANIC("Out of memory");
-		return false;
-	}
-	task->pool = pool;
-	task->do_work = proc;
-	task->data = data;
+	WorkerTask task = {};
+	task.do_work = proc;
+	task.data = data;
 
-	mutex_lock(&pool->mutex);
-	thread_pool_queue_push(pool, task);
-	GB_ASSERT(pool->ready >= 0);
-	pool->ready.fetch_add(1);
-	condition_broadcast(&pool->task_cond);
-	mutex_unlock(&pool->mutex);
+	thread_pool_queue_push(current_thread, task);
 	return true;
 }
 
-
-gb_internal void thread_pool_do_task(WorkerTask *task) {
-	task->do_work(task->data);
-}
-
 gb_internal void thread_pool_wait(ThreadPool *pool) {
-	if (pool->threads.count == 0) {
-		while (!thread_pool_queue_empty(pool)) {
-			thread_pool_do_task(thread_pool_queue_pop(pool));
-			pool->ready.fetch_sub(1);
-		}
-		GB_ASSERT(pool->ready == 0);
-		return;
-	}
-	for (;;) {
-		mutex_lock(&pool->mutex);
+	WorkerTask task;
 
-		while (!pool->stop && pool->ready > 0 && thread_pool_queue_empty(pool)) {
-			condition_wait(&pool->task_cond, &pool->mutex);
-		}
-		if ((pool->stop || pool->ready == 0) && thread_pool_queue_empty(pool)) {
-			mutex_unlock(&pool->mutex);
-			return;
+	while (pool->tasks_left) {
+
+		// if we've got tasks on our queue, run them
+		while (thread_pool_queue_pop(current_thread, &task)) {
+			task.do_work(task.data);
+			pool->tasks_left.fetch_sub(1);
 		}
-		WorkerTask *task = thread_pool_queue_pop(pool);
-		mutex_unlock(&pool->mutex);
-
-		thread_pool_do_task(task);
-		if (--pool->ready == 0) {
-			mutex_lock(&pool->mutex);
-			condition_broadcast(&pool->task_cond);
-			mutex_unlock(&pool->mutex);
+
+		// is this mem-barriered enough?
+		// This *must* be executed in this order, so the futex wakes immediately
+		// if rem_tasks has changed since we checked last, otherwise the program
+		// will permanently sleep
+		Footex rem_tasks = pool->tasks_left.load();
+		if (!rem_tasks) {
+			break;
 		}
+
+		tpool_wait_on_addr(&pool->tasks_left, rem_tasks);
 	}
 }
 
-
 gb_internal THREAD_PROC(thread_pool_thread_proc) {
-	ThreadPool *pool = cast(ThreadPool *)thread->user_data;
-
+	WorkerTask task;
+	current_thread = thread;
+	ThreadPool *pool = current_thread->pool;
+
 	for (;;) {
-		mutex_lock(&pool->mutex);
-
-		while (!pool->stop && thread_pool_queue_empty(pool)) {
-			condition_wait(&pool->task_cond, &pool->mutex);
-		}
-		if (pool->stop && thread_pool_queue_empty(pool)) {
-			mutex_unlock(&pool->mutex);
-			return 0;
+work_start:
+		if (!pool->running) {
+			break;
 		}
 
-		WorkerTask *task = thread_pool_queue_pop(pool);
-		mutex_unlock(&pool->mutex);
-
-		thread_pool_do_task(task);
-		if (--pool->ready == 0) {
-			mutex_lock(&pool->mutex);
-			condition_broadcast(&pool->task_cond);
-			mutex_unlock(&pool->mutex);
+		// If we've got tasks to process, work through them
+		size_t finished_tasks = 0;
+		while (thread_pool_queue_pop(current_thread, &task)) {
+			task.do_work(task.data);
+			pool->tasks_left.fetch_sub(1);
+
+			finished_tasks += 1;
 		}
+		if (finished_tasks > 0 && !pool->tasks_left) {
+			tpool_wake_addr(&pool->tasks_left);
+		}
+
+		// If there's still work somewhere and we don't have it, steal it
+		if (pool->tasks_left) {
+			isize idx = current_thread->idx;
+			for_array(i, pool->threads) {
+				if (!pool->tasks_left) {
+					break;
+				}
+
+				idx = (idx + 1) % pool->threads.count;
+				Thread *thread = &pool->threads[idx];
+
+				WorkerTask task;
+				if (!thread_pool_queue_pop(thread, &task)) {
+					continue;
+				}
+
+				task.do_work(task.data);
+				pool->tasks_left.fetch_sub(1);
+
+				if (!pool->tasks_left) {
+					tpool_wake_addr(&pool->tasks_left);
+				}
+
+				goto work_start;
+			}
+		}
+
+		// if we've done all our work, and there's nothing to steal, go to sleep
+		mutex_lock(&pool->task_lock);
+		condition_wait(&pool->tasks_available, &pool->task_lock);
+		mutex_unlock(&pool->task_lock);
 	}
-}
\ No newline at end of file
+
+	return 0;
+}
diff --git a/src/threading.cpp b/src/threading.cpp
index e92ed5e31..98b7aa0c2 100644
--- a/src/threading.cpp
+++ b/src/threading.cpp
@@ -11,24 +11,34 @@
 struct RecursiveMutex;
 struct Semaphore;
 struct Condition;
 struct Thread;
+struct ThreadPool;
 
 #define THREAD_PROC(name) isize name(struct Thread *thread)
-typedef THREAD_PROC(ThreadProc);
+gb_internal THREAD_PROC(thread_pool_thread_proc);
+
+#define WORKER_TASK_PROC(name) isize name(void *data)
+typedef WORKER_TASK_PROC(WorkerTaskProc);
+
+typedef struct WorkerTask {
+	WorkerTaskProc *do_work;
+	void           *data;
+} WorkerTask;
 
 struct Thread {
 #if defined(GB_SYSTEM_WINDOWS)
-	void *        win32_handle;
+	void *win32_handle;
 #else
-	pthread_t     posix_handle;
+	pthread_t posix_handle;
 #endif
+
+	isize idx;
 
-	ThreadProc *  proc;
-	void *        user_data;
-	isize         user_index;
-	isize volatile return_value;
+	WorkerTask *queue;
+	size_t capacity;
+	std::atomic<uint64_t> head_and_tail;
 
-	isize         stack_size;
-	std::atomic<bool> is_running;
+	isize stack_size;
+	struct ThreadPool *pool;
 };
@@ -59,10 +69,9 @@ gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32
 
 gb_internal u32  thread_current_id(void);
 
-gb_internal void thread_init_and_start           (Thread *t, ThreadProc *proc, void *data);
-gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *data, isize stack_size);
+gb_internal void thread_init          (ThreadPool *pool, Thread *t, isize idx);
+gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx);
 gb_internal void thread_join_and_destroy(Thread *t);
-gb_internal bool thread_is_running      (Thread const *t);
 gb_internal void thread_set_name        (Thread *t, char const *name);
 
 gb_internal void yield_thread(void);
@@ -325,47 +334,45 @@ gb_internal gb_inline void yield(void) {
 #endif
 }
 
-gb_internal void private__thread_run(Thread *t) {
-	t->return_value = t->proc(t);
-}
-
 #if defined(GB_SYSTEM_WINDOWS)
-	gb_internal DWORD __stdcall internal_thread_proc(void *arg) {
-		Thread *t = cast(Thread *)arg;
-		t->is_running.store(true);
-		private__thread_run(t);
-		return 0;
-	}
+gb_internal DWORD __stdcall internal_thread_proc(void *arg) {
+	Thread *t = cast(Thread *)arg;
+	thread_pool_thread_proc(t);
+	return 0;
+}
 #else
-	gb_internal void *internal_thread_proc(void *arg) {
-		#if (GB_SYSTEM_LINUX)
-			// NOTE: Don't permit any signal delivery to threads on Linux.
-			sigset_t mask = {};
-			sigfillset(&mask);
-			GB_ASSERT_MSG(pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0, "failed to block signals");
-		#endif
-
-		Thread *t = cast(Thread *)arg;
-		t->is_running.store(true);
-		private__thread_run(t);
-		return NULL;
-	}
+gb_internal void *internal_thread_proc(void *arg) {
+#if (GB_SYSTEM_LINUX)
+	// NOTE: Don't permit any signal delivery to threads on Linux.
+	sigset_t mask = {};
+	sigfillset(&mask);
+	GB_ASSERT_MSG(pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0, "failed to block signals");
+#endif
+
+	Thread *t = cast(Thread *)arg;
+	thread_pool_thread_proc(t);
+	return NULL;
+}
 #endif
 
-gb_internal void thread_init_and_start(Thread *t, ThreadProc *proc, void *user_data) { thread_init_and_start_with_stack(t, proc, user_data, 0); }
-
-gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *user_data, isize stack_size) {
+gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) {
 	gb_zero_item(t);
 #if defined(GB_SYSTEM_WINDOWS)
 	t->win32_handle = INVALID_HANDLE_VALUE;
#else
 	t->posix_handle = 0;
 #endif
-	GB_ASSERT(!t->is_running.load());
-	GB_ASSERT(proc != NULL);
-	t->proc = proc;
-	t->user_data = user_data;
-	t->stack_size = stack_size;
+
+	t->capacity = 1 << 14; // must be a power of 2
+	t->queue = (WorkerTask *)calloc(sizeof(WorkerTask), t->capacity);
+	t->head_and_tail = 0;
+	t->pool = pool;
+	t->idx = idx;
+}
+
+gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) {
+	thread_init(pool, t, idx);
+	isize stack_size = 0;
 
 #if defined(GB_SYSTEM_WINDOWS)
 	t->win32_handle = CreateThread(NULL, stack_size, internal_thread_proc, t, 0, NULL);
@@ -385,10 +392,6 @@ gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, v
 }
 
 gb_internal void thread_join_and_destroy(Thread *t) {
-	if (!t->is_running.load()) {
-		return;
-	}
-
 #if defined(GB_SYSTEM_WINDOWS)
 	WaitForSingleObject(t->win32_handle, INFINITE);
 	CloseHandle(t->win32_handle);
@@ -397,11 +400,8 @@ gb_internal void thread_join_and_destroy(Thread *t) {
 	pthread_join(t->posix_handle, NULL);
 	t->posix_handle = 0;
 #endif
-	t->is_running.store(false);
 }
 
-gb_internal bool thread_is_running(Thread const *t) { return t->is_running.load(); }
-
 gb_internal void thread_set_name(Thread *t, char const *name) {
 #if defined(GB_COMPILER_MSVC)
 	#pragma pack(push, 8)
@@ -437,7 +437,104 @@ gb_internal void thread_set_name(Thread *t, char const *name) {
 #endif
 }
 
+#if defined(GB_SYSTEM_LINUX)
+#include <linux/futex.h>
+#include <sys/syscall.h>
+
+typedef std::atomic<int32_t> Futex;
+typedef volatile int32_t     Footex;
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+	for (;;) {
+		int ret = syscall(SYS_futex, addr, FUTEX_WAKE, 1, NULL, NULL, 0);
+		if (ret == -1) {
+			perror("Futex wake");
+			GB_PANIC("Failed in futex wake!\n");
+		} else if (ret > 0) {
+			return;
+		}
+	}
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+	for (;;) {
+		int ret = syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL, NULL, 0);
+		if (ret == -1) {
+			if (errno != EAGAIN) {
+				perror("Futex wait");
+				GB_PANIC("Failed in futex wait!\n");
+			} else {
+				return;
+			}
+		} else if (ret == 0) {
+			if (*addr != val) {
+				return;
+			}
+		}
+	}
+}
+#elif defined(GB_SYSTEM_OSX)
+
+typedef std::atomic<int64_t> Futex;
+typedef volatile int64_t     Footex;
+
+#define UL_COMPARE_AND_WAIT 0x00000001
+#define ULF_NO_ERRNO        0x01000000
+
+int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */
+int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value);
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+	for (;;) {
+		int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, 0);
+		if (ret >= 0) {
+			return;
+		}
+		ret = -ret; // with ULF_NO_ERRNO, failures are returned as a negative errno
+		if (ret == EINTR || ret == EFAULT) {
+			continue;
+		}
+		if (ret == ENOENT) {
+			return;
+		}
+		GB_PANIC("Failed in futex wake!\n");
+	}
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+	for (;;) {
+		int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, val, 0);
+		if (ret >= 0) {
+			if (*addr != val) {
+				return;
+			}
+			continue;
+		}
+		ret = -ret; // with ULF_NO_ERRNO, failures are returned as a negative errno
+		if (ret == EINTR || ret == EFAULT) {
+			continue;
+		}
+		if (ret == ENOENT) {
+			return;
+		}
+
+		GB_PANIC("Failed in futex wait!\n");
+	}
+}
+#elif defined(GB_SYSTEM_WINDOWS)
+typedef std::atomic<int64_t> Futex;
+typedef volatile int64_t     Footex;
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+	WakeByAddressSingle((void *)addr);
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+	for (;;) {
+		int ret = WaitOnAddress(addr, (void *)&val, sizeof(val), INFINITE);
+		if (*addr != val) break;
+	}
+}
+#endif
 
 #if defined(GB_SYSTEM_WINDOWS)
 #pragma warning(pop)
-#endif
\ No newline at end of file
+#endif
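
A note on the queue in thread_pool_queue_push/thread_pool_queue_pop above: both ring cursors are packed into one std::atomic<uint64_t> (head in the high 32 bits, tail in the low 32 bits), so a single compare-exchange publishes either operation atomically. Below is a minimal standalone model of the same packing and mask arithmetic; RingQueue, Task, push, and pop are illustrative names, not part of the patch.

// Minimal model of the packed head/tail ring used by the patch. Assumes a
// power-of-two capacity so "& mask" performs the wrap-around, as in thread_init.
#include <atomic>
#include <cassert>
#include <cstdint>

struct Task { int payload; };

struct RingQueue {
	static const uint64_t CAP = 1 << 4;     // power of two; one slot is sacrificed to tell full from empty
	Task slots[CAP];
	std::atomic<uint64_t> head_and_tail{0}; // head in the high 32 bits, tail in the low 32 bits

	bool push(Task t) {
		uint64_t capture, new_capture;
		do {
			capture = head_and_tail.load();
			uint64_t mask = CAP - 1;
			uint64_t head = (capture >> 32) & mask;
			uint64_t tail = (uint32_t)capture & mask;
			uint64_t new_head = (head + 1) & mask;
			if (new_head == tail) {
				return false;               // full (the patch panics here instead)
			}
			slots[head] = t;                // write the slot before the CAS publishes it
			new_capture = (new_head << 32) | tail;
		} while (!head_and_tail.compare_exchange_weak(capture, new_capture));
		return true;
	}

	bool pop(Task *out) {
		uint64_t capture, new_capture;
		do {
			capture = head_and_tail.load();
			uint64_t mask = CAP - 1;
			uint64_t head = (capture >> 32) & mask;
			uint64_t tail = (uint32_t)capture & mask;
			if (tail == head) {
				return false;               // empty
			}
			*out = slots[tail];             // copy out before the CAS moves the tail
			new_capture = (head << 32) | ((tail + 1) & mask);
		} while (!head_and_tail.compare_exchange_weak(capture, new_capture));
		return true;
	}
};

int main() {
	RingQueue q;
	q.push(Task{42});
	Task t;
	assert(q.pop(&t) && t.payload == 42);
	assert(!q.pop(&t)); // drained
	return 0;
}

The write-then-CAS publish is only safe because each ring has a single producer: in the pool, only the owning thread pushes onto its queue, while any thread may pop from it when stealing.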
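
On the sleep/wake protocol: thread_pool_wait must snapshot tasks_left before sleeping, and tpool_wait_on_addr only sleeps while the futex word still equals that snapshot, so a decrement that lands between the snapshot and the sleep turns the wait into a no-op rather than a lost wakeup. The sketch below models the same idea portably, using C++20 std::atomic wait/notify as a stand-in for the patch's platform-specific tpool_wait_on_addr/tpool_wake_addr; worker and waiter are hypothetical.

#include <atomic>
#include <cstdio>
#include <thread>

std::atomic<int> tasks_left{1};

void worker() {
	// ... run a task, then retire it ...
	if (tasks_left.fetch_sub(1) == 1) {     // we just retired the last task
		tasks_left.notify_one();            // plays the role of tpool_wake_addr
	}
}

void waiter() {
	for (;;) {
		int rem = tasks_left.load();        // snapshot FIRST, as in thread_pool_wait
		if (rem == 0) break;
		// Blocks only while tasks_left still equals rem; if a worker changed it
		// after the load above, this returns immediately - no lost wakeup.
		tasks_left.wait(rem);               // plays the role of tpool_wait_on_addr
	}
	std::printf("all tasks done\n");
}

int main() {
	std::thread w(worker);
	waiter();
	w.join();
	return 0;
}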
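
Finally, the intended call pattern for the reworked API: the calling thread occupies slot 0 of pool->threads, so thread_pool_add_task pushes onto the caller's own ring and thread_pool_wait lets the main thread execute tasks alongside the workers until tasks_left reaches zero. A hypothetical call site follows; do_unit_of_work, example_usage, and the counts are illustrative, while the thread_pool_* signatures and heap_allocator() are taken from the patch.

gb_internal WORKER_TASK_PROC(do_unit_of_work) {
	// `data` is whatever the submitter passed to thread_pool_add_task
	return 0;
}

gb_internal void example_usage(void) {
	ThreadPool pool = {};
	thread_pool_init(&pool, heap_allocator(), 8, "worker"); // 8 workers + the main thread in slot 0

	for (int i = 0; i < 100; i++) {
		// enqueued onto the caller's own ring; idle workers steal from it
		thread_pool_add_task(&pool, do_unit_of_work, NULL);
	}

	thread_pool_wait(&pool);    // main thread drains its queue, then futex-sleeps on tasks_left
	thread_pool_destroy(&pool);
}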