Changeset: 17c614f1431c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=17c614f1431c
Modified Files:
        gdk/gdk_join.c
Branch: default
Log Message:

Extended the new subjoin implementation to also handle void tails.
Materialization of void columns is therefore no longer needed.


diffs (truncated from 375 to 300 lines):

diff --git a/gdk/gdk_join.c b/gdk/gdk_join.c
--- a/gdk/gdk_join.c
+++ b/gdk/gdk_join.c
@@ -30,11 +30,7 @@ joinparamcheck(BAT *l, BAT *r, BAT *sl, 
                GDKerror("%s: inputs must have dense head.\n", func);
                return GDK_FAIL;
        }
-       if (l->ttype == TYPE_void || r->ttype == TYPE_void) {
-               GDKerror("%s: tail type must not be VOID.\n", func);
-               return GDK_FAIL;
-       }
-       if (l->ttype != r->ttype) {
+       if (ATOMtype(l->ttype) != ATOMtype(r->ttype)) {
                GDKerror("%s: inputs not compatible.\n", func);
                return GDK_FAIL;
        }
@@ -211,6 +207,8 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
        oid lv;
        BUN i;
        int lskipped = 0;       /* whether we skipped values in l */
+       wrd loff = 0, roff = 0;
+       oid lval = oid_nil, rval = oid_nil;
 
        ALGODEBUG fprintf(stderr, "#mergejoin(l=%s#" BUNFMT "[%s]%s%s,"
                          "r=%s#" BUNFMT "[%s]%s%s,sl=%s#" BUNFMT "%s%s,"
@@ -232,19 +230,15 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
 
        assert(BAThdense(l));
        assert(BAThdense(r));
-       assert(l->ttype != TYPE_void);
-       assert(r->ttype != TYPE_void);
-       assert(l->ttype == r->ttype);
+       assert(ATOMtype(l->ttype) == ATOMtype(r->ttype));
        assert(r->tsorted || r->trevsorted);
        assert(sl == NULL || sl->tsorted);
        assert(sr == NULL || sr->tsorted);
 
        CANDINIT(l, sl, lstart, lend, lcnt, lcand, lcandend);
        CANDINIT(r, sr, rstart, rend, rcnt, rcand, rcandend);
-       rcandorig = rcand;
-       rstartorig = rstart;
-       lvals = (const char *) Tloc(l, BUNfirst(l));
-       rvals = (const char *) Tloc(r, BUNfirst(r));
+       lvals = l->ttype == TYPE_void ? NULL : (const char *) Tloc(l, BUNfirst(l));
+       rvals = r->ttype == TYPE_void ? NULL : (const char *) Tloc(r, BUNfirst(r));
        if (l->tvarsized && l->ttype) {
                assert(r->tvarsized && r->ttype);
                lvars = l->T->vheap->base;
@@ -263,6 +257,29 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                /* nothing to do: there are no matches */
                return GDK_SUCCEED;
        }
+       if (!nil_on_miss) {
+               /* if nil_on_miss is set we still have to return
+                * results */
+               if (!nil_matches &&
+                   ((l->ttype == TYPE_void && l->tseqbase == oid_nil) ||
+                    (r->ttype == TYPE_void && r->tseqbase == oid_nil))) {
+                       /* nothing to do: all values in either l or r
+                        * (or both) are nil, and we don't have to
+                        * match nil values */
+                       return GDK_SUCCEED;
+               }
+               if ((l->ttype == TYPE_void && l->tseqbase == oid_nil &&
+                    (r->T->nonil ||
+                     r->ttype != TYPE_void || r->tseqbase != oid_nil)) ||
+                   (r->ttype == TYPE_void && r->tseqbase == oid_nil &&
+                    (l->T->nonil ||
+                     l->ttype != TYPE_void || l->tseqbase != oid_nil))) {
+                       /* nothing to do: all values in l are nil, and
+                        * we can guarantee there are no nil values in
+                        * r, or vice versa */
+                       return GDK_SUCCEED;
+               }
+       }
 
        if (l->tsorted || l->trevsorted) {
                /* determine opportunistic scan window for l */
@@ -294,18 +311,65 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
             rscan++)
                nl >>= 1;
 
+       if (l->ttype == TYPE_void) {
+               if (lcand) {
+                       lstart = 0;
+                       lend = lcandend - lcand;
+                       lvals = (const char *) lcand;
+                       lcand = NULL;
+                       lwidth = SIZEOF_OID;
+               }
+               if (l->tseqbase == oid_nil)
+                       loff = wrd_nil;
+               else
+                       loff = (wrd) l->tseqbase - (wrd) l->hseqbase;
+       }
+       if (r->ttype == TYPE_void) {
+               if (rcand) {
+                       rstart = 0;
+                       rend = rcandend - rcand;
+                       rvals = (const char *) rcand;
+                       rcand = NULL;
+                       rwidth = SIZEOF_OID;
+               }
+               if (r->tseqbase == oid_nil)
+                       roff = wrd_nil;
+               else
+                       roff = (wrd) r->tseqbase - (wrd) r->hseqbase;
+       }
+       assert(lvals != NULL || lcand == NULL);
+       assert(rvals != NULL || rcand == NULL);
+
+       rcandorig = rcand;
+       rstartorig = rstart;
        while (lcand ? lcand < lcandend : lstart < lend) {
                if (!nil_on_miss && lscan > 0) {
                        /* if next value in r too far away (more than
                         * lscan from current position in l), use
                         * binary search on l to skip over
-                        * non-matching values */
-                       if (equal_order) {
-                               /* next value to match is first in r */
-                               v = VALUE(r, rcand ? rcand[0] - r->hseqbase : rstart);
+                        * non-matching values
+                        *
+                        * next value to match is first if equal_order
+                        * is set, last otherwise */
+                       if (rcand) {
+                               v = VALUE(r, (equal_order ? rcand[0] : rcandend[-1]) - r->hseqbase);
+                       } else if (rvals) {
+                               v = VALUE(r, equal_order ? rstart : rend - 1);
+                               if (roff == wrd_nil) {
+                                       rval = oid_nil;
+                                       v = (const char *) &rval;
+                               } else if (roff != 0) {
+                                       rval = * (const oid *) v + roff;
+                                       v = (const char *) &rval;
+                               }
                        } else {
-                               /* next value to match is last in r */
-                               v = VALUE(r, rcand ? rcandend[-1] - r->hseqbase : rend - 1);
+                               if (roff == wrd_nil)
+                                       rval = oid_nil;
+                               else if (equal_order)
+                                       rval = rstart + r->tseqbase;
+                               else
+                                       rval = rend - 1 + r->tseqbase;
+                               v = (const char *) &rval;
                        }
                        if (lcand) {
                                if (lscan < (BUN) (lcandend - lcand) &&
@@ -320,7 +384,7 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                                break;
                                        lskipped = BATcount(r1) > 0;
                                }
-                       } else {
+                       } else if (lvals) {
                                if (lscan < lend - lstart &&
                                    lordering * cmp(VALUE(l, lstart + lscan),
                                                    v) < 0) {
@@ -334,6 +398,25 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                                break;
                                        lskipped = BATcount(r1) > 0;
                                }
+                       } else if (*(const oid *)v != oid_nil) {
+                               if (l->tseqbase == oid_nil) {
+                                       /* there cannot be any more
+                                        * matches since r's next
+                                        * value is not nil and hence
+                                        * all other values in r are
+                                        * also not nil, and all
+                                        * values in l are nil */
+                                       lstart = lend;
+                                       break;
+                               }
+                               if (*(const oid *)v > l->tseqbase) {
+                                       BUN olstart = lstart;
+                                       lstart = *(const oid *)v - l->tseqbase;
+                                       if (lstart >= lend)
+                                               break;
+                                       if (lstart > olstart)
+                                               lskipped = BATcount(r1) > 0;
+                               }
                        }
                } else if (lscan == 0) {
                        /* always search r completely */
@@ -341,18 +424,28 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                        rstart = rstartorig;
                }
                /* v is the value we're going to work with in this
-                * iteration */
-               v = VALUE(l, lcand ? lcand[0] - l->hseqbase : lstart);
+                * iteration; count number of equal values in left */
                nl = 1;
-               /* count number of equal values in left */
                if (lcand) {
+                       v = VALUE(l, lcand[0] - l->hseqbase);
                        while (++lcand < lcandend &&
                               cmp(v, VALUE(l, lcand[0] - l->hseqbase)) == 0)
                                nl++;
-               } else {
+               } else if (lvals) {
+                       v = VALUE(l, lstart);
                        while (++lstart < lend &&
                               cmp(v, VALUE(l, lstart)) == 0)
                                nl++;
+               } else {
+                       if (loff == wrd_nil) {
+                               lval = oid_nil;
+                               nl = lend - lstart;
+                               lstart = lend;
+                       } else {
+                               lval = lstart + l->tseqbase;
+                               lstart++;
+                       }
+                       v = (const char *) &lval;
                }
                /* lcand/lstart points one beyond the value we're
                 * going to match: ready for the next */
@@ -380,7 +473,7 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                               rordering * cmp(v, VALUE(r, rcand[0] - r->hseqbase)) > 0)
                                                rcand++;
                                }
-                       } else {
+                       } else if (rvals) {
                                /* look ahead a little (rscan) in r to
                                 * see whether we're better off doing
                                 * a binary search, but if l is not
@@ -399,6 +492,14 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                               rordering * cmp(v, VALUE(r, rstart)) > 0)
                                                rstart++;
                                }
+                       } else if (*(const oid *)v != oid_nil) {
+                               if (r->tseqbase == oid_nil) {
+                                       rstart = rend;
+                               } else if (*(const oid *)v > r->tseqbase) {
+                                       rstart = *(const oid *)v - r->tseqbase;
+                                       if (rstart >= rend)
+                                               rstart = rend;
+                               }
                        }
                        /* rstart or rcand points to first value >= v
                         * or end of r */
@@ -421,7 +522,7 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                               rordering * cmp(v, VALUE(r, rcandend[-1] - r->hseqbase)) < 0)
                                                rcandend--;
                                }
-                       } else {
+                       } else if (rvals) {
                                /* look ahead a little (rscan) in r to
                                 * see whether we're better off doing
                                 * a binary search, but if l is not
@@ -439,6 +540,15 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                               rordering * cmp(v, VALUE(r, rend - 1)) < 0)
                                                rend--;
                                }
+                       } else if (*(const oid *)v != oid_nil) {
+                               if (r->tseqbase == oid_nil) {
+                                       rstart = rend;
+                               } else if (*(const oid *)v > r->tseqbase + rstart &&
+                                          *(const oid *)v < r->tseqbase + rend) {
+                                       rend = *(const oid *)v - r->tseqbase + 1;
+                                       if (rstart >= rend)
+                                               rend = rstart;
+                               }
                        }
                        /* rend/rcandend now points to first value > v
                         * or start of r */
@@ -464,7 +574,7 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                                rcand++;
                                        }
                                }
-                       } else {
+                       } else if (rvals) {
                                /* look ahead a little (rscan) in r to
                                 * see whether we're better off doing
                                 * a binary search */
@@ -483,6 +593,15 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                                rstart++;
                                        }
                                }
+                       } else if (r->tseqbase == oid_nil) {
+                               if (*(const oid *)v == oid_nil) {
+                                       nr = rend - rstart;
+                                       rstart = rend;
+                               }
+                       } else if (rstart < rend &&
+                                  *(const oid *)v == rstart + r->tseqbase) {
+                               nr = 1;
+                               rstart++;
                        }
                } else {
                        if (rcand) {
@@ -502,7 +621,7 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                                rcandend--;
                                        }
                                }
-                       } else {
+                       } else if (rvals) {
                                /* look ahead a little (rscan) in r to
                                 * see whether we're better off doing
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to