Changeset: d57b784efaa1 for MonetDB
Modified Files: monetdb5/extras/pyapi/pyapi.c
Branch: pyapi
Log Message:

Added support for all data types in parallel aggregates.

diffs (209 lines):

diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -379,6 +379,26 @@ Array of type %s no copying will be need
+#define NP_SPLIT_BAT(tpe) {                                                    
+    tpe ***ptr = (tpe***)split_bats;                                           
+    lng *temp_indices;                                                         
+    tpe *batcontent = (tpe*)basevals;                                          
+    /* allocate space for split BAT */                                         
+    for(group_it = 0; group_it < group_count; group_it++) {                    
+        ptr[group_it][i] = GDKzalloc(group_counts[group_it] * sizeof(tpe));    
+    }                                                                          
+    /*iterate over the elements of the current BAT*/                           
+    temp_indices = GDKzalloc(sizeof(lng) * group_count);                       
+    for(element_it = 0; element_it < elements; element_it++) {                 
+        /*group of current element*/                                           
+        lng group = aggr_group_arr[element_it];                                
+        /*append current element to proper group*/                             
+        ptr[group][i][temp_indices[group]++] = batcontent[element_it];         
+    }                                                                          
+    GDKfree(temp_indices);                                                     
 PyAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, bit 
grouped, bit mapped);
@@ -1029,6 +1049,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
             PyDict_SetItemString(pColumnTypes, args[i], arg_type);
+        pyinput_values[i - (pci->retc + 2)].result = result_array;
         PyTuple_SetItem(pArgs, ai++, result_array);
     if (code_object == NULL) {
@@ -1122,30 +1143,68 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
             // now split the columns one by one
             for(i = 0; i < named_columns; i++) {
                 PyInput input = pyinput_values[i];
-                int ***ptr = (int***)split_bats;
-                if (input.scalar) {
-                    // scalar value, we don't handle this yet 
-                    for(group_it = 0; group_it < group_count; group_it++) {
-                        ptr[group_it][i] = GDKzalloc(sizeof(int));
-                        ptr[group_it][i][0] = 33;
+                void *basevals = input.bat->T->heap.base;
+                if (!input.scalar) {
+                    switch(input.bat_type) {
+                        case TYPE_bit:
+                            NP_SPLIT_BAT(bit);
+                            break;
+                        case TYPE_bte:
+                            NP_SPLIT_BAT(bte);
+                            break;
+                        case TYPE_sht:
+                            NP_SPLIT_BAT(sht);
+                            break;
+                        case TYPE_int:
+                            NP_SPLIT_BAT(int);
+                            break;
+                        case TYPE_oid:
+                            NP_SPLIT_BAT(oid);
+                            break;
+                        case TYPE_lng:
+                            NP_SPLIT_BAT(lng);
+                            break;
+                        case TYPE_wrd:
+                            NP_SPLIT_BAT(wrd);
+                            break;
+                        case TYPE_flt:
+                            NP_SPLIT_BAT(flt);
+                            break;
+                        case TYPE_dbl:
+                            NP_SPLIT_BAT(dbl);
+                            break;
+                    #ifdef HAVE_HGE
+                        case TYPE_hge:
+                            basevals = 
+                            NP_SPLIT_BAT(dbl);
+                            break;
+                    #endif
+                        case TYPE_str:
+                        {
+                            PyObject ****ptr = (PyObject****)split_bats;
+                            lng *temp_indices;
+                            PyObject **batcontent = 
+                            // allocate space for split BAT
+                            for(group_it = 0; group_it < group_count; 
group_it++) {
+                                ptr[group_it][i] = 
GDKzalloc(group_counts[group_it] * sizeof(PyObject*));
+                            }
+                            // iterate over the elements of the current BAT
+                            temp_indices = GDKzalloc(sizeof(PyObject*) * 
+                            for(element_it = 0; element_it < elements; 
element_it++) {
+                                //group of current element
+                                lng group = aggr_group_arr[element_it]; 
+                                //append current element to proper group
+                                ptr[group][i][temp_indices[group]++] = 
+                            }
+                            GDKfree(temp_indices);
+                            break;
+                        }
+                        default:
+                            msg = createException(MAL, "pyapi.eval", 
"Unrecognized BAT type %s", BatType_Format(input.bat_type));
+                            goto aggrwrapup;
+                            break;
-                } else {
-                    // split group, always integer for now
-                    int *temp_indices;
-                    int *batcontent = (int*)input.bat->T->heap.base;
-                    // allocate space for split BAT
-                    for(group_it = 0; group_it < group_count; group_it++) {
-                        ptr[group_it][i] = GDKzalloc(group_counts[group_it] * 
-                    }
-                    // iterate over the elements of the current BAT
-                    temp_indices = GDKzalloc(sizeof(int) * group_count);
-                    for(element_it = 0; element_it < elements; element_it++) {
-                        //group of current element
-                        lng group = aggr_group_arr[element_it]; 
-                        //append current element to proper group
-                        ptr[group][i][temp_indices[group]++] = 
-                    }
-                    GDKfree(temp_indices);
@@ -1337,7 +1396,7 @@ aggrwrapup:
 #ifdef HAVE_FORK
-    /*[SHARED_MEMORY]*/
+    /*[FORKED]*/
     // This is where the child process stops executing
     // We have successfully executed the Python function and converted the 
result object to a C array
     // Now all that is left is to copy the C array to shared memory so the 
main process can read it and return it
@@ -2931,14 +2990,62 @@ PyObject* ComputeParallelAggregation(Agg
             PyInput input = (*p->pyinput_values)[i];
             if (input.scalar) {
                 // scalar not handled yet
-                vararray = PyInt_FromLong(42);
+                vararray = input.result;
             } else {
-                // always integral BAT for now
-                vararray = PyArray_New(&PyArray_Type, 1, 
-                    (npy_intp[1]) { ( group_elements ) }, 
-                    TYPE_int, 
-                    NULL, ((int***)(*p->split_bats))[group_it][i], 0, 
+                switch(input.bat_type) {
+                    case TYPE_bte:
+                        vararray = PyArray_New(&PyArray_Type, 1, 
+                            (npy_intp[1]) { ( group_elements ) }, 
+                            NPY_INT8, 
+                            NULL, ((bte***)(*p->split_bats))[group_it][i], 0, 
+                            NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
+                        break;
+                    case TYPE_sht:
+                        vararray = PyArray_New(&PyArray_Type, 1, 
+                            (npy_intp[1]) { ( group_elements ) }, 
+                            NPY_INT16, 
+                            NULL, ((sht***)(*p->split_bats))[group_it][i], 0, 
+                            NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
+                        break;
+                    case TYPE_int:
+                        vararray = PyArray_New(&PyArray_Type, 1, 
+                            (npy_intp[1]) { ( group_elements ) }, 
+                            NPY_INT32, 
+                            NULL, ((int***)(*p->split_bats))[group_it][i], 0, 
+                            NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
+                        break;
+                    case TYPE_lng:
+                        vararray = PyArray_New(&PyArray_Type, 1, 
+                            (npy_intp[1]) { ( group_elements ) }, 
+                            NPY_INT64, 
+                            NULL, ((lng***)(*p->split_bats))[group_it][i], 0, 
+                            NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
+                        break;
+                    case TYPE_flt:
+                        vararray = PyArray_New(&PyArray_Type, 1, 
+                            (npy_intp[1]) { ( group_elements ) }, 
+                            NPY_FLOAT32, 
+                            NULL, ((flt***)(*p->split_bats))[group_it][i], 0, 
+                            NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
+                        break;
+                #ifdef HAVE_HGE
+                    case TYPE_hge:
+                #endif
+                    case TYPE_dbl:
+                        vararray = PyArray_New(&PyArray_Type, 1, 
+                            (npy_intp[1]) { ( group_elements ) }, 
+                            NPY_FLOAT64, 
+                            NULL, ((dbl***)(*p->split_bats))[group_it][i], 0, 
+                            NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
+                        break;
+                    case TYPE_str:
+                        vararray = PyArray_New(&PyArray_Type, 1, 
+                            (npy_intp[1]) { ( group_elements ) }, 
+                            NPY_OBJECT, 
+                            NULL, 
((PyObject****)(*p->split_bats))[group_it][i], 0, 
+                            NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
+                        break;
+                }
                 if (vararray == NULL) {
                     p->msg = createException(MAL, "pyapi.eval", 
MAL_MALLOC_FAIL" to create NumPy array.");
checkin-list mailing list

Reply via email to