Changeset: 21b34e3c36ca for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/21b34e3c36ca
Modified Files:
        sql/backends/monet5/copy.c
        sql/backends/monet5/rel_bin.c
Branch: copyparpipe
Log Message:

Generate a reasonable looking plan


diffs (truncated from 419 to 300 lines):

diff --git a/sql/backends/monet5/copy.c b/sql/backends/monet5/copy.c
--- a/sql/backends/monet5/copy.c
+++ b/sql/backends/monet5/copy.c
@@ -11,7 +11,7 @@
 #include "streams.h"
 #include "mal.h"
 #include "mal_errors.h"
-// #include "mal_client.h"
+#include "mal_client.h"
 // #include "mal_instruction.h"
 #include "mal_exception.h"
 // #include "mal_interpreter.h"
@@ -57,12 +57,80 @@ end:
 }
 
 
+static str
+COPYfixlines(lng *ret_linecount, lng *ret_bytesmoved, bat *left_block, lng *left_skip_amount, bat *right_block, str *linesep_arg, str *quote_arg)
+{
+       str msg = MAL_SUCCEED;
+
+       (void)ret_linecount;
+       (void)ret_bytesmoved;
+       (void)left_block;
+       (void)left_skip_amount;
+       (void)right_block;
+       (void)linesep_arg;
+       (void)quote_arg;
+
+       bailout("copy.fixlines", "banana");
+end:
+       return msg;
+}
+
+
+static str
+COPYsplitlines(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str msg = MAL_SUCCEED;
+
+       (void)cntxt;
+       (void)mb;
+       (void)stk;
+       (void)pci;
+
+       bailout("copy.splitlines", "banana");
+end:
+       return msg;
+}
+
+static str
+COPYparse_append(int *ret, int *mvc, str *s, str *t, str *c, bat *block, bat *fields)
+{
+       str msg = MAL_SUCCEED;
+
+       (void)ret;
+       (void)mvc;
+       (void)s;
+       (void)t;
+       (void)c;
+       (void)block;
+       (void)fields;
+
+       bailout("copy.parse_append", "banana");
+end:
+       return msg;
+}
+
+
 #include "mel.h"
 static mel_func copy_init_funcs[] = {
  command("copy", "read", COPYread, true, "Clear the BAT and read 'block_size' bytes into it from 's'",
        args(1, 4,
                arg("",lng),
-               arg("stream", streams), arg("block_size", lng), batarg("block", bte)
+               arg("stream", streams), arg("block_size", int), batarg("block", bte)
+ )),
+ command("copy", "fixlines", COPYfixlines, true, "Copy bytes from 'right' to 'left' to complete the final line of 'left'. Return left line count and bytes copied",
+       args(2, 7,
+       arg("linecount", lng), arg("bytesmoved", int),
+       batarg("left",bte), arg("left_skip", int), batarg("right", bte), arg("linesep", str), arg("quote", str),
+ )),
+ pattern("copy", "splitlines", COPYsplitlines, false, "Find the fields of the individual columns", args(1,8,
+       batvararg("", int),
+       batarg("block", bte), arg("skip", int), arg("col_sep", str), arg("line_sep", str), arg("quote", str), arg("null_repr", str), arg("escape", bit)
+ )),
+
+ command("copy", "parse_append", COPYparse_append, true, "parse the fields and append them to the column",
+ args(1, 7,
+       arg("", int),
+       arg("mvc", int), arg("s", str), arg("t", str), arg("c", str), batarg("block", bte), batarg("fields", int)
  )),
  { .imp=NULL }
 };
@@ -71,5 +139,7 @@ static mel_func copy_init_funcs[] = {
 #undef read
 #pragma section(".CRT$XCU",read)
 #endif
-LIB_STARTUP_FUNC(init_json_mal)
-{ mal_module("copy", NULL, copy_init_funcs); }
+LIB_STARTUP_FUNC(init_copy_mal)
+{
+       mal_module("copy", NULL, copy_init_funcs);
+}
diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -4439,6 +4439,39 @@ can_use_directappend(sql_rel *rel)
        return copy_from;
 }
 
+static int
+extract_parameter(backend *be, list *stmts, sql_exp *copyfrom, int argno)
+{
+       list *args = copyfrom->l;
+       node *n = args->h;
+       for (int i = 0; i < argno; i++)
+               n = n->next;
+       sql_exp *exp = n->data;
+       stmt *st = exp_bin(be, exp, NULL, NULL, NULL, NULL, NULL, NULL, 0, 0, 0);
+       list_append(stmts, st);
+       return st->nr;
+}
+
+static int
+emit_receive(MalBlkPtr mb, int var_channel, int tpe)
+{
+       InstrPtr q = newAssignment(mb);
+       q = pushReturn(mb, q, var_channel);
+       q = pushArgument(mb, q, var_channel);
+       q = pushNil(mb, q, tpe);
+       return getDestVar(q);
+}
+
+static void
+emit_send(MalBlkPtr mb, int var_channel, int tpe, int var_msg)
+{
+       InstrPtr q = newAssignment(mb);
+       setReturnArgument(q, var_channel);
+       q = pushReturn(mb, q, var_msg);
+       q = pushArgument(mb, q, var_msg);
+       q = pushNil(mb, q, tpe);
+}
+
 static stmt *
 rel2bin_copyparpipe(backend *be, sql_rel *rel, list *refs, sql_exp *copyfrom)
 {
@@ -4447,67 +4480,220 @@ rel2bin_copyparpipe(backend *be, sql_rel
 
        InstrPtr q;
        MalBlkPtr mb = be->mb;
-       // mvc *mvc = be->mvc;
-       // sql_allocator *sa = mvc->sa;
-
-       // Extract arguments
-       list *copyfrom_args = copyfrom->l;
-       node *fname_node = copyfrom_args->h->next->next->next->next->next;
-       sql_exp *fname_exp = fname_node->data;
-       atom *fname_atom = fname_exp->l;
-       const char *fname = fname_atom->data.val.sval;
+       mvc *mvc = be->mvc;
+       sql_allocator *sa = mvc->sa;
+       list *intermediate_stmts = sa_list(sa);
 
        int streams_type = ATOMindex("streams");
-
-       q = newStmtArgs(mb, "streams", "openRead", 1);
-       setDestType(mb, q, streams_type);
-       q = pushStr(mb, q, fname);
-       int var_s = getDestVar(q);
-
-       q = newStmtArgs(mb, "bat", "new", 3);
-       setDestType(mb, q, newBatType(TYPE_bte));
+       int bte_bat_type = newBatType(TYPE_bte);
+       int int_bat_type = newBatType(TYPE_int);
+
+       // Extract table name
+       list *copyfrom_args = copyfrom->l;
+       node *n = copyfrom_args->h;
+       sql_exp *first_arg_exp = n->data;
+       if (first_arg_exp->type != e_atom)
+               return NULL;
+       atom *first_arg_atom = first_arg_exp->l;
+       sql_table *table = first_arg_atom->data.val.pval;
+       const char *table_name = table->base.name;
+       const char *schema_name = table->s->base.name;
+       int column_count = ol_length(table->columns);
+
+       // Extract other arguments
+       int var_col_sep = extract_parameter(be, intermediate_stmts, copyfrom, 1);
+       int var_line_sep = extract_parameter(be, intermediate_stmts, copyfrom, 2);
+       int var_quote_char = extract_parameter(be, intermediate_stmts, copyfrom, 3);
+       int var_null_representation = extract_parameter(be, intermediate_stmts, copyfrom, 4);
+       int var_fname = extract_parameter(be, intermediate_stmts, copyfrom, 5);
+       int var_num_rows = extract_parameter(be, intermediate_stmts, copyfrom, 6);
+       int var_offset = extract_parameter(be, intermediate_stmts, copyfrom, 7);
+       int var_best_effort = extract_parameter(be, intermediate_stmts, copyfrom, 8);
+       int var_fixed_width = extract_parameter(be, intermediate_stmts, copyfrom, 9);
+       int var_on_client = extract_parameter(be, intermediate_stmts, copyfrom, 10);
+       int var_escape = extract_parameter(be, intermediate_stmts, copyfrom, 11);
+
+       // TODO: Deal with the following
+       (void)var_num_rows;
+       (void)var_offset;
+       (void)var_best_effort;
+       (void)var_on_client;
+       (void)var_fixed_width;
+
+       q = newAssignment(mb);
+       q = pushLng(mb, q, 0);
+       int var_total_row_count = getDestVar(q);
+
+       q = newStmt(mb, "streams", "openRead");
+       q = pushArgument(mb, q, var_fname);
+       int var_stream_channel = getDestVar(q);
+
+       q = newStmt(mb, "bat", "new");
+       q = pushNil(mb, q, TYPE_bte);
+       q = pushLng(mb, q, 300);
+       q = pushBit(mb, q, false);
+       int var_block_channel = getDestVar(q);
+
+       q = newAssignment(mb);
+       q = pushInt(mb, q, 0);
+       int var_skip_amounts_channel = getDestVar(q);
+
+       q = newAssignment(mb);
+       q = pushNil(mb, q, TYPE_bit);
+       int var_claim_channel = getDestVar(q);
+
+
+       // START LOOP
+       q = newAssignment(mb);
+       q->barrier = BARRIERsymbol;
+       q = pushBit(mb, q, true);
+       int var_loop_barrier = getDestVar(q);
+
+       int var_s = emit_receive(mb, var_stream_channel, streams_type);
+
+       q = newStmt(mb, "bat", "new");
        q = pushNil(mb, q, TYPE_bte);
        q = pushLng(mb, q, 300);
        q = pushBit(mb, q, false);
-       int var_block = getDestVar(q);
-
-       q = newStmtArgs(mb, "copy", "read", 3);
-       setDestType(mb, q, TYPE_lng);
+       int var_next_block = getDestVar(q);
+
+       // START READ BLOCK
+       q = newStmt(mb, "calc", "isnotnil");
+       q->barrier = BARRIERsymbol;
        q = pushArgument(mb, q, var_s);
-       q = pushLng(mb, q, 200);
-       q = pushArgument(mb, q, var_block);
+       int var_read_barrier = getDestVar(q);
+
+       q = newStmt(mb, "copy", "read");
+       q = pushArgument(mb, q, var_s);
+       q = pushInt(mb, q, 200);
+       q = pushArgument(mb, q, var_next_block);
        int var_nread = getDestVar(q);
 
-
-
-
-       add_to_rowcount_accumulator(be, var_nread);
-
+       q = newStmt(mb, "calc", ">");
+       q->barrier = LEAVEsymbol;
+       setReturnArgument(q, var_read_barrier);
+       q = pushArgument(mb, q, var_nread);
+       q = pushLng(mb, q, 0);
+
+       q = newStmt(mb, "streams", "close");
+       q = pushArgument(mb, q, var_s);
+
+       q = newAssignment(mb);
+       setReturnArgument(q, var_s);
+       q = pushNil(mb, q, streams_type);
+
+       // END READ BLOCK
+       q = newAssignment(mb);
+       q->barrier = EXITsymbol;
+       getDestVar(q) = var_read_barrier;
+
+       emit_send(mb, var_stream_channel, streams_type, var_s);
+
+       int var_our_block = emit_receive(mb, var_block_channel, bte_bat_type);
+       int var_our_skip_amount = emit_receive(mb, var_skip_amounts_channel, TYPE_int);
+
+       q = newStmt(mb, "aggr", "count");
+       q = pushArgument(mb, q, var_our_block);
+       int var_our_count = getDestVar(q);
+
+       q = newStmt(mb, "aggr", "count");
+       q = pushArgument(mb, q, var_next_block);
+       int var_next_count = getDestVar(q);
+
+       q = newStmt(mb, "calc", "+");
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to