Changeset: 5bb06da5a89d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5bb06da5a89d
Modified Files:
        gdk/Makefile.ag
        gdk/gdk.h
        monetdb5/modules/mal/arrange.c
Branch: leftmart
Log Message:

First take on ARNGmerge.

Merge several oid-ordered indexes using a min-heap.


diffs (275 lines):

diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag
--- a/gdk/Makefile.ag
+++ b/gdk/Makefile.ag
@@ -27,6 +27,7 @@ lib_gdk = {
                gdk_calc.c gdk_calc.h gdk_calc_compare.h gdk_calc_private.h \
                gdk_aggr.c gdk_group.c gdk_mapreduce.c gdk_mapreduce.h \
                gdk_imprints.c gdk_imprints.h \
+               gdk_arngment.c \
                gdk_join.c gdk_join_legacy.c \
                gdk_unique.c \
                gdk_firstn.c \
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -2108,7 +2108,7 @@ gdk_export lng IMPSimprintsize(BAT *b);
  * The oid index arrangement.
  *
  */
-
+gdk_export gdk_return ARNGindex(BAT *b, BAT *a);
 
 /*
  * @- Multilevel Storage Modes
diff --git a/monetdb5/modules/mal/arrange.c b/monetdb5/modules/mal/arrange.c
--- a/monetdb5/modules/mal/arrange.c
+++ b/monetdb5/modules/mal/arrange.c
@@ -12,6 +12,7 @@
  */
 #include "monetdb_config.h"
 #include "arrange.h"
+#include "gdk.h"
 
 str
 ARNGcreate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
@@ -123,12 +124,238 @@ ARNGcreate(Client cntxt, MalBlkPtr mb, M
        return msg;
 }
 
+
 str
 ARNGmerge(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
+       bat bid;
+       BAT *b;
+       bat *aid;
+       BAT **a;
+       int i, j, n_ar;
+
        (void) cntxt;
        (void) mb;
-       (void) stk;
-       (void) pci;
+
+       assert(pci->argc > 2);
+       n_ar = pci->argc-2;
+
+       bid = *getArgReference_bat(stk, pci, 1);
+       b = BATdescriptor(bid);
+       if (b == NULL)
+               throw(MAL, "bat.arrange", RUNTIME_OBJECT_MISSING);
+       /* CHECK: types */
+       /* CHECK: if b is sorted call ARNGindex b with void,void */
+       /* CHECK: if b is view and parent has index do a range select */
+
+       aid = (bat *) GDKzalloc(n_ar*sizeof(bat));
+       a = (BAT **) GDKzalloc(n_ar*sizeof(BAT *));
+       /* CHECK: nulls*/
+       for (i = 0; i < n_ar; i++) {
+               aid[i] = *getArgReference_bat(stk, pci, i+2);
+               a[i] = BATdescriptor(aid[i]);
+               if (a[i] == NULL) {
+                       for (j = i-1; j >= 0; j--) {
+                               BBPunfix(aid[j]);
+                       }
+                       GDKfree(aid);
+                       GDKfree(a);
+                       BBPunfix(bid);
+                       throw(MAL, "bat.arrange", RUNTIME_OBJECT_MISSING);
+               }
+       }
+
+       if (n_ar == 1) {
+               /* One oid order bat, nothing to merge */
+               if (ARNGindex(b, a[0]) == GDK_FAIL) {
+                       /* TAKE STEPS*/
+               }
+       } else {
+               BAT *m; /* merged oid's */
+               oid *mv;
+               BUN m_sz;
+
+               for (i=0, m_sz = 0; i < n_ar; i++) {
+                       m_sz += BATcount(a[i]);
+               }
+               m = BATnew(TYPE_void, TYPE_oid, m_sz, TRANSIENT);
+               if (m == NULL) {
+                       /* CHECK: null and clean exit*/
+               }
+               mv = (oid *) Tloc(m, BUNfirst(m));
+
+               /* sort merge with 1 comparison per BUN */
+               if (n_ar == 2) {
+                       oid *p0, *p1, *q0, *q1;
+                       p0 = (oid *) Tloc(a[0], BUNfirst(a[0]));
+                       q0 = (oid *) Tloc(a[0], BUNlast(a[0]));
+                       p1 = (oid *) Tloc(a[1], BUNfirst(a[1]));
+                       q1 = (oid *) Tloc(a[1], BUNlast(a[1]));
+
+#define BINARY_MERGE(TYPE)                                                     
                                                \
+do {                                                                           
                                                                \
+       TYPE *v = (TYPE *) Tloc(b, BUNfirst(b));                                
                                \
+       for (; p0 < q0 && p1 < q1; ) {                                          
                                        \
+               if (v[*p0] < v[*p1]) {                                          
                                                \
+                       *mv++ = *p0++;                                          
                                                        \
+               } else {                                                        
                                                                \
+                       *mv++ = *p1++;                                          
                                                        \
+               }                                                               
                                                                        \
+       }                                                                       
                                                                        \
+       while (p0 < q0) {                                                       
                                                        \
+               *mv++ = *p0++;                                                  
                                                        \
+       }                                                                       
                                                                        \
+       while (p1 < q1) {                                                       
                                                        \
+               *mv++ = *p1++;                                                  
                                                        \
+       }                                                                       
                                                                        \
+} while(0)
+
+                       switch (ATOMbasetype(b->T->type)) {
+                       case TYPE_bte:
+                               BINARY_MERGE(bte);
+                               break;
+                       case TYPE_sht:
+                               BINARY_MERGE(sht);
+                               break;
+                       case TYPE_int:
+                               BINARY_MERGE(int);
+                               break;
+                       case TYPE_lng:
+                               BINARY_MERGE(lng);
+                               break;
+#ifdef HAVE_HGE
+                       case TYPE_hge:
+                               BINARY_MERGE(hge);
+                               break;
+#endif
+                       case TYPE_flt:
+                               BINARY_MERGE(flt);
+                               break;
+                       case TYPE_dbl:
+                               BINARY_MERGE(dbl);
+                               break;
+                       default:
+                               /* should never reach here */
+                               assert(0);
+                       }
+
+               /* use min-heap */
+               } else {
+                       oid **p, **q, *t_oid;
+
+                       p = (oid **) GDKzalloc(n_ar*sizeof(oid *));
+                       q = (oid **) GDKzalloc(n_ar*sizeof(oid *));
+                       /* CHECK null and clean exit */
+                       for (i = 0; i < n_ar; i++) {
+                               p[i] = (oid *) Tloc(a[i], BUNfirst(a[i]));
+                               q[i] = (oid *) Tloc(a[i], BUNlast(a[i]));
+                       }
+
+
+#define swap(X,Y,TMP)  (TMP)=(X);(X)=(Y);(Y)=(TMP)
+
+#define left_child(X)  (2*(X)+1)
+#define right_child(X) (2*(X)+2)
+
+#define HEAPIFY(X)                                                             
                                                        \
+do {                                                                           
                                                                \
+       int __cur, __min = X;                                                   
                                                \
+       do {                                                                    
                                                                \
+               __cur = __min;                                                  
                                                        \
+               if (left_child(__cur) < n_ar &&                                 
                                        \
+                       minhp[left_child(__cur)] < minhp[(__min)]) {            
                        \
+                       __min = left_child(__cur);                              
                                                \
+               }                                                               
                                                                        \
+               if (right_child(__cur) < n_ar &&                                
                                        \
+                       minhp[right_child(__cur)] < minhp[(__min)]) {           
                        \
+                       __min = right_child(__cur);                             
                                                \
+               }                                                               
                                                                        \
+               if (__min != __cur) {                                           
                                                \
+                       swap(minhp[__cur], minhp[__min], t);                    
                                \
+                       swap(p[__cur], p[__min], t_oid);                        
                                        \
+                       swap(q[__cur], q[__min], t_oid);                        
                                        \
+               }                                                               
                                                                        \
+       } while (__cur != __min);                                               
                                                \
+} while (0)
+
+#define NWAY_MERGE(TYPE)                                                       
                                                \
+do {                                                                           
                                                                \
+       TYPE *minhp;                                                            
                                                        \
+       TYPE t;                                                                 
                                                                \
+       TYPE *v = (TYPE *) Tloc(b, BUNfirst(b));                                
                                \
+       if ((minhp = (TYPE *) GDKzalloc(sizeof(TYPE)*n_ar)) == NULL) {          
        \
+               /* CLEAN exit */                                                
                                                        \
+       }                                                                       
                                                                        \
+       /* init min heap */                                                     
                                                        \
+       for (i = 0; i < n_ar; i++) {                                            
                                        \
+               minhp[i] = v[*p[i]];                                            
                                                \
+       }                                                                       
                                                                        \
+       for (i = n_ar/2; i >=0 ; i--) {                                         
                                        \
+               HEAPIFY(i);                                                     
                                                                \
+       }                                                                       
                                                                        \
+       /* merge */                                                             
                                                                \
+       while (n_ar > 1) {                                                      
                                                        \
+               *mv++ = *(p[0])++;                                              
                                                        \
+               if (p[0] < q[0]) {                                              
                                                        \
+                       minhp[0] = v[*p[0]];                                    
                                                \
+               } else {                                                        
                                                                \
+                       swap(minhp[0], minhp[n_ar], t);                         
                                        \
+                       swap(p[0], p[n_ar], t_oid);                             
                                                \
+                       swap(q[0], q[n_ar], t_oid);                             
                                                \
+                       n_ar--;                                                 
                                                                \
+               }                                                               
                                                                        \
+               HEAPIFY(0);                                                     
                                                                \
+       }                                                                       
                                                                        \
+       while (p[0] < q[0]) {                                                   
                                                \
+               *mv++ = *(p[0])++;                                              
                                                        \
+       }                                                                       
                                                                        \
+} while (0)
+
+                       switch (ATOMbasetype(b->T->type)) {
+                       case TYPE_bte:
+                               NWAY_MERGE(bte);
+                               break;
+                       case TYPE_sht:
+                               NWAY_MERGE(sht);
+                               break;
+                       case TYPE_int:
+                               NWAY_MERGE(int);
+                               break;
+                       case TYPE_lng:
+                               NWAY_MERGE(lng);
+                               break;
+#ifdef HAVE_HGE
+                       case TYPE_hge:
+                               NWAY_MERGE(hge);
+                               break;
+#endif
+                       case TYPE_flt:
+                               NWAY_MERGE(flt);
+                               break;
+                       case TYPE_dbl:
+                               NWAY_MERGE(dbl);
+                               break;
+                       default:
+                               /* should never reach here */
+                               assert(0);
+                       }
+               }
+               /* fix m properties */
+               if (ARNGindex(b, m) == GDK_FAIL) {
+                       /* CLEAN EXIT */
+               }
+       }
+
+       for (i = 0; i < n_ar; i++) {
+               BBPunfix(aid[i]);
+       }
+       GDKfree(aid);
+       GDKfree(a);
+       BBPunfix(bid);
        return MAL_SUCCEED;
+
+       /* arrange merge bats creating a new :bat[:oid:oid] *o
+        * feed *o to ARNGkeep(b, o);
+        * destroy *o
+        */
 }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to