Changeset: d2bc6c0dfac6 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d2bc6c0dfac6
Modified Files:
        sql/backends/monet5/rel_weld.c
Branch: rel-weld
Log Message:

string support + small readme in rel_weld.c


diffs (192 lines):

diff --git a/sql/backends/monet5/rel_weld.c b/sql/backends/monet5/rel_weld.c
--- a/sql/backends/monet5/rel_weld.c
+++ b/sql/backends/monet5/rel_weld.c
@@ -24,6 +24,24 @@
 #include "mal_builder.h"
 #include "mal_weld.h"
 
+/* The code here generates a Weld program by parsing the relational algebra 
tree and using the
+ * produce-consume model as described in the "Efficiently Compiling Efficient 
Query Plans for Modern
+ * Hardware" paper. Some notes on the implementation:
+ * -  In practice we don't need to separate produce and consume functions. The 
consume phase begins
+ *    when the produce function call ends
+ * -  We still produce a MAL program with one big "weld.run" instruction which 
implements the query logic
+ * -  We need to generate textual Weld code, which is error-prone and a pain 
in C. Each operator generates
+ *    code on the fly, i.e. we append Weld statements to a string buffer which 
will eventually result in a
+ *    complete program. Each operator generates its code in a local buffer 
which is then appended to the 
+ *    main Weld code, which is stored in wstate->program.
+ * -  op_basetable is also handled by MonetDB, we rely on the existing code 
for reading the BATs
+ * -  When expressions don't involve columns, we let MonetDB handle them. This 
is useful for evaluating complex 
+ *    atoms
+ * -  String BATs are backed by 2 arrays: one with the strings and the other 
with the offsets. At the end of the Weld
+ *    program we need to also return the strings array so that we can later 
build a string BAT, so special care is
+ *    needed to ensure that the strings array is referenced correctly throughout 
the program.
+ * */
+
 #define STR_BUF_SIZE 4096
 
 /* From sql_statement.c */
@@ -41,18 +59,15 @@
 
 typedef struct {
        int next_var;
-       int result_var;
        int num_parens; /* number of parentheses */
        int num_loops;
        str builder;
        str program;
        unsigned long program_max_len;
+       char str_cols[STR_BUF_SIZE * 3]; /* names the vheaps take */
        list *stmt_list;
-       list *input_col_list;
 } weld_state;
 
-/* In practice we don't need separate produce and consume functions. The 
consume phase
- * begins when the produce call ends */
 typedef int (*produce_func)(backend*, sql_rel*, weld_state*);
 
 static produce_func getproduce_func(sql_rel *rel);
@@ -65,6 +80,15 @@ static void dump_program(weld_state *wst
        fclose(f);
 }
 
+static void prepend_weld_stmt(weld_state *wstate, str weld_stmt) {
+       if (strlen(wstate->program) + strlen(weld_stmt) >= 
wstate->program_max_len) {
+               wstate->program_max_len  = strlen(wstate->program) + 
strlen(weld_stmt) + 1;
+               wstate->program = realloc(wstate->program, 
wstate->program_max_len * sizeof(char));
+       }
+       memmove(wstate->program + strlen(weld_stmt), wstate->program, 
strlen(wstate->program) + 1);
+       memcpy(wstate->program, weld_stmt, strlen(weld_stmt));
+}
+
 static void append_weld_stmt(weld_state *wstate, str weld_stmt) {
        if (strlen(wstate->program) + strlen(weld_stmt) >= 
wstate->program_max_len) {
                wstate->program_max_len = strlen(wstate->program) + 
strlen(weld_stmt) + 1;
@@ -249,7 +273,7 @@ base_table_produce(backend *be, sql_rel 
        stmt *sub = subrel_bin(be, rel, NULL);
        node *en;
        sql_exp *exp;
-       char weld_stmt[STR_BUF_SIZE], col_name[256];
+       char weld_stmt[STR_BUF_SIZE], col_name[256], iter_idx[64];
        int count;
        int len = sprintf(weld_stmt, "for(zip(");
        for (en = rel->exps->h; en; en = en->next) {
@@ -259,6 +283,14 @@ base_table_produce(backend *be, sql_rel 
                if (en->next != NULL) {
                        len += sprintf(weld_stmt + len, ", ");
                }
+               if (exp_subtype(exp)->type->localtype == TYPE_str) {
+                       /* Save the vheap and stroffset names */
+                       sprintf(col_name, "%s_%s", (str)exp->l, (str)exp->r);
+                       sprintf(wstate->str_cols + strlen(wstate->str_cols), 
"let %s_strcol = in%dstr;",
+                                       col_name, col->nr);
+                       sprintf(wstate->str_cols + strlen(wstate->str_cols), 
"let %s_stroffset = in%dstroffset;",
+                                       col_name, col->nr);
+               }
        }
        /* builder and function header */
        ++wstate->num_loops;
@@ -270,11 +302,17 @@ base_table_produce(backend *be, sql_rel 
                sprintf(col_name, "%s_%s", (str)exp->l, (str)exp->r);
                if (rel->exps->h->next == NULL) {
                        /* just a single column so n.$0 doesn't work */
-                       len += sprintf(weld_stmt + len, "let %s = n%d", 
col_name, wstate->num_loops);
+                       sprintf(iter_idx, "n%d", wstate->num_loops);
                } else {
-                       len += sprintf(weld_stmt + len, "let %s = n%d.$%d", 
col_name, wstate->num_loops, count);
+                       sprintf(iter_idx, "n%d.$%d", wstate->num_loops, count);
                }
-               len += sprintf(weld_stmt + len, ";");
+               if (exp_subtype(exp)->type->localtype == TYPE_str) {
+                       len += sprintf(weld_stmt + len, "let %s = 
strslice(%s_strcol, i64(%s) + %s_stroffset);",
+                                                  col_name, col_name, 
iter_idx, col_name);
+                       len += sprintf(weld_stmt + len, "let %s_stridx = %s;", 
col_name, iter_idx);
+               } else {
+                       len += sprintf(weld_stmt + len, "let %s = %s;", 
col_name, iter_idx);
+               }
        }
        ++wstate->num_parens;
        append_weld_stmt(wstate, weld_stmt);
@@ -326,9 +364,26 @@ project_produce(backend *be, sql_rel *re
        for (en = rel->exps->h; en; en = en->next) {
                exp = en->data;
                sprintf(col_name, "%s_%s", exp->rname ? exp->rname : exp->name, 
exp->name);
-               len += sprintf(weld_stmt + len, "let %s = ", col_name);
-               if (exp_to_weld(be, weld_stmt, &len, en->data, 
wstate->stmt_list) < 0) return -1;
-               len += sprintf(weld_stmt + len, ";");
+               if (exp_subtype(exp)->type->localtype == TYPE_str) {
+                       if (exp->type != e_column) {
+                               /* We can only handle string column renaming. 
If this is something else, like producing
+                                * a new string, we don't support it yet. */
+                               return -1;
+                       }
+                       char old_col_name[256];
+                       int old_col_len = 0;
+                       if (exp_to_weld(be, old_col_name, &old_col_len, exp, 
wstate->stmt_list) < 0) return -1;
+                       len += sprintf(weld_stmt + len, "let %s = %s;", 
col_name, old_col_name);
+                       len += sprintf(weld_stmt + len, "let %s_stridx = 
%s_stridx;", col_name, old_col_name);
+                       sprintf(wstate->str_cols + strlen(wstate->str_cols), 
"let %s_strcol = %s_strcol;",
+                                       col_name, old_col_name);
+                       sprintf(wstate->str_cols + strlen(wstate->str_cols), 
"let %s_stroffset = %s_stroffset;",
+                                       col_name, old_col_name);
+               } else {
+                       len += sprintf(weld_stmt + len, "let %s = ", col_name);
+                       if (exp_to_weld(be, weld_stmt, &len, exp, 
wstate->stmt_list) < 0) return -1;
+                       len += sprintf(weld_stmt + len, ";");
+               }
        }
        append_weld_stmt(wstate, weld_stmt);
        return 0;
@@ -438,8 +493,13 @@ root_produce(backend *be, sql_rel *rel)
        if (result_is_bat) {
                len += sprintf(weld_stmt + len, "{");
                for (en = root->exps->h; en; en = en->next) {
-                       sql_subtype *subtype = exp_subtype(en->data);
-                       len += sprintf(weld_stmt + len, "appender[%s]", 
getWeldType(subtype->type->localtype));
+                       int type = exp_subtype(en->data)->type->localtype;
+                       if (type == TYPE_str) {
+                               /* We'll append just the offset in vheap, we 
don't know the type yet */
+                               len += sprintf(weld_stmt + len, "appender[?]");
+                       } else {
+                               len += sprintf(weld_stmt + len, "appender[%s]", 
getWeldType(type));
+                       }
                        if (en->next != NULL) {
                                len += sprintf(weld_stmt + len, ", ");
                        }
@@ -464,7 +524,12 @@ root_produce(backend *be, sql_rel *rel)
                sql_exp *exp = en->data;
                sprintf(col_name, "%s_%s", exp->rname ? exp->rname : exp->name, 
exp->name);
                if (result_is_bat) {
-                       len += sprintf(weld_stmt + len, "merge(b%d.$%d, %s)", 
wstate->num_loops, count, col_name);
+                       int type = exp_subtype(en->data)->type->localtype;
+                       if (type == TYPE_str) {
+                               len += sprintf(weld_stmt + len, "merge(b%d.$%d, 
%s_stridx)", wstate->num_loops, count, col_name);
+                       } else {
+                               len += sprintf(weld_stmt + len, "merge(b%d.$%d, 
%s)", wstate->num_loops, count, col_name);
+                       }
                } else {
                        len += sprintf(weld_stmt + len, "%s", col_name);
                }
@@ -483,6 +548,12 @@ root_produce(backend *be, sql_rel *rel)
                len += sprintf(weld_stmt + len, "{");
                for (en = root->exps->h, count = 0; en; en = en->next, count++) 
{
                        len += sprintf(weld_stmt + len, "result(v%d.$%d)", 
result_var, count);
+                       sql_exp *exp = en->data;
+                       int type = exp_subtype(en->data)->type->localtype;
+                       if (type == TYPE_str) {
+                               sprintf(col_name, "%s_%s", exp->rname ? 
exp->rname : exp->name, exp->name);
+                               len += sprintf(weld_stmt + len, ", %s_strcol", 
col_name);
+                       }
                        if (en->next != NULL) {
                                len += sprintf(weld_stmt + len, ", ");
                        }
@@ -492,8 +563,8 @@ root_produce(backend *be, sql_rel *rel)
                /* Just the top variable */
                len += sprintf(weld_stmt + len, "v%d", result_var);
        }
-
        append_weld_stmt(wstate, weld_stmt);
+       prepend_weld_stmt(wstate, wstate->str_cols);
 
        /* Build the Weld MAL instruction */
        InstrPtr weld_instr = newInstruction(be->mb, weldRef, weldRunRef);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to