mirror of
https://github.com/odin-lang/Odin.git
synced 2025-12-30 18:02:02 +00:00
Add Thread stuff to new sync package
This commit is contained in:
193
core/sync/sync2/thread.odin
Normal file
193
core/sync/sync2/thread.odin
Normal file
@@ -0,0 +1,193 @@
|
||||
package sync2
|
||||
|
||||
import "core:runtime"
|
||||
import "core:sync"
|
||||
import "core:mem"
|
||||
import "intrinsics"
|
||||
|
||||
_ :: intrinsics;
|
||||
|
||||
Thread_Proc :: #type proc(^Thread);
|
||||
|
||||
MAX_USER_ARGUMENTS :: 8;
|
||||
|
||||
Thread :: struct {
|
||||
using specific: Thread_Os_Specific,
|
||||
procedure: Thread_Proc,
|
||||
data: rawptr,
|
||||
user_index: int,
|
||||
user_args: [MAX_USER_ARGUMENTS]rawptr,
|
||||
|
||||
init_context: Maybe(runtime.Context),
|
||||
|
||||
|
||||
creation_allocator: mem.Allocator,
|
||||
}
|
||||
|
||||
#assert(size_of(Thread{}.user_index) == size_of(uintptr));
|
||||
|
||||
Thread_Priority :: enum {
|
||||
Normal,
|
||||
Low,
|
||||
High,
|
||||
}
|
||||
|
||||
thread_create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^Thread {
|
||||
return _thread_create(procedure, priority);
|
||||
}
|
||||
thread_destroy :: proc(thread: ^Thread) {
|
||||
_thread_destroy(thread);
|
||||
}
|
||||
|
||||
thread_start :: proc(thread: ^Thread) {
|
||||
_thread_start(thread);
|
||||
}
|
||||
|
||||
thread_is_done :: proc(thread: ^Thread) -> bool {
|
||||
return _thread_is_done(thread);
|
||||
}
|
||||
|
||||
|
||||
thread_join :: proc(thread: ^Thread) {
|
||||
_thread_join(thread);
|
||||
}
|
||||
|
||||
|
||||
thread_join_mulitple :: proc(threads: ..^Thread) {
|
||||
_thread_join_multiple(..threads);
|
||||
}
|
||||
|
||||
thread_terminate :: proc(thread: ^Thread, exit_code: int) {
|
||||
_thread_terminate(thread, exit_code);
|
||||
}
|
||||
|
||||
thread_yield :: proc() {
|
||||
_thread_yield();
|
||||
}
|
||||
|
||||
|
||||
|
||||
thread_run :: proc(fn: proc(), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) {
|
||||
thread_proc :: proc(t: ^Thread) {
|
||||
fn := cast(proc())t.data;
|
||||
fn();
|
||||
thread_destroy(t);
|
||||
}
|
||||
t := thread_create(thread_proc, priority);
|
||||
t.data = rawptr(fn);
|
||||
t.init_context = init_context;
|
||||
thread_start(t);
|
||||
}
|
||||
|
||||
thread_run_with_data :: proc(data: rawptr, fn: proc(data: rawptr), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) {
|
||||
thread_proc :: proc(t: ^Thread) {
|
||||
fn := cast(proc(rawptr))t.data;
|
||||
assert(t.user_index >= 1);
|
||||
data := t.user_args[0];
|
||||
fn(data);
|
||||
thread_destroy(t);
|
||||
}
|
||||
t := thread_create(thread_proc, priority);
|
||||
t.data = rawptr(fn);
|
||||
t.user_index = 1;
|
||||
t.user_args = data;
|
||||
t.init_context = init_context;
|
||||
thread_start(t);
|
||||
}
|
||||
|
||||
thread_run_with_poly_data :: proc(data: $T, fn: proc(data: T), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal)
|
||||
where size_of(T) <= size_of(rawptr) {
|
||||
thread_proc :: proc(t: ^Thread) {
|
||||
fn := cast(proc(T))t.data;
|
||||
assert(t.user_index >= 1);
|
||||
data := (^T)(&t.user_args[0])^;
|
||||
fn(data);
|
||||
thread_destroy(t);
|
||||
}
|
||||
t := thread_create(thread_proc, priority);
|
||||
t.data = rawptr(fn);
|
||||
t.user_index = 1;
|
||||
data := data;
|
||||
mem.copy(&t.user_args[0], &data, size_of(data));
|
||||
t.init_context = init_context;
|
||||
thread_start(t);
|
||||
}
|
||||
|
||||
thread_run_with_poly_data2 :: proc(arg1: $T1, arg2: $T2, fn: proc(T1, T2), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal)
|
||||
where size_of(T1) <= size_of(rawptr),
|
||||
size_of(T2) <= size_of(rawptr) {
|
||||
thread_proc :: proc(t: ^Thread) {
|
||||
fn := cast(proc(T1, T2))t.data;
|
||||
assert(t.user_index >= 2);
|
||||
arg1 := (^T1)(&t.user_args[0])^;
|
||||
arg2 := (^T2)(&t.user_args[1])^;
|
||||
fn(arg1, arg2);
|
||||
thread_destroy(t);
|
||||
}
|
||||
t := thread_create(thread_proc, priority);
|
||||
t.data = rawptr(fn);
|
||||
t.user_index = 2;
|
||||
arg1, arg2 := arg1, arg2;
|
||||
mem.copy(&t.user_args[0], &arg1, size_of(arg1));
|
||||
mem.copy(&t.user_args[1], &arg2, size_of(arg2));
|
||||
t.init_context = init_context;
|
||||
thread_start(t);
|
||||
}
|
||||
|
||||
thread_run_with_poly_data3 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, fn: proc(arg1: T1, arg2: T2, arg3: T3), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal)
|
||||
where size_of(T1) <= size_of(rawptr),
|
||||
size_of(T2) <= size_of(rawptr),
|
||||
size_of(T3) <= size_of(rawptr) {
|
||||
thread_proc :: proc(t: ^Thread) {
|
||||
fn := cast(proc(T1, T2, T3))t.data;
|
||||
assert(t.user_index >= 3);
|
||||
arg1 := (^T1)(&t.user_args[0])^;
|
||||
arg2 := (^T2)(&t.user_args[1])^;
|
||||
arg3 := (^T3)(&t.user_args[2])^;
|
||||
fn(arg1, arg2, arg3);
|
||||
thread_destroy(t);
|
||||
}
|
||||
t := thread_create(thread_proc, priority);
|
||||
t.data = rawptr(fn);
|
||||
t.user_index = 3;
|
||||
arg1, arg2, arg3 := arg1, arg2, arg3;
|
||||
mem.copy(&t.user_args[0], &arg1, size_of(arg1));
|
||||
mem.copy(&t.user_args[1], &arg2, size_of(arg2));
|
||||
mem.copy(&t.user_args[2], &arg3, size_of(arg3));
|
||||
t.init_context = init_context;
|
||||
thread_start(t);
|
||||
}
|
||||
thread_run_with_poly_data4 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, arg4: $T4, fn: proc(arg1: T1, arg2: T2, arg3: T3, arg4: T4), init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal)
|
||||
where size_of(T1) <= size_of(rawptr),
|
||||
size_of(T2) <= size_of(rawptr),
|
||||
size_of(T3) <= size_of(rawptr) {
|
||||
thread_proc :: proc(t: ^Thread) {
|
||||
fn := cast(proc(T1, T2, T3, T4))t.data;
|
||||
assert(t.user_index >= 4);
|
||||
arg1 := (^T1)(&t.user_args[0])^;
|
||||
arg2 := (^T2)(&t.user_args[1])^;
|
||||
arg3 := (^T3)(&t.user_args[2])^;
|
||||
arg4 := (^T4)(&t.user_args[3])^;
|
||||
fn(arg1, arg2, arg3, arg4);
|
||||
thread_destroy(t);
|
||||
}
|
||||
t := thread_create(thread_proc, priority);
|
||||
t.data = rawptr(fn);
|
||||
t.user_index = 4;
|
||||
arg1, arg2, arg3, arg4 := arg1, arg2, arg3, arg4;
|
||||
mem.copy(&t.user_args[0], &arg1, size_of(arg1));
|
||||
mem.copy(&t.user_args[1], &arg2, size_of(arg2));
|
||||
mem.copy(&t.user_args[2], &arg3, size_of(arg3));
|
||||
mem.copy(&t.user_args[3], &arg4, size_of(arg4));
|
||||
t.init_context = init_context;
|
||||
thread_start(t);
|
||||
}
|
||||
|
||||
|
||||
|
||||
thread_create_and_start :: proc(fn: Thread_Proc, init_context: Maybe(runtime.Context) = nil, priority := Thread_Priority.Normal) -> ^Thread {
|
||||
t := thread_create(fn, priority);
|
||||
t.init_context = init_context;
|
||||
thread_start(t);
|
||||
return t;
|
||||
}
|
||||
175
core/sync/sync2/thread_unix.odin
Normal file
175
core/sync/sync2/thread_unix.odin
Normal file
@@ -0,0 +1,175 @@
|
||||
// +build linux, darwin, freebsd
|
||||
// +private
|
||||
package sync2
|
||||
|
||||
import "core:runtime"
|
||||
import "core:intrinsics"
|
||||
import "core:sys/unix"
|
||||
|
||||
// NOTE(tetra): Aligned here because of core/unix/pthread_linux.odin/pthread_t.
|
||||
// Also see core/sys/darwin/mach_darwin.odin/semaphore_t.
|
||||
Thread_Os_Specific :: struct #align 16 {
|
||||
unix_thread: unix.pthread_t, // NOTE: very large on Darwin, small on Linux.
|
||||
|
||||
// NOTE: pthread has a proc to query this, but it is marked
|
||||
// as non-portable ("np") so we do this instead.
|
||||
done: bool,
|
||||
|
||||
// since libpthread doesn't seem to have a way to create a thread
|
||||
// in a suspended state, we have it wait on this gate, which we
|
||||
// signal to start it.
|
||||
// destroyed after thread is started.
|
||||
start_gate: Cond,
|
||||
start_mutex: Mutex,
|
||||
|
||||
// if true, the thread has been started and the start_gate has been destroyed.
|
||||
started: bool,
|
||||
|
||||
// NOTE: with pthreads, it is undefined behavior for multiple threads
|
||||
// to call join on the same thread at the same time.
|
||||
// this value is atomically updated to detect this.
|
||||
// See the comment in `join`.
|
||||
already_joined: bool,
|
||||
}
|
||||
//
|
||||
// Creates a thread which will run the given procedure.
|
||||
// It then waits for `start` to be called.
|
||||
//
|
||||
_thread_create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^Thread {
|
||||
__linux_thread_entry_proc :: proc "c" (t: rawptr) -> rawptr {
|
||||
context = runtime.default_context();
|
||||
|
||||
t := (^Thread)(t);
|
||||
cond_wait(&t.start_gate, &t.start_mutex);
|
||||
t.start_gate = {};
|
||||
t.start_mutex = {};
|
||||
|
||||
c := context;
|
||||
if ic, ok := t.init_context.?; ok {
|
||||
c = ic;
|
||||
}
|
||||
context = c;
|
||||
|
||||
t.procedure(t);
|
||||
|
||||
if t.init_context == nil {
|
||||
if context.temp_allocator.data == &runtime.global_default_temp_allocator_data {
|
||||
runtime.default_temp_allocator_destroy(auto_cast context.temp_allocator.data);
|
||||
}
|
||||
}
|
||||
|
||||
atomic_store(&t.done, true, .Sequentially_Consistent);
|
||||
return nil;
|
||||
}
|
||||
|
||||
attrs: unix.pthread_attr_t;
|
||||
if unix.pthread_attr_init(&attrs) != 0 {
|
||||
return nil; // NOTE(tetra, 2019-11-01): POSIX OOM.
|
||||
}
|
||||
defer unix.pthread_attr_destroy(&attrs);
|
||||
|
||||
// NOTE(tetra, 2019-11-01): These only fail if their argument is invalid.
|
||||
assert(unix.pthread_attr_setdetachstate(&attrs, unix.PTHREAD_CREATE_JOINABLE) == 0);
|
||||
assert(unix.pthread_attr_setinheritsched(&attrs, unix.PTHREAD_EXPLICIT_SCHED) == 0);
|
||||
|
||||
thread := new(Thread);
|
||||
if thread == nil {
|
||||
return nil;
|
||||
}
|
||||
thread.creation_allocator = context.allocator;
|
||||
|
||||
// Set thread priority.
|
||||
policy: i32;
|
||||
res := unix.pthread_attr_getschedpolicy(&attrs, &policy);
|
||||
assert(res == 0);
|
||||
params: unix.sched_param;
|
||||
res = unix.pthread_attr_getschedparam(&attrs, ¶ms);
|
||||
assert(res == 0);
|
||||
low := unix.sched_get_priority_min(policy);
|
||||
high := unix.sched_get_priority_max(policy);
|
||||
switch priority {
|
||||
case .Normal: // Okay
|
||||
case .Low: params.sched_priority = low + 1;
|
||||
case .High: params.sched_priority = high;
|
||||
}
|
||||
res = unix.pthread_attr_setschedparam(&attrs, ¶ms);
|
||||
assert(res == 0);
|
||||
|
||||
if unix.pthread_create(&thread.unix_thread, &attrs, __linux_thread_entry_proc, thread) != 0 {
|
||||
free(thread, thread.creation_allocator);
|
||||
return nil;
|
||||
}
|
||||
thread.procedure = procedure;
|
||||
|
||||
return thread;
|
||||
}
|
||||
|
||||
_thread_start :: proc(t: ^Thread) {
|
||||
if intrinsics.atomic_xchg(&t.started, true) {
|
||||
return;
|
||||
}
|
||||
cond_signal(&t.start_gate);
|
||||
}
|
||||
|
||||
_thread_is_done :: proc(t: ^Thread) -> bool {
|
||||
return atomic_load(&t.done, .Sequentially_Consistent);
|
||||
}
|
||||
|
||||
_thread_join :: proc(t: ^Thread) {
|
||||
if unix.pthread_equal(unix.pthread_self(), t.unix_thread) {
|
||||
return;
|
||||
}
|
||||
// if unix.pthread_self().x == t.unix_thread.x do return;
|
||||
|
||||
// NOTE(tetra): It's apparently UB for multiple threads to join the same thread
|
||||
// at the same time.
|
||||
// If someone else already did, spin until the thread dies.
|
||||
// See note on `already_joined` field.
|
||||
// TODO(tetra): I'm not sure if we should do this, or panic, since I'm not
|
||||
// sure it makes sense to need to join from multiple threads?
|
||||
if intrinsics.atomic_xchg(&t.already_joined, true) {
|
||||
for {
|
||||
if intrinsics.atomic_load(&t.done) {
|
||||
return;
|
||||
}
|
||||
intrinsics.cpu_relax();
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE(tetra): If we're already dead, don't bother calling to pthread_join as that
|
||||
// will just return 3 (ESRCH).
|
||||
// We do this instead because I don't know if there is a danger
|
||||
// that you may join a different thread from the one you called join on,
|
||||
// if the thread handle is reused.
|
||||
if intrinsics.atomic_load(&t.done) {
|
||||
return;
|
||||
}
|
||||
|
||||
ret_val: rawptr;
|
||||
_ = unix.pthread_join(t.unix_thread, &ret_val);
|
||||
if !intrinsics.atomic_load(&t.done) {
|
||||
panic("thread not done after join");
|
||||
}
|
||||
}
|
||||
|
||||
_thread_join_multiple :: proc(threads: ..^Thread) {
|
||||
for t in threads {
|
||||
_thread_join(t);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
_thread_destroy :: proc(t: ^Thread) {
|
||||
_thread_join(t);
|
||||
t.unix_thread = {};
|
||||
free(t, t.creation_allocator);
|
||||
}
|
||||
|
||||
|
||||
_thread_terminate :: proc(t: ^Thread, exit_code: int) {
|
||||
// TODO(bill)
|
||||
}
|
||||
|
||||
_thread_yield :: proc() {
|
||||
unix.sched_yield();
|
||||
}
|
||||
123
core/sync/sync2/thread_windows.odin
Normal file
123
core/sync/sync2/thread_windows.odin
Normal file
@@ -0,0 +1,123 @@
|
||||
//+build windows
|
||||
//+private
|
||||
package sync2
|
||||
|
||||
import "core:runtime"
|
||||
import "core:sync"
|
||||
import win32 "core:sys/windows"
|
||||
|
||||
Thread_Os_Specific :: struct {
|
||||
win32_thread: win32.HANDLE,
|
||||
win32_thread_id: win32.DWORD,
|
||||
done: bool, // see note in `is_done`
|
||||
}
|
||||
|
||||
_thread_priority_map := [Thread_Priority]i32{
|
||||
.Normal = 0,
|
||||
.Low = -2,
|
||||
.High = +2,
|
||||
};
|
||||
|
||||
_thread_create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^Thread {
|
||||
win32_thread_id: win32.DWORD;
|
||||
|
||||
__windows_thread_entry_proc :: proc "stdcall" (t_: rawptr) -> win32.DWORD {
|
||||
t := (^Thread)(t_);
|
||||
context = runtime.default_context();
|
||||
c := context;
|
||||
if ic, ok := t.init_context.?; ok {
|
||||
c = ic;
|
||||
}
|
||||
context = c;
|
||||
|
||||
t.procedure(t);
|
||||
|
||||
if t.init_context == nil {
|
||||
if context.temp_allocator.data == &runtime.global_default_temp_allocator_data {
|
||||
runtime.default_temp_allocator_destroy(auto_cast context.temp_allocator.data);
|
||||
}
|
||||
}
|
||||
|
||||
sync.atomic_store(&t.done, true, .Sequentially_Consistent);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
thread := new(Thread);
|
||||
if thread == nil {
|
||||
return nil;
|
||||
}
|
||||
thread.creation_allocator = context.allocator;
|
||||
|
||||
win32_thread := win32.CreateThread(nil, 0, __windows_thread_entry_proc, thread, win32.CREATE_SUSPENDED, &win32_thread_id);
|
||||
if win32_thread == nil {
|
||||
free(thread, thread.creation_allocator);
|
||||
return nil;
|
||||
}
|
||||
thread.procedure = procedure;
|
||||
thread.win32_thread = win32_thread;
|
||||
thread.win32_thread_id = win32_thread_id;
|
||||
thread.init_context = context;
|
||||
|
||||
ok := win32.SetThreadPriority(win32_thread, _thread_priority_map[priority]);
|
||||
assert(ok == true);
|
||||
|
||||
return thread;
|
||||
}
|
||||
|
||||
_thread_start :: proc(thread: ^Thread) {
|
||||
win32.ResumeThread(thread.win32_thread);
|
||||
}
|
||||
|
||||
_thread_is_done :: proc(using thread: ^Thread) -> bool {
|
||||
// NOTE(tetra, 2019-10-31): Apparently using wait_for_single_object and
|
||||
// checking if it didn't time out immediately, is not good enough,
|
||||
// so we do it this way instead.
|
||||
return sync.atomic_load(&done, .Sequentially_Consistent);
|
||||
}
|
||||
|
||||
_thread_join :: proc(using thread: ^Thread) {
|
||||
if win32_thread != win32.INVALID_HANDLE {
|
||||
win32.WaitForSingleObject(win32_thread, win32.INFINITE);
|
||||
win32.CloseHandle(win32_thread);
|
||||
win32_thread = win32.INVALID_HANDLE;
|
||||
}
|
||||
}
|
||||
|
||||
_thread_join_multiple :: proc(threads: ..^Thread) {
|
||||
MAXIMUM_WAIT_OBJECTS :: 64;
|
||||
|
||||
handles: [MAXIMUM_WAIT_OBJECTS]win32.HANDLE;
|
||||
|
||||
for k := 0; k < len(threads); k += MAXIMUM_WAIT_OBJECTS {
|
||||
count := min(len(threads) - k, MAXIMUM_WAIT_OBJECTS);
|
||||
j := 0;
|
||||
for i in 0..<count {
|
||||
handle := threads[i+k].win32_thread;
|
||||
if handle != win32.INVALID_HANDLE {
|
||||
handles[j] = handle;
|
||||
j += 1;
|
||||
}
|
||||
}
|
||||
win32.WaitForMultipleObjects(u32(j), &handles[0], true, win32.INFINITE);
|
||||
}
|
||||
|
||||
for t in threads {
|
||||
win32.CloseHandle(t.win32_thread);
|
||||
t.win32_thread = win32.INVALID_HANDLE;
|
||||
}
|
||||
}
|
||||
|
||||
_thread_destroy :: proc(thread: ^Thread) {
|
||||
_thread_join(thread);
|
||||
free(thread, thread.creation_allocator);
|
||||
}
|
||||
|
||||
_thread_terminate :: proc(using thread : ^Thread, exit_code: int) {
|
||||
win32.TerminateThread(win32_thread, u32(exit_code));
|
||||
}
|
||||
|
||||
_thread_yield :: proc() {
|
||||
win32.SwitchToThread();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user