Move things around for sanity checking for multithread preparation

This commit is contained in:
gingerBill
2021-07-10 21:29:49 +01:00
parent 0a61d4bf2b
commit ed8a6f872d
9 changed files with 104 additions and 74 deletions

View File

@@ -809,9 +809,7 @@ void check_proc_decl(CheckerContext *ctx, Entity *e, DeclInfo *d) {
if (ac.deferred_procedure.entity != nullptr) {
e->Procedure.deferred_procedure = ac.deferred_procedure;
gb_mutex_lock(&ctx->checker->procs_with_deferred_to_check_mutex);
array_add(&ctx->checker->procs_with_deferred_to_check, e);
gb_mutex_unlock(&ctx->checker->procs_with_deferred_to_check_mutex);
mpmc_enqueue(&ctx->checker->procs_with_deferred_to_check, e);
}
if (is_foreign) {
@@ -824,6 +822,8 @@ void check_proc_decl(CheckerContext *ctx, Entity *e, DeclInfo *d) {
init_entity_foreign_library(ctx, e);
gb_mutex_lock(&ctx->info->foreign_mutex);
auto *fp = &ctx->info->foreigns;
StringHashKey key = string_hash_string(name);
Entity **found = string_map_get(fp, key);
@@ -850,12 +850,16 @@ void check_proc_decl(CheckerContext *ctx, Entity *e, DeclInfo *d) {
} else {
string_map_set(fp, key, e);
}
gb_mutex_unlock(&ctx->info->foreign_mutex);
} else {
String name = e->token.string;
if (e->Procedure.link_name.len > 0) {
name = e->Procedure.link_name;
}
if (e->Procedure.link_name.len > 0 || is_export) {
gb_mutex_lock(&ctx->info->foreign_mutex);
auto *fp = &ctx->info->foreigns;
StringHashKey key = string_hash_string(name);
Entity **found = string_map_get(fp, key);
@@ -872,6 +876,8 @@ void check_proc_decl(CheckerContext *ctx, Entity *e, DeclInfo *d) {
} else {
string_map_set(fp, key, e);
}
gb_mutex_unlock(&ctx->info->foreign_mutex);
}
}
}
@@ -896,7 +902,9 @@ void check_global_variable_decl(CheckerContext *ctx, Entity *&e, Ast *type_expr,
}
if (ac.require_declaration) {
gb_mutex_lock(&ctx->info->entity_mutex);
array_add(&ctx->info->required_global_variables, e);
gb_mutex_unlock(&ctx->info->entity_mutex);
}
@@ -1320,6 +1328,8 @@ void check_proc_body(CheckerContext *ctx_, Token token, DeclInfo *decl, Type *ty
if (ps->flags & (ScopeFlag_File & ScopeFlag_Pkg & ScopeFlag_Global)) {
return;
} else {
gb_mutex_lock(&ctx->info->deps_mutex);
// NOTE(bill): Add the dependencies from the procedure literal (lambda)
// But only at the procedure level
for_array(i, decl->deps.entries) {
@@ -1330,6 +1340,8 @@ void check_proc_body(CheckerContext *ctx_, Token token, DeclInfo *decl, Type *ty
Type *t = decl->type_info_deps.entries[i].ptr;
ptr_set_add(&decl->parent->type_info_deps, t);
}
gb_mutex_unlock(&ctx->info->deps_mutex);
}
}
}

View File

@@ -223,6 +223,7 @@ bool find_or_generate_polymorphic_procedure(CheckerContext *c, Entity *base_enti
if (!is_type_proc(base_entity->type)) {
return false;
}
String name = base_entity->token.string;
Type *src = base_type(base_entity->type);

View File

@@ -856,6 +856,10 @@ void init_checker_info(CheckerInfo *i) {
gb_mutex_init(&i->gen_procs_mutex);
gb_mutex_init(&i->gen_types_mutex);
gb_mutex_init(&i->type_info_mutex);
gb_mutex_init(&i->deps_mutex);
gb_mutex_init(&i->identifier_uses_mutex);
gb_mutex_init(&i->entity_mutex);
gb_mutex_init(&i->foreign_mutex);
}
@@ -879,6 +883,10 @@ void destroy_checker_info(CheckerInfo *i) {
gb_mutex_destroy(&i->gen_procs_mutex);
gb_mutex_destroy(&i->gen_types_mutex);
gb_mutex_destroy(&i->type_info_mutex);
gb_mutex_destroy(&i->deps_mutex);
gb_mutex_destroy(&i->identifier_uses_mutex);
gb_mutex_destroy(&i->entity_mutex);
gb_mutex_destroy(&i->foreign_mutex);
}
CheckerContext make_checker_context(Checker *c) {
@@ -945,8 +953,7 @@ bool init_checker(Checker *c, Parser *parser) {
init_checker_info(&c->info);
c->info.checker = c;
gb_mutex_init(&c->procs_with_deferred_to_check_mutex);
array_init(&c->procs_with_deferred_to_check, a);
mpmc_init(&c->procs_with_deferred_to_check, a, 1<<10);
// NOTE(bill): Is this big enough or too small?
isize item_size = gb_max3(gb_size_of(Entity), gb_size_of(Type), gb_size_of(Scope));
@@ -957,18 +964,17 @@ bool init_checker(Checker *c, Parser *parser) {
// NOTE(bill): 1 Mi elements should be enough on average
mpmc_init(&c->procs_to_check_queue, heap_allocator(), 1<<20);
gb_semaphore_init(&c->procs_to_check_semaphore);
return true;
}
void destroy_checker(Checker *c) {
destroy_checker_info(&c->info);
gb_mutex_destroy(&c->procs_with_deferred_to_check_mutex);
array_free(&c->procs_with_deferred_to_check);
destroy_checker_context(&c->builtin_ctx);
// mpmc_destroy(&c->procs_to_check_queue);
// gb_semaphore_destroy(&c->procs_to_check_semaphore);
}
@@ -1160,7 +1166,9 @@ void add_entity_definition(CheckerInfo *i, Ast *identifier, Entity *entity) {
GB_ASSERT(entity != nullptr);
identifier->Ident.entity = entity;
entity->identifier = identifier;
gb_mutex_lock(&i->entity_mutex);
array_add(&i->definitions, entity);
gb_mutex_unlock(&i->entity_mutex);
}
bool redeclaration_error(String name, Entity *prev, Entity *found) {
@@ -1242,7 +1250,9 @@ void add_entity_use(CheckerContext *c, Ast *identifier, Entity *entity) {
identifier->Ident.entity = entity;
if (c->info->allow_identifier_uses) {
gb_mutex_lock(&c->info->identifier_uses_mutex);
array_add(&c->info->identifier_uses, identifier);
gb_mutex_unlock(&c->info->identifier_uses_mutex);
}
String dmsg = entity->deprecated_message;
@@ -1278,13 +1288,16 @@ void add_entity_and_decl_info(CheckerContext *c, Ast *identifier, Entity *e, Dec
add_entity(c, scope, identifier, e);
}
add_entity_definition(&c->checker->info, identifier, e);
CheckerInfo *info = c->info;
add_entity_definition(info, identifier, e);
GB_ASSERT(e->decl_info == nullptr);
gb_mutex_lock(&info->entity_mutex);
e->decl_info = d;
d->entity = e;
array_add(&c->checker->info.entities, e);
e->order_in_src = c->checker->info.entities.count;
array_add(&info->entities, e);
e->order_in_src = info->entities.count; // Is this even correct?
e->pkg = c->pkg;
gb_mutex_unlock(&info->entity_mutex);
}
@@ -1311,11 +1324,11 @@ void add_type_info_type(CheckerContext *c, Type *t) {
return;
}
add_type_info_dependency(c->decl, t);
gb_mutex_lock(&c->info->type_info_mutex);
defer (gb_mutex_unlock(&c->info->type_info_mutex));
add_type_info_dependency(c->decl, t);
auto found = map_get(&c->info->type_info_map, hash_type(t));
if (found != nullptr) {
// Types have already been added
@@ -3760,7 +3773,9 @@ void check_add_foreign_import_decl(CheckerContext *ctx, Ast *decl) {
AttributeContext ac = {};
check_decl_attributes(ctx, fl->attributes, foreign_import_decl_attribute, &ac);
if (ac.require_declaration) {
gb_mutex_lock(&ctx->info->foreign_mutex);
array_add(&ctx->info->required_foreign_imports_through_force, e);
gb_mutex_unlock(&ctx->info->foreign_mutex);
add_entity_use(ctx, nullptr, e);
}
}
@@ -4385,14 +4400,17 @@ void check_test_names(Checker *c) {
}
void check_procedure_bodies(Checker *c) {
auto *q = &c->procs_to_check_queue;
ProcInfo *pi = nullptr;
while (mpmc_dequeue(q, &pi)) {
if (pi->decl->parent && pi->decl->parent->entity) {
Entity *parent = pi->decl->parent->entity;
// NOTE(bill): Only check a nested procedure if its parent's body has been checked first
// This is prevent any possible race conditions in evaluation when multithreaded
// NOTE(bill): In single threaded mode, this should never happen
if (parent->kind == Entity_Procedure && (parent->flags & EntityFlag_ProcBodyChecked) == 0) {
mpmc_enqueue(q, pi);
continue;
@@ -4545,8 +4563,7 @@ void check_parsed_files(Checker *c) {
}
TIME_SECTION("check deferred procedures");
for_array(i, c->procs_with_deferred_to_check) {
Entity *src = c->procs_with_deferred_to_check[i];
for (Entity *src = nullptr; mpmc_dequeue(&c->procs_with_deferred_to_check, &src); /**/) {
GB_ASSERT(src->kind == Entity_Procedure);
DeferredProcedureKind dst_kind = src->Procedure.deferred_procedure.kind;

View File

@@ -264,11 +264,7 @@ struct CheckerInfo {
StringMap<AstFile *> files; // Key (full path)
StringMap<AstPackage *> packages; // Key (full path)
StringMap<Entity *> foreigns;
Array<Entity *> definitions;
Array<Entity *> entities;
Array<DeclInfo *> variable_init_order;
Array<DeclInfo *> variable_init_order;
AstPackage * builtin_package;
AstPackage * runtime_package;
@@ -278,14 +274,9 @@ struct CheckerInfo {
PtrSet<Entity *> minimum_dependency_set;
PtrSet<isize> minimum_dependency_type_info_set;
Array<Entity *> required_foreign_imports_through_force;
Array<Entity *> required_global_variables;
Array<Entity *> testing_procedures;
bool allow_identifier_uses;
Array<Ast *> identifier_uses; // only used by 'odin query'
// Below are accessed within procedures
// NOTE(bill): If the semantic checker (check_proc_body) is to ever to be multithreaded,
// these variables will be of contention
@@ -294,6 +285,10 @@ struct CheckerInfo {
gbMutex gen_procs_mutex;
gbMutex gen_types_mutex;
gbMutex type_info_mutex;
gbMutex deps_mutex;
gbMutex identifier_uses_mutex;
gbMutex entity_mutex;
gbMutex foreign_mutex;
Map<ExprInfo *> untyped; // Key: Ast * | Expression -> ExprInfo *
// NOTE(bill): This needs to be a map and not on the Ast
@@ -304,6 +299,16 @@ struct CheckerInfo {
Array<Type *> type_info_types;
Map<isize> type_info_map; // Key: Type *
bool allow_identifier_uses;
Array<Ast *> identifier_uses; // only used by 'odin query'
Array<Entity *> definitions;
Array<Entity *> entities;
StringMap<Entity *> foreigns;
Array<Entity *> required_global_variables;
Array<Entity *> required_foreign_imports_through_force;
};
struct CheckerContext {
@@ -351,10 +356,10 @@ struct Checker {
CheckerContext builtin_ctx;
gbMutex procs_with_deferred_to_check_mutex;
Array<Entity *> procs_with_deferred_to_check;
MPMCQueue<Entity *> procs_with_deferred_to_check;
MPMCQueue<ProcInfo *> procs_to_check_queue;
gbSemaphore procs_to_check_semaphore;
};

View File

@@ -399,6 +399,7 @@ gb_global Arena permanent_arena = {};
void arena_init(Arena *arena, gbAllocator backing, isize block_size=ARENA_DEFAULT_BLOCK_SIZE) {
arena->backing = backing;
arena->block_size = block_size;
arena->use_mutex = true;
array_init(&arena->blocks, backing, 0, 2);
gb_mutex_init(&arena->mutex);
}
@@ -521,6 +522,7 @@ struct Temp_Allocator {
isize curr_offset;
gbAllocator backup_allocator;
Array<void *> leaked_allocations;
gbMutex mutex;
};
gb_global Temp_Allocator temporary_allocator_data = {};
@@ -531,6 +533,7 @@ void temp_allocator_init(Temp_Allocator *s, isize size) {
s->len = size;
s->curr_offset = 0;
s->leaked_allocations.allocator = s->backup_allocator;
gb_mutex_init(&s->mutex);
}
void *temp_allocator_alloc(Temp_Allocator *s, isize size, isize alignment) {
@@ -573,6 +576,9 @@ GB_ALLOCATOR_PROC(temp_allocator_proc) {
Temp_Allocator *s = cast(Temp_Allocator *)allocator_data;
GB_ASSERT_NOT_NULL(s);
gb_mutex_lock(&s->mutex);
defer (gb_mutex_unlock(&s->mutex));
switch (type) {
case gbAllocation_Alloc:
return temp_allocator_alloc(s, size, alignment);

View File

@@ -4664,8 +4664,10 @@ bool init_parser(Parser *p) {
string_map_init(&p->package_map, heap_allocator());
array_init(&p->packages, heap_allocator());
array_init(&p->package_imports, heap_allocator());
gb_mutex_init(&p->import_mutex);
gb_mutex_init(&p->file_add_mutex);
gb_mutex_init(&p->file_decl_mutex);
mpmc_init(&p->file_error_queue, heap_allocator(), 1024);
return true;
}
@@ -4689,8 +4691,10 @@ void destroy_parser(Parser *p) {
array_free(&p->package_imports);
string_set_destroy(&p->imported_files);
string_map_destroy(&p->package_map);
gb_mutex_destroy(&p->import_mutex);
gb_mutex_destroy(&p->file_add_mutex);
gb_mutex_destroy(&p->file_decl_mutex);
mpmc_destroy(&p->file_error_queue);
}
@@ -4718,6 +4722,9 @@ ParseFileError process_imported_file(Parser *p, ImportedFile const &imported_fil
WORKER_TASK_PROC(parser_worker_proc) {
ParserWorkerData *wd = cast(ParserWorkerData *)data;
ParseFileError err = process_imported_file(wd->parser, wd->imported_file);
if (err != ParseFile_None) {
mpmc_enqueue(&wd->parser->file_error_queue, err);
}
return cast(isize)err;
}
@@ -4775,8 +4782,8 @@ void parser_add_foreign_file_to_process(Parser *p, AstPackage *pkg, AstForeignFi
AstPackage *try_add_import_path(Parser *p, String const &path, String const &rel_path, TokenPos pos, PackageKind kind = Package_Normal) {
String const FILE_EXT = str_lit(".odin");
gb_mutex_lock(&p->file_add_mutex);
defer (gb_mutex_unlock(&p->file_add_mutex));
gb_mutex_lock(&p->import_mutex);
defer (gb_mutex_unlock(&p->import_mutex));
if (string_set_exists(&p->imported_files, path)) {
return nullptr;
@@ -5471,10 +5478,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) {
thread_pool_start(&parser_thread_pool);
thread_pool_wait_to_process(&parser_thread_pool);
// NOTE(bill): Get the last error and use that
for (isize i = parser_thread_pool.task_tail-1; i >= 0; i--) {
WorkerTask *task = &parser_thread_pool.tasks[i];
ParseFileError err = cast(ParseFileError)task->result;
for (ParseFileError err = ParseFile_None; mpmc_dequeue(&p->file_error_queue, &err); /**/) {
if (err != ParseFile_None) {
return err;
}

View File

@@ -162,16 +162,18 @@ struct AstPackage {
struct Parser {
String init_fullpath;
StringSet imported_files; // fullpath
StringMap<AstPackage *> package_map; // Key(package name)
Array<AstPackage *> packages;
Array<ImportedPackage> package_imports;
isize file_to_process_count;
isize total_token_count;
isize total_line_count;
gbMutex file_add_mutex;
gbMutex file_decl_mutex;
String init_fullpath;
StringSet imported_files; // fullpath
StringMap<AstPackage *> package_map; // Key(package name)
Array<AstPackage *> packages;
Array<ImportedPackage> package_imports;
isize file_to_process_count;
isize total_token_count;
isize total_line_count;
gbMutex import_mutex;
gbMutex file_add_mutex;
gbMutex file_decl_mutex;
MPMCQueue<ParseFileError> file_error_queue;
};

View File

@@ -15,6 +15,7 @@ struct MPMCQueue {
isize mask;
Array<MPMCQueueNode<T>> buffer;
gbMutex mutex;
std::atomic<isize> count;
CacheLinePad pad1;
std::atomic<isize> head_idx;
@@ -42,7 +43,7 @@ void mpmc_init(MPMCQueue<T> *q, gbAllocator a, isize size) {
template <typename T>
void mpmc_destroy(MPMCQueue<T> *q) {
gb_mutex_destroy(&q->mutex);
gb_array_free(&q->buffer);
gb_free(q->buffer.allocator, q->buffer.data);
}
@@ -60,6 +61,7 @@ bool mpmc_enqueue(MPMCQueue<T> *q, T const &data) {
if (q->head_idx.compare_exchange_weak(head_idx, next_head_idx)) {
node->data = data;
node->idx.store(next_head_idx, std::memory_order_release);
q->count.fetch_add(1, std::memory_order_release);
return true;
}
} else if (diff < 0) {
@@ -98,6 +100,7 @@ bool mpmc_dequeue(MPMCQueue<T> *q, T *data_) {
if (q->tail_idx.compare_exchange_weak(tail_idx, next_tail_idx)) {
if (data_) *data_ = node->data;
node->idx.store(tail_idx + q->mask + 1, std::memory_order_release);
q->count.fetch_sub(1, std::memory_order_release);
return true;
}
} else if (diff < 0) {

View File

@@ -18,10 +18,7 @@ struct ThreadPool {
gbAllocator allocator;
WorkerTask *tasks;
isize volatile task_head;
isize volatile task_tail;
isize volatile task_capacity;
MPMCQueue<WorkerTask> tasks;
gbThread *threads;
isize thread_count;
@@ -39,10 +36,7 @@ GB_THREAD_PROC(worker_thread_internal);
void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) {
pool->allocator = a;
pool->task_head = 0;
pool->task_tail = 0;
pool->task_capacity = 1024;
pool->tasks = gb_alloc_array(a, WorkerTask, pool->task_capacity);
mpmc_init(&pool->tasks, a, 1024);
pool->thread_count = gb_max(thread_count, 0);
pool->threads = gb_alloc_array(a, gbThread, pool->thread_count);
gb_mutex_init(&pool->mutex);
@@ -100,41 +94,27 @@ void thread_pool_destroy(ThreadPool *pool) {
gb_mutex_destroy(&pool->mutex);
gb_free(pool->allocator, pool->threads);
pool->thread_count = 0;
gb_free(pool->allocator, pool->tasks);
pool->task_head = 0;
pool->task_tail = 0;
pool->task_capacity = 0;
mpmc_destroy(&pool->tasks);
}
void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
gb_mutex_lock(&pool->mutex);
if (pool->task_tail == pool->task_capacity) {
isize new_cap = 2*pool->task_capacity + 8;
WorkerTask *new_tasks = gb_alloc_array(pool->allocator, WorkerTask, new_cap);
gb_memmove(new_tasks, pool->tasks, (pool->task_tail)*gb_size_of(WorkerTask));
pool->tasks = new_tasks;
pool->task_capacity = new_cap;
}
WorkerTask task = {};
task.do_work = proc;
task.data = data;
pool->tasks[pool->task_tail++] = task;
mpmc_enqueue(&pool->tasks, task);
gb_semaphore_post(&pool->sem_available, 1);
gb_mutex_unlock(&pool->mutex);
}
bool thread_pool_try_and_pop_task(ThreadPool *pool, WorkerTask *task) {
bool got_task = false;
if (gb_mutex_try_lock(&pool->mutex)) {
if (pool->task_tail > pool->task_head) {
gb_atomic32_fetch_add(&pool->processing_work_count, +1);
*task = pool->tasks[pool->task_head++];
got_task = true;
}
gb_mutex_unlock(&pool->mutex);
if (mpmc_dequeue(&pool->tasks, task)) {
gb_atomic32_fetch_add(&pool->processing_work_count, +1);
got_task = true;
}
return got_task;
}
@@ -144,16 +124,16 @@ void thread_pool_do_work(ThreadPool *pool, WorkerTask *task) {
}
void thread_pool_wait_to_process(ThreadPool *pool) {
while (pool->task_tail > pool->task_head || gb_atomic32_load(&pool->processing_work_count) != 0) {
while (pool->tasks.count.load(std::memory_order_relaxed) > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) {
WorkerTask task = {};
if (thread_pool_try_and_pop_task(pool, &task)) {
thread_pool_do_work(pool, &task);
}
// Safety-kick
if (pool->task_tail > pool->task_head && gb_atomic32_load(&pool->processing_work_count) == 0) {
while (pool->tasks.count.load(std::memory_order_relaxed) > 0 && gb_atomic32_load(&pool->processing_work_count) == 0) {
gb_mutex_lock(&pool->mutex);
gb_semaphore_post(&pool->sem_available, cast(i32)(pool->task_tail-pool->task_head));
gb_semaphore_post(&pool->sem_available, cast(i32)pool->tasks.count.load(std::memory_order_relaxed));
gb_mutex_unlock(&pool->mutex);
}