From cec230950466afc11acf96f8d6030ee09bdbc315 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Tue, 13 Jul 2021 16:58:40 +0100 Subject: [PATCH] Big improvement to the `-threaded-checker` code, unifying the logic and simplify behaviour --- src/check_decl.cpp | 3 +- src/check_expr.cpp | 4 +- src/checker.cpp | 142 ++++++++++++++++++++++++++++++++------------- src/checker.hpp | 5 +- 4 files changed, 108 insertions(+), 46 deletions(-) diff --git a/src/check_decl.cpp b/src/check_decl.cpp index 8b66452f3..a82215815 100644 --- a/src/check_decl.cpp +++ b/src/check_decl.cpp @@ -786,8 +786,7 @@ void check_proc_decl(CheckerContext *ctx, Entity *e, DeclInfo *d) { GB_ASSERT(pl->body->kind == Ast_BlockStmt); if (!pt->is_polymorphic) { - // check_procedure_now(ctx->checker, ctx->file, e->token, d, proc_type, pl->body, pl->tags); - check_procedure_later(ctx->checker, ctx->file, e->token, d, proc_type, pl->body, pl->tags); + check_procedure_later(ctx, ctx->file, e->token, d, proc_type, pl->body, pl->tags); } } else if (!is_foreign) { if (e->Procedure.is_export) { diff --git a/src/check_expr.cpp b/src/check_expr.cpp index 0c8bea126..beff1a532 100644 --- a/src/check_expr.cpp +++ b/src/check_expr.cpp @@ -443,7 +443,7 @@ bool find_or_generate_polymorphic_procedure(CheckerContext *c, Entity *base_enti } // NOTE(bill): Check the newly generated procedure body - check_procedure_later(nctx.checker, proc_info); + check_procedure_later(&nctx, proc_info); return true; } @@ -6497,7 +6497,7 @@ ExprKind check_expr_base_internal(CheckerContext *c, Operand *o, Ast *node, Type } pl->decl = decl; - check_procedure_later(ctx.checker, ctx.file, empty_token, decl, type, pl->body, pl->tags); + check_procedure_later(&ctx, ctx.file, empty_token, decl, type, pl->body, pl->tags); } check_close_scope(&ctx); diff --git a/src/checker.cpp b/src/checker.cpp index b23ac9e2d..7acaaa7a4 100644 --- a/src/checker.cpp +++ b/src/checker.cpp @@ -925,8 +925,10 @@ void reset_checker_context(CheckerContext *ctx, AstFile *file) { return; } destroy_checker_context(ctx); + auto *queue = ctx->procs_to_check_queue; *ctx = make_checker_context(ctx->checker); add_curr_ast_file(ctx, file); + ctx->procs_to_check_queue = queue; } @@ -966,8 +968,8 @@ void destroy_checker(Checker *c) { destroy_checker_context(&c->builtin_ctx); - // mpmc_destroy(&c->procs_to_check_queue); - // gb_semaphore_destroy(&c->procs_to_check_semaphore); + mpmc_destroy(&c->procs_to_check_queue); + gb_semaphore_destroy(&c->procs_to_check_semaphore); } @@ -1519,15 +1521,21 @@ void add_type_info_type(CheckerContext *c, Type *t) { } } -void check_procedure_later(Checker *c, ProcInfo *info) { +gb_global bool global_procedure_body_in_worker_queue = false; + +void check_procedure_later(CheckerContext *c, ProcInfo *info) { GB_ASSERT(info != nullptr); GB_ASSERT(info->decl != nullptr); - mpmc_enqueue(&c->procs_to_check_queue, info); - gb_semaphore_post(&c->procs_to_check_semaphore, 1); + if (build_context.threaded_checker && global_procedure_body_in_worker_queue) { + GB_ASSERT(c->procs_to_check_queue != nullptr); + } + + auto *queue = c->procs_to_check_queue ? c->procs_to_check_queue : &c->checker->procs_to_check_queue; + mpmc_enqueue(queue, info); } -void check_procedure_later(Checker *c, AstFile *file, Token token, DeclInfo *decl, Type *type, Ast *body, u64 tags) { +void check_procedure_later(CheckerContext *c, AstFile *file, Token token, DeclInfo *decl, Type *type, Ast *body, u64 tags) { ProcInfo *info = gb_alloc_item(permanent_allocator(), ProcInfo); info->file = file; info->token = token; @@ -4274,7 +4282,7 @@ void calculate_global_init_order(Checker *c) { } -void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped) { +void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, ProcBodyQueue *procs_to_check_queue) { if (pi == nullptr) { return; } @@ -4290,6 +4298,7 @@ void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped) { reset_checker_context(&ctx, pi->file); ctx.decl = pi->decl; ctx.untyped = untyped; + ctx.procs_to_check_queue = procs_to_check_queue; TypeProc *pt = &pi->type->Proc; String name = pi->token.string; @@ -4370,7 +4379,7 @@ void check_unchecked_bodies(Checker *c) { } map_clear(&untyped); - check_proc_info(c, &pi, &untyped); + check_proc_info(c, &pi, &untyped, nullptr); } } } @@ -4407,45 +4416,22 @@ void check_test_names(Checker *c) { } -static bool proc_bodies_is_running; +struct ThreadProcBodyData { + Checker *checker; + ProcBodyQueue *queue; +}; + +gb_global std::atomic total_bodies_checked; GB_THREAD_PROC(thread_proc_body) { - Checker *c = cast(Checker *)thread->user_data; - auto *q = &c->procs_to_check_queue; - ProcInfo *pi = nullptr; - + ThreadProcBodyData *data = cast(ThreadProcBodyData *)thread->user_data; + Checker *c = data->checker; + ProcBodyQueue *q = data->queue; UntypedExprInfoMap untyped = {}; map_init(&untyped, heap_allocator()); defer (map_destroy(&untyped)); - while (proc_bodies_is_running) { - gb_semaphore_wait(&c->procs_to_check_semaphore); - - if (mpmc_dequeue(q, &pi)) { - if (pi->decl->parent && pi->decl->parent->entity) { - Entity *parent = pi->decl->parent->entity; - // NOTE(bill): Only check a nested procedure if its parent's body has been checked first - // This is prevent any possible race conditions in evaluation when multithreaded - // NOTE(bill): In single threaded mode, this should never happen - if (parent->kind == Entity_Procedure && (parent->flags & EntityFlag_ProcBodyChecked) == 0) { - mpmc_enqueue(q, pi); - continue; - } - } - map_clear(&untyped); - check_proc_info(c, pi, &untyped); - } - } - - gb_semaphore_release(&c->procs_to_check_semaphore); - - return 0; -} - -void check_procedure_bodies(Checker *c) { - auto *q = &c->procs_to_check_queue; ProcInfo *pi = nullptr; - while (mpmc_dequeue(q, &pi)) { GB_ASSERT(pi->decl != nullptr); if (pi->decl->parent && pi->decl->parent->entity) { @@ -4458,10 +4444,84 @@ void check_procedure_bodies(Checker *c) { continue; } } - check_proc_info(c, pi, nullptr); + map_clear(&untyped); + check_proc_info(c, pi, &untyped, q); + total_bodies_checked.fetch_add(1, std::memory_order_relaxed); } + + gb_semaphore_release(&c->procs_to_check_semaphore); + + return 0; } +void check_procedure_bodies(Checker *c) { + + isize thread_count = gb_max(build_context.thread_count, 1); + isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work + if (!build_context.threaded_checker) { + worker_count = 0; + } + + global_procedure_body_in_worker_queue = true; + + isize original_queue_count = c->procs_to_check_queue.count.load(std::memory_order_relaxed); + isize load_count = (original_queue_count+thread_count-1)/thread_count; + + ThreadProcBodyData *thread_data = gb_alloc_array(permanent_allocator(), ThreadProcBodyData, thread_count); + for (isize i = 0; i < thread_count; i++) { + ThreadProcBodyData *data = thread_data + i; + data->checker = c; + data->queue = gb_alloc_item(permanent_allocator(), ProcBodyQueue); + mpmc_init(data->queue, heap_allocator(), next_pow2_isize(load_count*2)); + } + + for (isize i = 0; i < thread_count; i++) { + ProcBodyQueue *queue = thread_data[i].queue; + for (isize j = 0; j < load_count; j++) { + ProcInfo *pi = nullptr; + if (!mpmc_dequeue(&c->procs_to_check_queue, &pi)) { + break; + } + mpmc_enqueue(queue, pi); + } + } + isize total_queued = 0; + for (isize i = 0; i < thread_count; i++) { + ProcBodyQueue *queue = thread_data[i].queue; + total_queued += queue->count.load(); + } + GB_ASSERT(total_queued == original_queue_count); + + + gb_semaphore_post(&c->procs_to_check_semaphore, cast(i32)thread_count); + + gbThread *threads = gb_alloc_array(permanent_allocator(), gbThread, worker_count); + for (isize i = 0; i < worker_count; i++) { + gb_thread_init(threads+i); + } + + for (isize i = 0; i < worker_count; i++) { + gb_thread_start(threads+i, thread_proc_body, thread_data+i); + } + gbThread dummy_main_thread = {}; + dummy_main_thread.user_data = thread_data+worker_count; + thread_proc_body(&dummy_main_thread); + gb_semaphore_release(&c->procs_to_check_semaphore); + + for (isize i = 0; i < worker_count; i++) { + gb_thread_join(threads+i); + } + + gb_semaphore_wait(&c->procs_to_check_semaphore); + + for (isize i = 0; i < worker_count; i++) { + gb_thread_destroy(threads+i); + } + + // gb_printf_err("Total Procedure Bodies Checked: %td\n", total_bodies_checked.load(std::memory_order_relaxed)); + + global_procedure_body_in_worker_queue = false; +} void add_untyped_expressions(CheckerInfo *cinfo, UntypedExprInfoMap *untyped) { if (untyped == nullptr) { return; diff --git a/src/checker.hpp b/src/checker.hpp index 98aba8273..c531020a8 100644 --- a/src/checker.hpp +++ b/src/checker.hpp @@ -265,6 +265,7 @@ struct UntypedExprInfo { }; typedef Map UntypedExprInfoMap; // Key: Ast * +typedef MPMCQueue ProcBodyQueue; // CheckerInfo stores all the symbol information for a type-checked program struct CheckerInfo { @@ -358,6 +359,8 @@ struct CheckerContext { Scope * polymorphic_scope; Ast *assignment_lhs_hint; + + ProcBodyQueue *procs_to_check_queue; }; struct Checker { @@ -368,7 +371,7 @@ struct Checker { MPMCQueue procs_with_deferred_to_check; - MPMCQueue procs_to_check_queue; + ProcBodyQueue procs_to_check_queue; gbSemaphore procs_to_check_semaphore; gbMutex poly_type_mutex;