Hi,

Consider this scenario:

postgres=# CREATE TABLE plt (a int, b int) PARTITION BY LIST(a);
postgres=# CREATE TABLE plt_p1 PARTITION OF plt FOR VALUES IN (1);
postgres=# CREATE TABLE plt_p2 PARTITION OF plt FOR VALUES IN (2);
postgres=# INSERT INTO plt VALUES (1, 1), (2, 2);
postgres=# CREATE FOREIGN TABLE fplt (a int, b int) SERVER loopback OPTIONS (table_name 'plt');
postgres=# SELECT tableoid::regclass, ctid, * FROM fplt;
 tableoid | ctid  | a | b
----------+-------+---+---
 fplt     | (0,1) | 1 | 1
 fplt     | (0,1) | 2 | 2
(2 rows)

-- Need to use random() so that the following update doesn't turn into a direct UPDATE.
postgres=# EXPLAIN (VERBOSE, COSTS OFF)
postgres-# UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1;
                                         QUERY PLAN
--------------------------------------------------------------------------------------------
 Update on public.fplt
   Remote SQL: UPDATE public.plt SET b = $2 WHERE ctid = $1
   ->  Foreign Scan on public.fplt
         Output: a, CASE WHEN (random() <= '1'::double precision) THEN 10 ELSE 20 END, ctid
         Remote SQL: SELECT a, ctid FROM public.plt WHERE ((a = 1)) FOR UPDATE
(5 rows)

postgres=# UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1;
postgres=# SELECT tableoid::regclass, ctid, * FROM fplt;
 tableoid | ctid  | a | b
----------+-------+---+----
 fplt     | (0,2) | 1 | 10
 fplt     | (0,2) | 2 | 10
(2 rows)

We expect only the one row with a = 1 to be updated, but both rows get
updated. This happens because both rows have ctid = (0, 1), and that is the
only qualification used for UPDATE and DELETE. Thus, when a non-direct UPDATE
is run on a foreign table that points to a partitioned table or inheritance
hierarchy on the foreign server, it updates rows from all child tables whose
ctids match those of the qualifying rows. The same is true for DELETE.

There are two ways to fix this:

1. Use WHERE CURRENT OF with cursors to update rows. This means that we fetch
   only one row at a time and update it, which can slow down execution
   drastically.
2. Along with ctid, use tableoid as a qualifier, i.e. the WHERE clause of the
   UPDATE/DELETE statement has ctid = $1 AND tableoid = $2 as conditions.

PFA a patch along the lines of the second approach, along with test cases.
The idea is to inject a tableoid attribute to be fetched from the foreign
server, similar to ctid, and then add it to the DML statement being
constructed.

It does fix the problem. But the patch as is interferes with the way we
handle tableoid currently.
That can be seen from the regression diffs that the patch causes. Right now,
every tableoid reference gets converted into the tableoid of the local
foreign table (and not the tableoid of the table on the foreign server).
Somehow we need to differentiate between the tableoid injected for DML and
the tableoid references added by the user in the original query, and then use
the tableoid on the foreign server for the first and the local foreign
table's oid for the second. Right now, I don't see a simple way to do that.

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company

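As a side illustration of the ctid collision described in the message above, here is a minimal, self-contained C sketch. It is a toy model and not postgres_fdw code: the `Row` struct, the two helper functions, and any OID values passed to them are hypothetical, and the model does not reproduce the fact that a real UPDATE moves the row to a new ctid. It only shows why matching on ctid alone can hit one row per child table, while matching on the (tableoid, ctid) pair hits at most one row.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model of a row stored in some child table on the remote server. */
typedef struct {
    unsigned tableoid;  /* hypothetical OID of the child table holding the row */
    unsigned block;     /* ctid block number */
    unsigned offset;    /* ctid offset within the block */
    int a;
    int b;
} Row;

/*
 * Models "UPDATE parent SET b = $2 WHERE ctid = $1" issued against the
 * parent: all child tables are searched, so every row whose ctid matches
 * is updated. Returns the number of rows updated.
 */
size_t update_by_ctid(Row *rows, size_t n,
                      unsigned block, unsigned offset, int new_b)
{
    size_t updated = 0;

    assert(rows != NULL);
    for (size_t i = 0; i < n; i++) {
        if (rows[i].block == block && rows[i].offset == offset) {
            rows[i].b = new_b;
            updated++;
        }
    }
    return updated;
}

/*
 * Models "... WHERE ctid = $1 AND tableoid = $2": the extra qualifier
 * restricts the match to the child table the row was fetched from.
 * Returns the number of rows updated.
 */
size_t update_by_ctid_and_tableoid(Row *rows, size_t n, unsigned tableoid,
                                   unsigned block, unsigned offset, int new_b)
{
    size_t updated = 0;

    assert(rows != NULL);
    for (size_t i = 0; i < n; i++) {
        if (rows[i].tableoid == tableoid &&
            rows[i].block == block && rows[i].offset == offset) {
            rows[i].b = new_b;
            updated++;
        }
    }
    return updated;
}
```

With two rows that both sit at ctid (0,1) but in different child tables, the first function updates both of them, whereas the second updates only the row whose tableoid was passed in. This mirrors the difference between the current behaviour and the second approach proposed above.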
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index 6e2fa14..d3c98d3 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -1144,7 +1144,7 @@ deparseTargetList(StringInfo buf, } /* - * Add ctid and oid if needed. We currently don't support retrieving any + * Add ctid, tableoid and oid if needed. We currently don't support retrieving any * other system columns. */ if (bms_is_member(SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber, @@ -1179,6 +1179,22 @@ deparseTargetList(StringInfo buf, *retrieved_attrs = lappend_int(*retrieved_attrs, ObjectIdAttributeNumber); } + if (bms_is_member(TableOidAttributeNumber - FirstLowInvalidHeapAttributeNumber, + attrs_used)) + { + if (!first) + appendStringInfoString(buf, ", "); + else if (is_returning) + appendStringInfoString(buf, " RETURNING "); + first = false; + + if (qualify_col) + ADD_REL_QUALIFIER(buf, rtindex); + appendStringInfoString(buf, "tableoid"); + + *retrieved_attrs = lappend_int(*retrieved_attrs, + TableOidAttributeNumber); + } /* Don't generate bad syntax if no undropped columns */ if (first && !is_returning) @@ -1725,7 +1741,7 @@ deparseUpdateSql(StringInfo buf, PlannerInfo *root, deparseRelation(buf, rel); appendStringInfoString(buf, " SET "); - pindex = 2; /* ctid is always the first param */ + pindex = 3; /* ctid, tableoid params appear first */ first = true; foreach(lc, targetAttrs) { @@ -1739,7 +1755,7 @@ deparseUpdateSql(StringInfo buf, PlannerInfo *root, appendStringInfo(buf, " = $%d", pindex); pindex++; } - appendStringInfoString(buf, " WHERE ctid = $1"); + appendStringInfoString(buf, " WHERE ctid = $1 AND tableoid = $2"); deparseReturningList(buf, root, rtindex, rel, rel->trigdesc && rel->trigdesc->trig_update_after_row, @@ -1854,7 +1870,7 @@ deparseDeleteSql(StringInfo buf, PlannerInfo *root, { appendStringInfoString(buf, "DELETE FROM "); deparseRelation(buf, rel); - appendStringInfoString(buf, " WHERE ctid = $1"); + 
appendStringInfoString(buf, " WHERE ctid = $1 AND tableoid = $2"); deparseReturningList(buf, root, rtindex, rel, rel->trigdesc && rel->trigdesc->trig_delete_after_row, diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index e4d9469..771376d 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -6923,6 +6923,46 @@ SELECT tableoid::regclass, * FROM ONLY a; DROP TABLE a CASCADE; NOTICE: drop cascades to foreign table b DROP TABLE loct; +-- test UPDATE on a foreign table pointing to an inheritance hierarchy on the +-- remote server +CREATE TABLE a(aa TEXT); +ALTER TABLE a SET (autovacuum_enabled = 'false'); +CREATE TABLE b() INHERITS(a); +ALTER TABLE b SET (autovacuum_enabled = 'false'); +INSERT INTO a(aa) VALUES('aaa'); +INSERT INTO b(aa) VALUES('bbb'); +CREATE FOREIGN TABLE fa (aa TEXT) SERVER loopback OPTIONS (table_name 'a'); +SELECT tableoid::regclass, ctid, * FROM fa; + tableoid | ctid | aa +----------+-------+----- + fa | (0,1) | aaa + fa | (0,1) | bbb +(2 rows) + +-- use random() so that UPDATE statement is not pushed down to the foreign +-- server +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE fa SET aa = (CASE WHEN random() <= 1 THEN 'zzzz' ELSE NULL END) WHERE aa = 'aaa'; + QUERY PLAN +----------------------------------------------------------------------------------------------------------- + Update on public.fa + Remote SQL: UPDATE public.a SET aa = $2 WHERE ctid = $1 + -> Foreign Scan on public.fa + Output: CASE WHEN (random() <= '1'::double precision) THEN 'zzzz'::text ELSE NULL::text END, ctid + Remote SQL: SELECT ctid FROM public.a WHERE ((aa = 'aaa'::text)) FOR UPDATE +(5 rows) + +UPDATE fa SET aa = (CASE WHEN random() <= 1 THEN 'zzzz' ELSE NULL END) WHERE aa = 'aaa'; +SELECT tableoid::regclass, ctid, * FROM fa; + tableoid | ctid | aa +----------+-------+------ + fa | (0,2) | zzzz + fa | (0,2) | bbb +(2 rows) + +DROP FOREIGN TABLE fa; +DROP 
TABLE a CASCADE; +NOTICE: drop cascades to table b -- Check SELECT FOR UPDATE/SHARE with an inherited source table create table loct1 (f1 int, f2 int, f3 int); create table loct2 (f1 int, f2 int, f3 int); @@ -8317,3 +8357,39 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 -- Clean-up RESET enable_partitionwise_aggregate; +-- test UPDATE on foreign table pointing to a foreign partitioned table +CREATE TABLE plt (a int, b int) PARTITION BY LIST(a); +CREATE TABLE plt_p1 PARTITION OF plt FOR VALUES IN (1); +CREATE TABLE plt_p2 PARTITION OF plt FOR VALUES IN (2); +INSERT INTO plt VALUES (1, 1), (2, 2); +CREATE FOREIGN TABLE fplt (a int, b int) SERVER loopback OPTIONS (table_name 'plt'); +SELECT tableoid::regclass, ctid, * FROM fplt; + tableoid | ctid | a | b +----------+-------+---+--- + fplt | (0,1) | 1 | 1 + fplt | (0,1) | 2 | 2 +(2 rows) + +-- use random() so that UPDATE statement is not pushed down to the foreign +-- server +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1; + QUERY PLAN +-------------------------------------------------------------------------------------------- + Update on public.fplt + Remote SQL: UPDATE public.plt SET b = $2 WHERE ctid = $1 + -> Foreign Scan on public.fplt + Output: a, CASE WHEN (random() <= '1'::double precision) THEN 10 ELSE 20 END, ctid + Remote SQL: SELECT a, ctid FROM public.plt WHERE ((a = 1)) FOR UPDATE +(5 rows) + +UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1; +SELECT tableoid::regclass, ctid, * FROM fplt; + tableoid | ctid | a | b +----------+-------+---+---- + fplt | (0,2) | 1 | 10 + fplt | (0,2) | 2 | 2 +(2 rows) + +DROP TABLE plt; +DROP FOREIGN TABLE fplt; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 30e5726..e1c2639 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -178,6 +178,7 @@ typedef struct 
PgFdwModifyState /* info about parameters for prepared statement */ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ + AttrNumber tableoidAttno; /* attnum of input resjunk tableoid column */ int p_nums; /* number of parameters to transmit */ FmgrInfo *p_flinfo; /* output conversion functions for them */ @@ -390,7 +391,7 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, List *retrieved_attrs); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, - ItemPointer tupleid, + ItemPointer tupleid, Oid tableoid, TupleTableSlot *slot); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); @@ -1543,26 +1544,39 @@ postgresAddForeignUpdateTargets(Query *parsetree, TargetEntry *tle; /* - * In postgres_fdw, what we need is the ctid, same as for a regular table. + * ctid is used to locate a row in a given table and tableoid is used to + * identify a table in a partition or inheritance hierarchy. */ - /* Make a Var representing the desired value */ + /* + * Make a Var representing the ctid, wrap it in a resjunk TLE with the + * right name and add it to the query's targetlist. + */ var = makeVar(parsetree->resultRelation, SelfItemPointerAttributeNumber, TIDOID, -1, InvalidOid, 0); - - /* Wrap it in a resjunk TLE with the right name ... */ attrname = "ctid"; - tle = makeTargetEntry((Expr *) var, list_length(parsetree->targetList) + 1, pstrdup(attrname), true); + parsetree->targetList = lappend(parsetree->targetList, tle); - /* ... 
and add it to the query's targetlist */ + /* Do the same for tableoid */ + var = makeVar(parsetree->resultRelation, + TableOidAttributeNumber, + OIDOID, + -1, + InvalidOid, + 0); + attrname = "tableoid"; + tle = makeTargetEntry((Expr *) var, + list_length(parsetree->targetList) + 1, + pstrdup(attrname), + true); parsetree->targetList = lappend(parsetree->targetList, tle); } @@ -1751,7 +1765,7 @@ postgresExecForeignInsert(EState *estate, prepare_foreign_modify(fmstate); /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, NULL, slot); + p_values = convert_prep_stmt_params(fmstate, NULL, InvalidOid, slot); /* * Execute the prepared statement. @@ -1806,7 +1820,8 @@ postgresExecForeignUpdate(EState *estate, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - Datum datum; + Datum ctid_datum; + Datum tableoid_datum; bool isNull; const char **p_values; PGresult *res; @@ -1817,16 +1832,29 @@ postgresExecForeignUpdate(EState *estate, prepare_foreign_modify(fmstate); /* Get the ctid that was passed up as a resjunk column */ - datum = ExecGetJunkAttribute(planSlot, - fmstate->ctidAttno, - &isNull); + ctid_datum = ExecGetJunkAttribute(planSlot, + fmstate->ctidAttno, + &isNull); /* shouldn't ever get a null result... */ if (isNull) elog(ERROR, "ctid is NULL"); + /* Get the tableoid that was passed up as a resjunk column */ + tableoid_datum = ExecGetJunkAttribute(planSlot, + fmstate->tableoidAttno, + &isNull); + /* shouldn't ever get a null result... */ + if (isNull) + elog(ERROR, "tableoid is NULL"); + + /* ... 
and should be always a valid */ + if (!OidIsValid(DatumGetObjectId(tableoid_datum))) + elog(ERROR, "tableoid is invalid"); + /* Convert parameters needed by prepared statement to text form */ p_values = convert_prep_stmt_params(fmstate, - (ItemPointer) DatumGetPointer(datum), + (ItemPointer) DatumGetPointer(ctid_datum), + DatumGetObjectId(tableoid_datum), slot); /* @@ -1882,7 +1910,8 @@ postgresExecForeignDelete(EState *estate, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - Datum datum; + Datum ctid_datum; + Datum tableoid_datum; bool isNull; const char **p_values; PGresult *res; @@ -1893,16 +1922,29 @@ postgresExecForeignDelete(EState *estate, prepare_foreign_modify(fmstate); /* Get the ctid that was passed up as a resjunk column */ - datum = ExecGetJunkAttribute(planSlot, - fmstate->ctidAttno, - &isNull); + ctid_datum = ExecGetJunkAttribute(planSlot, + fmstate->ctidAttno, + &isNull); /* shouldn't ever get a null result... */ if (isNull) elog(ERROR, "ctid is NULL"); + /* Get the tableoid that was passed up as a resjunk column */ + tableoid_datum = ExecGetJunkAttribute(planSlot, + fmstate->tableoidAttno, + &isNull); + /* shouldn't ever get a null result... */ + if (isNull) + elog(ERROR, "tableoid is NULL"); + + /* ... and should be always a valid */ + if (!OidIsValid(DatumGetObjectId(tableoid_datum))) + elog(ERROR, "tableoid is invalid"); + /* Convert parameters needed by prepared statement to text form */ p_values = convert_prep_stmt_params(fmstate, - (ItemPointer) DatumGetPointer(datum), + (ItemPointer) DatumGetPointer(ctid_datum), + DatumGetObjectId(tableoid_datum), NULL); /* @@ -3310,7 +3352,7 @@ create_foreign_modify(EState *estate, fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); /* Prepare for output conversion of parameters used in prepared stmt. 
*/ - n_params = list_length(fmstate->target_attrs) + 1; + n_params = list_length(fmstate->target_attrs) + 2; fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); fmstate->p_nums = 0; @@ -3318,16 +3360,30 @@ create_foreign_modify(EState *estate, { Assert(subplan != NULL); - /* Find the ctid resjunk column in the subplan's result */ + /* + * Find the ctid, tableoid resjunk columns in the subplan's result and + * record those as transmittable parameters. + */ + + + /* First transmittable parameter will be ctid */ fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, "ctid"); if (!AttributeNumberIsValid(fmstate->ctidAttno)) elog(ERROR, "could not find junk ctid column"); - - /* First transmittable parameter will be ctid */ getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); fmstate->p_nums++; + + /* Second transmittable parameter will be tableoid */ + fmstate->tableoidAttno = + ExecFindJunkAttributeInTlist(subplan->targetlist, + "tableoid"); + if (!AttributeNumberIsValid(fmstate->tableoidAttno)) + elog(ERROR, "could not find junk tableoid column"); + getTypeOutputInfo(OIDOID, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; } if (operation == CMD_INSERT || operation == CMD_UPDATE) @@ -3401,13 +3457,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * Create array of text strings representing parameter values * * tupleid is ctid to send, or NULL if none + * tableoid is tableoid to send or InvalidOid if none * slot is slot to get remaining parameters from, or NULL if none * * Data is constructed in temp_cxt; caller should reset that after use. 
*/ static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, - ItemPointer tupleid, + ItemPointer tupleid, Oid tableoid, TupleTableSlot *slot) { const char **p_values; @@ -3427,6 +3484,15 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate, pindex++; } + /* 2nd parameter should be tableoid, if it's in use */ + if (OidIsValid(tableoid)) + { + /* don't need set_transmission_modes for TID output */ + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], + ObjectIdGetDatum(tableoid)); + pindex++; + } + /* get following parameters from slot */ if (slot != NULL && fmstate->target_attrs != NIL) { @@ -5525,6 +5591,7 @@ make_tuple_from_result_row(PGresult *res, bool *nulls; ItemPointer ctid = NULL; Oid oid = InvalidOid; + Oid tableoid = InvalidOid; ConversionLocation errpos; ErrorContextCallback errcallback; MemoryContext oldcontext; @@ -5618,6 +5685,18 @@ make_tuple_from_result_row(PGresult *res, oid = DatumGetObjectId(datum); } } + else if (i == TableOidAttributeNumber) + { + /* tableoid */ + if (valstr != NULL) + { + Datum datum; + + datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr)); + tableoid = DatumGetObjectId(datum); + } + } + errpos.cur_attno = 0; j++; @@ -5667,6 +5746,9 @@ make_tuple_from_result_row(PGresult *res, if (OidIsValid(oid)) HeapTupleSetOid(tuple, oid); + if (OidIsValid(tableoid)) + tuple->t_tableOid = tableoid; + /* Clean up */ MemoryContextReset(temp_context); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index e1df952..19d1c80 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1648,6 +1648,27 @@ SELECT tableoid::regclass, * FROM ONLY a; DROP TABLE a CASCADE; DROP TABLE loct; +-- test UPDATE on a foreign table pointing to an inheritance hierarchy on the +-- remote server +CREATE TABLE a(aa TEXT); +ALTER TABLE a SET (autovacuum_enabled = 'false'); +CREATE TABLE b() INHERITS(a); +ALTER TABLE b SET 
(autovacuum_enabled = 'false'); +INSERT INTO a(aa) VALUES('aaa'); +INSERT INTO b(aa) VALUES('bbb'); +CREATE FOREIGN TABLE fa (aa TEXT) SERVER loopback OPTIONS (table_name 'a'); + +SELECT tableoid::regclass, ctid, * FROM fa; +-- use random() so that UPDATE statement is not pushed down to the foreign +-- server +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE fa SET aa = (CASE WHEN random() <= 1 THEN 'zzzz' ELSE NULL END) WHERE aa = 'aaa'; +UPDATE fa SET aa = (CASE WHEN random() <= 1 THEN 'zzzz' ELSE NULL END) WHERE aa = 'aaa'; +SELECT tableoid::regclass, ctid, * FROM fa; + +DROP FOREIGN TABLE fa; +DROP TABLE a CASCADE; + -- Check SELECT FOR UPDATE/SHARE with an inherited source table create table loct1 (f1 int, f2 int, f3 int); create table loct2 (f1 int, f2 int, f3 int); @@ -2220,3 +2241,20 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 -- Clean-up RESET enable_partitionwise_aggregate; + +-- test UPDATE on foreign table pointing to a foreign partitioned table +CREATE TABLE plt (a int, b int) PARTITION BY LIST(a); +CREATE TABLE plt_p1 PARTITION OF plt FOR VALUES IN (1); +CREATE TABLE plt_p2 PARTITION OF plt FOR VALUES IN (2); +INSERT INTO plt VALUES (1, 1), (2, 2); +CREATE FOREIGN TABLE fplt (a int, b int) SERVER loopback OPTIONS (table_name 'plt'); +SELECT tableoid::regclass, ctid, * FROM fplt; +-- use random() so that UPDATE statement is not pushed down to the foreign +-- server +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1; +UPDATE fplt SET b = (CASE WHEN random() <= 1 THEN 10 ELSE 20 END) WHERE a = 1; +SELECT tableoid::regclass, ctid, * FROM fplt; + +DROP TABLE plt; +DROP FOREIGN TABLE fplt; diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index a2a28b7..8ebfdfd 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -58,13 +58,14 @@ ForeignNext(ForeignScanState *node) * If any system 
columns are requested, we have to force the tuple into * physical-tuple form to avoid "cannot extract system attribute from * virtual tuple" errors later. We also insert a valid value for - * tableoid, which is the only actually-useful system column. + * tableoid, in case FDW has not set it as per its needs. */ if (plan->fsSystemCol && !TupIsNull(slot)) { HeapTuple tup = ExecMaterializeSlot(slot); - tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation); + if (!OidIsValid(tup->t_tableOid)) + tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation); } return slot;