Changeset: ff62644620e6 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ff62644620e6 Modified Files: sql/backends/monet5/miniseed/79_registrar.sql sql/backends/monet5/miniseed/registrar.c sql/backends/monet5/miniseed/registrar.mal Branch: DVframework Log Message:
registrar became multi-threaded (optional). diffs (truncated from 450 to 300 lines): diff --git a/sql/backends/monet5/miniseed/79_registrar.sql b/sql/backends/monet5/miniseed/79_registrar.sql --- a/sql/backends/monet5/miniseed/79_registrar.sql +++ b/sql/backends/monet5/miniseed/79_registrar.sql @@ -17,7 +17,7 @@ Copyright August 2008-2012 MonetDB B.V. All Rights Reserved. */ -CREATE PROCEDURE register_repo(repo string, mode int) +CREATE PROCEDURE register_repo(repo string, mode int, num_threads int) external name registrar.register_repo; diff --git a/sql/backends/monet5/miniseed/registrar.c b/sql/backends/monet5/miniseed/registrar.c --- a/sql/backends/monet5/miniseed/registrar.c +++ b/sql/backends/monet5/miniseed/registrar.c @@ -9,6 +9,10 @@ #include "sql_mvc.h" #include "sql.h" +#ifdef HAVE_PTHREAD_H +#include <pthread.h> +#endif + /* * keeps BAT and other properties of columns of a table. */ @@ -29,6 +33,16 @@ typedef struct { sht num_tables; } temp_container; +typedef struct { + int tid; /* thread id */ + str *file_paths; /* array of file paths to loop on in the thread */ + long loop_start; + long loop_end; + int mode; /* carries to the thread */ + Client cntxt; /* carries to the thread */ + int *function_created; +} thread_argv; + lng get_line_num(str filename); lng get_file_paths(str repo_path, str** ret_file_paths); str mseed_create_temp_container(temp_container* ret_tc); @@ -37,9 +51,13 @@ str mseed_register(str file_path, temp_c str mseed_register_and_mount(str file_path, temp_container* ret_tc); int concatenate_strs(str* words_to_concat, int num_words_to_concat, str* ret_concatenated); str prepare_insertion(Client cntxt, temp_container* tc); -str insert_into_vault(Client cntxt, MalBlkPtr mb, temp_container* tc); +str insert_into_vault(Client cntxt, temp_container* tc); str SQLstatementIntern(Client c, str *expr, str nme, int execute, bit output); str register_clean_up(temp_container* tc); +void *register_files(void *args); + +pthread_mutex_t create_lock; +pthread_mutex_t insert_lock; /* * returns number of lines in a file. @@ -462,14 +480,13 @@ str prepare_insertion(Client cntxt, temp * * returns error or MAL_SUCCEED. */ -str insert_into_vault(Client cntxt, MalBlkPtr mb, temp_container* tc) +str insert_into_vault(Client cntxt, temp_container* tc) { /* form a sql query str like this: */ /* INSERT INTO mseed.files SELECT * FROM mseed_files_reg(ticket, table_idx); */ int t; long ticket = (long) tc; - mvc *m = NULL; str msg; for(t = 0; t < tc->num_tables; t++) @@ -484,16 +501,6 @@ str insert_into_vault(Client cntxt, MalB } - if((msg = getSQLContext(cntxt, mb, &m, NULL))!= MAL_SUCCEED) - {/* getting mvc failed, what to do */ - return msg; - } - - if(mvc_commit(m, 0, NULL) < 0) - {/* committing failed */ - throw(MAL,"registrar.insert_into_vault", "committing failed\n"); - } - return MAL_SUCCEED; } @@ -547,6 +554,7 @@ str mseed_register(str file_path, temp_c { MSRecord *msr = NULL; + MSFileParam *msfp = NULL; int retcode; short int verbose = 1; BAT *aBAT = NULL; @@ -557,7 +565,8 @@ str mseed_register(str file_path, temp_c str ch = (str) GDKmalloc(2*sizeof(char)); ch[1] = '\0'; - while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 0, verbose)) == MS_NOERROR) + /* while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 0, verbose)) == MS_NOERROR) */ + while ((retcode = ms_readmsr_r (&msfp, &msr, file_path, 0, NULL, NULL, 1, 0, verbose)) == MS_NOERROR) { if(!files_done) { @@ -647,8 +656,11 @@ str mseed_register(str file_path, temp_c seq_no_fake++; } + GDKfree(ch); + /* Cleanup memory and close file */ - ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0); + /* ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0); */ + ms_readmsr_r (&msfp, &msr, NULL, 0, NULL, NULL, 0, 0, 0); if ( retcode != MS_ENDOFFILE ) throw(MAL, "mseed_register", "Cannot read %s: %s\n", file_path, ms_errorstr(retcode)); @@ -671,6 +683,7 @@ str mseed_register_and_mount(str file_pa { MSRecord *msr = NULL; + MSFileParam *msfp = NULL; int retcode; short int verbose = 1; short int data_flag = 1; @@ -684,7 +697,8 @@ str mseed_register_and_mount(str file_pa str ch = (str) GDKmalloc(2*sizeof(char)); ch[1] = '\0'; - while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, data_flag, verbose)) == MS_NOERROR) + /* while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, data_flag, verbose)) == MS_NOERROR) */ + while ((retcode = ms_readmsr_r (&msfp, &msr, file_path, 0, NULL, NULL, 1, data_flag, verbose)) == MS_NOERROR) { if(!files_done) { @@ -814,7 +828,8 @@ str mseed_register_and_mount(str file_pa } /* Cleanup memory and close file */ - ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0); + /* ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0); */ + ms_readmsr_r (&msfp, &msr, NULL, 0, NULL, NULL, 0, 0, 0); if ( retcode != MS_ENDOFFILE ) throw(MAL, "mseed_register", "Cannot read %s: %s\n", file_path, ms_errorstr(retcode)); @@ -822,6 +837,90 @@ str mseed_register_and_mount(str file_pa return MAL_SUCCEED; } +void *register_files(void *args) +{ + temp_container *tc; + long i; + str err = NULL; + long start, finish; + + thread_argv targv = *((thread_argv*)args); + + /* create temp_container */ + tc = (temp_container*)GDKmalloc(sizeof(temp_container)); + assert(tc != NULL); + if(targv.mode == 0) + err = mseed_create_temp_container(tc); /* depending on design can get different argument(s) */ + else + err = mseed_create_temp_container_with_data_tables(tc); /* depending on design can get different argument(s) */ + if(err != MAL_SUCCEED) + {/* temp_container creation failed, what to do */ + throw(MAL,"registrar.register_repo", "temp_container creation failed in thread %d: %s\n", targv.tid, err); + } + + start = GDKms(); + /* loop through the file_paths in repo */ + if(targv.mode == 0) + { + for(i = targv.loop_start; i < targv.loop_end; i++) + { + err = mseed_register(targv.file_paths[i], tc); + if(err != MAL_SUCCEED) + {/* current file cannot be registered, what to do */ + /*throw(MAL,"registrar.register_repo", "Current file cannot be registered: %s\n", err); */ + printf("registrar.register_repo: current file cannot be registered in thread %d: %s\n", targv.tid, err); + } + } + } + else + { + for(i = targv.loop_start; i < targv.loop_end; i++) + { + err = mseed_register_and_mount(targv.file_paths[i], tc); + if(err != MAL_SUCCEED) + {/* current file cannot be registered, what to do */ + /* throw(MAL,"registrar.register_repo", "Current file cannot be registered: %s\n", err); */ + printf("registrar.register_repo: current file cannot be registered and/or mounted in thread %d: %s\n", targv.tid, err); + } + } + } + finish = GDKms(); + printf("# In thread %d, time for extraction and transformation of (meta-)data: %ld milliseconds\n", targv.tid, finish - start); + + pthread_mutex_lock(&create_lock); + if(*targv.function_created == 0) + { + /* prepare sql functions for inserting temp_container into tables_to_be_filled */ + err = prepare_insertion(targv.cntxt, tc); + if(err != MAL_SUCCEED) + {/* preparing the insertion failed, what to do */ + throw(MAL,"registrar.register_repo", "Insertion prepare failed in thread %d: %s\n", targv.tid, err); + } + *targv.function_created = 1; + } + pthread_mutex_unlock(&create_lock); + + pthread_mutex_lock(&insert_lock); + start = GDKms(); + /* insert temp_container into tables_to_be_filled */ + err = insert_into_vault(targv.cntxt, tc); + if(err != MAL_SUCCEED) + {/* inserting the temp_container into one of the tables failed, what to do */ + throw(MAL,"registrar.register_repo", "Inserting the temp_container into one of the tables failed in thread %d: %s\n", targv.tid, err); + } + finish = GDKms(); + printf("# In thread %d, time for loading of (meta-)data: %ld milliseconds\n", targv.tid, finish - start); + pthread_mutex_unlock(&insert_lock); + + err = register_clean_up(tc); + if(err != MAL_SUCCEED) + {/* inserting the temp_container into one of the tables failed, what to do */ + throw(MAL,"registrar.register_repo", "Cleaning up the temp_container failed in thread %d: %s\n", targv.tid, err); + } + + return NULL; +} + /* * takes a repository path repo_path, finds out the files in it, creates a * temp_container of the metadata to be inserted, for each file calls the @@ -837,12 +936,15 @@ str register_repo(Client cntxt, MalBlkPt { str *repo_path = (str*) getArgReference(stk,pci,pci->retc); /* arg 1: repo_path */ int mode = *(int*) getArgReference(stk,pci,pci->retc+1); /* arg 2: mode 0:register only, mode 1: register+mount */ + int num_threads = *(int*) getArgReference(stk,pci,pci->retc+2); /* arg 3: 1: no threads, >1: multi-threaded */ str *file_paths = NULL; long num_file_paths; temp_container *tc; long i; str err = NULL; long start, finish; + int function_created = 0; + mvc *m = NULL; /* fetch file_paths from repo_path */ num_file_paths = get_file_paths(*repo_path, &file_paths); @@ -851,70 +953,129 @@ str register_repo(Client cntxt, MalBlkPt throw(MAL,"registrar.register_repo", "Problematic repository: %s\n", err); } - /* create temp_container */ - tc = (temp_container*)GDKmalloc(sizeof(temp_container)); - assert(tc != NULL); - if(mode == 0) - err = mseed_create_temp_container(tc); /* depending on design can get different argument(s) */ - else - err = mseed_create_temp_container_with_data_tables(tc); /* depending on design can get different argument(s) */ - if(err != MAL_SUCCEED) - {/* temp_container creation failed, what to do */ - throw(MAL,"registrar.register_repo", "temp_container creation failed: %s\n", err); - } - - start = GDKms(); - /* loop through the file_paths in repo */ - if(mode == 0) + if(num_threads > 1) { - for(i = 0; i < num_file_paths; i++) + + /* multi-threaded */ + long loop_start = 0; + long num_file_paths_per_thread = num_file_paths / num_threads; + pthread_t *threads = (pthread_t*)GDKmalloc(num_threads*sizeof(pthread_t)); + thread_argv *targvs = (thread_argv*)GDKmalloc(num_threads*sizeof(thread_argv)); + int j; + + if (pthread_mutex_init(&insert_lock, NULL) != 0 || pthread_mutex_init(&create_lock, NULL) != 0) { - err = mseed_register(file_paths[i], tc); - if(err != MAL_SUCCEED) - {/* current file cannot be registered, what to do */ - /* throw(MAL,"registrar.register_repo", "Current file cannot be registered: %s\n", err); */ - printf("registrar.register_repo: current file cannot be registered: %s\n", err); - } + throw(MAL,"registrar.register_repo", "mutex init failed\n"); } + + for(j = 0; j < num_threads; j++) + { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list