From 7f7cfebc91cc319918bc0042789b0c7a931e56e2 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 16:17:08 -0400 Subject: [PATCH] Add tests for `core:sync` and `core:sync/chan` --- tests/core/normal.odin | 2 + tests/core/sync/chan/test_core_sync_chan.odin | 274 +++++++ tests/core/sync/test_core_sync.odin | 718 ++++++++++++++++++ 3 files changed, 994 insertions(+) create mode 100644 tests/core/sync/chan/test_core_sync_chan.odin create mode 100644 tests/core/sync/test_core_sync.odin diff --git a/tests/core/normal.odin b/tests/core/normal.odin index 6174f2d5c..0670842ac 100644 --- a/tests/core/normal.odin +++ b/tests/core/normal.odin @@ -39,6 +39,8 @@ download_assets :: proc() { @(require) import "slice" @(require) import "strconv" @(require) import "strings" +@(require) import "sync" +@(require) import "sync/chan" @(require) import "sys/posix" @(require) import "sys/windows" @(require) import "text/i18n" diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin new file mode 100644 index 000000000..9b8d9b354 --- /dev/null +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -0,0 +1,274 @@ +package test_core_sync_chan + +import "base:runtime" +import "base:intrinsics" +import "core:log" +import "core:math/rand" +import "core:sync/chan" +import "core:testing" +import "core:thread" +import "core:time" + + +Message_Type :: enum i32 { + Result, + Add, + Multiply, + Subtract, + Divide, + End, +} + +Message :: struct { + type: Message_Type, + i: i64, +} + +Comm :: struct { + host: chan.Chan(Message), + client: chan.Chan(Message), + manual_buffering: bool, +} + +BUFFER_SIZE :: 8 +MAX_RAND :: 32 +FAIL_TIME :: 1 * time.Second +SLEEP_TIME :: 1 * time.Millisecond + +comm_client :: proc(th: ^thread.Thread) { + data := cast(^Comm)th.data + manual_buffering := data.manual_buffering + + n: i64 + + for manual_buffering && !chan.can_recv(data.host) { + thread.yield() + } + + recv_loop: for msg in chan.recv(data.host) { + #partial switch msg.type { + case .Add: n += msg.i + case .Multiply: n *= msg.i + case .Subtract: n -= msg.i + case .Divide: n /= msg.i + case .End: + break recv_loop + case: + panic("Unknown message type for client.") + } + + for manual_buffering && !chan.can_recv(data.host) { + thread.yield() + } + } + + for manual_buffering && !chan.can_send(data.host) { + thread.yield() + } + + chan.send(data.client, Message{.Result, n}) + chan.close(data.client) +} + +send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: bool = false) -> (expected: i64) { + expected = 1 + for manual_buffering && !chan.can_send(host) { + thread.yield() + } + chan.send(host, Message{.Add, 1}) + log.debug(Message{.Add, 1}) + + for _ in 0..<1+2*BUFFER_SIZE { + msg: Message + msg.i = 1 + rand.int63_max(MAX_RAND) + switch rand.int_max(4) { + case 0: + msg.type = .Add + expected += msg.i + case 1: + msg.type = .Multiply + expected *= msg.i + case 2: + msg.type = .Subtract + expected -= msg.i + case 3: + msg.type = .Divide + expected /= msg.i + } + + for manual_buffering && !chan.can_send(host) { + thread.yield() + } + if manual_buffering { + testing.expect(t, chan.len(host) == 0) + } + + chan.send(host, msg) + log.debug(msg) + } + + for manual_buffering && !chan.can_send(host) { + thread.yield() + } + chan.send(host, Message{.End, 0}) + log.debug(Message{.End, 0}) + chan.close(host) + + return +} + +@test +test_chan_buffered :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + comm: Comm + alloc_err: runtime.Allocator_Error + comm.host, alloc_err = chan.create_buffered(chan.Chan(Message), BUFFER_SIZE, context.allocator) + assert(alloc_err == nil, "allocation failed") + comm.client, alloc_err = chan.create_buffered(chan.Chan(Message), BUFFER_SIZE, context.allocator) + assert(alloc_err == nil, "allocation failed") + defer { + chan.destroy(comm.host) + chan.destroy(comm.client) + } + + testing.expect(t, chan.is_buffered(comm.host)) + testing.expect(t, chan.is_buffered(comm.client)) + testing.expect(t, !chan.is_unbuffered(comm.host)) + testing.expect(t, !chan.is_unbuffered(comm.client)) + testing.expect_value(t, chan.len(comm.host), 0) + testing.expect_value(t, chan.len(comm.client), 0) + testing.expect_value(t, chan.cap(comm.host), BUFFER_SIZE) + testing.expect_value(t, chan.cap(comm.client), BUFFER_SIZE) + + reckoner := thread.create(comm_client) + defer thread.destroy(reckoner) + reckoner.data = &comm + thread.start(reckoner) + + expected := send_messages(t, comm.host, manual_buffering = false) + + // Sleep so we can give the other thread enough time to buffer its message. + time.sleep(SLEEP_TIME) + + testing.expect_value(t, chan.len(comm.client), 1) + result, ok := chan.try_recv(comm.client) + + // One more sleep to ensure it has enough time to close. + time.sleep(SLEEP_TIME) + + testing.expect_value(t, chan.is_closed(comm.client), true) + testing.expect_value(t, ok, true) + testing.expect_value(t, result.i, expected) + log.debug(result, expected) + + // Make sure sending to closed channels fails. + testing.expect_value(t, chan.send(comm.host, Message{.End, 0}), false) + testing.expect_value(t, chan.send(comm.client, Message{.End, 0}), false) + testing.expect_value(t, chan.try_send(comm.host, Message{.End, 0}), false) + testing.expect_value(t, chan.try_send(comm.client, Message{.End, 0}), false) + _, ok = chan.recv(comm.host); testing.expect_value(t, ok, false) + _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) + _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) + _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) +} + +@test +test_chan_unbuffered :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + comm: Comm + comm.manual_buffering = true + alloc_err: runtime.Allocator_Error + comm.host, alloc_err = chan.create_unbuffered(chan.Chan(Message), context.allocator) + assert(alloc_err == nil, "allocation failed") + comm.client, alloc_err = chan.create_unbuffered(chan.Chan(Message), context.allocator) + assert(alloc_err == nil, "allocation failed") + defer { + chan.destroy(comm.host) + chan.destroy(comm.client) + } + + testing.expect(t, !chan.is_buffered(comm.host)) + testing.expect(t, !chan.is_buffered(comm.client)) + testing.expect(t, chan.is_unbuffered(comm.host)) + testing.expect(t, chan.is_unbuffered(comm.client)) + testing.expect_value(t, chan.len(comm.host), 0) + testing.expect_value(t, chan.len(comm.client), 0) + testing.expect_value(t, chan.cap(comm.host), 0) + testing.expect_value(t, chan.cap(comm.client), 0) + + reckoner := thread.create(comm_client) + defer thread.destroy(reckoner) + reckoner.data = &comm + thread.start(reckoner) + + for !chan.can_send(comm.client) { + thread.yield() + } + + expected := send_messages(t, comm.host) + testing.expect_value(t, chan.is_closed(comm.host), true) + + for !chan.can_recv(comm.client) { + thread.yield() + } + + result, ok := chan.try_recv(comm.client) + testing.expect_value(t, ok, true) + testing.expect_value(t, result.i, expected) + log.debug(result, expected) + + // Sleep so we can give the other thread enough time to close its side + // after we've received its message. + time.sleep(SLEEP_TIME) + + testing.expect_value(t, chan.is_closed(comm.client), true) + + // Make sure sending and receiving on closed channels fails. + testing.expect_value(t, chan.send(comm.host, Message{.End, 0}), false) + testing.expect_value(t, chan.send(comm.client, Message{.End, 0}), false) + testing.expect_value(t, chan.try_send(comm.host, Message{.End, 0}), false) + testing.expect_value(t, chan.try_send(comm.client, Message{.End, 0}), false) + _, ok = chan.recv(comm.host); testing.expect_value(t, ok, false) + _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) + _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) + _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) +} + +@test +test_full_buffered_closed_chan_deadlock :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + ch, alloc_err := chan.create_buffered(chan.Chan(int), 1, context.allocator) + assert(alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + testing.expect(t, chan.can_send(ch)) + testing.expect(t, chan.send(ch, 32)) + testing.expect(t, chan.close(ch)) + testing.expect(t, !chan.send(ch, 32)) +} + +// This test guarantees a buffered channel's messages can still be received +// even after closing. This is currently how the API works. If that changes, +// this test will need to change. +@test +test_accept_message_from_closed_buffered_chan :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + ch, alloc_err := chan.create_buffered(chan.Chan(int), 2, context.allocator) + assert(alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + testing.expect(t, chan.can_send(ch)) + testing.expect(t, chan.send(ch, 32)) + testing.expect(t, chan.send(ch, 64)) + testing.expect(t, chan.close(ch)) + result, ok := chan.recv(ch) + testing.expect_value(t, result, 32) + testing.expect(t, ok) + result, ok = chan.try_recv(ch) + testing.expect_value(t, result, 64) + testing.expect(t, ok) +} diff --git a/tests/core/sync/test_core_sync.odin b/tests/core/sync/test_core_sync.odin new file mode 100644 index 000000000..32c08f935 --- /dev/null +++ b/tests/core/sync/test_core_sync.odin @@ -0,0 +1,718 @@ +// NOTE(Feoramund): These tests should be run a few hundred times, with and +// without `-sanitize:thread` enabled, to ensure maximum safety. +// +// Keep in mind that running with the debug logs uncommented can result in +// failures disappearing due to the delay of sending the log message causing +// different synchronization patterns. +// +// These tests are temporarily disabled on Darwin, as there is currently a +// stall occurring which I cannot debug. + +//+build !darwin +package test_core_sync + +import "base:intrinsics" +// import "core:log" +import "core:sync" +import "core:testing" +import "core:thread" +import "core:time" + +FAIL_TIME :: 1 * time.Second +SLEEP_TIME :: 1 * time.Millisecond +SMALL_SLEEP_TIME :: 10 * time.Microsecond + +// This needs to be high enough to cause a data race if any of the +// synchronization primitives fail. +THREADS :: 8 + +// Manually wait on all threads to finish. +// +// This reduces a dependency on a `Wait_Group` or similar primitives. +// +// It's also important that we wait for every thread to finish, as it's +// possible for a thread to finish after the test if we don't check, despite +// joining it to the test thread. +wait_for :: proc(threads: []^thread.Thread) { + wait_loop: for { + count := len(threads) + for v in threads { + if thread.is_done(v) { + count -= 1 + } + } + if count == 0 { + break wait_loop + } + thread.yield() + } + for t in threads { + thread.join(t) + thread.destroy(t) + } +} + +// +// core:sync/primitives.odin +// + +@test +test_mutex :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + m: sync.Mutex, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("MUTEX-%v> locking", th.id) + sync.mutex_lock(&data.m) + data.number += 1 + // log.debugf("MUTEX-%v> unlocking", th.id) + sync.mutex_unlock(&data.m) + // log.debugf("MUTEX-%v> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + testing.expect_value(t, data.number, THREADS) +} + +@test +test_rw_mutex :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + m1: sync.RW_Mutex, + m2: sync.RW_Mutex, + number1: int, + number2: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + sync.rw_mutex_shared_lock(&data.m1) + n := data.number1 + sync.rw_mutex_shared_unlock(&data.m1) + + sync.rw_mutex_lock(&data.m2) + data.number2 += n + sync.rw_mutex_unlock(&data.m2) + } + + data: Data + threads: [THREADS]^thread.Thread + + sync.rw_mutex_lock(&data.m1) + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + data.number1 = 1 + sync.rw_mutex_unlock(&data.m1) + + wait_for(threads[:]) + + testing.expect_value(t, data.number2, THREADS) +} + +@test +test_recursive_mutex :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + m: sync.Recursive_Mutex, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("REC_MUTEX-%v> locking", th.id) + tried1 := sync.recursive_mutex_try_lock(&data.m) + for _ in 0..<3 { + sync.recursive_mutex_lock(&data.m) + } + tried2 := sync.recursive_mutex_try_lock(&data.m) + // log.debugf("REC_MUTEX-%v> locked", th.id) + data.number += 1 + // log.debugf("REC_MUTEX-%v> unlocking", th.id) + for _ in 0..<3 { + sync.recursive_mutex_unlock(&data.m) + } + if tried1 { sync.recursive_mutex_unlock(&data.m) } + if tried2 { sync.recursive_mutex_unlock(&data.m) } + // log.debugf("REC_MUTEX-%v> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + testing.expect_value(t, data.number, THREADS) +} + +@test +test_cond :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + c: sync.Cond, + m: sync.Mutex, + i: int, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + sync.mutex_lock(&data.m) + + for intrinsics.atomic_load(&data.i) != 1 { + sync.cond_wait(&data.c, &data.m) + } + + data.number += intrinsics.atomic_load(&data.i) + + sync.mutex_unlock(&data.m) + } + + data: Data + threads: [THREADS]^thread.Thread + data.i = -1 + + sync.mutex_lock(&data.m) + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + time.sleep(SLEEP_TIME) + data.i = 1 + sync.mutex_unlock(&data.m) + sync.cond_broadcast(&data.c) + + wait_for(threads[:]) + + testing.expect_value(t, data.number, THREADS) +} + +@test +test_cond_with_timeout :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + c: sync.Cond + m: sync.Mutex + sync.mutex_lock(&m) + sync.cond_wait_with_timeout(&c, &m, SLEEP_TIME) +} + +@test +test_semaphore :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + s: sync.Sema, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("SEM-%v> waiting", th.id) + sync.sema_wait(&data.s) + data.number += 1 + // log.debugf("SEM-%v> posting", th.id) + sync.sema_post(&data.s) + // log.debugf("SEM-%v> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + sync.sema_post(&data.s) + + wait_for(threads[:]) + + testing.expect_value(t, data.number, THREADS) +} + +@test +test_semaphore_with_timeout :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + s: sync.Sema + sync.sema_wait_with_timeout(&s, SLEEP_TIME) +} + +@test +test_futex :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + f: sync.Futex, + i: int, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("FUTEX-%v> waiting", th.id) + sync.futex_wait(&data.f, 3) + // log.debugf("FUTEX-%v> done", th.id) + + n := data.i + intrinsics.atomic_add(&data.number, n) + } + + data: Data + data.i = -1 + data.f = 3 + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + data.i = 1 + // Change the futex variable to keep late-starters from stalling. + data.f = 0 + sync.futex_broadcast(&data.f) + + wait_for(threads[:]) + + testing.expect_value(t, data.number, THREADS) +} + +@test +test_futex_with_timeout :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + f: sync.Futex = 1 + sync.futex_wait_with_timeout(&f, 1, SLEEP_TIME) +} + +// +// core:sync/extended.odin +// + +@test +test_wait_group :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + step1: sync.Wait_Group, + step2: sync.Wait_Group, + i: int, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + sync.wait_group_wait(&data.step1) + + n := data.i + intrinsics.atomic_add(&data.number, n) + + sync.wait_group_done(&data.step2) + } + + data: Data + data.i = -1 + threads: [THREADS]^thread.Thread + + sync.wait_group_add(&data.step1, 1) + sync.wait_group_add(&data.step2, THREADS) + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + time.sleep(SMALL_SLEEP_TIME) + data.i = 1 + sync.wait_group_done(&data.step1) + + sync.wait_group_wait(&data.step2) + + wait_for(threads[:]) + + testing.expect_value(t, data.step1.counter, 0) + testing.expect_value(t, data.step2.counter, 0) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_wait_group_with_timeout :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + wg: sync.Wait_Group + sync.wait_group_wait_with_timeout(&wg, SLEEP_TIME) +} + +@test +test_barrier :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + b: sync.Barrier, + i: int, + number: int, + + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + sync.barrier_wait(&data.b) + + intrinsics.atomic_add(&data.number, data.i) + } + + data: Data + data.i = -1 + threads: [THREADS]^thread.Thread + + sync.barrier_init(&data.b, THREADS + 1) // +1 for this thread, of course. + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + time.sleep(SMALL_SLEEP_TIME) + data.i = 1 + sync.barrier_wait(&data.b) + + wait_for(threads[:]) + + testing.expect_value(t, data.b.index, 0) + testing.expect_value(t, data.b.generation_id, 1) + testing.expect_value(t, data.b.thread_count, THREADS + 1) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_auto_reset :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + a: sync.Auto_Reset_Event, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("AUR-%v> entering", th.id) + sync.auto_reset_event_wait(&data.a) + // log.debugf("AUR-%v> adding", th.id) + data.number += 1 + // log.debugf("AUR-%v> signalling", th.id) + sync.auto_reset_event_signal(&data.a) + // log.debugf("AUR-%v> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + // There is a chance that this test can stall if a signal is sent before + // all threads are queued, because it's possible for some number of threads + // to get to the waiting state, the signal to fire, all of the waited + // threads to pass successfully, then the other threads come in with no-one + // to run a signal. + // + // So we'll just test a fully-waited queue of cascading threads. + for { + status := intrinsics.atomic_load(&data.a.status) + if status == -THREADS { + // log.debug("All Auto_Reset_Event threads have queued.") + break + } + intrinsics.cpu_relax() + } + + sync.auto_reset_event_signal(&data.a) + + wait_for(threads[:]) + + // The last thread should leave this primitive in a signalled state. + testing.expect_value(t, data.a.status, 1) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_auto_reset_already_signalled :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + a: sync.Auto_Reset_Event + sync.auto_reset_event_signal(&a) + sync.auto_reset_event_wait(&a) + testing.expect_value(t, a.status, 0) +} + +@test +test_ticket_mutex :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + m: sync.Ticket_Mutex, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("TIC-%i> entering", th.id) + // intrinsics.debug_trap() + sync.ticket_mutex_lock(&data.m) + // log.debugf("TIC-%i> locked", th.id) + data.number += 1 + // log.debugf("TIC-%i> unlocking", th.id) + sync.ticket_mutex_unlock(&data.m) + // log.debugf("TIC-%i> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + testing.expect_value(t, data.m.ticket, THREADS) + testing.expect_value(t, data.m.serving, THREADS) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_benaphore :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + b: sync.Benaphore, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + sync.benaphore_lock(&data.b) + data.number += 1 + sync.benaphore_unlock(&data.b) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + testing.expect_value(t, data.b.counter, 0) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_recursive_benaphore :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + b: sync.Recursive_Benaphore, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("REC_BEP-%i> entering", th.id) + tried1 := sync.recursive_benaphore_try_lock(&data.b) + for _ in 0..<3 { + sync.recursive_benaphore_lock(&data.b) + } + tried2 := sync.recursive_benaphore_try_lock(&data.b) + // log.debugf("REC_BEP-%i> locked", th.id) + data.number += 1 + for _ in 0..<3 { + sync.recursive_benaphore_unlock(&data.b) + } + if tried1 { sync.recursive_benaphore_unlock(&data.b) } + if tried2 { sync.recursive_benaphore_unlock(&data.b) } + // log.debugf("REC_BEP-%i> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + // The benaphore should be unowned at the end. + testing.expect_value(t, data.b.counter, 0) + testing.expect_value(t, data.b.owner, 0) + testing.expect_value(t, data.b.recursion, 0) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_once :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + once: sync.Once, + number: int, + } + + write :: proc "contextless" (data: rawptr) { + data := cast(^Data)data + data.number += 1 + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + // log.debugf("ONCE-%v> entering", th.id) + sync.once_do_with_data_contextless(&data.once, write, data) + // log.debugf("ONCE-%v> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + testing.expect_value(t, data.once.done, true) + testing.expect_value(t, data.number, 1) +} + +@test +test_park :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + car: sync.Parker, + number: int, + } + + data: Data + + th := thread.create_and_start_with_data(&data, proc(data: rawptr) { + data := cast(^Data)data + time.sleep(SLEEP_TIME) + sync.unpark(&data.car) + data.number += 1 + }) + + sync.park(&data.car) + + wait_for([]^thread.Thread{ th }) + + PARKER_EMPTY :: 0 + testing.expect_value(t, data.car.state, PARKER_EMPTY) + testing.expect_value(t, data.number, 1) +} + +@test +test_park_with_timeout :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + car: sync.Parker + sync.park_with_timeout(&car, SLEEP_TIME) +} + +@test +test_one_shot_event :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + event: sync.One_Shot_Event, + number: int, + } + + data: Data + + th := thread.create_and_start_with_data(&data, proc(data: rawptr) { + data := cast(^Data)data + time.sleep(SLEEP_TIME) + sync.one_shot_event_signal(&data.event) + data.number += 1 + }) + + sync.one_shot_event_wait(&data.event) + + wait_for([]^thread.Thread{ th }) + + testing.expect_value(t, data.event.state, 1) + testing.expect_value(t, data.number, 1) +}