Update core:sync/sync2 to have a generic Futex interface, and implement the calls appropriately for each platform

This commit is contained in:
gingerBill
2021-10-09 16:33:28 +01:00
parent d386563344
commit 2ef0e6b8f6
11 changed files with 486 additions and 402 deletions

View File

@@ -251,17 +251,18 @@ Once :: struct {
}
once_do :: proc(o: ^Once, fn: proc()) {
if atomic_load_acquire(&o.done) == false {
_once_do_slow(o, fn)
@(cold)
do_slow :: proc(o: ^Once, fn: proc()) {
mutex_lock(&o.m)
defer mutex_unlock(&o.m)
if !o.done {
fn()
atomic_store_release(&o.done, true)
}
}
}
@(cold)
_once_do_slow :: proc(o: ^Once, fn: proc()) {
mutex_lock(&o.m)
defer mutex_unlock(&o.m)
if !o.done {
fn()
atomic_store_release(&o.done, true)
if atomic_load_acquire(&o.done) == false {
do_slow(o, fn)
}
}

View File

@@ -0,0 +1,83 @@
//+private
//+build darwin
package sync2
import "core:c"
import "core:time"
foreign import System "System.framework"
foreign System {
__ulock_wait :: proc "c" (operation: u32, addr: rawptr, value: u64, timeout_ms: u32) -> c.int ---
__ulock_wait2 :: proc "c" (operation: u32, addr: rawptr, value: u64, timeout_ns: u64, value2: u64) -> c.int ---
__ulock_wake :: proc "c" (operation: u32, addr: rawptr, wake_value: u64) -> c.int ---
}
UL_COMPARE_AND_WAIT :: 1
ULF_WAKE_ALL :: 0x00000100
ULF_NO_ERRNO :: 0x01000000
ENOENT :: -2
EINTR :: -4
EFAULT :: -14
ETIMEDOUT :: -60
// Block until the futex at `f` is woken, provided it still holds `expected`.
// Delegates with a duration of 0, which __ulock_wait2 treats as "no timeout".
_futex_wait :: proc(f: ^Futex, expected: u32) -> Futex_Error {
	return _futex_wait_with_timeout(f, expected, 0)
}
// Block until the futex at `f` is woken or `duration` elapses.
//
// `time.Duration` is already in nanoseconds, which is the unit
// __ulock_wait2 expects; a duration of 0 means wait indefinitely.
// Returns .Timed_Out on timeout, nil on wake (possibly spurious —
// callers must re-check the futex value).
_futex_wait_with_timeout :: proc(f: ^Futex, expected: u32, duration: time.Duration) -> Futex_Error {
	// NOTE: the previous revision declared an unused `timeout_overflowed`
	// local, which Odin rejects ("declared but not used"); removed.
	s := __ulock_wait2(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, u64(expected), u64(duration), 0)
	if s >= 0 {
		// Non-negative return: woken normally.
		return nil
	}
	switch s {
	case EINTR, EFAULT:
		// Interrupted or transient fault: report as a spurious wake and
		// let the caller re-check the value.
		return nil
	case ETIMEDOUT:
		return .Timed_Out
	case:
		panic("futex_wait failure")
	}
	return nil
}
// Wake at most one thread waiting on the futex at `f`.
_futex_wake_single :: proc(f: ^Futex) {
	for {
		ret := __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, 0)
		if ret >= 0 || ret == ENOENT {
			// Woke a waiter, or nobody was waiting on this address.
			return
		}
		if ret == EINTR || ret == EFAULT {
			// Transient failure: retry the wake.
			continue
		}
		panic("futex_wake_single failure")
	}
}
// Wake every thread waiting on the futex at `f`.
_futex_wake_all :: proc(f: ^Futex) {
	for {
		ret := __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO | ULF_WAKE_ALL, f, 0)
		if ret >= 0 || ret == ENOENT {
			// Woke the waiters, or nobody was waiting on this address.
			return
		}
		if ret == EINTR || ret == EFAULT {
			// Transient failure: retry the wake.
			continue
		}
		panic("futex_wake_all failure")
	}
}

View File

@@ -0,0 +1,98 @@
//+private
//+build linux
package sync2
import "core:c"
import "core:time"
import "core:intrinsics"
FUTEX_WAIT :: 0
FUTEX_WAKE :: 1
FUTEX_PRIVATE_FLAG :: 128
FUTEX_WAIT_PRIVATE :: (FUTEX_WAIT | FUTEX_PRIVATE_FLAG)
FUTEX_WAKE_PRIVATE :: (FUTEX_WAKE | FUTEX_PRIVATE_FLAG)
foreign import libc "system:c"
foreign libc {
__errno_location :: proc "c" () -> ^c.int ---
}
ESUCCESS :: 0
EINTR :: -4
EAGAIN :: -11
EFAULT :: -14
EINVAL :: -22
ETIMEDOUT :: -110
// Extract the errno from a raw syscall return value.
// Linux syscalls report failure by returning -errno in the range
// [-4095, -1]; anything else is a successful result and yields 0.
get_errno :: proc(r: int) -> int {
	if r >= 0 || r <= -4096 {
		return 0
	}
	return r
}
// Thin wrapper over the futex syscall (202 is SYS_futex on x86-64 Linux).
// Returns 0 on success or a negative errno via get_errno.
internal_futex :: proc(f: ^Futex, op: uintptr, val: u32, timeout: rawptr) -> int {
	ret := intrinsics.syscall(202, uintptr(f), uintptr(op), uintptr(val), uintptr(timeout), 0, 0)
	return get_errno(int(ret))
}
// Block indefinitely until the futex at `f` is woken, provided it still
// holds `expected`. A nil timeout pointer means no timeout.
_futex_wait :: proc(f: ^Futex, expected: u32) -> Futex_Error {
	// FUTEX_WAIT_PRIVATE is already FUTEX_WAIT | FUTEX_PRIVATE_FLAG, so
	// or-ing FUTEX_WAIT in again (as the previous revision did) was a
	// confusing no-op; use the combined opcode directly.
	err := internal_futex(f, FUTEX_WAIT_PRIVATE, expected, nil)
	switch err {
	case ESUCCESS, EINTR, EAGAIN, EINVAL:
		// EAGAIN: value no longer matched `expected`; EINTR: interrupted.
		// Both are treated as (possibly spurious) wakes for the caller
		// to re-check.
	case ETIMEDOUT:
		return .Timed_Out
	case EFAULT:
		fallthrough
	case:
		panic("futex_wait failure")
	}
	return nil
}
// Block until the futex at `f` is woken or `duration` elapses.
// Returns .Timed_Out on timeout, nil on (possibly spurious) wake.
_futex_wait_with_timeout :: proc(f: ^Futex, expected: u32, duration: time.Duration) -> Futex_Error {
	// A non-positive duration means "wait indefinitely", matching the
	// darwin implementation (whose _futex_wait delegates here with 0).
	// Previously a zero duration built a {0,0} timespec, which makes the
	// kernel time out immediately instead.
	if duration <= 0 {
		return _futex_wait(f, expected)
	}
	timeout: struct {
		tv_sec:  c.long,
		tv_nsec: c.long,
	}
	// time.Duration is nanoseconds; split into whole seconds + remainder.
	timeout.tv_sec  = (c.long)(duration / 1e9)
	timeout.tv_nsec = (c.long)(duration % 1e9)
	err := internal_futex(f, FUTEX_WAIT_PRIVATE, expected, &timeout)
	switch err {
	case ESUCCESS, EINTR, EAGAIN, EINVAL:
		// Treated as a (possibly spurious) wake; caller re-checks the value.
	case ETIMEDOUT:
		return .Timed_Out
	case EFAULT:
		fallthrough
	case:
		panic("futex_wait_with_timeout failure")
	}
	return nil
}
// Wake at most one thread blocked on the futex at `f`.
_futex_wake_single :: proc(f: ^Futex) {
	switch internal_futex(f, FUTEX_WAKE_PRIVATE | FUTEX_WAKE, 1, nil) {
	case ESUCCESS, EINVAL, EFAULT:
		// Nothing to do; EINVAL/EFAULT can occur if the address is gone.
	case:
		panic("futex_wake_single failure")
	}
}
// Wake every thread blocked on the futex at `f`; a wake count of
// max(i32) is the conventional "wake all" value for FUTEX_WAKE.
_futex_wake_all :: proc(f: ^Futex) {
	switch internal_futex(f, FUTEX_WAKE_PRIVATE | FUTEX_WAKE, u32(max(i32)), nil) {
	case ESUCCESS, EINVAL, EFAULT:
		// Nothing to do; EINVAL/EFAULT can occur if the address is gone.
	case:
		panic("_futex_wake_all failure")
	}
}

View File

@@ -0,0 +1,41 @@
//+private
//+build windows
package sync2
import "core:time"
foreign import Synchronization "system:Synchronization.lib"
@(default_calling_convention="c")
foreign Synchronization {
WaitOnAddress :: proc(Address: rawptr, CompareAddress: rawptr, AddressSize: uint, dwMilliseconds: u32) -> b32 ---
WakeByAddressSingle :: proc(Address: rawptr) ---
WakeByAddressAll :: proc(Address: rawptr) ---
}
// Block indefinitely until the futex at `f` no longer holds `expect`
// (or a wake arrives). All-bits-set milliseconds is INFINITE.
_futex_wait :: proc(f: ^Futex, expect: u32) -> Futex_Error {
	expect := expect
	if WaitOnAddress(f, &expect, size_of(expect), ~u32(0)) {
		return nil
	}
	return .Timed_Out
}
// Block until the futex at `f` is woken or `duration` elapses.
// Returns .Timed_Out on timeout, nil otherwise.
_futex_wait_with_timeout :: proc(f: ^Futex, expect: u32, duration: time.Duration) -> Futex_Error {
	expect := expect
	// Round the timeout UP to the next millisecond: truncating (as the
	// previous revision did) turned any sub-millisecond wait into an
	// immediate timeout.
	// NOTE(review): a non-positive duration yields ms == 0, i.e. an
	// immediate return, whereas darwin treats <= 0 as "wait forever" —
	// confirm which semantics callers expect and unify.
	ms: u32 = 0
	if duration > 0 {
		ms = u32((u64(duration) + 999_999) / 1_000_000)
	}
	ok := WaitOnAddress(f, &expect, size_of(expect), ms)
	return nil if ok else .Timed_Out
}
// Wake one thread waiting on the futex address (WakeByAddressSingle wrapper).
_futex_wake_single :: proc(f: ^Futex) {
	WakeByAddressSingle(f)
}
// Wake all threads waiting on the futex address (WakeByAddressAll wrapper).
_futex_wake_all :: proc(f: ^Futex) {
	WakeByAddressAll(f)
}

View File

@@ -153,10 +153,6 @@ cond_wait :: proc(c: ^Cond, m: ^Mutex) {
_cond_wait(c, m)
}
cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool {
return _cond_wait_with_timeout(c, m, timeout)
}
cond_signal :: proc(c: ^Cond) {
_cond_signal(c)
}
@@ -166,20 +162,67 @@ cond_broadcast :: proc(c: ^Cond) {
}
// When waited upon, blocks until the internal count is greater than zero, then subtracts one.
// Posting to the semaphore increases the count by one, or the provided amount.
//
// A Sema must not be copied after first use
Sema :: struct {
impl: _Sema,
count: Futex,
}
sema_wait :: proc(s: ^Sema) {
_sema_wait(s)
for {
original_count := atomic_load(&s.count)
for original_count == 0 {
futex_wait(&s.count, u32(original_count))
original_count = s.count
}
if original_count == atomic_compare_exchange_strong(&s.count, original_count-1, original_count) {
return
}
}
}
sema_post :: proc(s: ^Sema, count := 1) {
_sema_post(s, count)
atomic_add(&s.count, Futex(count))
if count == 1 {
futex_wake_single(&s.count)
} else {
futex_wake_all(&s.count)
}
}
// Futex is a fast userspace mutual exclusion lock, using a 32-bit memory address as a hint
//
// An Futex must not be copied after first use
Futex :: distinct u32
Futex_Error :: enum {
None,
Timed_Out,
}
// Block the calling thread until the futex is woken, provided it still
// holds `expected`; returns immediately if the value already changed.
futex_wait :: proc(f: ^Futex, expected: u32) {
	if u32(atomic_load(f)) != expected {
		return
	}
	// BUG FIX: the assertion was inverted (`!= nil`), which made it fire
	// on every *successful* wait. Success is a nil error.
	assert(_futex_wait(f, expected) == nil, "futex_wait failure")
}
// Block until the futex is woken or `duration` elapses; returns
// immediately (nil) if the value no longer equals `expected`.
futex_wait_with_timeout :: proc(f: ^Futex, expected: u32, duration: time.Duration) -> Futex_Error {
	if u32(atomic_load(f)) == expected {
		return _futex_wait_with_timeout(f, expected, duration)
	}
	return nil
}
// Wake a single thread blocked in futex_wait/futex_wait_with_timeout on `f`.
futex_wake_single :: proc(f: ^Futex) {
	_futex_wake_single(f)
}
// Wake every thread blocked in futex_wait/futex_wait_with_timeout on `f`.
futex_wake_all :: proc(f: ^Futex) {
	_futex_wake_all(f)
}

View File

@@ -2,7 +2,7 @@ package sync2
import "core:time"
Atomic_Mutex_State :: enum i32 {
Atomic_Mutex_State :: enum Futex {
Unlocked = 0,
Locked = 1,
Waiting = 2,
@@ -42,8 +42,8 @@ atomic_mutex_lock :: proc(m: ^Atomic_Mutex) {
if atomic_exchange_acquire(&m.state, .Waiting) == .Unlocked {
return
}
// TODO(bill): Use a Futex here for Linux to improve performance and error handling
futex_wait((^Futex)(&m.state), u32(new_state))
cpu_relax()
}
}
@@ -62,7 +62,7 @@ atomic_mutex_lock :: proc(m: ^Atomic_Mutex) {
atomic_mutex_unlock :: proc(m: ^Atomic_Mutex) {
@(cold)
unlock_slow :: proc(m: ^Atomic_Mutex) {
// TODO(bill): Use a Futex here for Linux to improve performance and error handling
futex_wake_single((^Futex)(&m.state))
}
@@ -289,20 +289,20 @@ atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool {
@(private="file")
Queue_Item :: struct {
next: ^Queue_Item,
futex: i32,
futex: Futex,
}
@(private="file")
queue_item_wait :: proc(item: ^Queue_Item) {
for atomic_load_acquire(&item.futex) == 0 {
// TODO(bill): Use a Futex here for Linux to improve performance and error handling
futex_wait(&item.futex, 0)
cpu_relax()
}
}
@(private="file")
queue_item_signal :: proc(item: ^Queue_Item) {
atomic_store_release(&item.futex, 1)
// TODO(bill): Use a Futex here for Linux to improve performance and error handling
futex_wake_single(&item.futex)
}
@@ -331,11 +331,6 @@ atomic_cond_wait :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex) {
atomic_mutex_lock(m)
}
atomic_cond_wait_with_timeout :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex, timeout: time.Duration) -> bool {
// TODO(bill): _cond_wait_with_timeout for unix
return false
}
atomic_cond_signal :: proc(c: ^Atomic_Cond) {
if !atomic_load(&c.pending) {
return

View File

@@ -18,126 +18,36 @@ _current_thread_id :: proc "contextless" () -> int {
return int(tid)
}
foreign {
@(link_name="usleep")
_darwin_usleep :: proc "c" (us: uint) -> i32 ---
@(link_name="sched_yield")
_darwin_sched_yield :: proc "c" () -> i32 ---
}
_atomic_try_wait_slow :: proc(ptr: ^u32, val: u32) {
history: uint = 10
for {
// Exponential wait
_darwin_usleep(history >> 2)
history += history >> 2
if history > (1 << 10) {
history = 1 << 10
}
if atomic_load(ptr) != val {
break
}
}
}
_atomic_wait :: proc(ptr: ^u32, val: u32) {
if intrinsics.expect(atomic_load(ptr) != val, true) {
return
}
for i in 0..<16 {
if atomic_load(ptr) != val {
return
}
if i < 12 {
intrinsics.cpu_relax()
} else {
_darwin_sched_yield()
}
}
for val == atomic_load(ptr) {
_atomic_try_wait_slow(ptr, val)
}
}
_Mutex :: struct {
mutex: Atomic_Mutex,
}
_mutex_lock :: proc(m: ^Mutex) {
atomic_mutex_lock(&m.impl.mutex)
}
_mutex_unlock :: proc(m: ^Mutex) {
atomic_mutex_unlock(&m.impl.mutex)
}
_mutex_try_lock :: proc(m: ^Mutex) -> bool {
return false
return atomic_mutex_try_lock(&m.impl.mutex)
}
_RW_Mutex :: struct {
}
_rw_mutex_lock :: proc(rw: ^RW_Mutex) {
}
_rw_mutex_unlock :: proc(rw: ^RW_Mutex) {
}
_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool {
return false
}
_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) {
}
_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) {
}
_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool {
return false
}
_Recursive_Mutex :: struct {
}
_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) {
}
_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) {
}
_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool {
return false
}
_Cond :: struct {
cond: Atomic_Cond,
}
_cond_wait :: proc(c: ^Cond, m: ^Mutex) {
}
_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool {
return false
atomic_cond_wait(&c.impl.cond, &m.impl.mutex)
}
_cond_signal :: proc(c: ^Cond) {
atomic_cond_signal(&c.impl.cond)
}
_cond_broadcast :: proc(c: ^Cond) {
}
_Sema :: struct {
}
_sema_wait :: proc(s: ^Sema) {
}
_sema_post :: proc(s: ^Sema, count := 1) {
atomic_cond_broadcast(&c.impl.cond)
}

View File

@@ -0,0 +1,184 @@
//+private
package sync2
when #config(ODIN_SYNC_RECURSIVE_MUTEX_USE_FUTEX, true) {
_Recursive_Mutex :: struct {
owner: Futex,
recursion: i32,
}
// Acquire the mutex re-entrantly: `owner` holds the owning thread id
// (0 when unlocked) and doubles as the futex word waiters sleep on.
_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) {
	tid := Futex(current_thread_id())
	for {
		// NOTE(review): the argument order here appears to be
		// (ptr, new, expected) — install `tid` when the owner is 0 —
		// matching the other compare-exchange call sites in this
		// package; confirm against the atomic intrinsic's signature.
		prev_owner := atomic_compare_exchange_strong_acquire(&m.impl.owner, tid, 0)
		switch prev_owner {
		case 0, tid:
			// Took the lock (0) or already own it (tid): bump recursion.
			m.impl.recursion += 1
			// inside the lock
			return
		}
		// Another thread owns it: sleep until the owner word changes.
		futex_wait(&m.impl.owner, u32(prev_owner))
	}
}
// Release one level of the recursive lock. Only the owning thread may
// call this, so `recursion` can be updated without atomics.
_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) {
	m.impl.recursion -= 1
	if m.impl.recursion != 0 {
		// Still re-entered; keep ownership.
		return
	}
	// Outermost unlock: clear ownership, then wake one blocked waiter.
	atomic_exchange_release(&m.impl.owner, 0)
	futex_wake_single(&m.impl.owner)
	// outside the lock
}
// Non-blocking variant of _recursive_mutex_lock: a single
// compare-exchange attempt, succeeding when the lock is free (0) or
// already held by this thread (tid).
_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool {
	tid := Futex(current_thread_id())
	prev_owner := atomic_compare_exchange_strong_acquire(&m.impl.owner, tid, 0)
	switch prev_owner {
	case 0, tid:
		m.impl.recursion += 1
		// inside the lock
		return true
	}
	return false
}
} else {
_Recursive_Mutex :: struct {
owner: int,
recursion: int,
mutex: Mutex,
}
// Acquire the lock re-entrantly: take the underlying mutex only on the
// first entry by this thread; nested calls just bump the counter.
_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) {
	tid := current_thread_id()
	if m.impl.owner != tid {
		mutex_lock(&m.impl.mutex)
	}
	// inside the lock
	m.impl.owner = tid
	m.impl.recursion += 1
}
// Release one level of the lock; only the owning thread may call this.
// The underlying mutex is released only when the outermost level exits.
_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) {
	tid := current_thread_id()
	assert(tid == m.impl.owner)
	m.impl.recursion -= 1
	if m.impl.recursion == 0 {
		// Outermost unlock: drop ownership before releasing the mutex.
		m.impl.owner = 0
		mutex_unlock(&m.impl.mutex)
	}
	// outside the lock
}
// Non-blocking acquire of the recursive lock.
_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool {
	tid := current_thread_id()
	if m.impl.owner == tid {
		// NOTE(review): the re-entrant path returns mutex_try_lock on a
		// mutex this thread already holds (which presumably fails) and
		// never increments `recursion` on success — confirm this is the
		// intended semantics.
		return mutex_try_lock(&m.impl.mutex)
	}
	if !mutex_try_lock(&m.impl.mutex) {
		return false
	}
	// inside the lock
	m.impl.owner = tid
	m.impl.recursion += 1
	return true
}
}
when ODIN_OS != "windows" {
RW_Mutex_State :: distinct uint
RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2
RW_Mutex_State_Is_Writing :: RW_Mutex_State(1)
RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1
RW_Mutex_State_Reader :: RW_Mutex_State(1)<<RW_Mutex_State_Half_Width
RW_Mutex_State_Writer_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << 1
RW_Mutex_State_Reader_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << RW_Mutex_State_Half_Width
_RW_Mutex :: struct {
// NOTE(bill): pthread_rwlock_t cannot be used since pthread_rwlock_destroy is required on some platforms
// TODO(bill): Can we determine which platforms exactly?
state: RW_Mutex_State,
mutex: Mutex,
sema: Sema,
}
// Acquire the write lock: announce a pending writer, exclude other
// writers via the mutex, then wait out any active readers.
_rw_mutex_lock :: proc(rw: ^RW_Mutex) {
	// Bump the pending-writer count so new readers take the slow path.
	_ = atomic_add(&rw.impl.state, RW_Mutex_State_Writer)
	mutex_lock(&rw.impl.mutex)
	// BUG FIX: this must set Is_Writing (the previous revision or'd
	// Writer again). _rw_mutex_unlock clears Is_Writing and
	// _rw_mutex_shared_unlock only posts the semaphore when Is_Writing
	// is set, so without it the sema_wait below could sleep forever.
	state := atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing)
	if state & RW_Mutex_State_Reader_Mask != 0 {
		// Active readers remain: the last one posts the semaphore.
		sema_wait(&rw.impl.sema)
	}
}
// Release the write lock: drop the write flag, then release the mutex
// that excludes other writers.
_rw_mutex_unlock :: proc(rw: ^RW_Mutex) {
	// NOTE(review): the RW_Mutex_State_Writer count added in
	// _rw_mutex_lock is never subtracted here or anywhere else visible
	// in this file — confirm whether that is intentional.
	_ = atomic_and(&rw.impl.state, ~RW_Mutex_State_Is_Writing)
	mutex_unlock(&rw.impl.mutex)
}
// Try to acquire the write lock without blocking: take the mutex if it
// is free, then keep it only when there are no active readers.
_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool {
	if mutex_try_lock(&rw.impl.mutex) {
		state := atomic_load(&rw.impl.state)
		if state & RW_Mutex_State_Reader_Mask == 0 {
			_ = atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing)
			return true
		}
		// Readers are active: back out.
		mutex_unlock(&rw.impl.mutex)
	}
	return false
}
// Acquire a shared (read) lock.
_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) {
	// Fast path: while no writer is active or pending, CAS the reader
	// count up by one.
	state := atomic_load(&rw.impl.state)
	for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 {
		ok: bool
		state, ok = atomic_compare_exchange_weak(&rw.impl.state, state, state + RW_Mutex_State_Reader)
		if ok {
			return
		}
	}
	// Slow path: a writer is involved, so queue behind the writer mutex
	// before registering as a reader.
	mutex_lock(&rw.impl.mutex)
	_ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader)
	mutex_unlock(&rw.impl.mutex)
}
// Release a shared (read) lock.
_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) {
	state := atomic_sub(&rw.impl.state, RW_Mutex_State_Reader)
	// If we were the last reader and a writer is waiting (Is_Writing
	// set), hand the lock over via the semaphore.
	if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) &&
	   (state & RW_Mutex_State_Is_Writing != 0) {
		sema_post(&rw.impl.sema)
	}
}
// Try to acquire a shared (read) lock without blocking.
_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool {
	// One-shot CAS attempt while no writer is active or pending.
	state := atomic_load(&rw.impl.state)
	if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 {
		_, ok := atomic_compare_exchange_strong(&rw.impl.state, state, state + RW_Mutex_State_Reader)
		if ok {
			return true
		}
	}
	// Otherwise succeed only if the writer mutex can be taken without
	// blocking; register the reader while holding it.
	if mutex_try_lock(&rw.impl.mutex) {
		_ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader)
		mutex_unlock(&rw.impl.mutex)
		return true
	}
	return false
}
}

View File

@@ -2,14 +2,9 @@
//+private
package sync2
// TODO(bill): remove libc
foreign import libc "system:c"
import "core:intrinsics"
_current_thread_id :: proc "contextless" () -> int {
foreign libc {
syscall :: proc(number: i32, #c_vararg args: ..any) -> i32 ---
}
SYS_GETTID :: 186
return int(syscall(SYS_GETTID))
return int(intrinsics.syscall(SYS_GETTID))
}

View File

@@ -2,8 +2,6 @@
//+private
package sync2
when #config(ODIN_SYNC_USE_PTHREADS, true) {
import "core:time"
import "core:runtime"
import "core:sys/unix"
@@ -32,142 +30,6 @@ _mutex_try_lock :: proc(m: ^Mutex) -> bool {
return err == 0
}
RW_Mutex_State :: distinct uint
RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2
RW_Mutex_State_Is_Writing :: RW_Mutex_State(1)
RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1
RW_Mutex_State_Reader :: RW_Mutex_State(1)<<RW_Mutex_State_Half_Width
RW_Mutex_State_Writer_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << 1
RW_Mutex_State_Reader_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << RW_Mutex_State_Half_Width
_RW_Mutex :: struct {
// NOTE(bill): pthread_rwlock_t cannot be used since pthread_rwlock_destroy is required on some platforms
// TODO(bill): Can we determine which platforms exactly?
state: RW_Mutex_State,
mutex: Mutex,
sema: Sema,
}
_rw_mutex_lock :: proc(rw: ^RW_Mutex) {
_ = atomic_add(&rw.impl.state, RW_Mutex_State_Writer)
mutex_lock(&rw.impl.mutex)
state := atomic_or(&rw.impl.state, RW_Mutex_State_Writer)
if state & RW_Mutex_State_Reader_Mask != 0 {
sema_wait(&rw.impl.sema)
}
}
_rw_mutex_unlock :: proc(rw: ^RW_Mutex) {
_ = atomic_and(&rw.impl.state, ~RW_Mutex_State_Is_Writing)
mutex_unlock(&rw.impl.mutex)
}
_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool {
if mutex_try_lock(&rw.impl.mutex) {
state := atomic_load(&rw.impl.state)
if state & RW_Mutex_State_Reader_Mask == 0 {
_ = atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing)
return true
}
mutex_unlock(&rw.impl.mutex)
}
return false
}
_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) {
state := atomic_load(&rw.impl.state)
for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 {
ok: bool
state, ok = atomic_compare_exchange_weak(&rw.impl.state, state, state + RW_Mutex_State_Reader)
if ok {
return
}
}
mutex_lock(&rw.impl.mutex)
_ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader)
mutex_unlock(&rw.impl.mutex)
}
_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) {
state := atomic_sub(&rw.impl.state, RW_Mutex_State_Reader)
if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) &&
(state & RW_Mutex_State_Is_Writing != 0) {
sema_post(&rw.impl.sema)
}
}
_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool {
state := atomic_load(&rw.impl.state)
if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 {
_, ok := atomic_compare_exchange_strong(&rw.impl.state, state, state + RW_Mutex_State_Reader)
if ok {
return true
}
}
if mutex_try_lock(&rw.impl.mutex) {
_ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader)
mutex_unlock(&rw.impl.mutex)
return true
}
return false
}
_Recursive_Mutex :: struct {
owner: int,
recursion: int,
mutex: Mutex,
}
_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) {
tid := _current_thread_id()
if tid != m.impl.owner {
mutex_lock(&m.impl.mutex)
}
// inside the lock
m.impl.owner = tid
m.impl.recursion += 1
}
_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) {
tid := _current_thread_id()
assert(tid == m.impl.owner)
m.impl.recursion -= 1
recursion := m.impl.recursion
if recursion == 0 {
m.impl.owner = 0
}
if recursion == 0 {
mutex_unlock(&m.impl.mutex)
}
// outside the lock
}
_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool {
tid := _current_thread_id()
if m.impl.owner == tid {
return mutex_try_lock(&m.impl.mutex)
}
if !mutex_try_lock(&m.impl.mutex) {
return false
}
// inside the lock
m.impl.owner = tid
m.impl.recursion += 1
return true
}
_Cond :: struct {
pthread_cond: unix.pthread_cond_t,
}
@@ -177,17 +39,6 @@ _cond_wait :: proc(c: ^Cond, m: ^Mutex) {
assert(err == 0)
}
_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool {
ns := time.duration_nanoseconds(timeout)
timeout_timespec := &time.TimeSpec{
tv_sec = ns / 1e9,
tv_nsec = ns % 1e9,
}
err := unix.pthread_cond_timedwait(&c.impl.pthread_cond, &m.impl.pthread_mutex, timeout_timespec)
// TODO(bill):
return err == 0
}
_cond_signal :: proc(c: ^Cond) {
err := unix.pthread_cond_signal(&c.impl.pthread_cond)
assert(err == 0)
@@ -197,35 +48,3 @@ _cond_broadcast :: proc(c: ^Cond) {
err := unix.pthread_cond_broadcast(&c.impl.pthread_cond)
assert(err == 0)
}
_Sema :: struct {
mutex: Mutex,
cond: Cond,
count: int,
}
_sema_wait :: proc(s: ^Sema) {
mutex_lock(&s.impl.mutex)
defer mutex_unlock(&s.impl.mutex)
for s.impl.count == 0 {
cond_wait(&s.impl.cond, &s.impl.mutex)
}
s.impl.count -= 1
if s.impl.count > 0 {
cond_signal(&s.impl.cond)
}
}
_sema_post :: proc(s: ^Sema, count := 1) {
mutex_lock(&s.impl.mutex)
defer mutex_unlock(&s.impl.mutex)
s.impl.count += count
cond_signal(&s.impl.cond)
}
} // ODIN_SYNC_USE_PTHREADS

View File

@@ -54,55 +54,6 @@ _rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool {
}
_Recursive_Mutex :: struct {
owner: u32,
claim_count: i32,
}
_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) {
tid := win32.GetCurrentThreadId()
for {
prev_owner := atomic_compare_exchange_strong_acquire(&m.impl.owner, tid, 0)
switch prev_owner {
case 0, tid:
m.impl.claim_count += 1
// inside the lock
return
}
win32.WaitOnAddress(
&m.impl.owner,
&prev_owner,
size_of(prev_owner),
win32.INFINITE,
)
}
}
_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) {
m.impl.claim_count -= 1
if m.impl.claim_count != 0 {
return
}
atomic_exchange_release(&m.impl.owner, 0)
win32.WakeByAddressSingle(&m.impl.owner)
// outside the lock
}
_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool {
tid := win32.GetCurrentThreadId()
prev_owner := atomic_compare_exchange_strong_acquire(&m.impl.owner, tid, 0)
switch prev_owner {
case 0, tid:
m.impl.claim_count += 1
// inside the lock
return true
}
return false
}
_Cond :: struct {
@@ -113,11 +64,6 @@ _cond_wait :: proc(c: ^Cond, m: ^Mutex) {
_ = win32.SleepConditionVariableSRW(&c.impl.cond, &m.impl.srwlock, win32.INFINITE, 0)
}
_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool {
ms := win32.DWORD((max(time.duration_nanoseconds(timeout), 0) + 999999)/1000000)
return cast(bool)win32.SleepConditionVariableSRW(&c.impl.cond, &m.impl.srwlock, ms, 0)
}
_cond_signal :: proc(c: ^Cond) {
win32.WakeConditionVariable(&c.impl.cond)
}
@@ -126,34 +72,3 @@ _cond_broadcast :: proc(c: ^Cond) {
win32.WakeAllConditionVariable(&c.impl.cond)
}
_Sema :: struct {
count: i32,
}
_sema_wait :: proc(s: ^Sema) {
for {
original_count := s.impl.count
for original_count == 0 {
win32.WaitOnAddress(
&s.impl.count,
&original_count,
size_of(original_count),
win32.INFINITE,
)
original_count = s.impl.count
}
if original_count == atomic_compare_exchange_strong(&s.impl.count, original_count-1, original_count) {
return
}
}
}
_sema_post :: proc(s: ^Sema, count := 1) {
atomic_add(&s.impl.count, i32(count))
if count == 1 {
win32.WakeByAddressSingle(&s.impl.count)
} else {
win32.WakeByAddressAll(&s.impl.count)
}
}