Changeset: ff62644620e6 for MonetDB
Modified Files:
Branch: DVframework
Log Message:

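The registrar can now optionally run multi-threaded: register_repo takes a new num_threads argument (1 = single-threaded, >1 = multi-threaded).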

diffs (truncated from 450 to 300 lines):

diff --git a/sql/backends/monet5/miniseed/79_registrar.sql 
--- a/sql/backends/monet5/miniseed/79_registrar.sql
+++ b/sql/backends/monet5/miniseed/79_registrar.sql
@@ -17,7 +17,7 @@ Copyright August 2008-2012 MonetDB B.V.
 All Rights Reserved.
-CREATE PROCEDURE register_repo(repo string, mode int)
+CREATE PROCEDURE register_repo(repo string, mode int, num_threads int)
 external name registrar.register_repo;
diff --git a/sql/backends/monet5/miniseed/registrar.c 
--- a/sql/backends/monet5/miniseed/registrar.c
+++ b/sql/backends/monet5/miniseed/registrar.c
@@ -9,6 +9,10 @@
 #include "sql_mvc.h"
 #include "sql.h"
+#include <pthread.h>
  * keeps BAT and other properties of columns of a table.
@@ -29,6 +33,16 @@ typedef struct {
        sht num_tables;
 } temp_container;
+typedef struct {
+       int tid; /* thread id */
+       str *file_paths; /* array of file paths to loop on in the thread */
+       long loop_start;
+       long loop_end;
+       int mode; /* carries to the thread */
+       Client cntxt; /* carries to the thread */
+       int *function_created;
+} thread_argv;
 lng get_line_num(str filename);
 lng get_file_paths(str repo_path, str** ret_file_paths);
 str mseed_create_temp_container(temp_container* ret_tc);
@@ -37,9 +51,13 @@ str mseed_register(str file_path, temp_c
 str mseed_register_and_mount(str file_path, temp_container* ret_tc);
 int concatenate_strs(str* words_to_concat, int num_words_to_concat, str* 
 str prepare_insertion(Client cntxt, temp_container* tc);
-str insert_into_vault(Client cntxt, MalBlkPtr mb, temp_container* tc);
+str insert_into_vault(Client cntxt, temp_container* tc);
 str SQLstatementIntern(Client c, str *expr, str nme, int execute, bit output);
 str register_clean_up(temp_container* tc);
+void *register_files(void *args);
+pthread_mutex_t create_lock;
+pthread_mutex_t insert_lock;
  * returns number of lines in a file.
@@ -462,14 +480,13 @@ str prepare_insertion(Client cntxt, temp
  * returns error or MAL_SUCCEED.
-str insert_into_vault(Client cntxt, MalBlkPtr mb, temp_container* tc)
+str insert_into_vault(Client cntxt, temp_container* tc)
 /* form a sql query str like this: */
 /* INSERT INTO mseed.files SELECT * FROM mseed_files_reg(ticket, table_idx); */
        int t;
        long ticket = (long) tc;
-       mvc *m = NULL;
        str msg;
        for(t = 0; t < tc->num_tables; t++)
@@ -484,16 +501,6 @@ str insert_into_vault(Client cntxt, MalB
-       if((msg = getSQLContext(cntxt, mb, &m, NULL))!= MAL_SUCCEED)
-       {/* getting mvc failed, what to do */
-               return msg;
-       }
-       if(mvc_commit(m, 0, NULL) < 0)
-       {/* committing failed */
-               throw(MAL,"registrar.insert_into_vault", "committing failed\n");
-       }
        return MAL_SUCCEED;
@@ -547,6 +554,7 @@ str mseed_register(str file_path, temp_c
        MSRecord *msr = NULL;
+       MSFileParam *msfp = NULL;
        int retcode;
        short int verbose = 1;
        BAT *aBAT = NULL;
@@ -557,7 +565,8 @@ str mseed_register(str file_path, temp_c
        str ch = (str) GDKmalloc(2*sizeof(char));
        ch[1] = '\0';
-       while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 0, 
verbose)) == MS_NOERROR)
+       /* while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 0, 
verbose)) == MS_NOERROR) */
+       while ((retcode = ms_readmsr_r (&msfp, &msr, file_path, 0, NULL, NULL, 
1, 0, verbose)) == MS_NOERROR)
@@ -647,8 +656,11 @@ str mseed_register(str file_path, temp_c
+       GDKfree(ch);
        /* Cleanup memory and close file */
-       ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0);
+       /* ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0); */
+       ms_readmsr_r (&msfp, &msr, NULL, 0, NULL, NULL, 0, 0, 0);
        if ( retcode != MS_ENDOFFILE )
                throw(MAL, "mseed_register", "Cannot read %s: %s\n", file_path, 
@@ -671,6 +683,7 @@ str mseed_register_and_mount(str file_pa
        MSRecord *msr = NULL;
+       MSFileParam *msfp = NULL;
        int retcode;
        short int verbose = 1;
        short int data_flag = 1;
@@ -684,7 +697,8 @@ str mseed_register_and_mount(str file_pa
        str ch = (str) GDKmalloc(2*sizeof(char));
        ch[1] = '\0';
-       while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 
data_flag, verbose)) == MS_NOERROR)
+       /* while ((retcode = ms_readmsr (&msr, file_path, 0, NULL, NULL, 1, 
data_flag, verbose)) == MS_NOERROR) */
+       while ((retcode = ms_readmsr_r (&msfp, &msr, file_path, 0, NULL, NULL, 
1, data_flag, verbose)) == MS_NOERROR)
@@ -814,7 +828,8 @@ str mseed_register_and_mount(str file_pa
        /* Cleanup memory and close file */
-       ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0);
+       /* ms_readmsr (&msr, NULL, 0, NULL, NULL, 0, 0, 0); */
+       ms_readmsr_r (&msfp, &msr, NULL, 0, NULL, NULL, 0, 0, 0);
        if ( retcode != MS_ENDOFFILE )
                throw(MAL, "mseed_register", "Cannot read %s: %s\n", file_path, 
@@ -822,6 +837,90 @@ str mseed_register_and_mount(str file_pa
        return MAL_SUCCEED;
+void *register_files(void *args)
+       temp_container *tc;
+       long i;
+       str err = NULL;
+       long start, finish;
+       thread_argv targv = *((thread_argv*)args);
+       /* create temp_container */
+       tc = (temp_container*)GDKmalloc(sizeof(temp_container));
+       assert(tc != NULL);
+       if(targv.mode == 0)
+               err = mseed_create_temp_container(tc); /* depending on design 
can get different argument(s) */
+       else
+               err = mseed_create_temp_container_with_data_tables(tc); /* 
depending on design can get different argument(s) */
+       if(err != MAL_SUCCEED)
+       {/* temp_container creation failed, what to do */
+               throw(MAL,"registrar.register_repo", "temp_container creation 
failed in thread %d: %s\n", targv.tid, err);
+       }
+       start = GDKms();
+       /* loop through the file_paths in repo */
+       if(targv.mode == 0)
+       {
+               for(i = targv.loop_start; i < targv.loop_end; i++)
+               {
+                       err = mseed_register(targv.file_paths[i], tc);
+                       if(err != MAL_SUCCEED)
+                       {/* current file cannot be registered, what to do */
+                               /*throw(MAL,"registrar.register_repo", "Current 
file cannot be registered: %s\n", err); */
+                               printf("registrar.register_repo: current file 
cannot be registered in thread %d: %s\n", targv.tid, err);
+                       }
+               }
+       }
+       else
+       {
+               for(i = targv.loop_start; i < targv.loop_end; i++)
+               {
+                       err = mseed_register_and_mount(targv.file_paths[i], tc);
+                       if(err != MAL_SUCCEED)
+                       {/* current file cannot be registered, what to do */
+                               /* throw(MAL,"registrar.register_repo", 
"Current file cannot be registered: %s\n", err); */
+                               printf("registrar.register_repo: current file 
cannot be registered and/or mounted in thread %d: %s\n", targv.tid, err);
+                       }
+               }
+       }
+       finish = GDKms();
+       printf("# In thread %d, time for extraction and transformation of 
(meta-)data: %ld milliseconds\n", targv.tid, finish - start);
+       pthread_mutex_lock(&create_lock);
+       if(*targv.function_created == 0)
+       {
+               /* prepare sql functions for inserting temp_container into 
tables_to_be_filled */
+               err = prepare_insertion(targv.cntxt, tc);
+               if(err != MAL_SUCCEED)
+               {/* preparing the insertion failed, what to do */
+                       throw(MAL,"registrar.register_repo", "Insertion prepare 
failed in thread %d: %s\n", targv.tid, err);
+               }
+               *targv.function_created = 1;
+       }
+       pthread_mutex_unlock(&create_lock);
+       pthread_mutex_lock(&insert_lock);
+       start = GDKms();
+       /* insert temp_container into tables_to_be_filled */
+       err = insert_into_vault(targv.cntxt, tc);
+       if(err != MAL_SUCCEED)
+       {/* inserting the temp_container into one of the tables failed, what to 
do */
+               throw(MAL,"registrar.register_repo", "Inserting the 
temp_container into one of the tables failed in thread %d: %s\n", targv.tid, 
+       }
+       finish = GDKms();
+       printf("# In thread %d, time for loading of (meta-)data: %ld 
milliseconds\n", targv.tid, finish - start);
+       pthread_mutex_unlock(&insert_lock);
+       err = register_clean_up(tc);
+       if(err != MAL_SUCCEED)
+       {/* inserting the temp_container into one of the tables failed, what to 
do */
+       throw(MAL,"registrar.register_repo", "Cleaning up the temp_container 
failed in thread %d: %s\n", targv.tid, err);
+       }
+       return NULL;
  * takes a repository path repo_path, finds out the files in it, creates a
  * temp_container of the metadata to be inserted, for each file calls the
@@ -837,12 +936,15 @@ str register_repo(Client cntxt, MalBlkPt
        str *repo_path = (str*) getArgReference(stk,pci,pci->retc); /* arg 1: 
repo_path */
        int mode = *(int*) getArgReference(stk,pci,pci->retc+1); /* arg 2: mode 
0:register only, mode 1: register+mount */
+       int num_threads = *(int*) getArgReference(stk,pci,pci->retc+2); /* arg 
3: 1: no threads, >1: multi-threaded */
        str *file_paths = NULL;
        long num_file_paths;
        temp_container *tc;
        long i;
        str err = NULL;
        long start, finish;
+       int function_created = 0;
+       mvc *m = NULL;
        /* fetch file_paths from repo_path */
        num_file_paths = get_file_paths(*repo_path, &file_paths);
@@ -851,70 +953,129 @@ str register_repo(Client cntxt, MalBlkPt
                throw(MAL,"registrar.register_repo", "Problematic repository: 
%s\n", err);
-       /* create temp_container */
-       tc = (temp_container*)GDKmalloc(sizeof(temp_container));
-       assert(tc != NULL);
-       if(mode == 0)
-               err = mseed_create_temp_container(tc); /* depending on design 
can get different argument(s) */
-       else
-               err = mseed_create_temp_container_with_data_tables(tc); /* 
depending on design can get different argument(s) */
-       if(err != MAL_SUCCEED)
-       {/* temp_container creation failed, what to do */
-               throw(MAL,"registrar.register_repo", "temp_container creation 
failed: %s\n", err);
-       }
-       start = GDKms();
-       /* loop through the file_paths in repo */
-       if(mode == 0)
+       if(num_threads > 1)
-               for(i = 0; i < num_file_paths; i++)
+               /* multi-threaded */
+               long loop_start = 0;
+               long num_file_paths_per_thread = num_file_paths / num_threads;
+               pthread_t *threads = 
+               thread_argv *targvs = 
+               int j;
+               if (pthread_mutex_init(&insert_lock, NULL) != 0 || 
pthread_mutex_init(&create_lock, NULL) != 0)
-                       err = mseed_register(file_paths[i], tc);
-                       if(err != MAL_SUCCEED)
-                       {/* current file cannot be registered, what to do */
-       /*                      throw(MAL,"registrar.register_repo", "Current 
file cannot be registered: %s\n", err); */
-                               printf("registrar.register_repo: current file 
cannot be registered: %s\n", err);
-                       }
+                       throw(MAL,"registrar.register_repo", "mutex init 
+               for(j = 0; j < num_threads; j++)
+               {
checkin-list mailing list

Reply via email to