From d4d9f55556ffa71e519ffcc5df431edc097746e2 Mon Sep 17 00:00:00 2001 From: avanspector Date: Fri, 1 Mar 2024 00:41:28 +0100 Subject: [PATCH] Update threading.cpp --- src/threading.cpp | 145 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 102 insertions(+), 43 deletions(-) diff --git a/src/threading.cpp b/src/threading.cpp index 56f246955..a469435d2 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -840,86 +840,131 @@ gb_internal void futex_wait(Futex *f, Footex val) { #include #include + +struct _Spinlock { + std::atomic_flag state; + + void init() { + state.clear(); + } + + void lock() { + while (state.test_and_set(std::memory_order_acquire)) { + #if defined(GB_CPU_X86) + _mm_pause(); + #else + (void)0; // spin... + #endif + } + } + + void unlock() { + state.clear(std::memory_order_release); + } +}; + +struct Futex_Waitq; -struct Futex_Wait_Node { +struct Futex_Waiter { + _Spinlock lock; pthread_t thread; Futex *futex; - Futex_Wait_Node *prev, *next; + Futex_Waitq *waitq; + Futex_Waiter *prev, *next; }; -struct Futex_Wait_Queue { - std::atomic_flag spinlock; - Futex_Wait_Node list; +struct Futex_Waitq { + _Spinlock lock; + Futex_Waiter list; - void lock() { - while (spinlock.test_and_set(std::memory_order_acquire)) { - ; // spin... - } - } - - void unlock() { - spinlock.clear(std::memory_order_release); + void init() { + auto head = &list; + head->prev = head->next = head; } }; // FIXME: This approach may scale badly in the future, // possible solution - hash map (leads to deadlocks now). -Futex_Wait_Queue g_waitq = { - .spinlock = ATOMIC_FLAG_INIT, +Futex_Waitq g_waitq = { + .lock = ATOMIC_FLAG_INIT, .list = { .prev = &g_waitq.list, .next = &g_waitq.list, }, }; -Futex_Wait_Queue *get_wait_queue(Futex *f) { +Futex_Waitq *get_waitq(Futex *f) { // Future hash map method... return &g_waitq; } void futex_signal(Futex *f) { - auto waitq = get_wait_queue(f); + auto waitq = get_waitq(f); - waitq->lock(); + waitq->lock.lock(); auto head = &waitq->list; for (auto waiter = head->next; waiter != head; waiter = waiter->next) { - if (waiter->futex == f) { - pthread_kill(waiter->thread, SIGCONT); - break; - } + if (waiter->futex != f) { + continue; + } + waitq->lock.unlock(); + pthread_kill(waiter->thread, SIGCONT); + return; } - waitq->unlock(); + waitq->lock.unlock(); } void futex_broadcast(Futex *f) { - auto waitq = get_wait_queue(f); + auto waitq = get_waitq(f); - waitq->lock(); + waitq->lock.lock(); auto head = &waitq->list; for (auto waiter = head->next; waiter != head; waiter = waiter->next) { - if (waiter->futex == f) { + if (waiter->futex != f) { + continue; + } + if (waiter->next == head) { + waitq->lock.unlock(); pthread_kill(waiter->thread, SIGCONT); - } + return; + } else { + pthread_kill(waiter->thread, SIGCONT); + } } - waitq->unlock(); + waitq->lock.unlock(); } void futex_wait(Futex *f, Footex val) { - auto waitq = get_wait_queue(f); - - waitq->lock(); - - auto head = &waitq->list; - Futex_Wait_Node waiter; + Futex_Waiter waiter; waiter.thread = pthread_self(); waiter.futex = f; - waiter.prev = head; - waiter.next = head->next; + + auto waitq = get_waitq(f); + while (waitq->lock.state.test_and_set(std::memory_order_acquire)) { + if (f->load(std::memory_order_relaxed) != val) { + return; + } + #if defined(GB_CPU_X86) + _mm_pause(); + #else + (void)0; // spin... + #endif + } + + waiter.waitq = waitq; + waiter.lock.init(); + waiter.lock.lock(); + + auto head = &waitq->list; + waiter.prev = head->prev; + waiter.next = head; + waiter.prev->next = &waiter; + waiter.next->prev = &waiter; waiter.prev->next = &waiter; waiter.next->prev = &waiter; @@ -928,12 +973,25 @@ void futex_wait(Futex *f, Footex val) { sigemptyset(&mask); sigaddset(&mask, SIGCONT); pthread_sigmask(SIG_BLOCK, &mask, &old_mask); - - if (*f == val) { - waitq->unlock(); - int sig; - sigwait(&mask, &sig); - waitq->lock(); + + if (f->load(std::memory_order_relaxed) == val) { + waiter.lock.unlock(); + waitq->lock.unlock(); + + int sig; + sigwait(&mask, &sig); + + waitq->lock.lock(); + waiter.lock.lock(); + + while (waitq != waiter.waitq) { + auto req = waiter.waitq; + waiter.lock.unlock(); + waitq->lock.unlock(); + waitq = req; + waitq->lock.lock(); + waiter.lock.lock(); + } } waiter.prev->next = waiter.next; @@ -941,7 +999,8 @@ void futex_wait(Futex *f, Footex val) { pthread_sigmask(SIG_SETMASK, &old_mask, NULL); - waitq->unlock(); + waiter.lock.unlock(); + waitq->lock.unlock(); } #endif