diff --git a/core/sync/sync2/thread.odin b/core/sync/sync2/thread.odin new file mode 100644 index 000000000..a20f1bd7f --- /dev/null +++ b/core/sync/sync2/thread.odin @@ -0,0 +1,193 @@ +package sync2 + +import "core:runtime" +import "core:sync" +import "core:mem" +import "intrinsics" + +_ :: intrinsics; + +Thread_Proc :: #type proc(^Thread); + +MAX_USER_ARGUMENTS :: 8; + +Thread :: struct { + using specific: Thread_Os_Specific, + procedure: Thread_Proc, + data: rawptr, + user_index: int, + user_args: [MAX_USER_ARGUMENTS]rawptr, + + init_context: Maybe(runtime.Context), + + + creation_allocator: mem.Allocator, +} + +#assert(size_of(Thread{}.user_index) == size_of(uintptr)); + +Thread_Priority :: enum { + Normal, + Low, + High, +} + +thread_create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^Thread { + return _thread_create(procedure, priority); +} +thread_destroy :: proc(thread: ^Thread) { + _thread_destroy(thread); +} + +thread_start :: proc(thread: ^Thread) { + _thread_start(thread); +} + +thread_is_done :: proc(thread: ^Thread) -> bool { + return _thread_is_done(thread); +} + + +thread_join :: proc(thread: ^Thread) { + _thread_join(thread); +} + + +thread_join_mulitple :: proc(threads: ..^Thread) { + _thread_join_multiple(..threads); +} + +thread_terminate :: proc(thread: ^Thread, exit_code: int) { + _thread_terminate(thread, exit_code); +} + +thread_yield :: proc() { + _thread_yield(); +} + + + +thread_run :: proc(fn: proc(), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) { + thread_proc :: proc(t: ^Thread) { + fn := cast(proc())t.data; + fn(); + thread_destroy(t); + } + t := thread_create(thread_proc, priority); + t.data = rawptr(fn); + t.init_context = init_context; + thread_start(t); +} + +thread_run_with_data :: proc(data: rawptr, fn: proc(data: rawptr), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) { + thread_proc :: proc(t: ^Thread) { + fn := cast(proc(rawptr))t.data; + assert(t.user_index >= 1); + data := t.user_args[0]; + fn(data); + thread_destroy(t); + } + t := thread_create(thread_proc, priority); + t.data = rawptr(fn); + t.user_index = 1; + t.user_args = data; + t.init_context = init_context; + thread_start(t); +} + +thread_run_with_poly_data :: proc(data: $T, fn: proc(data: T), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) + where size_of(T) <= size_of(rawptr) { + thread_proc :: proc(t: ^Thread) { + fn := cast(proc(T))t.data; + assert(t.user_index >= 1); + data := (^T)(&t.user_args[0])^; + fn(data); + thread_destroy(t); + } + t := thread_create(thread_proc, priority); + t.data = rawptr(fn); + t.user_index = 1; + data := data; + mem.copy(&t.user_args[0], &data, size_of(data)); + t.init_context = init_context; + thread_start(t); +} + +thread_run_with_poly_data2 :: proc(arg1: $T1, arg2: $T2, fn: proc(T1, T2), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) + where size_of(T1) <= size_of(rawptr), + size_of(T2) <= size_of(rawptr) { + thread_proc :: proc(t: ^Thread) { + fn := cast(proc(T1, T2))t.data; + assert(t.user_index >= 2); + arg1 := (^T1)(&t.user_args[0])^; + arg2 := (^T2)(&t.user_args[1])^; + fn(arg1, arg2); + thread_destroy(t); + } + t := thread_create(thread_proc, priority); + t.data = rawptr(fn); + t.user_index = 2; + arg1, arg2 := arg1, arg2; + mem.copy(&t.user_args[0], &arg1, size_of(arg1)); + mem.copy(&t.user_args[1], &arg2, size_of(arg2)); + t.init_context = init_context; + thread_start(t); +} + +thread_run_with_poly_data3 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, fn: proc(arg1: T1, arg2: T2, arg3: T3), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) + where size_of(T1) <= size_of(rawptr), + size_of(T2) <= size_of(rawptr), + size_of(T3) <= size_of(rawptr) { + thread_proc :: proc(t: ^Thread) { + fn := cast(proc(T1, T2, T3))t.data; + assert(t.user_index >= 3); + arg1 := (^T1)(&t.user_args[0])^; + arg2 := (^T2)(&t.user_args[1])^; + arg3 := (^T3)(&t.user_args[2])^; + fn(arg1, arg2, arg3); + thread_destroy(t); + } + t := thread_create(thread_proc, priority); + t.data = rawptr(fn); + t.user_index = 3; + arg1, arg2, arg3 := arg1, arg2, arg3; + mem.copy(&t.user_args[0], &arg1, size_of(arg1)); + mem.copy(&t.user_args[1], &arg2, size_of(arg2)); + mem.copy(&t.user_args[2], &arg3, size_of(arg3)); + t.init_context = init_context; + thread_start(t); +} +thread_run_with_poly_data4 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, arg4: $T4, fn: proc(arg1: T1, arg2: T2, arg3: T3, arg4: T4), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) + where size_of(T1) <= size_of(rawptr), + size_of(T2) <= size_of(rawptr), + size_of(T3) <= size_of(rawptr) { + thread_proc :: proc(t: ^Thread) { + fn := cast(proc(T1, T2, T3, T4))t.data; + assert(t.user_index >= 4); + arg1 := (^T1)(&t.user_args[0])^; + arg2 := (^T2)(&t.user_args[1])^; + arg3 := (^T3)(&t.user_args[2])^; + arg4 := (^T4)(&t.user_args[3])^; + fn(arg1, arg2, arg3, arg4); + thread_destroy(t); + } + t := thread_create(thread_proc, priority); + t.data = rawptr(fn); + t.user_index = 4; + arg1, arg2, arg3, arg4 := arg1, arg2, arg3, arg4; + mem.copy(&t.user_args[0], &arg1, size_of(arg1)); + mem.copy(&t.user_args[1], &arg2, size_of(arg2)); + mem.copy(&t.user_args[2], &arg3, size_of(arg3)); + mem.copy(&t.user_args[3], &arg4, size_of(arg4)); + t.init_context = init_context; + thread_start(t); +} + + + +thread_create_and_start :: proc(fn: Thread_Proc, init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) -> ^Thread { + t := thread_create(fn, priority); + t.init_context = init_context; + thread_start(t); + return t; +} diff --git a/core/sync/sync2/thread_unix.odin b/core/sync/sync2/thread_unix.odin new file mode 100644 index 000000000..d56734ed9 --- /dev/null +++ b/core/sync/sync2/thread_unix.odin @@ -0,0 +1,175 @@ +// +build linux, darwin, freebsd +// +private +package sync2 + +import "core:runtime" +import "core:intrinsics" +import "core:sys/unix" + +// NOTE(tetra): Aligned here because of core/unix/pthread_linux.odin/pthread_t. +// Also see core/sys/darwin/mach_darwin.odin/semaphore_t. +Thread_Os_Specific :: struct #align 16 { + unix_thread: unix.pthread_t, // NOTE: very large on Darwin, small on Linux. + + // NOTE: pthread has a proc to query this, but it is marked + // as non-portable ("np") so we do this instead. + done: bool, + + // since libpthread doesn't seem to have a way to create a thread + // in a suspended state, we have it wait on this gate, which we + // signal to start it. + // destroyed after thread is started. + start_gate: Cond, + start_mutex: Mutex, + + // if true, the thread has been started and the start_gate has been destroyed. + started: bool, + + // NOTE: with pthreads, it is undefined behavior for multiple threads + // to call join on the same thread at the same time. + // this value is atomically updated to detect this. + // See the comment in `join`. + already_joined: bool, +} +// +// Creates a thread which will run the given procedure. +// It then waits for `start` to be called. +// +_thread_create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^Thread { + __linux_thread_entry_proc :: proc "c" (t: rawptr) -> rawptr { + context = runtime.default_context(); + + t := (^Thread)(t); + cond_wait(&t.start_gate, &t.start_mutex); + t.start_gate = {}; + t.start_mutex = {}; + + c := context; + if ic, ok := t.init_context.?; ok { + c = ic; + } + context = c; + + t.procedure(t); + + if t.init_context == nil { + if context.temp_allocator.data == &runtime.global_default_temp_allocator_data { + runtime.default_temp_allocator_destroy(auto_cast context.temp_allocator.data); + } + } + + atomic_store(&t.done, true, .Sequentially_Consistent); + return nil; + } + + attrs: unix.pthread_attr_t; + if unix.pthread_attr_init(&attrs) != 0 { + return nil; // NOTE(tetra, 2019-11-01): POSIX OOM. + } + defer unix.pthread_attr_destroy(&attrs); + + // NOTE(tetra, 2019-11-01): These only fail if their argument is invalid. + assert(unix.pthread_attr_setdetachstate(&attrs, unix.PTHREAD_CREATE_JOINABLE) == 0); + assert(unix.pthread_attr_setinheritsched(&attrs, unix.PTHREAD_EXPLICIT_SCHED) == 0); + + thread := new(Thread); + if thread == nil { + return nil; + } + thread.creation_allocator = context.allocator; + + // Set thread priority. + policy: i32; + res := unix.pthread_attr_getschedpolicy(&attrs, &policy); + assert(res == 0); + params: unix.sched_param; + res = unix.pthread_attr_getschedparam(&attrs, ¶ms); + assert(res == 0); + low := unix.sched_get_priority_min(policy); + high := unix.sched_get_priority_max(policy); + switch priority { + case .Normal: // Okay + case .Low: params.sched_priority = low + 1; + case .High: params.sched_priority = high; + } + res = unix.pthread_attr_setschedparam(&attrs, ¶ms); + assert(res == 0); + + if unix.pthread_create(&thread.unix_thread, &attrs, __linux_thread_entry_proc, thread) != 0 { + free(thread, thread.creation_allocator); + return nil; + } + thread.procedure = procedure; + + return thread; +} + +_thread_start :: proc(t: ^Thread) { + if intrinsics.atomic_xchg(&t.started, true) { + return; + } + cond_signal(&t.start_gate); +} + +_thread_is_done :: proc(t: ^Thread) -> bool { + return atomic_load(&t.done, .Sequentially_Consistent); +} + +_thread_join :: proc(t: ^Thread) { + if unix.pthread_equal(unix.pthread_self(), t.unix_thread) { + return; + } + // if unix.pthread_self().x == t.unix_thread.x do return; + + // NOTE(tetra): It's apparently UB for multiple threads to join the same thread + // at the same time. + // If someone else already did, spin until the thread dies. + // See note on `already_joined` field. + // TODO(tetra): I'm not sure if we should do this, or panic, since I'm not + // sure it makes sense to need to join from multiple threads? + if intrinsics.atomic_xchg(&t.already_joined, true) { + for { + if intrinsics.atomic_load(&t.done) { + return; + } + intrinsics.cpu_relax(); + } + } + + // NOTE(tetra): If we're already dead, don't bother calling to pthread_join as that + // will just return 3 (ESRCH). + // We do this instead because I don't know if there is a danger + // that you may join a different thread from the one you called join on, + // if the thread handle is reused. + if intrinsics.atomic_load(&t.done) { + return; + } + + ret_val: rawptr; + _ = unix.pthread_join(t.unix_thread, &ret_val); + if !intrinsics.atomic_load(&t.done) { + panic("thread not done after join"); + } +} + +_thread_join_multiple :: proc(threads: ..^Thread) { + for t in threads { + _thread_join(t); + } +} + + +_thread_destroy :: proc(t: ^Thread) { + _thread_join(t); + t.unix_thread = {}; + free(t, t.creation_allocator); +} + + +_thread_terminate :: proc(t: ^Thread, exit_code: int) { + // TODO(bill) +} + +_thread_yield :: proc() { + unix.sched_yield(); +} diff --git a/core/sync/sync2/thread_windows.odin b/core/sync/sync2/thread_windows.odin new file mode 100644 index 000000000..6aa2fddd2 --- /dev/null +++ b/core/sync/sync2/thread_windows.odin @@ -0,0 +1,123 @@ +//+build windows +//+private +package sync2 + +import "core:runtime" +import "core:sync" +import win32 "core:sys/windows" + +Thread_Os_Specific :: struct { + win32_thread: win32.HANDLE, + win32_thread_id: win32.DWORD, + done: bool, // see note in `is_done` +} + +_thread_priority_map := [Thread_Priority]i32{ + .Normal = 0, + .Low = -2, + .High = +2, +}; + +_thread_create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^Thread { + win32_thread_id: win32.DWORD; + + __windows_thread_entry_proc :: proc "stdcall" (t_: rawptr) -> win32.DWORD { + t := (^Thread)(t_); + context = runtime.default_context(); + c := context; + if ic, ok := t.init_context.?; ok { + c = ic; + } + context = c; + + t.procedure(t); + + if t.init_context == nil { + if context.temp_allocator.data == &runtime.global_default_temp_allocator_data { + runtime.default_temp_allocator_destroy(auto_cast context.temp_allocator.data); + } + } + + sync.atomic_store(&t.done, true, .Sequentially_Consistent); + return 0; + } + + + thread := new(Thread); + if thread == nil { + return nil; + } + thread.creation_allocator = context.allocator; + + win32_thread := win32.CreateThread(nil, 0, __windows_thread_entry_proc, thread, win32.CREATE_SUSPENDED, &win32_thread_id); + if win32_thread == nil { + free(thread, thread.creation_allocator); + return nil; + } + thread.procedure = procedure; + thread.win32_thread = win32_thread; + thread.win32_thread_id = win32_thread_id; + thread.init_context = context; + + ok := win32.SetThreadPriority(win32_thread, _thread_priority_map[priority]); + assert(ok == true); + + return thread; +} + +_thread_start :: proc(thread: ^Thread) { + win32.ResumeThread(thread.win32_thread); +} + +_thread_is_done :: proc(using thread: ^Thread) -> bool { + // NOTE(tetra, 2019-10-31): Apparently using wait_for_single_object and + // checking if it didn't time out immediately, is not good enough, + // so we do it this way instead. + return sync.atomic_load(&done, .Sequentially_Consistent); +} + +_thread_join :: proc(using thread: ^Thread) { + if win32_thread != win32.INVALID_HANDLE { + win32.WaitForSingleObject(win32_thread, win32.INFINITE); + win32.CloseHandle(win32_thread); + win32_thread = win32.INVALID_HANDLE; + } +} + +_thread_join_multiple :: proc(threads: ..^Thread) { + MAXIMUM_WAIT_OBJECTS :: 64; + + handles: [MAXIMUM_WAIT_OBJECTS]win32.HANDLE; + + for k := 0; k < len(threads); k += MAXIMUM_WAIT_OBJECTS { + count := min(len(threads) - k, MAXIMUM_WAIT_OBJECTS); + j := 0; + for i in 0..