Hello. At Mon, 04 Jun 2018 20:58:28 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyot...@lab.ntt.co.jp> wrote in <20180604.205828.208262556.horiguchi.kyot...@lab.ntt.co.jp> > It fails on some join-pushdown cases since it doesn't add tid > columns to join tlist. I suppose that build_tlist_to_deparse > needs something but I'll consider further tomorrow.
I made it work with a few exceptions and bumped. PARAM_EXEC doesn't work at all in a case where Sort exists between ForeignUpdate and ForeignScan. ===== explain (verbose, costs off) update bar set f2 = f2 + 100 from ( select f1 from foo union all select f1+3 from foo ) ss where bar.f1 = ss.f1; QUERY PLAN ----------------------------------------------------------------------------- Update on public.bar Update on public.bar Foreign Update on public.bar2 Remote SQL: UPDATE public.loct2 SET f2 = $3 WHERE tableoid = $1 AND ctid = $2 ... -> Merge Join Output: bar2.f1, (bar2.f2 + 100), bar2.f3, (ROW(foo.f1)) Merge Cond: (bar2.f1 = foo.f1) -> Sort Output: bar2.f1, bar2.f2, bar2.f3, bar2.tableoid, bar2.ctid Sort Key: bar2.f1 -> Foreign Scan on public.bar2 Output: bar2.f1, bar2.f2, bar2.f3, bar2.tableoid, bar2.ctid Remote SQL: SELECT f1, f2, f3, ctid, tableoid FROM public.loct2 FOR UPDATE ===== Even if this worked fine, it cannot be back-patched. We need an extra storage moves together with tuples or prevent sorts or something like from being inserted there. At Fri, 1 Jun 2018 10:21:39 -0400, Ashutosh Bapat <ashutosh.ba...@enterprisedb.com> wrote in <CAFjFpRdraYcQnD4tKzNuP1uP6L-gnizi4HLU_UA=28q2m4z...@mail.gmail.com> > I am not suggesting to commit 0003 in my patch set, but just 0001 and > 0002 which just raise an error when multiple rows get updated when > only one row is expected to be updated. So I agree to commit the two at least in order to prevent doing wrong silently. regards. -- Kyotaro Horiguchi NTT Open Source Software Center
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index d272719ff4..bff216f29d 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -1049,9 +1049,16 @@ deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs, * can use NoLock here. */ Relation rel = heap_open(rte->relid, NoLock); + Bitmapset *attrs = fpinfo->attrs_used; + + if (root->parse->commandType != CMD_UPDATE && + root->parse->commandType != CMD_DELETE) + attrs = bms_del_member(bms_copy(attrs), + TableOidAttributeNumber - + FirstLowInvalidHeapAttributeNumber); deparseTargetList(buf, rte, foreignrel->relid, rel, false, - fpinfo->attrs_used, false, retrieved_attrs); + attrs, false, retrieved_attrs); heap_close(rel, NoLock); } } @@ -1107,11 +1114,17 @@ deparseTargetList(StringInfo buf, bool qualify_col, List **retrieved_attrs) { + static int check_attrs[4]; + static char *check_attr_names[] = {"ctid", "oid", "tableoid"}; TupleDesc tupdesc = RelationGetDescr(rel); bool have_wholerow; bool first; int i; + check_attrs[0] = SelfItemPointerAttributeNumber; + check_attrs[1] = ObjectIdAttributeNumber; + check_attrs[2] = TableOidAttributeNumber; + check_attrs[3] = FirstLowInvalidHeapAttributeNumber; *retrieved_attrs = NIL; /* If there's a whole-row reference, we'll need all the columns. */ @@ -1143,13 +1156,16 @@ deparseTargetList(StringInfo buf, } } - /* - * Add ctid and oid if needed. We currently don't support retrieving any - * other system columns. - */ - if (bms_is_member(SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber, - attrs_used)) + for (i = 0 ; check_attrs[i] != FirstLowInvalidHeapAttributeNumber ; i++) { + int attr = check_attrs[i]; + char *attr_name = check_attr_names[i]; + + /* Add system columns if needed. */ + if (!bms_is_member(attr - FirstLowInvalidHeapAttributeNumber, + attrs_used)) + continue; + if (!first) appendStringInfoString(buf, ", "); else if (is_returning) @@ -1158,26 +1174,9 @@ deparseTargetList(StringInfo buf, if (qualify_col) ADD_REL_QUALIFIER(buf, rtindex); - appendStringInfoString(buf, "ctid"); + appendStringInfoString(buf, attr_name); - *retrieved_attrs = lappend_int(*retrieved_attrs, - SelfItemPointerAttributeNumber); - } - if (bms_is_member(ObjectIdAttributeNumber - FirstLowInvalidHeapAttributeNumber, - attrs_used)) - { - if (!first) - appendStringInfoString(buf, ", "); - else if (is_returning) - appendStringInfoString(buf, " RETURNING "); - first = false; - - if (qualify_col) - ADD_REL_QUALIFIER(buf, rtindex); - appendStringInfoString(buf, "oid"); - - *retrieved_attrs = lappend_int(*retrieved_attrs, - ObjectIdAttributeNumber); + *retrieved_attrs = lappend_int(*retrieved_attrs, attr); } /* Don't generate bad syntax if no undropped columns */ @@ -1725,7 +1724,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, deparseRelation(buf, rel); appendStringInfoString(buf, " SET "); - pindex = 2; /* ctid is always the first param */ + pindex = 3; /* tableoid and ctid are always the first param */ first = true; foreach(lc, targetAttrs) { @@ -1739,7 +1738,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, appendStringInfo(buf, " = $%d", pindex); pindex++; } - appendStringInfoString(buf, " WHERE ctid = $1"); + appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2"); deparseReturningList(buf, rte, rtindex, rel, rel->trigdesc && rel->trigdesc->trig_update_after_row, @@ -1855,7 +1854,7 @@ deparseDeleteSql(StringInfo buf, RangeTblEntry *rte, { appendStringInfoString(buf, "DELETE FROM "); deparseRelation(buf, rel); - appendStringInfoString(buf, " WHERE ctid = $1"); + appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2"); deparseReturningList(buf, rte, rtindex, rel, rel->trigdesc && rel->trigdesc->trig_delete_after_row, @@ -1951,8 +1950,13 @@ deparseReturningList(StringInfo buf, RangeTblEntry *rte, */ pull_varattnos((Node *) returningList, rtindex, &attrs_used); + + attrs_used = bms_del_member(attrs_used, + TableOidAttributeNumber - + FirstLowInvalidHeapAttributeNumber); } + if (attrs_used != NULL) deparseTargetList(buf, rte, rtindex, rel, true, attrs_used, false, retrieved_attrs); @@ -2066,6 +2070,12 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, RangeTblEntry *rte, ADD_REL_QUALIFIER(buf, varno); appendStringInfoString(buf, "oid"); } + else if (varattno == TableOidAttributeNumber) + { + if (qualify_col) + ADD_REL_QUALIFIER(buf, varno); + appendStringInfoString(buf, "tableoid"); + } else if (varattno < 0) { /* diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 78b0f43ca8..e574d7f51b 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -73,7 +73,10 @@ enum FdwScanPrivateIndex * String describing join i.e. names of relations being joined and types * of join, added when the scan is join */ - FdwScanPrivateRelations + FdwScanPrivateRelations, + + /* Integer list of ids of EXEC_PARAM */ + FdwScanTupleIdParamIds }; /* @@ -95,7 +98,9 @@ enum FdwModifyPrivateIndex /* has-returning flag (as an integer Value node) */ FdwModifyPrivateHasReturning, /* Integer list of attribute numbers retrieved by RETURNING */ - FdwModifyPrivateRetrievedAttrs + FdwModifyPrivateRetrievedAttrs, + /* Integer list of paramid for tableoid and ctid of source tuple */ + FdwModifyPrivateTidParams }; /* @@ -156,6 +161,8 @@ typedef struct PgFdwScanState MemoryContext temp_cxt; /* context for per-tuple temporary data */ int fetch_size; /* number of tuples per fetch */ + + int *tid_params; /* EXEC_PARAM id for tuple identifier */ } PgFdwScanState; /* @@ -177,7 +184,7 @@ typedef struct PgFdwModifyState List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ /* info about parameters for prepared statement */ - AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ + int *tid_params; /* EXEC_PARAM ids for tuple identifier */ int p_nums; /* number of parameters to transmit */ FmgrInfo *p_flinfo; /* output conversion functions for them */ @@ -293,9 +300,6 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags); static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); static void postgresReScanForeignScan(ForeignScanState *node); static void postgresEndForeignScan(ForeignScanState *node); -static void postgresAddForeignUpdateTargets(Query *parsetree, - RangeTblEntry *target_rte, - Relation target_relation); static List *postgresPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, @@ -388,9 +392,11 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, char *query, List *target_attrs, bool has_returning, - List *retrieved_attrs); + List *retrieved_attrs, + int *tid_params); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, + Oid tableoid, ItemPointer tupleid, TupleTableSlot *slot); static void store_returning_result(PgFdwModifyState *fmstate, @@ -451,6 +457,7 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); +static List *add_tidcols_to_tlist(List *org, Index varno); /* * Foreign-data wrapper handler function: return a struct with pointers @@ -471,7 +478,6 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->EndForeignScan = postgresEndForeignScan; /* Functions for updating foreign tables */ - routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; routine->PlanForeignModify = postgresPlanForeignModify; routine->BeginForeignModify = postgresBeginForeignModify; routine->ExecForeignInsert = postgresExecForeignInsert; @@ -595,6 +601,39 @@ postgresGetForeignRelSize(PlannerInfo *root, &fpinfo->attrs_used); } + /* + * ctid and tableoid are required for target relation of UPDATE and + * DELETE. Join relations are handled elsewhere. + */ + if (root->parse->resultRelation == baserel->relid && + (root->parse->commandType == CMD_UPDATE || + root->parse->commandType == CMD_DELETE)) + { + Var *v; + + v = makeVar(baserel->relid, + TableOidAttributeNumber, + OIDOID, -1, InvalidOid, 0); + add_new_column_to_pathtarget(baserel->reltarget, (Expr *) v); + v = makeVar(baserel->relid, + SelfItemPointerAttributeNumber, + TIDOID, -1, InvalidOid, 0); + add_new_column_to_pathtarget(baserel->reltarget, (Expr *) v); + + fpinfo->param_attrs = + bms_add_member(fpinfo->param_attrs, + SelfItemPointerAttributeNumber - + FirstLowInvalidHeapAttributeNumber); + + fpinfo->param_attrs = + bms_add_member(fpinfo->param_attrs, + TableOidAttributeNumber - + FirstLowInvalidHeapAttributeNumber); + + fpinfo->attrs_used = + bms_add_members(fpinfo->attrs_used, fpinfo->param_attrs); + } + /* * Compute the selectivity and cost of the local_conds, so we don't have * to do it over again for each path. The best we can do for these @@ -1116,6 +1155,94 @@ postgresGetForeignPaths(PlannerInfo *root, } } +/* Find the id of a PARAM_EXEC matches to the given var */ +static int +find_param_for_var(PlannerInfo *root, Var *var) +{ + ListCell *ppl; + PlannerParamItem *pitem; + Index levelsup; + + /* Find the query level the Var belongs to */ + for (levelsup = var->varlevelsup; levelsup > 0; levelsup--) + root = root->parent_root; + + /* If there's already a matching PlannerParamItem there, just use it */ + foreach(ppl, root->plan_params) + { + pitem = (PlannerParamItem *) lfirst(ppl); + if (IsA(pitem->item, Var)) + { + Var *pvar = (Var *) pitem->item; + + /* + * This comparison must match _equalVar(), except for ignoring + * varlevelsup. Note that _equalVar() ignores the location. + */ + if (pvar->varno == var->varno && + pvar->varattno == var->varattno && + pvar->vartype == var->vartype && + pvar->vartypmod == var->vartypmod && + pvar->varcollid == var->varcollid && + pvar->varnoold == var->varnoold && + pvar->varoattno == var->varoattno) + return pitem->paramId; + } + } + + return -1; +} + +/* + * Select a PARAM_EXEC number to identify the given Var as a parameter for + * the current subquery, or for a nestloop's inner scan. + * If the Var already has a param in the current context, return that one. + * (copy of the function in subselect.c) + */ +static int +assign_param_for_var(PlannerInfo *root, Var *var) +{ + int paramid; + PlannerParamItem *pitem; + + /* Return registered param if any */ + paramid = find_param_for_var(root, var); + if (paramid >= 0) + return paramid; + + /* Nope, so make a new one */ + var = copyObject(var); + var->varlevelsup = 0; + + pitem = makeNode(PlannerParamItem); + pitem->item = (Node *) var; + pitem->paramId = list_length(root->glob->paramExecTypes); + root->glob->paramExecTypes = lappend_oid(root->glob->paramExecTypes, + var->vartype); + + root->plan_params = lappend(root->plan_params, pitem); + + return pitem->paramId; +} + +static List * +add_tidcols_to_tlist(List *org, Index varno) +{ + List *result = NIL; + + result = list_copy(org); + + result = + add_to_flat_tlist(result, + list_make2(makeVar(varno, TableOidAttributeNumber, + OIDOID, -1, InvalidOid, 0), + makeVar(varno, + SelfItemPointerAttributeNumber, + TIDOID, -1, InvalidOid, 0))); + + return result; +} + /* * postgresGetForeignPlan * Create ForeignScan plan node which implements selected best path @@ -1136,6 +1263,7 @@ postgresGetForeignPlan(PlannerInfo *root, List *local_exprs = NIL; List *params_list = NIL; List *fdw_scan_tlist = NIL; + List *fdw_return_tlist = NIL; List *fdw_recheck_quals = NIL; List *retrieved_attrs; StringInfoData sql; @@ -1223,8 +1351,8 @@ postgresGetForeignPlan(PlannerInfo *root, * locally. */ - /* Build the list of columns to be fetched from the foreign server. */ - fdw_scan_tlist = build_tlist_to_deparse(foreignrel); + /* Build the list of columns to be returned to upper node. */ + fdw_scan_tlist = fdw_return_tlist = build_tlist_to_deparse(foreignrel); /* * Ensure that the outer plan produces a tuple whose descriptor @@ -1263,6 +1391,17 @@ postgresGetForeignPlan(PlannerInfo *root, qual); } } + + /* + * Remote query requires tuple identifers if this relation involves + * the target relation of UPDATE/DELETE commands. + */ + if ((root->parse->commandType == CMD_UPDATE || + root->parse->commandType == CMD_DELETE) && + bms_is_member(root->parse->resultRelation, foreignrel->relids)) + fdw_scan_tlist = + add_tidcols_to_tlist(fdw_return_tlist, + root->parse->resultRelation); } /* @@ -1288,6 +1427,45 @@ postgresGetForeignPlan(PlannerInfo *root, fdw_private = lappend(fdw_private, makeString(fpinfo->relation_name->data)); + /* + * Prepare EXEC_PARAM for tuple identifier if this relation is the target + * relation of the current DELETE/UPDATE query. + */ + if ((root->parse->commandType == CMD_DELETE || + root->parse->commandType == CMD_UPDATE) && + (scan_relid ? + !bms_is_empty(fpinfo->param_attrs) : + bms_is_member(root->parse->resultRelation, foreignrel->relids))) + { + int *paramids = palloc(sizeof(int) * 2); + Var *v; + Index target_relid = scan_relid; + + if (target_relid == 0) + target_relid = root->parse->resultRelation; + + if (list_length(fdw_private) == 3) + fdw_private = lappend(fdw_private, NULL); + + v = makeNode(Var); + v->varno = target_relid; + v->vartype = OIDOID; + v->vartypmod = -1; + v->varcollid = InvalidOid; + v->varattno = TableOidAttributeNumber; + paramids[0] = assign_param_for_var(root, v); + + v = makeNode(Var); + v->varno = target_relid; + v->vartype = TIDOID; + v->vartypmod = -1; + v->varcollid = InvalidOid; + v->varattno = SelfItemPointerAttributeNumber; + paramids[1] = assign_param_for_var(root, v); + + fdw_private = lappend(fdw_private, paramids); + } + /* * Create the ForeignScan node for the given relation. * @@ -1300,7 +1478,7 @@ postgresGetForeignPlan(PlannerInfo *root, scan_relid, params_list, fdw_private, - fdw_scan_tlist, + fdw_return_tlist, fdw_recheck_quals, outer_plan); } @@ -1368,6 +1546,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) FdwScanPrivateRetrievedAttrs); fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private, FdwScanPrivateFetchSize)); + if (list_length(fsplan->fdw_private) > FdwScanTupleIdParamIds) + fsstate->tid_params = + (int *) list_nth(fsplan->fdw_private, FdwScanTupleIdParamIds); /* Create contexts for batches of tuples and per-tuple temp workspace. */ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, @@ -1418,6 +1599,8 @@ postgresIterateForeignScan(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + EState *estate = node->ss.ps.state; + HeapTuple tup; /* * If this is the first call after Begin or ReScan, we need to create the @@ -1439,10 +1622,30 @@ postgresIterateForeignScan(ForeignScanState *node) return ExecClearTuple(slot); } + tup = fsstate->tuples[fsstate->next_tuple++]; + + /* Store the remote table oid and ctid into exec parameter if requested */ + if (fsstate->tid_params != NULL) + { + ParamExecData *prm; + ItemPointer itemp; + + /* set toid */ + prm = &(estate->es_param_exec_vals[fsstate->tid_params[0]]); + prm->value = ObjectIdGetDatum(tup->t_tableOid); + /* set ctid */ + prm = &(estate->es_param_exec_vals[fsstate->tid_params[1]]); + itemp = (ItemPointer) palloc(sizeof(ItemPointerData)); + ItemPointerSet(itemp, + ItemPointerGetBlockNumberNoCheck(&tup->t_self), + ItemPointerGetOffsetNumberNoCheck(&tup->t_self)); + prm->value = PointerGetDatum(itemp); + } + /* * Return the next tuple. */ - ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++], + ExecStoreTuple(tup, slot, InvalidBuffer, false); @@ -1530,43 +1733,6 @@ postgresEndForeignScan(ForeignScanState *node) /* MemoryContexts will be deleted automatically. */ } -/* - * postgresAddForeignUpdateTargets - * Add resjunk column(s) needed for update/delete on a foreign table - */ -static void -postgresAddForeignUpdateTargets(Query *parsetree, - RangeTblEntry *target_rte, - Relation target_relation) -{ - Var *var; - const char *attrname; - TargetEntry *tle; - - /* - * In postgres_fdw, what we need is the ctid, same as for a regular table. - */ - - /* Make a Var representing the desired value */ - var = makeVar(parsetree->resultRelation, - SelfItemPointerAttributeNumber, - TIDOID, - -1, - InvalidOid, - 0); - - /* Wrap it in a resjunk TLE with the right name ... */ - attrname = "ctid"; - - tle = makeTargetEntry((Expr *) var, - list_length(parsetree->targetList) + 1, - pstrdup(attrname), - true); - - /* ... and add it to the query's targetlist */ - parsetree->targetList = lappend(parsetree->targetList, tle); -} - /* * postgresPlanForeignModify * Plan an insert/update/delete operation on a foreign table @@ -1630,6 +1796,33 @@ postgresPlanForeignModify(PlannerInfo *root, } } + /* + * In the non-direct modify cases, the corresponding ForeignScan node must + * have stored remote tableoid and ctid as exec parameters + */ + if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + Var *v; + int *paramids = NULL; + + paramids = palloc(sizeof(int) * 2); + v = makeNode(Var); + v->varno = resultRelation; + v->vartype = OIDOID; + v->vartypmod = -1; + v->varcollid = InvalidOid; + v->varattno = TableOidAttributeNumber; + paramids[0] = find_param_for_var(root, v); + if (paramids[0] < 0) + elog(ERROR, "Tupler ID parameter is not found"); + + v->vartype = TIDOID; + v->varattno = SelfItemPointerAttributeNumber; + paramids[1] = find_param_for_var(root, v); + if (paramids[1] < 0) + elog(ERROR, "Tupler ID parameter is not found"); + } + /* * Extract the relevant RETURNING list if any. */ @@ -1679,10 +1872,11 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ - return list_make4(makeString(sql.data), + return list_make5(makeString(sql.data), targetAttrs, makeInteger((retrieved_attrs != NIL)), - retrieved_attrs); + retrieved_attrs, + paramids); } /* @@ -1702,6 +1896,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, bool has_returning; List *retrieved_attrs; RangeTblEntry *rte; + int *tid_params; /* * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState @@ -1719,6 +1914,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, FdwModifyPrivateHasReturning)); retrieved_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateRetrievedAttrs); + tid_params = (int *) list_nth(fdw_private, FdwModifyPrivateTidParams); /* Find RTE. */ rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, @@ -1733,7 +1929,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate, query, target_attrs, has_returning, - retrieved_attrs); + retrieved_attrs, + tid_params); resultRelInfo->ri_FdwState = fmstate; } @@ -1758,7 +1955,7 @@ postgresExecForeignInsert(EState *estate, prepare_foreign_modify(fmstate); /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, NULL, slot); + p_values = convert_prep_stmt_params(fmstate, InvalidOid, NULL, slot); /* * Execute the prepared statement. @@ -1813,28 +2010,31 @@ postgresExecForeignUpdate(EState *estate, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - Datum datum; - bool isNull; + Datum toiddatum, ctiddatum; const char **p_values; PGresult *res; int n_rows; + int *tid_params = fmstate->tid_params; + ParamExecData *prm; /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); - /* Get the ctid that was passed up as a resjunk column */ - datum = ExecGetJunkAttribute(planSlot, - fmstate->ctidAttno, - &isNull); - /* shouldn't ever get a null result... */ - if (isNull) - elog(ERROR, "ctid is NULL"); + Assert(tid_params); + /* Get the tableoid that was passed up as an exec param */ + prm = &(estate->es_param_exec_vals[tid_params[0]]); + toiddatum = prm->value; + + /* Get the ctid that was passed up as an exec param */ + prm = &(estate->es_param_exec_vals[tid_params[1]]); + ctiddatum = prm->value; /* Convert parameters needed by prepared statement to text form */ p_values = convert_prep_stmt_params(fmstate, - (ItemPointer) DatumGetPointer(datum), - slot); + DatumGetObjectId(toiddatum), + (ItemPointer) DatumGetPointer(ctiddatum), + slot); /* * Execute the prepared statement. @@ -1889,28 +2089,32 @@ postgresExecForeignDelete(EState *estate, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - Datum datum; - bool isNull; + Datum toiddatum, ctiddatum; const char **p_values; PGresult *res; int n_rows; + int *tid_params = fmstate->tid_params; + ParamExecData *prm; /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); + Assert(tid_params); + + /* Get the tableoid that was passed up as a exec param */ + prm = &(estate->es_param_exec_vals[tid_params[0]]); + toiddatum = prm->value; + /* Get the ctid that was passed up as a resjunk column */ - datum = ExecGetJunkAttribute(planSlot, - fmstate->ctidAttno, - &isNull); - /* shouldn't ever get a null result... */ - if (isNull) - elog(ERROR, "ctid is NULL"); + prm = &(estate->es_param_exec_vals[tid_params[1]]); + ctiddatum = prm->value; /* Convert parameters needed by prepared statement to text form */ p_values = convert_prep_stmt_params(fmstate, - (ItemPointer) DatumGetPointer(datum), - NULL); + DatumGetObjectId(toiddatum), + (ItemPointer) DatumGetPointer(ctiddatum), + NULL); /* * Execute the prepared statement. @@ -2058,7 +2262,8 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, sql.data, targetAttrs, retrieved_attrs != NIL, - retrieved_attrs); + retrieved_attrs, + NULL); resultRelInfo->ri_FdwState = fmstate; } @@ -2561,8 +2766,13 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) */ if (list_length(fdw_private) > FdwScanPrivateRelations) { - relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations)); - ExplainPropertyText("Relations", relations, es); + void *v = list_nth(fdw_private, FdwScanPrivateRelations); + + if (v) + { + relations = strVal(v); + ExplainPropertyText("Relations", relations, es); + } } /* @@ -2673,7 +2883,20 @@ estimate_path_cost_size(PlannerInfo *root, /* Build the list of columns to be fetched from the foreign server. */ if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) + { fdw_scan_tlist = build_tlist_to_deparse(foreignrel); + + /* + * If this foreign relation need to get remote tableoid and ctid, + * count them in costing. + */ + if ((root->parse->commandType == CMD_UPDATE || + root->parse->commandType == CMD_DELETE) && + bms_is_member(root->parse->resultRelation, foreignrel->relids)) + fdw_scan_tlist = + add_tidcols_to_tlist(fdw_scan_tlist, + root->parse->resultRelation); + } else fdw_scan_tlist = NIL; @@ -3092,7 +3315,6 @@ create_cursor(ForeignScanState *node) initStringInfo(&buf); appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", fsstate->cursor_number, fsstate->query); - /* * Notice that we pass NULL for paramTypes, thus forcing the remote server * to infer types for all parameters. Since we explicitly cast every @@ -3286,7 +3508,8 @@ create_foreign_modify(EState *estate, char *query, List *target_attrs, bool has_returning, - List *retrieved_attrs) + List *retrieved_attrs, + int *tid_params) { PgFdwModifyState *fmstate; Relation rel = resultRelInfo->ri_RelationDesc; @@ -3333,7 +3556,7 @@ create_foreign_modify(EState *estate, fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); /* Prepare for output conversion of parameters used in prepared stmt. */ - n_params = list_length(fmstate->target_attrs) + 1; + n_params = list_length(fmstate->target_attrs) + 2; fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); fmstate->p_nums = 0; @@ -3341,13 +3564,14 @@ create_foreign_modify(EState *estate, { Assert(subplan != NULL); - /* Find the ctid resjunk column in the subplan's result */ - fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, - "ctid"); - if (!AttributeNumberIsValid(fmstate->ctidAttno)) - elog(ERROR, "could not find junk ctid column"); + fmstate->tid_params = tid_params; - /* First transmittable parameter will be ctid */ + /* First transmittable parameter will be table oid */ + getTypeOutputInfo(OIDOID, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + + /* Second transmittable parameter will be ctid */ getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); fmstate->p_nums++; @@ -3430,6 +3654,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) */ static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, + Oid tableoid, ItemPointer tupleid, TupleTableSlot *slot) { @@ -3441,10 +3666,13 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate, p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); - /* 1st parameter should be ctid, if it's in use */ - if (tupleid != NULL) + /* First two parameters should be tableoid and ctid, if it's in use */ + if (tableoid != InvalidOid) { /* don't need set_transmission_modes for TID output */ + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], + ObjectIdGetDatum(tableoid)); + pindex++; p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid)); pindex++; @@ -5549,6 +5777,7 @@ make_tuple_from_result_row(PGresult *res, bool *nulls; ItemPointer ctid = NULL; Oid oid = InvalidOid; + Oid toid = InvalidOid; ConversionLocation errpos; ErrorContextCallback errcallback; MemoryContext oldcontext; @@ -5609,10 +5838,9 @@ make_tuple_from_result_row(PGresult *res, * Note: we ignore system columns other than ctid and oid in result */ errpos.cur_attno = i; - if (i > 0) + if (i > 0 && i <= tupdesc->natts) { /* ordinary column */ - Assert(i <= tupdesc->natts); nulls[i - 1] = (valstr == NULL); /* Apply the input function even to nulls, to support domains */ values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1], @@ -5620,7 +5848,20 @@ make_tuple_from_result_row(PGresult *res, attinmeta->attioparams[i - 1], attinmeta->atttypmods[i - 1]); } - else if (i == SelfItemPointerAttributeNumber) + else if (i == TableOidAttributeNumber || + i == tupdesc->natts + 1) + { + /* table oid */ + if (valstr != NULL) + { + Datum datum; + + datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr)); + toid = DatumGetObjectId(datum); + } + } + else if (i == SelfItemPointerAttributeNumber || + i == tupdesc->natts + 2) { /* ctid */ if (valstr != NULL) @@ -5691,6 +5932,9 @@ make_tuple_from_result_row(PGresult *res, if (OidIsValid(oid)) HeapTupleSetOid(tuple, oid); + if (OidIsValid(toid)) + tuple->t_tableOid = toid; + /* Clean up */ MemoryContextReset(temp_context); diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index a5d4011e8d..39e5581125 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -108,6 +108,8 @@ typedef struct PgFdwRelationInfo * representing the relation. */ int relation_index; + + Bitmapset *param_attrs; /* attrs required for modification */ } PgFdwRelationInfo; /* in postgres_fdw.c */