diff --git a/core/sync/channel.odin b/core/sync/channel.odin index 5397bdc2e..7ac046836 100644 --- a/core/sync/channel.odin +++ b/core/sync/channel.odin @@ -1,6 +1,7 @@ package sync import "core:mem" +import "core:fmt" import "core:time" import "core:intrinsics" import "core:math/rand" @@ -64,6 +65,15 @@ channel_try_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T } return; } +channel_try_recv_ptr :: proc(ch: $C/Channel($T), msg: ^T, loc := #caller_location) -> (ok: bool) { + res: T; + res, ok = channel_try_recv(ch, loc); + if ok && msg != nil { + msg^ = res; + } + return; +} + channel_is_nil :: proc(ch: $C/Channel($T)) -> bool { return ch._internal == nil; @@ -90,7 +100,6 @@ channel_can_recv :: proc(ch: $C/Channel($T)) -> (ok: bool) { } - channel_peek :: proc(ch: $C/Channel($T)) -> int { c := ch._internal; if c == nil { @@ -104,11 +113,7 @@ channel_peek :: proc(ch: $C/Channel($T)) -> int { channel_close :: proc(ch: $C/Channel($T), loc := #caller_location) { - c := ch._internal; - if c == nil { - panic(message="cannot close nil channel", loc=loc); - } - intrinsics.atomic_store(&c.closed, true); + raw_channel_close(ch._internal, loc); } @@ -129,152 +134,51 @@ channel_drain :: proc(ch: $C/Channel($T)) { channel_move :: proc(dst, src: $C/Channel($T)) { - // for channel_len(src) > 0 { - // msg := channel_recv(src); - // channel_send(dst, msg); - // } for msg in channel_iterator(src) { channel_send(dst, msg); } } - -channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { - backing: [64]int; - candidates := backing[:]; - cap := len(channels); - if cap > len(backing) { - candidates = make([]int, cap, context.temp_allocator); - } else { - candidates = candidates[:cap]; - } - - count := u32(0); - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[i] = i; - count += 1; - } - } - - if count == 0 { - index = -1; - return; - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - return; +Raw_Channel_Wait_Queue :: struct { + next: ^Raw_Channel_Wait_Queue, + state: ^uintptr, } -channel_select_send :: proc(channels: ..^Raw_Channel) -> (index: int) { - backing: [64]int; - candidates := backing[:]; - if len(channels) > len(backing) { - candidates = make([]int, len(channels), context.temp_allocator); - } - - count := u32(0); - for c, i in channels { - if raw_channel_can_send(c) { - candidates[i] = i; - count += 1; - } - } - - if count == 0 { - index = -1; - return; - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - return; -} - -channel_select_recv_msg :: proc(channels: ..$C/Channel($T)) -> (msg: T, index: int) { - backing: [64]int; - candidates := backing[:]; - if len(channels) > len(backing) { - candidates = make([]int, len(channels), context.temp_allocator); - } - - count := u32(0); - for c, i in channels { - if channel_can_recv(c) { - candidates[i] = i; - count += 1; - } - } - - if count == 0 { - index = -1; - return; - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - msg = channel_recv(channels[index]); - return; -} - -channel_select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: int) { - backing: [64]int; - candidates := backing[:]; - if len(channels) > len(backing) { - candidates = make([]int, len(channels), context.temp_allocator); - } - - count := u32(0); - for c, i in channels { - if raw_channel_can_send(c) { - candidates[i] = i; - count += 1; - } - } - - if count == 0 { - index = -1; - return; - } - - t := time.now(); - r := rand.create(transmute(u64)t); - i := rand.uint32(&r); - - index = candidates[i % count]; - channel_send(channels[index], msg); - return; -} - - - - Raw_Channel :: struct { - data: rawptr, - elem_size: int, + closed: bool, + ready: bool, // ready to recv + data_offset: u16, // data is stored at the end of this data structure + elem_size: u32, len, cap: int, read, write: int, mutex: Mutex, cond: Condition, allocator: mem.Allocator, - closed: bool, - ready: bool, // ready to recv + + sendq: ^Raw_Channel_Wait_Queue, + recvq: ^Raw_Channel_Wait_Queue, +} + +raw_channel_wait_queue_insert :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) { + val.next = head^; + head^ = val; +} +raw_channel_wait_queue_remove :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) { + p := head; + for p^ != nil && p^ != val { + p = &p^.next; + } + if p != nil { + p^ = p^.next; + } } -raw_channel_create :: proc(elem_size, elem_align, cap: int) -> ^Raw_Channel { +raw_channel_create :: proc(elem_size, elem_align: int, cap := 0) -> ^Raw_Channel { + assert(int(u32(elem_size)) == elem_size); + s := size_of(Raw_Channel); s = mem.align_forward_int(s, elem_align); data_offset := uintptr(s); @@ -287,8 +191,8 @@ raw_channel_create :: proc(elem_size, elem_align, cap: int) -> ^Raw_Channel { return nil; } - c.data = rawptr(uintptr(c) + data_offset); - c.elem_size = elem_size; + c.data_offset = u16(data_offset); + c.elem_size = u32(elem_size); c.len, c.cap = 0, max(cap, 0); c.read, c.write = 0, 0; mutex_init(&c.mutex); @@ -305,18 +209,34 @@ raw_channel_destroy :: proc(c: ^Raw_Channel) { return; } context.allocator = c.allocator; - c.closed = true; + intrinsics.atomic_store(&c.closed, true); condition_destroy(&c.cond); mutex_destroy(&c.mutex); free(c); } +raw_channel_close :: proc(c: ^Raw_Channel, loc := #caller_location) { + if c == nil { + panic(message="cannot close nil channel", loc=loc); + } + mutex_lock(&c.mutex); + defer mutex_unlock(&c.mutex); + intrinsics.atomic_store(&c.closed, true); + + // Release readers and writers + raw_channel_wait_queue_broadcast(c.recvq); + raw_channel_wait_queue_broadcast(c.sendq); + condition_broadcast(&c.cond); +} + + raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool { send :: proc(c: ^Raw_Channel, src: rawptr) { - dst := uintptr(c.data) + uintptr(c.write * c.elem_size); - mem.copy(rawptr(dst), src, c.elem_size); + data := uintptr(c) + uintptr(c.data_offset); + dst := data + uintptr(c.write * int(c.elem_size)); + mem.copy(rawptr(dst), src, int(c.elem_size)); c.len += 1; c.write = (c.write + 1) % max(c.cap, 1); } @@ -329,9 +249,10 @@ raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := } mutex_lock(&c.mutex); + defer mutex_unlock(&c.mutex); + if c.cap > 0 { if !block && c.len >= c.cap { - mutex_unlock(&c.mutex); return false; } @@ -339,12 +260,16 @@ raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := condition_wait_for(&c.cond); } } else if c.len > 0 { + if !block { + return false; + } condition_wait_for(&c.cond); } send(c, msg); - mutex_unlock(&c.mutex); condition_signal(&c.cond); + raw_channel_wait_queue_signal(c.recvq); + return true; } @@ -355,8 +280,10 @@ raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_locat panic(message="cannot recv message; channel is empty", loc=loc); } c.len -= 1; - src := uintptr(c.data) + uintptr(c.read * c.elem_size); - mem.copy(dst, rawptr(src), c.elem_size); + + data := uintptr(c) + uintptr(c.data_offset); + src := data + uintptr(c.read * int(c.elem_size)); + mem.copy(dst, rawptr(src), int(c.elem_size)); c.read = (c.read + 1) % max(c.cap, 1); } @@ -365,6 +292,7 @@ raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_locat } intrinsics.atomic_store(&c.ready, true); for c.len < 1 { + raw_channel_wait_queue_signal(c.sendq); condition_wait_for(&c.cond); } intrinsics.atomic_store(&c.ready, false); @@ -389,9 +317,9 @@ raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) { case c.closed: ok = false; case c.cap > 0: - ok = c.len < c.cap; + ok = c.ready && c.len < c.cap; case: - ok = !c.ready; + ok = c.ready && c.len == 0; } mutex_unlock(&c.mutex); return; @@ -417,3 +345,522 @@ raw_channel_drain :: proc(c: ^Raw_Channel) { c.write = 0; mutex_unlock(&c.mutex); } + + + +MAX_SELECT_CHANNELS :: 64; +SELECT_MAX_TIMEOUT :: max(time.Duration); + +Select_Command :: enum { + Recv, + Send, +} + +Select_Channel :: struct { + channel: ^Raw_Channel, + command: Select_Command, +} + + + +select :: proc(channels: ..Select_Channel) -> (index: int) { + return select_timeout(SELECT_MAX_TIMEOUT, ..channels); +} +select_timeout :: proc(timeout: time.Duration, channels: ..Select_Channel) -> (index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + + backing: [MAX_SELECT_CHANNELS]int; + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + candidates := backing[:]; + cap := len(channels); + candidates = candidates[:cap]; + + nil_channel := Raw_Channel{closed = true}; + + count := u32(0); + for c, i in channels { + if c.channel == nil { + continue; + } + switch c.command { + case .Recv: + if raw_channel_can_recv(c.channel) { + candidates[count] = i; + count += 1; + } + case .Send: + if raw_channel_can_send(c.channel) { + candidates[count] = i; + count += 1; + } + } + } + + if count == 0 { + wait_state: uintptr = 0; + for c, i in channels { + q := &queues[i]; + q.state = &wait_state; + } + + for c, i in channels { + if c.channel == nil { + continue; + } + q := &queues[i]; + switch c.command { + case .Recv: raw_channel_wait_queue_insert(&c.channel.recvq, q); + case .Send: raw_channel_wait_queue_insert(&c.channel.sendq, q); + } + } + raw_channel_wait_queue_wait_on(&wait_state, timeout); + for c, i in channels { + if c.channel == nil { + continue; + } + q := &queues[i]; + switch c.command { + case .Recv: raw_channel_wait_queue_remove(&c.channel.recvq, q); + case .Send: raw_channel_wait_queue_remove(&c.channel.sendq, q); + } + } + + for c, i in channels { + switch c.command { + case .Recv: + if raw_channel_can_recv(c.channel) { + candidates[count] = i; + count += 1; + } + case .Send: + if raw_channel_can_send(c.channel) { + candidates[count] = i; + count += 1; + } + } + } + if count == 0 && timeout == SELECT_MAX_TIMEOUT { + index = -1; + return; + } + + assert(count != 0); + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + +select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + + backing: [MAX_SELECT_CHANNELS]int; + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + candidates := backing[:]; + cap := len(channels); + candidates = candidates[:cap]; + + count := u32(0); + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + state: uintptr; + for c, i in channels { + q := &queues[i]; + q.state = &state; + raw_channel_wait_queue_insert(&c.recvq, q); + } + raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); + for c, i in channels { + q := &queues[i]; + raw_channel_wait_queue_remove(&c.recvq, q); + } + + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + assert(count != 0); + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + +select_recv_msg :: proc(channels: ..$C/Channel($T)) -> (msg: T, index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + candidates: [MAX_SELECT_CHANNELS]int; + + count := u32(0); + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + state: uintptr; + for c, i in channels { + q := &queues[i]; + q.state = &state; + raw_channel_wait_queue_insert(&c.recvq, q); + } + raw_channel_wait_queue_wait_on(&state); + for c, i in channels { + q := &queues[i]; + raw_channel_wait_queue_remove(&c.recvq, q); + } + + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + assert(count != 0); + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + msg = channel_recv(channels[index]); + + return; +} + +select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + + backing: [MAX_SELECT_CHANNELS]int; + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + candidates := backing[:]; + cap := len(channels); + candidates = candidates[:cap]; + + count := u32(0); + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + state: uintptr; + for c, i in channels { + q := &queues[i]; + q.state = &state; + raw_channel_wait_queue_insert(&c.recvq, q); + } + raw_channel_wait_queue_wait_on(&state); + for c, i in channels { + q := &queues[i]; + raw_channel_wait_queue_remove(&c.recvq, q); + } + + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + assert(count != 0); + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + + if msg != nil { + channel_send(channels[index], msg); + } + + return; +} + +select_send :: proc(channels: ..^Raw_Channel) -> (index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + candidates: [MAX_SELECT_CHANNELS]int; + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + + count := u32(0); + for c, i in channels { + if raw_channel_can_send(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + state: uintptr; + for c, i in channels { + q := &queues[i]; + q.state = &state; + raw_channel_wait_queue_insert(&c.sendq, q); + } + raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); + for c, i in channels { + q := &queues[i]; + raw_channel_wait_queue_remove(&c.sendq, q); + } + + for c, i in channels { + if raw_channel_can_send(c) { + candidates[count] = i; + count += 1; + } + } + assert(count != 0); + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + +select_try :: proc(channels: ..Select_Channel) -> (index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + + backing: [MAX_SELECT_CHANNELS]int; + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + candidates := backing[:]; + cap := len(channels); + candidates = candidates[:cap]; + + count := u32(0); + for c, i in channels { + switch c.command { + case .Recv: + if raw_channel_can_recv(c.channel) { + candidates[count] = i; + count += 1; + } + case .Send: + if raw_channel_can_send(c.channel) { + candidates[count] = i; + count += 1; + } + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + + +select_try_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { + switch len(channels) { + case 0: + index = -1; + return; + case 1: + index = -1; + if raw_channel_can_recv(channels[0]) { + index = 0; + } + return; + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + candidates: [MAX_SELECT_CHANNELS]int; + + count := u32(0); + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + + +select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_check { + switch len(channels) { + case 0: + return -1; + case 1: + if raw_channel_can_send(channels[0]) { + return 0; + } + return -1; + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + candidates: [MAX_SELECT_CHANNELS]int; + + count := u32(0); + for c, i in channels { + if raw_channel_can_send(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + +select_try_recv_msg :: proc(channels: ..$C/Channel($T)) -> (msg: T, index: int) { + switch len(channels) { + case 0: + index = 0; + return; + case 1: + if c := channels[0]; channel_can_recv(c) { + index = 0; + msg = channel_recv(c); + return; + } + return; + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + candidates: [MAX_SELECT_CHANNELS]int; + + count := u32(0); + for c, i in channels { + if channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + msg = channel_recv(channels[index]); + return; +} + +select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: int) { + switch len(channels) { + case 0: + index = 0; + return; + case 1: + if c := channels[0]; channel_can_send(c) { + index = 0; + channel_send(c, msg); + return; + } + return; + } + + + assert(len(channels) <= MAX_SELECT_CHANNELS); + candidates: [MAX_SELECT_CHANNELS]int; + + count := u32(0); + for c, i in channels { + if raw_channel_can_send(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + channel_send(channels[index], msg); + return; +} + diff --git a/core/sync/sync_unix.odin b/core/sync/sync_unix.odin index 0a9226e0f..a74ad61ea 100644 --- a/core/sync/sync_unix.odin +++ b/core/sync/sync_unix.odin @@ -241,3 +241,8 @@ condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bo return false; } + + +thread_yield :: proc() { + unix.sched_yield(); +} diff --git a/core/sync/sync_windows.odin b/core/sync/sync_windows.odin index 0fc8f3298..095f4a3d3 100644 --- a/core/sync/sync_windows.odin +++ b/core/sync/sync_windows.odin @@ -61,7 +61,7 @@ Blocking_Mutex :: struct { blocking_mutex_init :: proc(m: ^Blocking_Mutex) { - m^ = Blocking_Mutex{}; + win32.InitializeSRWLock(&m._handle); } blocking_mutex_destroy :: proc(m: ^Blocking_Mutex) { @@ -127,7 +127,7 @@ condition_wait_for :: proc(c: ^Condition) -> bool { return false; } condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bool { - ms := win32.DWORD((time.duration_nanoseconds(duration) + 999999)/1000000); + ms := win32.DWORD((max(time.duration_nanoseconds(duration), 0) + 999999)/1000000); switch m in &c.mutex { case ^Mutex: return cast(bool)win32.SleepConditionVariableCS(&c._handle, &m._critical_section, ms); @@ -168,3 +168,9 @@ rw_lock_read_unlock :: proc(l: ^RW_Lock) { rw_lock_write_unlock :: proc(l: ^RW_Lock) { win32.ReleaseSRWLockExclusive(&l._handle); } + + +thread_yield :: proc() { + win32.SwitchToThread(); +} + diff --git a/core/sys/windows/types.odin b/core/sys/windows/types.odin index e9f2b495a..005820e04 100644 --- a/core/sys/windows/types.odin +++ b/core/sys/windows/types.odin @@ -46,6 +46,7 @@ LPOVERLAPPED :: ^OVERLAPPED; LPPROCESS_INFORMATION :: ^PROCESS_INFORMATION; LPSECURITY_ATTRIBUTES :: ^SECURITY_ATTRIBUTES; LPSTARTUPINFO :: ^STARTUPINFO; +PVOID :: rawptr; LPVOID :: rawptr; LPWCH :: ^WCHAR; LPWIN32_FIND_DATAW :: ^WIN32_FIND_DATAW; diff --git a/core/thread/thread.odin b/core/thread/thread.odin index f2e2e0e57..85e0cc3fe 100644 --- a/core/thread/thread.odin +++ b/core/thread/thread.odin @@ -31,7 +31,7 @@ run :: proc(fn: proc(), init_context: Maybe(runtime.Context) = nil, priority := } -run_with_data :: proc(fn: proc(data: rawptr), data: rawptr, init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) { +run_with_data :: proc(data: rawptr, fn: proc(data: rawptr), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) { thread_proc :: proc(t: ^Thread) { fn := cast(proc(rawptr))t.data; data := rawptr(uintptr(t.user_index));