Add sync.Barrier; Add sync.Blocking_Mutex for unix

This commit is contained in:
gingerBill
2020-06-27 11:23:37 +01:00
parent 2b18f43b65
commit 9fdebebd28
4 changed files with 212 additions and 63 deletions

81
core/sync/barrier.odin Normal file
View File

@@ -0,0 +1,81 @@
package sync
// A barrier enabling multiple threads to synchronize the beginning of some computation
/*
* Example:
*
* package example
*
* import "core:fmt"
* import "core:sync"
* import "core:thread"
*
* barrier := &sync.Barrier{};
*
* main :: proc() {
* fmt.println("Start");
*
* THREAD_COUNT :: 4;
* threads: [THREAD_COUNT]^thread.Thread;
*
* sync.barrier_init(barrier, THREAD_COUNT);
* defer sync.barrier_destroy(barrier);
*
*
* for _, i in threads {
* threads[i] = thread.create_and_start(proc(t: ^thread.Thread) {
* // Same messages will be printed together but without any interleaving
* fmt.println("Getting ready!");
* sync.barrier_wait(barrier);
* fmt.println("Off their marks they go!");
* });
* }
*
* for t in threads {
* thread.destroy(t); // join and free thread
* }
* fmt.println("Finished");
* }
*
*/
Barrier :: struct {
mutex: Blocking_Mutex,
cond: Condition,
index: int,
generation_id: int,
thread_count: int,
}
barrier_init :: proc(b: ^Barrier, thread_count: int) {
blocking_mutex_init(&b.mutex);
condition_init(&b.cond, &b.mutex);
b.index = 0;
b.generation_id = 0;
b.thread_count = thread_count;
}
barrier_destroy :: proc(b: ^Barrier) {
blocking_mutex_destroy(&b.mutex);
condition_destroy(&b.cond);
}
// Block the current thread until all threads have rendezvoused
// Barrier can be reused after all threads rendezvoused once, and can be used continuously
barrier_wait :: proc(b: ^Barrier) -> (is_leader: bool) {
blocking_mutex_lock(&b.mutex);
defer blocking_mutex_unlock(&b.mutex);
local_gen := b.generation_id;
b.index += 1;
if b.index < b.thread_count {
for local_gen == b.generation_id && b.index < b.thread_count {
condition_wait_for(&b.cond);
}
return false;
}
b.index = 0;
b.generation_id += 1;
condition_broadcast(&b.cond);
return true;
}

View File

@@ -6,6 +6,9 @@ cpu_relax :: inline proc() {
intrinsics.cpu_relax();
}
Condition_Mutex_Ptr :: union{^Mutex, ^Blocking_Mutex};
Ticket_Mutex :: struct {
ticket: u64,
serving: u64,

View File

@@ -3,33 +3,18 @@ package sync
import "core:sys/unix"
// A lock that can only be held by one thread at once.
// A recursive lock that can only be held by one thread at once
Mutex :: struct {
handle: unix.pthread_mutex_t,
}
// Blocks until signalled, and then lets past exactly
// one thread.
Condition :: struct {
handle: unix.pthread_cond_t,
mutex: ^Mutex,
// NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent.
// This means that you may signal the condition before anyone is waiting to cause the
// next thread that tries to wait to just pass by uninterrupted, without sleeping.
// Without this, signalling a condition will only wake up a thread which is already waiting,
// but not one that is about to wait, which can cause your program to become out of sync in
// ways that are hard to debug or fix.
flag: bool, // atomically mutated
}
mutex_init :: proc(m: ^Mutex) {
// NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex.
attrs: unix.pthread_mutexattr_t;
assert(unix.pthread_mutexattr_init(&attrs) == 0);
defer unix.pthread_mutexattr_destroy(&attrs); // ignores destruction error
unix.pthread_mutexattr_settype(&attrs, unix.PTHREAD_MUTEX_RECURSIVE);
assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0);
}
@@ -53,7 +38,56 @@ mutex_unlock :: proc(m: ^Mutex) {
}
condition_init :: proc(c: ^Condition, mutex: ^Mutex) -> bool {
Blocking_Mutex :: struct {
handle: unix.pthread_mutex_t,
}
blocking_mutex_init :: proc(m: ^Blocking_Mutex) {
// NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex.
attrs: unix.pthread_mutexattr_t;
assert(unix.pthread_mutexattr_init(&attrs) == 0);
defer unix.pthread_mutexattr_destroy(&attrs); // ignores destruction error
assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0);
}
blocking_mutex_destroy :: proc(m: ^Blocking_Mutex) {
assert(unix.pthread_mutex_destroy(&m.handle) == 0);
m.handle = {};
}
blocking_mutex_lock :: proc(m: ^Blocking_Mutex) {
assert(unix.pthread_mutex_lock(&m.handle) == 0);
}
// Returns false if someone else holds the lock.
blocking_mutex_try_lock :: proc(m: ^Blocking_Mutex) -> bool {
return unix.pthread_mutex_trylock(&m.handle) == 0;
}
blocking_mutex_unlock :: proc(m: ^Blocking_Mutex) {
assert(unix.pthread_mutex_unlock(&m.handle) == 0);
}
// Blocks until signalled, and then lets past exactly
// one thread.
Condition :: struct {
handle: unix.pthread_cond_t,
mutex: Condition_Mutex_Ptr,
// NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent.
// This means that you may signal the condition before anyone is waiting to cause the
// next thread that tries to wait to just pass by uninterrupted, without sleeping.
// Without this, signalling a condition will only wake up a thread which is already waiting,
// but not one that is about to wait, which can cause your program to become out of sync in
// ways that are hard to debug or fix.
flag: bool, // atomically mutated
}
condition_init :: proc(c: ^Condition, mutex: Condition_Mutex_Ptr) -> bool {
// NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the condition.
attrs: unix.pthread_condattr_t;
if unix.pthread_condattr_init(&attrs) != 0 {
@@ -73,10 +107,19 @@ condition_destroy :: proc(c: ^Condition) {
// Awaken exactly one thread who is waiting on the condition
condition_signal :: proc(c: ^Condition) -> bool {
mutex_lock(c.mutex);
defer mutex_unlock(c.mutex);
atomic_swap(&c.flag, true, .Sequentially_Consistent);
return unix.pthread_cond_signal(&c.handle) == 0;
switch m in c.mutex {
case ^Mutex:
mutex_lock(m);
defer mutex_unlock(m);
atomic_swap(&c.flag, true, .Sequentially_Consistent);
return unix.pthread_cond_signal(&c.handle) == 0;
case ^Blocking_Mutex:
blocking_mutex_lock(m);
defer blocking_mutex_unlock(m);
atomic_swap(&c.flag, true, .Sequentially_Consistent);
return unix.pthread_cond_signal(&c.handle) == 0;
}
return false;
}
// Awaken all threads who are waiting on the condition
@@ -88,24 +131,48 @@ condition_broadcast :: proc(c: ^Condition) -> bool {
// Does not block if the condition has been signalled and no one
// has waited on it yet.
condition_wait_for :: proc(c: ^Condition) -> bool {
mutex_lock(c.mutex);
defer mutex_unlock(c.mutex);
// NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs,
// the thread that gets signalled and wakes up, discovers that the flag was taken and goes
// back to sleep.
// Though this overall behavior is the most sane, there may be a better way to do this that means that
// the first thread to wait, gets the flag first.
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true;
}
for {
if unix.pthread_cond_wait(&c.handle, &c.mutex.handle) != 0 {
return false;
}
switch m in c.mutex {
case ^Mutex:
mutex_lock(m);
defer mutex_unlock(m);
// NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs,
// the thread that gets signalled and wakes up, discovers that the flag was taken and goes
// back to sleep.
// Though this overall behavior is the most sane, there may be a better way to do this that means that
// the first thread to wait, gets the flag first.
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true;
}
}
for {
if unix.pthread_cond_wait(&c.handle, &m.handle) != 0 {
return false;
}
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true;
}
}
return false;
case ^Blocking_Mutex:
blocking_mutex_lock(m);
defer blocking_mutex_unlock(m);
// NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs,
// the thread that gets signalled and wakes up, discovers that the flag was taken and goes
// back to sleep.
// Though this overall behavior is the most sane, there may be a better way to do this that means that
// the first thread to wait, gets the flag first.
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true;
}
for {
if unix.pthread_cond_wait(&c.handle, &m.handle) != 0 {
return false;
}
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true;
}
}
return false;
}
return false;
}

View File

@@ -4,24 +4,6 @@ package sync
import win32 "core:sys/windows"
import "core:time"
Mutex :: struct {
_critical_section: win32.CRITICAL_SECTION,
}
Blocking_Mutex :: struct {
_handle: win32.SRWLOCK,
}
Condition_Mutex_Ptr :: union{^Mutex, ^Blocking_Mutex};
// Blocks until signalled.
// When signalled, awakens exactly one waiting thread.
Condition :: struct {
_handle: win32.CONDITION_VARIABLE,
mutex: Condition_Mutex_Ptr,
}
// When waited upon, blocks until the internal count is greater than zero, then subtracts one.
// Posting to the semaphore increases the count by one, or the provided amount.
@@ -29,11 +11,6 @@ Semaphore :: struct {
_handle: win32.HANDLE,
}
RW_Lock :: struct {
_handle: win32.SRWLOCK,
}
semaphore_init :: proc(s: ^Semaphore, initial_count := 0) {
s._handle = win32.CreateSemaphoreW(nil, i32(initial_count), 1<<31-1, nil);
}
@@ -53,6 +30,11 @@ semaphore_wait_for :: proc(s: ^Semaphore) {
}
Mutex :: struct {
_critical_section: win32.CRITICAL_SECTION,
}
mutex_init :: proc(m: ^Mutex, spin_count := 0) {
win32.InitializeCriticalSectionAndSpinCount(&m._critical_section, u32(spin_count));
}
@@ -73,6 +55,11 @@ mutex_unlock :: proc(m: ^Mutex) {
win32.LeaveCriticalSection(&m._critical_section);
}
Blocking_Mutex :: struct {
_handle: win32.SRWLOCK,
}
blocking_mutex_init :: proc(m: ^Blocking_Mutex) {
//
}
@@ -94,6 +81,15 @@ blocking_mutex_unlock :: proc(m: ^Blocking_Mutex) {
}
// Blocks until signalled.
// When signalled, awakens exactly one waiting thread.
Condition :: struct {
_handle: win32.CONDITION_VARIABLE,
mutex: Condition_Mutex_Ptr,
}
condition_init :: proc(c: ^Condition, mutex: Condition_Mutex_Ptr) -> bool {
assert(mutex != nil);
win32.InitializeConditionVariable(&c._handle);
@@ -143,6 +139,11 @@ condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bo
RW_Lock :: struct {
_handle: win32.SRWLOCK,
}
rw_lock_init :: proc(l: ^RW_Lock) {
l._handle = win32.SRWLOCK_INIT;
}
@@ -167,6 +168,3 @@ rw_lock_read_unlock :: proc(l: ^RW_Lock) {
rw_lock_write_unlock :: proc(l: ^RW_Lock) {
win32.ReleaseSRWLockExclusive(&l._handle);
}