Changeset: fb1bb4293768 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/fb1bb4293768
Modified Files:
        sql/backends/monet5/rel_bin.c
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql_statement.h
        sql/include/sql_relation.h
        sql/rel.txt
        sql/server/rel_distribute.c
        sql/server/rel_dump.c
        sql/server/rel_exp.c
        sql/server/rel_optimizer.c
        sql/server/rel_partition.c
        sql/server/rel_propagate.c
        sql/server/rel_rel.c
        sql/server/rel_rel.h
        sql/server/rel_select.c
        sql/server/rel_updates.c
        sql/server/sql_partition.c
        sql/test/SQLancer/Tests/sqlancer11.test
Branch: default
Log Message:

General cleanup of merge statements.

The previous merge statement implementation was error-prone because it was 
not possible to accurately select the matched and non-matched values of a join 
in the parsing phase. Instead, a new op_merge relation is now created and the 
matched/non-matched split is done at the code generation layer.

However a few issues remain:

Some complex subqueries get rewritten at the unnest layer, which makes it 
difficult to perform the matched/non-matched split, so they are now disabled in 
merge join conditions.

I changed the join code generation to return the joined and the join difference 
oids in a hacky way, but this will be cleaned up in the pushcands branch.

On the other hand, fewer intermediates are now generated, which makes it 
easier to implement other merge statement features in the future.


diffs (truncated from 909 to 300 lines):

diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -1969,6 +1969,7 @@ rel2bin_args(backend *be, sql_rel *rel, 
        case op_union:
        case op_inter:
        case op_except:
+       case op_merge:
                args = rel2bin_args(be, rel->l, args);
                args = rel2bin_args(be, rel->r, args);
                break;
@@ -2509,11 +2510,10 @@ static stmt *
 rel2bin_join(backend *be, sql_rel *rel, list *refs)
 {
        mvc *sql = be->mvc;
-       list *l, *sexps = NULL;
+       list *l, *sexps = NULL, *l2 = NULL;
        node *en = NULL, *n;
-       stmt *left = NULL, *right = NULL, *join = NULL, *jl, *jr;
-       stmt *ld = NULL, *rd = NULL;
-       int need_left = (rel->flag == LEFT_JOIN);
+       stmt *left = NULL, *right = NULL, *join = NULL, *jl, *jr, *ld = NULL, 
*rd = NULL, *res;
+       int need_left = (rel->flag & LEFT_JOIN);
 
        if (rel->l) /* first construct the left sub relation */
                left = subrel_bin(be, rel->l, refs);
@@ -2679,6 +2679,15 @@ rel2bin_join(backend *be, sql_rel *rel, 
                rd = stmt_tdiff(be, rd, jr, NULL);
        }
 
+       if (rel->op == op_left) { /* used for merge statments, this will be 
cleaned out on the pushcands branch :) */
+               l2 = sa_list(sql->sa);
+               list_append(l2, left);
+               list_append(l2, right);
+               list_append(l2, jl);
+               list_append(l2, jr);
+               list_append(l2, ld);
+       }
+
        for( n = left->op4.lval->h; n; n = n->next ) {
                stmt *c = n->data;
                const char *rnme = table_name(sql->sa, c);
@@ -2713,7 +2722,9 @@ rel2bin_join(backend *be, sql_rel *rel, 
                s = stmt_alias(be, s, rnme, nme);
                list_append(l, s);
        }
-       return stmt_list(be, l);
+       res = stmt_list(be, l);
+       res->extra = l2; /* used for merge statments, this will be cleaned out 
on the pushcands branch :) */
+       return res;
 }
 
 static int
@@ -5813,6 +5824,119 @@ rel2bin_output(backend *be, sql_rel *rel
        }
 }
 
+/* Append to 'out' one aliased projection per column of 'side', each column
+ * projected through the oid list 'oids' and re-aliased with the column's own
+ * table and column name. */
+static void
+merge_stmt_project_side(backend *be, list *out, stmt *side, stmt *oids)
+{
+       mvc *sql = be->mvc;
+
+       for (node *cn = side->op4.lval->h; cn; cn = cn->next) {
+               stmt *col = cn->data;
+               const char *tname = table_name(sql->sa, col);
+               const char *cname = column_name(sql->sa, col);
+               stmt *proj = stmt_project(be, oids, column(be, col));
+
+               list_append(out, stmt_alias(be, proj, tname, cname));
+       }
+}
+
+/* Build the projection list for one MERGE branch. The list holds the columns
+ * of 'left' (when given), projected through 'jl' or through 'diff' when 'jl'
+ * is NULL. These are followed by the columns of 'right' (when given),
+ * projected through 'jr' or through 'diff' when 'jr' is NULL. The list is
+ * allocated on sql->sa. */
+static list *
+merge_stmt_join_projections(backend *be, stmt *left, stmt *right, stmt *jl, stmt *jr, stmt *diff)
+{
+       list *res = sa_list(be->mvc->sa);
+
+       if (left)
+               merge_stmt_project_side(be, res, left, jl ? jl : diff);
+       if (right)
+               merge_stmt_project_side(be, res, right, jr ? jr : diff);
+       return res;
+}
+
+/* Emit a runtime check for MERGE DELETE/UPDATE: raise SQLSTATE 40002 when
+ * count(jl) + count(ld) is greater than the row count of the table behind
+ * 'bt_stmt'. The message reports that multiple input rows match the same
+ * target row. 'delete' only selects the verb in that message ("DELETE" when
+ * true, "UPDATE" otherwise). 'bt' is the table relation; its alias is
+ * reported, or schema.table when it has no distinct alias. */
+static void
+validate_merge_delete_update(backend *be, bool delete, stmt *bt_stmt, sql_rel *bt, stmt *jl, stmt *ld)
+{
+       mvc *sql = be->mvc;
+       sql_table *t = bt->l;
+       char *alias = (char *) rel_name(bt);
+       const char *verb = delete ? "DELETE" : "UPDATE";
+       str msg;
+
+       /* NOTE(review): the stmt_* calls keep their original order, presumably
+        * because each one appends generated code */
+       stmt *matched_cnt = stmt_aggr(be, jl, NULL, NULL, sql_bind_func(sql, "sys", "count", sql_bind_localtype("void"), NULL, F_AGGR), 1, 0, 1);
+       stmt *unmatched_cnt = stmt_aggr(be, ld, NULL, NULL, sql_bind_func(sql, "sys", "count", sql_bind_localtype("void"), NULL, F_AGGR), 1, 0, 1);
+       sql_subfunc *add_fn = sql_bind_func(sql, "sys", "sql_add", tail_type(matched_cnt), tail_type(unmatched_cnt), F_FUNC);
+       stmt *input_cnt = stmt_binop(be, matched_cnt, unmatched_cnt, NULL, add_fn);
+       stmt *table_cnt = stmt_aggr(be, bin_find_smallest_column(be, bt_stmt), NULL, NULL, sql_bind_func(sql, "sys", "count", sql_bind_localtype("void"), NULL, F_AGGR), 1, 0, 1);
+       sql_subfunc *gt_fn = sql_bind_func(sql, "sys", ">", tail_type(input_cnt), tail_type(table_cnt), F_FUNC);
+       stmt *too_many = stmt_binop(be, input_cnt, table_cnt, NULL, gt_fn);
+
+       /* an alias equal to the table name is no real alias: report schema.table */
+       if (alias && strcmp(alias, t->base.name) == 0)
+               alias = NULL;
+       msg = sa_message(sql->sa, SQLSTATE(40002) "MERGE %s: Multiple rows in the input relation match the same row in the target %s '%s%s%s'",
+                        verb,
+                        alias ? "relation" : "table",
+                        alias ? alias : (t->s ? t->s->base.name : ""),
+                        alias ? "" : ".",
+                        alias ? "" : t->base.name);
+       (void)stmt_exception(be, too_many, msg, 00001);
+}
+
+/* Generate one branch of a MERGE statement on top of the already generated
+ * left join 'join'. 'upd' is an insert, update or delete relation.
+ * - Insert: the right-side rows without a match are projected. Their oid list
+ *   is computed on first use and cached in *rd, so that both branches of a
+ *   MERGE share it.
+ * - Update/delete: the matched rows are projected (left side through jl, and
+ *   for updates also the right side through jr). The "multiple rows match"
+ *   runtime check is emitted first.
+ * In every case the projection is registered in 'refs' as the output of
+ * 'join' before the modify relation itself is generated. */
+static stmt *
+rel2bin_merge_apply_update(backend *be, sql_rel *join, sql_rel *upd, list *refs, stmt *bt_stmt, stmt *target_stmt, stmt *jl, stmt *jr, stmt *ld, stmt **rd)
+{
+       if (is_insert(upd->op)) {
+               if (!*rd) {
+                       *rd = stmt_tdiff(be, stmt_mirror(be, bin_find_smallest_column(be, target_stmt)), jr, NULL);
+               }
+               stmt *s = stmt_list(be, merge_stmt_join_projections(be, NULL, target_stmt, NULL, NULL, *rd));
+               refs_update_stmt(refs, join, s); /* project the differences on the target side for inserts */
+
+               return rel2bin_insert(be, upd, refs);
+       } else {
+               stmt *s = stmt_list(be, merge_stmt_join_projections(be, bt_stmt, is_update(upd->op) ? target_stmt : NULL, jl, is_update(upd->op) ? jr : NULL, NULL));
+               refs_update_stmt(refs, join, s); /* project the matched values on both sides for updates and deletes */
+
+               assert(is_update(upd->op) || is_delete(upd->op));
+               /* the left joined values + left difference must not exceed the table count.
+                * The second argument is the 'delete' flag: pass is_delete() so the
+                * error message names the operation actually being performed. */
+               validate_merge_delete_update(be, is_delete(upd->op), bt_stmt, join->l, jl, ld);
+
+               return is_update(upd->op) ? rel2bin_update(be, upd, refs) : rel2bin_delete(be, upd, refs);
+       }
+}
+
+/* Generate code for an op_merge relation.
+ * rel->l is the (referenced) left join of the merge. rel->r is either a single
+ * modify relation or a ddl_list holding up to two of them in r->l and r->r.
+ * The join is generated once. Its 'extra' list, filled in by rel2bin_join,
+ * holds in order:
+ *   1. the left input stmt
+ *   2. the right input stmt
+ *   3. the matched left oids
+ *   4. the matched right oids
+ *   5. the unmatched left oids
+ * These are then shared by every branch. Returns a stmt_list of the
+ * generated branches, or NULL on error. */
+static stmt *
+rel2bin_merge(backend *be, sql_rel *rel, list *refs)
+{
+       mvc *sql = be->mvc;
+       sql_rel *join = rel->l, *r = rel->r;
+       stmt *join_st, *bt_stmt, *target_stmt, *jl, *jr, *ld, *rd = NULL, *ns;
+       list *slist = sa_list(sql->sa);
+       node *n;
+
+       assert(rel_is_ref(join) && is_left(join->op));
+       join_st = subrel_bin(be, join, refs);
+       if (!join_st)
+               return NULL;
+
+       /* grab generated left join outputs and generate updates accordingly to matched and not matched values */
+       assert(join_st->type == st_list && list_length(join_st->extra) == 5);
+       n = join_st->extra->h;
+       bt_stmt = n->data;
+       n = n->next;
+       target_stmt = n->data;
+       n = n->next;
+       jl = n->data;
+       n = n->next;
+       jr = n->data;
+       n = n->next;
+       ld = n->data;
+
+       if (is_ddl(r->op)) {
+               assert(r->flag == ddl_list);
+               /* append only the branches that exist: appending 'ns' for a missing
+                * branch would add an uninitialized (r->l) or repeated (r->r) stmt */
+               if (r->l) {
+                       if (!(ns = rel2bin_merge_apply_update(be, join, r->l, refs, bt_stmt, target_stmt, jl, jr, ld, &rd)))
+                               return NULL;
+                       list_append(slist, ns);
+               }
+               if (r->r) {
+                       if (!(ns = rel2bin_merge_apply_update(be, join, r->r, refs, bt_stmt, target_stmt, jl, jr, ld, &rd)))
+                               return NULL;
+                       list_append(slist, ns);
+               }
+       } else {
+               if (!(ns = rel2bin_merge_apply_update(be, join, r, refs, bt_stmt, target_stmt, jl, jr, ld, &rd)))
+                       return NULL;
+               list_append(slist, ns);
+       }
+       return stmt_list(be, slist);
+}
+
 static stmt *
 rel2bin_list(backend *be, sql_rel *rel, list *refs)
 {
@@ -6257,6 +6381,11 @@ subrel_bin(backend *be, sql_rel *rel, li
                if (sql->type == Q_TABLE)
                        sql->type = Q_UPDATE;
                break;
+       case op_merge:
+               s = rel2bin_merge(be, rel, refs);
+               if (sql->type == Q_TABLE)
+                       sql->type = Q_UPDATE;
+               break;
        case op_ddl:
                s = rel2bin_ddl(be, rel, refs);
                break;
diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -90,7 +90,7 @@ rel_no_mitosis(sql_rel *rel)
        if (is_topn(rel->op) || is_sample(rel->op) || 
is_simple_project(rel->op))
                return rel_no_mitosis(rel->l);
        if (is_modify(rel->op) && rel->card <= CARD_AGGR) {
-               if (is_delete(rel->op))
+               if (is_delete(rel->op) || is_merge(rel->op))
                        return 1;
                return rel_no_mitosis(rel->r);
        }
diff --git a/sql/backends/monet5/sql_statement.h 
b/sql/backends/monet5/sql_statement.h
--- a/sql/backends/monet5/sql_statement.h
+++ b/sql/backends/monet5/sql_statement.h
@@ -129,6 +129,7 @@ typedef struct stmt {
        const char *tname;
        const char *cname;
        InstrPtr q;
+       list *extra;    /* used for merge statments, this will be cleaned out 
on the pushcands branch :) */
 } stmt;
 
 /* which MAL modules can push candidates */
diff --git a/sql/include/sql_relation.h b/sql/include/sql_relation.h
--- a/sql/include/sql_relation.h
+++ b/sql/include/sql_relation.h
@@ -76,6 +76,7 @@ typedef struct expression {
 
 #define LEFT_JOIN              4
 #define REL_PARTITION          8
+#define MERGE_LEFT             16 /* used by merge statements */
 
 /* We need bit wise exclusive numbers as we merge the level also in the flag */
 #define PSM_SET 1
@@ -161,7 +162,8 @@ typedef enum operator_type {
        op_insert,      /* insert(l=table, r insert expressions) */
        op_update,      /* update(l=table, r update expressions) */
        op_delete,      /* delete(l=table, r delete expression) */
-       op_truncate /* truncate(l=table) */
+       op_truncate, /* truncate(l=table) */
+       op_merge
 } operator_type;
 
 #define is_atom(et)            (et == e_atom)
@@ -197,12 +199,13 @@ typedef enum operator_type {
 #define is_project(op)                 (op == op_project || op == op_groupby 
|| is_set(op))
 #define is_groupby(op)                 (op == op_groupby)
 #define is_topn(op)            (op == op_topn)
-#define is_modify(op)          (op == op_insert || op == op_update || op == 
op_delete || op == op_truncate)
+#define is_modify(op)          (op == op_insert || op == op_update || op == 
op_delete || op == op_truncate || op == op_merge)
 #define is_sample(op)          (op == op_sample)
 #define is_insert(op)          (op == op_insert)
 #define is_update(op)          (op == op_update)
 #define is_delete(op)          (op == op_delete)
 #define is_truncate(op)        (op == op_truncate)
+#define is_merge(op)           (op == op_merge)
 
 /* ZERO on empty sets, needed for sum (of counts)). */
 #define zero_if_empty(e)       ((e)->zero_if_empty)
diff --git a/sql/rel.txt b/sql/rel.txt
--- a/sql/rel.txt
+++ b/sql/rel.txt
@@ -74,7 +74,7 @@ SAMPLE        (card ATOM, AGGR, or MULTI (same 
        -> l            is relation
        -> flag         (0) no flags
 
-INSERT|DELETE|UPDATE|TRUNCATE  (card MULTI)
+INSERT|DELETE|UPDATE|TRUNCATE  (card ATOM, AGGR or MULTI (same card as the to 
be inserted/deleted/updated relation))
        -> l            is relation to modify
        -> r            to be inserted/deleted/updated relation
                        For update the ->r projection joins in current
@@ -94,6 +94,12 @@ INSERT/UPDATE have a special case
                        relation is used as we need to keep access to
                        the bottom value relation
 
+MERGE (card ATOM, AGGR or MULTI (max card of updates))
+       -> l            left join between the target table and the source
+       -> r            a DDL list with an insert and update/delete, or a 
single relation update on the target table
+       -> flag         (0) no flags
+       ->exps          empty list, ie not used
+
 Expressions
 
 e_atom         (card ATOM)
diff --git a/sql/server/rel_distribute.c b/sql/server/rel_distribute.c
--- a/sql/server/rel_distribute.c
+++ b/sql/server/rel_distribute.c
@@ -44,6 +44,7 @@ has_remote_or_replica( sql_rel *rel )
        case op_union:
        case op_inter:
        case op_except:
+       case op_merge:
                if (has_remote_or_replica( rel->l ) ||
                        has_remote_or_replica( rel->r ))
                        return 1;
@@ -262,6 +263,7 @@ replica(mvc *sql, sql_rel *rel, char *ur
        case op_union:
        case op_inter:
        case op_except:
+       case op_merge:
                rel->l = replica(sql, rel->l, uri);
                rel->r = replica(sql, rel->r, uri);
                break;
@@ -494,6 +496,7 @@ distribute(mvc *sql, sql_rel *rel)
        case op_insert:
        case op_update:
        case op_delete:
+       case op_merge:
                l = rel->l = distribute(sql, rel->l);
                r = rel->r = distribute(sql, rel->r);
 
@@ -612,6 +615,8 @@ rel_remote_func(mvc *sql, sql_rel *rel)
        case op_union:
        case op_inter:
        case op_except:
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to