Changeset: e314ebb3c836 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e314ebb3c836 Modified Files: monetdb5/extras/pyapi/connection.c monetdb5/extras/pyapi/connection.h monetdb5/extras/pyapi/emit.c monetdb5/extras/pyapi/emit.h monetdb5/extras/pyapi/pyloader.c monetdb5/optimizer/opt_support.c sql/backends/monet5/Tests/All sql/backends/monet5/Tests/pyloader03.sql sql/backends/monet5/Tests/pyloader03.stable.err sql/backends/monet5/sql_gencode.c sql/include/sql_catalog.h sql/server/rel_psm.c sql/server/rel_semantic.c sql/server/sql_parser.h sql/server/sql_parser.y Branch: pythonloader Log Message:
First implementation of CREATE TABLE table FROM LOADER pyloader(); diffs (truncated from 732 to 300 lines): diff --git a/monetdb5/extras/pyapi/connection.c b/monetdb5/extras/pyapi/connection.c --- a/monetdb5/extras/pyapi/connection.c +++ b/monetdb5/extras/pyapi/connection.c @@ -11,6 +11,8 @@ CREATE_SQL_FUNCTION_PTR(void,SQLdestroyResult,(res_table*)); CREATE_SQL_FUNCTION_PTR(str,SQLstatementIntern,(Client, str *, str, int, bit, res_table **)); +CREATE_SQL_FUNCTION_PTR(str,mvc_append_wrap,(Client, MalBlkPtr, MalStkPtr, InstrPtr)); +CREATE_SQL_FUNCTION_PTR(int,sqlcleanup,(mvc*,int)); static PyObject * _connection_execute(Py_ConnectionObject *self, PyObject *args) @@ -70,7 +72,7 @@ static PyObject * Py_RETURN_NONE; } } - else + else #ifdef HAVE_FORK { char *query; @@ -98,13 +100,13 @@ static PyObject * if (self->query_ptr->memsize > 0) // check if there are return values { - char *msg; + char *msg; char *ptr; PyObject *numpy_array; - size_t position = 0; + size_t position = 0; PyObject *result; int i; - + // get a pointer to the shared memory holding the return values msg = init_mmap_memory(self->query_ptr->mmapid, 0, self->query_ptr->memsize, NULL, NULL, &ptr); if (msg != MAL_SUCCEED) { @@ -121,26 +123,26 @@ static PyObject * //load the data for this column from shared memory //[COLNAME] - colname = ptr + position; + colname = ptr + position; position += strlen(colname) + 1; //[BAT] - b = (BAT*) (ptr + position); + b = (BAT*) (ptr + position); position += sizeof(BAT); //[COLrec] - b->T = (COLrec*) (ptr + position); + b->T = (COLrec*) (ptr + position); position += sizeof(COLrec); //[BATrec] - b->S = (BATrec*) (ptr + position); + b->S = (BATrec*) (ptr + position); position += sizeof(BATrec); //[DATA] - b->T->heap.base = (void*)(ptr + position); + b->T->heap.base = (void*)(ptr + position); position += b->T->width * BATcount(b); if (b->T->vheap != NULL) { //[VHEAP] - b->T->vheap = (Heap*) (ptr + position); + b->T->vheap = (Heap*) (ptr + position); position += sizeof(Heap); //[VHEAPDATA] - b->T->vheap->base = (void*) (ptr + position); + b->T->vheap->base = (void*) (ptr + position); position += b->T->vheap->size; } //initialize the PyInput structure @@ -224,7 +226,7 @@ PyTypeObject Py_ConnectionType = { 0, 0, 0, - 0, + 0, 0, 0 #ifdef IS_PY3K @@ -232,18 +234,118 @@ PyTypeObject Py_ConnectionType = { #endif }; -void _connection_cleanup_result(void* output) -{ +void _connection_cleanup_result(void* output) { (*SQLdestroyResult_ptr)((res_table*) output); } -char* _connection_query(Client cntxt, char* query, res_table** result) { +str _connection_query(Client cntxt, char* query, res_table** result) { str res = MAL_SUCCEED; - Client c = cntxt; - res = (*SQLstatementIntern_ptr)(c, &query, "name", 1, 0, result); + res = (*SQLstatementIntern_ptr)(cntxt, &query, "name", 1, 0, result); return res; } +static str _connection_append_table(Client cntxt, char *sname, char *tname, EmitCol *columns, size_t ncols) { + size_t i; + size_t nvar = 6; // variables we need to make up + MalBlkRecord mb; + MalStack* stk = NULL; + InstrRecord* pci = NULL; + str res = MAL_SUCCEED; + VarRecord bat_varrec; + mvc* m = ((backend *) cntxt->sqlcontext)->mvc; + + assert(tname != NULL && columns != NULL && ncols > 0); + + // very black MAL magic below + mb.var = GDKmalloc(nvar * sizeof(VarRecord*)); + stk = GDKmalloc(sizeof(MalStack) + nvar * sizeof(ValRecord)); + pci = GDKmalloc(sizeof(InstrRecord) + nvar * sizeof(int)); + assert(mb.var != NULL && stk != NULL && pci != NULL); // cough, cough + bat_varrec.type = TYPE_bat; + for (i = 0; i < nvar; i++) { + pci->argv[i] = i; + } + stk->stk[0].vtype = TYPE_int; + stk->stk[2].val.sval = (str) sname; + stk->stk[2].vtype = TYPE_str; + stk->stk[3].val.sval = (str) tname; + stk->stk[3].vtype = TYPE_str; + stk->stk[4].vtype = TYPE_str; + stk->stk[5].vtype = TYPE_bat; + mb.var[5] = &bat_varrec; + for (i = 0; i < ncols; i++) { + EmitCol col = columns[i]; + stk->stk[4].val.sval = col.name; + stk->stk[5].val.bval = col.b->batCacheid; + + res = (*mvc_append_wrap_ptr)(cntxt, &mb, stk, pci); + if (res != NULL) { + break; + } + } + if (res == MAL_SUCCEED) { + (*sqlcleanup_ptr)(m, 0); + } + GDKfree(mb.var); + GDKfree(stk); + GDKfree(pci); + return res; +} + +static char *BatType_ToSQLType(int type) { + switch (type) { + case TYPE_bit: + case TYPE_bte: return "TINYINT"; + case TYPE_sht: return "SMALLINT"; + case TYPE_int: return "INTEGER"; + case TYPE_lng: return "BIGINT"; + case TYPE_flt: return "FLOAT"; + case TYPE_dbl: return "DOUBLE"; + case TYPE_str: return "STRING"; + case TYPE_hge: return "HUGEINT"; + case TYPE_oid: return "UNKNOWN"; + default: return "UNKNOWN"; + } +} + +str _connection_create_table(Client cntxt, char *sname, char *tname, EmitCol *columns, size_t ncols) { + char *query = NULL; + size_t i; + size_t query_size = 255, query_index = 0; + res_table *res; + str msg = MAL_SUCCEED; + + if (!sname) sname = "sys"; + + // first compute the size of the query + query_size += strlen(sname); + query_size += strlen(tname); + for(i = 0; i < ncols; i++) { + query_size += strlen(columns[i].name) + 20; + } + + query = GDKzalloc(sizeof(char) * query_size); + if (query == NULL) { + return GDKstrdup(MAL_MALLOC_FAIL"query"); + } + + //format the CREATE TABLE query + query_index = snprintf(query, query_size, "create table %s.%s(", sname, tname); + for(i = 0; i < ncols; i++) { + BAT *b = columns[i].b; + query_index += snprintf(query + query_index, query_size, "%s %s%s", columns[i].name, BatType_ToSQLType(b->T->type), i < ncols - 1 ? "," : ");"); + } + + // execute the create table query + msg = _connection_query(cntxt, query, &res); + GDKfree(query); + if (msg != MAL_SUCCEED) { + return msg; + } + // now append the values to the created table + return _connection_append_table(cntxt, sname, tname, columns, ncols); +} + PyObject *Py_Connection_Create(Client cntxt, bit mapped, QueryStruct *query_ptr, int query_sem) { @@ -273,6 +375,8 @@ str _connection_init(void) LOAD_SQL_FUNCTION_PTR(SQLdestroyResult, "lib_sql.dll"); LOAD_SQL_FUNCTION_PTR(SQLstatementIntern, "lib_sql.dll"); + LOAD_SQL_FUNCTION_PTR(mvc_append_wrap, "lib_sql.dll"); + LOAD_SQL_FUNCTION_PTR(sqlcleanup, "lib_sql.dll"); if (msg != MAL_SUCCEED) { return msg; diff --git a/monetdb5/extras/pyapi/connection.h b/monetdb5/extras/pyapi/connection.h --- a/monetdb5/extras/pyapi/connection.h +++ b/monetdb5/extras/pyapi/connection.h @@ -8,13 +8,14 @@ /* * M. Raasveldt - * + * */ #ifndef _LOOPBACK_QUERY_ #define _LOOPBACK_QUERY_ #include "pytypes.h" +#include "emit.h" typedef struct { PyObject_HEAD @@ -32,7 +33,8 @@ extern PyTypeObject Py_ConnectionType; PyObject *Py_Connection_Create(Client cntxt, bit mapped, QueryStruct *query_ptr, int query_sem); str _connection_init(void); -char* _connection_query(Client cntxt, char* query, res_table** result); +str _connection_query(Client cntxt, char* query, res_table** result); +str _connection_create_table(Client cntxt, char *sname, char *tname, EmitCol *columns, size_t ncols); void _connection_cleanup_result(void* output); #endif /* _LOOPBACK_QUERY_ */ diff --git a/monetdb5/extras/pyapi/emit.c b/monetdb5/extras/pyapi/emit.c --- a/monetdb5/extras/pyapi/emit.c +++ b/monetdb5/extras/pyapi/emit.c @@ -46,41 +46,96 @@ static PyObject * PyErr_SetString(PyExc_TypeError, "dict must contain at least one element"); return NULL; } - for (i = 0; i < self->ncols; i++) { - PyObject *dictEntry = PyDict_GetItemString(args, self->cols[i].name); - if (dictEntry) { + { + PyObject *items = PyDict_Items(args); + for (i = 0; i < dict_elements; i++) { + PyObject *tuple = PyList_GetItem(items, i); + PyObject *key = PyTuple_GetItem(tuple, 0); + PyObject *dictEntry = PyTuple_GetItem(tuple, 1); ssize_t this_size = 1; - matched_elements++; this_size = PyType_Size(dictEntry); if (this_size < 0) { PyErr_Format(PyExc_TypeError, "Unsupported Python Object %s", PyString_AsString(PyObject_Str(PyObject_Type(dictEntry)))); + Py_DECREF(items); return NULL; } if (el_count < 0) { el_count = this_size; } else if (el_count != this_size) { - PyErr_Format(PyExc_TypeError, "Element %s has size %zu, but expected an element with size %zu", self->cols[i].name, this_size, el_count); + PyErr_Format(PyExc_TypeError, "Element %s has size %zu, but expected an element with size %zu", PyString_AsString(PyObject_Str(key)), this_size, el_count); + Py_DECREF(items); return NULL; } } + Py_DECREF(items); } if (el_count == 0) { PyErr_SetString(PyExc_TypeError, "Empty input values supplied"); return NULL; } - if (matched_elements != dict_elements) { - // not all elements in the dictionary were matched, look for the element that was not matched - PyObject *keys = PyDict_Keys(args); + + if (!self->create_table) { + for (i = 0; i < self->ncols; i++) { + PyObject *dictEntry = PyDict_GetItemString(args, self->cols[i].name); + if (dictEntry) { + matched_elements++; + } + } + if (matched_elements != dict_elements) { + // not all elements in the dictionary were matched, look for the element that was not matched + PyObject *keys = PyDict_Keys(args); + for(i = 0; i < (size_t) PyList_Size(keys); i++) { + PyObject *key = PyList_GetItem(keys, i); + char *val; + bool found = false; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list