Changeset: 2e254b0c071f for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2e254b0c071f
Modified Files:
        sql/backends/monet5/bam/bam_loader.c
        sql/backends/monet5/bam/bam_wrapper.c
        sql/backends/monet5/bam/bam_wrapper.h
Branch: bamloader
Log Message:

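Delay opening the file handles until they are actually needed, instead of 
opening all of them at the beginning. On my machine the import started to 
fail when importing more than 23 files at once, because tens of file handles 
to binary files were opened for every file. This changeset also clamps 
nr_threads to the range [1, 4] instead of raising an error for non-positive 
values, and removes prepare_for_copy().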


diffs (truncated from 378 to 300 lines):

diff --git a/sql/backends/monet5/bam/bam_loader.c b/sql/backends/monet5/bam/bam_loader.c
--- a/sql/backends/monet5/bam/bam_loader.c
+++ b/sql/backends/monet5/bam/bam_loader.c
@@ -209,10 +209,9 @@ bam_loader(Client cntxt, MalBlkPtr mb, s
                goto cleanup;
        }
        if (nr_threads <= 0) {
-               msg = createException(MAL, "bam_loader",
-                                         "We can not get the work done with 
only %d threads",
-                                         nr_threads);
-               goto cleanup;
+               nr_threads = 1;
+       } else if(nr_threads > 4) {
+               nr_threads = 4;
        }
 
        /* Get SQL context */
@@ -381,7 +380,6 @@ bam_loader(Client cntxt, MalBlkPtr mb, s
        TO_LOG("<bam_loader> Copying data into DB...\n");
        /* All threads finished succesfully, copy all data into DB */
        for (i = 0; i < nr_files; ++i) {
-               prepare_for_copy(bws + i);
                if ((msg = copy_into_db(cntxt, bws + i)) != MAL_SUCCEED) {
                        goto cleanup;
                }
diff --git a/sql/backends/monet5/bam/bam_wrapper.c b/sql/backends/monet5/bam/bam_wrapper.c
--- a/sql/backends/monet5/bam/bam_wrapper.c
+++ b/sql/backends/monet5/bam/bam_wrapper.c
@@ -99,6 +99,44 @@ init_bam_wrapper(bam_wrapper * bw, filet
                          file_location, flushdir, strerror(errno));
        }
 
+       // Construct all the file paths to the binary files
+       for (i = 0; i < 6; ++i) {
+               snprintf(bw->fp_files[i], BW_FP_BUF_SIZE, "%s/files_%d",
+                        flushdir, i);
+       }
+       for (i = 0; i < 7; ++i) {
+               snprintf(bw->fp_sq[i], BW_FP_BUF_SIZE, "%s/sq_%d", flushdir,
+                        i);
+       }
+       for (i = 0; i < 13; ++i) {
+               snprintf(bw->fp_rg[i], BW_FP_BUF_SIZE, "%s/rg_%d", flushdir,
+                        i);
+       }
+       for (i = 0; i < 6; ++i) {
+               snprintf(bw->fp_pg[i], BW_FP_BUF_SIZE, "%s/pg_%d", flushdir,
+                        i);
+       }
+       for (i = 0; i < 12; ++i) {
+               snprintf(bw->fp_alignments[i], BW_FP_BUF_SIZE,
+                        "%s/alignments_%d", flushdir, i);
+       }
+       for (i = 0; i < 4; ++i) {
+               snprintf(bw->fp_alignments_extra[i], BW_FP_BUF_SIZE,
+                        "%s/alignments_extra_%d", flushdir, i);
+       }
+       if (dbschema == 1) {
+               for (i = 0; i < 23; ++i) {
+                       snprintf(bw->fp_alignments_paired_primary[i],
+                                BW_FP_BUF_SIZE,
+                                "%s/alignments_paired_primary_%d", flushdir,
+                                i);
+                       snprintf(bw->fp_alignments_paired_secondary[i],
+                                BW_FP_BUF_SIZE,
+                                "%s/alignments_paired_secondary_%d",
+                                flushdir, i);
+               }
+       }
+
        if (type == BAM) {
                /* Open BAM file and read its header */
                if ((bw->bam.input = bam_open(file_location, "r")) == NULL) {
@@ -184,105 +222,102 @@ init_bam_wrapper(bam_wrapper * bw, filet
        bw->cnt_alignments_paired_secondary = 0;
        bw->cnt_alignments_total = 0;
 
+
+       return MAL_SUCCEED;
+}
+
+
+static str 
+open_header_write_streams(bam_wrapper * bw)
+{
+       int i;
+
        for (i = 0; i < 6; ++i) {
-               snprintf(bw->fp_files[i], BW_FP_BUF_SIZE, "%s/files_%d",
-                        flushdir, i);
                if ((bw->files[i] = bsopen(bw->fp_files[i])) == NULL) {
                        throw(MAL, "init_bam_wrapper",
                                  ERR_INIT_BAM_WRAPPER
                                  "Binary file '%s' could not be opened",
-                                 file_location, bw->fp_files[i]);
+                                 bw->file_location, bw->fp_files[i]);
                }
        }
        for (i = 0; i < 7; ++i) {
-               snprintf(bw->fp_sq[i], BW_FP_BUF_SIZE, "%s/sq_%d", flushdir,
-                        i);
                if ((bw->sq[i] = bsopen(bw->fp_sq[i])) == NULL) {
                        throw(MAL, "init_bam_wrapper",
                                  ERR_INIT_BAM_WRAPPER
                                  "Binary file '%s' could not be opened",
-                                 file_location, bw->fp_sq[i]);
+                                 bw->file_location, bw->fp_sq[i]);
                }
        }
        for (i = 0; i < 13; ++i) {
-               snprintf(bw->fp_rg[i], BW_FP_BUF_SIZE, "%s/rg_%d", flushdir,
-                        i);
                if ((bw->rg[i] = bsopen(bw->fp_rg[i])) == NULL) {
                        throw(MAL, "init_bam_wrapper",
                                  ERR_INIT_BAM_WRAPPER
                                  "Binary file '%s' could not be opened",
-                                 file_location, bw->fp_rg[i]);
+                                 bw->file_location, bw->fp_rg[i]);
                }
        }
        for (i = 0; i < 6; ++i) {
-               snprintf(bw->fp_pg[i], BW_FP_BUF_SIZE, "%s/pg_%d", flushdir,
-                        i);
                if ((bw->pg[i] = bsopen(bw->fp_pg[i])) == NULL) {
                        throw(MAL, "init_bam_wrapper",
                                  ERR_INIT_BAM_WRAPPER
                                  "Binary file '%s' could not be opened",
-                                 file_location, bw->fp_pg[i]);
+                                 bw->file_location, bw->fp_pg[i]);
                }
        }
+       return MAL_SUCCEED;
+}
+
+
+static str
+open_alignment_write_streams(bam_wrapper * bw) {
+       int i;
+
        for (i = 0; i < 12; ++i) {
-               snprintf(bw->fp_alignments[i], BW_FP_BUF_SIZE,
-                        "%s/alignments_%d", flushdir, i);
                if ((bw->alignments[i] =
                         bsopen(bw->fp_alignments[i])) == NULL) {
                        throw(MAL, "init_bam_wrapper",
                                  ERR_INIT_BAM_WRAPPER
                                  "Binary file '%s' could not be opened",
-                                 file_location, bw->fp_alignments[i]);
+                                 bw->file_location, bw->fp_alignments[i]);
                }
        }
        for (i = 0; i < 4; ++i) {
-               snprintf(bw->fp_alignments_extra[i], BW_FP_BUF_SIZE,
-                        "%s/alignments_extra_%d", flushdir, i);
                if ((bw->alignments_extra[i] =
                         bsopen(bw->fp_alignments_extra[i])) == NULL) {
                        throw(MAL, "init_bam_wrapper",
                                  ERR_INIT_BAM_WRAPPER
                                  "Binary file '%s' could not be opened",
-                                 file_location, bw->fp_alignments_extra[i]);
+                                 bw->file_location, 
bw->fp_alignments_extra[i]);
                }
        }
-       if (dbschema == 1) {
+       if (bw->dbschema == 1) {
                for (i = 0; i < 23; ++i) {
-                       snprintf(bw->fp_alignments_paired_primary[i],
-                                BW_FP_BUF_SIZE,
-                                "%s/alignments_paired_primary_%d", flushdir,
-                                i);
                        if ((bw->alignments_paired_primary[i] =
                                 bsopen(bw->fp_alignments_paired_primary[i])) ==
                                NULL) {
                                throw(MAL, "init_bam_wrapper",
                                          ERR_INIT_BAM_WRAPPER
                                          "Binary file '%s' could not be 
opened",
-                                         file_location,
+                                         bw->file_location,
                                          bw->fp_alignments_paired_primary[i]);
                        }
-                       snprintf(bw->fp_alignments_paired_secondary[i],
-                                BW_FP_BUF_SIZE,
-                                "%s/alignments_paired_secondary_%d",
-                                flushdir, i);
                        if ((bw->alignments_paired_secondary[i] =
                                 bsopen(bw->fp_alignments_paired_secondary[i])) 
==
                                NULL) {
                                throw(MAL, "init_bam_wrapper",
                                          ERR_INIT_BAM_WRAPPER
                                          "Binary file '%s' could not be 
opened",
-                                         file_location,
+                                         bw->file_location,
                                          
bw->fp_alignments_paired_secondary[i]);
                        }
                }
        }
-
        return MAL_SUCCEED;
 }
 
+
 static void
-close_write_streams(bam_wrapper * bw, bit unlink_files)
-{
+close_header_write_streams(bam_wrapper * bw) {
        int i;
 
        for (i = 0; i < 6; ++i) {
@@ -292,48 +327,43 @@ close_write_streams(bam_wrapper * bw, bi
                         * close_streams gets called again */
                        bw->files[i] = NULL;
                }
-               if (unlink_files)
-                       unlink(bw->fp_files[i]);
        }
        for (i = 0; i < 7; ++i) {
                if (bw->sq[i]) {
                        close_stream(bw->sq[i]);
                        bw->sq[i] = NULL;
                }
-               if (unlink_files)
-                       unlink(bw->fp_sq[i]);
        }
        for (i = 0; i < 13; ++i) {
                if (bw->rg[i]) {
                        close_stream(bw->rg[i]);
                        bw->rg[i] = NULL;
                }
-               if (unlink_files)
-                       unlink(bw->fp_rg[i]);
        }
        for (i = 0; i < 6; ++i) {
                if (bw->pg[i]) {
                        close_stream(bw->pg[i]);
                        bw->pg[i] = NULL;
                }
-               if (unlink_files)
-                       unlink(bw->fp_pg[i]);
        }
+}
+
+
+static void
+close_alignment_write_streams(bam_wrapper * bw) {
+       int i;
+
        for (i = 0; i < 12; ++i) {
                if (bw->alignments[i]) {
                        close_stream(bw->alignments[i]);
                        bw->alignments[i] = NULL;
                }
-               if (unlink_files)
-                       unlink(bw->fp_alignments[i]);
        }
        for (i = 0; i < 4; ++i) {
                if (bw->alignments_extra[i]) {
                        close_stream(bw->alignments_extra[i]);
                        bw->alignments_extra[i] = NULL;
                }
-               if (unlink_files)
-                       unlink(bw->fp_alignments_extra[i]);
        }
        if (bw->dbschema == 1) {
                for (i = 0; i < 23; ++i) {
@@ -347,24 +377,15 @@ close_write_streams(bam_wrapper * bw, bi
                                                 
alignments_paired_secondary[i]);
                                bw->alignments_paired_secondary[i] = NULL;
                        }
-                       if (unlink_files) {
-                               unlink(bw->fp_alignments_paired_primary[i]);
-                               unlink(bw->fp_alignments_paired_primary[i]);
-                       }
                }
        }
 }
 
 void
-prepare_for_copy(bam_wrapper * bw)
-{
-       close_write_streams(bw, FALSE);
-}
-
-void
 clear_bam_wrapper(bam_wrapper * bw)
 {
        char flushdir[128];
+       int i;
 
        /* Clear bam/sam specific fields */
        if (bw->type == BAM) {
@@ -383,8 +404,35 @@ clear_bam_wrapper(bam_wrapper * bw)
                }
        }
 
-       /* Close file streams and remove files */
-       close_write_streams(bw, TRUE);
+       /* Close file streams if this was not done yet */
+       close_header_write_streams(bw);
+       close_alignment_write_streams(bw);
+
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to