diff --git a/src/queue.cpp b/src/queue.cpp index d69a2845c..ee8b1b086 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -71,6 +71,29 @@ void mpmc_destroy(MPMCQueue *q) { } +template +bool mpmc_internal_grow(MPMCQueue *q) { + mutex_lock(&q->mutex); + i32 old_size = q->mask+1; + i32 new_size = old_size*2; + resize_array_raw(&q->nodes, q->allocator, old_size, new_size); + if (q->nodes == nullptr) { + GB_PANIC("Unable to resize enqueue: %td -> %td", old_size, new_size); + mutex_unlock(&q->mutex); + return false; + } + resize_array_raw(&q->indices, q->allocator, old_size, new_size); + if (q->indices == nullptr) { + GB_PANIC("Unable to resize enqueue: %td -> %td", old_size, new_size); + mutex_unlock(&q->mutex); + return false; + } + mpmc_internal_init_indices(q->indices, old_size, new_size); + q->mask = new_size-1; + mutex_unlock(&q->mutex); + return true; +} + template i32 mpmc_enqueue(MPMCQueue *q, T const &data) { GB_ASSERT(q->mask != 0); @@ -78,8 +101,9 @@ i32 mpmc_enqueue(MPMCQueue *q, T const &data) { i32 head_idx = q->head_idx.load(std::memory_order_relaxed); for (;;) { - auto node = &q->nodes[head_idx & q->mask]; - auto node_idx_ptr = &q->indices[head_idx & q->mask]; + i32 index = head_idx & q->mask; + auto node = &q->nodes[index]; + auto node_idx_ptr = &q->indices[index]; i32 node_idx = node_idx_ptr->load(std::memory_order_acquire); i32 diff = node_idx - head_idx; @@ -91,24 +115,9 @@ i32 mpmc_enqueue(MPMCQueue *q, T const &data) { return q->count.fetch_add(1, std::memory_order_release); } } else if (diff < 0) { - mutex_lock(&q->mutex); - i32 old_size = q->mask+1; - i32 new_size = old_size*2; - resize_array_raw(&q->nodes, q->allocator, old_size, new_size); - if (q->nodes == nullptr) { - GB_PANIC("Unable to resize enqueue: %td -> %td", old_size, new_size); - mutex_unlock(&q->mutex); + if (!mpmc_internal_grow(q)) { return -1; } - resize_array_raw(&q->indices, q->allocator, old_size, new_size); - if (q->indices == nullptr) { - GB_PANIC("Unable to resize enqueue: %td -> %td", old_size, new_size); - mutex_unlock(&q->mutex); - return -1; - } - mpmc_internal_init_indices(q->indices, old_size, new_size); - q->mask = new_size-1; - mutex_unlock(&q->mutex); } else { head_idx = q->head_idx.load(std::memory_order_relaxed); }