Hi,

On 2019-05-01 11:41:48 -0700, Andres Freund wrote:
> I'm imagining something like
> 
> if (pg_try_advisory_xact_lock(1))
>     pg_advisory_xact_lock(2);
> else
>     pg_advisory_xact_lock(1);
> 
> in t1_lock_func. If you then make the session something roughly like
> 
> s1: pg_advisory_xact_lock(1);
> s1: pg_advisory_xact_lock(2);
> 
> s2: upsert t1 <blocking for 1>
> s1: pg_advisory_xact_unlock(1);
> s2: <continuing>
> s2: <blocking for 2>
> s1: insert into t1 values(1, 'someval');
> s1: pg_advisory_xact_unlock(2);
> s2: <continuing>
> s2: spec-conflict

Needed to be slightly more complicated than that, but not that much. See
the attached test.  What do you think?

I think we should apply something like this (minus the WARNING, of
course). It's a bit complicated, but it seems worth covering this
special case.

Greetings,

Andres Freund
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 444c0c05746..bad50aab007 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -554,6 +554,8 @@ ExecInsert(ModifyTableState *mtstate,
 												   &specConflict,
 												   arbiterIndexes);
 
+			elog(WARNING, "speculative: %d", !!specConflict);
+
 			/* adjust the tuple's state accordingly */
 			table_complete_speculative(resultRelationDesc, slot,
 									   specToken, specConflict);
diff --git a/src/test/isolation/expected/speculative-conflict.out b/src/test/isolation/expected/speculative-conflict.out
new file mode 100644
index 00000000000..7b6ce4bb989
--- /dev/null
+++ b/src/test/isolation/expected/speculative-conflict.out
@@ -0,0 +1,185 @@
+Parsed test spec with 3 sessions
+
+starting permutation: controller_locks controller_show s1_upsert s2_upsert controller_show controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show controller_unlock_2_2 controller_show controller_unlock_1_2 controller_show
+step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);
+pg_advisory_locksess           lock           
+
+               1              1              
+               1              2              
+               1              3              
+               2              1              
+               2              2              
+               2              3              
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step s1_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s1') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s1'; <waiting ...>
+step s2_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s2') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s2'; <waiting ...>
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1);
+pg_advisory_unlock
+
+t              
+step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1);
+pg_advisory_unlock
+
+t              
+step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3);
+pg_advisory_unlock
+
+t              
+step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3);
+pg_advisory_unlock
+
+t              
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2);
+pg_advisory_unlock
+
+t              
+s2: WARNING:  speculative: 0
+step s2_upsert: <... completed>
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+k1             inserted s2    
+step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2);
+pg_advisory_unlock
+
+t              
+s1: WARNING:  speculative: 1
+step s1_upsert: <... completed>
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+k1             inserted s2 with conflict update s1
+
+starting permutation: controller_locks controller_show s1_upsert s2_upsert controller_show controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show controller_unlock_1_2 controller_show controller_unlock_2_2 controller_show
+step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);
+pg_advisory_locksess           lock           
+
+               1              1              
+               1              2              
+               1              3              
+               2              1              
+               2              2              
+               2              3              
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step s1_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s1') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s1'; <waiting ...>
+step s2_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s2') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s2'; <waiting ...>
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1);
+pg_advisory_unlock
+
+t              
+step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1);
+pg_advisory_unlock
+
+t              
+step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3);
+pg_advisory_unlock
+
+t              
+step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3);
+pg_advisory_unlock
+
+t              
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2);
+pg_advisory_unlock
+
+t              
+s1: WARNING:  speculative: 0
+step s1_upsert: <... completed>
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+k1             inserted s1    
+step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2);
+pg_advisory_unlock
+
+t              
+s2: WARNING:  speculative: 1
+step s2_upsert: <... completed>
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+k1             inserted s1 with conflict update s2
+
+starting permutation: controller_locks controller_show s1_begin s2_begin s1_upsert s2_upsert controller_show controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show controller_unlock_1_2 controller_show controller_unlock_2_2 controller_show s1_commit controller_show s2_commit controller_show
+step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);
+pg_advisory_locksess           lock           
+
+               1              1              
+               1              2              
+               1              3              
+               2              1              
+               2              2              
+               2              3              
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step s1_begin: BEGIN;
+step s2_begin: BEGIN;
+step s1_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s1') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s1'; <waiting ...>
+step s2_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s2') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s2'; <waiting ...>
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1);
+pg_advisory_unlock
+
+t              
+step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1);
+pg_advisory_unlock
+
+t              
+step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3);
+pg_advisory_unlock
+
+t              
+step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3);
+pg_advisory_unlock
+
+t              
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2);
+pg_advisory_unlock
+
+t              
+s1: WARNING:  speculative: 0
+step s1_upsert: <... completed>
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2);
+pg_advisory_unlock
+
+t              
+s2: WARNING:  speculative: 1
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+step s1_commit: COMMIT;
+step s2_upsert: <... completed>
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+k1             inserted s1    
+step s2_commit: COMMIT;
+step controller_show: SELECT * FROM upserttest;
+key            data           
+
+k1             inserted s1 with conflict update s2
diff --git a/src/test/isolation/specs/speculative-conflict.spec b/src/test/isolation/specs/speculative-conflict.spec
new file mode 100644
index 00000000000..49de55e289a
--- /dev/null
+++ b/src/test/isolation/specs/speculative-conflict.spec
@@ -0,0 +1,129 @@
+setup
+{
+     CREATE OR REPLACE FUNCTION blurt_and_lock(text) RETURNS text IMMUTABLE LANGUAGE plpgsql AS $$
+     BEGIN
+        RAISE NOTICE 'called for %', $1;
+
+	-- depending on lock state `, wait for lock 2 or 3
+        IF pg_try_advisory_xact_lock(current_setting('spec.session')::int, 1) THEN
+            RAISE NOTICE 'blocking 2';
+            PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 2);
+        ELSE
+            RAISE NOTICE 'blocking 3';
+            PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 3);
+        END IF;
+    RETURN $1;
+    END;$$;
+
+    CREATE TABLE upserttest(key text, data text);
+
+    CREATE UNIQUE INDEX ON upserttest((blurt_and_lock(key)));
+}
+
+teardown
+{
+    DROP TABLE upserttest;
+}
+
+session "controller"
+step "controller_locks" {SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);}
+step "controller_unlock_1_1" { SELECT pg_advisory_unlock(1, 1); }
+step "controller_unlock_2_1" { SELECT pg_advisory_unlock(2, 1); }
+step "controller_unlock_1_2" { SELECT pg_advisory_unlock(1, 2); }
+step "controller_unlock_2_2" { SELECT pg_advisory_unlock(2, 2); }
+step "controller_unlock_1_3" { SELECT pg_advisory_unlock(1, 3); }
+step "controller_unlock_2_3" { SELECT pg_advisory_unlock(2, 3); }
+step "controller_show" {SELECT * FROM upserttest; }
+
+session "s1"
+setup { SET spec.session = 1; }
+step "s1_begin"  { BEGIN; }
+step "s1_upsert" { INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s1') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s1'; }
+step "s1_commit"  { COMMIT; }
+
+session "s2"
+setup { SET spec.session = 2; }
+step "s2_begin"  { BEGIN; }
+step "s2_upsert" { INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s2') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s2'; }
+step "s2_commit"  { COMMIT; }
+
+# Test that speculative locks are correctly acquired and released, s2
+# inserts, s1 updates.
+permutation
+   # acquire a number of locks, to control execution flow - the
+   # blurt_and_lock function acquires advisory locks that allow us to
+   # continue after a) the optimistic conflict probe b) after the
+   # insertion of the speculative tuple.
+   "controller_locks"
+   "controller_show"
+   "s1_upsert" "s2_upsert"
+   "controller_show"
+   # Switch both sessions to wait on the other lock next time (the speculative insertion)
+   "controller_unlock_1_1" "controller_unlock_2_1"
+   # Allow both sessions to continue
+   "controller_unlock_1_3" "controller_unlock_2_3"
+   "controller_show"
+   # Allow the second session to finish insertion
+   "controller_unlock_2_2"
+   # This should now show a successful insertion
+   "controller_show"
+   # Allow the first session to finish insertion
+   "controller_unlock_1_2"
+   # This should now show a successful UPSERT
+   "controller_show"
+
+# Test that speculative locks are correctly acquired and released, s2
+# inserts, s1 updates.
+permutation
+   # acquire a number of locks, to control execution flow - the
+   # blurt_and_lock function acquires advisory locks that allow us to
+   # continue after a) the optimistic conflict probe b) after the
+   # insertion of the speculative tuple.
+   "controller_locks"
+   "controller_show"
+   "s1_upsert" "s2_upsert"
+   "controller_show"
+   # Switch both sessions to wait on the other lock next time (the speculative insertion)
+   "controller_unlock_1_1" "controller_unlock_2_1"
+   # Allow both sessions to continue
+   "controller_unlock_1_3" "controller_unlock_2_3"
+   "controller_show"
+   # Allow the first session to finish insertion
+   "controller_unlock_1_2"
+   # This should now show a successful insertion
+   "controller_show"
+   # Allow the second session to finish insertion
+   "controller_unlock_2_2"
+   # This should now show a successful UPSERT
+   "controller_show"
+
+# Test that speculative locks are correctly acquired and released, s2
+# inserts, s1 updates.  With the added complication that transactions
+# don't immediately commit.
+permutation
+   # acquire a number of locks, to control execution flow - the
+   # blurt_and_lock function acquires advisory locks that allow us to
+   # continue after a) the optimistic conflict probe b) after the
+   # insertion of the speculative tuple.
+   "controller_locks"
+   "controller_show"
+   "s1_begin" "s2_begin"
+   "s1_upsert" "s2_upsert"
+   "controller_show"
+   # Switch both sessions to wait on the other lock next time (the speculative insertion)
+   "controller_unlock_1_1" "controller_unlock_2_1"
+   # Allow both sessions to continue
+   "controller_unlock_1_3" "controller_unlock_2_3"
+   "controller_show"
+   # Allow the first session to finish insertion
+   "controller_unlock_1_2"
+   # But the change isn't visible yet, nor should the second session continue
+   "controller_show"
+   # Allow the second session to finish insertion, but it's blocked
+   "controller_unlock_2_2"
+   "controller_show"
+   # But committing should unblock
+   "s1_commit"
+   "controller_show"
+   "s2_commit"
+   "controller_show"

Reply via email to