diff --git a/core/sync/channel.odin b/core/sync/channel.odin index ed9c526ad..5397bdc2e 100644 --- a/core/sync/channel.odin +++ b/core/sync/channel.odin @@ -1,6 +1,5 @@ package sync -// import "core:fmt" import "core:mem" import "core:time" import "core:intrinsics" @@ -113,17 +112,32 @@ channel_close :: proc(ch: $C/Channel($T), loc := #caller_location) { } -channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, ok: bool) { +channel_iterator :: proc(ch: $C/Channel($T)) -> (msg: T, ok: bool) { c := ch._internal; if c == nil { return; } if !c.closed || c.len > 0 { - val, ok = channel_recv(ch), true; + msg, ok = channel_recv(ch), true; } return; } +channel_drain :: proc(ch: $C/Channel($T)) { + raw_channel_drain(ch._internal); +} + + +channel_move :: proc(dst, src: $C/Channel($T)) { + // for channel_len(src) > 0 { + // msg := channel_recv(src); + // channel_send(dst, msg); + // } + for msg in channel_iterator(src) { + channel_send(dst, msg); + } +} + channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { @@ -247,8 +261,6 @@ channel_select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: i - - Raw_Channel :: struct { data: rawptr, elem_size: int, @@ -393,3 +405,15 @@ raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) { mutex_unlock(&c.mutex); return; } + + +raw_channel_drain :: proc(c: ^Raw_Channel) { + if c == nil { + return; + } + mutex_lock(&c.mutex); + c.len = 0; + c.read = 0; + c.write = 0; + mutex_unlock(&c.mutex); +} diff --git a/core/sync/wait_group.odin b/core/sync/wait_group.odin new file mode 100644 index 000000000..477bce9c2 --- /dev/null +++ b/core/sync/wait_group.odin @@ -0,0 +1,58 @@ +package sync + +import "intrinsics" + +Wait_Group :: struct { + counter: int, + mutex: Blocking_Mutex, + cond: Condition, +} + +wait_group_init :: proc(wg: ^Wait_Group) { + wg.counter = 0; + blocking_mutex_init(&wg.mutex); + condition_init(&wg.cond, &wg.mutex); +} + + +wait_group_destroy :: proc(wg: ^Wait_Group) { + condition_destroy(&wg.cond); + blocking_mutex_destroy(&wg.mutex); +} + +wait_group_add :: proc(wg: ^Wait_Group, delta: int) { + if delta == 0 { + return; + } + + blocking_mutex_lock(&wg.mutex); + defer blocking_mutex_unlock(&wg.mutex); + + intrinsics.atomic_add(&wg.counter, delta); + if wg.counter < 0 { + panic("sync.Wait_Group negative counter"); + } + if wg.counter == 0 { + condition_broadcast(&wg.cond); + if wg.counter != 0 { + panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait"); + } + } +} + +wait_group_done :: proc(wg: ^Wait_Group) { + wait_group_add(wg, -1); +} + +wait_group_wait :: proc(wg: ^Wait_Group) { + blocking_mutex_lock(&wg.mutex); + defer blocking_mutex_unlock(&wg.mutex); + + if wg.counter != 0 { + condition_wait_for(&wg.cond); + if wg.counter != 0 { + panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait"); + } + } +} +