Changeset: 82507b8f073c for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/82507b8f073c
Added Files:
        sql/test/concurrent/Tests/read-segment-after-free.SQL.py
Modified Files:
        sql/storage/bat/bat_storage.c
        sql/test/concurrent/Tests/All
Branch: Jan2022
Log Message:

Fix concurrent truncate.


diffs (170 lines):

diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -496,13 +496,6 @@ new_segments(sql_trans *tr, size_t cnt)
        return n;
 }
 
-static segments*
-dup_segments(segments *s)
-{
-       sql_ref_inc(&s->r);
-       return s;
-}
-
 static int
 temp_dup_cs(column_storage *cs, ulng tid, int type)
 {
@@ -2354,15 +2347,11 @@ delta_append_val(sql_trans *tr, sql_delt
 }
 
 static int
-dup_storage( sql_trans *tr, storage *obat, storage *bat, int temp)
+dup_storage( sql_trans *tr, storage *obat, storage *bat)
 {
-       if (temp) {
-               if (!(bat->segs = new_segments(tr, 0)))
-                       return LOG_ERR;
-       } else {
-               bat->segs = dup_segments(obat->segs);
-       }
-       return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, temp);
+       if (!(bat->segs = new_segments(tr, 0)))
+               return LOG_ERR;
+       return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
 }
 
 static int
@@ -2671,40 +2660,53 @@ segments_conflict(sql_trans *tr, segment
        return 0;
 }
 
+static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
+
 static storage *
 bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
 {
        storage *obat = ATOMIC_PTR_GET(&t->data);
 
-       if (isTempTable(t) && !(obat = temp_tab_timestamp_storage(tr, t)))
-               return NULL;
-
-       if (obat->cs.ts == tr->tid)
+       if (isTempTable(t)) {
+               if (!(obat = temp_tab_timestamp_storage(tr, t)))
+                       return NULL;
+
+               assert(obat->cs.ts == tr->tid);
+
+               if (clear && clear_storage(tr, t, obat)  != LOG_OK)
+                       return NULL;
+
                return obat;
-       if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(t)) {
-               /* abort */
-               if (clear)
-                       *clear = true;
-               return NULL;
        }
-       if (!isTempTable(t) && !clear)
+
+       if (obat->cs.ts != tr->tid)
+               if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
+                       if (obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(t)) {
+                               /* abort */
+                               if (clear)
+                                       *clear = true;
+                               return NULL;
+                       }
+
+       if (!clear)
                return obat;
-       if (!isTempTable(t) && clear && segments_conflict(tr, obat->segs, 1)) {
+
+       /* remainder is only to handle clear */
+       if (segments_conflict(tr, obat->segs, 1)) {
                *clear = true;
                return NULL;
        }
-
-       assert(!isTempTable(t));
        if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
                return NULL;
        storage *bat = ZNEW(storage);
        if (!bat)
                return NULL;
        bat->cs.refcnt = 1;
-       if (dup_storage(tr, obat, bat, clear || isTempTable(t) /* for clear and temp create empty storage */) != LOG_OK) {
+       if (dup_storage(tr, obat, bat) != LOG_OK) {
                destroy_storage(bat);
                return NULL;
        }
+       bat->cs.cleared = true;
        bat->cs.ts = tr->tid;
        /* only one writer else abort */
        bat->next = obat;
diff --git a/sql/test/concurrent/Tests/All b/sql/test/concurrent/Tests/All
--- a/sql/test/concurrent/Tests/All
+++ b/sql/test/concurrent/Tests/All
@@ -1,4 +1,5 @@
 simple_select
 crash_on_concurrent_use.SF-1411926
 segments-corruption
+read-segment-after-free
 smart-segment-merge
diff --git a/sql/test/concurrent/Tests/read-segment-after-free.SQL.py b/sql/test/concurrent/Tests/read-segment-after-free.SQL.py
new file mode 100644
--- /dev/null
+++ b/sql/test/concurrent/Tests/read-segment-after-free.SQL.py
@@ -0,0 +1,51 @@
+import os, random, pymonetdb
+from concurrent.futures import ProcessPoolExecutor
+import time
+from pymonetdb.exceptions import OperationalError
+
+init    =   '''
+            drop table if exists foo;
+            create table foo (c1, c2, c3, c4, c5) AS VALUES
+            (10, 20, 30, 40, 50),
+            (11, 21, 31, 41, 51),
+            (12, 22, 32, 42, 52);
+            '''
+
+query =     """
+            truncate foo;
+            insert into foo VALUES
+            (10, 20, 30, 40, 50),
+            (11, 21, 31, 41, 51),
+            (12, 22, 32, 42, 52);
+            """
+
+h   = os.getenv('MAPIHOST')
+p   = int(os.getenv('MAPIPORT'))
+db  = os.getenv('TSTDB')
+
+nr_queries  = 1000
+nr_clients  = 16
+
+conn = pymonetdb.connect(hostname=h, port=p,database=db, autocommit=True)
+cursor = conn.cursor()
+
+try:
+    cursor.execute(init)
+except  Exception as e:
+            print(e)
+            exit(1)
+
+def client(_):
+    conn = pymonetdb.connect(hostname=h, port=p,database=db, autocommit=True)
+    cursor = conn.cursor()
+    cursor.execute("set optimizer = 'minimal_fast';")
+
+    for x in range(0, nr_queries):
+        try:
+            cursor.execute(query)
+        except OperationalError as e:
+            # concurrency conflicts are allowed
+            pass
+
+with ProcessPoolExecutor(nr_clients) as pool:
+    pool.map(client, range(nr_clients))
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to