diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index 5dbbe37c4..2b176db1c 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -46,7 +46,7 @@ gb_internal void thread_pool_destroy(ThreadPool *pool) {
 
 	for_array_off(i, 1, pool->threads) {
 		Thread *t = &pool->threads[i];
-		pool->tasks_available.fetch_add(1, std::memory_order_relaxed);
+		pool->tasks_available.fetch_add(1, std::memory_order_release);
 		futex_broadcast(&pool->tasks_available);
 		thread_join_and_destroy(t);
 	}
@@ -54,51 +54,85 @@ gb_internal void thread_pool_destroy(ThreadPool *pool) {
 	gb_free(pool->threads_allocator, pool->threads.data);
 }
 
+gb_internal TaskRingBuffer *taskring_grow(TaskRingBuffer *ring, isize bottom, isize top) {
+	TaskRingBuffer *new_ring = taskring_init(ring->size * 2);
+	for (isize i = top; i < bottom; i++) {
+		new_ring->buffer[i % new_ring->size] = ring->buffer[i % ring->size];
+	}
+	// NOTE: the old ring is not freed here; concurrent stealers may still be
+	// reading from it, so reclaiming it safely would need hazard pointers or similar.
+	return new_ring;
+}
+
 void thread_pool_queue_push(Thread *thread, WorkerTask task) {
-	u64 capture;
-	u64 new_capture;
-	do {
-		capture = thread->head_and_tail.load();
+	isize bot = thread->queue.bottom.load(std::memory_order_relaxed);
+	isize top = thread->queue.top.load(std::memory_order_acquire);
+	TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed);
 
-		u64 mask = thread->capacity - 1;
-		u64 head = (capture >> 32) & mask;
-		u64 tail = ((u32)capture) & mask;
+	isize size = bot - top;
+	if (size > (cur_ring->size - 1)) {
+		// Queue is full
+		thread->queue.ring = taskring_grow(cur_ring, bot, top);
+		cur_ring = thread->queue.ring.load(std::memory_order_relaxed);
+	}
 
-		u64 new_head = (head + 1) & mask;
-		GB_ASSERT_MSG(new_head != tail, "Thread Queue Full!");
-
-		// This *must* be done in here, to avoid a potential race condition where we no longer own the slot by the time we're assigning
-		thread->queue[head] = task;
-		new_capture = (new_head << 32) | tail;
-	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture));
+	cur_ring->buffer[bot % cur_ring->size] = task;
+	std::atomic_thread_fence(std::memory_order_release);
+	thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
 
 	thread->pool->tasks_left.fetch_add(1, std::memory_order_release);
 	thread->pool->tasks_available.fetch_add(1, std::memory_order_relaxed);
 	futex_broadcast(&thread->pool->tasks_available);
 }
 
-bool thread_pool_queue_pop(Thread *thread, WorkerTask *task) {
-	u64 capture;
-	u64 new_capture;
-	do {
-		capture = thread->head_and_tail.load(std::memory_order_acquire);
+bool thread_pool_queue_take(Thread *thread, WorkerTask *task) {
+	isize bot = thread->queue.bottom.load(std::memory_order_relaxed) - 1;
+	TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed);
+	thread->queue.bottom.store(bot, std::memory_order_relaxed);
+	std::atomic_thread_fence(std::memory_order_seq_cst);
 
-		u64 mask = thread->capacity - 1;
-		u64 head = (capture >> 32) & mask;
-		u64 tail = ((u32)capture) & mask;
+	isize top = thread->queue.top.load(std::memory_order_relaxed);
+	if (top <= bot) {
 
-		u64 new_tail = (tail + 1) & mask;
-		if (tail == head) {
-			return false;
+		// Queue is not empty
+		*task = cur_ring->buffer[bot % cur_ring->size];
+		if (top == bot) {
+			// Only one entry left in queue
+			if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
+				// Race failed; a stealer claimed the last task first
+				thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
+				return false;
+			}
+
+			thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
+			return true;
 		}
 
-		// Making a copy of the task before we increment the tail, avoiding the same potential race condition as above
-		*task = thread->queue[tail];
+		// We got a task without hitting a race
+		return true;
+	} else {
+		// Queue is empty; restore bottom
+		thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
+		return false;
+	}
+}
 
-		new_capture = (head << 32) | new_tail;
-	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture, std::memory_order_release));
+bool thread_pool_queue_steal(Thread *thread, WorkerTask *task) {
+	isize top = thread->queue.top.load(std::memory_order_acquire);
+	std::atomic_thread_fence(std::memory_order_seq_cst);
+	isize bot = thread->queue.bottom.load(std::memory_order_acquire);
 
-	return true;
+	bool ret = false;
+	if (top < bot) {
+		// Queue is not empty
+		TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_consume);
+		*task = cur_ring->buffer[top % cur_ring->size];
+
+		// CAS claims the task at `top`; failure means the owner or another
+		// stealer got it first.
+		ret = thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
+	}
+	return ret;
 }
 
 gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
@@ -115,12 +149,11 @@ gb_internal void thread_pool_wait(ThreadPool *pool) {
 
 	while (pool->tasks_left.load(std::memory_order_acquire)) {
 		// if we've got tasks on our queue, run them
-		while (thread_pool_queue_pop(current_thread, &task)) {
+		while (thread_pool_queue_take(current_thread, &task)) {
 			task.do_work(task.data);
 			pool->tasks_left.fetch_sub(1, std::memory_order_release);
 		}
 
-		// is this mem-barriered enough?
 		// This *must* be executed in this order, so the futex wakes immediately
 		// if rem_tasks has changed since we checked last, otherwise the program
 		// will permahang
@@ -145,7 +178,7 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) {
 		usize finished_tasks = 0;
 		i32 state;
 
-		while (thread_pool_queue_pop(current_thread, &task)) {
+		while (thread_pool_queue_take(current_thread, &task)) {
 			task.do_work(task.data);
 			pool->tasks_left.fetch_sub(1, std::memory_order_release);
 
@@ -167,7 +200,7 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) {
 			Thread *thread = &pool->threads.data[idx];
 
 			WorkerTask task;
-			if (thread_pool_queue_pop(thread, &task)) {
+			if (thread_pool_queue_steal(thread, &task)) {
 				task.do_work(task.data);
 				pool->tasks_left.fetch_sub(1, std::memory_order_release);
 
@@ -182,6 +215,8 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) {
 
 		// if we've done all our work, and there's nothing to steal, go to sleep
 		state = pool->tasks_available.load(std::memory_order_acquire);
+		// if the pool is shutting down, bail out instead of sleeping
+		if (!pool->running) { break; }
 		futex_wait(&pool->tasks_available, state);
 
 main_loop_continue:;
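
Note on the algorithm: the queue introduced above is a Chase-Lev work-stealing
deque (Chase & Lev, "Dynamic Circular Work-Stealing Deque", SPAA 2005), using
memory orderings in the style of Le et al., "Correct and Efficient Work-Stealing
for Weak Memory Models" (PPoPP 2013). The owning worker pushes and takes at
`bottom`; thieves steal at `top`. Below is a minimal fixed-capacity sketch of
the same owner/stealer protocol, with hypothetical names (`Deque`, `push`,
`take`, `steal`) and the ring growth left out for brevity:

#include <atomic>
#include <cstdint>

struct Task { void (*fn)(void *); void *data; };

struct Deque {
	static constexpr int64_t SIZE = 1 << 14;  // power of two
	std::atomic<int64_t> top{0};              // advanced by stealers
	std::atomic<int64_t> bottom{0};           // touched only by the owner
	Task buffer[SIZE];
};

// Owner only. Write the slot first, then publish it with a release fence.
void push(Deque *q, Task t) {
	int64_t bot = q->bottom.load(std::memory_order_relaxed);
	q->buffer[bot % Deque::SIZE] = t;
	std::atomic_thread_fence(std::memory_order_release);
	q->bottom.store(bot + 1, std::memory_order_relaxed);
}

// Owner only. The seq_cst fence orders the bottom decrement against the
// stealers' reads, so owner and thief cannot both win the last task.
bool take(Deque *q, Task *out) {
	int64_t bot = q->bottom.load(std::memory_order_relaxed) - 1;
	q->bottom.store(bot, std::memory_order_relaxed);
	std::atomic_thread_fence(std::memory_order_seq_cst);
	int64_t top = q->top.load(std::memory_order_relaxed);
	if (top > bot) {  // empty: undo the decrement
		q->bottom.store(bot + 1, std::memory_order_relaxed);
		return false;
	}
	*out = q->buffer[bot % Deque::SIZE];
	if (top == bot) { // last element: race the stealers via CAS on top
		bool won = q->top.compare_exchange_strong(top, top + 1,
				std::memory_order_seq_cst, std::memory_order_relaxed);
		q->bottom.store(bot + 1, std::memory_order_relaxed);
		return won;
	}
	return true;      // two or more elements: no race possible
}

// Any thread. Read the slot first, then CAS top to claim it; a failed CAS
// means the owner or another stealer got there first.
bool steal(Deque *q, Task *out) {
	int64_t top = q->top.load(std::memory_order_acquire);
	std::atomic_thread_fence(std::memory_order_seq_cst);
	int64_t bot = q->bottom.load(std::memory_order_acquire);
	if (top >= bot) return false;  // empty
	*out = q->buffer[top % Deque::SIZE];
	return q->top.compare_exchange_strong(top, top + 1,
			std::memory_order_seq_cst, std::memory_order_relaxed);
}

Like the patch, this sketch reads the slot before the claiming CAS; strictly
speaking the slots would need to be atomic for that read to be data-race-free,
but a stale read can only happen when the CAS subsequently fails, in which case
the value is discarded.
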
diff --git a/src/threading.cpp b/src/threading.cpp
index 717dcb874..dda98631b 100644
--- a/src/threading.cpp
+++ b/src/threading.cpp
@@ -46,19 +46,28 @@ typedef struct WorkerTask {
 	void *data;
 } WorkerTask;
 
+typedef struct TaskRingBuffer {
+	std::atomic<isize> size;
+	std::atomic<WorkerTask *> buffer;
+} TaskRingBuffer;
+
+typedef struct TaskQueue {
+	std::atomic<isize> top;
+	std::atomic<isize> bottom;
+
+	std::atomic<TaskRingBuffer *> ring;
+} TaskQueue;
+
 struct Thread {
 #if defined(GB_SYSTEM_WINDOWS)
 	void *win32_handle;
 #else
	pthread_t posix_handle;
 #endif
 
 	isize idx;
-
-	WorkerTask *queue;
-	size_t capacity;
-	std::atomic<u64> head_and_tail;
-	isize stack_size;
+
+	struct TaskQueue queue;
 
 	struct ThreadPool *pool;
 };
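
A possible follow-up, not part of this patch: `top`, `bottom`, and `ring` all
land on the same cache line, so every steal CAS contends with the owner's
push/take traffic on `bottom`. Chase-Lev implementations often pad the
thief-written and owner-written fields apart, e.g.:

// Hypothetical layout tweak (not in this patch): keep the stealer-side index,
// the owner-side index, and the ring pointer on separate cache lines to avoid
// false sharing.
typedef struct TaskQueue {
	alignas(64) std::atomic<isize> top;             // advanced by stealers via CAS
	alignas(64) std::atomic<isize> bottom;          // owned by the worker thread
	alignas(64) std::atomic<TaskRingBuffer *> ring;
} TaskQueue;
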
@@ -551,6 +560,19 @@ gb_internal void *internal_thread_proc(void *arg) {
 }
 #endif
 
+gb_internal TaskRingBuffer *taskring_init(isize size) {
+	TaskRingBuffer *ring = (TaskRingBuffer *)gb_alloc(heap_allocator(), sizeof(TaskRingBuffer));
+	ring->size = size;
+	ring->buffer = gb_alloc_array(heap_allocator(), WorkerTask, size);
+	return ring;
+}
+
+gb_internal void thread_queue_destroy(TaskQueue *q) {
+	TaskRingBuffer *ring = q->ring.load(std::memory_order_relaxed);
+	gb_free(heap_allocator(), ring->buffer.load(std::memory_order_relaxed));
+	gb_free(heap_allocator(), ring);
+}
+
 gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) {
 	gb_zero_item(t);
 #if defined(GB_SYSTEM_WINDOWS)
@@ -559,14 +581,13 @@ gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) {
 	t->posix_handle = 0;
 #endif
 
-	t->capacity = 1 << 14; // must be a power of 2
-	t->queue = gb_alloc_array(heap_allocator(), WorkerTask, t->capacity);
-	t->head_and_tail = 0;
+	// Ring size must be a power of 2
+	t->queue.ring = taskring_init(1 << 14);
 
 	t->pool = pool;
 	t->idx = idx;
 }
 
 gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) {
 	thread_init(pool, t, idx);
 	isize stack_size = 0;
@@ -598,7 +619,7 @@ gb_internal void thread_join_and_destroy(Thread *t) {
 	t->posix_handle = 0;
 #endif
 
-	gb_free(heap_allocator(), t->queue);
+	thread_queue_destroy(&t->queue);
 }
 
 gb_internal void thread_set_name(Thread *t, char const *name) {
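
For context, the intended lifecycle of the pool, as a sketch. `thread_pool_init`
is not shown in this diff, so its signature below is assumed, and the task
procedure is assumed to be a plain `void (void *data)` to match the
`task.do_work(task.data)` call sites; adjust to the real `WorkerTaskProc`
typedef.

// Hypothetical usage sketch; thread_pool_init's signature and the exact
// WorkerTaskProc type are assumptions, not taken from this patch.
static void example_task(void *data) {
	(void)data; // a real task would do its unit of work here
}

static void example(void) {
	ThreadPool pool = {};
	thread_pool_init(&pool, 8); // assumed: sets `running` and spawns 8 workers

	// Each task is pushed onto the calling thread's own deque; idle workers
	// are woken through the tasks_available futex and may steal it.
	for (int i = 0; i < 100; i++) {
		thread_pool_add_task(&pool, example_task, NULL);
	}

	// Runs and steals tasks itself, sleeping on tasks_left until it hits zero.
	thread_pool_wait(&pool);

	// Clears `running`, then bumps tasks_available and broadcasts so sleeping
	// workers wake, observe the shutdown, and can be joined.
	thread_pool_destroy(&pool);
}
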