From c44d25d14f2a8f800170daae4a1c9d08858978b6 Mon Sep 17 00:00:00 2001 From: thebirk Date: Mon, 26 Aug 2019 16:47:41 +0200 Subject: [PATCH 1/7] Fixed parser creating a new thread for each file. --- src/parser.cpp | 107 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 97 insertions(+), 10 deletions(-) diff --git a/src/parser.cpp b/src/parser.cpp index 9aebc78d6..0a2a3c1fc 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4679,7 +4679,47 @@ skip: return ParseFile_None; } +#if 1 +struct ParserWorkerThreadData { + Parser *parser; + gbSemaphore resume_work; //NOTE(thebirk): Use to signal that the worker thead has a new work to do + ParseFileError err; + + gbMutex lock; //NOTE(thebirk): All variables below are locked by this mutex + bool error_available; + bool should_exit; +}; + + +GB_THREAD_PROC(parse_worker_file_proc) { + GB_ASSERT(thread != nullptr); + + ParserWorkerThreadData* data = cast(ParserWorkerThreadData*) thread->user_data; + + for(;;) { + gb_semaphore_wait(&data->resume_work); + + gb_mutex_lock(&data->lock); + if (data->should_exit) { + gb_mutex_unlock(&data->lock); + return isize(0); + } + + Parser *p = data->parser; + isize index = thread->user_index; + gb_mutex_lock(&p->file_add_mutex); + auto file_to_process = p->files_to_process[index]; + gb_mutex_unlock(&p->file_add_mutex); + data->err = process_imported_file(p, file_to_process); + + data->error_available = true; + gb_mutex_unlock(&data->lock); + } + + //GB_PANIC("A worker thread should not be able to reach the end!!!"); +} +#else GB_THREAD_PROC(parse_worker_file_proc) { if (thread == nullptr) return 0; auto *p = cast(Parser *)thread->user_data; @@ -4690,6 +4730,7 @@ GB_THREAD_PROC(parse_worker_file_proc) { ParseFileError err = process_imported_file(p, file_to_process); return cast(isize)err; } +#endif ParseFileError parse_packages(Parser *p, String init_filename) { GB_ASSERT(init_filename.text[init_filename.len] == 0); @@ -4729,14 +4770,36 @@ ParseFileError parse_packages(Parser *p, String init_filename) { curr_import_index++; } + auto worker_threads_data = array_make(heap_allocator(), thread_count); + defer (array_free(&worker_threads_data)); + + for_array(i, worker_threads_data) { + ParserWorkerThreadData *data = &worker_threads_data[i]; + gb_mutex_init(&data->lock); + gb_semaphore_init(&data->resume_work); + data->parser = p; + data->err = ParseFile_None; + data->should_exit = false; + data->error_available = false; + } + defer(for_array(i, worker_threads_data) { + ParserWorkerThreadData *data = &worker_threads_data[i]; + gb_mutex_destroy(&data->lock); + gb_semaphore_destroy(&data->resume_work); + }); + auto worker_threads = array_make(heap_allocator(), thread_count); defer (array_free(&worker_threads)); for_array(i, worker_threads) { gbThread *t = &worker_threads[i]; gb_thread_init(t); + char buffer[64]; + gb_snprintf(buffer, 64, "Parser Worker #%lld", i); + gb_thread_set_name(t, buffer); + gb_thread_start(t, parse_worker_file_proc, &worker_threads_data[i]); } - defer (for_array(i, worker_threads) { + defer(for_array(i, worker_threads) { gb_thread_destroy(&worker_threads[i]); }); @@ -4744,27 +4807,51 @@ ParseFileError parse_packages(Parser *p, String init_filename) { for (;;) { bool are_any_alive = false; + for_array(i, worker_threads) { gbThread *t = &worker_threads[i]; - if (gb_thread_is_running(t)) { - are_any_alive = true; - } else if (curr_import_index < p->files_to_process.count) { - auto curr_err = cast(ParseFileError)t->return_value; - if (curr_err != ParseFile_None) { - array_add(&errors, curr_err); - } else { + ParserWorkerThreadData *data = &worker_threads_data[i]; + + if (gb_mutex_try_lock(&data->lock)) { + if (data->error_available) { + auto curr_err = data->err; + if (curr_err != ParseFile_None) { + array_add(&errors, curr_err); + } + + data->error_available = false; + } + + if (curr_import_index < p->files_to_process.count) { t->user_index = curr_import_index; curr_import_index++; - gb_thread_start(t, parse_worker_file_proc, p); are_any_alive = true; + + gb_semaphore_release(&data->resume_work); } + + gb_mutex_unlock(&data->lock); + } else { + //NOTE(thebirk): If we cant lock a thread it must be working + are_any_alive = true; } } - if (!are_any_alive && curr_import_index >= p->files_to_process.count) { + + //NOTE(thebirk): Everything collapses without this, but it really shouldn't! + gb_yield(); + + if ((!are_any_alive) && (curr_import_index >= p->files_to_process.count)) { break; } } + //NOTE(thebirk): Signal all workers to exit + for_array(i, worker_threads_data) { + ParserWorkerThreadData* data = &worker_threads_data[i]; + data->should_exit = true; + gb_semaphore_release(&data->resume_work); + } + if (errors.count > 0) { return errors[errors.count-1]; } From 97dfcffa761acdf22a12412db5610cc4beb2c4d6 Mon Sep 17 00:00:00 2001 From: thebirk Date: Mon, 26 Aug 2019 19:09:52 +0200 Subject: [PATCH 2/7] Fixed error where the parser would end early. --- src/parser.cpp | 34 +++++++++++++++++++++++++++------- src/parser.hpp | 1 + 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/parser.cpp b/src/parser.cpp index 0a2a3c1fc..d7e7f63b7 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4062,6 +4062,7 @@ bool init_parser(Parser *p) { array_init(&p->files_to_process, heap_allocator()); gb_mutex_init(&p->file_add_mutex); gb_mutex_init(&p->file_decl_mutex); + gb_semaphore_init(&p->worker_finished_semaphore); return true; } @@ -4087,6 +4088,7 @@ void destroy_parser(Parser *p) { map_destroy(&p->package_map); gb_mutex_destroy(&p->file_add_mutex); gb_mutex_destroy(&p->file_decl_mutex); + gb_semaphore_destroy(&p->worker_finished_semaphore); } @@ -4714,6 +4716,7 @@ GB_THREAD_PROC(parse_worker_file_proc) { data->err = process_imported_file(p, file_to_process); data->error_available = true; + gb_semaphore_release(&p->worker_finished_semaphore); gb_mutex_unlock(&data->lock); } @@ -4760,6 +4763,8 @@ ParseFileError parse_packages(Parser *p, String init_filename) { isize thread_count = gb_max(build_context.thread_count, 1); if (thread_count > 1) { isize volatile curr_import_index = 0; + +#if 0 isize initial_file_count = p->files_to_process.count; // NOTE(bill): Make sure that these are in parsed in this order for (isize i = 0; i < initial_file_count; i++) { @@ -4769,6 +4774,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) { } curr_import_index++; } +#endif auto worker_threads_data = array_make(heap_allocator(), thread_count); defer (array_free(&worker_threads_data)); @@ -4795,7 +4801,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) { gbThread *t = &worker_threads[i]; gb_thread_init(t); char buffer[64]; - gb_snprintf(buffer, 64, "Parser Worker #%lld", i); + gb_snprintf(buffer, 64, "Parser Worker #%ll", i); gb_thread_set_name(t, buffer); gb_thread_start(t, parse_worker_file_proc, &worker_threads_data[i]); } @@ -4806,7 +4812,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) { auto errors = array_make(heap_allocator(), 0, 16); for (;;) { - bool are_any_alive = false; + int num_alive = 0; for_array(i, worker_threads) { gbThread *t = &worker_threads[i]; @@ -4825,7 +4831,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) { if (curr_import_index < p->files_to_process.count) { t->user_index = curr_import_index; curr_import_index++; - are_any_alive = true; + num_alive += 1; gb_semaphore_release(&data->resume_work); } @@ -4833,14 +4839,28 @@ ParseFileError parse_packages(Parser *p, String init_filename) { gb_mutex_unlock(&data->lock); } else { //NOTE(thebirk): If we cant lock a thread it must be working - are_any_alive = true; + num_alive += 1; + } } - //NOTE(thebirk): Everything collapses without this, but it really shouldn't! - gb_yield(); + while (num_alive > 0) { + isize prev_files_to_process = p->files_to_process.count; + gb_semaphore_wait(&p->worker_finished_semaphore); + num_alive -= 1; - if ((!are_any_alive) && (curr_import_index >= p->files_to_process.count)) { + if (prev_files_to_process < p->files_to_process.count) { + if (num_alive > 0) { + //NOTE(thebirk): Recreate semaphore to avoid overflowing the counter. Only needs to happen when there are more threads alive + gb_semaphore_destroy(&p->worker_finished_semaphore); + gb_semaphore_init(&p->worker_finished_semaphore); + } + break; + } + } + + + if ((num_alive == 0) && (curr_import_index >= p->files_to_process.count)) { break; } } diff --git a/src/parser.hpp b/src/parser.hpp index 3489f1a9b..f1bbd8784 100644 --- a/src/parser.hpp +++ b/src/parser.hpp @@ -139,6 +139,7 @@ struct Parser { isize total_line_count; gbMutex file_add_mutex; gbMutex file_decl_mutex; + gbSemaphore worker_finished_semaphore; }; enum ProcInlining { From 4551521b2cc461d89d9d69ea441ae311a8607f94 Mon Sep 17 00:00:00 2001 From: thebirk Date: Mon, 26 Aug 2019 19:51:33 +0200 Subject: [PATCH 3/7] Im just trying things at this point, Bill should just squash this PR at merge time ;) --- src/parser.cpp | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/parser.cpp b/src/parser.cpp index d7e7f63b7..97715b10e 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4690,6 +4690,7 @@ struct ParserWorkerThreadData { gbMutex lock; //NOTE(thebirk): All variables below are locked by this mutex bool error_available; + bool is_working; bool should_exit; }; @@ -4716,8 +4717,9 @@ GB_THREAD_PROC(parse_worker_file_proc) { data->err = process_imported_file(p, file_to_process); data->error_available = true; - gb_semaphore_release(&p->worker_finished_semaphore); + data->is_working = false; gb_mutex_unlock(&data->lock); + gb_semaphore_release(&p->worker_finished_semaphore); } //GB_PANIC("A worker thread should not be able to reach the end!!!"); @@ -4818,7 +4820,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) { gbThread *t = &worker_threads[i]; ParserWorkerThreadData *data = &worker_threads_data[i]; - if (gb_mutex_try_lock(&data->lock)) { + if (!data->is_working && gb_mutex_try_lock(&data->lock)) { if (data->error_available) { auto curr_err = data->err; if (curr_err != ParseFile_None) { @@ -4834,6 +4836,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) { num_alive += 1; gb_semaphore_release(&data->resume_work); + data->is_working = true; } gb_mutex_unlock(&data->lock); @@ -4844,20 +4847,23 @@ ParseFileError parse_packages(Parser *p, String init_filename) { } } + /* while (num_alive > 0) { isize prev_files_to_process = p->files_to_process.count; gb_semaphore_wait(&p->worker_finished_semaphore); num_alive -= 1; - if (prev_files_to_process < p->files_to_process.count) { + if ((prev_files_to_process < p->files_to_process.count)) { if (num_alive > 0) { //NOTE(thebirk): Recreate semaphore to avoid overflowing the counter. Only needs to happen when there are more threads alive - gb_semaphore_destroy(&p->worker_finished_semaphore); - gb_semaphore_init(&p->worker_finished_semaphore); + //gb_semaphore_destroy(&p->worker_finished_semaphore); + //gb_semaphore_init(&p->worker_finished_semaphore); } + printf("Early out!\n"); break; } } + */ if ((num_alive == 0) && (curr_import_index >= p->files_to_process.count)) { From 6a8b3fee38b3bb05ffc599d619e78472ad8b56a0 Mon Sep 17 00:00:00 2001 From: thebirk Date: Mon, 26 Aug 2019 20:23:52 +0200 Subject: [PATCH 4/7] Removed gb_thread_set_name because it segfaults on linux. --- src/parser.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/parser.cpp b/src/parser.cpp index 97715b10e..70cc37433 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4789,6 +4789,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) { data->err = ParseFile_None; data->should_exit = false; data->error_available = false; + data->is_working = false; } defer(for_array(i, worker_threads_data) { ParserWorkerThreadData *data = &worker_threads_data[i]; @@ -4802,9 +4803,9 @@ ParseFileError parse_packages(Parser *p, String init_filename) { for_array(i, worker_threads) { gbThread *t = &worker_threads[i]; gb_thread_init(t); - char buffer[64]; - gb_snprintf(buffer, 64, "Parser Worker #%ll", i); - gb_thread_set_name(t, buffer); + //char buffer[64]; + //gb_snprintf(buffer, 64, "Parser Worker #%ll", i); + //gb_thread_set_name(t, buffer); gb_thread_start(t, parse_worker_file_proc, &worker_threads_data[i]); } defer(for_array(i, worker_threads) { From d76249d90b4d2812c2d8c2d25d854a3e9361b510 Mon Sep 17 00:00:00 2001 From: thebirk Date: Thu, 29 Aug 2019 20:27:38 +0200 Subject: [PATCH 5/7] Cleaned up parse_packages and the worker proc. --- src/parser.cpp | 50 ++++++++++++-------------------------------------- 1 file changed, 12 insertions(+), 38 deletions(-) diff --git a/src/parser.cpp b/src/parser.cpp index 70cc37433..3fc999a2c 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4681,14 +4681,13 @@ skip: return ParseFile_None; } -#if 1 struct ParserWorkerThreadData { Parser *parser; - - gbSemaphore resume_work; //NOTE(thebirk): Use to signal that the worker thead has a new work to do ParseFileError err; - gbMutex lock; //NOTE(thebirk): All variables below are locked by this mutex + gbSemaphore resume_work; + gbMutex lock; + bool error_available; bool is_working; bool should_exit; @@ -4724,18 +4723,6 @@ GB_THREAD_PROC(parse_worker_file_proc) { //GB_PANIC("A worker thread should not be able to reach the end!!!"); } -#else -GB_THREAD_PROC(parse_worker_file_proc) { - if (thread == nullptr) return 0; - auto *p = cast(Parser *)thread->user_data; - isize index = thread->user_index; - gb_mutex_lock(&p->file_add_mutex); - auto file_to_process = p->files_to_process[index]; - gb_mutex_unlock(&p->file_add_mutex); - ParseFileError err = process_imported_file(p, file_to_process); - return cast(isize)err; -} -#endif ParseFileError parse_packages(Parser *p, String init_filename) { GB_ASSERT(init_filename.text[init_filename.len] == 0); @@ -4767,6 +4754,8 @@ ParseFileError parse_packages(Parser *p, String init_filename) { isize volatile curr_import_index = 0; #if 0 + //NOTE(thebirk): Leaving this piece of code behind if it turns out we need it, yes I know git exists + isize initial_file_count = p->files_to_process.count; // NOTE(bill): Make sure that these are in parsed in this order for (isize i = 0; i < initial_file_count; i++) { @@ -4803,8 +4792,11 @@ ParseFileError parse_packages(Parser *p, String init_filename) { for_array(i, worker_threads) { gbThread *t = &worker_threads[i]; gb_thread_init(t); - //char buffer[64]; - //gb_snprintf(buffer, 64, "Parser Worker #%ll", i); + //NOTE(thebirk): This crashes on linux. In addition to that the method used on windows does + // not get picked up by a lot of tools look into using SetThreadDescription + // when running on new enough windows 10 builds + //char buffer[32]; + //gb_snprintf(buffer, 32, "Parser Worker #%ll", i); //gb_thread_set_name(t, buffer); gb_thread_start(t, parse_worker_file_proc, &worker_threads_data[i]); } @@ -4844,32 +4836,14 @@ ParseFileError parse_packages(Parser *p, String init_filename) { } else { //NOTE(thebirk): If we cant lock a thread it must be working num_alive += 1; - } } - /* - while (num_alive > 0) { - isize prev_files_to_process = p->files_to_process.count; - gb_semaphore_wait(&p->worker_finished_semaphore); - num_alive -= 1; - - if ((prev_files_to_process < p->files_to_process.count)) { - if (num_alive > 0) { - //NOTE(thebirk): Recreate semaphore to avoid overflowing the counter. Only needs to happen when there are more threads alive - //gb_semaphore_destroy(&p->worker_finished_semaphore); - //gb_semaphore_init(&p->worker_finished_semaphore); - } - printf("Early out!\n"); - break; - } - } - */ - - if ((num_alive == 0) && (curr_import_index >= p->files_to_process.count)) { break; } + + gb_yield(); } //NOTE(thebirk): Signal all workers to exit From 4dade346033bc3d19879bee1823abf3d9e230281 Mon Sep 17 00:00:00 2001 From: thebirk Date: Thu, 29 Aug 2019 20:34:09 +0200 Subject: [PATCH 6/7] Removed unused semaphore on Parser. --- src/parser.cpp | 2 -- src/parser.hpp | 1 - 2 files changed, 3 deletions(-) diff --git a/src/parser.cpp b/src/parser.cpp index 3fc999a2c..f08688b5f 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4062,7 +4062,6 @@ bool init_parser(Parser *p) { array_init(&p->files_to_process, heap_allocator()); gb_mutex_init(&p->file_add_mutex); gb_mutex_init(&p->file_decl_mutex); - gb_semaphore_init(&p->worker_finished_semaphore); return true; } @@ -4088,7 +4087,6 @@ void destroy_parser(Parser *p) { map_destroy(&p->package_map); gb_mutex_destroy(&p->file_add_mutex); gb_mutex_destroy(&p->file_decl_mutex); - gb_semaphore_destroy(&p->worker_finished_semaphore); } diff --git a/src/parser.hpp b/src/parser.hpp index f1bbd8784..3489f1a9b 100644 --- a/src/parser.hpp +++ b/src/parser.hpp @@ -139,7 +139,6 @@ struct Parser { isize total_line_count; gbMutex file_add_mutex; gbMutex file_decl_mutex; - gbSemaphore worker_finished_semaphore; }; enum ProcInlining { From f921a91fc8a1e2674279e79c7357adfb0016b360 Mon Sep 17 00:00:00 2001 From: thebirk Date: Thu, 29 Aug 2019 20:35:12 +0200 Subject: [PATCH 7/7] Properly removed the semaphore. --- src/parser.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/parser.cpp b/src/parser.cpp index f08688b5f..31d21b737 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4716,7 +4716,6 @@ GB_THREAD_PROC(parse_worker_file_proc) { data->error_available = true; data->is_working = false; gb_mutex_unlock(&data->lock); - gb_semaphore_release(&p->worker_finished_semaphore); } //GB_PANIC("A worker thread should not be able to reach the end!!!");