diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index fbd11be99..cbcfdf3bf 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -7,12 +7,6 @@ import "core:mem" import "core:sync" import "core:math/rand" -_ :: runtime -_ :: mem -_ :: sync - - - Direction :: enum { Send = -1, Both = 0, @@ -20,29 +14,28 @@ Direction :: enum { } Chan :: struct($T: typeid, $D: Direction = Direction.Both) { - #subtype impl: ^Raw_Chan, + #subtype impl: ^Raw_Chan `fmt:"-"`, } Raw_Chan :: struct { - allocator: runtime.Allocator, + // Shared + allocator: runtime.Allocator, allocation_size: int, + msg_size: u16, + closed: b16, // atomic + mutex: sync.Mutex, + r_cond: sync.Cond, + w_cond: sync.Cond, + r_waiting: int, // atomic + w_waiting: int, // atomic // Buffered queue: ^Raw_Queue, // Unbuffered - r_mutex: sync.Mutex, - w_mutex: sync.Mutex, + r_mutex: sync.Mutex, + w_mutex: sync.Mutex, unbuffered_data: rawptr, - msg_size: int, - - // Shared - mutex: sync.Mutex, - r_cond: sync.Cond, - w_cond: sync.Cond, - closed: bool, // atomic - r_waiting: int, // atomic - w_waiting: int, // atomic } @@ -52,13 +45,15 @@ create :: proc{ } @(require_results) -create_unbuffered :: proc($C: typeid/Chan($T), allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) { +create_unbuffered :: proc($C: typeid/Chan($T), allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) + where size_of(T) <= int(max(u16)) { c.impl, err = create_raw_unbuffered(size_of(T), align_of(T), allocator) return } @(require_results) -create_buffered :: proc($C: typeid/Chan($T), #any_int cap: int, allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) { +create_buffered :: proc($C: typeid/Chan($T), #any_int cap: int, allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) + where size_of(T) <= int(max(u16)) { c.impl, err = create_raw_buffered(size_of(T), align_of(T), cap, allocator) return } @@ -70,6 +65,7 @@ create_raw :: proc{ @(require_results) create_raw_unbuffered :: proc(#any_int msg_size, msg_alignment: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) { + assert(msg_size <= int(max(u16))) align := max(align_of(Raw_Chan), msg_alignment) size := mem.align_forward_int(size_of(Raw_Chan), align) @@ -81,12 +77,13 @@ create_raw_unbuffered :: proc(#any_int msg_size, msg_alignment: int, allocator: c = (^Raw_Chan)(ptr) c.allocation_size = size c.unbuffered_data = ([^]byte)(ptr)[offset:] - c.msg_size = msg_size + c.msg_size = u16(msg_size) return } @(require_results) create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) { + assert(msg_size <= int(max(u16))) if cap <= 0 { return create_raw_unbuffered(msg_size, msg_alignment, allocator) } @@ -97,7 +94,7 @@ create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap: q_offset := size size = mem.align_forward_int(q_offset + size_of(Raw_Queue), msg_alignment) offset := size - size += msg_size * (cap+1) + size += msg_size * cap size = mem.align_forward_int(size, align) ptr := mem.alloc(size, align, allocator) or_return @@ -107,20 +104,18 @@ create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap: bptr := ([^]byte)(ptr) c.queue = (^Raw_Queue)(bptr[q_offset:]) - c.msg_size = msg_size + c.msg_size = u16(msg_size) - items := ([^]byte)(bptr[offset:]) - c.unbuffered_data = items - raw_queue_init(c.queue, items[msg_size:], cap, msg_size) + raw_queue_init(c.queue, ([^]byte)(bptr[offset:]), cap, msg_size) return } -destroy :: proc(c: ^Raw_Chan) -> runtime.Allocator_Error { +destroy :: proc(c: ^Raw_Chan) -> (err: runtime.Allocator_Error) { if c != nil { allocator := c.allocator - return mem.free_with_size(c, c.allocation_size, allocator) + err = mem.free_with_size(c, c.allocation_size, allocator) } - return nil + return } @(require_results) @@ -139,6 +134,13 @@ send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where C.D return } +@(require_results) +try_send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where C.D <= .Both { + data := data + ok = try_send_raw(c, &data) + return +} + @(require_results) recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both { ok = recv_raw(c, &data) @@ -146,6 +148,13 @@ recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= } +@(require_results) +try_recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both { + ok = try_recv_raw(c, &data) + return +} + + @(require_results) send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { if c == nil { @@ -171,7 +180,7 @@ send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { return false } - mem.copy(c.unbuffered_data, msg_in, c.msg_size) + mem.copy(c.unbuffered_data, msg_in, int(c.msg_size)) sync.atomic_add(&c.w_waiting, 1) if sync.atomic_load(&c.r_waiting) > 0 { sync.signal(&c.r_cond) @@ -201,7 +210,7 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) { msg := raw_queue_pop(c.queue) if msg != nil { - mem.copy(msg_out, msg, c.msg_size) + mem.copy(msg_out, msg, int(c.msg_size)) } if sync.atomic_load(&c.w_waiting) > 0 { @@ -223,7 +232,7 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) { return } - mem.copy(msg_out, c.unbuffered_data, c.msg_size) + mem.copy(msg_out, c.unbuffered_data, int(c.msg_size)) sync.atomic_sub(&c.w_waiting, 1) sync.signal(&c.w_cond) @@ -233,11 +242,90 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) { } +@(require_results) +try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { + if c == nil { + return false + } + if c.queue != nil { // buffered + sync.guard(&c.mutex) + if c.queue.len == c.queue.cap { + return false + } + + ok = raw_queue_push(c.queue, msg_in) + if sync.atomic_load(&c.r_waiting) > 0 { + sync.signal(&c.r_cond) + } + } else if c.unbuffered_data != nil { // unbuffered + sync.guard(&c.w_mutex) + sync.guard(&c.mutex) + + if sync.atomic_load(&c.closed) { + return false + } + + mem.copy(c.unbuffered_data, msg_in, int(c.msg_size)) + sync.atomic_add(&c.w_waiting, 1) + if sync.atomic_load(&c.r_waiting) > 0 { + sync.signal(&c.r_cond) + } + sync.wait(&c.w_cond, &c.mutex) + ok = true + } + return +} + +@(require_results) +try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool { + if c == nil { + return false + } + if c.queue != nil { // buffered + sync.guard(&c.mutex) + if c.queue.len == 0 { + return false + } + + msg := raw_queue_pop(c.queue) + if msg != nil { + mem.copy(msg_out, msg, int(c.msg_size)) + } + + if sync.atomic_load(&c.w_waiting) > 0 { + sync.signal(&c.w_cond) + } + return true + } else if c.unbuffered_data != nil { // unbuffered + sync.guard(&c.r_mutex) + sync.guard(&c.mutex) + + if sync.atomic_load(&c.closed) || + sync.atomic_load(&c.w_waiting) == 0 { + return false + } + + mem.copy(msg_out, c.unbuffered_data, int(c.msg_size)) + sync.atomic_sub(&c.w_waiting, 1) + + sync.signal(&c.w_cond) + return true + } + return false +} + + + @(require_results) is_buffered :: proc "contextless" (c: ^Raw_Chan) -> bool { return c != nil && c.queue != nil } +@(require_results) +is_unbuffered :: proc "contextless" (c: ^Raw_Chan) -> bool { + return c != nil && c.unbuffered_data != nil +} + @(require_results) len :: proc "contextless" (c: ^Raw_Chan) -> int { if c != nil && c.queue != nil { @@ -276,7 +364,7 @@ is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool { return true } sync.guard(&c.mutex) - return sync.atomic_load(&c.closed) + return bool(sync.atomic_load(&c.closed)) }