Changeset: ddca859a0f93 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/ddca859a0f93
Modified Files:
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: scatter
Log Message:

Merged with Jul2021


diffs (238 lines):

diff --git a/sql/backends/monet5/UDF/pyapi3/pyapi3.c b/sql/backends/monet5/UDF/pyapi3/pyapi3.c
--- a/sql/backends/monet5/UDF/pyapi3/pyapi3.c
+++ b/sql/backends/monet5/UDF/pyapi3/pyapi3.c
@@ -122,11 +122,19 @@ PYAPI3PyAPIevalAggrMap(Client cntxt, Mal
                }                                                                      \
                /*iterate over the elements of the current BAT*/                       \
                temp_indices = GDKzalloc(sizeof(lng) * group_count);                   \
-               for (element_it = 0; element_it < elements; element_it++) {            \
-                       /*group of current element*/                                       \
-                       oid group = aggr_group_arr[element_it];                            \
-                       /*append current element to proper group*/                         \
-                       ptr[group][i][temp_indices[group]++] = batcontent[element_it];     \
+               if (BATtvoid(aggr_group)) {                                            \
+                       for (element_it = 0; element_it < elements; element_it++) {        \
+                               /*append current element to proper group*/                     \
+                               ptr[element_it][i][temp_indices[element_it]++] =               \
+                                       batcontent[element_it];                                    \
+                       }                                                                  \
+               } else {                                                               \
+                       for (element_it = 0; element_it < elements; element_it++) {        \
+                               /*group of current element*/                                   \
+                               oid group = aggr_group_arr[element_it];                        \
+                               /*append current element to proper group*/                     \
+                               ptr[group][i][temp_indices[group]++] = batcontent[element_it]; \
+                       }                                                                  \
                }                                                                      \
                GDKfree(temp_indices);                                                 \
        }
@@ -736,9 +744,15 @@ static str PyAPIeval(Client cntxt, MalBl
                                goto aggrwrapup;
                        }
 
-                       aggr_group_arr = (oid *)aggr_group->theap->base + aggr_group->tbaseoff;
-                       for (element_it = 0; element_it < elements; element_it++) {
-                               group_counts[aggr_group_arr[element_it]]++;
+                       if (BATtvoid(aggr_group)) {
+                               for (element_it = 0; element_it < elements; element_it++) {
+                                       group_counts[element_it]++;
+                               }
+                       } else {
+                               aggr_group_arr = (oid *)aggr_group->theap->base + aggr_group->tbaseoff;
+                               for (element_it = 0; element_it < elements; element_it++) {
+                                       group_counts[aggr_group_arr[element_it]]++;
+                               }
                        }
 
                        // now perform the actual splitting of the data, first construct
@@ -807,13 +821,22 @@ static str PyAPIeval(Client cntxt, MalBl
                                                        // iterate over the elements of the current BAT
                                                        temp_indices =
                                                                GDKzalloc(sizeof(PyObject *) * group_count);
-                                                       for (element_it = 0; element_it < elements;
-                                                                element_it++) {
-                                                               // group of current element
-                                                               oid group = aggr_group_arr[element_it];
-                                                               // append current element to proper group
-                                                               ptr[group][i][temp_indices[group]++] =
-                                                                       batcontent[element_it];
+                                                       if (BATtvoid(aggr_group)) {
+                                                               for (element_it = 0; element_it < elements;
+                                                                        element_it++) {
+                                                                       // append current element to proper group
+                                                                       ptr[element_it][i][temp_indices[element_it]++] = 
+                                                                               batcontent[element_it];
+                                                               }
+                                                       } else {
+                                                               for (element_it = 0; element_it < elements;
+                                                                        element_it++) {
+                                                                       // group of current element
+                                                                       oid group = aggr_group_arr[element_it];
+                                                                       // append current element to proper group
+                                                                       ptr[group][i][temp_indices[group]++] =
+                                                                               batcontent[element_it];
+                                                               }
                                                        }
                                                        GDKfree(temp_indices);
                                                        break;
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -446,8 +446,9 @@ typedef struct sqlstore {
        sql_catalog *cat;               /* the catalog of persistent tables (what to do with tmp tables ?) */
        sql_schema *tmp;                /* keep pointer to default (empty) tmp schema */
        MT_Lock lock;                   /* lock protecting concurrent writes (not reads, ie use rcu) */
+       MT_Lock commit;                 /* protect transactions, only single commit (one wal writer) */
        MT_Lock flush;                  /* flush lock protecting concurrent writes (not reads, ie use rcu) */
-       MT_Lock table_locks[NR_TABLE_LOCKS];            /* protecting concurrent writes too table (storage) */
+       MT_Lock table_locks[NR_TABLE_LOCKS];            /* protecting concurrent writes too table/columns (storage) */
        list *active;                   /* list of running transactions */
 
        ATOMIC_TYPE nr_active;  /* count number of transactions */
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -1888,6 +1888,7 @@ store_init(sql_allocator *pa, int debug,
 
        (void)store_timestamp(store); /* increment once */
        MT_lock_init(&store->lock, "sqlstore_lock");
+       MT_lock_init(&store->commit, "sqlstore_commit");
        MT_lock_init(&store->flush, "sqlstore_flush");
        for(int i = 0; i<NR_TABLE_LOCKS; i++)
                MT_lock_init(&store->table_locks[i], "sqlstore_table");
@@ -1975,6 +1976,7 @@ store_exit(sqlstore *store)
                        MT_lock_set(&store->lock);
                        MT_lock_set(&store->flush);
                }
+               MT_lock_set(&store->commit);
                if (store->changes) {
                        ulng oldest = store_timestamp(store)+1;
                        for(node *n=store->changes->h; n; n = n->next) {
@@ -1991,6 +1993,7 @@ store_exit(sqlstore *store)
                        }
                        list_destroy(store->changes);
                }
+               MT_lock_unset(&store->commit);
                os_destroy(store->cat->objects, store);
                os_destroy(store->cat->schemas, store);
                _DELETE(store->cat);
@@ -3245,13 +3248,13 @@ sql_trans_rollback(sql_trans *tr)
                }
        }
        if (tr->changes) {
-               store_lock(store);
                /* revert the change list */
                list *nl = SA_LIST(tr->sa, (fdestroy) NULL);
                for(node *n=tr->changes->h; n; n = n->next)
                        list_prepend(nl, n->data);
 
                /* rollback */
+               store_lock(store);
                ulng oldest = store_oldest(store);
                ulng commit_ts = store_get_timestamp(store); /* use most recent timestamp such that we can cleanup savely */
                for(node *n=nl->h; n; n = n->next) {
@@ -3415,6 +3418,7 @@ sql_trans_commit(sql_trans *tr)
        int ok = LOG_OK;
        sqlstore *store = tr->store;
 
+       MT_lock_set(&store->commit);
        store_lock(store);
        ulng oldest = store_oldest(store);
        store_pending_changes(store, oldest);
@@ -3461,7 +3465,7 @@ sql_trans_commit(sql_trans *tr)
                                c->obj->flags = 0;
                        c->ts = commit_ts;
                }
-               /* flush logger after changes got applied */
+               /* when directly flushing: flush logger after changes got applied */
                if (ok == LOG_OK && flush)
                        ok = store->logger_api.log_tend(store, commit_ts);
                /* garbage collect */
@@ -3483,6 +3487,7 @@ sql_trans_commit(sql_trans *tr)
                tr->ts = commit_ts;
                store_unlock(store);
        }
+       MT_lock_unset(&store->commit);
        /* drop local temp tables with commit action CA_DROP, after cleanup */
        if (cs_size(&tr->localtmps)) {
                for(node *n=tr->localtmps.set->h; n; ) {
diff --git a/sql/test/BugTracker-2021/Tests/All b/sql/test/BugTracker-2021/Tests/All
--- a/sql/test/BugTracker-2021/Tests/All
+++ b/sql/test/BugTracker-2021/Tests/All
@@ -14,3 +14,4 @@ batappend-undefined.Bug-7130
 WITH-alias-DELETE-1.deletes-wrong-tuples.Bug-7133
 WITH-alias-DELETE-2.deletes-too-many-tuples.Bug-7133
 merge-delete.Bug-7136
+HAVE_LIBPY3?python-aggregates-void-bat.Bug-7138
diff --git a/sql/test/BugTracker-2021/Tests/SingleServer b/sql/test/BugTracker-2021/Tests/SingleServer
new file mode 100644
--- /dev/null
+++ b/sql/test/BugTracker-2021/Tests/SingleServer
@@ -0,0 +1,1 @@
+--set embedded_py=3
diff --git a/sql/test/BugTracker-2021/Tests/python-aggregates-void-bat.Bug-7138.test b/sql/test/BugTracker-2021/Tests/python-aggregates-void-bat.Bug-7138.test
new file mode 100644
--- /dev/null
+++ b/sql/test/BugTracker-2021/Tests/python-aggregates-void-bat.Bug-7138.test
@@ -0,0 +1,61 @@
+statement ok
+START TRANSACTION
+
+statement ok
+create table integers (i int)
+
+statement ok rowcount 10
+insert into integers (select value from generate_series(1,11,1))
+
+statement ok
+CREATE FUNCTION python_sum(v int) returns int
+language P
+{
+    return numpy.sum(v)
+}
+
+statement ok
+CREATE FUNCTION python_sum2(v int) returns int
+language PYTHON_MAP
+{
+    return numpy.sum(v)
+}
+
+statement ok
+CREATE AGGREGATE python_sum3(v int) returns int
+language P
+{
+    return numpy.sum(v)
+}
+
+statement ok
+CREATE AGGREGATE python_sum4(v int) returns int
+language PYTHON_MAP
+{
+    return numpy.sum(v)
+}
+
+query I rowsort
+select python_sum(i) from integers where i = 1 group by i
+----
+1
+
+query I rowsort
+select python_sum2(i) from integers where i = 1 group by i
+----
+1
+
+query I rowsort
+select python_sum3(i) from integers where i = 1 group by i
+----
+1
+
+query I rowsort
+select python_sum4(i) from integers where i = 1 group by i
+----
+1
+
+statement ok
+ROLLBACK
+
+
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to