From a580cdbe7b61e9b2d9d7cb7e31dcb83cce88ae1e Mon Sep 17 00:00:00 2001 From: gingerBill Date: Wed, 19 May 2021 10:50:27 +0100 Subject: [PATCH] Delete core:sync/sync2/channel* stuff (for the time being) --- core/sync/sync2/channel.odin | 886 --------------------------- core/sync/sync2/channel_unix.odin | 17 - core/sync/sync2/channel_windows.odin | 34 - 3 files changed, 937 deletions(-) delete mode 100644 core/sync/sync2/channel.odin delete mode 100644 core/sync/sync2/channel_unix.odin delete mode 100644 core/sync/sync2/channel_windows.odin diff --git a/core/sync/sync2/channel.odin b/core/sync/sync2/channel.odin deleted file mode 100644 index fc30d8280..000000000 --- a/core/sync/sync2/channel.odin +++ /dev/null @@ -1,886 +0,0 @@ -package sync2 - -// TODO(bill): The Channel implementation needs a complete rewrite for this new package sync design -// Especially how the `select` things work - -import "core:mem" -import "core:time" -import "core:math/rand" - -_, _ :: time, rand; - -Channel_Direction :: enum i8 { - Both = 0, - Send = +1, - Recv = -1, -} - -Channel :: struct(T: typeid, Direction := Channel_Direction.Both) { - using _internal: ^Raw_Channel, -} - -channel_init :: proc(ch: ^$C/Channel($T, $D), cap := 0, allocator := context.allocator) { - context.allocator = allocator; - ch._internal = raw_channel_create(size_of(T), align_of(T), cap); - return; -} - -channel_make :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Both)) { - context.allocator = allocator; - ch._internal = raw_channel_create(size_of(T), align_of(T), cap); - return; -} - -channel_make_send :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Send)) { - context.allocator = allocator; - ch._internal = raw_channel_create(size_of(T), align_of(T), cap); - return; -} -channel_make_recv :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Recv)) { - context.allocator = allocator; - ch._internal = raw_channel_create(size_of(T), align_of(T), cap); - return; -} - -channel_destroy :: proc(ch: $C/Channel($T, $D)) { - raw_channel_destroy(ch._internal); -} - -channel_as_send :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Send)) { - res._internal = ch._internal; - return; -} - -channel_as_recv :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Recv)) { - res._internal = ch._internal; - return; -} - - -channel_len :: proc(ch: $C/Channel($T, $D)) -> int { - return ch._internal.len if ch._internal != nil else 0; -} -channel_cap :: proc(ch: $C/Channel($T, $D)) -> int { - return ch._internal.cap if ch._internal != nil else 0; -} - - -channel_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) where D >= .Both { - msg := msg; - _ = raw_channel_send_impl(ch._internal, &msg, /*block*/true, loc); -} -channel_try_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) -> bool where D >= .Both { - msg := msg; - return raw_channel_send_impl(ch._internal, &msg, /*block*/false, loc); -} - -channel_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T) where D <= .Both { - c := ch._internal; - if c == nil { - panic(message="cannot recv message; channel is nil", loc=loc); - } - mutex_lock(&c.mutex); - raw_channel_recv_impl(c, &msg, loc); - mutex_unlock(&c.mutex); - return; -} -channel_try_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T, ok: bool) where D <= .Both { - c := ch._internal; - if c != nil && mutex_try_lock(&c.mutex) { - if c.len > 0 { - raw_channel_recv_impl(c, &msg, loc); - ok = true; - } - mutex_unlock(&c.mutex); - } - return; -} -channel_try_recv_ptr :: proc(ch: $C/Channel($T, $D), msg: ^T, loc := #caller_location) -> (ok: bool) where D <= .Both { - res: T; - res, ok = channel_try_recv(ch, loc); - if ok && msg != nil { - msg^ = res; - } - return; -} - - -channel_is_nil :: proc(ch: $C/Channel($T, $D)) -> bool { - return ch._internal == nil; -} -channel_is_open :: proc(ch: $C/Channel($T, $D)) -> bool { - c := ch._internal; - return c != nil && !c.closed; -} - - -channel_eq :: proc(a, b: $C/Channel($T, $D)) -> bool { - return a._internal == b._internal; -} -channel_ne :: proc(a, b: $C/Channel($T, $D)) -> bool { - return a._internal != b._internal; -} - - -channel_can_send :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D >= .Both { - return raw_channel_can_send(ch._internal); -} -channel_can_recv :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D <= .Both { - return raw_channel_can_recv(ch._internal); -} - - -channel_peek :: proc(ch: $C/Channel($T, $D)) -> int { - c := ch._internal; - if c == nil { - return -1; - } - if atomic_load(&c.closed) { - return -1; - } - return atomic_load(&c.len); -} - - -channel_close :: proc(ch: $C/Channel($T, $D), loc := #caller_location) { - raw_channel_close(ch._internal, loc); -} - - -channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D <= .Both { - c := ch._internal; - if c == nil { - return; - } - - if !c.closed || c.len > 0 { - msg, ok = channel_recv(ch), true; - } - return; -} -channel_drain :: proc(ch: $C/Channel($T, $D)) where D >= .Both { - raw_channel_drain(ch._internal); -} - - -channel_move :: proc(dst: $C1/Channel($T, $D1) src: $C2/Channel(T, $D2)) where D1 <= .Both, D2 >= .Both { - for msg in channel_iterator(src) { - channel_send(dst, msg); - } -} - - -Raw_Channel_Wait_Queue :: struct { - next: ^Raw_Channel_Wait_Queue, - state: ^uintptr, -} - - -Raw_Channel :: struct { - closed: bool, - ready: bool, // ready to recv - data_offset: u16, // data is stored at the end of this data structure - elem_size: u32, - len, cap: int, - read, write: int, - mutex: Mutex, - cond: Cond, - allocator: mem.Allocator, - - sendq: ^Raw_Channel_Wait_Queue, - recvq: ^Raw_Channel_Wait_Queue, -} - -raw_channel_wait_queue_insert :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) { - val.next = head^; - head^ = val; -} -raw_channel_wait_queue_remove :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) { - p := head; - for p^ != nil && p^ != val { - p = &p^.next; - } - if p != nil { - p^ = p^.next; - } -} - - -raw_channel_create :: proc(elem_size, elem_align: int, cap := 0) -> ^Raw_Channel { - assert(int(u32(elem_size)) == elem_size); - - s := size_of(Raw_Channel); - s = mem.align_forward_int(s, elem_align); - data_offset := uintptr(s); - s += elem_size * max(cap, 1); - - a := max(elem_align, align_of(Raw_Channel)); - - c := (^Raw_Channel)(mem.alloc(s, a)); - if c == nil { - return nil; - } - - c.data_offset = u16(data_offset); - c.elem_size = u32(elem_size); - c.len, c.cap = 0, max(cap, 0); - c.read, c.write = 0, 0; - c.allocator = context.allocator; - c.closed = false; - - return c; -} - - -raw_channel_destroy :: proc(c: ^Raw_Channel) { - if c == nil { - return; - } - context.allocator = c.allocator; - atomic_store(&c.closed, true); - free(c); -} - -raw_channel_close :: proc(c: ^Raw_Channel, loc := #caller_location) { - if c == nil { - panic(message="cannot close nil channel", loc=loc); - } - mutex_lock(&c.mutex); - defer mutex_unlock(&c.mutex); - atomic_store(&c.closed, true); - - // Release readers and writers - raw_channel_wait_queue_broadcast(c.recvq); - raw_channel_wait_queue_broadcast(c.sendq); - cond_broadcast(&c.cond); -} - - - -raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool { - send :: proc(c: ^Raw_Channel, src: rawptr) { - data := uintptr(c) + uintptr(c.data_offset); - dst := data + uintptr(c.write * int(c.elem_size)); - mem.copy(rawptr(dst), src, int(c.elem_size)); - c.len += 1; - c.write = (c.write + 1) % max(c.cap, 1); - } - - switch { - case c == nil: - panic(message="cannot send message; channel is nil", loc=loc); - case c.closed: - panic(message="cannot send message; channel is closed", loc=loc); - } - - mutex_lock(&c.mutex); - defer mutex_unlock(&c.mutex); - - if c.cap > 0 { - if !block && c.len >= c.cap { - return false; - } - - for c.len >= c.cap { - cond_wait(&c.cond, &c.mutex); - } - } else if c.len > 0 { // TODO(bill): determine correct behaviour - if !block { - return false; - } - cond_wait(&c.cond, &c.mutex); - } else if c.len == 0 && !block { - return false; - } - - send(c, msg); - cond_signal(&c.cond); - raw_channel_wait_queue_signal(c.recvq); - - return true; -} - -raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_location) { - recv :: proc(c: ^Raw_Channel, dst: rawptr, loc := #caller_location) { - if c.len < 1 { - panic(message="cannot recv message; channel is empty", loc=loc); - } - c.len -= 1; - - data := uintptr(c) + uintptr(c.data_offset); - src := data + uintptr(c.read * int(c.elem_size)); - mem.copy(dst, rawptr(src), int(c.elem_size)); - c.read = (c.read + 1) % max(c.cap, 1); - } - - if c == nil { - panic(message="cannot recv message; channel is nil", loc=loc); - } - atomic_store(&c.ready, true); - for c.len < 1 { - raw_channel_wait_queue_signal(c.sendq); - cond_wait(&c.cond, &c.mutex); - } - atomic_store(&c.ready, false); - recv(c, res, loc); - if c.cap > 0 { - if c.len == c.cap - 1 { - // NOTE(bill): Only signal on the last one - cond_signal(&c.cond); - } - } else { - cond_signal(&c.cond); - } -} - - -raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) { - if c == nil { - return false; - } - mutex_lock(&c.mutex); - switch { - case c.closed: - ok = false; - case c.cap > 0: - ok = c.ready && c.len < c.cap; - case: - ok = c.ready && c.len == 0; - } - mutex_unlock(&c.mutex); - return; -} -raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) { - if c == nil { - return false; - } - mutex_lock(&c.mutex); - ok = c.len > 0; - mutex_unlock(&c.mutex); - return; -} - - -raw_channel_drain :: proc(c: ^Raw_Channel) { - if c == nil { - return; - } - mutex_lock(&c.mutex); - c.len = 0; - c.read = 0; - c.write = 0; - mutex_unlock(&c.mutex); -} - - - -MAX_SELECT_CHANNELS :: 64; -SELECT_MAX_TIMEOUT :: max(time.Duration); - -Select_Command :: enum { - Recv, - Send, -} - -Select_Channel :: struct { - channel: ^Raw_Channel, - command: Select_Command, -} - - - -select :: proc(channels: ..Select_Channel) -> (index: int) { - return select_timeout(SELECT_MAX_TIMEOUT, ..channels); -} -select_timeout :: proc(timeout: time.Duration, channels: ..Select_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels"); - } - - assert(len(channels) <= MAX_SELECT_CHANNELS); - - backing: [MAX_SELECT_CHANNELS]int; - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; - candidates := backing[:]; - cap := len(channels); - candidates = candidates[:cap]; - - count := u32(0); - for c, i in channels { - if c.channel == nil { - continue; - } - switch c.command { - case .Recv: - if raw_channel_can_recv(c.channel) { - candidates[count] = i; - count += 1; - } - case .Send: - if raw_channel_can_send(c.channel) { - candidates[count] = i; - count += 1; - } - } - } - - if count == 0 { - wait_state: uintptr = 0; - for _, i in channels { - q := &queues[i]; - q.state = &wait_state; - } - - for c, i in channels { - if c.channel == nil { - continue; - } - q := &queues[i]; - switch c.command { - case .Recv: raw_channel_wait_queue_insert(&c.channel.recvq, q); - case .Send: raw_channel_wait_queue_insert(&c.channel.sendq, q); - } - } - raw_channel_wait_queue_wait_on(&wait_state, timeout); - for c, i in channels { - if c.channel == nil { - continue; - } - q := &queues[i]; - switch c.command { - case .Recv: raw_channel_wait_queue_remove(&c.channel.recvq, q); - case .Send: raw_channel_wait_queue_remove(&c.channel.sendq, q); - } - } - - for c, i in channels { - switch c.command { - case .Recv: - if raw_channel_can_recv(c.channel) { - candidates[count] = i; - count += 1; - } - case .Send: - if raw_channel_can_send(c.channel) { - candidates[count] = i; - count += 1; - } - } - } - if count == 0 && timeout == SELECT_MAX_TIMEOUT { - index = -1; - return; - } - - assert(count != 0); - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - return; -} - -select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels"); - } - - assert(len(channels) <= MAX_SELECT_CHANNELS); - - backing: [MAX_SELECT_CHANNELS]int; - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; - candidates := backing[:]; - cap := len(channels); - candidates = candidates[:cap]; - - count := u32(0); - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i; - count += 1; - } - } - - if count == 0 { - state: uintptr; - for c, i in channels { - q := &queues[i]; - q.state = &state; - raw_channel_wait_queue_insert(&c.recvq, q); - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); - for c, i in channels { - q := &queues[i]; - raw_channel_wait_queue_remove(&c.recvq, q); - } - - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i; - count += 1; - } - } - assert(count != 0); - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - return; -} - -select_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels"); - } - - assert(len(channels) <= MAX_SELECT_CHANNELS); - - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; - candidates: [MAX_SELECT_CHANNELS]int; - - count := u32(0); - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i; - count += 1; - } - } - - if count == 0 { - state: uintptr; - for c, i in channels { - q := &queues[i]; - q.state = &state; - raw_channel_wait_queue_insert(&c.recvq, q); - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); - for c, i in channels { - q := &queues[i]; - raw_channel_wait_queue_remove(&c.recvq, q); - } - - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i; - count += 1; - } - } - assert(count != 0); - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - msg = channel_recv(channels[index]); - - return; -} - -select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels"); - } - - assert(len(channels) <= MAX_SELECT_CHANNELS); - - backing: [MAX_SELECT_CHANNELS]int; - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; - candidates := backing[:]; - cap := len(channels); - candidates = candidates[:cap]; - - count := u32(0); - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i; - count += 1; - } - } - - if count == 0 { - state: uintptr; - for c, i in channels { - q := &queues[i]; - q.state = &state; - raw_channel_wait_queue_insert(&c.recvq, q); - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); - for c, i in channels { - q := &queues[i]; - raw_channel_wait_queue_remove(&c.recvq, q); - } - - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i; - count += 1; - } - } - assert(count != 0); - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - - if msg != nil { - channel_send(channels[index], msg); - } - - return; -} - -select_send :: proc(channels: ..^Raw_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels"); - } - - assert(len(channels) <= MAX_SELECT_CHANNELS); - candidates: [MAX_SELECT_CHANNELS]int; - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; - - count := u32(0); - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i; - count += 1; - } - } - - if count == 0 { - state: uintptr; - for c, i in channels { - q := &queues[i]; - q.state = &state; - raw_channel_wait_queue_insert(&c.sendq, q); - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); - for c, i in channels { - q := &queues[i]; - raw_channel_wait_queue_remove(&c.sendq, q); - } - - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i; - count += 1; - } - } - assert(count != 0); - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - return; -} - -select_try :: proc(channels: ..Select_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels"); - } - - assert(len(channels) <= MAX_SELECT_CHANNELS); - - backing: [MAX_SELECT_CHANNELS]int; - candidates := backing[:]; - cap := len(channels); - candidates = candidates[:cap]; - - count := u32(0); - for c, i in channels { - switch c.command { - case .Recv: - if raw_channel_can_recv(c.channel) { - candidates[count] = i; - count += 1; - } - case .Send: - if raw_channel_can_send(c.channel) { - candidates[count] = i; - count += 1; - } - } - } - - if count == 0 { - index = -1; - return; - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - return; -} - - -select_try_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { - switch len(channels) { - case 0: - index = -1; - return; - case 1: - index = -1; - if raw_channel_can_recv(channels[0]) { - index = 0; - } - return; - } - - assert(len(channels) <= MAX_SELECT_CHANNELS); - candidates: [MAX_SELECT_CHANNELS]int; - - count := u32(0); - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i; - count += 1; - } - } - - if count == 0 { - index = -1; - return; - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - return; -} - - -select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_check { - switch len(channels) { - case 0: - return -1; - case 1: - if raw_channel_can_send(channels[0]) { - return 0; - } - return -1; - } - - assert(len(channels) <= MAX_SELECT_CHANNELS); - candidates: [MAX_SELECT_CHANNELS]int; - - count := u32(0); - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i; - count += 1; - } - } - - if count == 0 { - index = -1; - return; - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - return; -} - -select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) { - switch len(channels) { - case 0: - index = -1; - return; - case 1: - ok: bool; - if msg, ok = channel_try_recv(channels[0]); ok { - index = 0; - } - return; - } - - assert(len(channels) <= MAX_SELECT_CHANNELS); - candidates: [MAX_SELECT_CHANNELS]int; - - count := u32(0); - for c, i in channels { - if channel_can_recv(c) { - candidates[count] = i; - count += 1; - } - } - - if count == 0 { - index = -1; - return; - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - msg = channel_recv(channels[index]); - return; -} - -select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) { - index = -1; - switch len(channels) { - case 0: - return; - case 1: - if channel_try_send(channels[0], msg) { - index = 0; - } - return; - } - - - assert(len(channels) <= MAX_SELECT_CHANNELS); - candidates: [MAX_SELECT_CHANNELS]int; - - count := u32(0); - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i; - count += 1; - } - } - - if count == 0 { - index = -1; - return; - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - channel_send(channels[index], msg); - return; -} - diff --git a/core/sync/sync2/channel_unix.odin b/core/sync/sync2/channel_unix.odin deleted file mode 100644 index 7429b67db..000000000 --- a/core/sync/sync2/channel_unix.odin +++ /dev/null @@ -1,17 +0,0 @@ -//+build linux, darwin, freebsd -//+private -package sync2 - -import "core:time" - -raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) { - // stub -} - -raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) { - // stub -} - -raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) { - // stub -} diff --git a/core/sync/sync2/channel_windows.odin b/core/sync/sync2/channel_windows.odin deleted file mode 100644 index e365506c8..000000000 --- a/core/sync/sync2/channel_windows.odin +++ /dev/null @@ -1,34 +0,0 @@ -//+build windows -//+private -package sync2 - -import win32 "core:sys/windows" -import "core:time" - -raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) { - ms: win32.DWORD = win32.INFINITE; - if max(time.Duration) != SELECT_MAX_TIMEOUT { - ms = win32.DWORD((max(time.duration_nanoseconds(timeout), 0) + 999999)/1000000); - } - - v := atomic_load(state); - for v == 0 { - win32.WaitOnAddress(state, &v, size_of(state^), ms); - v = atomic_load(state); - } - atomic_store(state, 0); -} - -raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) { - for x := q; x != nil; x = x.next { - atomic_add(x.state, 1); - win32.WakeByAddressSingle(x.state); - } -} - -raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) { - for x := q; x != nil; x = x.next { - atomic_add(x.state, 1); - win32.WakeByAddressAll(x.state); - } -}