Replace sync with sync2

This commit is contained in:
gingerBill
2022-03-30 17:42:44 +01:00
parent 72ae061769
commit 4eb4ae6305
37 changed files with 79 additions and 2047 deletions

View File

@@ -1,7 +1,7 @@
//+private
package mem_virtual
import sync "core:sync/sync2"
import "core:sync"
Platform_Memory_Block :: struct {
block: Memory_Block,

View File

@@ -1,6 +1,6 @@
package os2
import sync "core:sync/sync2"
import "core:sync"
import "core:time"
import "core:runtime"

View File

@@ -1,168 +1,79 @@
package sync
package sync2
import "core:intrinsics"
Ordering :: enum {
Relaxed, // Monotonic
Release,
Acquire,
Acquire_Release,
Sequentially_Consistent,
}
cpu_relax :: intrinsics.cpu_relax
strongest_failure_ordering_table := [Ordering]Ordering{
.Relaxed = .Relaxed,
.Release = .Relaxed,
.Acquire = .Acquire,
.Acquire_Release = .Acquire,
.Sequentially_Consistent = .Sequentially_Consistent,
}
atomic_fence :: intrinsics.atomic_fence
atomic_fence_acquire :: intrinsics.atomic_fence_acq
atomic_fence_release :: intrinsics.atomic_fence_rel
atomic_fence_acqrel :: intrinsics.atomic_fence_acqrel
strongest_failure_ordering :: #force_inline proc(order: Ordering) -> Ordering {
return strongest_failure_ordering_table[order]
}
atomic_store :: intrinsics.atomic_store
atomic_store_release :: intrinsics.atomic_store_rel
atomic_store_relaxed :: intrinsics.atomic_store_relaxed
atomic_store_unordered :: intrinsics.atomic_store_unordered
fence :: #force_inline proc($order: Ordering) {
when order == .Relaxed { #panic("there is no such thing as a relaxed fence") }
else when order == .Release { intrinsics.atomic_fence_rel() }
else when order == .Acquire { intrinsics.atomic_fence_acq() }
else when order == .Acquire_Release { intrinsics.atomic_fence_acqrel() }
else when order == .Sequentially_Consistent { intrinsics.atomic_fence() }
else { #panic("unknown order") }
}
atomic_load :: intrinsics.atomic_load
atomic_load_acquire :: intrinsics.atomic_load_acq
atomic_load_relaxed :: intrinsics.atomic_load_relaxed
atomic_load_unordered :: intrinsics.atomic_load_unordered
atomic_add :: intrinsics.atomic_add
atomic_add_acquire :: intrinsics.atomic_add_acq
atomic_add_release :: intrinsics.atomic_add_rel
atomic_add_acqrel :: intrinsics.atomic_add_acqrel
atomic_add_relaxed :: intrinsics.atomic_add_relaxed
atomic_sub :: intrinsics.atomic_sub
atomic_sub_acquire :: intrinsics.atomic_sub_acq
atomic_sub_release :: intrinsics.atomic_sub_rel
atomic_sub_acqrel :: intrinsics.atomic_sub_acqrel
atomic_sub_relaxed :: intrinsics.atomic_sub_relaxed
atomic_and :: intrinsics.atomic_and
atomic_and_acquire :: intrinsics.atomic_and_acq
atomic_and_release :: intrinsics.atomic_and_rel
atomic_and_acqrel :: intrinsics.atomic_and_acqrel
atomic_and_relaxed :: intrinsics.atomic_and_relaxed
atomic_nand :: intrinsics.atomic_nand
atomic_nand_acquire :: intrinsics.atomic_nand_acq
atomic_nand_release :: intrinsics.atomic_nand_rel
atomic_nand_acqrel :: intrinsics.atomic_nand_acqrel
atomic_nand_relaxed :: intrinsics.atomic_nand_relaxed
atomic_or :: intrinsics.atomic_or
atomic_or_acquire :: intrinsics.atomic_or_acq
atomic_or_release :: intrinsics.atomic_or_rel
atomic_or_acqrel :: intrinsics.atomic_or_acqrel
atomic_or_relaxed :: intrinsics.atomic_or_relaxed
atomic_xor :: intrinsics.atomic_xor
atomic_xor_acquire :: intrinsics.atomic_xor_acq
atomic_xor_release :: intrinsics.atomic_xor_rel
atomic_xor_acqrel :: intrinsics.atomic_xor_acqrel
atomic_xor_relaxed :: intrinsics.atomic_xor_relaxed
atomic_store :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) {
when order == .Relaxed { intrinsics.atomic_store_relaxed(dst, val) }
else when order == .Release { intrinsics.atomic_store_rel(dst, val) }
else when order == .Sequentially_Consistent { intrinsics.atomic_store(dst, val) }
else when order == .Acquire { #panic("there is not such thing as an acquire store") }
else when order == .Acquire_Release { #panic("there is not such thing as an acquire/release store") }
else { #panic("unknown order") }
}
atomic_exchange :: intrinsics.atomic_xchg
atomic_exchange_acquire :: intrinsics.atomic_xchg_acq
atomic_exchange_release :: intrinsics.atomic_xchg_rel
atomic_exchange_acqrel :: intrinsics.atomic_xchg_acqrel
atomic_exchange_relaxed :: intrinsics.atomic_xchg_relaxed
atomic_load :: #force_inline proc(dst: ^$T, $order: Ordering) -> T {
when order == .Relaxed { return intrinsics.atomic_load_relaxed(dst) }
else when order == .Acquire { return intrinsics.atomic_load_acq(dst) }
else when order == .Sequentially_Consistent { return intrinsics.atomic_load(dst) }
else when order == .Release { #panic("there is no such thing as a release load") }
else when order == .Acquire_Release { #panic("there is no such thing as an acquire/release load") }
else { #panic("unknown order") }
}
atomic_swap :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
when order == .Relaxed { return intrinsics.atomic_xchg_relaxed(dst, val) }
else when order == .Release { return intrinsics.atomic_xchg_rel(dst, val) }
else when order == .Acquire { return intrinsics.atomic_xchg_acq(dst, val) }
else when order == .Acquire_Release { return intrinsics.atomic_xchg_acqrel(dst, val) }
else when order == .Sequentially_Consistent { return intrinsics.atomic_xchg(dst, val) }
else { #panic("unknown order") }
}
atomic_compare_exchange :: #force_inline proc(dst: ^$T, old, new: T, $success, $failure: Ordering) -> (val: T, ok: bool) {
when failure == .Relaxed {
when success == .Relaxed { return intrinsics.atomic_cxchg_relaxed(dst, old, new) }
else when success == .Acquire { return intrinsics.atomic_cxchg_acq_failrelaxed(dst, old, new) }
else when success == .Acquire_Release { return intrinsics.atomic_cxchg_acqrel_failrelaxed(dst, old, new) }
else when success == .Sequentially_Consistent { return intrinsics.atomic_cxchg_failrelaxed(dst, old, new) }
else when success == .Release { return intrinsics.atomic_cxchg_rel(dst, old, new) }
else { #panic("an unknown ordering combination") }
} else when failure == .Acquire {
when success == .Release { return intrinsics.atomic_cxchg_acqrel(dst, old, new) }
else when success == .Acquire { return intrinsics.atomic_cxchg_acq(dst, old, new) }
else { #panic("an unknown ordering combination") }
} else when failure == .Sequentially_Consistent {
when success == .Sequentially_Consistent { return intrinsics.atomic_cxchg(dst, old, new) }
else { #panic("an unknown ordering combination") }
} else when failure == .Acquire_Release {
#panic("there is not such thing as an acquire/release failure ordering")
} else when failure == .Release {
when success == .Acquire { return instrinsics.atomic_cxchg_failacq(dst, old, new) }
else { #panic("an unknown ordering combination") }
} else {
return T{}, false
}
}
atomic_compare_exchange_weak :: #force_inline proc(dst: ^$T, old, new: T, $success, $failure: Ordering) -> (val: T, ok: bool) {
when failure == .Relaxed {
when success == .Relaxed { return intrinsics.atomic_cxchgweak_relaxed(dst, old, new) }
else when success == .Acquire { return intrinsics.atomic_cxchgweak_acq_failrelaxed(dst, old, new) }
else when success == .Acquire_Release { return intrinsics.atomic_cxchgweak_acqrel_failrelaxed(dst, old, new) }
else when success == .Sequentially_Consistent { return intrinsics.atomic_cxchgweak_failrelaxed(dst, old, new) }
else when success == .Release { return intrinsics.atomic_cxchgweak_rel(dst, old, new) }
else { #panic("an unknown ordering combination") }
} else when failure == .Acquire {
when success == .Release { return intrinsics.atomic_cxchgweak_acqrel(dst, old, new) }
else when success == .Acquire { return intrinsics.atomic_cxchgweak_acq(dst, old, new) }
else { #panic("an unknown ordering combination") }
} else when failure == .Sequentially_Consistent {
when success == .Sequentially_Consistent { return intrinsics.atomic_cxchgweak(dst, old, new) }
else { #panic("an unknown ordering combination") }
} else when failure == .Acquire_Release {
#panic("there is not such thing as an acquire/release failure ordering")
} else when failure == .Release {
when success == .Acquire { return intrinsics.atomic_cxchgweak_failacq(dst, old, new) }
else { #panic("an unknown ordering combination") }
} else {
return T{}, false
}
}
atomic_add :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
when order == .Relaxed { return intrinsics.atomic_add_relaxed(dst, val) }
else when order == .Release { return intrinsics.atomic_add_rel(dst, val) }
else when order == .Acquire { return intrinsics.atomic_add_acq(dst, val) }
else when order == .Acquire_Release { return intrinsics.atomic_add_acqrel(dst, val) }
else when order == .Sequentially_Consistent { return intrinsics.atomic_add(dst, val) }
else { #panic("unknown order") }
}
atomic_sub :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
when order == .Relaxed { return intrinsics.atomic_sub_relaxed(dst, val) }
else when order == .Release { return intrinsics.atomic_sub_rel(dst, val) }
else when order == .Acquire { return intrinsics.atomic_sub_acq(dst, val) }
else when order == .Acquire_Release { return intrinsics.atomic_sub_acqrel(dst, val) }
else when order == .Sequentially_Consistent { return intrinsics.atomic_sub(dst, val) }
else { #panic("unknown order") }
}
atomic_and :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
when order == .Relaxed { return intrinsics.atomic_and_relaxed(dst, val) }
else when order == .Release { return intrinsics.atomic_and_rel(dst, val) }
else when order == .Acquire { return intrinsics.atomic_and_acq(dst, val) }
else when order == .Acquire_Release { return intrinsics.atomic_and_acqrel(dst, val) }
else when order == .Sequentially_Consistent { return intrinsics.atomic_and(dst, val) }
else { #panic("unknown order") }
}
atomic_nand :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
when order == .Relaxed { return intrinsics.atomic_nand_relaxed(dst, val) }
else when order == .Release { return intrinsics.atomic_nand_rel(dst, val) }
else when order == .Acquire { return intrinsics.atomic_nand_acq(dst, val) }
else when order == .Acquire_Release { return intrinsics.atomic_nand_acqrel(dst, val) }
else when order == .Sequentially_Consistent { return intrinsics.atomic_nand(dst, val) }
else { #panic("unknown order") }
}
atomic_or :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
when order == .Relaxed { return intrinsics.atomic_or_relaxed(dst, val) }
else when order == .Release { return intrinsics.atomic_or_rel(dst, val) }
else when order == .Acquire { return intrinsics.atomic_or_acq(dst, val) }
else when order == .Acquire_Release { return intrinsics.atomic_or_acqrel(dst, val) }
else when order == .Sequentially_Consistent { return intrinsics.atomic_or(dst, val) }
else { #panic("unknown order") }
}
atomic_xor :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
when order == .Relaxed { return intrinsics.atomic_xor_relaxed(dst, val) }
else when order == .Release { return intrinsics.atomic_xor_rel(dst, val) }
else when order == .Acquire { return intrinsics.atomic_xor_acq(dst, val) }
else when order == .Acquire_Release { return intrinsics.atomic_xor_acqrel(dst, val) }
else when order == .Sequentially_Consistent { return intrinsics.atomic_xor(dst, val) }
else { #panic("unknown order") }
}
// Returns value and optional ok boolean
atomic_compare_exchange_strong :: intrinsics.atomic_cxchg
atomic_compare_exchange_strong_acquire :: intrinsics.atomic_cxchg_acq
atomic_compare_exchange_strong_release :: intrinsics.atomic_cxchg_rel
atomic_compare_exchange_strong_acqrel :: intrinsics.atomic_cxchg_acqrel
atomic_compare_exchange_strong_relaxed :: intrinsics.atomic_cxchg_relaxed
atomic_compare_exchange_strong_failrelaxed :: intrinsics.atomic_cxchg_failrelaxed
atomic_compare_exchange_strong_failacquire :: intrinsics.atomic_cxchg_failacq
atomic_compare_exchange_strong_acquire_failrelaxed :: intrinsics.atomic_cxchg_acq_failrelaxed
atomic_compare_exchange_strong_acqrel_failrelaxed :: intrinsics.atomic_cxchg_acqrel_failrelaxed
// Returns value and optional ok boolean
atomic_compare_exchange_weak :: intrinsics.atomic_cxchgweak
atomic_compare_exchange_weak_acquire :: intrinsics.atomic_cxchgweak_acq
atomic_compare_exchange_weak_release :: intrinsics.atomic_cxchgweak_rel
atomic_compare_exchange_weak_acqrel :: intrinsics.atomic_cxchgweak_acqrel
atomic_compare_exchange_weak_relaxed :: intrinsics.atomic_cxchgweak_relaxed
atomic_compare_exchange_weak_failrelaxed :: intrinsics.atomic_cxchgweak_failrelaxed
atomic_compare_exchange_weak_failacquire :: intrinsics.atomic_cxchgweak_failacq
atomic_compare_exchange_weak_acquire_failrelaxed :: intrinsics.atomic_cxchgweak_acq_failrelaxed
atomic_compare_exchange_weak_acqrel_failrelaxed :: intrinsics.atomic_cxchgweak_acqrel_failrelaxed

View File

@@ -1,80 +0,0 @@
package sync
/*
A barrier enabling multiple threads to synchronize the beginning of some computation
Example:
package example
import "core:fmt"
import "core:sync"
import "core:thread"
barrier := &sync.Barrier{};
main :: proc() {
fmt.println("Start");
THREAD_COUNT :: 4;
threads: [THREAD_COUNT]^thread.Thread;
sync.barrier_init(barrier, THREAD_COUNT);
defer sync.barrier_destroy(barrier);
for _, i in threads {
threads[i] = thread.create_and_start(proc(t: ^thread.Thread) {
// Same messages will be printed together but without any interleaving
fmt.println("Getting ready!");
sync.barrier_wait(barrier);
fmt.println("Off their marks they go!");
});
}
for t in threads {
thread.destroy(t); // join and free thread
}
fmt.println("Finished");
}
*/
Barrier :: struct {
mutex: Blocking_Mutex,
cond: Condition,
index: int,
generation_id: int,
thread_count: int,
}
barrier_init :: proc(b: ^Barrier, thread_count: int) {
blocking_mutex_init(&b.mutex)
condition_init(&b.cond, &b.mutex)
b.index = 0
b.generation_id = 0
b.thread_count = thread_count
}
barrier_destroy :: proc(b: ^Barrier) {
blocking_mutex_destroy(&b.mutex)
condition_destroy(&b.cond)
}
// Block the current thread until all threads have rendezvoused
// Barrier can be reused after all threads rendezvoused once, and can be used continuously
barrier_wait :: proc(b: ^Barrier) -> (is_leader: bool) {
blocking_mutex_lock(&b.mutex)
defer blocking_mutex_unlock(&b.mutex)
local_gen := b.generation_id
b.index += 1
if b.index < b.thread_count {
for local_gen == b.generation_id && b.index < b.thread_count {
condition_wait_for(&b.cond)
}
return false
}
b.index = 0
b.generation_id += 1
condition_broadcast(&b.cond)
return true
}

View File

@@ -1,889 +0,0 @@
package sync
import "core:mem"
import "core:time"
import "core:intrinsics"
import "core:math/rand"
_, _ :: time, rand
Channel_Direction :: enum i8 {
Both = 0,
Send = +1,
Recv = -1,
}
Channel :: struct($T: typeid, $Direction := Channel_Direction.Both) {
using _internal: ^Raw_Channel,
}
channel_init :: proc(ch: ^$C/Channel($T, $D), cap := 0, allocator := context.allocator) {
context.allocator = allocator
ch._internal = raw_channel_create(size_of(T), align_of(T), cap)
return
}
channel_make :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Both)) {
context.allocator = allocator
ch._internal = raw_channel_create(size_of(T), align_of(T), cap)
return
}
channel_make_send :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Send)) {
context.allocator = allocator
ch._internal = raw_channel_create(size_of(T), align_of(T), cap)
return
}
channel_make_recv :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Recv)) {
context.allocator = allocator
ch._internal = raw_channel_create(size_of(T), align_of(T), cap)
return
}
channel_destroy :: proc(ch: $C/Channel($T, $D)) {
raw_channel_destroy(ch._internal)
}
channel_as_send :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Send)) {
res._internal = ch._internal
return
}
channel_as_recv :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Recv)) {
res._internal = ch._internal
return
}
channel_len :: proc(ch: $C/Channel($T, $D)) -> int {
return ch._internal.len if ch._internal != nil else 0
}
channel_cap :: proc(ch: $C/Channel($T, $D)) -> int {
return ch._internal.cap if ch._internal != nil else 0
}
channel_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) where D >= .Both {
msg := msg
_ = raw_channel_send_impl(ch._internal, &msg, /*block*/true, loc)
}
channel_try_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) -> bool where D >= .Both {
msg := msg
return raw_channel_send_impl(ch._internal, &msg, /*block*/false, loc)
}
channel_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T) where D <= .Both {
c := ch._internal
if c == nil {
panic(message="cannot recv message; channel is nil", loc=loc)
}
mutex_lock(&c.mutex)
raw_channel_recv_impl(c, &msg, loc)
mutex_unlock(&c.mutex)
return
}
channel_try_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T, ok: bool) where D <= .Both {
c := ch._internal
if c != nil && mutex_try_lock(&c.mutex) {
if c.len > 0 {
raw_channel_recv_impl(c, &msg, loc)
ok = true
}
mutex_unlock(&c.mutex)
}
return
}
channel_try_recv_ptr :: proc(ch: $C/Channel($T, $D), msg: ^T, loc := #caller_location) -> (ok: bool) where D <= .Both {
res: T
res, ok = channel_try_recv(ch, loc)
if ok && msg != nil {
msg^ = res
}
return
}
channel_is_nil :: proc(ch: $C/Channel($T, $D)) -> bool {
return ch._internal == nil
}
channel_is_open :: proc(ch: $C/Channel($T, $D)) -> bool {
c := ch._internal
return c != nil && !c.closed
}
channel_eq :: proc(a, b: $C/Channel($T, $D)) -> bool {
return a._internal == b._internal
}
channel_ne :: proc(a, b: $C/Channel($T, $D)) -> bool {
return a._internal != b._internal
}
channel_can_send :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D >= .Both {
return raw_channel_can_send(ch._internal)
}
channel_can_recv :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D <= .Both {
return raw_channel_can_recv(ch._internal)
}
channel_peek :: proc(ch: $C/Channel($T, $D)) -> int {
c := ch._internal
if c == nil {
return -1
}
if intrinsics.atomic_load(&c.closed) {
return -1
}
return intrinsics.atomic_load(&c.len)
}
channel_close :: proc(ch: $C/Channel($T, $D), loc := #caller_location) {
raw_channel_close(ch._internal, loc)
}
channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D <= .Both {
c := ch._internal
if c == nil {
return
}
if !c.closed || c.len > 0 {
msg, ok = channel_recv(ch), true
}
return
}
channel_drain :: proc(ch: $C/Channel($T, $D)) where D >= .Both {
raw_channel_drain(ch._internal)
}
channel_move :: proc(dst: $C1/Channel($T, $D1) src: $C2/Channel(T, $D2)) where D1 <= .Both, D2 >= .Both {
for msg in channel_iterator(src) {
channel_send(dst, msg)
}
}
Raw_Channel_Wait_Queue :: struct {
next: ^Raw_Channel_Wait_Queue,
state: ^uintptr,
}
Raw_Channel :: struct {
closed: bool,
ready: bool, // ready to recv
data_offset: u16, // data is stored at the end of this data structure
elem_size: u32,
len, cap: int,
read, write: int,
mutex: Mutex,
cond: Condition,
allocator: mem.Allocator,
sendq: ^Raw_Channel_Wait_Queue,
recvq: ^Raw_Channel_Wait_Queue,
}
raw_channel_wait_queue_insert :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) {
val.next = head^
head^ = val
}
raw_channel_wait_queue_remove :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) {
p := head
for p^ != nil && p^ != val {
p = &p^.next
}
if p != nil {
p^ = p^.next
}
}
raw_channel_create :: proc(elem_size, elem_align: int, cap := 0) -> ^Raw_Channel {
assert(int(u32(elem_size)) == elem_size)
s := size_of(Raw_Channel)
s = mem.align_forward_int(s, elem_align)
data_offset := uintptr(s)
s += elem_size * max(cap, 1)
a := max(elem_align, align_of(Raw_Channel))
c := (^Raw_Channel)(mem.alloc(s, a))
if c == nil {
return nil
}
c.data_offset = u16(data_offset)
c.elem_size = u32(elem_size)
c.len, c.cap = 0, max(cap, 0)
c.read, c.write = 0, 0
mutex_init(&c.mutex)
condition_init(&c.cond, &c.mutex)
c.allocator = context.allocator
c.closed = false
return c
}
raw_channel_destroy :: proc(c: ^Raw_Channel) {
if c == nil {
return
}
context.allocator = c.allocator
intrinsics.atomic_store(&c.closed, true)
condition_destroy(&c.cond)
mutex_destroy(&c.mutex)
free(c)
}
raw_channel_close :: proc(c: ^Raw_Channel, loc := #caller_location) {
if c == nil {
panic(message="cannot close nil channel", loc=loc)
}
mutex_lock(&c.mutex)
defer mutex_unlock(&c.mutex)
intrinsics.atomic_store(&c.closed, true)
// Release readers and writers
raw_channel_wait_queue_broadcast(c.recvq)
raw_channel_wait_queue_broadcast(c.sendq)
condition_broadcast(&c.cond)
}
raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool {
send :: proc(c: ^Raw_Channel, src: rawptr) {
data := uintptr(c) + uintptr(c.data_offset)
dst := data + uintptr(c.write * int(c.elem_size))
mem.copy(rawptr(dst), src, int(c.elem_size))
c.len += 1
c.write = (c.write + 1) % max(c.cap, 1)
}
switch {
case c == nil:
panic(message="cannot send message; channel is nil", loc=loc)
case c.closed:
panic(message="cannot send message; channel is closed", loc=loc)
}
mutex_lock(&c.mutex)
defer mutex_unlock(&c.mutex)
if c.cap > 0 {
if !block && c.len >= c.cap {
return false
}
for c.len >= c.cap {
condition_wait_for(&c.cond)
}
} else if c.len > 0 { // TODO(bill): determine correct behaviour
if !block {
return false
}
condition_wait_for(&c.cond)
} else if c.len == 0 && !block {
return false
}
send(c, msg)
condition_signal(&c.cond)
raw_channel_wait_queue_signal(c.recvq)
return true
}
raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_location) {
recv :: proc(c: ^Raw_Channel, dst: rawptr, loc := #caller_location) {
if c.len < 1 {
panic(message="cannot recv message; channel is empty", loc=loc)
}
c.len -= 1
data := uintptr(c) + uintptr(c.data_offset)
src := data + uintptr(c.read * int(c.elem_size))
mem.copy(dst, rawptr(src), int(c.elem_size))
c.read = (c.read + 1) % max(c.cap, 1)
}
if c == nil {
panic(message="cannot recv message; channel is nil", loc=loc)
}
intrinsics.atomic_store(&c.ready, true)
for c.len < 1 {
raw_channel_wait_queue_signal(c.sendq)
condition_wait_for(&c.cond)
}
intrinsics.atomic_store(&c.ready, false)
recv(c, res, loc)
if c.cap > 0 {
if c.len == c.cap - 1 {
// NOTE(bill): Only signal on the last one
condition_signal(&c.cond)
}
} else {
condition_signal(&c.cond)
}
}
raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) {
if c == nil {
return false
}
mutex_lock(&c.mutex)
switch {
case c.closed:
ok = false
case c.cap > 0:
ok = c.ready && c.len < c.cap
case:
ok = c.ready && c.len == 0
}
mutex_unlock(&c.mutex)
return
}
raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) {
if c == nil {
return false
}
mutex_lock(&c.mutex)
ok = c.len > 0
mutex_unlock(&c.mutex)
return
}
raw_channel_drain :: proc(c: ^Raw_Channel) {
if c == nil {
return
}
mutex_lock(&c.mutex)
c.len = 0
c.read = 0
c.write = 0
mutex_unlock(&c.mutex)
}
MAX_SELECT_CHANNELS :: 64
SELECT_MAX_TIMEOUT :: max(time.Duration)
Select_Command :: enum {
Recv,
Send,
}
Select_Channel :: struct {
channel: ^Raw_Channel,
command: Select_Command,
}
select :: proc(channels: ..Select_Channel) -> (index: int) {
return select_timeout(SELECT_MAX_TIMEOUT, ..channels)
}
select_timeout :: proc(timeout: time.Duration, channels: ..Select_Channel) -> (index: int) {
switch len(channels) {
case 0:
panic("sync: select with no channels")
}
assert(len(channels) <= MAX_SELECT_CHANNELS)
backing: [MAX_SELECT_CHANNELS]int
queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue
candidates := backing[:]
cap := len(channels)
candidates = candidates[:cap]
count := u32(0)
for c, i in channels {
if c.channel == nil {
continue
}
switch c.command {
case .Recv:
if raw_channel_can_recv(c.channel) {
candidates[count] = i
count += 1
}
case .Send:
if raw_channel_can_send(c.channel) {
candidates[count] = i
count += 1
}
}
}
if count == 0 {
wait_state: uintptr = 0
for _, i in channels {
q := &queues[i]
q.state = &wait_state
}
for c, i in channels {
if c.channel == nil {
continue
}
q := &queues[i]
switch c.command {
case .Recv: raw_channel_wait_queue_insert(&c.channel.recvq, q)
case .Send: raw_channel_wait_queue_insert(&c.channel.sendq, q)
}
}
raw_channel_wait_queue_wait_on(&wait_state, timeout)
for c, i in channels {
if c.channel == nil {
continue
}
q := &queues[i]
switch c.command {
case .Recv: raw_channel_wait_queue_remove(&c.channel.recvq, q)
case .Send: raw_channel_wait_queue_remove(&c.channel.sendq, q)
}
}
for c, i in channels {
switch c.command {
case .Recv:
if raw_channel_can_recv(c.channel) {
candidates[count] = i
count += 1
}
case .Send:
if raw_channel_can_send(c.channel) {
candidates[count] = i
count += 1
}
}
}
if count == 0 && timeout == SELECT_MAX_TIMEOUT {
index = -1
return
}
assert(count != 0)
}
t := time.now()
r := rand.create(transmute(u64)t)
i := rand.uint32(&r)
index = candidates[i % count]
return
}
select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
switch len(channels) {
case 0:
panic("sync: select with no channels")
}
assert(len(channels) <= MAX_SELECT_CHANNELS)
backing: [MAX_SELECT_CHANNELS]int
queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue
candidates := backing[:]
cap := len(channels)
candidates = candidates[:cap]
count := u32(0)
for c, i in channels {
if raw_channel_can_recv(c) {
candidates[count] = i
count += 1
}
}
if count == 0 {
state: uintptr
for c, i in channels {
q := &queues[i]
q.state = &state
raw_channel_wait_queue_insert(&c.recvq, q)
}
raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT)
for c, i in channels {
q := &queues[i]
raw_channel_wait_queue_remove(&c.recvq, q)
}
for c, i in channels {
if raw_channel_can_recv(c) {
candidates[count] = i
count += 1
}
}
assert(count != 0)
}
t := time.now()
r := rand.create(transmute(u64)t)
i := rand.uint32(&r)
index = candidates[i % count]
return
}
select_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) {
switch len(channels) {
case 0:
panic("sync: select with no channels")
}
assert(len(channels) <= MAX_SELECT_CHANNELS)
queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue
candidates: [MAX_SELECT_CHANNELS]int
count := u32(0)
for c, i in channels {
if raw_channel_can_recv(c) {
candidates[count] = i
count += 1
}
}
if count == 0 {
state: uintptr
for c, i in channels {
q := &queues[i]
q.state = &state
raw_channel_wait_queue_insert(&c.recvq, q)
}
raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT)
for c, i in channels {
q := &queues[i]
raw_channel_wait_queue_remove(&c.recvq, q)
}
for c, i in channels {
if raw_channel_can_recv(c) {
candidates[count] = i
count += 1
}
}
assert(count != 0)
}
t := time.now()
r := rand.create(transmute(u64)t)
i := rand.uint32(&r)
index = candidates[i % count]
msg = channel_recv(channels[index])
return
}
select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) {
switch len(channels) {
case 0:
panic("sync: select with no channels")
}
assert(len(channels) <= MAX_SELECT_CHANNELS)
backing: [MAX_SELECT_CHANNELS]int
queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue
candidates := backing[:]
cap := len(channels)
candidates = candidates[:cap]
count := u32(0)
for c, i in channels {
if raw_channel_can_recv(c) {
candidates[count] = i
count += 1
}
}
if count == 0 {
state: uintptr
for c, i in channels {
q := &queues[i]
q.state = &state
raw_channel_wait_queue_insert(&c.recvq, q)
}
raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT)
for c, i in channels {
q := &queues[i]
raw_channel_wait_queue_remove(&c.recvq, q)
}
for c, i in channels {
if raw_channel_can_recv(c) {
candidates[count] = i
count += 1
}
}
assert(count != 0)
}
t := time.now()
r := rand.create(transmute(u64)t)
i := rand.uint32(&r)
index = candidates[i % count]
if msg != nil {
channel_send(channels[index], msg)
}
return
}
select_send :: proc(channels: ..^Raw_Channel) -> (index: int) {
switch len(channels) {
case 0:
panic("sync: select with no channels")
}
assert(len(channels) <= MAX_SELECT_CHANNELS)
candidates: [MAX_SELECT_CHANNELS]int
queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue
count := u32(0)
for c, i in channels {
if raw_channel_can_send(c) {
candidates[count] = i
count += 1
}
}
if count == 0 {
state: uintptr
for c, i in channels {
q := &queues[i]
q.state = &state
raw_channel_wait_queue_insert(&c.sendq, q)
}
raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT)
for c, i in channels {
q := &queues[i]
raw_channel_wait_queue_remove(&c.sendq, q)
}
for c, i in channels {
if raw_channel_can_send(c) {
candidates[count] = i
count += 1
}
}
assert(count != 0)
}
t := time.now()
r := rand.create(transmute(u64)t)
i := rand.uint32(&r)
index = candidates[i % count]
return
}
select_try :: proc(channels: ..Select_Channel) -> (index: int) {
switch len(channels) {
case 0:
panic("sync: select with no channels")
}
assert(len(channels) <= MAX_SELECT_CHANNELS)
backing: [MAX_SELECT_CHANNELS]int
candidates := backing[:]
cap := len(channels)
candidates = candidates[:cap]
count := u32(0)
for c, i in channels {
switch c.command {
case .Recv:
if raw_channel_can_recv(c.channel) {
candidates[count] = i
count += 1
}
case .Send:
if raw_channel_can_send(c.channel) {
candidates[count] = i
count += 1
}
}
}
if count == 0 {
index = -1
return
}
t := time.now()
r := rand.create(transmute(u64)t)
i := rand.uint32(&r)
index = candidates[i % count]
return
}
select_try_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
switch len(channels) {
case 0:
index = -1
return
case 1:
index = -1
if raw_channel_can_recv(channels[0]) {
index = 0
}
return
}
assert(len(channels) <= MAX_SELECT_CHANNELS)
candidates: [MAX_SELECT_CHANNELS]int
count := u32(0)
for c, i in channels {
if raw_channel_can_recv(c) {
candidates[count] = i
count += 1
}
}
if count == 0 {
index = -1
return
}
t := time.now()
r := rand.create(transmute(u64)t)
i := rand.uint32(&r)
index = candidates[i % count]
return
}
select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_check {
switch len(channels) {
case 0:
return -1
case 1:
if raw_channel_can_send(channels[0]) {
return 0
}
return -1
}
assert(len(channels) <= MAX_SELECT_CHANNELS)
candidates: [MAX_SELECT_CHANNELS]int
count := u32(0)
for c, i in channels {
if raw_channel_can_send(c) {
candidates[count] = i
count += 1
}
}
if count == 0 {
index = -1
return
}
t := time.now()
r := rand.create(transmute(u64)t)
i := rand.uint32(&r)
index = candidates[i % count]
return
}
select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) {
switch len(channels) {
case 0:
index = -1
return
case 1:
ok: bool
if msg, ok = channel_try_recv(channels[0]); ok {
index = 0
}
return
}
assert(len(channels) <= MAX_SELECT_CHANNELS)
candidates: [MAX_SELECT_CHANNELS]int
count := u32(0)
for c, i in channels {
if channel_can_recv(c) {
candidates[count] = i
count += 1
}
}
if count == 0 {
index = -1
return
}
t := time.now()
r := rand.create(transmute(u64)t)
i := rand.uint32(&r)
index = candidates[i % count]
msg = channel_recv(channels[index])
return
}
select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) {
index = -1
switch len(channels) {
case 0:
return
case 1:
if channel_try_send(channels[0], msg) {
index = 0
}
return
}
assert(len(channels) <= MAX_SELECT_CHANNELS)
candidates: [MAX_SELECT_CHANNELS]int
count := u32(0)
for c, i in channels {
if raw_channel_can_send(c) {
candidates[count] = i
count += 1
}
}
if count == 0 {
index = -1
return
}
t := time.now()
r := rand.create(transmute(u64)t)
i := rand.uint32(&r)
index = candidates[i % count]
channel_send(channels[index], msg)
return
}

View File

@@ -1,16 +0,0 @@
// +build linux, darwin, freebsd, openbsd
package sync
import "core:time"
raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) {
// stub
}
raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) {
// stub
}
raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) {
// stub
}

View File

@@ -1,33 +0,0 @@
package sync
import "core:intrinsics"
import win32 "core:sys/windows"
import "core:time"
raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) {
ms: win32.DWORD = win32.INFINITE
if max(time.Duration) != SELECT_MAX_TIMEOUT {
ms = win32.DWORD((max(time.duration_nanoseconds(timeout), 0) + 999999)/1000000)
}
v := intrinsics.atomic_load(state)
for v == 0 {
win32.WaitOnAddress(state, &v, size_of(state^), ms)
v = intrinsics.atomic_load(state)
}
intrinsics.atomic_store(state, 0)
}
raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) {
for x := q; x != nil; x = x.next {
intrinsics.atomic_add(x.state, 1)
win32.WakeByAddressSingle(x.state)
}
}
raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) {
for x := q; x != nil; x = x.next {
intrinsics.atomic_add(x.state, 1)
win32.WakeByAddressAll(x.state)
}
}

View File

@@ -1,123 +0,0 @@
package sync
import "core:intrinsics"
cpu_relax :: #force_inline proc "contextless" () {
intrinsics.cpu_relax()
}
Condition_Mutex_Ptr :: union{^Mutex, ^Blocking_Mutex}
Ticket_Mutex :: struct {
ticket: u64,
serving: u64,
}
ticket_mutex_init :: proc(m: ^Ticket_Mutex) {
atomic_store(&m.ticket, 0, .Relaxed)
atomic_store(&m.serving, 0, .Relaxed)
}
ticket_mutex_lock :: #force_inline proc(m: ^Ticket_Mutex) {
ticket := atomic_add(&m.ticket, 1, .Relaxed)
for ticket != atomic_load(&m.serving, .Acquire) {
intrinsics.cpu_relax()
}
}
ticket_mutex_unlock :: #force_inline proc(m: ^Ticket_Mutex) {
atomic_add(&m.serving, 1, .Relaxed)
}
Benaphore :: struct {
counter: int,
sema: Semaphore,
}
benaphore_init :: proc(b: ^Benaphore) {
intrinsics.atomic_store(&b.counter, 0)
semaphore_init(&b.sema)
}
benaphore_destroy :: proc(b: ^Benaphore) {
semaphore_destroy(&b.sema)
}
benaphore_lock :: proc(b: ^Benaphore) {
if intrinsics.atomic_add_acq(&b.counter, 1) > 1 {
semaphore_wait_for(&b.sema)
}
}
benaphore_try_lock :: proc(b: ^Benaphore) -> bool {
v, _ := intrinsics.atomic_cxchg_acq(&b.counter, 1, 0)
return v == 0
}
benaphore_unlock :: proc(b: ^Benaphore) {
if intrinsics.atomic_sub_rel(&b.counter, 1) > 0 {
semaphore_post(&b.sema)
}
}
Recursive_Benaphore :: struct {
counter: int,
owner: int,
recursion: int,
sema: Semaphore,
}
recursive_benaphore_init :: proc(b: ^Recursive_Benaphore) {
intrinsics.atomic_store(&b.counter, 0)
semaphore_init(&b.sema)
}
recursive_benaphore_destroy :: proc(b: ^Recursive_Benaphore) {
semaphore_destroy(&b.sema)
}
recursive_benaphore_lock :: proc(b: ^Recursive_Benaphore) {
tid := current_thread_id()
if intrinsics.atomic_add_acq(&b.counter, 1) > 1 {
if tid != b.owner {
semaphore_wait_for(&b.sema)
}
}
// inside the lock
b.owner = tid
b.recursion += 1
}
recursive_benaphore_try_lock :: proc(b: ^Recursive_Benaphore) -> bool {
tid := current_thread_id()
if b.owner == tid {
intrinsics.atomic_add_acq(&b.counter, 1)
} else {
v, _ := intrinsics.atomic_cxchg_acq(&b.counter, 1, 0)
if v != 0 {
return false
}
// inside the lock
b.owner = tid
}
b.recursion += 1
return true
}
recursive_benaphore_unlock :: proc(b: ^Recursive_Benaphore) {
tid := current_thread_id()
assert(tid == b.owner)
b.recursion -= 1
recursion := b.recursion
if recursion == 0 {
b.owner = 0
}
if intrinsics.atomic_sub_rel(&b.counter, 1) > 0 {
if recursion == 0 {
semaphore_post(&b.sema)
}
}
// outside the lock
}

View File

@@ -1,79 +0,0 @@
package sync2
import "core:intrinsics"
cpu_relax :: intrinsics.cpu_relax
atomic_fence :: intrinsics.atomic_fence
atomic_fence_acquire :: intrinsics.atomic_fence_acq
atomic_fence_release :: intrinsics.atomic_fence_rel
atomic_fence_acqrel :: intrinsics.atomic_fence_acqrel
atomic_store :: intrinsics.atomic_store
atomic_store_release :: intrinsics.atomic_store_rel
atomic_store_relaxed :: intrinsics.atomic_store_relaxed
atomic_store_unordered :: intrinsics.atomic_store_unordered
atomic_load :: intrinsics.atomic_load
atomic_load_acquire :: intrinsics.atomic_load_acq
atomic_load_relaxed :: intrinsics.atomic_load_relaxed
atomic_load_unordered :: intrinsics.atomic_load_unordered
atomic_add :: intrinsics.atomic_add
atomic_add_acquire :: intrinsics.atomic_add_acq
atomic_add_release :: intrinsics.atomic_add_rel
atomic_add_acqrel :: intrinsics.atomic_add_acqrel
atomic_add_relaxed :: intrinsics.atomic_add_relaxed
atomic_sub :: intrinsics.atomic_sub
atomic_sub_acquire :: intrinsics.atomic_sub_acq
atomic_sub_release :: intrinsics.atomic_sub_rel
atomic_sub_acqrel :: intrinsics.atomic_sub_acqrel
atomic_sub_relaxed :: intrinsics.atomic_sub_relaxed
atomic_and :: intrinsics.atomic_and
atomic_and_acquire :: intrinsics.atomic_and_acq
atomic_and_release :: intrinsics.atomic_and_rel
atomic_and_acqrel :: intrinsics.atomic_and_acqrel
atomic_and_relaxed :: intrinsics.atomic_and_relaxed
atomic_nand :: intrinsics.atomic_nand
atomic_nand_acquire :: intrinsics.atomic_nand_acq
atomic_nand_release :: intrinsics.atomic_nand_rel
atomic_nand_acqrel :: intrinsics.atomic_nand_acqrel
atomic_nand_relaxed :: intrinsics.atomic_nand_relaxed
atomic_or :: intrinsics.atomic_or
atomic_or_acquire :: intrinsics.atomic_or_acq
atomic_or_release :: intrinsics.atomic_or_rel
atomic_or_acqrel :: intrinsics.atomic_or_acqrel
atomic_or_relaxed :: intrinsics.atomic_or_relaxed
atomic_xor :: intrinsics.atomic_xor
atomic_xor_acquire :: intrinsics.atomic_xor_acq
atomic_xor_release :: intrinsics.atomic_xor_rel
atomic_xor_acqrel :: intrinsics.atomic_xor_acqrel
atomic_xor_relaxed :: intrinsics.atomic_xor_relaxed
atomic_exchange :: intrinsics.atomic_xchg
atomic_exchange_acquire :: intrinsics.atomic_xchg_acq
atomic_exchange_release :: intrinsics.atomic_xchg_rel
atomic_exchange_acqrel :: intrinsics.atomic_xchg_acqrel
atomic_exchange_relaxed :: intrinsics.atomic_xchg_relaxed
// Returns value and optional ok boolean
atomic_compare_exchange_strong :: intrinsics.atomic_cxchg
atomic_compare_exchange_strong_acquire :: intrinsics.atomic_cxchg_acq
atomic_compare_exchange_strong_release :: intrinsics.atomic_cxchg_rel
atomic_compare_exchange_strong_acqrel :: intrinsics.atomic_cxchg_acqrel
atomic_compare_exchange_strong_relaxed :: intrinsics.atomic_cxchg_relaxed
atomic_compare_exchange_strong_failrelaxed :: intrinsics.atomic_cxchg_failrelaxed
atomic_compare_exchange_strong_failacquire :: intrinsics.atomic_cxchg_failacq
atomic_compare_exchange_strong_acquire_failrelaxed :: intrinsics.atomic_cxchg_acq_failrelaxed
atomic_compare_exchange_strong_acqrel_failrelaxed :: intrinsics.atomic_cxchg_acqrel_failrelaxed
// Returns value and optional ok boolean
atomic_compare_exchange_weak :: intrinsics.atomic_cxchgweak
atomic_compare_exchange_weak_acquire :: intrinsics.atomic_cxchgweak_acq
atomic_compare_exchange_weak_release :: intrinsics.atomic_cxchgweak_rel
atomic_compare_exchange_weak_acqrel :: intrinsics.atomic_cxchgweak_acqrel
atomic_compare_exchange_weak_relaxed :: intrinsics.atomic_cxchgweak_relaxed
atomic_compare_exchange_weak_failrelaxed :: intrinsics.atomic_cxchgweak_failrelaxed
atomic_compare_exchange_weak_failacquire :: intrinsics.atomic_cxchgweak_failacq
atomic_compare_exchange_weak_acquire_failrelaxed :: intrinsics.atomic_cxchgweak_acq_failrelaxed
atomic_compare_exchange_weak_acqrel_failrelaxed :: intrinsics.atomic_cxchgweak_acqrel_failrelaxed

View File

@@ -1,54 +0,0 @@
package sync
import "core:sys/darwin"
import "core:c"
foreign import pthread "System.framework"
current_thread_id :: proc "contextless" () -> int {
tid: u64
// NOTE(Oskar): available from OSX 10.6 and iOS 3.2.
// For older versions there is `syscall(SYS_thread_selfid)`, but not really
// the same thing apparently.
foreign pthread { pthread_threadid_np :: proc "c" (rawptr, ^u64) -> c.int --- }
pthread_threadid_np(nil, &tid)
return int(tid)
}
// The Darwin docs say it best:
// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously.
// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens,
// but when there are none left, a thread must wait until another thread returns one.
Semaphore :: struct #align 16 {
handle: darwin.semaphore_t,
}
// TODO(tetra): Only marked with alignment because we cannot mark distinct integers with alignments.
// See core/sys/unix/pthread_linux.odin/pthread_t.
semaphore_init :: proc(s: ^Semaphore, initial_count := 0) {
ct := darwin.mach_task_self()
res := darwin.semaphore_create(ct, &s.handle, 0, c.int(initial_count))
assert(res == 0)
}
semaphore_destroy :: proc(s: ^Semaphore) {
ct := darwin.mach_task_self()
res := darwin.semaphore_destroy(ct, s.handle)
assert(res == 0)
s.handle = {}
}
semaphore_post :: proc(s: ^Semaphore, count := 1) {
// NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop.
for in 0..<count {
res := darwin.semaphore_signal(s.handle)
assert(res == 0)
}
}
semaphore_wait_for :: proc(s: ^Semaphore) {
res := darwin.semaphore_wait(s.handle)
assert(res == 0)
}

View File

@@ -1,40 +0,0 @@
package sync
import "core:sys/unix"
import "core:intrinsics"
current_thread_id :: proc "contextless" () -> int {
SYS_GETTID :: 186
return int(intrinsics.syscall(SYS_GETTID))
}
// The Darwin docs say it best:
// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously.
// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens,
// but when there are none left, a thread must wait until another thread returns one.
Semaphore :: struct #align 16 {
handle: unix.sem_t,
}
semaphore_init :: proc(s: ^Semaphore, initial_count := 0) {
assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0)
}
semaphore_destroy :: proc(s: ^Semaphore) {
assert(unix.sem_destroy(&s.handle) == 0)
s.handle = {}
}
semaphore_post :: proc(s: ^Semaphore, count := 1) {
// NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop.
for in 0..<count {
assert(unix.sem_post(&s.handle) == 0)
}
}
semaphore_wait_for :: proc(s: ^Semaphore) {
assert(unix.sem_wait(&s.handle) == 0)
}

View File

@@ -1,36 +0,0 @@
package sync
import "core:sys/unix"
current_thread_id :: proc "contextless" () -> int {
return unix.sys_gettid()
}
// The Darwin docs say it best:
// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously.
// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens,
// but when there are none left, a thread must wait until another thread returns one.
Semaphore :: struct #align 16 {
handle: unix.sem_t,
}
semaphore_init :: proc(s: ^Semaphore, initial_count := 0) {
assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0)
}
semaphore_destroy :: proc(s: ^Semaphore) {
assert(unix.sem_destroy(&s.handle) == 0)
s.handle = {}
}
semaphore_post :: proc(s: ^Semaphore, count := 1) {
// NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop.
for in 0..<count {
assert(unix.sem_post(&s.handle) == 0)
}
}
semaphore_wait_for :: proc(s: ^Semaphore) {
assert(unix.sem_wait(&s.handle) == 0)
}

View File

@@ -1,36 +0,0 @@
package sync
import "core:sys/unix"
import "core:os"
current_thread_id :: proc "contextless" () -> int {
return os.current_thread_id()
}
// The Darwin docs say it best:
// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously.
// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens,
// but when there are none left, a thread must wait until another thread returns one.
Semaphore :: struct #align 16 {
handle: unix.sem_t,
}
semaphore_init :: proc(s: ^Semaphore, initial_count := 0) {
assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0)
}
semaphore_destroy :: proc(s: ^Semaphore) {
assert(unix.sem_destroy(&s.handle) == 0)
s.handle = {}
}
semaphore_post :: proc(s: ^Semaphore, count := 1) {
// NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop.
for in 0..<count {
assert(unix.sem_post(&s.handle) == 0)
}
}
semaphore_wait_for :: proc(s: ^Semaphore) {
assert(unix.sem_wait(&s.handle) == 0)
}

View File

@@ -1,248 +0,0 @@
// +build linux, darwin, freebsd, openbsd
package sync
import "core:sys/unix"
import "core:time"
// A recursive lock that can only be held by one thread at once
Mutex :: struct {
handle: unix.pthread_mutex_t,
}
mutex_init :: proc(m: ^Mutex) {
// NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex.
attrs: unix.pthread_mutexattr_t
assert(unix.pthread_mutexattr_init(&attrs) == 0)
defer unix.pthread_mutexattr_destroy(&attrs) // ignores destruction error
unix.pthread_mutexattr_settype(&attrs, unix.PTHREAD_MUTEX_RECURSIVE)
assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0)
}
mutex_destroy :: proc(m: ^Mutex) {
assert(unix.pthread_mutex_destroy(&m.handle) == 0)
m.handle = {}
}
mutex_lock :: proc(m: ^Mutex) {
assert(unix.pthread_mutex_lock(&m.handle) == 0)
}
// Returns false if someone else holds the lock.
mutex_try_lock :: proc(m: ^Mutex) -> bool {
return unix.pthread_mutex_trylock(&m.handle) == 0
}
mutex_unlock :: proc(m: ^Mutex) {
assert(unix.pthread_mutex_unlock(&m.handle) == 0)
}
Blocking_Mutex :: struct {
handle: unix.pthread_mutex_t,
}
blocking_mutex_init :: proc(m: ^Blocking_Mutex) {
// NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex.
attrs: unix.pthread_mutexattr_t
assert(unix.pthread_mutexattr_init(&attrs) == 0)
defer unix.pthread_mutexattr_destroy(&attrs) // ignores destruction error
assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0)
}
blocking_mutex_destroy :: proc(m: ^Blocking_Mutex) {
assert(unix.pthread_mutex_destroy(&m.handle) == 0)
m.handle = {}
}
blocking_mutex_lock :: proc(m: ^Blocking_Mutex) {
assert(unix.pthread_mutex_lock(&m.handle) == 0)
}
// Returns false if someone else holds the lock.
blocking_mutex_try_lock :: proc(m: ^Blocking_Mutex) -> bool {
return unix.pthread_mutex_trylock(&m.handle) == 0
}
blocking_mutex_unlock :: proc(m: ^Blocking_Mutex) {
assert(unix.pthread_mutex_unlock(&m.handle) == 0)
}
// Blocks until signalled, and then lets past exactly
// one thread.
Condition :: struct {
handle: unix.pthread_cond_t,
mutex: Condition_Mutex_Ptr,
// NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent.
// This means that you may signal the condition before anyone is waiting to cause the
// next thread that tries to wait to just pass by uninterrupted, without sleeping.
// Without this, signalling a condition will only wake up a thread which is already waiting,
// but not one that is about to wait, which can cause your program to become out of sync in
// ways that are hard to debug or fix.
flag: bool, // atomically mutated
}
condition_init :: proc(c: ^Condition, mutex: Condition_Mutex_Ptr) -> bool {
// NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the condition.
attrs: unix.pthread_condattr_t
if unix.pthread_condattr_init(&attrs) != 0 {
return false
}
defer unix.pthread_condattr_destroy(&attrs) // ignores destruction error
c.flag = false
c.mutex = mutex
return unix.pthread_cond_init(&c.handle, &attrs) == 0
}
condition_destroy :: proc(c: ^Condition) {
assert(unix.pthread_cond_destroy(&c.handle) == 0)
c.handle = {}
}
// Awaken exactly one thread who is waiting on the condition
condition_signal :: proc(c: ^Condition) -> bool {
switch m in c.mutex {
case ^Mutex:
mutex_lock(m)
defer mutex_unlock(m)
atomic_swap(&c.flag, true, .Sequentially_Consistent)
return unix.pthread_cond_signal(&c.handle) == 0
case ^Blocking_Mutex:
blocking_mutex_lock(m)
defer blocking_mutex_unlock(m)
atomic_swap(&c.flag, true, .Sequentially_Consistent)
return unix.pthread_cond_signal(&c.handle) == 0
}
return false
}
// Awaken all threads who are waiting on the condition
condition_broadcast :: proc(c: ^Condition) -> bool {
return unix.pthread_cond_broadcast(&c.handle) == 0
}
// Wait for the condition to be signalled.
// Does not block if the condition has been signalled and no one
// has waited on it yet.
condition_wait_for :: proc(c: ^Condition) -> bool {
switch m in c.mutex {
case ^Mutex:
mutex_lock(m)
defer mutex_unlock(m)
// NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs,
// the thread that gets signalled and wakes up, discovers that the flag was taken and goes
// back to sleep.
// Though this overall behavior is the most sane, there may be a better way to do this that means that
// the first thread to wait, gets the flag first.
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true
}
for {
if unix.pthread_cond_wait(&c.handle, &m.handle) != 0 {
return false
}
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true
}
}
return false
case ^Blocking_Mutex:
blocking_mutex_lock(m)
defer blocking_mutex_unlock(m)
// NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs,
// the thread that gets signalled and wakes up, discovers that the flag was taken and goes
// back to sleep.
// Though this overall behavior is the most sane, there may be a better way to do this that means that
// the first thread to wait, gets the flag first.
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true
}
for {
if unix.pthread_cond_wait(&c.handle, &m.handle) != 0 {
return false
}
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true
}
}
return false
}
return false
}
// Wait for the condition to be signalled.
// Does not block if the condition has been signalled and no one
// has waited on it yet.
condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bool {
switch m in c.mutex {
case ^Mutex:
mutex_lock(m)
defer mutex_unlock(m)
// NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs,
// the thread that gets signalled and wakes up, discovers that the flag was taken and goes
// back to sleep.
// Though this overall behavior is the most sane, there may be a better way to do this that means that
// the first thread to wait, gets the flag first.
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true
}
ns := time.duration_nanoseconds(duration)
timeout: time.TimeSpec
timeout.tv_sec = ns / 1e9
timeout.tv_nsec = ns % 1e9
for {
if unix.pthread_cond_timedwait(&c.handle, &m.handle, &timeout) != 0 {
return false
}
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true
}
}
return false
case ^Blocking_Mutex:
blocking_mutex_lock(m)
defer blocking_mutex_unlock(m)
// NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs,
// the thread that gets signalled and wakes up, discovers that the flag was taken and goes
// back to sleep.
// Though this overall behavior is the most sane, there may be a better way to do this that means that
// the first thread to wait, gets the flag first.
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true
}
ns := time.duration_nanoseconds(duration)
timeout: time.TimeSpec
timeout.tv_sec = ns / 1e9
timeout.tv_nsec = ns % 1e9
for {
if unix.pthread_cond_timedwait(&c.handle, &m.handle, &timeout) != 0 {
return false
}
if atomic_swap(&c.flag, false, .Sequentially_Consistent) {
return true
}
}
return false
}
return false
}
thread_yield :: proc() {
unix.sched_yield()
}

View File

@@ -1,180 +0,0 @@
// +build windows
package sync
import win32 "core:sys/windows"
import "core:time"
current_thread_id :: proc "contextless" () -> int {
return int(win32.GetCurrentThreadId())
}
// When waited upon, blocks until the internal count is greater than zero, then subtracts one.
// Posting to the semaphore increases the count by one, or the provided amount.
Semaphore :: struct {
_handle: win32.HANDLE,
}
semaphore_init :: proc(s: ^Semaphore, initial_count := 0) {
s._handle = win32.CreateSemaphoreW(nil, i32(initial_count), 1<<31-1, nil)
}
semaphore_destroy :: proc(s: ^Semaphore) {
win32.CloseHandle(s._handle)
}
semaphore_post :: proc(s: ^Semaphore, count := 1) {
win32.ReleaseSemaphore(s._handle, i32(count), nil)
}
semaphore_wait_for :: proc(s: ^Semaphore) {
// NOTE(tetra, 2019-10-30): wait_for_single_object decrements the count before it returns.
result := win32.WaitForSingleObject(s._handle, win32.INFINITE)
assert(result != win32.WAIT_FAILED)
}
Mutex :: struct {
_critical_section: win32.CRITICAL_SECTION,
}
mutex_init :: proc(m: ^Mutex, spin_count := 0) {
win32.InitializeCriticalSectionAndSpinCount(&m._critical_section, u32(spin_count))
}
mutex_destroy :: proc(m: ^Mutex) {
win32.DeleteCriticalSection(&m._critical_section)
}
mutex_lock :: proc(m: ^Mutex) {
win32.EnterCriticalSection(&m._critical_section)
}
mutex_try_lock :: proc(m: ^Mutex) -> bool {
return bool(win32.TryEnterCriticalSection(&m._critical_section))
}
mutex_unlock :: proc(m: ^Mutex) {
win32.LeaveCriticalSection(&m._critical_section)
}
Blocking_Mutex :: struct {
_handle: win32.SRWLOCK,
}
blocking_mutex_init :: proc(m: ^Blocking_Mutex) {
win32.InitializeSRWLock(&m._handle)
}
blocking_mutex_destroy :: proc(m: ^Blocking_Mutex) {
//
}
blocking_mutex_lock :: proc(m: ^Blocking_Mutex) {
win32.AcquireSRWLockExclusive(&m._handle)
}
blocking_mutex_try_lock :: proc(m: ^Blocking_Mutex) -> bool {
return bool(win32.TryAcquireSRWLockExclusive(&m._handle))
}
blocking_mutex_unlock :: proc(m: ^Blocking_Mutex) {
win32.ReleaseSRWLockExclusive(&m._handle)
}
// Blocks until signalled.
// When signalled, awakens exactly one waiting thread.
Condition :: struct {
_handle: win32.CONDITION_VARIABLE,
mutex: Condition_Mutex_Ptr,
}
condition_init :: proc(c: ^Condition, mutex: Condition_Mutex_Ptr) -> bool {
assert(mutex != nil)
win32.InitializeConditionVariable(&c._handle)
c.mutex = mutex
return true
}
condition_destroy :: proc(c: ^Condition) {
//
}
condition_signal :: proc(c: ^Condition) -> bool {
if c._handle.ptr == nil {
return false
}
win32.WakeConditionVariable(&c._handle)
return true
}
condition_broadcast :: proc(c: ^Condition) -> bool {
if c._handle.ptr == nil {
return false
}
win32.WakeAllConditionVariable(&c._handle)
return true
}
condition_wait_for :: proc(c: ^Condition) -> bool {
switch m in &c.mutex {
case ^Mutex:
return cast(bool)win32.SleepConditionVariableCS(&c._handle, &m._critical_section, win32.INFINITE)
case ^Blocking_Mutex:
return cast(bool)win32.SleepConditionVariableSRW(&c._handle, &m._handle, win32.INFINITE, 0)
}
return false
}
condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bool {
ms := win32.DWORD((max(time.duration_nanoseconds(duration), 0) + 999999)/1000000)
switch m in &c.mutex {
case ^Mutex:
return cast(bool)win32.SleepConditionVariableCS(&c._handle, &m._critical_section, ms)
case ^Blocking_Mutex:
return cast(bool)win32.SleepConditionVariableSRW(&c._handle, &m._handle, ms, 0)
}
return false
}
RW_Lock :: struct {
_handle: win32.SRWLOCK,
}
rw_lock_init :: proc(l: ^RW_Lock) {
l._handle = win32.SRWLOCK_INIT
}
rw_lock_destroy :: proc(l: ^RW_Lock) {
//
}
rw_lock_read :: proc(l: ^RW_Lock) {
win32.AcquireSRWLockShared(&l._handle)
}
rw_lock_try_read :: proc(l: ^RW_Lock) -> bool {
return bool(win32.TryAcquireSRWLockShared(&l._handle))
}
rw_lock_write :: proc(l: ^RW_Lock) {
win32.AcquireSRWLockExclusive(&l._handle)
}
rw_lock_try_write :: proc(l: ^RW_Lock) -> bool {
return bool(win32.TryAcquireSRWLockExclusive(&l._handle))
}
rw_lock_read_unlock :: proc(l: ^RW_Lock) {
win32.ReleaseSRWLockShared(&l._handle)
}
rw_lock_write_unlock :: proc(l: ^RW_Lock) {
win32.ReleaseSRWLockExclusive(&l._handle)
}
thread_yield :: proc() {
win32.SwitchToThread()
}

View File

@@ -1,58 +0,0 @@
package sync
import "core:intrinsics"
Wait_Group :: struct {
counter: int,
mutex: Blocking_Mutex,
cond: Condition,
}
wait_group_init :: proc(wg: ^Wait_Group) {
wg.counter = 0
blocking_mutex_init(&wg.mutex)
condition_init(&wg.cond, &wg.mutex)
}
wait_group_destroy :: proc(wg: ^Wait_Group) {
condition_destroy(&wg.cond)
blocking_mutex_destroy(&wg.mutex)
}
wait_group_add :: proc(wg: ^Wait_Group, delta: int) {
if delta == 0 {
return
}
blocking_mutex_lock(&wg.mutex)
defer blocking_mutex_unlock(&wg.mutex)
intrinsics.atomic_add(&wg.counter, delta)
if wg.counter < 0 {
panic("sync.Wait_Group negative counter")
}
if wg.counter == 0 {
condition_broadcast(&wg.cond)
if wg.counter != 0 {
panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
}
}
}
wait_group_done :: proc(wg: ^Wait_Group) {
wait_group_add(wg, -1)
}
wait_group_wait :: proc(wg: ^Wait_Group) {
blocking_mutex_lock(&wg.mutex)
defer blocking_mutex_unlock(&wg.mutex)
if wg.counter != 0 {
condition_wait_for(&wg.cond)
if wg.counter != 0 {
panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
}
}
}

View File

@@ -26,7 +26,7 @@ INVALID_TASK_ID :: Task_Id(-1)
Pool :: struct {
allocator: mem.Allocator,
mutex: sync.Mutex,
sem_available: sync.Semaphore,
sem_available: sync.Sema,
processing_task_count: int, // atomic
is_running: bool,
@@ -40,14 +40,14 @@ pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator
pool := (^Pool)(t.data)
for pool.is_running {
sync.semaphore_wait_for(&pool.sem_available)
sync.sema_wait(&pool.sem_available)
if task, ok := pool_try_and_pop_task(pool); ok {
pool_do_work(pool, &task)
}
}
sync.semaphore_post(&pool.sem_available, 1)
sync.sema_post(&pool.sem_available, 1)
}
@@ -56,8 +56,6 @@ pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator
pool.tasks = make([dynamic]Task)
pool.threads = make([]^Thread, thread_count)
sync.mutex_init(&pool.mutex)
sync.semaphore_init(&pool.sem_available)
pool.is_running = true
for _, i in pool.threads {
@@ -76,9 +74,6 @@ pool_destroy :: proc(pool: ^Pool) {
}
delete(pool.threads, pool.allocator)
sync.mutex_destroy(&pool.mutex)
sync.semaphore_destroy(&pool.sem_available)
}
pool_start :: proc(pool: ^Pool) {
@@ -90,7 +85,7 @@ pool_start :: proc(pool: ^Pool) {
pool_join :: proc(pool: ^Pool) {
pool.is_running = false
sync.semaphore_post(&pool.sem_available, len(pool.threads))
sync.sema_post(&pool.sem_available, len(pool.threads))
yield()
@@ -109,7 +104,7 @@ pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_inde
task.user_index = user_index
append(&pool.tasks, task)
sync.semaphore_post(&pool.sem_available, 1)
sync.sema_post(&pool.sem_available, 1)
}
pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) {
@@ -140,7 +135,7 @@ pool_wait_and_process :: proc(pool: ^Pool) {
// Safety kick
if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 {
sync.mutex_lock(&pool.mutex)
sync.semaphore_post(&pool.sem_available, len(pool.tasks))
sync.sema_post(&pool.sem_available, len(pool.tasks))
sync.mutex_unlock(&pool.mutex)
}

View File

@@ -4,7 +4,7 @@ package thread
import "core:runtime"
import "core:intrinsics"
import sync "core:sync/sync2"
import "core:sync"
import "core:sys/unix"
Thread_State :: enum u8 {

View File

@@ -3,7 +3,7 @@
package thread
import "core:runtime"
import sync "core:sync/sync2"
import "core:sync"
import win32 "core:sys/windows"
Thread_Os_Specific :: struct {

View File

@@ -96,7 +96,6 @@ import sort "core:sort"
import strconv "core:strconv"
import strings "core:strings"
import sync "core:sync"
import sync2 "core:sync/sync2"
import testing "core:testing"
import scanner "core:text/scanner"
import thread "core:thread"
@@ -187,7 +186,6 @@ _ :: sort
_ :: strconv
_ :: strings
_ :: sync
_ :: sync2
_ :: testing
_ :: scanner
_ :: thread