Changeset: ff62644620e6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ff62644620e6
Modified Files:
        sql/backends/monet5/miniseed/79_registrar.sql
        sql/backends/monet5/miniseed/registrar.c
        sql/backends/monet5/miniseed/registrar.mal
Branch: DVframework
Log Message:

registrar became multi-threaded (optional).


diffs (truncated from 450 to 300 lines):

diff --git a/sql/backends/monet5/miniseed/79_registrar.sql 
b/sql/backends/monet5/miniseed/79_registrar.sql
--- a/sql/backends/monet5/miniseed/79_registrar.sql
+++ b/sql/backends/monet5/miniseed/79_registrar.sql
@@ -17,7 +17,7 @@ Copyright August 2008-2012 MonetDB B.V.
 All Rights Reserved.
 */
 
-CREATE PROCEDURE register_repo(repo string, mode int)
+CREATE PROCEDURE register_repo(repo string, mode int, num_threads int)
 external name registrar.register_repo;
 
 
diff --git a/sql/backends/monet5/miniseed/registrar.c 
b/sql/backends/monet5/miniseed/registrar.c
--- a/sql/backends/monet5/miniseed/registrar.c
+++ b/sql/backends/monet5/miniseed/registrar.c
@@ -9,6 +9,10 @@
 #include "sql_mvc.h"
 #include "sql.h"
 
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
+
 /*
  * keeps BAT and other properties of columns of a table.
  */
@@ -29,6 +33,16 @@ typedef struct {
        sht num_tables;
 } temp_container;
 
+typedef struct { /* per-thread arguments; register_files() receives a pointer to one of these */
+       int tid; /* thread id, only used to tag the thread's messages */
+       str *file_paths; /* array of file paths to loop on in the thread */
+       long loop_start; /* first index into file_paths handled by the thread (inclusive) */
+       long loop_end; /* index into file_paths at which the thread stops (exclusive) */
+       int mode; /* 0: register only, otherwise: register and mount */
+       Client cntxt; /* client context, passed on to prepare_insertion() and insert_into_vault() */
+       int *function_created; /* flag read and written under create_lock: 0 until prepare_insertion() succeeded, then 1 */
+} thread_argv;
+
 lng get_line_num(str filename);
 lng get_file_paths(str repo_path, str** ret_file_paths);
 str mseed_create_temp_container(temp_container* ret_tc);
@@ -37,9 +51,13 @@ str mseed_register(str file_path, temp_c
 str mseed_register_and_mount(str file_path, temp_container* ret_tc);
 int concatenate_strs(str* words_to_concat, int num_words_to_concat, str* 
ret_concatenated);
 str prepare_insertion(Client cntxt, temp_container* tc);
-str insert_into_vault(Client cntxt, MalBlkPtr mb, temp_container* tc);
+str insert_into_vault(Client cntxt, temp_container* tc);
 str SQLstatementIntern(Client c, str *expr, str nme, int execute, bit output);
 str register_clean_up(temp_container* tc);
+void *register_files(void *args);
+
+pthread_mutex_t create_lock;
+pthread_mutex_t insert_lock;
 
 /*
  * returns number of lines in a file.
@@ -462,14 +480,13 @@ str prepare_insertion(Client cntxt, temp
  *
  * returns error or MAL_SUCCEED.
  */
-str insert_into_vault(Client cntxt, MalBlkPtr mb, temp_container* tc)
+str insert_into_vault(Client cntxt, temp_container* tc)
 {
 /* form a sql query str like this: */
 /* INSERT INTO mseed.files SELECT * FROM mseed_files_reg(ticket, table_idx); */
 
        int t;
        long ticket = (long) tc;
-       mvc *m = NULL;
        str msg;
 
        for(t = 0; t < tc->num_tables; t++)
@@ -484,16 +501,6 @@ str insert_into_vault(Client cntxt, MalB
 
        }
 
-       if((msg = getSQLContext(cntxt, mb, &m, NULL))!= MAL_SUCCEED)
-       {/* getting mvc failed, what to do */
-               return msg;
-       }
-
-       if(mvc_commit(m, 0, NULL) < 0)
-       {/* committing failed */
-               throw(MAL,"registrar.insert_into_vault", "committing failed\n");
-       }
-
        return MAL_SUCCEED;
 }
 
@@ -547,6 +554,7 @@ str mseed_register(str file_path, temp_c
 {
 
        MSRecord *msr = NULL;
+       MSFileParam *msfp = NULL;
        int retcode;
        short int verbose = 1;
        BAT *aBAT = NULL;
@@ -557,7 +565,8 @@ str mseed_register(str file_path, temp_c
        str ch = (str) GDKmalloc(2*sizeof(char));
        ch[1] = '\0';
 
-       while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 0, 
verbose)) == MS_NOERROR)
+       /* while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 0, 
verbose)) == MS_NOERROR) */
+       while ((retcode = ms_readmsr_r (&msfp, &msr, file_path, 0, NULL, NULL, 
1, 0, verbose)) == MS_NOERROR)
        {
                if(!files_done)
                {
@@ -647,8 +656,11 @@ str mseed_register(str file_path, temp_c
                seq_no_fake++;
        }
 
+       GDKfree(ch);
+       
        /* Cleanup memory and close file */
-       ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0);
+       /* ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0); */
+       ms_readmsr_r (&msfp, &msr, NULL, 0, NULL, NULL, 0, 0, 0);
 
        if ( retcode != MS_ENDOFFILE )
                throw(MAL, "mseed_register", "Cannot read %s: %s\n", file_path, 
ms_errorstr(retcode));
@@ -671,6 +683,7 @@ str mseed_register_and_mount(str file_pa
 {
 
        MSRecord *msr = NULL;
+       MSFileParam *msfp = NULL;
        int retcode;
        short int verbose = 1;
        short int data_flag = 1;
@@ -684,7 +697,8 @@ str mseed_register_and_mount(str file_pa
        str ch = (str) GDKmalloc(2*sizeof(char));
        ch[1] = '\0';
 
-       while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 
data_flag, verbose)) == MS_NOERROR)
+       /* while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 
data_flag, verbose)) == MS_NOERROR) */
+       while ((retcode = ms_readmsr_r (&msfp, &msr, file_path, 0, NULL, NULL, 
1, data_flag, verbose)) == MS_NOERROR)
        {
                if(!files_done)
                {
@@ -814,7 +828,8 @@ str mseed_register_and_mount(str file_pa
        }
 
        /* Cleanup memory and close file */
-       ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0);
+       /* ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0); */
+       ms_readmsr_r (&msfp, &msr, NULL, 0, NULL, NULL, 0, 0, 0);
 
        if ( retcode != MS_ENDOFFILE )
                throw(MAL, "mseed_register", "Cannot read %s: %s\n", file_path, 
ms_errorstr(retcode));
@@ -822,6 +837,90 @@ str mseed_register_and_mount(str file_pa
        return MAL_SUCCEED;
 }
 
+/*
+ * thread body of the multi-threaded registrar.
+ *
+ * args points to a thread_argv, which is copied on entry. The thread creates
+ * its own temp_container, fills it from file_paths[loop_start .. loop_end-1]
+ * (mode 0: register only, otherwise: register and mount), makes sure the sql
+ * functions needed for insertion are prepared once (guarded by create_lock
+ * and *function_created), loads the container into the vault (guarded by
+ * insert_lock) and finally cleans the container up.
+ *
+ * A file that cannot be registered is reported on stdout and skipped.
+ *
+ * returns NULL on success. On failure the function is left through throw();
+ * both locks are always released before that, so that the other threads do
+ * not block forever on a mutex owned by a thread that has already returned.
+ */
+void *register_files(void *args)
+{
+       temp_container *tc;
+       long i;
+       str err = NULL;
+       long start, finish;
+
+       thread_argv targv = *((thread_argv*)args);
+
+       /* create temp_container */
+       tc = (temp_container*)GDKmalloc(sizeof(temp_container));
+       if(tc == NULL)
+       {/* checked explicitly: an assert disappears when NDEBUG is defined */
+               throw(MAL,"registrar.register_repo",
+                       "temp_container allocation failed\n");
+       }
+
+       /* depending on design the creators can get different argument(s) */
+       if(targv.mode == 0)
+               err = mseed_create_temp_container(tc);
+       else
+               err = mseed_create_temp_container_with_data_tables(tc);
+       if(err != MAL_SUCCEED)
+       {/* temp_container creation failed, give up on this thread */
+               throw(MAL,"registrar.register_repo",
+                       "temp_container creation "
+                       "failed in thread %d: %s\n", targv.tid, err);
+       }
+
+       start = GDKms();
+       /* loop through this thread's share of the file_paths in repo */
+       if(targv.mode == 0)
+       {
+               for(i = targv.loop_start; i < targv.loop_end; i++)
+               {
+                       err = mseed_register(targv.file_paths[i], tc);
+                       if(err != MAL_SUCCEED)
+                       {/* report the file and continue with the next one */
+                               printf("registrar.register_repo: current file "
+                                       "cannot be registered in thread %d: %s\n",
+                                       targv.tid, err);
+                       }
+               }
+       }
+       else
+       {
+               for(i = targv.loop_start; i < targv.loop_end; i++)
+               {
+                       err = mseed_register_and_mount(targv.file_paths[i], tc);
+                       if(err != MAL_SUCCEED)
+                       {/* report the file and continue with the next one */
+                               printf("registrar.register_repo: current file "
+                                       "cannot be registered and/or mounted "
+                                       "in thread %d: %s\n", targv.tid, err);
+                       }
+               }
+       }
+       finish = GDKms();
+       printf("# In thread %d, time for extraction and transformation of "
+               "(meta-)data: %ld milliseconds\n", targv.tid, finish - start);
+
+       /* only the first thread to get here prepares the sql functions */
+       pthread_mutex_lock(&create_lock);
+       if(*targv.function_created == 0)
+       {
+               /* prepare sql functions for inserting temp_container into
+                * tables_to_be_filled */
+               err = prepare_insertion(targv.cntxt, tc);
+               if(err != MAL_SUCCEED)
+               {/* preparing the insertion failed */
+                       /* unlock first: throw leaves the function */
+                       pthread_mutex_unlock(&create_lock);
+                       throw(MAL,"registrar.register_repo",
+                               "Insertion prepare "
+                               "failed in thread %d: %s\n", targv.tid, err);
+               }
+               *targv.function_created = 1;
+       }
+       pthread_mutex_unlock(&create_lock);
+
+       /* the threads load their containers one at a time */
+       pthread_mutex_lock(&insert_lock);
+       start = GDKms();
+       /* insert temp_container into tables_to_be_filled */
+       err = insert_into_vault(targv.cntxt, tc);
+       if(err != MAL_SUCCEED)
+       {/* inserting the temp_container into one of the tables failed */
+               /* unlock first: throw leaves the function */
+               pthread_mutex_unlock(&insert_lock);
+               throw(MAL,"registrar.register_repo",
+                       "Inserting the temp_container into one of the tables "
+                       "failed in thread %d: %s\n", targv.tid, err);
+       }
+       finish = GDKms();
+       printf("# In thread %d, time for loading of (meta-)data: %ld "
+               "milliseconds\n", targv.tid, finish - start);
+       pthread_mutex_unlock(&insert_lock);
+
+       err = register_clean_up(tc);
+       if(err != MAL_SUCCEED)
+       {/* cleaning up the temp_container failed */
+               throw(MAL,"registrar.register_repo",
+                       "Cleaning up the temp_container "
+                       "failed in thread %d: %s\n", targv.tid, err);
+       }
+
+       return NULL;
+}
+
+
 /*
  * takes a repository path repo_path, finds out the files in it, creates a
  * temp_container of the metadata to be inserted, for each file calls the
@@ -837,12 +936,15 @@ str register_repo(Client cntxt, MalBlkPt
 {
        str *repo_path = (str*) getArgReference(stk,pci,pci->retc); /* arg 1: 
repo_path */
        int mode = *(int*) getArgReference(stk,pci,pci->retc+1); /* arg 2: mode 
0:register only, mode 1: register+mount */
+       int num_threads = *(int*) getArgReference(stk,pci,pci->retc+2); /* arg 
3: 1: no threads, >1: multi-threaded */
        str *file_paths = NULL;
        long num_file_paths;
        temp_container *tc;
        long i;
        str err = NULL;
        long start, finish;
+       int function_created = 0;
+       mvc *m = NULL;
 
        /* fetch file_paths from repo_path */
        num_file_paths = get_file_paths(*repo_path, &file_paths);
@@ -851,70 +953,129 @@ str register_repo(Client cntxt, MalBlkPt
                throw(MAL,"registrar.register_repo", "Problematic repository: 
%s\n", err);
        }
 
-       /* create temp_container */
-       tc = (temp_container*)GDKmalloc(sizeof(temp_container));
-       assert(tc != NULL);
-       if(mode == 0)
-               err = mseed_create_temp_container(tc); /* depending on design 
can get different argument(s) */
-       else
-               err = mseed_create_temp_container_with_data_tables(tc); /* 
depending on design can get different argument(s) */
-       if(err != MAL_SUCCEED)
-       {/* temp_container creation failed, what to do */
-               throw(MAL,"registrar.register_repo", "temp_container creation 
failed: %s\n", err);
-       }
-
-       start = GDKms();
-       /* loop through the file_paths in repo */
-       if(mode == 0)
+       if(num_threads > 1)
        {
-               for(i = 0; i < num_file_paths; i++)
+               
+               /* multi-threaded */
+               long loop_start = 0;
+               long num_file_paths_per_thread = num_file_paths / num_threads;
+               pthread_t *threads = 
(pthread_t*)GDKmalloc(num_threads*sizeof(pthread_t));
+               thread_argv *targvs = 
(thread_argv*)GDKmalloc(num_threads*sizeof(thread_argv));
+               int j;
+       
+               if (pthread_mutex_init(&insert_lock, NULL) != 0 || 
pthread_mutex_init(&create_lock, NULL) != 0)
                {
-                       err = mseed_register(file_paths[i], tc);
-                       if(err != MAL_SUCCEED)
-                       {/* current file cannot be registered, what to do */
-       /*                      throw(MAL,"registrar.register_repo", "Current 
file cannot be registered: %s\n", err); */
-                               printf("registrar.register_repo: current file 
cannot be registered: %s\n", err);
-                       }
+                       throw(MAL,"registrar.register_repo", "mutex init 
failed\n");
                }
+               
+               for(j = 0; j < num_threads; j++)
+               {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to