Changeset: 21b34e3c36ca for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/21b34e3c36ca Modified Files: sql/backends/monet5/copy.c sql/backends/monet5/rel_bin.c Branch: copyparpipe Log Message:
Generate a reasonable looking plan diffs (truncated from 419 to 300 lines): diff --git a/sql/backends/monet5/copy.c b/sql/backends/monet5/copy.c --- a/sql/backends/monet5/copy.c +++ b/sql/backends/monet5/copy.c @@ -11,7 +11,7 @@ #include "streams.h" #include "mal.h" #include "mal_errors.h" -// #include "mal_client.h" +#include "mal_client.h" // #include "mal_instruction.h" #include "mal_exception.h" // #include "mal_interpreter.h" @@ -57,12 +57,80 @@ end: } +static str +COPYfixlines(lng *ret_linecount, lng *ret_bytesmoved, bat *left_block, lng *left_skip_amount, bat *right_block, str *linesep_arg, str *quote_arg) +{ + str msg = MAL_SUCCEED; + + (void)ret_linecount; + (void)ret_bytesmoved; + (void)left_block; + (void)left_skip_amount; + (void)right_block; + (void)linesep_arg; + (void)quote_arg; + + bailout("copy.fixlines", "banana"); +end: + return msg; +} + + +static str +COPYsplitlines(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + str msg = MAL_SUCCEED; + + (void)cntxt; + (void)mb; + (void)stk; + (void)pci; + + bailout("copy.splitlines", "banana"); +end: + return msg; +} + +static str +COPYparse_append(int *ret, int *mvc, str *s, str *t, str *c, bat *block, bat *fields) +{ + str msg = MAL_SUCCEED; + + (void)ret; + (void)mvc; + (void)s; + (void)t; + (void)c; + (void)block; + (void)fields; + + bailout("copy.parse_append", "banana"); +end: + return msg; +} + + #include "mel.h" static mel_func copy_init_funcs[] = { command("copy", "read", COPYread, true, "Clear the BAT and read 'block_size' bytes into it from 's'", args(1, 4, arg("",lng), - arg("stream", streams), arg("block_size", lng), batarg("block", bte) + arg("stream", streams), arg("block_size", int), batarg("block", bte) + )), + command("copy", "fixlines", COPYfixlines, true, "Copy bytes from 'right' to 'left' to complete the final line of 'left'. Return left line count and bytes copied", + args(2, 7, + arg("linecount", lng), arg("bytesmoved", int), + batarg("left",bte), arg("left_skip", int), batarg("right", bte), arg("linesep", str), arg("quote", str), + )), + pattern("copy", "splitlines", COPYsplitlines, false, "Find the fields of the individual columns", args(1,8, + batvararg("", int), + batarg("block", bte), arg("skip", int), arg("col_sep", str), arg("line_sep", str), arg("quote", str), arg("null_repr", str), arg("escape", bit) + )), + + command("copy", "parse_append", COPYparse_append, true, "parse the fields and append them to the column", + args(1, 7, + arg("", int), + arg("mvc", int), arg("s", str), arg("t", str), arg("c", str), batarg("block", bte), batarg("fields", int) )), { .imp=NULL } }; @@ -71,5 +139,7 @@ static mel_func copy_init_funcs[] = { #undef read #pragma section(".CRT$XCU",read) #endif -LIB_STARTUP_FUNC(init_json_mal) -{ mal_module("copy", NULL, copy_init_funcs); } +LIB_STARTUP_FUNC(init_copy_mal) +{ + mal_module("copy", NULL, copy_init_funcs); +} diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c --- a/sql/backends/monet5/rel_bin.c +++ b/sql/backends/monet5/rel_bin.c @@ -4439,6 +4439,39 @@ can_use_directappend(sql_rel *rel) return copy_from; } +static int +extract_parameter(backend *be, list *stmts, sql_exp *copyfrom, int argno) +{ + list *args = copyfrom->l; + node *n = args->h; + for (int i = 0; i < argno; i++) + n = n->next; + sql_exp *exp = n->data; + stmt *st = exp_bin(be, exp, NULL, NULL, NULL, NULL, NULL, NULL, 0, 0, 0); + list_append(stmts, st); + return st->nr; +} + +static int +emit_receive(MalBlkPtr mb, int var_channel, int tpe) +{ + InstrPtr q = newAssignment(mb); + q = pushReturn(mb, q, var_channel); + q = pushArgument(mb, q, var_channel); + q = pushNil(mb, q, tpe); + return getDestVar(q); +} + +static void +emit_send(MalBlkPtr mb, int var_channel, int tpe, int var_msg) +{ + InstrPtr q = newAssignment(mb); + setReturnArgument(q, var_channel); + q = pushReturn(mb, q, var_msg); + q = pushArgument(mb, q, var_msg); + q = pushNil(mb, q, tpe); +} + static stmt * rel2bin_copyparpipe(backend *be, sql_rel *rel, list *refs, sql_exp *copyfrom) { @@ -4447,67 +4480,220 @@ rel2bin_copyparpipe(backend *be, sql_rel InstrPtr q; MalBlkPtr mb = be->mb; - // mvc *mvc = be->mvc; - // sql_allocator *sa = mvc->sa; - - // Extract arguments - list *copyfrom_args = copyfrom->l; - node *fname_node = copyfrom_args->h->next->next->next->next->next; - sql_exp *fname_exp = fname_node->data; - atom *fname_atom = fname_exp->l; - const char *fname = fname_atom->data.val.sval; + mvc *mvc = be->mvc; + sql_allocator *sa = mvc->sa; + list *intermediate_stmts = sa_list(sa); int streams_type = ATOMindex("streams"); - - q = newStmtArgs(mb, "streams", "openRead", 1); - setDestType(mb, q, streams_type); - q = pushStr(mb, q, fname); - int var_s = getDestVar(q); - - q = newStmtArgs(mb, "bat", "new", 3); - setDestType(mb, q, newBatType(TYPE_bte)); + int bte_bat_type = newBatType(TYPE_bte); + int int_bat_type = newBatType(TYPE_int); + + // Extract table name + list *copyfrom_args = copyfrom->l; + node *n = copyfrom_args->h; + sql_exp *first_arg_exp = n->data; + if (first_arg_exp->type != e_atom) + return NULL; + atom *first_arg_atom = first_arg_exp->l; + sql_table *table = first_arg_atom->data.val.pval; + const char *table_name = table->base.name; + const char *schema_name = table->s->base.name; + int column_count = ol_length(table->columns); + + // Extract other arguments + int var_col_sep = extract_parameter(be, intermediate_stmts, copyfrom, 1); + int var_line_sep = extract_parameter(be, intermediate_stmts, copyfrom, 2); + int var_quote_char = extract_parameter(be, intermediate_stmts, copyfrom, 3); + int var_null_representation = extract_parameter(be, intermediate_stmts, copyfrom, 4); + int var_fname = extract_parameter(be, intermediate_stmts, copyfrom, 5); + int var_num_rows = extract_parameter(be, intermediate_stmts, copyfrom, 6); + int var_offset = extract_parameter(be, intermediate_stmts, copyfrom, 7); + int var_best_effort = extract_parameter(be, intermediate_stmts, copyfrom, 8); + int var_fixed_width = extract_parameter(be, intermediate_stmts, copyfrom, 9); + int var_on_client = extract_parameter(be, intermediate_stmts, copyfrom, 10); + int var_escape = extract_parameter(be, intermediate_stmts, copyfrom, 11); + + // TODO: Deal with the following + (void)var_num_rows; + (void)var_offset; + (void)var_best_effort; + (void)var_on_client; + (void)var_fixed_width; + + q = newAssignment(mb); + q = pushLng(mb, q, 0); + int var_total_row_count = getDestVar(q); + + q = newStmt(mb, "streams", "openRead"); + q = pushArgument(mb, q, var_fname); + int var_stream_channel = getDestVar(q); + + q = newStmt(mb, "bat", "new"); + q = pushNil(mb, q, TYPE_bte); + q = pushLng(mb, q, 300); + q = pushBit(mb, q, false); + int var_block_channel = getDestVar(q); + + q = newAssignment(mb); + q = pushInt(mb, q, 0); + int var_skip_amounts_channel = getDestVar(q); + + q = newAssignment(mb); + q = pushNil(mb, q, TYPE_bit); + int var_claim_channel = getDestVar(q); + + + // START LOOP + q = newAssignment(mb); + q->barrier = BARRIERsymbol; + q = pushBit(mb, q, true); + int var_loop_barrier = getDestVar(q); + + int var_s = emit_receive(mb, var_stream_channel, streams_type); + + q = newStmt(mb, "bat", "new"); q = pushNil(mb, q, TYPE_bte); q = pushLng(mb, q, 300); q = pushBit(mb, q, false); - int var_block = getDestVar(q); - - q = newStmtArgs(mb, "copy", "read", 3); - setDestType(mb, q, TYPE_lng); + int var_next_block = getDestVar(q); + + // START READ BLOCK + q = newStmt(mb, "calc", "isnotnil"); + q->barrier = BARRIERsymbol; q = pushArgument(mb, q, var_s); - q = pushLng(mb, q, 200); - q = pushArgument(mb, q, var_block); + int var_read_barrier = getDestVar(q); + + q = newStmt(mb, "copy", "read"); + q = pushArgument(mb, q, var_s); + q = pushInt(mb, q, 200); + q = pushArgument(mb, q, var_next_block); int var_nread = getDestVar(q); - - - - add_to_rowcount_accumulator(be, var_nread); - + q = newStmt(mb, "calc", ">"); + q->barrier = LEAVEsymbol; + setReturnArgument(q, var_read_barrier); + q = pushArgument(mb, q, var_nread); + q = pushLng(mb, q, 0); + + q = newStmt(mb, "streams", "close"); + q = pushArgument(mb, q, var_s); + + q = newAssignment(mb); + setReturnArgument(q, var_s); + q = pushNil(mb, q, streams_type); + + // END READ BLOCK + q = newAssignment(mb); + q->barrier = EXITsymbol; + getDestVar(q) = var_read_barrier; + + emit_send(mb, var_stream_channel, streams_type, var_s); + + int var_our_block = emit_receive(mb, var_block_channel, bte_bat_type); + int var_our_skip_amount = emit_receive(mb, var_skip_amounts_channel, TYPE_int); + + q = newStmt(mb, "aggr", "count"); + q = pushArgument(mb, q, var_our_block); + int var_our_count = getDestVar(q); + + q = newStmt(mb, "aggr", "count"); + q = pushArgument(mb, q, var_next_block); + int var_next_count = getDestVar(q); + + q = newStmt(mb, "calc", "+"); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list