Hi, On 2019-05-01 11:41:48 -0700, Andres Freund wrote: > I'm imagining something like > > if (pg_try_advisory_xact_lock(1)) > pg_advisory_xact_lock(2); > else > pg_advisory_xact_lock(1); > > in t1_lock_func. If you then make the session something roughly like > > s1: pg_advisory_xact_lock(1); > s1: pg_advisory_xact_lock(2); > > s2: upsert t1 <blocking for 1> > s1: pg_advisory_xact_unlock(1); > s2: <continuing> > s2: <blocking for 2> > s1: insert into t1 values(1, 'someval'); > s1: pg_advisory_xact_unlock(2); > s2: <continuing> > s2: spec-conflict
It needed to be slightly more complicated than that, but not by much. See the attached test. What do you think? I think we should apply something like this (minus the WARNING, of course). The test is a bit complicated, but it seems worth covering this special case. Greetings, Andres Freund
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 444c0c05746..bad50aab007 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -554,6 +554,8 @@ ExecInsert(ModifyTableState *mtstate, &specConflict, arbiterIndexes); + elog(WARNING, "speculative: %d", !!specConflict); + /* adjust the tuple's state accordingly */ table_complete_speculative(resultRelationDesc, slot, specToken, specConflict); diff --git a/src/test/isolation/expected/speculative-conflict.out b/src/test/isolation/expected/speculative-conflict.out new file mode 100644 index 00000000000..7b6ce4bb989 --- /dev/null +++ b/src/test/isolation/expected/speculative-conflict.out @@ -0,0 +1,185 @@ +Parsed test spec with 3 sessions + +starting permutation: controller_locks controller_show s1_upsert s2_upsert controller_show controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show controller_unlock_2_2 controller_show controller_unlock_1_2 controller_show +step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock); +pg_advisory_locksess lock + + 1 1 + 1 2 + 1 3 + 2 1 + 2 2 + 2 3 +step controller_show: SELECT * FROM upserttest; +key data + +step s1_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s1') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s1'; <waiting ...> +step s2_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s2') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s2'; <waiting ...> +step controller_show: SELECT * FROM upserttest; +key data + +step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1); +pg_advisory_unlock + +t +step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1); +pg_advisory_unlock + +t +step controller_unlock_1_3: SELECT 
pg_advisory_unlock(1, 3); +pg_advisory_unlock + +t +step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3); +pg_advisory_unlock + +t +step controller_show: SELECT * FROM upserttest; +key data + +step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2); +pg_advisory_unlock + +t +s2: WARNING: speculative: 0 +step s2_upsert: <... completed> +step controller_show: SELECT * FROM upserttest; +key data + +k1 inserted s2 +step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2); +pg_advisory_unlock + +t +s1: WARNING: speculative: 1 +step s1_upsert: <... completed> +step controller_show: SELECT * FROM upserttest; +key data + +k1 inserted s2 with conflict update s1 + +starting permutation: controller_locks controller_show s1_upsert s2_upsert controller_show controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show controller_unlock_1_2 controller_show controller_unlock_2_2 controller_show +step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock); +pg_advisory_locksess lock + + 1 1 + 1 2 + 1 3 + 2 1 + 2 2 + 2 3 +step controller_show: SELECT * FROM upserttest; +key data + +step s1_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s1') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s1'; <waiting ...> +step s2_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s2') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s2'; <waiting ...> +step controller_show: SELECT * FROM upserttest; +key data + +step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1); +pg_advisory_unlock + +t +step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1); +pg_advisory_unlock + +t +step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3); +pg_advisory_unlock + +t +step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3); 
+pg_advisory_unlock + +t +step controller_show: SELECT * FROM upserttest; +key data + +step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2); +pg_advisory_unlock + +t +s1: WARNING: speculative: 0 +step s1_upsert: <... completed> +step controller_show: SELECT * FROM upserttest; +key data + +k1 inserted s1 +step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2); +pg_advisory_unlock + +t +s2: WARNING: speculative: 1 +step s2_upsert: <... completed> +step controller_show: SELECT * FROM upserttest; +key data + +k1 inserted s1 with conflict update s2 + +starting permutation: controller_locks controller_show s1_begin s2_begin s1_upsert s2_upsert controller_show controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show controller_unlock_1_2 controller_show controller_unlock_2_2 controller_show s1_commit controller_show s2_commit controller_show +step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock); +pg_advisory_locksess lock + + 1 1 + 1 2 + 1 3 + 2 1 + 2 2 + 2 3 +step controller_show: SELECT * FROM upserttest; +key data + +step s1_begin: BEGIN; +step s2_begin: BEGIN; +step s1_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s1') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s1'; <waiting ...> +step s2_upsert: INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s2') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s2'; <waiting ...> +step controller_show: SELECT * FROM upserttest; +key data + +step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1); +pg_advisory_unlock + +t +step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1); +pg_advisory_unlock + +t +step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3); +pg_advisory_unlock + +t +step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3); 
+pg_advisory_unlock + +t +step controller_show: SELECT * FROM upserttest; +key data + +step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2); +pg_advisory_unlock + +t +s1: WARNING: speculative: 0 +step s1_upsert: <... completed> +step controller_show: SELECT * FROM upserttest; +key data + +step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2); +pg_advisory_unlock + +t +s2: WARNING: speculative: 1 +step controller_show: SELECT * FROM upserttest; +key data + +step s1_commit: COMMIT; +step s2_upsert: <... completed> +step controller_show: SELECT * FROM upserttest; +key data + +k1 inserted s1 +step s2_commit: COMMIT; +step controller_show: SELECT * FROM upserttest; +key data + +k1 inserted s1 with conflict update s2 diff --git a/src/test/isolation/specs/speculative-conflict.spec b/src/test/isolation/specs/speculative-conflict.spec new file mode 100644 index 00000000000..49de55e289a --- /dev/null +++ b/src/test/isolation/specs/speculative-conflict.spec @@ -0,0 +1,129 @@ +setup +{ + CREATE OR REPLACE FUNCTION blurt_and_lock(text) RETURNS text IMMUTABLE LANGUAGE plpgsql AS $$ + BEGIN + RAISE NOTICE 'called for %', $1; + + -- depending on lock state, wait for lock 2 or 3 + IF pg_try_advisory_xact_lock(current_setting('spec.session')::int, 1) THEN + RAISE NOTICE 'blocking 2'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 2); + ELSE + RAISE NOTICE 'blocking 3'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 3); + END IF; + RETURN $1; + END;$$; + + CREATE TABLE upserttest(key text, data text); + + CREATE UNIQUE INDEX ON upserttest((blurt_and_lock(key))); +} + +teardown +{ + DROP TABLE upserttest; +} + +session "controller" +step "controller_locks" {SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);} +step "controller_unlock_1_1" { SELECT pg_advisory_unlock(1, 1); } +step "controller_unlock_2_1" { SELECT pg_advisory_unlock(2, 1); } +step 
"controller_unlock_1_2" { SELECT pg_advisory_unlock(1, 2); } +step "controller_unlock_2_2" { SELECT pg_advisory_unlock(2, 2); } +step "controller_unlock_1_3" { SELECT pg_advisory_unlock(1, 3); } +step "controller_unlock_2_3" { SELECT pg_advisory_unlock(2, 3); } +step "controller_show" {SELECT * FROM upserttest; } + +session "s1" +setup { SET spec.session = 1; } +step "s1_begin" { BEGIN; } +step "s1_upsert" { INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s1') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s1'; } +step "s1_commit" { COMMIT; } + +session "s2" +setup { SET spec.session = 2; } +step "s2_begin" { BEGIN; } +step "s2_upsert" { INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s2') ON CONFLICT (blurt_and_lock(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s2'; } +step "s2_commit" { COMMIT; } + +# Test that speculative locks are correctly acquired and released, s2 +# inserts, s1 updates. +permutation + # acquire a number of locks, to control execution flow - the + # blurt_and_lock function acquires advisory locks that allow us to + # continue after a) the optimistic conflict probe b) after the + # insertion of the speculative tuple. + "controller_locks" + "controller_show" + "s1_upsert" "s2_upsert" + "controller_show" + # Switch both sessions to wait on the other lock next time (the speculative insertion) + "controller_unlock_1_1" "controller_unlock_2_1" + # Allow both sessions to continue + "controller_unlock_1_3" "controller_unlock_2_3" + "controller_show" + # Allow the second session to finish insertion + "controller_unlock_2_2" + # This should now show a successful insertion + "controller_show" + # Allow the first session to finish insertion + "controller_unlock_1_2" + # This should now show a successful UPSERT + "controller_show" + +# Test that speculative locks are correctly acquired and released, s1 +# inserts, s2 updates. 
+permutation + # acquire a number of locks, to control execution flow - the + # blurt_and_lock function acquires advisory locks that allow us to + # continue after a) the optimistic conflict probe b) after the + # insertion of the speculative tuple. + "controller_locks" + "controller_show" + "s1_upsert" "s2_upsert" + "controller_show" + # Switch both sessions to wait on the other lock next time (the speculative insertion) + "controller_unlock_1_1" "controller_unlock_2_1" + # Allow both sessions to continue + "controller_unlock_1_3" "controller_unlock_2_3" + "controller_show" + # Allow the first session to finish insertion + "controller_unlock_1_2" + # This should now show a successful insertion + "controller_show" + # Allow the second session to finish insertion + "controller_unlock_2_2" + # This should now show a successful UPSERT + "controller_show" + +# Test that speculative locks are correctly acquired and released, s1 +# inserts, s2 updates. With the added complication that transactions +# don't immediately commit. +permutation + # acquire a number of locks, to control execution flow - the + # blurt_and_lock function acquires advisory locks that allow us to + # continue after a) the optimistic conflict probe b) after the + # insertion of the speculative tuple. 
+ "controller_locks" + "controller_show" + "s1_begin" "s2_begin" + "s1_upsert" "s2_upsert" + "controller_show" + # Switch both sessions to wait on the other lock next time (the speculative insertion) + "controller_unlock_1_1" "controller_unlock_2_1" + # Allow both sessions to continue + "controller_unlock_1_3" "controller_unlock_2_3" + "controller_show" + # Allow the first session to finish insertion + "controller_unlock_1_2" + # But the change isn't visible yet, nor should the second session continue + "controller_show" + # Allow the second session to finish insertion, but it's blocked + "controller_unlock_2_2" + "controller_show" + # But committing should unblock + "s1_commit" + "controller_show" + "s2_commit" + "controller_show"