Changeset: 83e964c1508b for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=83e964c1508b Modified Files: monetdb5/extras/rdf/rdf.h monetdb5/extras/rdf/rdfalgebra.c sql/backends/monet5/sql_rdf.c Branch: rdf Log Message:
Implement multi-way merging algorithm for merging BATs of object values for each property (Slicing from PSO table) diffs (196 lines): diff --git a/monetdb5/extras/rdf/rdf.h b/monetdb5/extras/rdf/rdf.h --- a/monetdb5/extras/rdf/rdf.h +++ b/monetdb5/extras/rdf/rdf.h @@ -50,6 +50,9 @@ RDFParser(BAT **graph, str *location, st rdf_export str RDFleftfetchjoin_sorted(bat *result, const bat* lid, const bat *rid); +rdf_export str +RDFmultiway_merge_outerjoins(int np, BAT **sbats, BAT **obats, BAT **r_sbat, BAT **r_obats); + rdf_export str TKNZRrdf2str (bat *res, const bat *bid, const bat *map); diff --git a/monetdb5/extras/rdf/rdfalgebra.c b/monetdb5/extras/rdf/rdfalgebra.c --- a/monetdb5/extras/rdf/rdfalgebra.c +++ b/monetdb5/extras/rdf/rdfalgebra.c @@ -21,8 +21,8 @@ #include "monetdb_config.h" #include "rdf.h" +#include "rdfminheap.h" #include "algebra.h" -#include <gdk.h> #include "tokenizer.h" str @@ -201,6 +201,118 @@ str RDFtriplesubsort(BAT **sbat, BAT **p return MAL_SUCCEED; } +/* This function RDFmultiway_merge_outerjoins() + * is used to create full outer join from multiple + * Input: + * - np: Number of properties --> == number of obats, number of sbats + * - Set of pair of bats corresponding a slice of PSO with certain P value + * [bat_s1, bat_o1], [bat_s2, bat_o2],....,[bat_sn, bat_on] + * - All bat_si are sorted + * Output: + * bat_s, bat_o1_new, bat_o2_new, ..., bat_on_new + * Where bat_s is union of all bat_s1, ..., bat_sn + * + * Use a minheap to merge multiple list + * */ +str RDFmultiway_merge_outerjoins(int np, BAT **sbats, BAT **obats, BAT **r_sbat, BAT **r_obats){ + BUN estimate = 0; + int i = 0; + MinHeap *hp; + MinHeapNode *harr; + oid **sbatCursors, **obatCursors; + int numMergedS = 0; + oid lastS = BUN_NONE; + oid tmpO; + + for (i = 0; i < np; i++){ + estimate += BATcount(obats[i]); + } + + sbatCursors = (oid **) malloc(sizeof(oid*) * np); + obatCursors = (oid **) malloc(sizeof(oid*) * np); + + *r_sbat = BATnew(TYPE_void, TYPE_oid, estimate, TRANSIENT); + + for (i = 0; i < np; i++){ + r_obats[i] = BATnew(TYPE_void, TYPE_oid, estimate, TRANSIENT); + + //Keep the cursor to the first element of each input sbats + sbatCursors[i] = (oid *) Tloc(sbats[i], BUNfirst(sbats[i])); + obatCursors[i] = (oid *) Tloc(obats[i], BUNfirst(obats[i])); + } + + //Create a min heap with np heap nodes. Every heap node + //has first element of an array (pointing to the first element of each sbat) + harr = (MinHeapNode*)malloc(sizeof(MinHeapNode) * np); + for (i = 0; i < np; i++){ + harr[i].element = sbatCursors[i][0]; //Store the first element + harr[i].i = i; //index of array + harr[i].j = 1; //Index of next element to be stored from array + } + + hp = (MinHeap *) malloc(sizeof(MinHeap)); + initMinHeap(hp, harr, np); //Create the heap + + //Now one by one get the minimum element from min + //heap and replace it with next element of its array + numMergedS = 0; //Number of S in the output BAT + while (1){ + //Get the minimum element and store it in output + MinHeapNode root = getMin(hp); + if (root.element == INT_MAX) break; + + if (lastS != root.element){ //New S + + //Go through all output o_bat to add Null value + //if they do not value for the last S + for (i = 0; i < np; i++){ + if (BATcount(r_obats[i]) < (BUN)numMergedS) + BUNappend(r_obats[i], ATOMnilptr(TYPE_oid), TRUE); + } + + //Append new s to output sbat + BUNappend(*r_sbat, &(root.element), TRUE); + //Append the obat corresonding to this root node + tmpO = obatCursors[root.i][root.j - 1]; + BUNappend(r_obats[root.i], &tmpO, TRUE); + + lastS = root.element; + (numMergedS)++; + } + else{ + //Get element from the corresponding o + //Add to the output o + tmpO = obatCursors[root.i][root.j - 1]; + BUNappend(r_obats[root.i], &tmpO, TRUE); + } + + //Find the next elelement that will replace current + //root of heap. The next element belongs to same + //array as the current root. + if (root.j < (int) BATcount(sbats[root.i])) + { + root.element = sbatCursors[root.i][root.j]; + root.j += 1; + } + //If root was the last element of its array + else root.element = INT_MAX; //INT_MAX is for infinite + + //Replace root with next element of array + replaceMin(hp, root); + } + + for (i = 0; i < np; i++){ + if (BATcount(r_obats[i]) < (BUN)numMergedS) + BUNappend(r_obats[i], ATOMnilptr(TYPE_oid), TRUE); + } + + free(hp); + free(harr); + free(sbatCursors); + free(obatCursors); + return MAL_SUCCEED; +} + /* * Sort left bat and re-order right bat according to the lef bat * */ diff --git a/sql/backends/monet5/sql_rdf.c b/sql/backends/monet5/sql_rdf.c --- a/sql/backends/monet5/sql_rdf.c +++ b/sql/backends/monet5/sql_rdf.c @@ -1757,10 +1757,10 @@ void getSlides_per_P(PsoPropStat *pso_ps getOffsets(pso_pstat, p, &l, &h); - *ret_oBat = BATslice(obat, l, h); + *ret_oBat = BATslice(obat, l, h+1); - *ret_sBat = BATslice(sbat, l, h); - + *ret_sBat = BATslice(sbat, l, h+1); + (*ret_sBat)->tsorted = true; } @@ -1801,13 +1801,27 @@ void build_PsoPropStat(BAT *full_pbat, i BATprint(pso_propstat->offsetBat); { - BAT *obat, *sbat; - oid p = 100; + BAT **obats, **sbats, *r_sbat, **r_obats; + int i; + int np = 5; - getSlides_per_P(pso_propstat, &p, full_obat, full_sbat, &obat, &sbat); - printf("Slice of P = "BUNFMT "\n", p); - BATprint(sbat); - BATprint(obat); + oid props[5] = {100, 200, 400, 500, 700} ; + obats = (BAT**)malloc(sizeof(BAT*) * np); + sbats = (BAT**)malloc(sizeof(BAT*) * np); + r_obats = (BAT**)malloc(sizeof(BAT*) * np); + + for (i = 0; i < np; i++){ + getSlides_per_P(pso_propstat, &(props[i]),full_obat, full_sbat, &(obats[i]), &(sbats[i])); + } + + RDFmultiway_merge_outerjoins(np, sbats, obats, &r_sbat, r_obats); + + printf("Outer join result: \n"); + BATprint(r_sbat); + for (i = 0; i < np; i++){ + BATprint(r_obats[i]); + } + } } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list