From d48d3bfa87f8f944c7bb96a1efe298beaaa9c1cf Mon Sep 17 00:00:00 2001 From: Thimilius Date: Wed, 11 May 2022 13:12:07 +0200 Subject: [PATCH 1/4] Fix join_multiple typo --- core/thread/thread.odin | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/thread/thread.odin b/core/thread/thread.odin index d1b95a2fd..90230ae75 100644 --- a/core/thread/thread.odin +++ b/core/thread/thread.odin @@ -53,7 +53,7 @@ join :: proc(thread: ^Thread) { } -join_mulitple :: proc(threads: ..^Thread) { +join_multiple :: proc(threads: ..^Thread) { _join_multiple(..threads) } From 56e3b7cb7d96a467ace98ebdca11d2baeef0a8c0 Mon Sep 17 00:00:00 2001 From: Jeroen van Rijn Date: Wed, 11 May 2022 13:43:29 +0200 Subject: [PATCH 2/4] Fix join on *nix. --- core/thread/thread_unix.odin | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/core/thread/thread_unix.odin b/core/thread/thread_unix.odin index 8452df112..3897f6100 100644 --- a/core/thread/thread_unix.odin +++ b/core/thread/thread_unix.odin @@ -7,6 +7,8 @@ import "core:intrinsics" import "core:sync" import "core:sys/unix" +CAS :: intrinsics.atomic_compare_exchange_strong + Thread_State :: enum u8 { Started, Joined, @@ -98,7 +100,7 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^ } _start :: proc(t: ^Thread) { - sync.guard(&t.mutex) + // sync.guard(&t.mutex) t.flags += { .Started } sync.signal(&t.cond) } @@ -108,15 +110,22 @@ _is_done :: proc(t: ^Thread) -> bool { } _join :: proc(t: ^Thread) { - sync.guard(&t.mutex) + // sync.guard(&t.mutex) - if .Joined in t.flags || unix.pthread_equal(unix.pthread_self(), t.unix_thread) { + if unix.pthread_equal(unix.pthread_self(), t.unix_thread) { return } - unix.pthread_join(t.unix_thread, nil) + // Preserve other flags besides `.Joined`, like `.Started`. + unjoined := intrinsics.atomic_load(&t.flags) - {.Joined} + joined := unjoined + {.Joined} - t.flags += { .Joined } + // Try to set `t.flags` from unjoined to joined. If it returns joined, + // it means the previous value had that flag set and we can return. + if res, ok := CAS(&t.flags, unjoined, joined); res == joined && !ok { + return + } + unix.pthread_join(t.unix_thread, nil) } _join_multiple :: proc(threads: ..^Thread) { From 8fb718245a76cc9daa45122e6e6990f558b14de7 Mon Sep 17 00:00:00 2001 From: Jeroen van Rijn Date: Wed, 11 May 2022 15:52:04 +0200 Subject: [PATCH 3/4] Implement pthread_cancel. --- core/sys/unix/pthread_darwin.odin | 13 +++++++++++++ core/sys/unix/pthread_freebsd.odin | 10 +++++++++- core/sys/unix/pthread_linux.odin | 9 +++++++++ core/sys/unix/pthread_openbsd.odin | 11 ++++++++++- core/thread/thread_unix.odin | 11 ++++++++++- examples/demo/demo.odin | 29 ++++++++++++++++++++++++++++- 6 files changed, 79 insertions(+), 4 deletions(-) diff --git a/core/sys/unix/pthread_darwin.odin b/core/sys/unix/pthread_darwin.odin index 542a550cb..e138b8610 100644 --- a/core/sys/unix/pthread_darwin.odin +++ b/core/sys/unix/pthread_darwin.odin @@ -81,3 +81,16 @@ PTHREAD_MUTEX_NORMAL :: 0 PTHREAD_MUTEX_RECURSIVE :: 1 PTHREAD_MUTEX_ERRORCHECK :: 2 +PTHREAD_CANCEL_ENABLE :: 0 +PTHREAD_CANCEL_DISABLE :: 1 +PTHREAD_CANCEL_DEFERRED :: 0 +PTHREAD_CANCEL_ASYNCHRONOUS :: 1 + +foreign import pthread "System.framework" + +@(default_calling_convention="c") +foreign pthread { + pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int --- + pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int --- + pthread_cancel :: proc (thread: pthread_t) -> c.int --- +} \ No newline at end of file diff --git a/core/sys/unix/pthread_freebsd.odin b/core/sys/unix/pthread_freebsd.odin index dd5306417..e02345cad 100644 --- a/core/sys/unix/pthread_freebsd.odin +++ b/core/sys/unix/pthread_freebsd.odin @@ -92,6 +92,11 @@ sem_t :: struct { _padding: u32, } +PTHREAD_CANCEL_ENABLE :: 0 +PTHREAD_CANCEL_DISABLE :: 1 +PTHREAD_CANCEL_DEFERRED :: 0 +PTHREAD_CANCEL_ASYNCHRONOUS :: 1 + foreign import "system:pthread" @(default_calling_convention="c") @@ -110,5 +115,8 @@ foreign pthread { // NOTE: unclear whether pthread_yield is well-supported on Linux systems, // see https://linux.die.net/man/3/pthread_yield pthread_yield :: proc() --- -} + pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int --- + pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int --- + pthread_cancel :: proc (thread: pthread_t) -> c.int --- +} \ No newline at end of file diff --git a/core/sys/unix/pthread_linux.odin b/core/sys/unix/pthread_linux.odin index 099e7c7e9..9c297ef22 100644 --- a/core/sys/unix/pthread_linux.odin +++ b/core/sys/unix/pthread_linux.odin @@ -94,6 +94,11 @@ when size_of(int) == 8 { SEM_T_SIZE :: 16 } +PTHREAD_CANCEL_ENABLE :: 0 +PTHREAD_CANCEL_DISABLE :: 1 +PTHREAD_CANCEL_DEFERRED :: 0 +PTHREAD_CANCEL_ASYNCHRONOUS :: 1 + foreign import "system:pthread" @(default_calling_convention="c") @@ -112,4 +117,8 @@ foreign pthread { // NOTE: unclear whether pthread_yield is well-supported on Linux systems, // see https://linux.die.net/man/3/pthread_yield pthread_yield :: proc() -> c.int --- + + pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int --- + pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int --- + pthread_cancel :: proc (thread: pthread_t) -> c.int --- } diff --git a/core/sys/unix/pthread_openbsd.odin b/core/sys/unix/pthread_openbsd.odin index c855f95c0..7ae82e662 100644 --- a/core/sys/unix/pthread_openbsd.odin +++ b/core/sys/unix/pthread_openbsd.odin @@ -46,6 +46,11 @@ sched_param :: struct { sem_t :: distinct rawptr +PTHREAD_CANCEL_ENABLE :: 0 +PTHREAD_CANCEL_DISABLE :: 1 +PTHREAD_CANCEL_DEFERRED :: 0 +PTHREAD_CANCEL_ASYNCHRONOUS :: 1 + foreign import libc "system:c" @(default_calling_convention="c") @@ -62,4 +67,8 @@ foreign libc { // NOTE: unclear whether pthread_yield is well-supported on Linux systems, // see https://linux.die.net/man/3/pthread_yield pthread_yield :: proc() --- -} + + pthread_setcancelstate :: proc (state: c.int, old_state: ^c.int) -> c.int --- + pthread_setcanceltype :: proc (type: c.int, old_type: ^c.int) -> c.int --- + pthread_cancel :: proc (thread: pthread_t) -> c.int --- +} \ No newline at end of file diff --git a/core/thread/thread_unix.odin b/core/thread/thread_unix.odin index 3897f6100..1a2b30197 100644 --- a/core/thread/thread_unix.odin +++ b/core/thread/thread_unix.odin @@ -31,6 +31,9 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^ __linux_thread_entry_proc :: proc "c" (t: rawptr) -> rawptr { t := (^Thread)(t) + // We need to give the thread a moment to start up before we enable cancellation. + can_set_thread_cancel_state := unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_DISABLE, nil) == 0 + context = runtime.default_context() sync.lock(&t.mutex) @@ -44,6 +47,12 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^ init_context := t.init_context context = init_context.? or_else runtime.default_context() + // Enable thread's cancelability. + if can_set_thread_cancel_state { + unix.pthread_setcanceltype (unix.PTHREAD_CANCEL_ASYNCHRONOUS, nil) + unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_DISABLE, nil) + } + t.procedure(t) intrinsics.atomic_store(&t.flags, t.flags + { .Done }) @@ -141,7 +150,7 @@ _destroy :: proc(t: ^Thread) { } _terminate :: proc(t: ^Thread, exit_code: int) { - // TODO(bill) + unix.pthread_cancel(t.unix_thread) } _yield :: proc() { diff --git a/examples/demo/demo.odin b/examples/demo/demo.odin index a36acdf18..c50a5bdf8 100644 --- a/examples/demo/demo.odin +++ b/examples/demo/demo.odin @@ -1110,9 +1110,16 @@ prefix_table := [?]string{ "Black", } +print_mutex := b64(false) + threading_example :: proc() { fmt.println("\n# threading_example") + did_acquire :: proc(m: ^b64) -> (acquired: bool) { + res, ok := intrinsics.atomic_compare_exchange_strong(m, false, true) + return ok && res == false + } + { // Basic Threads fmt.println("\n## Basic Threads") worker_proc :: proc(t: ^thread.Thread) { @@ -1154,14 +1161,21 @@ threading_example :: proc() { task_proc :: proc(t: thread.Task) { index := t.user_index % len(prefix_table) for iteration in 1..=5 { + for !did_acquire(&print_mutex) { thread.yield() } // Allow one thread to print at a time. + fmt.printf("Worker Task %d is on iteration %d\n", t.user_index, iteration) fmt.printf("`%s`: iteration %d\n", prefix_table[index], iteration) + + print_mutex = false + time.sleep(1 * time.Millisecond) } } + N :: 3 + pool: thread.Pool - thread.pool_init(pool=&pool, thread_count=3, allocator=context.allocator) + thread.pool_init(pool=&pool, thread_count=N, allocator=context.allocator) defer thread.pool_destroy(&pool) @@ -1171,6 +1185,19 @@ threading_example :: proc() { } thread.pool_start(&pool) + + { + // Wait a moment before we cancel a thread + time.sleep(5 * time.Millisecond) + + // Allow one thread to print at a time. + for !did_acquire(&print_mutex) { thread.yield() } + + thread.terminate(pool.threads[N - 1], 0) + fmt.println("Canceled last thread") + print_mutex = false + } + thread.pool_finish(&pool) } } From f4ad4c7aa6f9137dd10c9d201e80f072181d2f93 Mon Sep 17 00:00:00 2001 From: Jeroen van Rijn Date: Wed, 11 May 2022 16:17:35 +0200 Subject: [PATCH 4/4] Disable thread.terminate on Darwin for now. --- core/thread/thread_unix.odin | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/core/thread/thread_unix.odin b/core/thread/thread_unix.odin index 1a2b30197..8c7058f17 100644 --- a/core/thread/thread_unix.odin +++ b/core/thread/thread_unix.odin @@ -31,8 +31,10 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^ __linux_thread_entry_proc :: proc "c" (t: rawptr) -> rawptr { t := (^Thread)(t) - // We need to give the thread a moment to start up before we enable cancellation. - can_set_thread_cancel_state := unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_DISABLE, nil) == 0 + when ODIN_OS != .Darwin { + // We need to give the thread a moment to start up before we enable cancellation. + can_set_thread_cancel_state := unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_DISABLE, nil) == 0 + } context = runtime.default_context() @@ -47,10 +49,12 @@ _create :: proc(procedure: Thread_Proc, priority := Thread_Priority.Normal) -> ^ init_context := t.init_context context = init_context.? or_else runtime.default_context() - // Enable thread's cancelability. - if can_set_thread_cancel_state { - unix.pthread_setcanceltype (unix.PTHREAD_CANCEL_ASYNCHRONOUS, nil) - unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_DISABLE, nil) + when ODIN_OS != .Darwin { + // Enable thread's cancelability. + if can_set_thread_cancel_state { + unix.pthread_setcanceltype (unix.PTHREAD_CANCEL_ASYNCHRONOUS, nil) + unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_DISABLE, nil) + } } t.procedure(t) @@ -150,7 +154,10 @@ _destroy :: proc(t: ^Thread) { } _terminate :: proc(t: ^Thread, exit_code: int) { - unix.pthread_cancel(t.unix_thread) + // `pthread_cancel` is unreliable on Darwin for unknown reasons. + when ODIN_OS != .Darwin { + unix.pthread_cancel(t.unix_thread) + } } _yield :: proc() {