Changeset: d57b784efaa1 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d57b784efaa1
Modified Files:
	monetdb5/extras/pyapi/pyapi.c
Branch: pyapi
Log Message:
Added support for all data types in parallel aggregates. diffs (209 lines): diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c --- a/monetdb5/extras/pyapi/pyapi.c +++ b/monetdb5/extras/pyapi/pyapi.c @@ -379,6 +379,26 @@ Array of type %s no copying will be need } \ } +#define NP_SPLIT_BAT(tpe) { \ + tpe ***ptr = (tpe***)split_bats; \ + lng *temp_indices; \ + tpe *batcontent = (tpe*)basevals; \ + /* allocate space for split BAT */ \ + for(group_it = 0; group_it < group_count; group_it++) { \ + ptr[group_it][i] = GDKzalloc(group_counts[group_it] * sizeof(tpe)); \ + } \ + /*iterate over the elements of the current BAT*/ \ + temp_indices = GDKzalloc(sizeof(lng) * group_count); \ + for(element_it = 0; element_it < elements; element_it++) { \ + /*group of current element*/ \ + lng group = aggr_group_arr[element_it]; \ + /*append current element to proper group*/ \ + ptr[group][i][temp_indices[group]++] = batcontent[element_it]; \ + } \ + GDKfree(temp_indices); \ +} + + str PyAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, bit grouped, bit mapped); @@ -1029,6 +1049,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb PyDict_SetItemString(pColumnTypes, args[i], arg_type); Py_DECREF(arg_type); } + pyinput_values[i - (pci->retc + 2)].result = result_array; PyTuple_SetItem(pArgs, ai++, result_array); } if (code_object == NULL) { @@ -1122,30 +1143,68 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb // now split the columns one by one for(i = 0; i < named_columns; i++) { PyInput input = pyinput_values[i]; - int ***ptr = (int***)split_bats; - if (input.scalar) { - // scalar value, we don't handle this yet - for(group_it = 0; group_it < group_count; group_it++) { - ptr[group_it][i] = GDKzalloc(sizeof(int)); - ptr[group_it][i][0] = 33; + void *basevals = input.bat->T->heap.base; + + if (!input.scalar) { + switch(input.bat_type) { + case TYPE_bit: + NP_SPLIT_BAT(bit); + break; + case TYPE_bte: + NP_SPLIT_BAT(bte); + break; + case TYPE_sht: + 
NP_SPLIT_BAT(sht); + break; + case TYPE_int: + NP_SPLIT_BAT(int); + break; + case TYPE_oid: + NP_SPLIT_BAT(oid); + break; + case TYPE_lng: + NP_SPLIT_BAT(lng); + break; + case TYPE_wrd: + NP_SPLIT_BAT(wrd); + break; + case TYPE_flt: + NP_SPLIT_BAT(flt); + break; + case TYPE_dbl: + NP_SPLIT_BAT(dbl); + break; + #ifdef HAVE_HGE + case TYPE_hge: + basevals = PyArray_BYTES((PyArrayObject*)input.result); + NP_SPLIT_BAT(dbl); + break; + #endif + case TYPE_str: + { + PyObject ****ptr = (PyObject****)split_bats; + lng *temp_indices; + PyObject **batcontent = (PyObject**)PyArray_DATA((PyArrayObject*)input.result); + // allocate space for split BAT + for(group_it = 0; group_it < group_count; group_it++) { + ptr[group_it][i] = GDKzalloc(group_counts[group_it] * sizeof(PyObject*)); + } + // iterate over the elements of the current BAT + temp_indices = GDKzalloc(sizeof(PyObject*) * group_count); + for(element_it = 0; element_it < elements; element_it++) { + //group of current element + lng group = aggr_group_arr[element_it]; + //append current element to proper group + ptr[group][i][temp_indices[group]++] = batcontent[element_it]; + } + GDKfree(temp_indices); + break; + } + default: + msg = createException(MAL, "pyapi.eval", "Unrecognized BAT type %s", BatType_Format(input.bat_type)); + goto aggrwrapup; + break; } - } else { - // split group, always integer for now - int *temp_indices; - int *batcontent = (int*)input.bat->T->heap.base; - // allocate space for split BAT - for(group_it = 0; group_it < group_count; group_it++) { - ptr[group_it][i] = GDKzalloc(group_counts[group_it] * sizeof(int)); - } - // iterate over the elements of the current BAT - temp_indices = GDKzalloc(sizeof(int) * group_count); - for(element_it = 0; element_it < elements; element_it++) { - //group of current element - lng group = aggr_group_arr[element_it]; - //append current element to proper group - ptr[group][i][temp_indices[group]++] = batcontent[element_it]; - } - GDKfree(temp_indices); } } @@ 
-1337,7 +1396,7 @@ aggrwrapup: #ifdef HAVE_FORK - /*[SHARED_MEMORY]*/ + /*[FORKED]*/ // This is where the child process stops executing // We have successfully executed the Python function and converted the result object to a C array // Now all that is left is to copy the C array to shared memory so the main process can read it and return it @@ -2931,14 +2990,62 @@ PyObject* ComputeParallelAggregation(Agg PyInput input = (*p->pyinput_values)[i]; if (input.scalar) { // scalar not handled yet - vararray = PyInt_FromLong(42); + vararray = input.result; } else { - // always integral BAT for now - vararray = PyArray_New(&PyArray_Type, 1, - (npy_intp[1]) { ( group_elements ) }, - TYPE_int, - NULL, ((int***)(*p->split_bats))[group_it][i], 0, - NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL); + switch(input.bat_type) { + case TYPE_bte: + vararray = PyArray_New(&PyArray_Type, 1, + (npy_intp[1]) { ( group_elements ) }, + NPY_INT8, + NULL, ((bte***)(*p->split_bats))[group_it][i], 0, + NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL); + break; + case TYPE_sht: + vararray = PyArray_New(&PyArray_Type, 1, + (npy_intp[1]) { ( group_elements ) }, + NPY_INT16, + NULL, ((sht***)(*p->split_bats))[group_it][i], 0, + NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL); + break; + case TYPE_int: + vararray = PyArray_New(&PyArray_Type, 1, + (npy_intp[1]) { ( group_elements ) }, + NPY_INT32, + NULL, ((int***)(*p->split_bats))[group_it][i], 0, + NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL); + break; + case TYPE_lng: + vararray = PyArray_New(&PyArray_Type, 1, + (npy_intp[1]) { ( group_elements ) }, + NPY_INT64, + NULL, ((lng***)(*p->split_bats))[group_it][i], 0, + NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL); + break; + case TYPE_flt: + vararray = PyArray_New(&PyArray_Type, 1, + (npy_intp[1]) { ( group_elements ) }, + NPY_FLOAT32, + NULL, ((flt***)(*p->split_bats))[group_it][i], 0, + NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL); + break; + #ifdef HAVE_HGE + case TYPE_hge: + #endif + case 
TYPE_dbl: + vararray = PyArray_New(&PyArray_Type, 1, + (npy_intp[1]) { ( group_elements ) }, + NPY_FLOAT64, + NULL, ((dbl***)(*p->split_bats))[group_it][i], 0, + NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL); + break; + case TYPE_str: + vararray = PyArray_New(&PyArray_Type, 1, + (npy_intp[1]) { ( group_elements ) }, + NPY_OBJECT, + NULL, ((PyObject****)(*p->split_bats))[group_it][i], 0, + NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL); + break; + } if (vararray == NULL) { p->msg = createException(MAL, "pyapi.eval", MAL_MALLOC_FAIL" to create NumPy array."); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list