At Mon, 04 Jun 2018 20:58:28 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
<horiguchi.kyot...@lab.ntt.co.jp> wrote in 
<20180604.205828.208262556.horiguchi.kyot...@lab.ntt.co.jp>
> Hello.
> 
> At Fri, 1 Jun 2018 10:21:39 -0400, Ashutosh Bapat 
> <ashutosh.ba...@enterprisedb.com> wrote in 
> <CAFjFpRdraYcQnD4tKzNuP1uP6L-gnizi4HLU_UA=28q2m4z...@mail.gmail.com>
> > I am not suggesting to commit 0003 in my patch set, but just 0001 and
> > 0002 which just raise an error when multiple rows get updated when
> > only one row is expected to be updated.
> 
> I reconsidered Tom's suggestion and found a way to fix this
> problem avoiding FDW-API change.

The patch just sent contains changes of execnodes.h, which is
useless.

regres.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center


diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index d272719ff4..503e705c4c 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1107,11 +1107,17 @@ deparseTargetList(StringInfo buf,
 				  bool qualify_col,
 				  List **retrieved_attrs)
 {
+	static int	check_attrs[4];
+	static char *check_attr_names[] = {"ctid", "oid", "tableoid"};
 	TupleDesc	tupdesc = RelationGetDescr(rel);
 	bool		have_wholerow;
 	bool		first;
 	int			i;
 
+	check_attrs[0] = SelfItemPointerAttributeNumber;
+	check_attrs[1] = ObjectIdAttributeNumber;
+	check_attrs[2] = TableOidAttributeNumber;
+	check_attrs[3] = FirstLowInvalidHeapAttributeNumber;
 	*retrieved_attrs = NIL;
 
 	/* If there's a whole-row reference, we'll need all the columns. */
@@ -1143,41 +1149,27 @@ deparseTargetList(StringInfo buf,
 		}
 	}
 
-	/*
-	 * Add ctid and oid if needed.  We currently don't support retrieving any
-	 * other system columns.
-	 */
-	if (bms_is_member(SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber,
-					  attrs_used))
+	for (i = 0 ; check_attrs[i] != FirstLowInvalidHeapAttributeNumber ; i++)
 	{
-		if (!first)
-			appendStringInfoString(buf, ", ");
-		else if (is_returning)
-			appendStringInfoString(buf, " RETURNING ");
-		first = false;
+		int	attr = check_attrs[i];
+		char *attr_name = check_attr_names[i];
 
-		if (qualify_col)
-			ADD_REL_QUALIFIER(buf, rtindex);
-		appendStringInfoString(buf, "ctid");
+		/* Add system columns if needed. */
+		if (bms_is_member(attr - FirstLowInvalidHeapAttributeNumber,
+						  attrs_used))
+		{
+			if (!first)
+				appendStringInfoString(buf, ", ");
+			else if (is_returning)
+				appendStringInfoString(buf, " RETURNING ");
+			first = false;
 
-		*retrieved_attrs = lappend_int(*retrieved_attrs,
-									   SelfItemPointerAttributeNumber);
-	}
-	if (bms_is_member(ObjectIdAttributeNumber - FirstLowInvalidHeapAttributeNumber,
-					  attrs_used))
-	{
-		if (!first)
-			appendStringInfoString(buf, ", ");
-		else if (is_returning)
-			appendStringInfoString(buf, " RETURNING ");
-		first = false;
+			if (qualify_col)
+				ADD_REL_QUALIFIER(buf, rtindex);
+			appendStringInfoString(buf, attr_name);
 
-		if (qualify_col)
-			ADD_REL_QUALIFIER(buf, rtindex);
-		appendStringInfoString(buf, "oid");
-
-		*retrieved_attrs = lappend_int(*retrieved_attrs,
-									   ObjectIdAttributeNumber);
+			*retrieved_attrs = lappend_int(*retrieved_attrs, attr);
+		}
 	}
 
 	/* Don't generate bad syntax if no undropped columns */
@@ -1725,7 +1717,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 	deparseRelation(buf, rel);
 	appendStringInfoString(buf, " SET ");
 
-	pindex = 2;					/* ctid is always the first param */
+	pindex = 3;			/* tableoid and ctid are always the first param */
 	first = true;
 	foreach(lc, targetAttrs)
 	{
@@ -1739,7 +1731,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 		appendStringInfo(buf, " = $%d", pindex);
 		pindex++;
 	}
-	appendStringInfoString(buf, " WHERE ctid = $1");
+	appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2");
 
 	deparseReturningList(buf, rte, rtindex, rel,
 						 rel->trigdesc && rel->trigdesc->trig_update_after_row,
@@ -1855,7 +1847,7 @@ deparseDeleteSql(StringInfo buf, RangeTblEntry *rte,
 {
 	appendStringInfoString(buf, "DELETE FROM ");
 	deparseRelation(buf, rel);
-	appendStringInfoString(buf, " WHERE ctid = $1");
+	appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2");
 
 	deparseReturningList(buf, rte, rtindex, rel,
 						 rel->trigdesc && rel->trigdesc->trig_delete_after_row,
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 78b0f43ca8..7557d9add7 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -73,7 +73,9 @@ enum FdwScanPrivateIndex
 	 * String describing join i.e. names of relations being joined and types
 	 * of join, added when the scan is join
 	 */
-	FdwScanPrivateRelations
+	FdwScanPrivateRelations,
+
+	FdwScanTupleIdParamIds
 };
 
 /*
@@ -95,7 +97,8 @@ enum FdwModifyPrivateIndex
 	/* has-returning flag (as an integer Value node) */
 	FdwModifyPrivateHasReturning,
 	/* Integer list of attribute numbers retrieved by RETURNING */
-	FdwModifyPrivateRetrievedAttrs
+	FdwModifyPrivateRetrievedAttrs,
+	FdwModifyPrivateTidParams
 };
 
 /*
@@ -156,6 +159,8 @@ typedef struct PgFdwScanState
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 
 	int			fetch_size;		/* number of tuples per fetch */
+
+	int		   *tid_params;
 } PgFdwScanState;
 
 /*
@@ -178,6 +183,7 @@ typedef struct PgFdwModifyState
 
 	/* info about parameters for prepared statement */
 	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
+	int			*tid_params;
 	int			p_nums;			/* number of parameters to transmit */
 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
 
@@ -293,9 +299,6 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
 static void postgresReScanForeignScan(ForeignScanState *node);
 static void postgresEndForeignScan(ForeignScanState *node);
-static void postgresAddForeignUpdateTargets(Query *parsetree,
-								RangeTblEntry *target_rte,
-								Relation target_relation);
 static List *postgresPlanForeignModify(PlannerInfo *root,
 						  ModifyTable *plan,
 						  Index resultRelation,
@@ -388,9 +391,11 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
 					  char *query,
 					  List *target_attrs,
 					  bool has_returning,
-					  List *retrieved_attrs);
+					  List *retrieved_attrs,
+					  int *tid_params);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
+						 Oid tableoid,
 						 ItemPointer tupleid,
 						 TupleTableSlot *slot);
 static void store_returning_result(PgFdwModifyState *fmstate,
@@ -471,7 +476,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->EndForeignScan = postgresEndForeignScan;
 
 	/* Functions for updating foreign tables */
-	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
+	routine->AddForeignUpdateTargets = NULL;
 	routine->PlanForeignModify = postgresPlanForeignModify;
 	routine->BeginForeignModify = postgresBeginForeignModify;
 	routine->ExecForeignInsert = postgresExecForeignInsert;
@@ -595,6 +600,26 @@ postgresGetForeignRelSize(PlannerInfo *root,
 					   &fpinfo->attrs_used);
 	}
 
+	/*
+	 * ctid and tableoid are required for UPDATE and DELETE.
+	 */
+	if (root->parse->commandType == CMD_UPDATE ||
+		root->parse->commandType == CMD_DELETE)
+	{
+		fpinfo->param_attrs =
+			bms_add_member(fpinfo->param_attrs,
+						   SelfItemPointerAttributeNumber -
+						   FirstLowInvalidHeapAttributeNumber);
+
+		fpinfo->param_attrs =
+			bms_add_member(fpinfo->param_attrs,
+						   TableOidAttributeNumber -
+						   FirstLowInvalidHeapAttributeNumber);
+
+		fpinfo->attrs_used =
+			bms_add_members(fpinfo->attrs_used, fpinfo->param_attrs);
+	}
+
 	/*
 	 * Compute the selectivity and cost of the local_conds, so we don't have
 	 * to do it over again for each path.  The best we can do for these
@@ -1116,6 +1141,61 @@ postgresGetForeignPaths(PlannerInfo *root,
 	}
 }
 
+/*
+ * Select a PARAM_EXEC number to identify the given Var as a parameter for
+ * the current subquery, or for a nestloop's inner scan.
+ * If the Var already has a param in the current context, return that one.
+ */
+static int
+assign_param_for_var(PlannerInfo *root, Var *var)
+{
+	ListCell   *ppl;
+	PlannerParamItem *pitem;
+	Index		levelsup;
+
+	/* Find the query level the Var belongs to */
+	for (levelsup = var->varlevelsup; levelsup > 0; levelsup--)
+		root = root->parent_root;
+
+	/* If there's already a matching PlannerParamItem there, just use it */
+	foreach(ppl, root->plan_params)
+	{
+		pitem = (PlannerParamItem *) lfirst(ppl);
+		if (IsA(pitem->item, Var))
+		{
+			Var		   *pvar = (Var *) pitem->item;
+
+			/*
+			 * This comparison must match _equalVar(), except for ignoring
+			 * varlevelsup.  Note that _equalVar() ignores the location.
+			 */
+			if (pvar->varno == var->varno &&
+				pvar->varattno == var->varattno &&
+				pvar->vartype == var->vartype &&
+				pvar->vartypmod == var->vartypmod &&
+				pvar->varcollid == var->varcollid &&
+				pvar->varnoold == var->varnoold &&
+				pvar->varoattno == var->varoattno)
+				return pitem->paramId;
+		}
+	}
+
+	/* Nope, so make a new one */
+	var = copyObject(var);
+	var->varlevelsup = 0;
+
+	pitem = makeNode(PlannerParamItem);
+	pitem->item = (Node *) var;
+	pitem->paramId = list_length(root->glob->paramExecTypes);
+	root->glob->paramExecTypes = lappend_oid(root->glob->paramExecTypes,
+											 var->vartype);
+
+	root->plan_params = lappend(root->plan_params, pitem);
+
+	return pitem->paramId;
+}
+
+
 /*
  * postgresGetForeignPlan
  *		Create ForeignScan plan node which implements selected best path
@@ -1287,6 +1367,32 @@ postgresGetForeignPlan(PlannerInfo *root,
 	if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
 		fdw_private = lappend(fdw_private,
 							  makeString(fpinfo->relation_name->data));
+	if (!bms_is_empty(fpinfo->param_attrs))
+	{
+		int *paramids = palloc(sizeof(int) * 2);
+		Var	*v;
+
+		if (list_length(fdw_private) == 3)
+			fdw_private = lappend(fdw_private, makeString(""));
+
+		v = makeNode(Var);
+		v->varno = foreignrel->relid;
+		v->vartype = OIDOID;
+		v->vartypmod = -1;
+		v->varcollid = InvalidOid;
+		v->varattno = TableOidAttributeNumber;
+		paramids[0] = assign_param_for_var(root, v);
+
+		v = makeNode(Var);
+		v->varno = foreignrel->relid;
+		v->vartype = TIDOID;
+		v->vartypmod = -1;
+		v->varcollid = InvalidOid;
+		v->varattno = SelfItemPointerAttributeNumber;
+		paramids[1] = assign_param_for_var(root, v);
+
+		fdw_private = lappend(fdw_private, paramids);
+	}
 
 	/*
 	 * Create the ForeignScan node for the given relation.
@@ -1368,6 +1474,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 												 FdwScanPrivateRetrievedAttrs);
 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
 										  FdwScanPrivateFetchSize));
+	if (list_length(fsplan->fdw_private) > FdwScanTupleIdParamIds)
+		fsstate->tid_params =
+			(int *) list_nth(fsplan->fdw_private, FdwScanTupleIdParamIds);
 
 	/* Create contexts for batches of tuples and per-tuple temp workspace. */
 	fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
@@ -1418,6 +1527,8 @@ postgresIterateForeignScan(ForeignScanState *node)
 {
 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+	EState *estate = node->ss.ps.state;
+	HeapTuple		tup;
 
 	/*
 	 * If this is the first call after Begin or ReScan, we need to create the
@@ -1439,10 +1550,28 @@ postgresIterateForeignScan(ForeignScanState *node)
 			return ExecClearTuple(slot);
 	}
 
+	tup = fsstate->tuples[fsstate->next_tuple++];
+	if (fsstate->tid_params != NULL)
+	{
+		ParamExecData *prm;
+		ItemPointer	  itemp;
+
+		/* set toid */
+		prm = &(estate->es_param_exec_vals[fsstate->tid_params[0]]);
+		prm->value = ObjectIdGetDatum(tup->t_tableOid);
+		/* set ctid */
+		prm = &(estate->es_param_exec_vals[fsstate->tid_params[1]]);
+		itemp = (ItemPointer) palloc(sizeof(ItemPointerData));
+		ItemPointerSet(itemp,
+					   ItemPointerGetBlockNumberNoCheck(&tup->t_self),
+					   ItemPointerGetOffsetNumberNoCheck(&tup->t_self));
+		prm->value = PointerGetDatum(itemp);
+	}
+	
 	/*
 	 * Return the next tuple.
 	 */
-	ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
+	ExecStoreTuple(tup,
 				   slot,
 				   InvalidBuffer,
 				   false);
@@ -1530,41 +1659,41 @@ postgresEndForeignScan(ForeignScanState *node)
 	/* MemoryContexts will be deleted automatically. */
 }
 
-/*
- * postgresAddForeignUpdateTargets
- *		Add resjunk column(s) needed for update/delete on a foreign table
- */
-static void
-postgresAddForeignUpdateTargets(Query *parsetree,
-								RangeTblEntry *target_rte,
-								Relation target_relation)
+static int
+find_param_for_var(PlannerInfo *root, Var *var)
 {
-	Var		   *var;
-	const char *attrname;
-	TargetEntry *tle;
+	ListCell   *ppl;
+	PlannerParamItem *pitem;
+	Index		levelsup;
 
-	/*
-	 * In postgres_fdw, what we need is the ctid, same as for a regular table.
-	 */
+	/* Find the query level the Var belongs to */
+	for (levelsup = var->varlevelsup; levelsup > 0; levelsup--)
+		root = root->parent_root;
 
-	/* Make a Var representing the desired value */
-	var = makeVar(parsetree->resultRelation,
-				  SelfItemPointerAttributeNumber,
-				  TIDOID,
-				  -1,
-				  InvalidOid,
-				  0);
+	/* If there's already a matching PlannerParamItem there, just use it */
+	foreach(ppl, root->plan_params)
+	{
+		pitem = (PlannerParamItem *) lfirst(ppl);
+		if (IsA(pitem->item, Var))
+		{
+			Var		   *pvar = (Var *) pitem->item;
 
-	/* Wrap it in a resjunk TLE with the right name ... */
-	attrname = "ctid";
+			/*
+			 * This comparison must match _equalVar(), except for ignoring
+			 * varlevelsup.  Note that _equalVar() ignores the location.
+			 */
+			if (pvar->varno == var->varno &&
+				pvar->varattno == var->varattno &&
+				pvar->vartype == var->vartype &&
+				pvar->vartypmod == var->vartypmod &&
+				pvar->varcollid == var->varcollid &&
+				pvar->varnoold == var->varnoold &&
+				pvar->varoattno == var->varoattno)
+				return pitem->paramId;
+		}
+	}
 
-	tle = makeTargetEntry((Expr *) var,
-						  list_length(parsetree->targetList) + 1,
-						  pstrdup(attrname),
-						  true);
-
-	/* ... and add it to the query's targetlist */
-	parsetree->targetList = lappend(parsetree->targetList, tle);
+	return -1;
 }
 
 /*
@@ -1585,6 +1714,7 @@ postgresPlanForeignModify(PlannerInfo *root,
 	List	   *returningList = NIL;
 	List	   *retrieved_attrs = NIL;
 	bool		doNothing = false;
+	int *paramids = NULL;
 
 	initStringInfo(&sql);
 
@@ -1630,6 +1760,28 @@ postgresPlanForeignModify(PlannerInfo *root,
 		}
 	}
 
+	if (operation == CMD_UPDATE || operation == CMD_DELETE)
+	{
+		Var	*v;
+
+		paramids = palloc(sizeof(int) * 2);
+		v = makeNode(Var);
+		v->varno = resultRelation;
+		v->vartype = OIDOID;
+		v->vartypmod = -1;
+		v->varcollid = InvalidOid;
+		v->varattno = TableOidAttributeNumber;
+		paramids[0] = find_param_for_var(root, v);
+		if (paramids[0] < 0)
+			elog(ERROR, "ERROR 1");
+
+		v->vartype = TIDOID;
+		v->varattno = SelfItemPointerAttributeNumber;
+		paramids[1] = find_param_for_var(root, v);
+		if (paramids[1] < 0)
+			elog(ERROR, "ERROR 2");
+	}
+
 	/*
 	 * Extract the relevant RETURNING list if any.
 	 */
@@ -1679,10 +1831,11 @@ postgresPlanForeignModify(PlannerInfo *root,
 	 * Build the fdw_private list that will be available to the executor.
 	 * Items in the list must match enum FdwModifyPrivateIndex, above.
 	 */
-	return list_make4(makeString(sql.data),
+	return list_make5(makeString(sql.data),
 					  targetAttrs,
 					  makeInteger((retrieved_attrs != NIL)),
-					  retrieved_attrs);
+					  retrieved_attrs,
+					  paramids);
 }
 
 /*
@@ -1702,6 +1855,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	bool		has_returning;
 	List	   *retrieved_attrs;
 	RangeTblEntry *rte;
+	int		   *tid_params;
 
 	/*
 	 * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
@@ -1719,6 +1873,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 									FdwModifyPrivateHasReturning));
 	retrieved_attrs = (List *) list_nth(fdw_private,
 										FdwModifyPrivateRetrievedAttrs);
+	tid_params = (int *) list_nth(fdw_private, FdwModifyPrivateTidParams);
 
 	/* Find RTE. */
 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex,
@@ -1733,7 +1888,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 									query,
 									target_attrs,
 									has_returning,
-									retrieved_attrs);
+									retrieved_attrs,
+									tid_params);
 
 	resultRelInfo->ri_FdwState = fmstate;
 }
@@ -1758,7 +1914,7 @@ postgresExecForeignInsert(EState *estate,
 		prepare_foreign_modify(fmstate);
 
 	/* Convert parameters needed by prepared statement to text form */
-	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
+	p_values = convert_prep_stmt_params(fmstate, InvalidOid, NULL, slot);
 
 	/*
 	 * Execute the prepared statement.
@@ -1813,28 +1969,31 @@ postgresExecForeignUpdate(EState *estate,
 						  TupleTableSlot *planSlot)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-	Datum		datum;
-	bool		isNull;
+	Datum		toiddatum, ctiddatum;
 	const char **p_values;
 	PGresult   *res;
 	int			n_rows;
+	int 	   *tid_params = fmstate->tid_params;
+	ParamExecData *prm;
 
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
 
+	Assert(tid_params);
+	/* Get the tableoid that was passed up as a exec param */
+	prm = &(estate->es_param_exec_vals[tid_params[0]]);
+	toiddatum = prm->value;
+
 	/* Get the ctid that was passed up as a resjunk column */
-	datum = ExecGetJunkAttribute(planSlot,
-								 fmstate->ctidAttno,
-								 &isNull);
-	/* shouldn't ever get a null result... */
-	if (isNull)
-		elog(ERROR, "ctid is NULL");
+	prm = &(estate->es_param_exec_vals[tid_params[1]]);
+	ctiddatum = prm->value;
 
 	/* Convert parameters needed by prepared statement to text form */
 	p_values = convert_prep_stmt_params(fmstate,
-										(ItemPointer) DatumGetPointer(datum),
-										slot);
+									DatumGetObjectId(toiddatum),
+									(ItemPointer) DatumGetPointer(ctiddatum),
+									slot);
 
 	/*
 	 * Execute the prepared statement.
@@ -1889,28 +2048,32 @@ postgresExecForeignDelete(EState *estate,
 						  TupleTableSlot *planSlot)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-	Datum		datum;
-	bool		isNull;
+	Datum		toiddatum, ctiddatum;
 	const char **p_values;
 	PGresult   *res;
 	int			n_rows;
+	int 	   *tid_params = fmstate->tid_params;
+	ParamExecData *prm;
 
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
 
+	Assert(tid_params);
+
+	/* Get the tableoid that was passed up as a exec param */
+	prm = &(estate->es_param_exec_vals[tid_params[0]]);
+	toiddatum = prm->value;
+
 	/* Get the ctid that was passed up as a resjunk column */
-	datum = ExecGetJunkAttribute(planSlot,
-								 fmstate->ctidAttno,
-								 &isNull);
-	/* shouldn't ever get a null result... */
-	if (isNull)
-		elog(ERROR, "ctid is NULL");
+	prm = &(estate->es_param_exec_vals[tid_params[1]]);
+	ctiddatum = prm->value;
 
 	/* Convert parameters needed by prepared statement to text form */
 	p_values = convert_prep_stmt_params(fmstate,
-										(ItemPointer) DatumGetPointer(datum),
-										NULL);
+									DatumGetObjectId(toiddatum),
+									(ItemPointer) DatumGetPointer(ctiddatum),
+									NULL);
 
 	/*
 	 * Execute the prepared statement.
@@ -2058,7 +2221,8 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
 									sql.data,
 									targetAttrs,
 									retrieved_attrs != NIL,
-									retrieved_attrs);
+									retrieved_attrs,
+									NULL);
 
 	resultRelInfo->ri_FdwState = fmstate;
 }
@@ -3286,7 +3450,8 @@ create_foreign_modify(EState *estate,
 					  char *query,
 					  List *target_attrs,
 					  bool has_returning,
-					  List *retrieved_attrs)
+					  List *retrieved_attrs,
+					  int *tid_params)
 {
 	PgFdwModifyState *fmstate;
 	Relation	rel = resultRelInfo->ri_RelationDesc;
@@ -3333,7 +3498,7 @@ create_foreign_modify(EState *estate,
 		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
 
 	/* Prepare for output conversion of parameters used in prepared stmt. */
-	n_params = list_length(fmstate->target_attrs) + 1;
+	n_params = list_length(fmstate->target_attrs) + 2;
 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
 	fmstate->p_nums = 0;
 
@@ -3341,13 +3506,14 @@ create_foreign_modify(EState *estate,
 	{
 		Assert(subplan != NULL);
 
-		/* Find the ctid resjunk column in the subplan's result */
-		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
-														  "ctid");
-		if (!AttributeNumberIsValid(fmstate->ctidAttno))
-			elog(ERROR, "could not find junk ctid column");
+		fmstate->tid_params = tid_params;
 
-		/* First transmittable parameter will be ctid */
+		/* First transmittable parameter will be table oid */
+		getTypeOutputInfo(OIDOID, &typefnoid, &isvarlena);
+		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+		fmstate->p_nums++;
+
+		/* Second transmittable parameter will be ctid */
 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
 		fmstate->p_nums++;
@@ -3430,6 +3596,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
  */
 static const char **
 convert_prep_stmt_params(PgFdwModifyState *fmstate,
+						 Oid tableoid,
 						 ItemPointer tupleid,
 						 TupleTableSlot *slot)
 {
@@ -3441,10 +3608,13 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
 
 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
 
-	/* 1st parameter should be ctid, if it's in use */
-	if (tupleid != NULL)
+	/* First two parameters should be tableoid and ctid, if it's in use */
+	if (tableoid != InvalidOid)
 	{
 		/* don't need set_transmission_modes for TID output */
+		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+											  ObjectIdGetDatum(tableoid));
+		pindex++;
 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
 											  PointerGetDatum(tupleid));
 		pindex++;
@@ -5549,6 +5719,7 @@ make_tuple_from_result_row(PGresult *res,
 	bool	   *nulls;
 	ItemPointer ctid = NULL;
 	Oid			oid = InvalidOid;
+	Oid			toid = InvalidOid;
 	ConversionLocation errpos;
 	ErrorContextCallback errcallback;
 	MemoryContext oldcontext;
@@ -5642,6 +5813,17 @@ make_tuple_from_result_row(PGresult *res,
 				oid = DatumGetObjectId(datum);
 			}
 		}
+		else if (i == TableOidAttributeNumber)
+		{
+			/* table oid */
+			if (valstr != NULL)
+			{
+				Datum		datum;
+
+				datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
+				toid = DatumGetObjectId(datum);
+			}
+		}
 		errpos.cur_attno = 0;
 
 		j++;
@@ -5691,6 +5873,9 @@ make_tuple_from_result_row(PGresult *res,
 	if (OidIsValid(oid))
 		HeapTupleSetOid(tuple, oid);
 
+	if (OidIsValid(toid))
+		tuple->t_tableOid = toid;
+
 	/* Clean up */
 	MemoryContextReset(temp_context);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index a5d4011e8d..39e5581125 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -108,6 +108,8 @@ typedef struct PgFdwRelationInfo
 	 * representing the relation.
 	 */
 	int			relation_index;
+
+	Bitmapset  *param_attrs;			/* attrs required for modification */
 } PgFdwRelationInfo;
 
 /* in postgres_fdw.c */

Reply via email to