Changeset: 83e964c1508b for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=83e964c1508b
Modified Files:
        monetdb5/extras/rdf/rdf.h
        monetdb5/extras/rdf/rdfalgebra.c
        sql/backends/monet5/sql_rdf.c
Branch: rdf
Log Message:

Implement a multi-way merge algorithm that merges the per-property BATs of
object values (slices of the PSO table) into a full outer join on subject.


diffs (196 lines):

diff --git a/monetdb5/extras/rdf/rdf.h b/monetdb5/extras/rdf/rdf.h
--- a/monetdb5/extras/rdf/rdf.h
+++ b/monetdb5/extras/rdf/rdf.h
@@ -50,6 +50,9 @@ RDFParser(BAT **graph, str *location, st
 rdf_export str 
 RDFleftfetchjoin_sorted(bat *result, const bat* lid, const bat *rid);
 
+/* Full outer join, on the subject column, of np subject-sorted
+ * (sbats[i], obats[i]) slices of the PSO table. Fills *r_sbat and the
+ * caller-provided array r_obats[0..np-1]; returns MAL_SUCCEED on success. */
+rdf_export str
+RDFmultiway_merge_outerjoins(int np, BAT **sbats, BAT **obats, BAT **r_sbat, BAT **r_obats);
+
 rdf_export str 
 TKNZRrdf2str (bat *res, const bat *bid, const bat *map);
 
diff --git a/monetdb5/extras/rdf/rdfalgebra.c b/monetdb5/extras/rdf/rdfalgebra.c
--- a/monetdb5/extras/rdf/rdfalgebra.c
+++ b/monetdb5/extras/rdf/rdfalgebra.c
@@ -21,8 +21,8 @@
 
 #include "monetdb_config.h"
 #include "rdf.h"
+#include "rdfminheap.h"
 #include "algebra.h"
-#include <gdk.h>
 #include "tokenizer.h"
 
 str
@@ -201,6 +201,118 @@ str RDFtriplesubsort(BAT **sbat, BAT **p
        return MAL_SUCCEED; 
 }
 
+/* Release a BAT reference obtained in this file (if any) and reset the
+ * pointer, so error paths can call it unconditionally. */
+static void
+RDFmerge_release(BAT **b)
+{
+       if (*b != NULL) {
+               BBPunfix((*b)->batCacheid);
+               *b = NULL;
+       }
+}
+
+/* RDFmultiway_merge_outerjoins() builds the full outer join, on the
+ * subject column, of several slices of the PSO table.
+ * Input:
+ * - np: number of properties (== number of sbats == number of obats)
+ * - sbats/obats: np pairs of BATs, each pair being the slice of PSO for
+ *   one P value: [bat_s1, bat_o1], [bat_s2, bat_o2], ..., [bat_sn, bat_on]
+ * - every bat_si must be sorted (ascending subject oids)
+ * Output:
+ * - *r_sbat: the union of all bat_s1, ..., bat_sn, one row per distinct
+ *   subject, in ascending order
+ * - r_obats: caller-provided array of np slots; r_obats[i] receives a new
+ *   BAT whose k-th value is the object of property i for the k-th subject
+ *   of *r_sbat, or nil when that subject has no value for property i
+ *
+ * The inputs are merged with a min-heap that holds one cursor per slice.
+ * An exhausted slice is represented by INT_MAX in the heap, so all subject
+ * oids must be smaller than INT_MAX (NOTE(review): confirm that
+ * MinHeapNode.element is wide enough to hold an oid).
+ *
+ * A subject with several objects for the same property does not fit this
+ * one-row-per-subject layout; it is reported as an error rather than
+ * silently shifting that property's column out of line with *r_sbat.
+ * On any error every output BAT is released and set to NULL.
+ */
+str RDFmultiway_merge_outerjoins(int np, BAT **sbats, BAT **obats, BAT **r_sbat, BAT **r_obats){
+       BUN estimate = 0;
+       BUN numMergedS = 0;     /* number of subjects appended to *r_sbat */
+       int i;
+       MinHeap *hp = NULL;
+       MinHeapNode *harr = NULL;
+       oid **sbatCursors = NULL, **obatCursors = NULL;
+       oid lastS = BUN_NONE;
+       oid curS, tmpO;
+       str msg = MAL_SUCCEED;
+
+       *r_sbat = NULL;
+       for (i = 0; i < np; i++)
+               r_obats[i] = NULL;
+
+       if (np <= 0) {
+               /* Nothing to merge (and no heap to build): no subjects. */
+               *r_sbat = BATnew(TYPE_void, TYPE_oid, 0, TRANSIENT);
+               if (*r_sbat == NULL)
+                       throw(MAL, "rdf.RDFmultiway_merge_outerjoins", MAL_MALLOC_FAIL);
+               return MAL_SUCCEED;
+       }
+
+       for (i = 0; i < np; i++)
+               estimate += BATcount(obats[i]);
+
+       sbatCursors = (oid **) malloc(sizeof(oid *) * np);
+       obatCursors = (oid **) malloc(sizeof(oid *) * np);
+       harr = (MinHeapNode *) malloc(sizeof(MinHeapNode) * np);
+       hp = (MinHeap *) malloc(sizeof(MinHeap));
+       *r_sbat = BATnew(TYPE_void, TYPE_oid, estimate, TRANSIENT);
+       if (sbatCursors == NULL || obatCursors == NULL || harr == NULL ||
+           hp == NULL || *r_sbat == NULL) {
+               msg = createException(MAL, "rdf.RDFmultiway_merge_outerjoins", MAL_MALLOC_FAIL);
+               goto bailout;
+       }
+
+       for (i = 0; i < np; i++) {
+               r_obats[i] = BATnew(TYPE_void, TYPE_oid, estimate, TRANSIENT);
+               if (r_obats[i] == NULL) {
+                       msg = createException(MAL, "rdf.RDFmultiway_merge_outerjoins", MAL_MALLOC_FAIL);
+                       goto bailout;
+               }
+
+               /* Cursors to the first tail value of each input BAT */
+               sbatCursors[i] = (oid *) Tloc(sbats[i], BUNfirst(sbats[i]));
+               obatCursors[i] = (oid *) Tloc(obats[i], BUNfirst(obats[i]));
+
+               /* Heap node i starts at the first subject of slice i; an
+                * empty slice starts out exhausted instead of reading past
+                * the end of its BAT. */
+               harr[i].element = BATcount(sbats[i]) > 0 ? sbatCursors[i][0] : INT_MAX;
+               harr[i].i = i;          /* index of the slice */
+               harr[i].j = 1;          /* index of the next element to load from the slice */
+       }
+
+       initMinHeap(hp, harr, np);
+
+       /* Repeatedly take the smallest subject from the heap and replace it
+        * with the next subject of the same slice. */
+       while (1) {
+               MinHeapNode root = getMin(hp);
+
+               if (root.element == INT_MAX)    /* every slice is exhausted */
+                       break;
+
+               /* Copy into an oid so BUNappend reads exactly one oid,
+                * whatever the declared type of MinHeapNode.element. */
+               curS = (oid) root.element;
+
+               if (curS != lastS) {
+                       /* New subject: give a nil object to every property
+                        * that had no value for the previous subject ... */
+                       for (i = 0; i < np; i++) {
+                               while (BATcount(r_obats[i]) < numMergedS)
+                                       BUNappend(r_obats[i], ATOMnilptr(TYPE_oid), TRUE);
+                       }
+                       /* ... and start a new output row. */
+                       BUNappend(*r_sbat, &curS, TRUE);
+                       lastS = curS;
+                       numMergedS++;
+               }
+
+               /* If property root.i already has a value in the current row,
+                * a second object for the same subject cannot be stored. */
+               if (BATcount(r_obats[root.i]) >= numMergedS) {
+                       msg = createException(MAL, "rdf.RDFmultiway_merge_outerjoins",
+                                             "property %d has more than one object for the same subject", (int) root.i);
+                       goto bailout;
+               }
+               tmpO = obatCursors[root.i][root.j - 1];
+               BUNappend(r_obats[root.i], &tmpO, TRUE);
+
+               /* Refill from the same slice, or mark it exhausted. */
+               if ((BUN) root.j < BATcount(sbats[root.i])) {
+                       root.element = sbatCursors[root.i][root.j];
+                       root.j += 1;
+               } else {
+                       root.element = INT_MAX;
+               }
+               replaceMin(hp, root);
+       }
+
+       /* Give a nil object to every property without a value for the last subject. */
+       for (i = 0; i < np; i++) {
+               while (BATcount(r_obats[i]) < numMergedS)
+                       BUNappend(r_obats[i], ATOMnilptr(TYPE_oid), TRUE);
+       }
+
+  bailout:
+       free(hp);
+       free(harr);
+       free(sbatCursors);
+       free(obatCursors);
+       if (msg != MAL_SUCCEED) {
+               RDFmerge_release(r_sbat);
+               for (i = 0; i < np; i++)
+                       RDFmerge_release(&r_obats[i]);
+       }
+       return msg;
+}
+
 /*
  * Sort left bat and re-order right bat according to the lef bat
  * */
diff --git a/sql/backends/monet5/sql_rdf.c b/sql/backends/monet5/sql_rdf.c
--- a/sql/backends/monet5/sql_rdf.c
+++ b/sql/backends/monet5/sql_rdf.c
@@ -1757,10 +1757,10 @@ void getSlides_per_P(PsoPropStat *pso_ps
        
        getOffsets(pso_pstat, p, &l, &h); 
 
-       *ret_oBat = BATslice(obat, l, h); 
+       *ret_oBat = BATslice(obat, l, h+1); 
 
-       *ret_sBat = BATslice(sbat, l, h); 
-
+       *ret_sBat = BATslice(sbat, l, h+1); 
+       (*ret_sBat)->tsorted = true; 
 
 }
 
@@ -1801,13 +1801,27 @@ void build_PsoPropStat(BAT *full_pbat, i
        BATprint(pso_propstat->offsetBat); 
        {
 
-               BAT *obat, *sbat; 
-               oid p = 100; 
+               oid props[] = {100, 200, 400, 500, 700};
+               int np = (int) (sizeof(props) / sizeof(props[0]));
+               BAT **obats, **sbats, *r_sbat = NULL, **r_obats;
+               str msg;
+               int i, slicesOk = 1;
 
-               getSlides_per_P(pso_propstat, &p, full_obat, full_sbat, &obat, &sbat); 
-               printf("Slice of P = "BUNFMT "\n", p);
-               BATprint(sbat);
-               BATprint(obat); 
+               /* calloc so every slot starts as NULL and can be released uniformly */
+               obats = (BAT **) calloc(np, sizeof(BAT *));
+               sbats = (BAT **) calloc(np, sizeof(BAT *));
+               r_obats = (BAT **) calloc(np, sizeof(BAT *));
+
+               if (obats == NULL || sbats == NULL || r_obats == NULL) {
+                       fprintf(stderr, "build_PsoPropStat: out of memory\n");
+               } else {
+                       for (i = 0; i < np; i++) {
+                               getSlides_per_P(pso_propstat, &(props[i]), full_obat, full_sbat, &(obats[i]), &(sbats[i]));
+                               if (obats[i] == NULL || sbats[i] == NULL)
+                                       slicesOk = 0;
+                       }
+
+                       if (!slicesOk) {
+                               fprintf(stderr, "build_PsoPropStat: could not slice the PSO table\n");
+                       } else if ((msg = RDFmultiway_merge_outerjoins(np, sbats, obats, &r_sbat, r_obats)) != MAL_SUCCEED) {
+                               fprintf(stderr, "build_PsoPropStat: outer join failed: %s\n", msg);
+                               GDKfree(msg);
+                       } else {
+                               printf("Outer join result: \n");
+                               BATprint(r_sbat);
+                               for (i = 0; i < np; i++) {
+                                       BATprint(r_obats[i]);
+                               }
+                       }
+
+                       /* Drop the references to the slices and to the join output */
+                       for (i = 0; i < np; i++) {
+                               if (obats[i] != NULL)
+                                       BBPunfix(obats[i]->batCacheid);
+                               if (sbats[i] != NULL)
+                                       BBPunfix(sbats[i]->batCacheid);
+                               if (r_obats[i] != NULL)
+                                       BBPunfix(r_obats[i]->batCacheid);
+                       }
+                       if (r_sbat != NULL)
+                               BBPunfix(r_sbat->batCacheid);
+               }
+
+               free(obats);
+               free(sbats);
+               free(r_obats);
+
        }
        
 }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to