Simplify Atomic_Cond implementation

This commit is contained in:
gingerBill
2022-04-26 13:04:50 +01:00
parent c81fd2e5dd
commit ec8221cb5d
2 changed files with 19 additions and 94 deletions

View File

@@ -6,8 +6,8 @@ cpu_relax :: intrinsics.cpu_relax
/*
Atomic_Memory_Order :: enum {
Relaxed = 0,
Consume = 1,
Relaxed = 0, // Unordered
Consume = 1, // Monotonic
Acquire = 2,
Release = 3,
Acq_Rel = 4,

View File

@@ -281,118 +281,43 @@ atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool {
@(private="file")
// A single waiter in an Atomic_Cond's intrusive wait queue.
// Each waiting thread owns one Queue_Item (allocated on its own stack in
// atomic_cond_wait); the item is only valid until its owner is signalled.
Queue_Item :: struct {
	next: ^Queue_Item, // next waiter in the singly-linked LIFO queue
	futex: Futex, // 0 while waiting; set to 1 by queue_item_signal
}
@(private="file")
// Block the calling thread until this item's futex has been flipped to a
// non-zero value by queue_item_signal. Spurious wake-ups from futex_wait
// are absorbed by re-checking the flag each iteration.
queue_item_wait :: proc(item: ^Queue_Item) {
	for {
		if atomic_load_explicit(&item.futex, .Acquire) != 0 {
			return
		}
		futex_wait(&item.futex, 0)
		cpu_relax()
	}
}
@(private="file")
// Like queue_item_wait, but give up once `duration` has elapsed.
// Returns true if the item was signalled in time, false on timeout
// (or if the underlying timed futex wait reports failure).
queue_item_wait_with_timeout :: proc(item: ^Queue_Item, duration: time.Duration) -> bool {
	began := time.tick_now()
	for atomic_load_explicit(&item.futex, .Acquire) == 0 {
		time_left := duration - time.tick_since(began)
		switch {
		case time_left < 0:
			// Budget exhausted before the item was signalled.
			return false
		case !futex_wait_with_timeout(&item.futex, 0, time_left):
			return false
		}
		cpu_relax()
	}
	return true
}
@(private="file")
// Wake the thread blocked in queue_item_wait/queue_item_wait_with_timeout on
// this item. The release-store must precede futex_signal so the woken
// thread's acquire-load observes the non-zero flag and exits its wait loop.
// After this call the item may be destroyed by its owner at any moment;
// the caller must not touch it again.
queue_item_signal :: proc(item: ^Queue_Item) {
	atomic_store_explicit(&item.futex, 1, .Release)
	futex_signal(&item.futex)
}
// Atomic_Cond implements a condition variable, a rendezvous point for threads
// waiting on, or signalling, the occurrence of an event
//
// An Atomic_Cond must not be copied after first use
Atomic_Cond :: struct {
	// NOTE(review): this field set appears to merge two revisions of the
	// type — the queue-based implementation (queue_mutex/queue_head/pending)
	// and the futex-generation implementation (state/prev). Confirm which
	// set is the intended one; they are not used consistently below.
	queue_mutex: Atomic_Mutex, // protects queue_head and pending
	queue_head: ^Queue_Item, // intrusive LIFO list of waiting threads
	pending: bool, // true while at least one waiter is enqueued
	state: Futex, // generation counter waited on by atomic_cond_wait
	prev: u32, // last generation observed by a waiter
}
// Atomically release `m` and block until the condition variable is
// signalled, then reacquire `m` before returning.
//
// NOTE(review): this body contains two complete wait implementations back to
// back — a futex-generation wait (state/prev) followed by a queue-based wait
// (queue_mutex/queue_head) — which looks like diff residue from two
// revisions; as written, a caller would block twice per wait. Confirm which
// path is intended before relying on this.
atomic_cond_wait :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex) {
	waiter := &Queue_Item{} // lives on this thread's stack for the duration of the wait
	// --- futex-generation path ---
	state := u32(atomic_load(&c.state))
	atomic_store(&c.prev, state)
	unlock(m)
	futex_wait(&c.state, state)
	lock(m)
	// --- queue-based path ---
	atomic_mutex_lock(&c.queue_mutex)
	waiter.next = c.queue_head
	c.queue_head = waiter // push onto the LIFO wait queue
	atomic_store(&c.pending, true)
	atomic_mutex_unlock(&c.queue_mutex)
	atomic_mutex_unlock(m)
	queue_item_wait(waiter)
	atomic_mutex_lock(m)
}
// As atomic_cond_wait, but give up after `duration` has elapsed.
// Returns true if signalled in time, false on timeout.
//
// NOTE(review): like atomic_cond_wait above, this body contains both the
// queue-based and the futex-generation implementations back to back (diff
// residue). In particular `ok` is assigned by the queue wait and then
// overwritten by the futex wait, so the first result is discarded — and the
// full `duration` budget is applied to each wait separately. Confirm intent.
atomic_cond_wait_with_timeout :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex, duration: time.Duration) -> (ok: bool) {
	waiter := &Queue_Item{} // lives on this thread's stack for the duration of the wait
	// --- queue-based path ---
	atomic_mutex_lock(&c.queue_mutex)
	waiter.next = c.queue_head
	c.queue_head = waiter
	atomic_store(&c.pending, true)
	atomic_mutex_unlock(&c.queue_mutex)
	atomic_mutex_unlock(m)
	ok = queue_item_wait_with_timeout(waiter, duration)
	atomic_mutex_lock(m)
	// --- futex-generation path (overwrites `ok` from above) ---
	state := u32(atomic_load(&c.state))
	unlock(m)
	ok = futex_wait_with_timeout(&c.state, state, duration)
	lock(m)
	return
}
// Wake one thread waiting on the condition variable, if any.
//
// NOTE(review): this body contains both the queue-based implementation
// (pop one waiter and signal it) and the futex-generation implementation
// (bump state and futex_signal) back to back — likely diff residue from two
// revisions; as written it performs both wake-ups. Confirm intent.
atomic_cond_signal :: proc(c: ^Atomic_Cond) {
	// Fast path: nothing enqueued, nothing to wake.
	if !atomic_load(&c.pending) {
		return
	}
	// --- queue-based path: pop one waiter under the queue lock ---
	atomic_mutex_lock(&c.queue_mutex)
	waiter := c.queue_head
	if c.queue_head != nil {
		c.queue_head = c.queue_head.next
	}
	atomic_store(&c.pending, c.queue_head != nil)
	atomic_mutex_unlock(&c.queue_mutex)
	if waiter != nil {
		queue_item_signal(waiter)
	}
	// --- futex-generation path: advance the generation and wake one ---
	state := 1 + atomic_load(&c.prev)
	atomic_store(&c.state, Futex(state))
	futex_signal(&c.state)
}
// Wake every thread currently waiting on the condition variable.
//
// NOTE(review): this body contains both the queue-based implementation
// (detach the whole wait list and signal each item) and the futex-generation
// implementation (bump state and futex_broadcast) back to back — likely diff
// residue from two revisions. Confirm which path is intended.
atomic_cond_broadcast :: proc(c: ^Atomic_Cond) {
	// Fast path: nothing enqueued, nothing to wake.
	if !atomic_load(&c.pending) {
		return
	}
	atomic_store(&c.pending, false)
	// Detach the entire wait list under the queue lock so new waiters
	// enqueued concurrently are not woken by this broadcast.
	atomic_mutex_lock(&c.queue_mutex)
	waiters := c.queue_head
	c.queue_head = nil
	atomic_mutex_unlock(&c.queue_mutex)
	for waiters != nil {
		// Read `next` BEFORE signalling: each Queue_Item lives on its
		// owner's stack (see atomic_cond_wait) and may be destroyed the
		// instant its owner is woken, so touching `waiters.next` after
		// queue_item_signal would be a use-after-free.
		next := waiters.next
		queue_item_signal(waiters)
		waiters = next
	}
	// Futex-generation path: advance the generation and wake all sleepers.
	state := 1 + atomic_load(&c.prev)
	atomic_store(&c.state, Futex(state))
	futex_broadcast(&c.state)
}
// When waited upon, blocks until the internal count is greater than zero, then subtracts one.