diff --git a/core/sync/channel.odin b/core/sync/channel.odin index be8a96f47..059b5f747 100644 --- a/core/sync/channel.odin +++ b/core/sync/channel.odin @@ -2,290 +2,385 @@ package sync import "core:mem" import "core:time" +import "core:fmt" +import "core:intrinsics" import "core:math/rand" _, _ :: time, rand; -chan :: struct(T: typeid) { - qlen: uint, - qcap: uint, - closed: b32, - sendx: uint, - recvx: uint, - mutex: Blocking_Mutex, - allocator: mem.Allocator, - - buf: [0]T, -} - -makechan :: proc($T: typeid, cap: int, allocator := context.allocator) -> ^chan(T) { - chan_size :: size_of(chan(T)); - chan_align :: align_of(chan(T)); - - mem := uintptr(cap) * size_of(T); - c := cast(^chan(T))mem.alloc(chan_size+mem, chan_align, allocator); - c.allocator = allocator; - c.qlen = 0; - c.qcap = uint(cap); - blocking_mutex_init(&c.mutex); - return c; -} -chanbuf :: proc(c: ^$C/chan($T)) -> []T #no_bounds_check { - return c.buf[0:c.qcap]; -} - - - - -/* Channel :: struct(T: typeid) { - using internal: ^_Channel_Internal(T), + using _internal: ^Raw_Channel, } -_Channel_Internal :: struct(T: typeid) { - allocator: mem.Allocator, - - queue: [dynamic]T, - - unbuffered_msg: T, // Will be used as the backing to the queue if no `cap` is given - - mutex: Mutex, - r_cond: Condition, - w_cond: Condition, - - closed: bool, - r_waiting: int, - w_waiting: int, +channel_init :: proc(ch: ^$C/Channel($T), cap := 0, allocator := context.allocator) { + context.allocator = allocator; + ch._internal = raw_channel_create(size_of(T), align_of(T), cap); + return; } -channel_init :: proc(c: ^$C/Channel($T), cap: int = 0, allocator := context.allocator) { - c^ = cast(C)channel_make(T, cap, allocator); -} - -channel_make :: proc($T: typeid, cap: int = 0, allocator := context.allocator) -> (ch: Channel(T)) { - ch.internal = new(_Channel_Internal(T), allocator); - if ch.internal == nil { - return {}; - } - ch.allocator = allocator; - - mutex_init(&ch.mutex); - condition_init(&ch.r_cond, &ch.mutex); - condition_init(&ch.w_cond, &ch.mutex); - ch.closed = false; - ch.r_waiting = 0; - ch.w_waiting = 0; - ch.unbuffered_msg = T{}; - - if cap > 0 { - ch.queue = make([dynamic]T, 0, cap, ch.allocator); - } else { - d := mem.Raw_Dynamic_Array{ - data = &ch.unbuffered_msg, - len = 0, - cap = 1, - allocator = mem.nil_allocator(), - }; - ch.queue = transmute([dynamic]T)d; - } - return ch; +channel_make :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T)) { + context.allocator = allocator; + ch._internal = raw_channel_create(size_of(T), align_of(T), cap); + return; } channel_destroy :: proc(ch: $C/Channel($T)) { - channel_close(ch); - - if channel_is_buffered(ch) { - delete(ch.queue); - } - - mutex_destroy(&ch.mutex); - condition_destroy(&ch.r_cond); - condition_destroy(&ch.w_cond); - free(ch.internal, ch.allocator); + raw_channel_destroy(ch._internal); } -channel_close :: proc(ch: $C/Channel($T)) -> (ok: bool) { - mutex_lock(&ch.mutex); - if !ch.closed { - ch.closed = true; - condition_broadcast(&ch.r_cond); - condition_broadcast(&ch.w_cond); - ok = true; +channel_len :: proc(ch: $C/Channel($T)) -> int { + return ch._internal.len; +} +channel_cap :: proc(ch: $C/Channel($T)) -> int { + return ch._internal.cap; +} + + +channel_send :: proc(ch: $C/Channel($T), msg: T, loc := #caller_location) { + msg := msg; + _ = raw_channel_send_impl(ch._internal, &msg, false, loc); +} +channel_try_send :: proc(ch: $C/Channel($T), msg: T, loc := #caller_location) -> bool { + msg := msg; + return raw_channel_send_impl(ch._internal, &msg, true, loc); +} + +channel_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T) { + c := ch._internal; + mutex_lock(&c.mutex); + raw_channel_recv_impl(c, &msg, loc); + mutex_unlock(&c.mutex); + return; +} +channel_try_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T, ok: bool) { + c := ch._internal; + if mutex_try_lock(&c.mutex) { + if c.len > 0 { + raw_channel_recv_impl(c, &msg, loc); + ok = true; + } + mutex_unlock(&c.mutex); } - - mutex_unlock(&ch.mutex); return; } -channel_write :: proc(ch: $C/Channel($T), msg: T) -> (ok: bool) { - mutex_lock(&ch.mutex); - defer mutex_unlock(&ch.mutex); +channel_is_nil :: proc(ch: $C/Channel($T)) -> bool { + return ch._internal == nil; +} - if ch.closed { + +channel_eq :: proc(a, b: $C/Channel($T)) -> bool { + return a._internal == b._internal; +} +channel_ne :: proc(a, b: $C/Channel($T)) -> bool { + return a._internal != b._internal; +} + + +channel_can_send :: proc(ch: $C/Channel($T)) -> (ok: bool) { + return raw_channel_can_send(ch._internal); +} +channel_can_recv :: proc(ch: $C/Channel($T)) -> (ok: bool) { + return raw_channel_can_recv(ch._internal); +} + + + +channel_peek :: proc(ch: $C/Channel($T)) -> int { + c := ch._internal; + if c == nil { + return -1; + } + if intrinsics.atomic_load(&c.closed) { + return -1; + } + return intrinsics.atomic_load(&c.len); +} + + +channel_close :: proc(ch: $C/Channel($T), loc := #caller_location) { + c := ch._internal; + if c == nil { + panic(message="cannot close nil channel", loc=loc); + } + intrinsics.atomic_store(&c.closed, true); +} + + +channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, open: bool) { + c := ch._internal; + switch { + case c == nil: return; - } - - - for len(ch.queue) == cap(ch.queue) { - ch.w_waiting += 1; - condition_wait_for(&ch.w_cond); - ch.w_waiting -= 1; - } - - if len(ch.queue) < cap(ch.queue) { - append(&ch.queue, msg); - ok = true; - } - - if ch.r_waiting > 0 { - condition_signal(&ch.r_cond); - } - - return; -} - -channel_read :: proc(ch: $C/Channel($T)) -> (msg: T, ok: bool) #optional_ok { - mutex_lock(&ch.mutex); - defer mutex_unlock(&ch.mutex); - - for len(ch.queue) == 0 { - if ch.closed { - return; + case intrinsics.atomic_load(&c.closed): + if channel_can_recv(ch) { + val = channel_recv(ch); + open = true; } - - ch.r_waiting += 1; - condition_wait_for(&ch.r_cond); - ch.r_waiting -= 1; - } - - msg, ok = pop_front(&ch.queue); - - if ch.w_waiting > 0 { - condition_signal(&ch.w_cond); - } - - return; -} - -channel_size :: proc(ch: $C/Channel($T)) -> (size: int) { - if channel_is_buffered(ch) { - mutex_lock(&ch.mutex); - size = len(ch.queue); - mutex_unlock(&ch.mutex); + case: + val = channel_recv(ch); + open = true; } return; } -channel_is_closed :: proc(ch: $C/Channel($T)) -> bool { - mutex_lock(&ch.mutex); - closed := ch.closed; - mutex_unlock(&ch.mutex); - return closed; -} -channel_is_buffered :: proc(ch: $C/Channel($T)) -> bool { - q := transmute(mem.Raw_Dynamic_Array)ch.queue; - return q.cap != 0 && (q.data != &ch.unbuffered_msg); -} - -channel_can_write :: proc(ch: $C/Channel($T)) -> bool { - mutex_lock(&ch.mutex); - defer mutex_unlock(&ch.mutex); - return len(ch.queue) < cap(ch.queue); -} - -channel_can_read :: proc(ch: $C/Channel($T)) -> bool { - mutex_lock(&ch.mutex); - defer mutex_unlock(&ch.mutex); - return len(ch.queue) > 0; -} - -channel_can_read_write :: proc(ch: $C/Channel($T)) -> bool { - mutex_lock(&ch.mutex); - defer mutex_unlock(&ch.mutex); - return 0 < len(ch.queue) && len(ch.queue) < cap(ch.queue); -} - -channel_iterator :: proc(ch: $C/Channel($T)) -> (elem: T, ok: bool) { - mutex_lock(&ch.mutex); - defer mutex_unlock(&ch.mutex); - - if len(ch.queue) > 0 { - return channel_read(ch); +channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { + backing: [64]int; + candidates := backing[:]; + if len(channels) > len(backing) { + candidates = make([]int, len(channels), context.temp_allocator); } - return T{}, false; -} - - - -channel_select :: proc(readers, writers: []$C/Channel($T), write_msgs: []T) -> (read_msg: T, index: int) { - Candidate :: struct { - ch: C, - msg: T, - index: int, - read: bool, - }; - - count := 0; - candidates := make([]Candidate, len(readers) + len(writers)); - defer delete(candidates); - - for c, i in readers { - if channel_can_read(c) { - candidates[count] = { - ch = c, - index = i, - read = true, - }; - count += 1; - } - } - - for c, i in writers { - if channel_can_write(c) { - candidates[count] = { - ch = c, - index = count, - read = false, - msg = write_msgs[i], - }; + count := u32(0); + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[i] = i; count += 1; } } if count == 0 { - return T{}, -1; + index = -1; + return; } - // Randomize the input - r := rand.create(time.read_cycle_counter()); - s := candidates[rand.int_max(count, &r)]; - if s.read { - ok: bool; - if read_msg, ok = channel_read(s.ch); !ok { - index = -1; - return; - } - } else { - if !channel_write(s.ch, s.msg) { - index = -1; - return; - } - } + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); - index = s.index; + index = candidates[i % count]; return; } -channel_select_write :: proc(writers: []$C/Channel($T), write_msgs: []T) -> (read_msg: T, index: int) { - return channel_select([]C{}, writers, msg); -} -channel_select_read :: proc(readers: []$C/Channel($T)) -> (index: int) { - _, index = channel_select(readers, []C{}, nil); +channel_select_send :: proc(channels: ..^Raw_Channel) -> (index: int) { + backing: [64]int; + candidates := backing[:]; + if len(channels) > len(backing) { + candidates = make([]int, len(channels), context.temp_allocator); + } + + count := u32(0); + for c, i in channels { + if raw_channel_can_send(c) { + candidates[i] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + +channel_select_recv_msg :: proc(channels: ..$C/Channel($T)) -> (msg: T, index: int) { + backing: [64]int; + candidates := backing[:]; + if len(channels) > len(backing) { + candidates = make([]int, len(channels), context.temp_allocator); + } + + count := u32(0); + for c, i in channels { + if channel_can_recv(c) { + candidates[i] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + msg = channel_recv(channels[index]); + return; +} + +channel_select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: int) { + backing: [64]int; + candidates := backing[:]; + if len(channels) > len(backing) { + candidates = make([]int, len(channels), context.temp_allocator); + } + + count := u32(0); + for c, i in channels { + if raw_channel_can_send(c) { + candidates[i] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + channel_send(channels[index], msg); + return; +} + + + + + + +Raw_Channel :: struct { + data: rawptr, + elem_size: int, + len, cap: int, + read, write: int, + mutex: Mutex, + cond: Condition, + allocator: mem.Allocator, + closed: bool, + ready: bool, // ready to recv +} + + +raw_channel_create :: proc(elem_size, elem_align, cap: int) -> ^Raw_Channel { + s := size_of(Raw_Channel); + s = mem.align_forward_int(s, elem_align); + data_offset := uintptr(s); + s += elem_size * max(cap, 1); + + a := max(elem_align, align_of(Raw_Channel)); + + c := (^Raw_Channel)(mem.alloc(s, a)); + if c == nil { + return nil; + } + + c.data = rawptr(uintptr(c) + data_offset); + c.elem_size = elem_size; + c.len, c.cap = 0, max(cap, 0); + c.read, c.write = 0, 0; + mutex_init(&c.mutex); + condition_init(&c.cond, &c.mutex); + c.allocator = context.allocator; + c.closed = false; + + return c; +} + + +raw_channel_destroy :: proc(c: ^Raw_Channel) { + if c == nil { + return; + } + context.allocator = c.allocator; + c.closed = true; + + condition_destroy(&c.cond); + mutex_destroy(&c.mutex); + free(c); +} + + +raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, no_block: bool, loc := #caller_location) -> bool { + send :: proc(c: ^Raw_Channel, src: rawptr) { + dst := uintptr(c.data) + uintptr(c.write * c.elem_size); + mem.copy(rawptr(dst), src, c.elem_size); + c.len += 1; + c.write = (c.write + 1) % max(c.cap, 1); + } + + switch { + case c == nil: + panic(message="cannot send message; channel is nil", loc=loc); + case c.closed: + panic(message="cannot send message; channel is closed", loc=loc); + } + + mutex_lock(&c.mutex); + if c.cap > 0 { + if no_block && c.len >= c.cap { + mutex_unlock(&c.mutex); + return false; + } + + for c.len >= c.cap { + condition_wait_for(&c.cond); + } + } + + send(c, msg); + mutex_unlock(&c.mutex); + condition_signal(&c.cond); + + return true; +} + +raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_location) { + recv :: proc(c: ^Raw_Channel, dst: rawptr, loc := #caller_location) { + if c.len < 1 { + panic(message="cannot recv message; channel is empty", loc=loc); + } + c.len -= 1; + src := uintptr(c.data) + uintptr(c.read * c.elem_size); + mem.copy(dst, rawptr(src), c.elem_size); + c.read = (c.read + 1) % max(c.cap, 1); + } + + if c == nil { + panic(message="cannot recv message; channel is nil", loc=loc); + } + intrinsics.atomic_store(&c.ready, true); + for c.len < 1 { + condition_wait_for(&c.cond); + } + intrinsics.atomic_store(&c.ready, false); + recv(c, res, loc); + if c.cap > 0 && c.len == c.cap - 1 { + condition_signal(&c.cond); + } +} + + +raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) { + if c == nil { + return false; + } + mutex_lock(&c.mutex); + switch { + case c.closed: + ok = false; + case c.cap > 0: + ok = c.len < c.cap; + case: + ok = !c.ready; + } + mutex_unlock(&c.mutex); + return; +} +raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) { + if c == nil { + return false; + } + mutex_lock(&c.mutex); + ok = c.len > 0; + mutex_unlock(&c.mutex); return; } -*/