From ed8a6f872dbcd8b195940dec40a0d86d59f11eaa Mon Sep 17 00:00:00 2001 From: gingerBill Date: Sat, 10 Jul 2021 21:29:49 +0100 Subject: [PATCH] Move things around for sanity checking for multithread preparation --- src/check_decl.cpp | 18 +++++++++++++++--- src/check_expr.cpp | 1 + src/checker.cpp | 41 +++++++++++++++++++++++++++++------------ src/checker.hpp | 29 +++++++++++++++++------------ src/common.cpp | 6 ++++++ src/parser.cpp | 16 ++++++++++------ src/parser.hpp | 22 ++++++++++++---------- src/queue.cpp | 5 ++++- src/thread_pool.cpp | 40 ++++++++++------------------------------ 9 files changed, 104 insertions(+), 74 deletions(-) diff --git a/src/check_decl.cpp b/src/check_decl.cpp index 84817e9d8..d7f04ca5c 100644 --- a/src/check_decl.cpp +++ b/src/check_decl.cpp @@ -809,9 +809,7 @@ void check_proc_decl(CheckerContext *ctx, Entity *e, DeclInfo *d) { if (ac.deferred_procedure.entity != nullptr) { e->Procedure.deferred_procedure = ac.deferred_procedure; - gb_mutex_lock(&ctx->checker->procs_with_deferred_to_check_mutex); - array_add(&ctx->checker->procs_with_deferred_to_check, e); - gb_mutex_unlock(&ctx->checker->procs_with_deferred_to_check_mutex); + mpmc_enqueue(&ctx->checker->procs_with_deferred_to_check, e); } if (is_foreign) { @@ -824,6 +822,8 @@ void check_proc_decl(CheckerContext *ctx, Entity *e, DeclInfo *d) { init_entity_foreign_library(ctx, e); + gb_mutex_lock(&ctx->info->foreign_mutex); + auto *fp = &ctx->info->foreigns; StringHashKey key = string_hash_string(name); Entity **found = string_map_get(fp, key); @@ -850,12 +850,16 @@ void check_proc_decl(CheckerContext *ctx, Entity *e, DeclInfo *d) { } else { string_map_set(fp, key, e); } + + gb_mutex_unlock(&ctx->info->foreign_mutex); } else { String name = e->token.string; if (e->Procedure.link_name.len > 0) { name = e->Procedure.link_name; } if (e->Procedure.link_name.len > 0 || is_export) { + gb_mutex_lock(&ctx->info->foreign_mutex); + auto *fp = &ctx->info->foreigns; StringHashKey key = string_hash_string(name); Entity **found = string_map_get(fp, key); @@ -872,6 +876,8 @@ void check_proc_decl(CheckerContext *ctx, Entity *e, DeclInfo *d) { } else { string_map_set(fp, key, e); } + + gb_mutex_unlock(&ctx->info->foreign_mutex); } } } @@ -896,7 +902,9 @@ void check_global_variable_decl(CheckerContext *ctx, Entity *&e, Ast *type_expr, } if (ac.require_declaration) { + gb_mutex_lock(&ctx->info->entity_mutex); array_add(&ctx->info->required_global_variables, e); + gb_mutex_unlock(&ctx->info->entity_mutex); } @@ -1320,6 +1328,8 @@ void check_proc_body(CheckerContext *ctx_, Token token, DeclInfo *decl, Type *ty if (ps->flags & (ScopeFlag_File & ScopeFlag_Pkg & ScopeFlag_Global)) { return; } else { + gb_mutex_lock(&ctx->info->deps_mutex); + // NOTE(bill): Add the dependencies from the procedure literal (lambda) // But only at the procedure level for_array(i, decl->deps.entries) { @@ -1330,6 +1340,8 @@ void check_proc_body(CheckerContext *ctx_, Token token, DeclInfo *decl, Type *ty Type *t = decl->type_info_deps.entries[i].ptr; ptr_set_add(&decl->parent->type_info_deps, t); } + + gb_mutex_unlock(&ctx->info->deps_mutex); } } } diff --git a/src/check_expr.cpp b/src/check_expr.cpp index 9399f94dc..3ff002eb1 100644 --- a/src/check_expr.cpp +++ b/src/check_expr.cpp @@ -223,6 +223,7 @@ bool find_or_generate_polymorphic_procedure(CheckerContext *c, Entity *base_enti if (!is_type_proc(base_entity->type)) { return false; } + String name = base_entity->token.string; Type *src = base_type(base_entity->type); diff --git a/src/checker.cpp b/src/checker.cpp index 41c317527..e7a53ded9 100644 --- a/src/checker.cpp +++ b/src/checker.cpp @@ -856,6 +856,10 @@ void init_checker_info(CheckerInfo *i) { gb_mutex_init(&i->gen_procs_mutex); gb_mutex_init(&i->gen_types_mutex); gb_mutex_init(&i->type_info_mutex); + gb_mutex_init(&i->deps_mutex); + gb_mutex_init(&i->identifier_uses_mutex); + gb_mutex_init(&i->entity_mutex); + gb_mutex_init(&i->foreign_mutex); } @@ -879,6 +883,10 @@ void destroy_checker_info(CheckerInfo *i) { gb_mutex_destroy(&i->gen_procs_mutex); gb_mutex_destroy(&i->gen_types_mutex); gb_mutex_destroy(&i->type_info_mutex); + gb_mutex_destroy(&i->deps_mutex); + gb_mutex_destroy(&i->identifier_uses_mutex); + gb_mutex_destroy(&i->entity_mutex); + gb_mutex_destroy(&i->foreign_mutex); } CheckerContext make_checker_context(Checker *c) { @@ -945,8 +953,7 @@ bool init_checker(Checker *c, Parser *parser) { init_checker_info(&c->info); c->info.checker = c; - gb_mutex_init(&c->procs_with_deferred_to_check_mutex); - array_init(&c->procs_with_deferred_to_check, a); + mpmc_init(&c->procs_with_deferred_to_check, a, 1<<10); // NOTE(bill): Is this big enough or too small? isize item_size = gb_max3(gb_size_of(Entity), gb_size_of(Type), gb_size_of(Scope)); @@ -957,18 +964,17 @@ bool init_checker(Checker *c, Parser *parser) { // NOTE(bill): 1 Mi elements should be enough on average mpmc_init(&c->procs_to_check_queue, heap_allocator(), 1<<20); + gb_semaphore_init(&c->procs_to_check_semaphore); return true; } void destroy_checker(Checker *c) { destroy_checker_info(&c->info); - gb_mutex_destroy(&c->procs_with_deferred_to_check_mutex); - array_free(&c->procs_with_deferred_to_check); - destroy_checker_context(&c->builtin_ctx); // mpmc_destroy(&c->procs_to_check_queue); + // gb_semaphore_destroy(&c->procs_to_check_semaphore); } @@ -1160,7 +1166,9 @@ void add_entity_definition(CheckerInfo *i, Ast *identifier, Entity *entity) { GB_ASSERT(entity != nullptr); identifier->Ident.entity = entity; entity->identifier = identifier; + gb_mutex_lock(&i->entity_mutex); array_add(&i->definitions, entity); + gb_mutex_unlock(&i->entity_mutex); } bool redeclaration_error(String name, Entity *prev, Entity *found) { @@ -1242,7 +1250,9 @@ void add_entity_use(CheckerContext *c, Ast *identifier, Entity *entity) { identifier->Ident.entity = entity; if (c->info->allow_identifier_uses) { + gb_mutex_lock(&c->info->identifier_uses_mutex); array_add(&c->info->identifier_uses, identifier); + gb_mutex_unlock(&c->info->identifier_uses_mutex); } String dmsg = entity->deprecated_message; @@ -1278,13 +1288,16 @@ void add_entity_and_decl_info(CheckerContext *c, Ast *identifier, Entity *e, Dec add_entity(c, scope, identifier, e); } - add_entity_definition(&c->checker->info, identifier, e); + CheckerInfo *info = c->info; + add_entity_definition(info, identifier, e); GB_ASSERT(e->decl_info == nullptr); + gb_mutex_lock(&info->entity_mutex); e->decl_info = d; d->entity = e; - array_add(&c->checker->info.entities, e); - e->order_in_src = c->checker->info.entities.count; + array_add(&info->entities, e); + e->order_in_src = info->entities.count; // Is this even correct? e->pkg = c->pkg; + gb_mutex_unlock(&info->entity_mutex); } @@ -1311,11 +1324,11 @@ void add_type_info_type(CheckerContext *c, Type *t) { return; } - add_type_info_dependency(c->decl, t); - gb_mutex_lock(&c->info->type_info_mutex); defer (gb_mutex_unlock(&c->info->type_info_mutex)); + add_type_info_dependency(c->decl, t); + auto found = map_get(&c->info->type_info_map, hash_type(t)); if (found != nullptr) { // Types have already been added @@ -3760,7 +3773,9 @@ void check_add_foreign_import_decl(CheckerContext *ctx, Ast *decl) { AttributeContext ac = {}; check_decl_attributes(ctx, fl->attributes, foreign_import_decl_attribute, &ac); if (ac.require_declaration) { + gb_mutex_lock(&ctx->info->foreign_mutex); array_add(&ctx->info->required_foreign_imports_through_force, e); + gb_mutex_unlock(&ctx->info->foreign_mutex); add_entity_use(ctx, nullptr, e); } } @@ -4385,14 +4400,17 @@ void check_test_names(Checker *c) { } + void check_procedure_bodies(Checker *c) { auto *q = &c->procs_to_check_queue; ProcInfo *pi = nullptr; + while (mpmc_dequeue(q, &pi)) { if (pi->decl->parent && pi->decl->parent->entity) { Entity *parent = pi->decl->parent->entity; // NOTE(bill): Only check a nested procedure if its parent's body has been checked first // This is prevent any possible race conditions in evaluation when multithreaded + // NOTE(bill): In single threaded mode, this should never happen if (parent->kind == Entity_Procedure && (parent->flags & EntityFlag_ProcBodyChecked) == 0) { mpmc_enqueue(q, pi); continue; @@ -4545,8 +4563,7 @@ void check_parsed_files(Checker *c) { } TIME_SECTION("check deferred procedures"); - for_array(i, c->procs_with_deferred_to_check) { - Entity *src = c->procs_with_deferred_to_check[i]; + for (Entity *src = nullptr; mpmc_dequeue(&c->procs_with_deferred_to_check, &src); /**/) { GB_ASSERT(src->kind == Entity_Procedure); DeferredProcedureKind dst_kind = src->Procedure.deferred_procedure.kind; diff --git a/src/checker.hpp b/src/checker.hpp index ad1edb934..9395d1565 100644 --- a/src/checker.hpp +++ b/src/checker.hpp @@ -264,11 +264,7 @@ struct CheckerInfo { StringMap files; // Key (full path) StringMap packages; // Key (full path) - StringMap foreigns; - Array definitions; - Array entities; - Array variable_init_order; - + Array variable_init_order; AstPackage * builtin_package; AstPackage * runtime_package; @@ -278,14 +274,9 @@ struct CheckerInfo { PtrSet minimum_dependency_set; PtrSet minimum_dependency_type_info_set; - Array required_foreign_imports_through_force; - Array required_global_variables; Array testing_procedures; - bool allow_identifier_uses; - Array identifier_uses; // only used by 'odin query' - // Below are accessed within procedures // NOTE(bill): If the semantic checker (check_proc_body) is to ever to be multithreaded, // these variables will be of contention @@ -294,6 +285,10 @@ struct CheckerInfo { gbMutex gen_procs_mutex; gbMutex gen_types_mutex; gbMutex type_info_mutex; + gbMutex deps_mutex; + gbMutex identifier_uses_mutex; + gbMutex entity_mutex; + gbMutex foreign_mutex; Map untyped; // Key: Ast * | Expression -> ExprInfo * // NOTE(bill): This needs to be a map and not on the Ast @@ -304,6 +299,16 @@ struct CheckerInfo { Array type_info_types; Map type_info_map; // Key: Type * + + bool allow_identifier_uses; + Array identifier_uses; // only used by 'odin query' + + Array definitions; + Array entities; + StringMap foreigns; + + Array required_global_variables; + Array required_foreign_imports_through_force; }; struct CheckerContext { @@ -351,10 +356,10 @@ struct Checker { CheckerContext builtin_ctx; - gbMutex procs_with_deferred_to_check_mutex; - Array procs_with_deferred_to_check; + MPMCQueue procs_with_deferred_to_check; MPMCQueue procs_to_check_queue; + gbSemaphore procs_to_check_semaphore; }; diff --git a/src/common.cpp b/src/common.cpp index 2591ca068..b0b1c3353 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -399,6 +399,7 @@ gb_global Arena permanent_arena = {}; void arena_init(Arena *arena, gbAllocator backing, isize block_size=ARENA_DEFAULT_BLOCK_SIZE) { arena->backing = backing; arena->block_size = block_size; + arena->use_mutex = true; array_init(&arena->blocks, backing, 0, 2); gb_mutex_init(&arena->mutex); } @@ -521,6 +522,7 @@ struct Temp_Allocator { isize curr_offset; gbAllocator backup_allocator; Array leaked_allocations; + gbMutex mutex; }; gb_global Temp_Allocator temporary_allocator_data = {}; @@ -531,6 +533,7 @@ void temp_allocator_init(Temp_Allocator *s, isize size) { s->len = size; s->curr_offset = 0; s->leaked_allocations.allocator = s->backup_allocator; + gb_mutex_init(&s->mutex); } void *temp_allocator_alloc(Temp_Allocator *s, isize size, isize alignment) { @@ -573,6 +576,9 @@ GB_ALLOCATOR_PROC(temp_allocator_proc) { Temp_Allocator *s = cast(Temp_Allocator *)allocator_data; GB_ASSERT_NOT_NULL(s); + gb_mutex_lock(&s->mutex); + defer (gb_mutex_unlock(&s->mutex)); + switch (type) { case gbAllocation_Alloc: return temp_allocator_alloc(s, size, alignment); diff --git a/src/parser.cpp b/src/parser.cpp index 7e7b317c4..e45195578 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4664,8 +4664,10 @@ bool init_parser(Parser *p) { string_map_init(&p->package_map, heap_allocator()); array_init(&p->packages, heap_allocator()); array_init(&p->package_imports, heap_allocator()); + gb_mutex_init(&p->import_mutex); gb_mutex_init(&p->file_add_mutex); gb_mutex_init(&p->file_decl_mutex); + mpmc_init(&p->file_error_queue, heap_allocator(), 1024); return true; } @@ -4689,8 +4691,10 @@ void destroy_parser(Parser *p) { array_free(&p->package_imports); string_set_destroy(&p->imported_files); string_map_destroy(&p->package_map); + gb_mutex_destroy(&p->import_mutex); gb_mutex_destroy(&p->file_add_mutex); gb_mutex_destroy(&p->file_decl_mutex); + mpmc_destroy(&p->file_error_queue); } @@ -4718,6 +4722,9 @@ ParseFileError process_imported_file(Parser *p, ImportedFile const &imported_fil WORKER_TASK_PROC(parser_worker_proc) { ParserWorkerData *wd = cast(ParserWorkerData *)data; ParseFileError err = process_imported_file(wd->parser, wd->imported_file); + if (err != ParseFile_None) { + mpmc_enqueue(&wd->parser->file_error_queue, err); + } return cast(isize)err; } @@ -4775,8 +4782,8 @@ void parser_add_foreign_file_to_process(Parser *p, AstPackage *pkg, AstForeignFi AstPackage *try_add_import_path(Parser *p, String const &path, String const &rel_path, TokenPos pos, PackageKind kind = Package_Normal) { String const FILE_EXT = str_lit(".odin"); - gb_mutex_lock(&p->file_add_mutex); - defer (gb_mutex_unlock(&p->file_add_mutex)); + gb_mutex_lock(&p->import_mutex); + defer (gb_mutex_unlock(&p->import_mutex)); if (string_set_exists(&p->imported_files, path)) { return nullptr; @@ -5471,10 +5478,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) { thread_pool_start(&parser_thread_pool); thread_pool_wait_to_process(&parser_thread_pool); - // NOTE(bill): Get the last error and use that - for (isize i = parser_thread_pool.task_tail-1; i >= 0; i--) { - WorkerTask *task = &parser_thread_pool.tasks[i]; - ParseFileError err = cast(ParseFileError)task->result; + for (ParseFileError err = ParseFile_None; mpmc_dequeue(&p->file_error_queue, &err); /**/) { if (err != ParseFile_None) { return err; } diff --git a/src/parser.hpp b/src/parser.hpp index ad2b8c260..b14e336c3 100644 --- a/src/parser.hpp +++ b/src/parser.hpp @@ -162,16 +162,18 @@ struct AstPackage { struct Parser { - String init_fullpath; - StringSet imported_files; // fullpath - StringMap package_map; // Key(package name) - Array packages; - Array package_imports; - isize file_to_process_count; - isize total_token_count; - isize total_line_count; - gbMutex file_add_mutex; - gbMutex file_decl_mutex; + String init_fullpath; + StringSet imported_files; // fullpath + StringMap package_map; // Key(package name) + Array packages; + Array package_imports; + isize file_to_process_count; + isize total_token_count; + isize total_line_count; + gbMutex import_mutex; + gbMutex file_add_mutex; + gbMutex file_decl_mutex; + MPMCQueue file_error_queue; }; diff --git a/src/queue.cpp b/src/queue.cpp index 7087af03e..048f8fcb9 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -15,6 +15,7 @@ struct MPMCQueue { isize mask; Array> buffer; gbMutex mutex; + std::atomic count; CacheLinePad pad1; std::atomic head_idx; @@ -42,7 +43,7 @@ void mpmc_init(MPMCQueue *q, gbAllocator a, isize size) { template void mpmc_destroy(MPMCQueue *q) { gb_mutex_destroy(&q->mutex); - gb_array_free(&q->buffer); + gb_free(q->buffer.allocator, q->buffer.data); } @@ -60,6 +61,7 @@ bool mpmc_enqueue(MPMCQueue *q, T const &data) { if (q->head_idx.compare_exchange_weak(head_idx, next_head_idx)) { node->data = data; node->idx.store(next_head_idx, std::memory_order_release); + q->count.fetch_add(1, std::memory_order_release); return true; } } else if (diff < 0) { @@ -98,6 +100,7 @@ bool mpmc_dequeue(MPMCQueue *q, T *data_) { if (q->tail_idx.compare_exchange_weak(tail_idx, next_tail_idx)) { if (data_) *data_ = node->data; node->idx.store(tail_idx + q->mask + 1, std::memory_order_release); + q->count.fetch_sub(1, std::memory_order_release); return true; } } else if (diff < 0) { diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 73118321b..9e178b833 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -18,10 +18,7 @@ struct ThreadPool { gbAllocator allocator; - WorkerTask *tasks; - isize volatile task_head; - isize volatile task_tail; - isize volatile task_capacity; + MPMCQueue tasks; gbThread *threads; isize thread_count; @@ -39,10 +36,7 @@ GB_THREAD_PROC(worker_thread_internal); void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) { pool->allocator = a; - pool->task_head = 0; - pool->task_tail = 0; - pool->task_capacity = 1024; - pool->tasks = gb_alloc_array(a, WorkerTask, pool->task_capacity); + mpmc_init(&pool->tasks, a, 1024); pool->thread_count = gb_max(thread_count, 0); pool->threads = gb_alloc_array(a, gbThread, pool->thread_count); gb_mutex_init(&pool->mutex); @@ -100,41 +94,27 @@ void thread_pool_destroy(ThreadPool *pool) { gb_mutex_destroy(&pool->mutex); gb_free(pool->allocator, pool->threads); pool->thread_count = 0; - gb_free(pool->allocator, pool->tasks); - pool->task_head = 0; - pool->task_tail = 0; - pool->task_capacity = 0; + mpmc_destroy(&pool->tasks); } void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { gb_mutex_lock(&pool->mutex); - if (pool->task_tail == pool->task_capacity) { - isize new_cap = 2*pool->task_capacity + 8; - WorkerTask *new_tasks = gb_alloc_array(pool->allocator, WorkerTask, new_cap); - gb_memmove(new_tasks, pool->tasks, (pool->task_tail)*gb_size_of(WorkerTask)); - pool->tasks = new_tasks; - pool->task_capacity = new_cap; - } WorkerTask task = {}; task.do_work = proc; task.data = data; - pool->tasks[pool->task_tail++] = task; + mpmc_enqueue(&pool->tasks, task); gb_semaphore_post(&pool->sem_available, 1); gb_mutex_unlock(&pool->mutex); } bool thread_pool_try_and_pop_task(ThreadPool *pool, WorkerTask *task) { bool got_task = false; - if (gb_mutex_try_lock(&pool->mutex)) { - if (pool->task_tail > pool->task_head) { - gb_atomic32_fetch_add(&pool->processing_work_count, +1); - *task = pool->tasks[pool->task_head++]; - got_task = true; - } - gb_mutex_unlock(&pool->mutex); + if (mpmc_dequeue(&pool->tasks, task)) { + gb_atomic32_fetch_add(&pool->processing_work_count, +1); + got_task = true; } return got_task; } @@ -144,16 +124,16 @@ void thread_pool_do_work(ThreadPool *pool, WorkerTask *task) { } void thread_pool_wait_to_process(ThreadPool *pool) { - while (pool->task_tail > pool->task_head || gb_atomic32_load(&pool->processing_work_count) != 0) { + while (pool->tasks.count.load(std::memory_order_relaxed) > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) { WorkerTask task = {}; if (thread_pool_try_and_pop_task(pool, &task)) { thread_pool_do_work(pool, &task); } // Safety-kick - if (pool->task_tail > pool->task_head && gb_atomic32_load(&pool->processing_work_count) == 0) { + while (pool->tasks.count.load(std::memory_order_relaxed) > 0 && gb_atomic32_load(&pool->processing_work_count) == 0) { gb_mutex_lock(&pool->mutex); - gb_semaphore_post(&pool->sem_available, cast(i32)(pool->task_tail-pool->task_head)); + gb_semaphore_post(&pool->sem_available, cast(i32)pool->tasks.count.load(std::memory_order_relaxed)); gb_mutex_unlock(&pool->mutex); }