mirror of
https://github.com/odin-lang/Odin.git
synced 2025-12-29 17:34:34 +00:00
420 lines
10 KiB
Odin
420 lines
10 KiB
Odin
package sync
|
|
|
|
import "core:time"
|
|
import vg "core:sys/valgrind"
|
|
_ :: vg
|
|
|
|
// A Wait_Group waits for a collection of threads to finish
|
|
//
|
|
// A Wait_Group must not be copied after first use
|
|
Wait_Group :: struct #no_copy {
|
|
counter: int,
|
|
mutex: Mutex,
|
|
cond: Cond,
|
|
}
|
|
|
|
wait_group_add :: proc "contextless" (wg: ^Wait_Group, delta: int) {
|
|
if delta == 0 {
|
|
return
|
|
}
|
|
|
|
guard(&wg.mutex)
|
|
|
|
atomic_add(&wg.counter, delta)
|
|
if wg.counter < 0 {
|
|
_panic("sync.Wait_Group negative counter")
|
|
}
|
|
if wg.counter == 0 {
|
|
cond_broadcast(&wg.cond)
|
|
if wg.counter != 0 {
|
|
_panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
|
|
}
|
|
}
|
|
}
|
|
|
|
wait_group_done :: proc "contextless" (wg: ^Wait_Group) {
|
|
wait_group_add(wg, -1)
|
|
}
|
|
|
|
wait_group_wait :: proc "contextless" (wg: ^Wait_Group) {
|
|
guard(&wg.mutex)
|
|
|
|
if wg.counter != 0 {
|
|
cond_wait(&wg.cond, &wg.mutex)
|
|
if wg.counter != 0 {
|
|
_panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
|
|
}
|
|
}
|
|
}
|
|
|
|
wait_group_wait_with_timeout :: proc "contextless" (wg: ^Wait_Group, duration: time.Duration) -> bool {
|
|
if duration <= 0 {
|
|
return false
|
|
}
|
|
guard(&wg.mutex)
|
|
|
|
if wg.counter != 0 {
|
|
if !cond_wait_with_timeout(&wg.cond, &wg.mutex, duration) {
|
|
return false
|
|
}
|
|
if wg.counter != 0 {
|
|
_panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
A barrier enabling multiple threads to synchronize the beginning of some computation
|
|
|
|
Example:
|
|
package example
|
|
|
|
import "core:fmt"
|
|
import "core:sync"
|
|
import "core:thread"
|
|
|
|
barrier := &sync.Barrier{}
|
|
|
|
main :: proc "contextless" () {
|
|
fmt.println("Start")
|
|
|
|
THREAD_COUNT :: 4
|
|
threads: [THREAD_COUNT]^thread.Thread
|
|
|
|
sync.barrier_init(barrier, THREAD_COUNT)
|
|
|
|
for _, i in threads {
|
|
threads[i] = thread.create_and_start(proc(t: ^thread.Thread) {
|
|
// Same messages will be printed together but without any interleaving
|
|
fmt.println("Getting ready!")
|
|
sync.barrier_wait(barrier)
|
|
fmt.println("Off their marks they go!")
|
|
})
|
|
}
|
|
|
|
for t in threads {
|
|
thread.destroy(t) // join and free thread
|
|
}
|
|
fmt.println("Finished")
|
|
}
|
|
*/
|
|
Barrier :: struct #no_copy {
|
|
mutex: Mutex,
|
|
cond: Cond,
|
|
index: int,
|
|
generation_id: int,
|
|
thread_count: int,
|
|
}
|
|
|
|
barrier_init :: proc "contextless" (b: ^Barrier, thread_count: int) {
|
|
when ODIN_VALGRIND_SUPPORT {
|
|
vg.helgrind_barrier_resize_pre(b, uint(thread_count))
|
|
}
|
|
b.index = 0
|
|
b.generation_id = 0
|
|
b.thread_count = thread_count
|
|
}
|
|
|
|
// Block the current thread until all threads have rendezvoused
|
|
// Barrier can be reused after all threads rendezvoused once, and can be used continuously
|
|
barrier_wait :: proc "contextless" (b: ^Barrier) -> (is_leader: bool) {
|
|
when ODIN_VALGRIND_SUPPORT {
|
|
vg.helgrind_barrier_wait_pre(b)
|
|
}
|
|
guard(&b.mutex)
|
|
local_gen := b.generation_id
|
|
b.index += 1
|
|
if b.index < b.thread_count {
|
|
for local_gen == b.generation_id && b.index < b.thread_count {
|
|
cond_wait(&b.cond, &b.mutex)
|
|
}
|
|
return false
|
|
}
|
|
|
|
b.index = 0
|
|
b.generation_id += 1
|
|
cond_broadcast(&b.cond)
|
|
return true
|
|
}
|
|
|
|
|
|
Auto_Reset_Event :: struct #no_copy {
|
|
// status == 0: Event is reset and no threads are waiting
|
|
// status == 1: Event is signaled
|
|
// status == -N: Event is reset and N threads are waiting
|
|
status: i32,
|
|
sema: Sema,
|
|
}
|
|
|
|
auto_reset_event_signal :: proc "contextless" (e: ^Auto_Reset_Event) {
|
|
old_status := atomic_load_explicit(&e.status, .Relaxed)
|
|
for {
|
|
new_status := old_status + 1 if old_status < 1 else 1
|
|
if _, ok := atomic_compare_exchange_weak_explicit(&e.status, old_status, new_status, .Release, .Relaxed); ok {
|
|
break
|
|
}
|
|
|
|
if old_status < 0 {
|
|
sema_post(&e.sema)
|
|
}
|
|
}
|
|
}
|
|
|
|
auto_reset_event_wait :: proc "contextless" (e: ^Auto_Reset_Event) {
|
|
old_status := atomic_sub_explicit(&e.status, 1, .Acquire)
|
|
if old_status < 1 {
|
|
sema_wait(&e.sema)
|
|
}
|
|
}
|
|
|
|
|
|
|
|
Ticket_Mutex :: struct #no_copy {
|
|
ticket: uint,
|
|
serving: uint,
|
|
}
|
|
|
|
ticket_mutex_lock :: #force_inline proc "contextless" (m: ^Ticket_Mutex) {
|
|
ticket := atomic_add_explicit(&m.ticket, 1, .Relaxed)
|
|
for ticket != atomic_load_explicit(&m.serving, .Acquire) {
|
|
cpu_relax()
|
|
}
|
|
}
|
|
|
|
ticket_mutex_unlock :: #force_inline proc "contextless" (m: ^Ticket_Mutex) {
|
|
atomic_add_explicit(&m.serving, 1, .Relaxed)
|
|
}
|
|
@(deferred_in=ticket_mutex_unlock)
|
|
ticket_mutex_guard :: proc "contextless" (m: ^Ticket_Mutex) -> bool {
|
|
ticket_mutex_lock(m)
|
|
return true
|
|
}
|
|
|
|
|
|
Benaphore :: struct #no_copy {
|
|
counter: i32,
|
|
sema: Sema,
|
|
}
|
|
|
|
benaphore_lock :: proc "contextless" (b: ^Benaphore) {
|
|
if atomic_add_explicit(&b.counter, 1, .Acquire) > 1 {
|
|
sema_wait(&b.sema)
|
|
}
|
|
}
|
|
|
|
benaphore_try_lock :: proc "contextless" (b: ^Benaphore) -> bool {
|
|
v, _ := atomic_compare_exchange_strong_explicit(&b.counter, 0, 1, .Acquire, .Acquire)
|
|
return v == 0
|
|
}
|
|
|
|
benaphore_unlock :: proc "contextless" (b: ^Benaphore) {
|
|
if atomic_sub_explicit(&b.counter, 1, .Release) > 0 {
|
|
sema_post(&b.sema)
|
|
}
|
|
}
|
|
|
|
@(deferred_in=benaphore_unlock)
|
|
benaphore_guard :: proc "contextless" (m: ^Benaphore) -> bool {
|
|
benaphore_lock(m)
|
|
return true
|
|
}
|
|
|
|
Recursive_Benaphore :: struct #no_copy {
|
|
counter: int,
|
|
owner: int,
|
|
recursion: i32,
|
|
sema: Sema,
|
|
}
|
|
|
|
recursive_benaphore_lock :: proc "contextless" (b: ^Recursive_Benaphore) {
|
|
tid := current_thread_id()
|
|
if atomic_add_explicit(&b.counter, 1, .Acquire) > 1 {
|
|
if tid != b.owner {
|
|
sema_wait(&b.sema)
|
|
}
|
|
}
|
|
// inside the lock
|
|
b.owner = tid
|
|
b.recursion += 1
|
|
}
|
|
|
|
recursive_benaphore_try_lock :: proc "contextless" (b: ^Recursive_Benaphore) -> bool {
|
|
tid := current_thread_id()
|
|
if b.owner == tid {
|
|
atomic_add_explicit(&b.counter, 1, .Acquire)
|
|
}
|
|
|
|
if v, _ := atomic_compare_exchange_strong_explicit(&b.counter, 0, 1, .Acquire, .Acquire); v != 0 {
|
|
return false
|
|
}
|
|
// inside the lock
|
|
b.owner = tid
|
|
b.recursion += 1
|
|
return true
|
|
}
|
|
|
|
recursive_benaphore_unlock :: proc "contextless" (b: ^Recursive_Benaphore) {
|
|
tid := current_thread_id()
|
|
_assert(tid == b.owner, "tid != b.owner")
|
|
b.recursion -= 1
|
|
recursion := b.recursion
|
|
if recursion == 0 {
|
|
b.owner = 0
|
|
}
|
|
if atomic_sub_explicit(&b.counter, 1, .Release) > 0 {
|
|
if recursion == 0 {
|
|
sema_post(&b.sema)
|
|
}
|
|
}
|
|
// outside the lock
|
|
}
|
|
|
|
@(deferred_in=recursive_benaphore_unlock)
|
|
recursive_benaphore_guard :: proc "contextless" (m: ^Recursive_Benaphore) -> bool {
|
|
recursive_benaphore_lock(m)
|
|
return true
|
|
}
|
|
|
|
|
|
|
|
|
|
// Once is a data value that will perform exactly on action.
|
|
//
|
|
// A Once must not be copied after first use.
|
|
Once :: struct #no_copy {
|
|
m: Mutex,
|
|
done: bool,
|
|
}
|
|
|
|
// once_do calls the procedure fn if and only if once_do is being called for the first for this instance of Once.
|
|
once_do :: proc{
|
|
once_do_without_data,
|
|
once_do_without_data_contextless,
|
|
once_do_with_data,
|
|
once_do_with_data_contextless,
|
|
}
|
|
|
|
// once_do_without_data calls the procedure fn if and only if once_do_without_data is being called for the first for this instance of Once.
|
|
once_do_without_data :: proc(o: ^Once, fn: proc()) {
|
|
@(cold)
|
|
do_slow :: proc(o: ^Once, fn: proc()) {
|
|
guard(&o.m)
|
|
if !o.done {
|
|
fn()
|
|
atomic_store_explicit(&o.done, true, .Release)
|
|
}
|
|
}
|
|
|
|
if atomic_load_explicit(&o.done, .Acquire) == false {
|
|
do_slow(o, fn)
|
|
}
|
|
}
|
|
|
|
// once_do_without_data calls the procedure fn if and only if once_do_without_data is being called for the first for this instance of Once.
|
|
once_do_without_data_contextless :: proc(o: ^Once, fn: proc "contextless" ()) {
|
|
@(cold)
|
|
do_slow :: proc(o: ^Once, fn: proc "contextless" ()) {
|
|
guard(&o.m)
|
|
if !o.done {
|
|
fn()
|
|
atomic_store_explicit(&o.done, true, .Release)
|
|
}
|
|
}
|
|
|
|
if atomic_load_explicit(&o.done, .Acquire) == false {
|
|
do_slow(o, fn)
|
|
}
|
|
}
|
|
|
|
// once_do_with_data calls the procedure fn if and only if once_do_with_data is being called for the first for this instance of Once.
|
|
once_do_with_data :: proc(o: ^Once, fn: proc(data: rawptr), data: rawptr) {
|
|
@(cold)
|
|
do_slow :: proc(o: ^Once, fn: proc(data: rawptr), data: rawptr) {
|
|
guard(&o.m)
|
|
if !o.done {
|
|
fn(data)
|
|
atomic_store_explicit(&o.done, true, .Release)
|
|
}
|
|
}
|
|
|
|
if atomic_load_explicit(&o.done, .Acquire) == false {
|
|
do_slow(o, fn, data)
|
|
}
|
|
}
|
|
|
|
// once_do_with_data_contextless calls the procedure fn if and only if once_do_with_data_contextless is being called for the first for this instance of Once.
|
|
once_do_with_data_contextless :: proc "contextless" (o: ^Once, fn: proc "contextless" (data: rawptr), data: rawptr) {
|
|
@(cold)
|
|
do_slow :: proc "contextless" (o: ^Once, fn: proc "contextless" (data: rawptr), data: rawptr) {
|
|
guard(&o.m)
|
|
if !o.done {
|
|
fn(data)
|
|
atomic_store_explicit(&o.done, true, .Release)
|
|
}
|
|
}
|
|
|
|
if atomic_load_explicit(&o.done, .Acquire) == false {
|
|
do_slow(o, fn, data)
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// A Parker is an associated token which is initially not present:
|
|
// * The `park` procedure blocks the current thread unless or until the token
|
|
// is available, at which point the token is consumed.
|
|
// * The `park_with_timeout` procedures works the same as `park` but only
|
|
// blocks for the specified duration.
|
|
// * The `unpark` procedure automatically makes the token available if it
|
|
// was not already.
|
|
Parker :: struct #no_copy {
|
|
state: Futex,
|
|
}
|
|
|
|
// Blocks the current thread until the token is made available.
|
|
//
|
|
// Assumes this is only called by the thread that owns the Parker.
|
|
park :: proc "contextless" (p: ^Parker) {
|
|
EMPTY :: 0
|
|
NOTIFIED :: 1
|
|
PARKED :: max(u32)
|
|
if atomic_sub_explicit(&p.state, 1, .Acquire) == NOTIFIED {
|
|
return
|
|
}
|
|
for {
|
|
futex_wait(&p.state, PARKED)
|
|
if _, ok := atomic_compare_exchange_strong_explicit(&p.state, NOTIFIED, EMPTY, .Acquire, .Acquire); ok {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Blocks the current thread until the token is made available, but only
|
|
// for a limited duration.
|
|
//
|
|
// Assumes this is only called by the thread that owns the Parker
|
|
park_with_timeout :: proc "contextless" (p: ^Parker, duration: time.Duration) {
|
|
EMPTY :: 0
|
|
NOTIFIED :: 1
|
|
PARKED :: max(u32)
|
|
if atomic_sub_explicit(&p.state, 1, .Acquire) == NOTIFIED {
|
|
return
|
|
}
|
|
futex_wait_with_timeout(&p.state, PARKED, duration)
|
|
atomic_exchange_explicit(&p.state, EMPTY, .Acquire)
|
|
}
|
|
|
|
// Automatically makes thee token available if it was not already.
|
|
unpark :: proc "contextless" (p: ^Parker) {
|
|
EMPTY :: 0
|
|
NOTIFIED :: 1
|
|
PARKED :: max(Futex)
|
|
if atomic_exchange_explicit(&p.state, NOTIFIED, .Release) == PARKED {
|
|
futex_signal(&p.state)
|
|
}
|
|
} |