Changeset: a0cf4f652491 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a0cf4f652491 Modified Files: gdk/gdk_analytic_func.c Branch: window-tunning Log Message:
Added code for window function computation on a segment tree, plus the application for min/max aggregates. Now clean up the rest. diffs (266 lines): diff --git a/gdk/gdk_analytic_func.c b/gdk/gdk_analytic_func.c --- a/gdk/gdk_analytic_func.c +++ b/gdk/gdk_analytic_func.c @@ -11,6 +11,114 @@ #include "gdk_analytic.h" #include "gdk_calc_private.h" +#define SEGMENT_TREE_FANOUT 16 +#define NOTHING /* placeholder for unused optional arguments of the aggregate computation */ + +/* segment_tree is the tree as an array, levels_offset contains the offsets in the tree where each level starts, + and nlevels contains the number of levels in the current segment tree */ +static gdk_return +rebuild_segmentree(oid ncount, oid data_size, void **segment_tree, oid *tree_capacity, oid **levels_offset, oid *levels_capacity, oid *nlevels) +{ + oid next_tree_size = ncount, counter = ncount, *new_levels_offset, next_levels = 1; /* there will be at least one level */ + void *new_segment_tree; + + assert(ncount > 0); + do { /* compute the next number of levels */ + counter = (oid) ceil((dbl)counter / SEGMENT_TREE_FANOUT); + next_tree_size += counter; + next_levels++; + } while (counter > 1); + + next_tree_size *= data_size; + if (next_tree_size > *tree_capacity) { + *tree_capacity = (((next_tree_size) + 1023) & ~1023); /* align to a multiple of 1024 bytes */ + if (!(new_segment_tree = GDKmalloc(*tree_capacity))) + return GDK_FAIL; + GDKfree(*segment_tree); + *segment_tree = new_segment_tree; + } + + *nlevels = next_levels; /* set the logical size of levels before the physical one */ + next_levels *= sizeof(oid); + if (next_levels > *levels_capacity) { + *levels_capacity = (((next_levels) + 1023) & ~1023); /* align to a multiple of 1024 bytes */ + if (!(new_levels_offset = GDKmalloc(*levels_capacity))) + return GDK_FAIL; + GDKfree(*levels_offset); + *levels_offset = new_levels_offset; + } + return GDK_SUCCEED; +} + +/* segment_tree, levels_offset and nlevels must already be defined. 
ARG1 and ARG2 are to be used by the aggregate */ +#define populate_segment_tree(CAST, COUNT, INIT_AGGREGATE, COMPUTE_LEVEL0, COMPUTE_LEVELN, ARG1, ARG2) \ + do { \ + CAST *ctree = (CAST *) segment_tree; \ + CAST *prev_level_begin = ctree; \ + oid level_size = COUNT, tree_offset = 0, current_level = 0; \ + \ + levels_offset[current_level++] = 0; /* first level is trivial */ \ + for (oid pos = 0; pos < level_size; pos += SEGMENT_TREE_FANOUT) { \ + oid end = MIN(level_size, pos + SEGMENT_TREE_FANOUT); \ + \ + for (oid x = pos; x < end; x++) { \ + CAST computed; \ + COMPUTE_LEVEL0(x, ARG1, ARG2); \ + ctree[tree_offset++] = computed; \ + } \ + } \ + \ + while (current_level < nlevels) { /* for the following levels we have to use the previous level results */ \ + oid prev_tree_offset = tree_offset; \ + levels_offset[current_level++] = tree_offset; \ + for (oid pos = 0; pos < level_size; pos += SEGMENT_TREE_FANOUT) { \ + oid begin = pos, end = MIN(level_size, pos + SEGMENT_TREE_FANOUT), width = end - begin; \ + CAST computed; \ + \ + INIT_AGGREGATE(ARG1, ARG2); \ + for (oid x = 0; x < width; x++) \ + COMPUTE_LEVELN(prev_level_begin[x], ARG1, ARG2); \ + ctree[tree_offset++] = computed; \ + prev_level_begin += width; \ + } \ + level_size = tree_offset - prev_tree_offset; \ + } \ + } while (0) + +#define compute_on_segment_tree(CAST, START, END, INIT_AGGREGATE, COMPUTE, FINALIZE_AGGREGATE, ARG1, ARG2) \ + do { /* taken from https://www.vldb.org/pvldb/vol8/p1058-leis.pdf */ \ + oid begin = START, tend = END; \ + CAST computed; \ + \ + INIT_AGGREGATE(ARG1, ARG2); \ + for (oid level = 0; level < nlevels; level++) { \ + CAST *tlevel = (CAST *) segment_tree + levels_offset[level]; \ + oid parent_begin = begin / SEGMENT_TREE_FANOUT; \ + oid parent_end = tend / SEGMENT_TREE_FANOUT; \ + \ + if (parent_begin == parent_end) { \ + for (oid pos = begin; pos < tend; pos++) \ + COMPUTE(tlevel[pos], ARG1, ARG2); \ + break; \ + } \ + oid group_begin = parent_begin * SEGMENT_TREE_FANOUT; \ + 
if (begin != group_begin) { \ + oid limit = group_begin + SEGMENT_TREE_FANOUT; \ + for (oid pos = begin; pos < limit; pos++) \ + COMPUTE(tlevel[pos], ARG1, ARG2); \ + parent_begin++; \ + } \ + oid group_end = parent_end * SEGMENT_TREE_FANOUT; \ + if (tend != group_end) { \ + for (oid pos = group_end; pos < tend; pos++) \ + COMPUTE(tlevel[pos], ARG1, ARG2); \ + } \ + begin = parent_begin; \ + tend = parent_end; \ + } \ + FINALIZE_AGGREGATE(ARG1, ARG2); \ + } while (0) + #define NTILE_CALC(TPE, NEXT_VALUE, LNG_HGE, UPCAST, VALIDATION) \ do { \ TPE j = 0; \ @@ -757,27 +865,37 @@ GDKanalyticallead(BAT *r, BAT *b, BAT *p } \ } while (0) +#define INIT_AGGREGATE_MIN_MAX_FIXED(TPE, MIN_MAX) \ + do { \ + computed = TPE##_nil; \ + } while (0) +#define COMPUTE_LEVEL0_MIN_MAX_FIXED(X, TPE, MIN_MAX) \ + do { \ + computed = bp[j + X]; \ + } while (0) +#define COMPUTE_LEVELN_MIN_MAX_FIXED(VAL, TPE, MIN_MAX) \ + do { \ + if (!is_##TPE##_nil(VAL)) { \ + if (is_##TPE##_nil(computed)) \ + computed = VAL; \ + else \ + computed = MIN_MAX(computed, VAL); \ + } \ + } while (0) +#define FINALIZE_AGGREGATE_MIN_MAX_FIXED(TPE, MIN_MAX) \ + do { \ + rb[k] = computed; \ + has_nils |= is_##TPE##_nil(computed); \ + } while (0) #define ANALYTICAL_MIN_MAX_CALC_FIXED_OTHERS(TPE, MIN_MAX) \ do { \ - TPE curval = TPE##_nil; \ - for (; k < i; k++) { \ - TPE *bs = bp + start[k]; \ - TPE *be = bp + end[k]; \ - for (; bs < be; bs++) { \ - TPE v = *bs; \ - if (!is_##TPE##_nil(v)) { \ - if (is_##TPE##_nil(curval)) \ - curval = v; \ - else \ - curval = MIN_MAX(v, curval); \ - } \ - } \ - rb[k] = curval; \ - if (is_##TPE##_nil(curval)) \ - has_nils = true; \ - else \ - curval = TPE##_nil; /* For the next iteration */ \ - } \ + oid ncount = i - k; \ + if ((res = rebuild_segmentree(ncount, data_size, &segment_tree, &tree_capacity, &levels_offset, &levels_capacity, &nlevels)) != GDK_SUCCEED) \ + goto cleanup; \ + populate_segment_tree(TPE, ncount, INIT_AGGREGATE_MIN_MAX_FIXED, COMPUTE_LEVEL0_MIN_MAX_FIXED, 
COMPUTE_LEVELN_MIN_MAX_FIXED, TPE, MIN_MAX); \ + for (; k < i; k++) \ + compute_on_segment_tree(TPE, start[k] - j, end[k] - j, INIT_AGGREGATE_MIN_MAX_FIXED, COMPUTE_LEVELN_MIN_MAX_FIXED, FINALIZE_AGGREGATE_MIN_MAX_FIXED, TPE, MIN_MAX); \ + j = k; \ } while (0) #define ANALYTICAL_MIN_MAX_CALC_VARSIZED_UNBOUNDED_TILL_CURRENT_ROW(GT_LT) \ @@ -858,26 +976,38 @@ GDKanalyticallead(BAT *r, BAT *b, BAT *p } \ } while (0) +#define INIT_AGGREGATE_MIN_MAX_VARSIZED(GT_LT, NOTHING) \ + do { \ + computed = (void*) nil; \ + } while (0) +#define COMPUTE_LEVEL0_MIN_MAX_VARSIZED(X, GT_LT, NOTHING) \ + do { \ + computed = BUNtail(bpi, j + X); \ + } while (0) +#define COMPUTE_LEVELN_MIN_MAX_VARSIZED(VAL, GT_LT, NOTHING) \ + do { \ + if (atomcmp(VAL, nil) != 0) { \ + if (atomcmp(computed, nil) == 0) \ + computed = VAL; \ + else \ + computed = atomcmp(VAL, computed) GT_LT 0 ? computed : VAL; \ + } \ + } while (0) +#define FINALIZE_AGGREGATE_MIN_MAX_VARSIZED(GT_LT, NOTHING) \ + do { \ + if ((res = tfastins_nocheckVAR(r, k, computed, Tsize(r))) != GDK_SUCCEED) \ + goto cleanup; \ + has_nils |= atomcmp(computed, nil) == 0; \ + } while (0) #define ANALYTICAL_MIN_MAX_CALC_VARSIZED_OTHERS(GT_LT) \ do { \ - void *curval = (void*) nil; \ - for (; k < i; k++) { \ - j = start[k]; \ - l = end[k]; \ - curval = (void*) nil; \ - for (; j < l; j++) { \ - void *next = BUNtail(bpi, j); \ - if (atomcmp(next, nil) != 0) { \ - if (atomcmp(curval, nil) == 0) \ - curval = next; \ - else \ - curval = atomcmp(next, curval) GT_LT 0 ? 
curval : next; \ - } \ - } \ - if (tfastins_nocheckVAR(r, k, curval, Tsize(r)) != GDK_SUCCEED) \ - return GDK_FAIL; \ - has_nils |= atomcmp(curval, nil) == 0; \ - } \ + oid ncount = i - k; \ + if ((res = rebuild_segmentree(ncount, data_size, &segment_tree, &tree_capacity, &levels_offset, &levels_capacity, &nlevels)) != GDK_SUCCEED) \ + goto cleanup; \ + populate_segment_tree(void*, ncount, INIT_AGGREGATE_MIN_MAX_VARSIZED, COMPUTE_LEVEL0_MIN_MAX_VARSIZED, COMPUTE_LEVELN_MIN_MAX_VARSIZED, GT_LT, NOTHING); \ + for (; k < i; k++) \ + compute_on_segment_tree(void*, start[k] - j, end[k] - j, INIT_AGGREGATE_MIN_MAX_VARSIZED, COMPUTE_LEVELN_MIN_MAX_VARSIZED, FINALIZE_AGGREGATE_MIN_MAX_VARSIZED, GT_LT, NOTHING); \ + j = k; \ } while (0) #define ANALYTICAL_MIN_MAX_PARTITIONS(TPE, MIN_MAX, IMP) \ @@ -942,11 +1072,14 @@ gdk_return \ GDKanalytical##OP(BAT *r, BAT *p, BAT *o, BAT *b, BAT *s, BAT *e, int tpe, int frame_type) \ { \ bool has_nils = false; \ - oid i = 0, j = 0, k = 0, l = 0, cnt = BATcount(b), *restrict start = s ? (oid*)Tloc(s, 0) : NULL, *restrict end = e ? (oid*)Tloc(e, 0) : NULL; \ + oid i = 0, j = 0, k = 0, l = 0, cnt = BATcount(b), *restrict start = s ? (oid*)Tloc(s, 0) : NULL, *restrict end = e ? (oid*)Tloc(e, 0) : NULL, \ + *levels_offset = NULL, data_size = ATOMextern(tpe) ? sizeof(void*) : ATOMsize(tpe), tree_capacity = 0, nlevels = 0, levels_capacity = 0; \ bit *np = p ? Tloc(p, 0) : NULL, *op = o ? 
Tloc(o, 0) : NULL; \ BATiter bpi = bat_iterator(b); \ const void *nil = ATOMnilptr(tpe); \ int (*atomcmp)(const void *, const void *) = ATOMcompare(tpe); \ + void *segment_tree = NULL; \ + gdk_return res = GDK_SUCCEED; \ \ if (cnt > 0) { \ switch (frame_type) { \ @@ -968,10 +1101,13 @@ GDKanalytical##OP(BAT *r, BAT *p, BAT *o } \ } \ \ - BATsetcount(r, cnt); \ - r->tnonil = !has_nils; \ - r->tnil = has_nils; \ - return GDK_SUCCEED; \ + BATsetcount(r, cnt); \ + r->tnonil = !has_nils; \ + r->tnil = has_nils; \ +cleanup: \ + GDKfree(segment_tree); \ + GDKfree(levels_offset); \ + return res; \ } ANALYTICAL_MIN_MAX(min, MIN, >) _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list