This commit is contained in:
gingerBill
2022-05-12 11:33:08 +01:00
7 changed files with 101 additions and 10 deletions

View File

@@ -81,3 +81,16 @@ PTHREAD_MUTEX_NORMAL :: 0
PTHREAD_MUTEX_RECURSIVE :: 1
PTHREAD_MUTEX_ERRORCHECK :: 2
PTHREAD_CANCEL_ENABLE :: 0
PTHREAD_CANCEL_DISABLE :: 1
PTHREAD_CANCEL_DEFERRED :: 0
PTHREAD_CANCEL_ASYNCHRONOUS :: 1
foreign import pthread "System.framework"
@(default_calling_convention="c")
foreign pthread {
pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int ---
pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int ---
pthread_cancel :: proc (thread: pthread_t) -> c.int ---
}

View File

@@ -92,6 +92,11 @@ sem_t :: struct {
_padding: u32,
}
PTHREAD_CANCEL_ENABLE :: 0
PTHREAD_CANCEL_DISABLE :: 1
PTHREAD_CANCEL_DEFERRED :: 0
PTHREAD_CANCEL_ASYNCHRONOUS :: 1
foreign import "system:pthread"
@(default_calling_convention="c")
@@ -110,5 +115,8 @@ foreign pthread {
// NOTE: unclear whether pthread_yield is well-supported on Linux systems,
// see https://linux.die.net/man/3/pthread_yield
pthread_yield :: proc() ---
}
pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int ---
pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int ---
pthread_cancel :: proc (thread: pthread_t) -> c.int ---
}

View File

@@ -94,6 +94,11 @@ when size_of(int) == 8 {
SEM_T_SIZE :: 16
}
PTHREAD_CANCEL_ENABLE :: 0
PTHREAD_CANCEL_DISABLE :: 1
PTHREAD_CANCEL_DEFERRED :: 0
PTHREAD_CANCEL_ASYNCHRONOUS :: 1
foreign import "system:pthread"
@(default_calling_convention="c")
@@ -112,4 +117,8 @@ foreign pthread {
// NOTE: unclear whether pthread_yield is well-supported on Linux systems,
// see https://linux.die.net/man/3/pthread_yield
pthread_yield :: proc() -> c.int ---
pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int ---
pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int ---
pthread_cancel :: proc (thread: pthread_t) -> c.int ---
}

View File

@@ -46,6 +46,11 @@ sched_param :: struct {
sem_t :: distinct rawptr
PTHREAD_CANCEL_ENABLE :: 0
PTHREAD_CANCEL_DISABLE :: 1
PTHREAD_CANCEL_DEFERRED :: 0
PTHREAD_CANCEL_ASYNCHRONOUS :: 1
foreign import libc "system:c"
@(default_calling_convention="c")
@@ -62,4 +67,8 @@ foreign libc {
// NOTE: unclear whether pthread_yield is well-supported on Linux systems,
// see https://linux.die.net/man/3/pthread_yield
pthread_yield :: proc() ---
}
pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int ---
pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int ---
pthread_cancel :: proc (thread: pthread_t) -> c.int ---
}

View File

@@ -53,7 +53,7 @@ join :: proc(thread: ^Thread) {
}
join_mulitple :: proc(threads: ..^Thread) {
join_multiple :: proc(threads: ..^Thread) {
_join_multiple(..threads)
}

View File

@@ -7,6 +7,8 @@ import "core:intrinsics"
import "core:sync"
import "core:sys/unix"
CAS :: intrinsics.atomic_compare_exchange_strong
Thread_State :: enum u8 {
Started,
Joined,
@@ -29,6 +31,11 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^
__linux_thread_entry_proc :: proc "c" (t: rawptr) -> rawptr {
t := (^Thread)(t)
when ODIN_OS != .Darwin {
// We need to give the thread a moment to start up before we enable cancellation.
can_set_thread_cancel_state := unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_DISABLE, nil) == 0
}
context = runtime.default_context()
sync.lock(&t.mutex)
@@ -42,6 +49,14 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^
init_context := t.init_context
context = init_context.? or_else runtime.default_context()
when ODIN_OS != .Darwin {
// Enable thread's cancelability.
if can_set_thread_cancel_state {
unix.pthread_setcanceltype (unix.PTHREAD_CANCEL_ASYNCHRONOUS, nil)
unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_DISABLE, nil)
}
}
t.procedure(t)
intrinsics.atomic_store(&t.flags, t.flags + { .Done })
@@ -98,7 +113,7 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^
}
_start :: proc(t: ^Thread) {
sync.guard(&t.mutex)
// sync.guard(&t.mutex)
t.flags += { .Started }
sync.signal(&t.cond)
}
@@ -108,15 +123,22 @@ _is_done :: proc(t: ^Thread) -> bool {
}
_join :: proc(t: ^Thread) {
sync.guard(&t.mutex)
// sync.guard(&t.mutex)
if .Joined in t.flags || unix.pthread_equal(unix.pthread_self(), t.unix_thread) {
if unix.pthread_equal(unix.pthread_self(), t.unix_thread) {
return
}
unix.pthread_join(t.unix_thread, nil)
// Preserve other flags besides `.Joined`, like `.Started`.
unjoined := intrinsics.atomic_load(&t.flags) - {.Joined}
joined := unjoined + {.Joined}
t.flags += { .Joined }
// Try to set `t.flags` from unjoined to joined. If it returns joined,
// it means the previous value had that flag set and we can return.
if res, ok := CAS(&t.flags, unjoined, joined); res == joined && !ok {
return
}
unix.pthread_join(t.unix_thread, nil)
}
_join_multiple :: proc(threads: ..^Thread) {
@@ -132,7 +154,10 @@ _destroy :: proc(t: ^Thread) {
}
_terminate :: proc(t: ^Thread, exit_code: int) {
// TODO(bill)
// `pthread_cancel` is unreliable on Darwin for unknown reasons.
when ODIN_OS != .Darwin {
unix.pthread_cancel(t.unix_thread)
}
}
_yield :: proc() {

View File

@@ -1110,9 +1110,16 @@ prefix_table := [?]string{
"Black",
}
print_mutex := b64(false)
threading_example :: proc() {
fmt.println("\n# threading_example")
did_acquire :: proc(m: ^b64) -> (acquired: bool) {
res, ok := intrinsics.atomic_compare_exchange_strong(m, false, true)
return ok && res == false
}
{ // Basic Threads
fmt.println("\n## Basic Threads")
worker_proc :: proc(t: ^thread.Thread) {
@@ -1154,14 +1161,21 @@ threading_example :: proc() {
task_proc :: proc(t: thread.Task) {
index := t.user_index % len(prefix_table)
for iteration in 1..=5 {
for !did_acquire(&print_mutex) { thread.yield() } // Allow one thread to print at a time.
fmt.printf("Worker Task %d is on iteration %d\n", t.user_index, iteration)
fmt.printf("`%s`: iteration %d\n", prefix_table[index], iteration)
print_mutex = false
time.sleep(1 * time.Millisecond)
}
}
N :: 3
pool: thread.Pool
thread.pool_init(pool=&pool, thread_count=3, allocator=context.allocator)
thread.pool_init(pool=&pool, thread_count=N, allocator=context.allocator)
defer thread.pool_destroy(&pool)
@@ -1171,6 +1185,19 @@ threading_example :: proc() {
}
thread.pool_start(&pool)
{
// Wait a moment before we cancel a thread
time.sleep(5 * time.Millisecond)
// Allow one thread to print at a time.
for !did_acquire(&print_mutex) { thread.yield() }
thread.terminate(pool.threads[N - 1], 0)
fmt.println("Canceled last thread")
print_mutex = false
}
thread.pool_finish(&pool)
}
}