On Wed, Apr 18, 2018 at 9:43 AM, Kyotaro HORIGUCHI
<horiguchi.kyot...@lab.ntt.co.jp> wrote:
>
> Anyway I think we should warn or error out if one nondirect
> update touches two nor more tuples in the first place.
>
> =# UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a 
> = 1;
> ERROR:  updated 2 rows for a tuple identity on the remote end

I liked that idea. But I think your patch wasn't quite right, esp.
when the returning had an SRF in it. Right now n_rows tracks the
number of rows returned if there is a returning list or the number of
rows updated/deleted on the foreign server. If there is an SRF, n_rows
can return multiple rows for a single updated or deleted row. So, I
changed your code to track number of rows updated/deleted and number
of rows returned separately. BTW, your patch didn't handle DELETE
case.

I have attached a set of patches
0001 adds a test case showing the issue.
0002 modified patch based on your idea of throwing an error
0003 WIP patch with a partial fix for the issue as discussed upthread

The expected output in 0001 is set to what it would when the problem
gets fixed. The expected output in 0002 is what it would be when we
commit only 0002 without a complete fix.
>
>
>> There are two ways to fix this
>> 1. Use WHERE CURRENT OF with cursors to update rows. This means that
>> we fetch only one row at a time and update it. This can slow down the
>> execution drastically.
>> 2. Along with ctid use tableoid as a qualifier i.e. WHERE clause of
>> UPDATE/DELETE statement has ctid = $1 AND tableoid = $2 as conditions.
>>
>> PFA patch along the lines of 2nd approach and along with the
>> testcases. The idea is to inject tableoid attribute to be fetched from
>> the foreign server similar to ctid and then add it to the DML
>> statement being constructed.
>>
>> It does fix the problem. But the patch as is interferes with the way
>> we handle tableoid currently. That can be seen from the regression
>> diffs that the patch causes.  RIght now, every tableoid reference gets
>> converted into the tableoid of the foreign table (and not the tableoid
>> of the foreign table). Somehow we need to differentiate between the
>> tableoid injected for DML and tableoid references added by the user in
>> the original query and then use tableoid on the foreign server for the
>> first and local foreign table's oid for the second. Right now, I don't
>> see a simple way to do that.
>
> We cannot add no non-system (junk) columns not defined in foreign
> table columns.

Why? That's a probable way of fixing this problem.

> We could pass tableoid via a side channel but we
> get wrong value if the scan is not consists of only one foreign
> relation. I don't think adding remote_tableoid in HeapTupleData
> is acceptable.

I am thinking of adding remote_tableoid in HeapTupleData since not all
FDWs will have the concept of tableoid. But we need to somehow
distinguish the tableoid resjunk added for DMLs and tableoid requested
by the user.

> Explicity defining remote_tableoid column in
> foreign relation might work but it makes things combersome..
>

Not just cumbersome, it's not going to be always right, if the things
change on the foreign server e.g. OID of the table changes because it
got dropped and recreated on the foreign server or OID remained same
but the table got inherited and so on.

I think we should try getting 0001 and 0002 at least committed
independent of 0003.

-- 
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
From 1a8fc73fffa522e10a831bb9e6557a9fb4b0b602 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.ba...@enterprisedb.com>
Date: Wed, 18 Apr 2018 10:20:27 +0530
Subject: [PATCH 1/3] Tests to show problem when foreign table points to a
 partitioned table or inheritance table on the foreign
 server

When a foreign table points to a partitioned table or an inheritance
parent on the foreign server, a non-direct DML can affect multiple
rows when only one row is intended to be affected. This
happens because postgres_fdw uses only ctid to identify a row to work
on. Though ctid uniquely identifies a row in a single table, in a
partitioned table or in an inheritance hierarchy, there can be be
multiple rows, in different partitions, with the same ctid. So
DML statement sent to the foreign server by postgres_fdw ends up
affecting more than one rows, only one of which is intended to be
affected.

This commit adds testcases to show the problem. A subsequent commit
would have a fix to the problem.

Ashutosh Bapat, reviewed by Kyotaro Horiguchi
---
 contrib/postgres_fdw/expected/postgres_fdw.out |  121 ++++++++++++++++++++++++
 contrib/postgres_fdw/sql/postgres_fdw.sql      |   53 +++++++++++
 2 files changed, 174 insertions(+)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index e4d9469..5156002 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6923,6 +6923,69 @@ SELECT tableoid::regclass, * FROM ONLY a;
 DROP TABLE a CASCADE;
 NOTICE:  drop cascades to foreign table b
 DROP TABLE loct;
+-- test DML statement on a foreign table pointing to an inheritance hierarchy
+-- on the remote server
+CREATE TABLE a(aa TEXT);
+ALTER TABLE a SET (autovacuum_enabled = 'false');
+CREATE TABLE b() INHERITS(a);
+ALTER TABLE b SET (autovacuum_enabled = 'false');
+INSERT INTO a(aa) VALUES('aaa');
+INSERT INTO b(aa) VALUES('bbb');
+CREATE FOREIGN TABLE fa (aa TEXT) SERVER loopback OPTIONS (table_name 'a');
+SELECT tableoid::regclass, ctid, * FROM fa;
+ tableoid | ctid  | aa  
+----------+-------+-----
+ fa       | (0,1) | aaa
+ fa       | (0,1) | bbb
+(2 rows)
+
+-- use random() so that DML statement is not pushed down to the foreign
+-- server
+EXPLAIN (VERBOSE, COSTS OFF)
+UPDATE fa SET aa = (CASE WHEN random() <= 1 THEN 'zzzz' ELSE NULL END) WHERE aa = 'aaa';
+                                                QUERY PLAN                                                 
+-----------------------------------------------------------------------------------------------------------
+ Update on public.fa
+   Remote SQL: UPDATE public.a SET aa = $2 WHERE ctid = $1
+   ->  Foreign Scan on public.fa
+         Output: CASE WHEN (random() <= '1'::double precision) THEN 'zzzz'::text ELSE NULL::text END, ctid
+         Remote SQL: SELECT ctid FROM public.a WHERE ((aa = 'aaa'::text)) FOR UPDATE
+(5 rows)
+
+UPDATE fa SET aa = (CASE WHEN random() <= 1 THEN 'zzzz' ELSE NULL END) WHERE aa = 'aaa';
+SELECT tableoid::regclass, ctid, * FROM fa;
+ tableoid | ctid  |  aa  
+----------+-------+------
+ fa       | (0,2) | zzzz
+ fa       | (0,1) | bbb
+(2 rows)
+
+-- repopulate tables so that we have rows with same ctid
+TRUNCATE a, b; 
+INSERT INTO a(aa) VALUES('aaa');
+INSERT INTO b(aa) VALUES('bbb');
+EXPLAIN (VERBOSE, COSTS OFF)
+DELETE FROM fa WHERE aa = (CASE WHEN random() <= 1 THEN 'aaa' ELSE 'bbb' END);
+                                                  QUERY PLAN                                                   
+---------------------------------------------------------------------------------------------------------------
+ Delete on public.fa
+   Remote SQL: DELETE FROM public.a WHERE ctid = $1
+   ->  Foreign Scan on public.fa
+         Output: ctid
+         Filter: (fa.aa = CASE WHEN (random() <= '1'::double precision) THEN 'aaa'::text ELSE 'bbb'::text END)
+         Remote SQL: SELECT aa, ctid FROM public.a FOR UPDATE
+(6 rows)
+
+DELETE FROM fa WHERE aa = (CASE WHEN random() <= 1 THEN 'aaa' ELSE 'bbb' END);
+SELECT tableoid::regclass, ctid, * FROM fa;
+ tableoid | ctid | aa 
+----------+------+----
+ fa       | (0,1) | bbb
+(1 rows)
+
+DROP FOREIGN TABLE fa;
+DROP TABLE a CASCADE;
+NOTICE:  drop cascades to table b
 -- Check SELECT FOR UPDATE/SHARE with an inherited source table
 create table loct1 (f1 int, f2 int, f3 int);
 create table loct2 (f1 int, f2 int, f3 int);
@@ -8317,3 +8380,61 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700
 
 -- Clean-up
 RESET enable_partitionwise_aggregate;
+-- test DML statement on foreign table pointing to a foreign partitioned table
+CREATE TABLE plt (a int, b int) PARTITION BY LIST(a);
+CREATE TABLE plt_p1 PARTITION OF plt FOR VALUES IN (1);
+CREATE TABLE plt_p2 PARTITION OF plt FOR VALUES IN (2);
+INSERT INTO plt VALUES (1, 1), (2, 2);
+CREATE FOREIGN TABLE fplt (a int, b int) SERVER loopback OPTIONS (table_name 'plt');
+SELECT tableoid::regclass, ctid, * FROM fplt;
+ tableoid | ctid  | a | b 
+----------+-------+---+---
+ fplt     | (0,1) | 1 | 1
+ fplt     | (0,1) | 2 | 2
+(2 rows)
+
+-- use random() so that DML statement is not pushed down to the foreign
+-- server
+EXPLAIN (VERBOSE, COSTS OFF)
+UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1;
+                                         QUERY PLAN                                         
+--------------------------------------------------------------------------------------------
+ Update on public.fplt
+   Remote SQL: UPDATE public.plt SET b = $2 WHERE ctid = $1
+   ->  Foreign Scan on public.fplt
+         Output: a, CASE WHEN (random() <= '1'::double precision) THEN 10 ELSE 20 END, ctid
+         Remote SQL: SELECT a, ctid FROM public.plt WHERE ((a = 1)) FOR UPDATE
+(5 rows)
+
+UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1;
+SELECT tableoid::regclass, ctid, * FROM fplt;
+ tableoid | ctid  | a | b  
+----------+-------+---+----
+ fplt     | (0,2) | 1 | 10
+ fplt     | (0,1) | 2 | 2
+(2 rows)
+
+-- repopulate partitioned table so that we have rows with same ctid
+TRUNCATE plt;
+INSERT INTO plt VALUES (1, 1), (2, 2);
+EXPLAIN (VERBOSE, COSTS OFF)
+DELETE FROM fplt WHERE a = (CASE WHEN random() <=  1 THEN 1 ELSE 10 END);
+                                         QUERY PLAN                                          
+---------------------------------------------------------------------------------------------
+ Delete on public.fplt
+   Remote SQL: DELETE FROM public.plt WHERE ctid = $1
+   ->  Foreign Scan on public.fplt
+         Output: ctid
+         Filter: (fplt.a = CASE WHEN (random() <= '1'::double precision) THEN 1 ELSE 10 END)
+         Remote SQL: SELECT a, ctid FROM public.plt FOR UPDATE
+(6 rows)
+
+DELETE FROM fplt WHERE a = (CASE WHEN random() <=  1 THEN 1 ELSE 10 END);
+SELECT tableoid::regclass, ctid, * FROM fplt;
+ tableoid | ctid  | a | b  
+----------+-------+---+----
+ fplt     | (0,1) | 2 | 2 
+(1 row)
+
+DROP TABLE plt;
+DROP FOREIGN TABLE fplt;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index e1df952..1bec916 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1648,6 +1648,35 @@ SELECT tableoid::regclass, * FROM ONLY a;
 DROP TABLE a CASCADE;
 DROP TABLE loct;
 
+-- test DML statement on a foreign table pointing to an inheritance hierarchy
+-- on the remote server
+CREATE TABLE a(aa TEXT);
+ALTER TABLE a SET (autovacuum_enabled = 'false');
+CREATE TABLE b() INHERITS(a);
+ALTER TABLE b SET (autovacuum_enabled = 'false');
+INSERT INTO a(aa) VALUES('aaa');
+INSERT INTO b(aa) VALUES('bbb');
+CREATE FOREIGN TABLE fa (aa TEXT) SERVER loopback OPTIONS (table_name 'a');
+
+SELECT tableoid::regclass, ctid, * FROM fa;
+-- use random() so that DML statement is not pushed down to the foreign
+-- server
+EXPLAIN (VERBOSE, COSTS OFF)
+UPDATE fa SET aa = (CASE WHEN random() <= 1 THEN 'zzzz' ELSE NULL END) WHERE aa = 'aaa';
+UPDATE fa SET aa = (CASE WHEN random() <= 1 THEN 'zzzz' ELSE NULL END) WHERE aa = 'aaa';
+SELECT tableoid::regclass, ctid, * FROM fa;
+-- repopulate tables so that we have rows with same ctid
+TRUNCATE a, b; 
+INSERT INTO a(aa) VALUES('aaa');
+INSERT INTO b(aa) VALUES('bbb');
+EXPLAIN (VERBOSE, COSTS OFF)
+DELETE FROM fa WHERE aa = (CASE WHEN random() <= 1 THEN 'aaa' ELSE 'bbb' END);
+DELETE FROM fa WHERE aa = (CASE WHEN random() <= 1 THEN 'aaa' ELSE 'bbb' END);
+SELECT tableoid::regclass, ctid, * FROM fa;
+
+DROP FOREIGN TABLE fa;
+DROP TABLE a CASCADE;
+
 -- Check SELECT FOR UPDATE/SHARE with an inherited source table
 create table loct1 (f1 int, f2 int, f3 int);
 create table loct2 (f1 int, f2 int, f3 int);
@@ -2220,3 +2249,27 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700
 
 -- Clean-up
 RESET enable_partitionwise_aggregate;
+
+-- test DML statement on foreign table pointing to a foreign partitioned table
+CREATE TABLE plt (a int, b int) PARTITION BY LIST(a);
+CREATE TABLE plt_p1 PARTITION OF plt FOR VALUES IN (1);
+CREATE TABLE plt_p2 PARTITION OF plt FOR VALUES IN (2);
+INSERT INTO plt VALUES (1, 1), (2, 2);
+CREATE FOREIGN TABLE fplt (a int, b int) SERVER loopback OPTIONS (table_name 'plt');
+SELECT tableoid::regclass, ctid, * FROM fplt;
+-- use random() so that DML statement is not pushed down to the foreign
+-- server
+EXPLAIN (VERBOSE, COSTS OFF)
+UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1;
+UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1;
+SELECT tableoid::regclass, ctid, * FROM fplt;
+-- repopulate partitioned table so that we have rows with same ctid
+TRUNCATE plt;
+INSERT INTO plt VALUES (1, 1), (2, 2);
+EXPLAIN (VERBOSE, COSTS OFF)
+DELETE FROM fplt WHERE a = (CASE WHEN random() <=  1 THEN 1 ELSE 10 END);
+DELETE FROM fplt WHERE a = (CASE WHEN random() <=  1 THEN 1 ELSE 10 END);
+SELECT tableoid::regclass, ctid, * FROM fplt;
+
+DROP TABLE plt;
+DROP FOREIGN TABLE fplt;
-- 
1.7.9.5

From c81e2497eadbacf4722c55cf8e2d528a15ce1f00 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.ba...@enterprisedb.com>
Date: Wed, 18 Apr 2018 11:06:20 +0530
Subject: [PATCH 2/3] Error out if one iteration of non-direct DML affects
 more than one row on the foreign server

When a foreign table points to a partitioned table or an inheritance
parent on the foreign server, a non-direct DML can affect multiple
rows when only one row is intended to be affected. This happens
because postgres_fdw uses only ctid to identify a row to work on.
Though ctid uniquely identifies a row in a single table, in a
partitioned table or in an inheritance hierarchy, there can be be
multiple rows, in different partitions, with the same ctid. So a DML
statement sent to the foreign server by postgres_fdw ends up affecting
more than one rows, only one of which is intended to be affected.

In such a case it's good to throw an error instead of corrupting
remote database with unwanted UPDATE/DELETEs. Subsequent commits will
try to fix this situation.

Ashutosh Bapat and Kyotaro Horiguchi
---
 contrib/postgres_fdw/expected/postgres_fdw.out |   32 ++++++++++-------
 contrib/postgres_fdw/postgres_fdw.c            |   44 ++++++++++++++++++------
 2 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 5156002..77d24ea 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6953,10 +6953,11 @@ UPDATE fa SET aa = (CASE WHEN random() <= 1 THEN 'zzzz' ELSE NULL END) WHERE aa
 (5 rows)
 
 UPDATE fa SET aa = (CASE WHEN random() <= 1 THEN 'zzzz' ELSE NULL END) WHERE aa = 'aaa';
+ERROR:  foreign server updated 2 rows when only one row was expected to be updated
 SELECT tableoid::regclass, ctid, * FROM fa;
- tableoid | ctid  |  aa  
-----------+-------+------
- fa       | (0,2) | zzzz
+ tableoid | ctid  | aa  
+----------+-------+-----
+ fa       | (0,1) | aaa
  fa       | (0,1) | bbb
 (2 rows)
 
@@ -6977,11 +6978,13 @@ DELETE FROM fa WHERE aa = (CASE WHEN random() <= 1 THEN 'aaa' ELSE 'bbb' END);
 (6 rows)
 
 DELETE FROM fa WHERE aa = (CASE WHEN random() <= 1 THEN 'aaa' ELSE 'bbb' END);
+ERROR:  foreign server deleted 2 rows when only one row was expected to be deleted
 SELECT tableoid::regclass, ctid, * FROM fa;
- tableoid | ctid | aa 
-----------+------+----
+ tableoid | ctid  | aa  
+----------+-------+-----
+ fa       | (0,1) | aaa
  fa       | (0,1) | bbb
-(1 rows)
+(2 rows)
 
 DROP FOREIGN TABLE fa;
 DROP TABLE a CASCADE;
@@ -8407,10 +8410,11 @@ UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1;
 (5 rows)
 
 UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1;
+ERROR:  foreign server updated 2 rows when only one row was expected to be updated
 SELECT tableoid::regclass, ctid, * FROM fplt;
- tableoid | ctid  | a | b  
-----------+-------+---+----
- fplt     | (0,2) | 1 | 10
+ tableoid | ctid  | a | b 
+----------+-------+---+---
+ fplt     | (0,1) | 1 | 1
  fplt     | (0,1) | 2 | 2
 (2 rows)
 
@@ -8430,11 +8434,13 @@ DELETE FROM fplt WHERE a = (CASE WHEN random() <=  1 THEN 1 ELSE 10 END);
 (6 rows)
 
 DELETE FROM fplt WHERE a = (CASE WHEN random() <=  1 THEN 1 ELSE 10 END);
+ERROR:  foreign server deleted 2 rows when only one row was expected to be deleted
 SELECT tableoid::regclass, ctid, * FROM fplt;
- tableoid | ctid  | a | b  
-----------+-------+---+----
- fplt     | (0,1) | 2 | 2 
-(1 row)
+ tableoid | ctid  | a | b 
+----------+-------+---+---
+ fplt     | (0,1) | 1 | 1
+ fplt     | (0,1) | 2 | 2
+(2 rows)
 
 DROP TABLE plt;
 DROP FOREIGN TABLE fplt;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 30e5726..0404e21 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1810,7 +1810,8 @@ postgresExecForeignUpdate(EState *estate,
 	bool		isNull;
 	const char **p_values;
 	PGresult   *res;
-	int			n_rows;
+	int			n_rows_returned;
+	int			n_rows_updated;
 
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
@@ -1853,22 +1854,33 @@ postgresExecForeignUpdate(EState *estate,
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
+	n_rows_updated = atoi(PQcmdTuples(res));
 	if (fmstate->has_returning)
 	{
-		n_rows = PQntuples(res);
-		if (n_rows > 0)
+		n_rows_returned = PQntuples(res);
+		if (n_rows_returned > 0)
 			store_returning_result(fmstate, slot, res);
 	}
 	else
-		n_rows = atoi(PQcmdTuples(res));
+		n_rows_returned = 0;
 
 	/* And clean up */
 	PQclear(res);
 
 	MemoryContextReset(fmstate->temp_cxt);
 
+	/* No rows should be returned if no rows were updated. */
+	Assert(n_rows_returned == 0 || n_rows_updated > 0);
+
+	/* ERROR if more than one row was updated on the remote end */
+	if (n_rows_updated > 1)
+		ereport(ERROR,
+				(errcode (ERRCODE_FDW_ERROR), /* XXX */
+				 errmsg ("foreign server updated %d rows when only one row was expected to be updated",
+						 n_rows_updated)));
+
 	/* Return NULL if nothing was updated on the remote end */
-	return (n_rows > 0) ? slot : NULL;
+	return (n_rows_updated > 0) ? slot : NULL;
 }
 
 /*
@@ -1886,7 +1898,8 @@ postgresExecForeignDelete(EState *estate,
 	bool		isNull;
 	const char **p_values;
 	PGresult   *res;
-	int			n_rows;
+	int			n_rows_returned;
+	int			n_rows_deleted;
 
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
@@ -1929,22 +1942,33 @@ postgresExecForeignDelete(EState *estate,
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
+	n_rows_deleted = atoi(PQcmdTuples(res));
 	if (fmstate->has_returning)
 	{
-		n_rows = PQntuples(res);
-		if (n_rows > 0)
+		n_rows_returned = PQntuples(res);
+		if (n_rows_returned > 0)
 			store_returning_result(fmstate, slot, res);
 	}
 	else
-		n_rows = atoi(PQcmdTuples(res));
+		n_rows_returned = 0;
 
 	/* And clean up */
 	PQclear(res);
 
 	MemoryContextReset(fmstate->temp_cxt);
 
+	/* No rows should be returned if no rows were updated. */
+	Assert(n_rows_returned == 0 || n_rows_deleted > 0);
+
+	/* ERROR if more than one row was updated on the remote end */
+	if (n_rows_deleted > 1)
+		ereport(ERROR,
+				(errcode (ERRCODE_FDW_ERROR), /* XXX */
+				 errmsg ("foreign server deleted %d rows when only one row was expected to be deleted",
+						 n_rows_deleted)));
+
 	/* Return NULL if nothing was deleted on the remote end */
-	return (n_rows > 0) ? slot : NULL;
+	return (n_rows_deleted > 0) ? slot : NULL;
 }
 
 /*
-- 
1.7.9.5

From 98e842769cf7b2a12628d019e3531904b4936b28 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.ba...@enterprisedb.com>
Date: Wed, 18 Apr 2018 10:33:59 +0530
Subject: [PATCH 3/3] An incomplete fix for problem in non-direct UPDATE of a
 foreign table pointing to a partitioned table or an
 inheritance parent on the foreign server

When a foreign table points to a partitioned table or an inheritance
parent on the foreign server, a non-direct DML can affect multiple
rows when only one row is intended to be affected. This
happens because postgres_fdw uses only ctid to identify a row to work
on. Though ctid uniquely identifies a row in a single table, in a
partitioned table or in an inheritance hierarchy, there can be be
multiple rows, in different partitions, with the same ctid. So a
DML statement sent to the foreign server by postgres_fdw ends up
affecting more than one row, only one of which is intended to be
affected.

(ctid, tableoid) is unique across a partitioning or inheritance
hierarchy. Thus instead of using just ctid, we can use qualification
based on both ctid and tableoid. This partial commit tries to do that.
The commit adds code to add tableoid as a resjunk column in the
targetlist and pushes it down to the foreign table. But right now,
when tableoid is requested from a foreign table, foreign table's local
tableoid is returned instead of the tableoid of the table pointed by
the foreign table. The commit adds some code to handle that, but it's
not foolproof. We need to find a way to keep these two tableoid's
separate. When a user requests a tableoid, we return local tableoid of
the foreign table. For an UPDATE or a DELETE, however, we want to
fetch the tableoid from the foreign server and use it in the
qualification along-side ctid. (Note that when use requests a ctid, we
always return ctid on the foreign server since there are no rows
locally.)

Ashutosh Bapat, reviewed by Kyotaro Horiguchi
---
 contrib/postgres_fdw/deparse.c         |   24 +++++-
 contrib/postgres_fdw/postgres_fdw.c    |  128 ++++++++++++++++++++++++++------
 src/backend/executor/nodeForeignscan.c |    5 +-
 3 files changed, 128 insertions(+), 29 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 6e2fa14..d3c98d3 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1144,7 +1144,7 @@ deparseTargetList(StringInfo buf,
 	}
 
 	/*
-	 * Add ctid and oid if needed.  We currently don't support retrieving any
+	 * Add ctid, tableoid and oid if needed.  We currently don't support retrieving any
 	 * other system columns.
 	 */
 	if (bms_is_member(SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber,
@@ -1179,6 +1179,22 @@ deparseTargetList(StringInfo buf,
 		*retrieved_attrs = lappend_int(*retrieved_attrs,
 									   ObjectIdAttributeNumber);
 	}
+	if (bms_is_member(TableOidAttributeNumber - FirstLowInvalidHeapAttributeNumber,
+					  attrs_used))
+	{
+		if (!first)
+			appendStringInfoString(buf, ", ");
+		else if (is_returning)
+			appendStringInfoString(buf, " RETURNING ");
+		first = false;
+
+		if (qualify_col)
+			ADD_REL_QUALIFIER(buf, rtindex);
+		appendStringInfoString(buf, "tableoid");
+
+		*retrieved_attrs = lappend_int(*retrieved_attrs,
+									   TableOidAttributeNumber);
+	}
 
 	/* Don't generate bad syntax if no undropped columns */
 	if (first && !is_returning)
@@ -1725,7 +1741,7 @@ deparseUpdateSql(StringInfo buf, PlannerInfo *root,
 	deparseRelation(buf, rel);
 	appendStringInfoString(buf, " SET ");
 
-	pindex = 2;					/* ctid is always the first param */
+	pindex = 3;					/* ctid, tableoid params appear first */
 	first = true;
 	foreach(lc, targetAttrs)
 	{
@@ -1739,7 +1755,7 @@ deparseUpdateSql(StringInfo buf, PlannerInfo *root,
 		appendStringInfo(buf, " = $%d", pindex);
 		pindex++;
 	}
-	appendStringInfoString(buf, " WHERE ctid = $1");
+	appendStringInfoString(buf, " WHERE ctid = $1 AND tableoid = $2");
 
 	deparseReturningList(buf, root, rtindex, rel,
 						 rel->trigdesc && rel->trigdesc->trig_update_after_row,
@@ -1854,7 +1870,7 @@ deparseDeleteSql(StringInfo buf, PlannerInfo *root,
 {
 	appendStringInfoString(buf, "DELETE FROM ");
 	deparseRelation(buf, rel);
-	appendStringInfoString(buf, " WHERE ctid = $1");
+	appendStringInfoString(buf, " WHERE ctid = $1 AND tableoid = $2");
 
 	deparseReturningList(buf, root, rtindex, rel,
 						 rel->trigdesc && rel->trigdesc->trig_delete_after_row,
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 0404e21..8761ab2 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -178,6 +178,7 @@ typedef struct PgFdwModifyState
 
 	/* info about parameters for prepared statement */
 	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
+	AttrNumber	tableoidAttno;	/* attnum of input resjunk tableoid column */
 	int			p_nums;			/* number of parameters to transmit */
 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
 
@@ -390,7 +391,7 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
 					  List *retrieved_attrs);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
-						 ItemPointer tupleid,
+						 ItemPointer tupleid, Oid tableoid,
 						 TupleTableSlot *slot);
 static void store_returning_result(PgFdwModifyState *fmstate,
 					   TupleTableSlot *slot, PGresult *res);
@@ -1543,26 +1544,39 @@ postgresAddForeignUpdateTargets(Query *parsetree,
 	TargetEntry *tle;
 
 	/*
-	 * In postgres_fdw, what we need is the ctid, same as for a regular table.
+	 * ctid is used to locate a row in a given table and tableoid is used to
+	 * identify a table in a partition or inheritance hierarchy.
 	 */
 
-	/* Make a Var representing the desired value */
+	/*
+	 * Make a Var representing the ctid, wrap it in a resjunk TLE with the
+	 * right name and add it to the query's targetlist.
+	 */
 	var = makeVar(parsetree->resultRelation,
 				  SelfItemPointerAttributeNumber,
 				  TIDOID,
 				  -1,
 				  InvalidOid,
 				  0);
-
-	/* Wrap it in a resjunk TLE with the right name ... */
 	attrname = "ctid";
-
 	tle = makeTargetEntry((Expr *) var,
 						  list_length(parsetree->targetList) + 1,
 						  pstrdup(attrname),
 						  true);
+	parsetree->targetList = lappend(parsetree->targetList, tle);
 
-	/* ... and add it to the query's targetlist */
+	/* Do the same for tableoid */
+	var = makeVar(parsetree->resultRelation,
+				  TableOidAttributeNumber,
+				  OIDOID,
+				  -1,
+				  InvalidOid,
+				  0);
+	attrname = "tableoid";
+	tle = makeTargetEntry((Expr *) var,
+						  list_length(parsetree->targetList) + 1,
+						  pstrdup(attrname),
+						  true);
 	parsetree->targetList = lappend(parsetree->targetList, tle);
 }
 
@@ -1751,7 +1765,7 @@ postgresExecForeignInsert(EState *estate,
 		prepare_foreign_modify(fmstate);
 
 	/* Convert parameters needed by prepared statement to text form */
-	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
+	p_values = convert_prep_stmt_params(fmstate, NULL, InvalidOid, slot);
 
 	/*
 	 * Execute the prepared statement.
@@ -1806,7 +1820,8 @@ postgresExecForeignUpdate(EState *estate,
 						  TupleTableSlot *planSlot)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-	Datum		datum;
+	Datum		ctid_datum;
+	Datum		tableoid_datum;
 	bool		isNull;
 	const char **p_values;
 	PGresult   *res;
@@ -1818,16 +1833,29 @@ postgresExecForeignUpdate(EState *estate,
 		prepare_foreign_modify(fmstate);
 
 	/* Get the ctid that was passed up as a resjunk column */
-	datum = ExecGetJunkAttribute(planSlot,
-								 fmstate->ctidAttno,
-								 &isNull);
+	ctid_datum = ExecGetJunkAttribute(planSlot,
+									  fmstate->ctidAttno,
+									  &isNull);
 	/* shouldn't ever get a null result... */
 	if (isNull)
 		elog(ERROR, "ctid is NULL");
 
+	/* Get the tableoid that was passed up as a resjunk column */
+	tableoid_datum = ExecGetJunkAttribute(planSlot,
+										  fmstate->tableoidAttno,
+										  &isNull);
+	/* shouldn't ever get a null result... */
+	if (isNull)
+		elog(ERROR, "tableoid is NULL");
+
+	/* ... and should be always a valid */
+	if (!OidIsValid(DatumGetObjectId(tableoid_datum)))
+		elog(ERROR, "tableoid is invalid");
+
 	/* Convert parameters needed by prepared statement to text form */
 	p_values = convert_prep_stmt_params(fmstate,
-										(ItemPointer) DatumGetPointer(datum),
+										(ItemPointer) DatumGetPointer(ctid_datum),
+										DatumGetObjectId(tableoid_datum),
 										slot);
 
 	/*
@@ -1894,7 +1922,8 @@ postgresExecForeignDelete(EState *estate,
 						  TupleTableSlot *planSlot)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-	Datum		datum;
+	Datum		ctid_datum;
+	Datum		tableoid_datum;
 	bool		isNull;
 	const char **p_values;
 	PGresult   *res;
@@ -1906,16 +1935,29 @@ postgresExecForeignDelete(EState *estate,
 		prepare_foreign_modify(fmstate);
 
 	/* Get the ctid that was passed up as a resjunk column */
-	datum = ExecGetJunkAttribute(planSlot,
-								 fmstate->ctidAttno,
-								 &isNull);
+	ctid_datum = ExecGetJunkAttribute(planSlot,
+									  fmstate->ctidAttno,
+									  &isNull);
 	/* shouldn't ever get a null result... */
 	if (isNull)
 		elog(ERROR, "ctid is NULL");
 
+	/* Get the tableoid that was passed up as a resjunk column */
+	tableoid_datum = ExecGetJunkAttribute(planSlot,
+										  fmstate->tableoidAttno,
+										  &isNull);
+	/* shouldn't ever get a null result... */
+	if (isNull)
+		elog(ERROR, "tableoid is NULL");
+
+	/* ... and should be always a valid */
+	if (!OidIsValid(DatumGetObjectId(tableoid_datum)))
+		elog(ERROR, "tableoid is invalid");
+
 	/* Convert parameters needed by prepared statement to text form */
 	p_values = convert_prep_stmt_params(fmstate,
-										(ItemPointer) DatumGetPointer(datum),
+										(ItemPointer) DatumGetPointer(ctid_datum),
+										DatumGetObjectId(tableoid_datum),
 										NULL);
 
 	/*
@@ -3334,7 +3376,7 @@ create_foreign_modify(EState *estate,
 		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
 
 	/* Prepare for output conversion of parameters used in prepared stmt. */
-	n_params = list_length(fmstate->target_attrs) + 1;
+	n_params = list_length(fmstate->target_attrs) + 2;
 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
 	fmstate->p_nums = 0;
 
@@ -3342,16 +3384,30 @@ create_foreign_modify(EState *estate,
 	{
 		Assert(subplan != NULL);
 
-		/* Find the ctid resjunk column in the subplan's result */
+		/*
+		 * Find the ctid, tableoid resjunk columns in the subplan's result and
+		 * record those as transmittable parameters.
+		 */
+
+
+		/* First transmittable parameter will be ctid */
 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
 														  "ctid");
 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
 			elog(ERROR, "could not find junk ctid column");
-
-		/* First transmittable parameter will be ctid */
 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
 		fmstate->p_nums++;
+
+		/* Second transmittable parameter will be tableoid */
+		fmstate->tableoidAttno =
+			ExecFindJunkAttributeInTlist(subplan->targetlist,
+										 "tableoid");
+		if (!AttributeNumberIsValid(fmstate->tableoidAttno))
+			elog(ERROR, "could not find junk tableoid column");
+		getTypeOutputInfo(OIDOID, &typefnoid, &isvarlena);
+		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+		fmstate->p_nums++;
 	}
 
 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
@@ -3425,13 +3481,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
  *		Create array of text strings representing parameter values
  *
  * tupleid is ctid to send, or NULL if none
+ * tableoid is tableoid to send or InvalidOid if none
  * slot is slot to get remaining parameters from, or NULL if none
  *
  * Data is constructed in temp_cxt; caller should reset that after use.
  */
 static const char **
 convert_prep_stmt_params(PgFdwModifyState *fmstate,
-						 ItemPointer tupleid,
+						 ItemPointer tupleid, Oid tableoid,
 						 TupleTableSlot *slot)
 {
 	const char **p_values;
@@ -3451,6 +3508,15 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
 		pindex++;
 	}
 
+	/* 2nd parameter should be tableoid, if it's in use */
+	if (OidIsValid(tableoid))
+	{
+		/* don't need set_transmission_modes for TID output */
+		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+											  ObjectIdGetDatum(tableoid));
+		pindex++;
+	}
+
 	/* get following parameters from slot */
 	if (slot != NULL && fmstate->target_attrs != NIL)
 	{
@@ -5549,6 +5615,7 @@ make_tuple_from_result_row(PGresult *res,
 	bool	   *nulls;
 	ItemPointer ctid = NULL;
 	Oid			oid = InvalidOid;
+	Oid			tableoid = InvalidOid;
 	ConversionLocation errpos;
 	ErrorContextCallback errcallback;
 	MemoryContext oldcontext;
@@ -5642,6 +5709,18 @@ make_tuple_from_result_row(PGresult *res,
 				oid = DatumGetObjectId(datum);
 			}
 		}
+		else if (i == TableOidAttributeNumber)
+		{
+			/* tableoid */
+			if (valstr != NULL)
+			{
+				Datum		datum;
+
+				datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
+				tableoid = DatumGetObjectId(datum);
+			}
+		}
+
 		errpos.cur_attno = 0;
 
 		j++;
@@ -5691,6 +5770,9 @@ make_tuple_from_result_row(PGresult *res,
 	if (OidIsValid(oid))
 		HeapTupleSetOid(tuple, oid);
 
+	if (OidIsValid(tableoid))
+		tuple->t_tableOid = tableoid;
+
 	/* Clean up */
 	MemoryContextReset(temp_context);
 
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index a2a28b7..8ebfdfd 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -58,13 +58,14 @@ ForeignNext(ForeignScanState *node)
 	 * If any system columns are requested, we have to force the tuple into
 	 * physical-tuple form to avoid "cannot extract system attribute from
 	 * virtual tuple" errors later.  We also insert a valid value for
-	 * tableoid, which is the only actually-useful system column.
+	 * tableoid, in case FDW has not set it as per its needs.
 	 */
 	if (plan->fsSystemCol && !TupIsNull(slot))
 	{
 		HeapTuple	tup = ExecMaterializeSlot(slot);
 
-		tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
+		if (!OidIsValid(tup->t_tableOid))
+			tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
 	}
 
 	return slot;
-- 
1.7.9.5

Reply via email to