diff --git a/core/sys/unix/pthread_darwin.odin b/core/sys/unix/pthread_darwin.odin index 542a550cb..e138b8610 100644 --- a/core/sys/unix/pthread_darwin.odin +++ b/core/sys/unix/pthread_darwin.odin @@ -81,3 +81,16 @@ PTHREAD_MUTEX_NORMAL :: 0 PTHREAD_MUTEX_RECURSIVE :: 1 PTHREAD_MUTEX_ERRORCHECK :: 2 +PTHREAD_CANCEL_ENABLE :: 0 +PTHREAD_CANCEL_DISABLE :: 1 +PTHREAD_CANCEL_DEFERRED :: 0 +PTHREAD_CANCEL_ASYNCHRONOUS :: 1 + +foreign import pthread "System.framework" + +@(default_calling_convention="c") +foreign pthread { + pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int --- + pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int --- + pthread_cancel :: proc (thread: pthread_t) -> c.int --- +} \ No newline at end of file diff --git a/core/sys/unix/pthread_freebsd.odin b/core/sys/unix/pthread_freebsd.odin index dd5306417..e02345cad 100644 --- a/core/sys/unix/pthread_freebsd.odin +++ b/core/sys/unix/pthread_freebsd.odin @@ -92,6 +92,11 @@ sem_t :: struct { _padding: u32, } +PTHREAD_CANCEL_ENABLE :: 0 +PTHREAD_CANCEL_DISABLE :: 1 +PTHREAD_CANCEL_DEFERRED :: 0 +PTHREAD_CANCEL_ASYNCHRONOUS :: 1 + foreign import "system:pthread" @(default_calling_convention="c") @@ -110,5 +115,8 @@ foreign pthread { // NOTE: unclear whether pthread_yield is well-supported on Linux systems, // see https://linux.die.net/man/3/pthread_yield pthread_yield :: proc() --- -} + pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int --- + pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int --- + pthread_cancel :: proc (thread: pthread_t) -> c.int --- +} \ No newline at end of file diff --git a/core/sys/unix/pthread_linux.odin b/core/sys/unix/pthread_linux.odin index 099e7c7e9..9c297ef22 100644 --- a/core/sys/unix/pthread_linux.odin +++ b/core/sys/unix/pthread_linux.odin @@ -94,6 +94,11 @@ when size_of(int) == 8 { SEM_T_SIZE :: 16 } +PTHREAD_CANCEL_ENABLE :: 0 +PTHREAD_CANCEL_DISABLE :: 1 +PTHREAD_CANCEL_DEFERRED :: 0 +PTHREAD_CANCEL_ASYNCHRONOUS :: 1 + foreign import "system:pthread" @(default_calling_convention="c") @@ -112,4 +117,8 @@ foreign pthread { // NOTE: unclear whether pthread_yield is well-supported on Linux systems, // see https://linux.die.net/man/3/pthread_yield pthread_yield :: proc() -> c.int --- + + pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int --- + pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int --- + pthread_cancel :: proc (thread: pthread_t) -> c.int --- } diff --git a/core/sys/unix/pthread_openbsd.odin b/core/sys/unix/pthread_openbsd.odin index c855f95c0..7ae82e662 100644 --- a/core/sys/unix/pthread_openbsd.odin +++ b/core/sys/unix/pthread_openbsd.odin @@ -46,6 +46,11 @@ sched_param :: struct { sem_t :: distinct rawptr +PTHREAD_CANCEL_ENABLE :: 0 +PTHREAD_CANCEL_DISABLE :: 1 +PTHREAD_CANCEL_DEFERRED :: 0 +PTHREAD_CANCEL_ASYNCHRONOUS :: 1 + foreign import libc "system:c" @(default_calling_convention="c") @@ -62,4 +67,8 @@ foreign libc { // NOTE: unclear whether pthread_yield is well-supported on Linux systems, // see https://linux.die.net/man/3/pthread_yield pthread_yield :: proc() --- -} + + pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int --- + pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int --- + pthread_cancel :: proc (thread: pthread_t) -> c.int --- +} \ No newline at end of file diff --git a/core/thread/thread.odin b/core/thread/thread.odin index d1b95a2fd..90230ae75 100644 --- a/core/thread/thread.odin +++ b/core/thread/thread.odin @@ -53,7 +53,7 @@ join :: proc(thread: ^Thread) { } -join_mulitple :: proc(threads: ..^Thread) { +join_multiple :: proc(threads: ..^Thread) { _join_multiple(..threads) } diff --git a/core/thread/thread_unix.odin b/core/thread/thread_unix.odin index 8452df112..8c7058f17 100644 --- a/core/thread/thread_unix.odin +++ b/core/thread/thread_unix.odin @@ -7,6 +7,8 @@ import "core:intrinsics" import "core:sync" import "core:sys/unix" +CAS :: intrinsics.atomic_compare_exchange_strong + Thread_State :: enum u8 { Started, Joined, @@ -29,6 +31,11 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^ __linux_thread_entry_proc :: proc "c" (t: rawptr) -> rawptr { t := (^Thread)(t) + when ODIN_OS != .Darwin { + // We need to give the thread a moment to start up before we enable cancellation. + can_set_thread_cancel_state := unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_DISABLE, nil) == 0 + } + context = runtime.default_context() sync.lock(&t.mutex) @@ -42,6 +49,14 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^ init_context := t.init_context context = init_context.? or_else runtime.default_context() + when ODIN_OS != .Darwin { + // Enable thread's cancelability. + if can_set_thread_cancel_state { + unix.pthread_setcanceltype (unix.PTHREAD_CANCEL_ASYNCHRONOUS, nil) + unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_DISABLE, nil) + } + } + t.procedure(t) intrinsics.atomic_store(&t.flags, t.flags + { .Done }) @@ -98,7 +113,7 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^ } _start :: proc(t: ^Thread) { - sync.guard(&t.mutex) + // sync.guard(&t.mutex) t.flags += { .Started } sync.signal(&t.cond) } @@ -108,15 +123,22 @@ _is_done :: proc(t: ^Thread) -> bool { } _join :: proc(t: ^Thread) { - sync.guard(&t.mutex) + // sync.guard(&t.mutex) - if .Joined in t.flags || unix.pthread_equal(unix.pthread_self(), t.unix_thread) { + if unix.pthread_equal(unix.pthread_self(), t.unix_thread) { return } - unix.pthread_join(t.unix_thread, nil) + // Preserve other flags besides `.Joined`, like `.Started`. + unjoined := intrinsics.atomic_load(&t.flags) - {.Joined} + joined := unjoined + {.Joined} - t.flags += { .Joined } + // Try to set `t.flags` from unjoined to joined. If it returns joined, + // it means the previous value had that flag set and we can return. + if res, ok := CAS(&t.flags, unjoined, joined); res == joined && !ok { + return + } + unix.pthread_join(t.unix_thread, nil) } _join_multiple :: proc(threads: ..^Thread) { @@ -132,7 +154,10 @@ _destroy :: proc(t: ^Thread) { } _terminate :: proc(t: ^Thread, exit_code: int) { - // TODO(bill) + // `pthread_cancel` is unreliable on Darwin for unknown reasons. + when ODIN_OS != .Darwin { + unix.pthread_cancel(t.unix_thread) + } } _yield :: proc() { diff --git a/examples/demo/demo.odin b/examples/demo/demo.odin index a36acdf18..c50a5bdf8 100644 --- a/examples/demo/demo.odin +++ b/examples/demo/demo.odin @@ -1110,9 +1110,16 @@ prefix_table := [?]string{ "Black", } +print_mutex := b64(false) + threading_example :: proc() { fmt.println("\n# threading_example") + did_acquire :: proc(m: ^b64) -> (acquired: bool) { + res, ok := intrinsics.atomic_compare_exchange_strong(m, false, true) + return ok && res == false + } + { // Basic Threads fmt.println("\n## Basic Threads") worker_proc :: proc(t: ^thread.Thread) { @@ -1154,14 +1161,21 @@ threading_example :: proc() { task_proc :: proc(t: thread.Task) { index := t.user_index % len(prefix_table) for iteration in 1..=5 { + for !did_acquire(&print_mutex) { thread.yield() } // Allow one thread to print at a time. + fmt.printf("Worker Task %d is on iteration %d\n", t.user_index, iteration) fmt.printf("`%s`: iteration %d\n", prefix_table[index], iteration) + + print_mutex = false + time.sleep(1 * time.Millisecond) } } + N :: 3 + pool: thread.Pool - thread.pool_init(pool=&pool, thread_count=3, allocator=context.allocator) + thread.pool_init(pool=&pool, thread_count=N, allocator=context.allocator) defer thread.pool_destroy(&pool) @@ -1171,6 +1185,19 @@ threading_example :: proc() { } thread.pool_start(&pool) + + { + // Wait a moment before we cancel a thread + time.sleep(5 * time.Millisecond) + + // Allow one thread to print at a time. + for !did_acquire(&print_mutex) { thread.yield() } + + thread.terminate(pool.threads[N - 1], 0) + fmt.println("Canceled last thread") + print_mutex = false + } + thread.pool_finish(&pool) } }