mirror of
https://github.com/odin-lang/Odin.git
synced 2026-04-21 22:05:20 +00:00
Merge pull request #1103 from odin-lang/new-thread-pool
Improved Thread Pool implementation for the Compiler
This commit is contained in:
140
src/checker.cpp
140
src/checker.cpp
@@ -4140,23 +4140,20 @@ struct ThreadProcCheckerSection {
|
||||
};
|
||||
|
||||
|
||||
void check_with_workers(Checker *c, ThreadProc *proc, isize total_count) {
|
||||
void check_with_workers(Checker *c, WorkerTaskProc *proc, isize total_count) {
|
||||
isize thread_count = gb_max(build_context.thread_count, 1);
|
||||
isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
|
||||
if (!build_context.threaded_checker) {
|
||||
worker_count = 0;
|
||||
}
|
||||
|
||||
|
||||
semaphore_post(&c->info.collect_semaphore, cast(i32)thread_count);
|
||||
|
||||
if (worker_count == 0) {
|
||||
ThreadProcCheckerSection section_all = {};
|
||||
section_all.checker = c;
|
||||
section_all.offset = 0;
|
||||
section_all.count = total_count;
|
||||
Thread dummy_main_thread = {};
|
||||
dummy_main_thread.user_data = §ion_all;
|
||||
proc(&dummy_main_thread);
|
||||
proc(§ion_all);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -4173,33 +4170,18 @@ void check_with_workers(Checker *c, ThreadProc *proc, isize total_count) {
|
||||
}
|
||||
GB_ASSERT(remaining_count <= 0);
|
||||
|
||||
Thread *threads = gb_alloc_array(permanent_allocator(), Thread, worker_count);
|
||||
for (isize i = 0; i < worker_count; i++) {
|
||||
thread_init(threads+i);
|
||||
|
||||
for (isize i = 0; i < thread_count; i++) {
|
||||
global_thread_pool_add_task(proc, thread_data+i);
|
||||
}
|
||||
|
||||
for (isize i = 0; i < worker_count; i++) {
|
||||
thread_start(threads+i, proc, thread_data+i);
|
||||
}
|
||||
Thread dummy_main_thread = {};
|
||||
dummy_main_thread.user_data = thread_data+worker_count;
|
||||
proc(&dummy_main_thread);
|
||||
|
||||
global_thread_pool_wait();
|
||||
semaphore_wait(&c->info.collect_semaphore);
|
||||
|
||||
for (isize i = 0; i < worker_count; i++) {
|
||||
thread_join(threads+i);
|
||||
}
|
||||
|
||||
for (isize i = 0; i < worker_count; i++) {
|
||||
thread_destroy(threads+i);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
THREAD_PROC(thread_proc_collect_entities) {
|
||||
auto *data = cast(ThreadProcCheckerSection *)thread->user_data;
|
||||
Checker *c = data->checker;
|
||||
WORKER_TASK_PROC(thread_proc_collect_entities) {
|
||||
auto *cs = cast(ThreadProcCheckerSection *)data;
|
||||
Checker *c = cs->checker;
|
||||
CheckerContext collect_entity_ctx = make_checker_context(c);
|
||||
defer (destroy_checker_context(&collect_entity_ctx));
|
||||
|
||||
@@ -4208,8 +4190,8 @@ THREAD_PROC(thread_proc_collect_entities) {
|
||||
UntypedExprInfoMap untyped = {};
|
||||
map_init(&untyped, heap_allocator());
|
||||
|
||||
isize offset = data->offset;
|
||||
isize file_end = gb_min(offset+data->count, c->info.files.entries.count);
|
||||
isize offset = cs->offset;
|
||||
isize file_end = gb_min(offset+cs->count, c->info.files.entries.count);
|
||||
|
||||
for (isize i = offset; i < file_end; i++) {
|
||||
AstFile *f = c->info.files.entries[i].value;
|
||||
@@ -4246,9 +4228,9 @@ void check_export_entities_in_pkg(CheckerContext *ctx, AstPackage *pkg, UntypedE
|
||||
}
|
||||
}
|
||||
|
||||
THREAD_PROC(thread_proc_check_export_entities) {
|
||||
auto data = cast(ThreadProcCheckerSection *)thread->user_data;
|
||||
Checker *c = data->checker;
|
||||
WORKER_TASK_PROC(thread_proc_check_export_entities) {
|
||||
auto cs = cast(ThreadProcCheckerSection *)data;
|
||||
Checker *c = cs->checker;
|
||||
|
||||
CheckerContext ctx = make_checker_context(c);
|
||||
defer (destroy_checker_context(&ctx));
|
||||
@@ -4256,8 +4238,8 @@ THREAD_PROC(thread_proc_check_export_entities) {
|
||||
UntypedExprInfoMap untyped = {};
|
||||
map_init(&untyped, heap_allocator());
|
||||
|
||||
isize end = gb_min(data->offset + data->count, c->info.packages.entries.count);
|
||||
for (isize i = data->offset; i < end; i++) {
|
||||
isize end = gb_min(cs->offset + cs->count, c->info.packages.entries.count);
|
||||
for (isize i = cs->offset; i < end; i++) {
|
||||
AstPackage *pkg = c->info.packages.entries[i].value;
|
||||
check_export_entities_in_pkg(&ctx, pkg, &untyped);
|
||||
}
|
||||
@@ -4575,15 +4557,19 @@ void calculate_global_init_order(Checker *c) {
|
||||
}
|
||||
|
||||
|
||||
void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, ProcBodyQueue *procs_to_check_queue) {
|
||||
bool check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, ProcBodyQueue *procs_to_check_queue) {
|
||||
if (pi == nullptr) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
if (pi->type == nullptr) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
Entity *e = pi->decl->entity;
|
||||
if (pi->decl->proc_checked) {
|
||||
return;
|
||||
if (e != nullptr) {
|
||||
GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
CheckerContext ctx = make_checker_context(c);
|
||||
@@ -4601,14 +4587,13 @@ void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, Proc
|
||||
token = ast_token(pi->poly_def_node);
|
||||
}
|
||||
error(token, "Unspecialized polymorphic procedure '%.*s'", LIT(name));
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pt->is_polymorphic && pt->is_poly_specialized) {
|
||||
Entity *e = pi->decl->entity;
|
||||
if ((e->flags & EntityFlag_Used) == 0) {
|
||||
// NOTE(bill, 2019-08-31): It was never used, don't check
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4622,16 +4607,17 @@ void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, Proc
|
||||
ctx.state_flags |= StateFlag_no_bounds_check;
|
||||
ctx.state_flags &= ~StateFlag_bounds_check;
|
||||
}
|
||||
if (pi->body != nullptr && pi->decl->entity != nullptr) {
|
||||
GB_ASSERT((pi->decl->entity->flags & EntityFlag_ProcBodyChecked) == 0);
|
||||
if (pi->body != nullptr && e != nullptr) {
|
||||
GB_ASSERT((e->flags & EntityFlag_ProcBodyChecked) == 0);
|
||||
}
|
||||
|
||||
check_proc_body(&ctx, pi->token, pi->decl, pi->type, pi->body);
|
||||
if (pi->body != nullptr && pi->decl->entity != nullptr) {
|
||||
pi->decl->entity->flags |= EntityFlag_ProcBodyChecked;
|
||||
if (e != nullptr) {
|
||||
e->flags |= EntityFlag_ProcBodyChecked;
|
||||
}
|
||||
pi->decl->proc_checked = true;
|
||||
add_untyped_expressions(&c->info, ctx.untyped);
|
||||
return true;
|
||||
}
|
||||
|
||||
GB_STATIC_ASSERT(sizeof(isize) == sizeof(void *));
|
||||
@@ -4681,9 +4667,10 @@ void check_unchecked_bodies(Checker *c) {
|
||||
ProcInfo *pi = nullptr;
|
||||
while (mpmc_dequeue(q, &pi)) {
|
||||
Entity *e = pi->decl->entity;
|
||||
consume_proc_info_queue(c, pi, q, &untyped);
|
||||
add_dependency_to_set(c, e);
|
||||
GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked);
|
||||
if (consume_proc_info_queue(c, pi, q, &untyped)) {
|
||||
add_dependency_to_set(c, e);
|
||||
GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -4728,15 +4715,15 @@ bool consume_proc_info_queue(Checker *c, ProcInfo *pi, ProcBodyQueue *q, Untyped
|
||||
// NOTE(bill): In single threaded mode, this should never happen
|
||||
if (parent->kind == Entity_Procedure && (parent->flags & EntityFlag_ProcBodyChecked) == 0) {
|
||||
mpmc_enqueue(q, pi);
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (untyped) {
|
||||
map_clear(untyped);
|
||||
}
|
||||
check_proc_info(c, pi, untyped, q);
|
||||
bool ok = check_proc_info(c, pi, untyped, q);
|
||||
total_bodies_checked.fetch_add(1, std::memory_order_relaxed);
|
||||
return false;
|
||||
return ok;
|
||||
}
|
||||
|
||||
struct ThreadProcBodyData {
|
||||
@@ -4747,11 +4734,11 @@ struct ThreadProcBodyData {
|
||||
ThreadProcBodyData *all_data;
|
||||
};
|
||||
|
||||
THREAD_PROC(thread_proc_body) {
|
||||
ThreadProcBodyData *data = cast(ThreadProcBodyData *)thread->user_data;
|
||||
Checker *c = data->checker;
|
||||
WORKER_TASK_PROC(thread_proc_body) {
|
||||
ThreadProcBodyData *bd = cast(ThreadProcBodyData *)data;
|
||||
Checker *c = bd->checker;
|
||||
GB_ASSERT(c != nullptr);
|
||||
ProcBodyQueue *this_queue = data->queue;
|
||||
ProcBodyQueue *this_queue = bd->queue;
|
||||
|
||||
UntypedExprInfoMap untyped = {};
|
||||
map_init(&untyped, heap_allocator());
|
||||
@@ -4774,12 +4761,18 @@ void check_procedure_bodies(Checker *c) {
|
||||
u32 worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
|
||||
if (!build_context.threaded_checker) {
|
||||
worker_count = 0;
|
||||
|
||||
auto *q = &c->procs_to_check_queue;
|
||||
ProcInfo *pi = nullptr;
|
||||
while (mpmc_dequeue(q, &pi)) {
|
||||
consume_proc_info_queue(c, pi, q, nullptr);
|
||||
}
|
||||
if (worker_count == 0) {
|
||||
auto *this_queue = &c->procs_to_check_queue;
|
||||
|
||||
UntypedExprInfoMap untyped = {};
|
||||
map_init(&untyped, heap_allocator());
|
||||
|
||||
for (ProcInfo *pi = nullptr; mpmc_dequeue(this_queue, &pi); /**/) {
|
||||
consume_proc_info_queue(c, pi, this_queue, &untyped);
|
||||
}
|
||||
|
||||
map_destroy(&untyped);
|
||||
|
||||
debugf("Total Procedure Bodies Checked: %td\n", total_bodies_checked.load(std::memory_order_relaxed));
|
||||
return;
|
||||
@@ -4821,31 +4814,14 @@ void check_procedure_bodies(Checker *c) {
|
||||
}
|
||||
GB_ASSERT(total_queued == original_queue_count);
|
||||
|
||||
|
||||
semaphore_post(&c->procs_to_check_semaphore, cast(i32)thread_count);
|
||||
|
||||
Thread *threads = gb_alloc_array(permanent_allocator(), Thread, worker_count);
|
||||
for (isize i = 0; i < worker_count; i++) {
|
||||
thread_init(threads+i);
|
||||
|
||||
for (isize i = 0; i < thread_count; i++) {
|
||||
global_thread_pool_add_task(thread_proc_body, thread_data+i);
|
||||
}
|
||||
|
||||
for (isize i = 0; i < worker_count; i++) {
|
||||
thread_start(threads+i, thread_proc_body, thread_data+i);
|
||||
}
|
||||
Thread dummy_main_thread = {};
|
||||
dummy_main_thread.user_data = thread_data+worker_count;
|
||||
thread_proc_body(&dummy_main_thread);
|
||||
|
||||
global_thread_pool_wait();
|
||||
semaphore_wait(&c->procs_to_check_semaphore);
|
||||
|
||||
for (isize i = 0; i < worker_count; i++) {
|
||||
thread_join(threads+i);
|
||||
}
|
||||
|
||||
for (isize i = 0; i < worker_count; i++) {
|
||||
thread_destroy(threads+i);
|
||||
}
|
||||
|
||||
isize global_remaining = c->procs_to_check_queue.count.load(std::memory_order_relaxed);
|
||||
GB_ASSERT(global_remaining == 0);
|
||||
|
||||
|
||||
@@ -44,11 +44,9 @@ void debugf(char const *fmt, ...);
|
||||
#include "queue.cpp"
|
||||
#include "common_memory.cpp"
|
||||
#include "string.cpp"
|
||||
|
||||
|
||||
|
||||
#include "range_cache.cpp"
|
||||
|
||||
|
||||
u32 fnv32a(void const *data, isize len) {
|
||||
u8 const *bytes = cast(u8 const *)data;
|
||||
u32 h = 0x811c9dc5;
|
||||
|
||||
@@ -1104,9 +1104,6 @@ void lb_generate_code(lbGenerator *gen) {
|
||||
|
||||
LLVMBool do_threading = (LLVMIsMultithreaded() && USE_SEPARATE_MODULES && MULTITHREAD_OBJECT_GENERATION && worker_count > 0);
|
||||
|
||||
thread_pool_init(&lb_thread_pool, heap_allocator(), worker_count, "LLVMBackend");
|
||||
defer (thread_pool_destroy(&lb_thread_pool));
|
||||
|
||||
lbModule *default_module = &gen->default_module;
|
||||
CheckerInfo *info = gen->info;
|
||||
|
||||
@@ -1691,10 +1688,10 @@ void lb_generate_code(lbGenerator *gen) {
|
||||
wd->code_gen_file_type = code_gen_file_type;
|
||||
wd->filepath_obj = filepath_obj;
|
||||
wd->m = m;
|
||||
thread_pool_add_task(&lb_thread_pool, lb_llvm_emit_worker_proc, wd);
|
||||
global_thread_pool_add_task(lb_llvm_emit_worker_proc, wd);
|
||||
}
|
||||
|
||||
thread_pool_wait(&lb_thread_pool);
|
||||
thread_pool_wait(&global_thread_pool);
|
||||
} else {
|
||||
for_array(j, gen->modules.entries) {
|
||||
lbModule *m = gen->modules.entries[j].value;
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
void lb_add_debug_local_variable(lbProcedure *p, LLVMValueRef ptr, Type *type, Token const &token);
|
||||
|
||||
gb_global ThreadPool lb_thread_pool = {};
|
||||
|
||||
gb_global Entity *lb_global_type_info_data_entity = {};
|
||||
gb_global lbAddr lb_global_type_info_member_types = {};
|
||||
gb_global lbAddr lb_global_type_info_member_names = {};
|
||||
|
||||
17
src/main.cpp
17
src/main.cpp
@@ -7,6 +7,20 @@
|
||||
#include "exact_value.cpp"
|
||||
#include "build_settings.cpp"
|
||||
|
||||
gb_global ThreadPool global_thread_pool;
|
||||
void init_global_thread_pool(void) {
|
||||
isize thread_count = gb_max(build_context.thread_count, 1);
|
||||
isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
|
||||
thread_pool_init(&global_thread_pool, permanent_allocator(), worker_count, "ThreadPoolWorker");
|
||||
}
|
||||
bool global_thread_pool_add_task(WorkerTaskProc *proc, void *data) {
|
||||
return thread_pool_add_task(&global_thread_pool, proc, data);
|
||||
}
|
||||
void global_thread_pool_wait(void) {
|
||||
thread_pool_wait(&global_thread_pool);
|
||||
}
|
||||
|
||||
|
||||
void debugf(char const *fmt, ...) {
|
||||
if (build_context.show_debug_messages) {
|
||||
gb_printf_err("[DEBUG] ");
|
||||
@@ -2160,6 +2174,9 @@ int main(int arg_count, char const **arg_ptr) {
|
||||
// return 1;
|
||||
// }
|
||||
|
||||
init_global_thread_pool();
|
||||
defer (thread_pool_destroy(&global_thread_pool));
|
||||
|
||||
init_universal();
|
||||
// TODO(bill): prevent compiling without a linker
|
||||
|
||||
|
||||
@@ -4810,6 +4810,7 @@ bool init_parser(Parser *p) {
|
||||
string_set_init(&p->imported_files, heap_allocator());
|
||||
array_init(&p->packages, heap_allocator());
|
||||
array_init(&p->package_imports, heap_allocator());
|
||||
mutex_init(&p->wait_mutex);
|
||||
mutex_init(&p->import_mutex);
|
||||
mutex_init(&p->file_add_mutex);
|
||||
mutex_init(&p->file_decl_mutex);
|
||||
@@ -4837,6 +4838,7 @@ void destroy_parser(Parser *p) {
|
||||
array_free(&p->packages);
|
||||
array_free(&p->package_imports);
|
||||
string_set_destroy(&p->imported_files);
|
||||
mutex_destroy(&p->wait_mutex);
|
||||
mutex_destroy(&p->import_mutex);
|
||||
mutex_destroy(&p->file_add_mutex);
|
||||
mutex_destroy(&p->file_decl_mutex);
|
||||
@@ -4870,7 +4872,7 @@ void parser_add_file_to_process(Parser *p, AstPackage *pkg, FileInfo fi, TokenPo
|
||||
auto wd = gb_alloc_item(heap_allocator(), ParserWorkerData);
|
||||
wd->parser = p;
|
||||
wd->imported_file = f;
|
||||
thread_pool_add_task(&parser_thread_pool, parser_worker_proc, wd);
|
||||
global_thread_pool_add_task(parser_worker_proc, wd);
|
||||
}
|
||||
|
||||
WORKER_TASK_PROC(foreign_file_worker_proc) {
|
||||
@@ -4909,7 +4911,7 @@ void parser_add_foreign_file_to_process(Parser *p, AstPackage *pkg, AstForeignFi
|
||||
wd->parser = p;
|
||||
wd->imported_file = f;
|
||||
wd->foreign_kind = kind;
|
||||
thread_pool_add_task(&parser_thread_pool, foreign_file_worker_proc, wd);
|
||||
global_thread_pool_add_task(foreign_file_worker_proc, wd);
|
||||
}
|
||||
|
||||
|
||||
@@ -5619,10 +5621,6 @@ ParseFileError process_imported_file(Parser *p, ImportedFile imported_file) {
|
||||
ParseFileError parse_packages(Parser *p, String init_filename) {
|
||||
GB_ASSERT(init_filename.text[init_filename.len] == 0);
|
||||
|
||||
isize thread_count = gb_max(build_context.thread_count, 1);
|
||||
isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
|
||||
thread_pool_init(&parser_thread_pool, heap_allocator(), worker_count, "ParserWork");
|
||||
|
||||
String init_fullpath = path_to_full_path(heap_allocator(), init_filename);
|
||||
if (!path_is_directory(init_fullpath)) {
|
||||
String const ext = str_lit(".odin");
|
||||
@@ -5631,38 +5629,45 @@ ParseFileError parse_packages(Parser *p, String init_filename) {
|
||||
return ParseFile_WrongExtension;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
TokenPos init_pos = {};
|
||||
{
|
||||
String s = get_fullpath_core(heap_allocator(), str_lit("runtime"));
|
||||
try_add_import_path(p, s, s, init_pos, Package_Runtime);
|
||||
}
|
||||
{ // Add these packages serially and then process them parallel
|
||||
mutex_lock(&p->wait_mutex);
|
||||
defer (mutex_unlock(&p->wait_mutex));
|
||||
|
||||
TokenPos init_pos = {};
|
||||
{
|
||||
String s = get_fullpath_core(heap_allocator(), str_lit("runtime"));
|
||||
try_add_import_path(p, s, s, init_pos, Package_Runtime);
|
||||
}
|
||||
|
||||
try_add_import_path(p, init_fullpath, init_fullpath, init_pos, Package_Init);
|
||||
p->init_fullpath = init_fullpath;
|
||||
try_add_import_path(p, init_fullpath, init_fullpath, init_pos, Package_Init);
|
||||
p->init_fullpath = init_fullpath;
|
||||
|
||||
if (build_context.command_kind == Command_test) {
|
||||
String s = get_fullpath_core(heap_allocator(), str_lit("testing"));
|
||||
try_add_import_path(p, s, s, init_pos, Package_Normal);
|
||||
}
|
||||
if (build_context.command_kind == Command_test) {
|
||||
String s = get_fullpath_core(heap_allocator(), str_lit("testing"));
|
||||
try_add_import_path(p, s, s, init_pos, Package_Normal);
|
||||
}
|
||||
|
||||
|
||||
for_array(i, build_context.extra_packages) {
|
||||
String path = build_context.extra_packages[i];
|
||||
String fullpath = path_to_full_path(heap_allocator(), path); // LEAK?
|
||||
if (!path_is_directory(fullpath)) {
|
||||
String const ext = str_lit(".odin");
|
||||
if (!string_ends_with(fullpath, ext)) {
|
||||
error_line("Expected either a directory or a .odin file, got '%.*s'\n", LIT(fullpath));
|
||||
return ParseFile_WrongExtension;
|
||||
for_array(i, build_context.extra_packages) {
|
||||
String path = build_context.extra_packages[i];
|
||||
String fullpath = path_to_full_path(heap_allocator(), path); // LEAK?
|
||||
if (!path_is_directory(fullpath)) {
|
||||
String const ext = str_lit(".odin");
|
||||
if (!string_ends_with(fullpath, ext)) {
|
||||
error_line("Expected either a directory or a .odin file, got '%.*s'\n", LIT(fullpath));
|
||||
return ParseFile_WrongExtension;
|
||||
}
|
||||
}
|
||||
AstPackage *pkg = try_add_import_path(p, fullpath, fullpath, init_pos, Package_Normal);
|
||||
if (pkg) {
|
||||
pkg->is_extra = true;
|
||||
}
|
||||
}
|
||||
AstPackage *pkg = try_add_import_path(p, fullpath, fullpath, init_pos, Package_Normal);
|
||||
if (pkg) {
|
||||
pkg->is_extra = true;
|
||||
}
|
||||
}
|
||||
|
||||
thread_pool_wait(&parser_thread_pool);
|
||||
|
||||
global_thread_pool_wait();
|
||||
|
||||
for (ParseFileError err = ParseFile_None; mpmc_dequeue(&p->file_error_queue, &err); /**/) {
|
||||
if (err != ParseFile_None) {
|
||||
|
||||
@@ -191,6 +191,7 @@ struct Parser {
|
||||
isize file_to_process_count;
|
||||
isize total_token_count;
|
||||
isize total_line_count;
|
||||
BlockingMutex wait_mutex;
|
||||
BlockingMutex import_mutex;
|
||||
BlockingMutex file_add_mutex;
|
||||
BlockingMutex file_decl_mutex;
|
||||
@@ -198,9 +199,6 @@ struct Parser {
|
||||
MPMCQueue<ParseFileError> file_error_queue;
|
||||
};
|
||||
|
||||
|
||||
gb_global ThreadPool parser_thread_pool = {};
|
||||
|
||||
struct ParserWorkerData {
|
||||
Parser *parser;
|
||||
ImportedFile imported_file;
|
||||
|
||||
@@ -4,85 +4,156 @@
|
||||
typedef WORKER_TASK_PROC(WorkerTaskProc);
|
||||
|
||||
struct WorkerTask {
|
||||
WorkerTask *next_task;
|
||||
WorkerTask * next;
|
||||
WorkerTaskProc *do_work;
|
||||
void *data;
|
||||
void * data;
|
||||
};
|
||||
|
||||
struct ThreadPool {
|
||||
std::atomic<isize> outstanding_task_count;
|
||||
WorkerTask *volatile next_task;
|
||||
BlockingMutex task_list_mutex;
|
||||
isize thread_count;
|
||||
gbAllocator allocator;
|
||||
BlockingMutex mutex;
|
||||
Condition task_cond;
|
||||
|
||||
Slice<Thread> threads;
|
||||
|
||||
WorkerTask *task_queue;
|
||||
|
||||
std::atomic<isize> ready;
|
||||
std::atomic<bool> stop;
|
||||
|
||||
};
|
||||
|
||||
void thread_pool_thread_entry(ThreadPool *pool) {
|
||||
while (pool->outstanding_task_count) {
|
||||
if (!pool->next_task) {
|
||||
yield(); // No need to grab the mutex.
|
||||
} else {
|
||||
mutex_lock(&pool->task_list_mutex);
|
||||
THREAD_PROC(thread_pool_thread_proc);
|
||||
|
||||
if (pool->next_task) {
|
||||
WorkerTask *task = pool->next_task;
|
||||
pool->next_task = task->next_task;
|
||||
mutex_unlock(&pool->task_list_mutex);
|
||||
task->do_work(task->data);
|
||||
pool->outstanding_task_count.fetch_sub(1);
|
||||
gb_free(heap_allocator(), task);
|
||||
} else {
|
||||
mutex_unlock(&pool->task_list_mutex);
|
||||
}
|
||||
void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) {
|
||||
pool->allocator = a;
|
||||
pool->stop = false;
|
||||
mutex_init(&pool->mutex);
|
||||
condition_init(&pool->task_cond);
|
||||
|
||||
slice_init(&pool->threads, a, thread_count);
|
||||
for_array(i, pool->threads) {
|
||||
Thread *t = &pool->threads[i];
|
||||
thread_init(t);
|
||||
}
|
||||
|
||||
for_array(i, pool->threads) {
|
||||
Thread *t = &pool->threads[i];
|
||||
thread_start(t, thread_pool_thread_proc, pool);
|
||||
}
|
||||
}
|
||||
|
||||
void thread_pool_destroy(ThreadPool *pool) {
|
||||
pool->stop = true;
|
||||
condition_broadcast(&pool->task_cond);
|
||||
|
||||
for_array(i, pool->threads) {
|
||||
Thread *t = &pool->threads[i];
|
||||
thread_join(t);
|
||||
}
|
||||
|
||||
for_array(i, pool->threads) {
|
||||
Thread *t = &pool->threads[i];
|
||||
thread_destroy(t);
|
||||
}
|
||||
|
||||
gb_free(pool->allocator, pool->threads.data);
|
||||
mutex_destroy(&pool->mutex);
|
||||
condition_destroy(&pool->task_cond);
|
||||
}
|
||||
|
||||
bool thread_pool_queue_empty(ThreadPool *pool) {
|
||||
return pool->task_queue == nullptr;
|
||||
}
|
||||
|
||||
WorkerTask *thread_pool_queue_pop(ThreadPool *pool) {
|
||||
GB_ASSERT(pool->task_queue != nullptr);
|
||||
WorkerTask *task = pool->task_queue;
|
||||
pool->task_queue = task->next;
|
||||
return task;
|
||||
}
|
||||
void thread_pool_queue_push(ThreadPool *pool, WorkerTask *task) {
|
||||
GB_ASSERT(task != nullptr);
|
||||
task->next = pool->task_queue;
|
||||
pool->task_queue = task;
|
||||
}
|
||||
|
||||
bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
|
||||
GB_ASSERT(proc != nullptr);
|
||||
mutex_lock(&pool->mutex);
|
||||
WorkerTask *task = gb_alloc_item(permanent_allocator(), WorkerTask);
|
||||
if (task == nullptr) {
|
||||
mutex_unlock(&pool->mutex);
|
||||
GB_PANIC("Out of memory");
|
||||
return false;
|
||||
}
|
||||
task->do_work = proc;
|
||||
task->data = data;
|
||||
|
||||
thread_pool_queue_push(pool, task);
|
||||
GB_ASSERT(pool->ready >= 0);
|
||||
pool->ready++;
|
||||
mutex_unlock(&pool->mutex);
|
||||
condition_signal(&pool->task_cond);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void thread_pool_do_task(WorkerTask *task) {
|
||||
task->do_work(task->data);
|
||||
}
|
||||
|
||||
void thread_pool_wait(ThreadPool *pool) {
|
||||
if (pool->threads.count == 0) {
|
||||
while (!thread_pool_queue_empty(pool)) {
|
||||
thread_pool_do_task(thread_pool_queue_pop(pool));
|
||||
--pool->ready;
|
||||
}
|
||||
GB_ASSERT(pool->ready == 0);
|
||||
return;
|
||||
}
|
||||
for (;;) {
|
||||
mutex_lock(&pool->mutex);
|
||||
|
||||
while (!pool->stop && pool->ready > 0 && thread_pool_queue_empty(pool)) {
|
||||
condition_wait(&pool->task_cond, &pool->mutex);
|
||||
}
|
||||
if ((pool->stop || pool->ready == 0) && thread_pool_queue_empty(pool)) {
|
||||
mutex_unlock(&pool->mutex);
|
||||
return;
|
||||
}
|
||||
|
||||
WorkerTask *task = thread_pool_queue_pop(pool);
|
||||
mutex_unlock(&pool->mutex);
|
||||
|
||||
thread_pool_do_task(task);
|
||||
if (--pool->ready == 0) {
|
||||
condition_broadcast(&pool->task_cond);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#if defined(GB_SYSTEM_WINDOWS)
|
||||
DWORD __stdcall thread_pool_thread_entry_platform(void *arg) {
|
||||
thread_pool_thread_entry((ThreadPool *) arg);
|
||||
return 0;
|
||||
|
||||
THREAD_PROC(thread_pool_thread_proc) {
|
||||
ThreadPool *pool = cast(ThreadPool *)thread->user_data;
|
||||
|
||||
for (;;) {
|
||||
mutex_lock(&pool->mutex);
|
||||
|
||||
while (!pool->stop && thread_pool_queue_empty(pool)) {
|
||||
condition_wait(&pool->task_cond, &pool->mutex);
|
||||
}
|
||||
if (pool->stop && thread_pool_queue_empty(pool)) {
|
||||
mutex_unlock(&pool->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
WorkerTask *task = thread_pool_queue_pop(pool);
|
||||
mutex_unlock(&pool->mutex);
|
||||
|
||||
thread_pool_do_task(task);
|
||||
if (--pool->ready == 0) {
|
||||
condition_broadcast(&pool->task_cond);
|
||||
}
|
||||
}
|
||||
|
||||
void thread_pool_start_thread(ThreadPool *pool) {
|
||||
CloseHandle(CreateThread(NULL, 0, thread_pool_thread_entry_platform, pool, 0, NULL));
|
||||
}
|
||||
#else
|
||||
void *thread_pool_thread_entry_platform(void *arg) {
|
||||
thread_pool_thread_entry((ThreadPool *) arg);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void thread_pool_start_thread(ThreadPool *pool) {
|
||||
pthread_t handle;
|
||||
pthread_create(&handle, NULL, thread_pool_thread_entry_platform, pool);
|
||||
pthread_detach(handle);
|
||||
}
|
||||
#endif
|
||||
|
||||
void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) {
|
||||
memset(pool, 0, sizeof(ThreadPool));
|
||||
mutex_init(&pool->task_list_mutex);
|
||||
pool->thread_count = thread_count;
|
||||
}
|
||||
|
||||
void thread_pool_destroy(ThreadPool *pool) {
|
||||
mutex_destroy(&pool->task_list_mutex);
|
||||
}
|
||||
|
||||
void thread_pool_wait(ThreadPool *pool) {
|
||||
for (int i = 0; i < pool->thread_count; i++) {
|
||||
thread_pool_start_thread(pool);
|
||||
}
|
||||
thread_pool_thread_entry(pool);
|
||||
}
|
||||
|
||||
void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
|
||||
WorkerTask *task = gb_alloc_item(heap_allocator(), WorkerTask);
|
||||
task->do_work = proc;
|
||||
task->data = data;
|
||||
mutex_lock(&pool->task_list_mutex);
|
||||
task->next_task = pool->next_task;
|
||||
pool->next_task = task;
|
||||
pool->outstanding_task_count.fetch_add(1);
|
||||
mutex_unlock(&pool->task_list_mutex);
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
struct BlockingMutex;
|
||||
struct RecursiveMutex;
|
||||
struct Semaphore;
|
||||
struct Condition;
|
||||
struct Thread;
|
||||
|
||||
#define THREAD_PROC(name) isize name(struct Thread *thread)
|
||||
@@ -41,6 +42,14 @@ void semaphore_post (Semaphore *s, i32 count);
|
||||
void semaphore_wait (Semaphore *s);
|
||||
void semaphore_release(Semaphore *s) { semaphore_post(s, 1); }
|
||||
|
||||
|
||||
void condition_init(Condition *c);
|
||||
void condition_destroy(Condition *c);
|
||||
void condition_broadcast(Condition *c);
|
||||
void condition_signal(Condition *c);
|
||||
void condition_wait(Condition *c, BlockingMutex *m);
|
||||
void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms);
|
||||
|
||||
u32 thread_current_id(void);
|
||||
|
||||
void thread_init (Thread *t);
|
||||
@@ -108,6 +117,27 @@ void yield_process(void);
|
||||
void semaphore_wait(Semaphore *s) {
|
||||
WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE);
|
||||
}
|
||||
|
||||
struct Condition {
|
||||
CONDITION_VARIABLE cond;
|
||||
};
|
||||
|
||||
void condition_init(Condition *c) {
|
||||
}
|
||||
void condition_destroy(Condition *c) {
|
||||
}
|
||||
void condition_broadcast(Condition *c) {
|
||||
WakeAllConditionVariable(&c->cond);
|
||||
}
|
||||
void condition_signal(Condition *c) {
|
||||
WakeConditionVariable(&c->cond);
|
||||
}
|
||||
void condition_wait(Condition *c, BlockingMutex *m) {
|
||||
SleepConditionVariableSRW(&c->cond, &m->srwlock, INFINITE, 0);
|
||||
}
|
||||
void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms) {
|
||||
SleepConditionVariableSRW(&c->cond, &m->srwlock, timeout_in_ms, 0);
|
||||
}
|
||||
|
||||
#else
|
||||
struct BlockingMutex {
|
||||
@@ -170,9 +200,77 @@ void yield_process(void);
|
||||
void semaphore_post (Semaphore *s, i32 count) { while (count --> 0) sem_post(&s->unix_handle); }
|
||||
void semaphore_wait (Semaphore *s) { int i; do { i = sem_wait(&s->unix_handle); } while (i == -1 && errno == EINTR); }
|
||||
#else
|
||||
#error
|
||||
#error Implement Semaphore for this platform
|
||||
#endif
|
||||
|
||||
|
||||
struct Condition {
|
||||
pthread_cond_t pthread_cond;
|
||||
};
|
||||
|
||||
void condition_init(Condition *c) {
|
||||
pthread_cond_init(&c->pthread_cond, NULL);
|
||||
}
|
||||
void condition_destroy(Condition *c) {
|
||||
pthread_cond_destroy(&c->pthread_cond);
|
||||
}
|
||||
void condition_broadcast(Condition *c) {
|
||||
pthread_cond_broadcast(&c->pthread_cond);
|
||||
}
|
||||
void condition_signal(Condition *c) {
|
||||
pthread_cond_signal(&c->pthread_cond);
|
||||
}
|
||||
void condition_wait(Condition *c, BlockingMutex *m) {
|
||||
pthread_cond_wait(&c->pthread_cond, &m->pthread_mutex);
|
||||
}
|
||||
void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms) {
|
||||
struct timespec abstime = {};
|
||||
abstime.tv_sec = timeout_in_ms/1000;
|
||||
abstime.tv_nsec = cast(long)(timeout_in_ms%1000)*1e6;
|
||||
pthread_cond_timedwait(&c->pthread_cond, &m->pthread_mutex, &abstime);
|
||||
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
struct Barrier {
|
||||
BlockingMutex mutex;
|
||||
Condition cond;
|
||||
isize index;
|
||||
isize generation_id;
|
||||
isize thread_count;
|
||||
};
|
||||
|
||||
void barrier_init(Barrier *b, isize thread_count) {
|
||||
mutex_init(&b->mutex);
|
||||
condition_init(&b->cond);
|
||||
b->index = 0;
|
||||
b->generation_id = 0;
|
||||
b->thread_count = 0;
|
||||
}
|
||||
|
||||
void barrier_destroy(Barrier *b) {
|
||||
condition_destroy(&b->cond);
|
||||
mutex_destroy(&b->mutex);
|
||||
}
|
||||
|
||||
// Returns true if it is the leader
|
||||
bool barrier_wait(Barrier *b) {
|
||||
mutex_lock(&b->mutex);
|
||||
defer (mutex_unlock(&b->mutex));
|
||||
isize local_gen = b->generation_id;
|
||||
b->index += 1;
|
||||
if (b->index < b->thread_count) {
|
||||
while (local_gen == b->generation_id && b->index < b->thread_count) {
|
||||
condition_wait(&b->cond, &b->mutex);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
b->index = 0;
|
||||
b->generation_id += 1;
|
||||
condition_broadcast(&b->cond);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user