From fe74b479c69fde6d7651e5dff6429736a170ec5d Mon Sep 17 00:00:00 2001 From: gingerBill Date: Thu, 20 May 2021 21:02:05 +0100 Subject: [PATCH] Begin changes to sync2 --- core/sync/sync2/atomic.odin | 2 + core/sync/sync2/primitives.odin | 4 +- core/sync/sync2/primitives_atomic.odin | 454 ++++++++++++++--------- core/sync/sync2/primitives_pthreads.odin | 2 +- 4 files changed, 274 insertions(+), 188 deletions(-) diff --git a/core/sync/sync2/atomic.odin b/core/sync/sync2/atomic.odin index fa86ec352..efefc8025 100644 --- a/core/sync/sync2/atomic.odin +++ b/core/sync/sync2/atomic.odin @@ -56,6 +56,7 @@ atomic_exchange_release :: intrinsics.atomic_xchg_rel; atomic_exchange_acqrel :: intrinsics.atomic_xchg_acqrel; atomic_exchange_relaxed :: intrinsics.atomic_xchg_relaxed; +// Returns value and optional ok boolean atomic_compare_exchange_strong :: intrinsics.atomic_cxchg; atomic_compare_exchange_strong_acquire :: intrinsics.atomic_cxchg_acq; atomic_compare_exchange_strong_release :: intrinsics.atomic_cxchg_rel; @@ -66,6 +67,7 @@ atomic_compare_exchange_strong_failacquire :: intrinsics.atomic_cxchg_fa atomic_compare_exchange_strong_acquire_failrelaxed :: intrinsics.atomic_cxchg_acq_failrelaxed; atomic_compare_exchange_strong_acqrel_failrelaxed :: intrinsics.atomic_cxchg_acqrel_failrelaxed; +// Returns value and optional ok boolean atomic_compare_exchange_weak :: intrinsics.atomic_cxchgweak; atomic_compare_exchange_weak_acquire :: intrinsics.atomic_cxchgweak_acq; atomic_compare_exchange_weak_release :: intrinsics.atomic_cxchgweak_rel; diff --git a/core/sync/sync2/primitives.odin b/core/sync/sync2/primitives.odin index 1ed83f706..e524586ec 100644 --- a/core/sync/sync2/primitives.odin +++ b/core/sync/sync2/primitives.odin @@ -15,7 +15,7 @@ mutex_lock :: proc(m: ^Mutex) { _mutex_lock(m); } -// mutex_lock unlocks m +// mutex_unlock unlocks m mutex_unlock :: proc(m: ^Mutex) { _mutex_unlock(m); } @@ -103,7 +103,7 @@ rw_mutex_shared_guard :: proc(m: ^RW_Mutex) -> bool { -// A Recusrive_Mutex is a recursive mutual exclusion lock +// A Recursive_Mutex is a recursive mutual exclusion lock // The zero value for a Recursive_Mutex is an unlocked mutex // // A Recursive_Mutex must not be copied after first use diff --git a/core/sync/sync2/primitives_atomic.odin b/core/sync/sync2/primitives_atomic.odin index 7043f8c84..aed01eb1f 100644 --- a/core/sync/sync2/primitives_atomic.odin +++ b/core/sync/sync2/primitives_atomic.odin @@ -1,159 +1,193 @@ -//+build linux, darwin, freebsd -//+private package sync2 -when !#config(ODIN_SYNC_USE_PTHREADS, true) { - import "core:time" import "core:runtime" -_Mutex_State :: enum i32 { +Atomic_Mutex_State :: enum i32 { Unlocked = 0, Locked = 1, Waiting = 2, } -_Mutex :: struct { - state: _Mutex_State, + + +// An Atomic_Mutex is a mutual exclusion lock +// The zero value for a Atomic_Mutex is an unlocked mutex +// +// An Atomic_Mutex must not be copied after first use +Atomic_Mutex :: struct { + state: Atomic_Mutex_State, } -_mutex_lock :: proc(m: ^Mutex) { - if atomic_xchg_rel(&m.impl.state, .Unlocked) != .Unlocked { - _mutex_unlock_slow(m); +// atomic_mutex_lock locks m +atomic_mutex_lock :: proc(m: ^Atomic_Mutex) { + @(cold) + lock_slow :: proc(m: ^Atomic_Mutex, curr_state: Atomic_Mutex_State) { + new_state := curr_state; // Make a copy of it + + spin_lock: for spin in 0.. 0; i -= 1 { + cpu_relax(); + } + } + + for { + if atomic_exchange_acquire(&m.state, .Waiting) == .Unlocked { + return; + } + + // TODO(bill): Use a Futex here for Linux to improve performance and error handling + cpu_relax(); + } + } + + + switch v := atomic_exchange_acquire(&m.state, .Locked); v { + case .Unlocked: + // Okay + case: fallthrough; + case .Locked, .Waiting: + lock_slow(m, v); } } -_mutex_unlock :: proc(m: ^Mutex) { - switch atomic_xchg_rel(&m.impl.state, .Unlocked) { +// atomic_mutex_unlock unlocks m +atomic_mutex_unlock :: proc(m: ^Atomic_Mutex) { + @(cold) + unlock_slow :: proc(m: ^Atomic_Mutex) { + // TODO(bill): Use a Futex here for Linux to improve performance and error handling + } + + + switch atomic_exchange_release(&m.state, .Unlocked) { case .Unlocked: unreachable(); case .Locked: // Okay case .Waiting: - _mutex_unlock_slow(m); + unlock_slow(m); } } -_mutex_try_lock :: proc(m: ^Mutex) -> bool { - _, ok := atomic_cxchg_acq(&m.impl.state, .Unlocked, .Locked); +// atomic_mutex_try_lock tries to lock m, will return true on success, and false on failure +atomic_mutex_try_lock :: proc(m: ^Atomic_Mutex) -> bool { + _, ok := atomic_compare_exchange_strong_acquire(&m.state, .Unlocked, .Locked); return ok; } -@(cold) -_mutex_lock_slow :: proc(m: ^Mutex, curr_state: _Mutex_State) { - new_state := curr_state; // Make a copy of it +// Example: +// +// if atomic_mutex_guard(&m) { +// ... +// } +// +@(deferred_in=atomic_mutex_unlock) +atomic_mutex_guard :: proc(m: ^Atomic_Mutex) -> bool { + atomic_mutex_lock(m); + return true; +} - spin_lock: for spin in 0.. 0; i -= 1 { - cpu_relax(); - } - } +Atomic_RW_Mutex_State_Writer_Mask :: Atomic_RW_Mutex_State(1<<(Atomic_RW_Mutex_State_Half_Width-1) - 1) << 1; +Atomic_RW_Mutex_State_Reader_Mask :: Atomic_RW_Mutex_State(1<<(Atomic_RW_Mutex_State_Half_Width-1) - 1) << Atomic_RW_Mutex_State_Half_Width; - for { - if atomic_xchg_acq(&m.impl.state, .Waiting) == .Unlocked { - return; - } - // TODO(bill): Use a Futex here for Linux to improve performance and error handling - cpu_relax(); +// An Atomic_RW_Mutex is a reader/writer mutual exclusion lock +// The lock can be held by any arbitrary number of readers or a single writer +// The zero value for an Atomic_RW_Mutex is an unlocked mutex +// +// An Atomic_RW_Mutex must not be copied after first use +Atomic_RW_Mutex :: struct { + state: Atomic_RW_Mutex_State, + mutex: Atomic_Mutex, + sema: Atomic_Sema, +} + +// atomic_rw_mutex_lock locks rw for writing (with a single writer) +// If the mutex is already locked for reading or writing, the mutex blocks until the mutex is available. +atomic_rw_mutex_lock :: proc(rw: ^Atomic_RW_Mutex) { + _ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Writer); + atomic_mutex_lock(&rw.mutex); + + state := atomic_or(&rw.state, Atomic_RW_Mutex_State_Writer); + if state & Atomic_RW_Mutex_State_Reader_Mask != 0 { + atomic_sema_wait(&rw.sema); } } - -@(cold) -_mutex_unlock_slow :: proc(m: ^Mutex) { - // TODO(bill): Use a Futex here for Linux to improve performance and error handling +// atomic_rw_mutex_unlock unlocks rw for writing (with a single writer) +atomic_rw_mutex_unlock :: proc(rw: ^Atomic_RW_Mutex) { + _ = atomic_and(&rw.state, ~Atomic_RW_Mutex_State_Is_Writing); + atomic_mutex_unlock(&rw.mutex); } - -RW_Mutex_State :: distinct uint; -RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2; -RW_Mutex_State_Is_Writing :: RW_Mutex_State(1); -RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1; -RW_Mutex_State_Reader :: RW_Mutex_State(1)< bool { - if mutex_try_lock(&rw.impl.mutex) { - state := atomic_load(&rw.impl.state); - if state & RW_Mutex_State_Reader_Mask == 0 { - _ = atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing); +// atomic_rw_mutex_try_lock tries to lock rw for writing (with a single writer) +atomic_rw_mutex_try_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool { + if atomic_mutex_try_lock(&rw.mutex) { + state := atomic_load(&rw.state); + if state & Atomic_RW_Mutex_State_Reader_Mask == 0 { + _ = atomic_or(&rw.state, Atomic_RW_Mutex_State_Is_Writing); return true; } - mutex_unlock(&rw.impl.mutex); + atomic_mutex_unlock(&rw.mutex); } return false; } -_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { - state := atomic_load(&rw.impl.state); - for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { +// atomic_rw_mutex_shared_lock locks rw for reading (with arbitrary number of readers) +atomic_rw_mutex_shared_lock :: proc(rw: ^Atomic_RW_Mutex) { + state := atomic_load(&rw.state); + for state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 { ok: bool; - state, ok = atomic_cxchgweak(&rw.impl.state, state, state + RW_Mutex_State_Reader); + state, ok = atomic_compare_exchange_weak(&rw.state, state, state + Atomic_RW_Mutex_State_Reader); if ok { return; } } - mutex_lock(&rw.impl.mutex); - _ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader); - mutex_unlock(&rw.impl.mutex); + atomic_mutex_lock(&rw.mutex); + _ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader); + atomic_mutex_unlock(&rw.mutex); } -_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { - state := atomic_sub(&rw.impl.state, RW_Mutex_State_Reader); +// atomic_rw_mutex_shared_unlock unlocks rw for reading (with arbitrary number of readers) +atomic_rw_mutex_shared_unlock :: proc(rw: ^Atomic_RW_Mutex) { + state := atomic_sub(&rw.state, Atomic_RW_Mutex_State_Reader); - if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) && - (state & RW_Mutex_State_Is_Writing != 0) { - sema_post(&rw.impl.sema); + if (state & Atomic_RW_Mutex_State_Reader_Mask == Atomic_RW_Mutex_State_Reader) && + (state & Atomic_RW_Mutex_State_Is_Writing != 0) { + atomic_sema_post(&rw.sema); } } -_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { - state := atomic_load(&rw.impl.state); - if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { - _, ok := atomic_cxchg(&rw.impl.state, state, state + RW_Mutex_State_Reader); +// atomic_rw_mutex_try_shared_lock tries to lock rw for reading (with arbitrary number of readers) +atomic_rw_mutex_try_shared_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool { + state := atomic_load(&rw.state); + if state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 { + _, ok := atomic_compare_exchange_strong(&rw.state, state, state + Atomic_RW_Mutex_State_Reader); if ok { return true; } } - if mutex_try_lock(&rw.impl.mutex) { - _ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader); - mutex_unlock(&rw.impl.mutex); + if atomic_mutex_try_lock(&rw.mutex) { + _ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader); + atomic_mutex_unlock(&rw.mutex); return true; } @@ -161,127 +195,177 @@ _rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { } -_Recursive_Mutex :: struct { - owner: int, - recursion: int, - mutex: Mutex, +// Example: +// +// if atomic_rw_mutex_guard(&m) { +// ... +// } +// +@(deferred_in=atomic_rw_mutex_unlock) +atomic_rw_mutex_guard :: proc(m: ^Atomic_RW_Mutex) -> bool { + atomic_rw_mutex_lock(m); + return true; } -_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { - tid := runtime.current_thread_id(); - if tid != m.impl.owner { - mutex_lock(&m.impl.mutex); - } - // inside the lock - m.impl.owner = tid; - m.impl.recursion += 1; -} - -_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { - tid := runtime.current_thread_id(); - assert(tid == m.impl.owner); - m.impl.recursion -= 1; - recursion := m.impl.recursion; - if recursion == 0 { - m.impl.owner = 0; - } - if recursion == 0 { - mutex_unlock(&m.impl.mutex); - } - // outside the lock - -} - -_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { - tid := runtime.current_thread_id(); - if m.impl.owner == tid { - return mutex_try_lock(&m.impl.mutex); - } - if !mutex_try_lock(&m.impl.mutex) { - return false; - } - // inside the lock - m.impl.owner = tid; - m.impl.recursion += 1; +// Example: +// +// if atomic_rw_mutex_shared_guard(&m) { +// ... +// } +// +@(deferred_in=atomic_rw_mutex_shared_unlock) +atomic_rw_mutex_shared_guard :: proc(m: ^Atomic_RW_Mutex) -> bool { + atomic_rw_mutex_shared_lock(m); return true; } +// An Atomic_Recursive_Mutex is a recursive mutual exclusion lock +// The zero value for a Recursive_Mutex is an unlocked mutex +// +// An Atomic_Recursive_Mutex must not be copied after first use +Atomic_Recursive_Mutex :: struct { + owner: int, + recursion: int, + mutex: Mutex, +} +atomic_recursive_mutex_lock :: proc(m: ^Atomic_Recursive_Mutex) { + tid := runtime.current_thread_id(); + if tid != m.owner { + mutex_lock(&m.mutex); + } + // inside the lock + m.owner = tid; + m.recursion += 1; +} + +atomic_recursive_mutex_unlock :: proc(m: ^Atomic_Recursive_Mutex) { + tid := runtime.current_thread_id(); + assert(tid == m.owner); + m.recursion -= 1; + recursion := m.recursion; + if recursion == 0 { + m.owner = 0; + } + if recursion == 0 { + mutex_unlock(&m.mutex); + } + // outside the lock + +} + +atomic_recursive_mutex_try_lock :: proc(m: ^Atomic_Recursive_Mutex) -> bool { + tid := runtime.current_thread_id(); + if m.owner == tid { + return mutex_try_lock(&m.mutex); + } + if !mutex_try_lock(&m.mutex) { + return false; + } + // inside the lock + m.owner = tid; + m.recursion += 1; + return true; +} + + +// Example: +// +// if atomic_recursive_mutex_guard(&m) { +// ... +// } +// +@(deferred_in=atomic_recursive_mutex_unlock) +atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool { + atomic_recursive_mutex_lock(m); + return true; +} + + + + +@(private="file") Queue_Item :: struct { next: ^Queue_Item, futex: i32, } +@(private="file") queue_item_wait :: proc(item: ^Queue_Item) { - for atomic_load_acq(&item.futex) == 0 { + for atomic_load_acquire(&item.futex) == 0 { // TODO(bill): Use a Futex here for Linux to improve performance and error handling cpu_relax(); } } +@(private="file") queue_item_signal :: proc(item: ^Queue_Item) { - atomic_store_rel(&item.futex, 1); + atomic_store_release(&item.futex, 1); // TODO(bill): Use a Futex here for Linux to improve performance and error handling } -_Cond :: struct { - queue_mutex: Mutex, +// Atomic_Cond implements a condition variable, a rendezvous point for threads +// waiting for signalling the occurence of an event +// +// An Atomic_Cond must not be copied after first use +Atomic_Cond :: struct { + queue_mutex: Atomic_Mutex, queue_head: ^Queue_Item, pending: bool, } -_cond_wait :: proc(c: ^Cond, m: ^Mutex) { +atomic_cond_wait :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex) { waiter := &Queue_Item{}; - mutex_lock(&c.impl.queue_mutex); - waiter.next = c.impl.queue_head; - c.impl.queue_head = waiter; + atomic_mutex_lock(&c.queue_mutex); + waiter.next = c.queue_head; + c.queue_head = waiter; - atomic_store(&c.impl.pending, true); - mutex_unlock(&c.impl.queue_mutex); + atomic_store(&c.pending, true); + atomic_mutex_unlock(&c.queue_mutex); - mutex_unlock(m); + atomic_mutex_unlock(m); queue_item_wait(waiter); - mutex_lock(m); + atomic_mutex_lock(m); } -_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool { +atomic_cond_wait_with_timeout :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex, timeout: time.Duration) -> bool { // TODO(bill): _cond_wait_with_timeout for unix return false; } -_cond_signal :: proc(c: ^Cond) { - if !atomic_load(&c.impl.pending) { +atomic_cond_signal :: proc(c: ^Atomic_Cond) { + if !atomic_load(&c.pending) { return; } - mutex_lock(&c.impl.queue_mutex); - waiter := c.impl.queue_head; - if c.impl.queue_head != nil { - c.impl.queue_head = c.impl.queue_head.next; + atomic_mutex_lock(&c.queue_mutex); + waiter := c.queue_head; + if c.queue_head != nil { + c.queue_head = c.queue_head.next; } - atomic_store(&c.impl.pending, c.impl.queue_head != nil); - mutex_unlock(&c.impl.queue_mutex); + atomic_store(&c.pending, c.queue_head != nil); + atomic_mutex_unlock(&c.queue_mutex); if waiter != nil { queue_item_signal(waiter); } } -_cond_broadcast :: proc(c: ^Cond) { - if !atomic_load(&c.impl.pending) { +atomic_cond_broadcast :: proc(c: ^Atomic_Cond) { + if !atomic_load(&c.pending) { return; } - atomic_store(&c.impl.pending, false); + atomic_store(&c.pending, false); - mutex_lock(&c.impl.queue_mutex); - waiters := c.impl.queue_head; - c.impl.queue_head = nil; - mutex_unlock(&c.impl.queue_mutex); + atomic_mutex_lock(&c.queue_mutex); + waiters := c.queue_head; + c.queue_head = nil; + atomic_mutex_unlock(&c.queue_mutex); for waiters != nil { queue_item_signal(waiters); @@ -289,35 +373,35 @@ _cond_broadcast :: proc(c: ^Cond) { } } -_Sema :: struct { - mutex: Mutex, - cond: Cond, +// When waited upon, blocks until the internal count is greater than zero, then subtracts one. +// Posting to the semaphore increases the count by one, or the provided amount. +// +// An Atomic_Sema must not be copied after first use +Atomic_Sema :: struct { + mutex: Atomic_Mutex, + cond: Atomic_Cond, count: int, } -_sema_wait :: proc(s: ^Sema) { - mutex_lock(&s.impl.mutex); - defer mutex_unlock(&s.impl.mutex); +atomic_sema_wait :: proc(s: ^Atomic_Sema) { + atomic_mutex_lock(&s.mutex); + defer atomic_mutex_unlock(&s.mutex); - for s.impl.count == 0 { - cond_wait(&s.impl.cond, &s.impl.mutex); + for s.count == 0 { + atomic_cond_wait(&s.cond, &s.mutex); } - s.impl.count -= 1; - if s.impl.count > 0 { - cond_signal(&s.impl.cond); + s.count -= 1; + if s.count > 0 { + atomic_cond_signal(&s.cond); } } -_sema_post :: proc(s: ^Sema, count := 1) { - mutex_lock(&s.impl.mutex); - defer mutex_unlock(&s.impl.mutex); +atomic_sema_post :: proc(s: ^Atomic_Sema, count := 1) { + atomic_mutex_lock(&s.mutex); + defer atomic_mutex_unlock(&s.mutex); - s.impl.count += count; - cond_signal(&s.impl.cond); + s.count += count; + atomic_cond_signal(&s.cond); } - - - -} // !ODIN_SYNC_USE_PTHREADS diff --git a/core/sync/sync2/primitives_pthreads.odin b/core/sync/sync2/primitives_pthreads.odin index 5fd43d871..7e45e0565 100644 --- a/core/sync/sync2/primitives_pthreads.odin +++ b/core/sync/sync2/primitives_pthreads.odin @@ -1,4 +1,4 @@ -//+build linux, darwin, freebsd +//+build linux, freebsd //+private package sync2