Changeset: 2e254b0c071f for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2e254b0c071f Modified Files: sql/backends/monet5/bam/bam_loader.c sql/backends/monet5/bam/bam_wrapper.c sql/backends/monet5/bam/bam_wrapper.h Branch: bamloader Log Message:
Delay opening file handles until the point where they are really needed, instead of opening all of them at the beginning. On my machine, importing more than 23 files at once starts to fail, because tens of file handles to binary files are opened for every input file. diffs (truncated from 378 to 300 lines): diff --git a/sql/backends/monet5/bam/bam_loader.c b/sql/backends/monet5/bam/bam_loader.c --- a/sql/backends/monet5/bam/bam_loader.c +++ b/sql/backends/monet5/bam/bam_loader.c @@ -209,10 +209,9 @@ bam_loader(Client cntxt, MalBlkPtr mb, s goto cleanup; } if (nr_threads <= 0) { - msg = createException(MAL, "bam_loader", - "We can not get the work done with only %d threads", - nr_threads); - goto cleanup; + nr_threads = 1; + } else if(nr_threads > 4) { + nr_threads = 4; } /* Get SQL context */ @@ -381,7 +380,6 @@ bam_loader(Client cntxt, MalBlkPtr mb, s TO_LOG("<bam_loader> Copying data into DB...\n"); /* All threads finished succesfully, copy all data into DB */ for (i = 0; i < nr_files; ++i) { - prepare_for_copy(bws + i); if ((msg = copy_into_db(cntxt, bws + i)) != MAL_SUCCEED) { goto cleanup; } diff --git a/sql/backends/monet5/bam/bam_wrapper.c b/sql/backends/monet5/bam/bam_wrapper.c --- a/sql/backends/monet5/bam/bam_wrapper.c +++ b/sql/backends/monet5/bam/bam_wrapper.c @@ -99,6 +99,44 @@ init_bam_wrapper(bam_wrapper * bw, filet file_location, flushdir, strerror(errno)); } + // Construct all the file paths to the binary files + for (i = 0; i < 6; ++i) { + snprintf(bw->fp_files[i], BW_FP_BUF_SIZE, "%s/files_%d", + flushdir, i); + } + for (i = 0; i < 7; ++i) { + snprintf(bw->fp_sq[i], BW_FP_BUF_SIZE, "%s/sq_%d", flushdir, + i); + } + for (i = 0; i < 13; ++i) { + snprintf(bw->fp_rg[i], BW_FP_BUF_SIZE, "%s/rg_%d", flushdir, + i); + } + for (i = 0; i < 6; ++i) { + snprintf(bw->fp_pg[i], BW_FP_BUF_SIZE, "%s/pg_%d", flushdir, + i); + } + for (i = 0; i < 12; ++i) { + snprintf(bw->fp_alignments[i], BW_FP_BUF_SIZE, + "%s/alignments_%d", flushdir, i); + } + for (i = 0; i < 
4; ++i) { + snprintf(bw->fp_alignments_extra[i], BW_FP_BUF_SIZE, + "%s/alignments_extra_%d", flushdir, i); + } + if (dbschema == 1) { + for (i = 0; i < 23; ++i) { + snprintf(bw->fp_alignments_paired_primary[i], + BW_FP_BUF_SIZE, + "%s/alignments_paired_primary_%d", flushdir, + i); + snprintf(bw->fp_alignments_paired_secondary[i], + BW_FP_BUF_SIZE, + "%s/alignments_paired_secondary_%d", + flushdir, i); + } + } + if (type == BAM) { /* Open BAM file and read its header */ if ((bw->bam.input = bam_open(file_location, "r")) == NULL) { @@ -184,105 +222,102 @@ init_bam_wrapper(bam_wrapper * bw, filet bw->cnt_alignments_paired_secondary = 0; bw->cnt_alignments_total = 0; + + return MAL_SUCCEED; +} + + +static str +open_header_write_streams(bam_wrapper * bw) +{ + int i; + for (i = 0; i < 6; ++i) { - snprintf(bw->fp_files[i], BW_FP_BUF_SIZE, "%s/files_%d", - flushdir, i); if ((bw->files[i] = bsopen(bw->fp_files[i])) == NULL) { throw(MAL, "init_bam_wrapper", ERR_INIT_BAM_WRAPPER "Binary file '%s' could not be opened", - file_location, bw->fp_files[i]); + bw->file_location, bw->fp_files[i]); } } for (i = 0; i < 7; ++i) { - snprintf(bw->fp_sq[i], BW_FP_BUF_SIZE, "%s/sq_%d", flushdir, - i); if ((bw->sq[i] = bsopen(bw->fp_sq[i])) == NULL) { throw(MAL, "init_bam_wrapper", ERR_INIT_BAM_WRAPPER "Binary file '%s' could not be opened", - file_location, bw->fp_sq[i]); + bw->file_location, bw->fp_sq[i]); } } for (i = 0; i < 13; ++i) { - snprintf(bw->fp_rg[i], BW_FP_BUF_SIZE, "%s/rg_%d", flushdir, - i); if ((bw->rg[i] = bsopen(bw->fp_rg[i])) == NULL) { throw(MAL, "init_bam_wrapper", ERR_INIT_BAM_WRAPPER "Binary file '%s' could not be opened", - file_location, bw->fp_rg[i]); + bw->file_location, bw->fp_rg[i]); } } for (i = 0; i < 6; ++i) { - snprintf(bw->fp_pg[i], BW_FP_BUF_SIZE, "%s/pg_%d", flushdir, - i); if ((bw->pg[i] = bsopen(bw->fp_pg[i])) == NULL) { throw(MAL, "init_bam_wrapper", ERR_INIT_BAM_WRAPPER "Binary file '%s' could not be opened", - file_location, bw->fp_pg[i]); + 
bw->file_location, bw->fp_pg[i]); } } + return MAL_SUCCEED; +} + + +static str +open_alignment_write_streams(bam_wrapper * bw) { + int i; + for (i = 0; i < 12; ++i) { - snprintf(bw->fp_alignments[i], BW_FP_BUF_SIZE, - "%s/alignments_%d", flushdir, i); if ((bw->alignments[i] = bsopen(bw->fp_alignments[i])) == NULL) { throw(MAL, "init_bam_wrapper", ERR_INIT_BAM_WRAPPER "Binary file '%s' could not be opened", - file_location, bw->fp_alignments[i]); + bw->file_location, bw->fp_alignments[i]); } } for (i = 0; i < 4; ++i) { - snprintf(bw->fp_alignments_extra[i], BW_FP_BUF_SIZE, - "%s/alignments_extra_%d", flushdir, i); if ((bw->alignments_extra[i] = bsopen(bw->fp_alignments_extra[i])) == NULL) { throw(MAL, "init_bam_wrapper", ERR_INIT_BAM_WRAPPER "Binary file '%s' could not be opened", - file_location, bw->fp_alignments_extra[i]); + bw->file_location, bw->fp_alignments_extra[i]); } } - if (dbschema == 1) { + if (bw->dbschema == 1) { for (i = 0; i < 23; ++i) { - snprintf(bw->fp_alignments_paired_primary[i], - BW_FP_BUF_SIZE, - "%s/alignments_paired_primary_%d", flushdir, - i); if ((bw->alignments_paired_primary[i] = bsopen(bw->fp_alignments_paired_primary[i])) == NULL) { throw(MAL, "init_bam_wrapper", ERR_INIT_BAM_WRAPPER "Binary file '%s' could not be opened", - file_location, + bw->file_location, bw->fp_alignments_paired_primary[i]); } - snprintf(bw->fp_alignments_paired_secondary[i], - BW_FP_BUF_SIZE, - "%s/alignments_paired_secondary_%d", - flushdir, i); if ((bw->alignments_paired_secondary[i] = bsopen(bw->fp_alignments_paired_secondary[i])) == NULL) { throw(MAL, "init_bam_wrapper", ERR_INIT_BAM_WRAPPER "Binary file '%s' could not be opened", - file_location, + bw->file_location, bw->fp_alignments_paired_secondary[i]); } } } - return MAL_SUCCEED; } + static void -close_write_streams(bam_wrapper * bw, bit unlink_files) -{ +close_header_write_streams(bam_wrapper * bw) { int i; for (i = 0; i < 6; ++i) { @@ -292,48 +327,43 @@ close_write_streams(bam_wrapper * bw, bi * 
close_streams gets called again */ bw->files[i] = NULL; } - if (unlink_files) - unlink(bw->fp_files[i]); } for (i = 0; i < 7; ++i) { if (bw->sq[i]) { close_stream(bw->sq[i]); bw->sq[i] = NULL; } - if (unlink_files) - unlink(bw->fp_sq[i]); } for (i = 0; i < 13; ++i) { if (bw->rg[i]) { close_stream(bw->rg[i]); bw->rg[i] = NULL; } - if (unlink_files) - unlink(bw->fp_rg[i]); } for (i = 0; i < 6; ++i) { if (bw->pg[i]) { close_stream(bw->pg[i]); bw->pg[i] = NULL; } - if (unlink_files) - unlink(bw->fp_pg[i]); } +} + + +static void +close_alignment_write_streams(bam_wrapper * bw) { + int i; + for (i = 0; i < 12; ++i) { if (bw->alignments[i]) { close_stream(bw->alignments[i]); bw->alignments[i] = NULL; } - if (unlink_files) - unlink(bw->fp_alignments[i]); } for (i = 0; i < 4; ++i) { if (bw->alignments_extra[i]) { close_stream(bw->alignments_extra[i]); bw->alignments_extra[i] = NULL; } - if (unlink_files) - unlink(bw->fp_alignments_extra[i]); } if (bw->dbschema == 1) { for (i = 0; i < 23; ++i) { @@ -347,24 +377,15 @@ close_write_streams(bam_wrapper * bw, bi alignments_paired_secondary[i]); bw->alignments_paired_secondary[i] = NULL; } - if (unlink_files) { - unlink(bw->fp_alignments_paired_primary[i]); - unlink(bw->fp_alignments_paired_primary[i]); - } } } } void -prepare_for_copy(bam_wrapper * bw) -{ - close_write_streams(bw, FALSE); -} - -void clear_bam_wrapper(bam_wrapper * bw) { char flushdir[128]; + int i; /* Clear bam/sam specific fields */ if (bw->type == BAM) { @@ -383,8 +404,35 @@ clear_bam_wrapper(bam_wrapper * bw) } } - /* Close file streams and remove files */ - close_write_streams(bw, TRUE); + /* Close file streams if this was not done yet */ + close_header_write_streams(bw); + close_alignment_write_streams(bw); + _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list