On Fri, Apr 16, 2021 at 3:23 PM Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com> wrote:
>
> On Tue, Mar 16, 2021 at 2:21 AM Tom Lane <t...@sss.pgh.pa.us> wrote:
> >
> > Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com> writes:
> > > Thanks for pointing to the changes in the commit message. I corrected
> > > them. Attaching v4 patch set, consider it for further review.
> >
> > I took a quick look at this.  I'm quite worried about the potential
> > performance cost of the v4-0001 patch (the one for fixing
> > slot_store_error_callback).  Previously, we didn't pay any noticeable
> > cost for having the callback unless there actually was an error.
> > As patched, we perform several catalog lookups per column per row,
> > even in the non-error code path.  That seems like it'd be a noticeable
> > performance hit.  Just to add insult to injury, it leaks memory.
> >
> > I propose a more radical but simpler solution: let's just not bother
> > with including the type names in the context message.  How much are
> > they really buying?
>
> << Attaching v5-0001 here again for completion >>
> I'm attaching the v5-0001 patch, which avoids printing the column type
> names in the context message, so no cache lookups have to be done in the
> error context callback. I think the column name is enough to know on
> which column the error occurred, and if required, its type can be looked
> up by the user. This patch gets rid of printing local and remote type
> names in slot_store_error_callback and also removes
> logicalrep_typmap_gettypname because it's unnecessary. Thoughts?
>
> > v4-0002 (for postgres_fdw's conversion_error_callback) has the same
> > problems, although mitigated a bit by not needing to do any catalog
> > lookups in the non-join case.  For the join case, I wonder whether
> > we could push the lookups back to plan setup time, so they don't
> > need to be done over again for each row.  (Not doing lookups at all
> > doesn't seem attractive there; it'd render the context message nearly
> > useless.)  A different idea is to try to get the column names out
> > of the query's rangetable instead of going to the catalogs.
>
> I'm attaching v5-0002, which stores the required attribute information
> for foreign joins in postgresBeginForeignScan. That is a one-time job,
> as opposed to the per-row system catalog lookup v4-0001 was doing. I'm
> storing the foreign table relid (as key), relname, and the retrieved
> attributes' attnum and attname in a hash table. Whenever a
> conversion error occurs, the hash table is looked up by relid to
> fetch the relname and attname. Thoughts?

Attaching rebased v6 patches.
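
For readers who want the idea behind the foreign-join patch without the
PostgreSQL plumbing, here is a minimal standalone sketch of the pattern:
resolve names once at scan setup, and let the error-time code read only
memory that was filled in earlier. Every name in it (CachedColumn,
ScanState, scan_cache_column, format_error_context, MAX_COLS) is
hypothetical and is not part of PostgreSQL or of the attached patches. The
patch itself uses a dynahash table keyed by relid rather than the flat array
assumed here, and the message wording is only illustrative.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins for illustration; not PostgreSQL definitions. */
#define MAX_COLS 8

typedef struct CachedColumn
{
	int			attno;		/* position in the scan's target list */
	const char *relname;	/* resolved once, at scan setup */
	const char *attname;	/* NULL stands for a whole-row reference */
} CachedColumn;

typedef struct ScanState
{
	CachedColumn cols[MAX_COLS];
	int			ncols;
	int			cur_attno;	/* column currently being converted */
} ScanState;

/*
 * Setup-time step: record the already-resolved names. In the real patch
 * this is where the catalog lookups would happen, once per scan.
 * Returns 0 on success, -1 if the fixed-size cache is full.
 */
static int
scan_cache_column(ScanState *st, int attno,
				  const char *relname, const char *attname)
{
	if (st->ncols >= MAX_COLS)
		return -1;

	st->cols[st->ncols].attno = attno;
	st->cols[st->ncols].relname = relname;
	st->cols[st->ncols].attname = attname;
	st->ncols++;
	return 0;
}

/*
 * Error-time step: only reads the cache, never performs a lookup.
 * Returns 1 if a message was written to buf, 0 if cur_attno is unknown.
 */
static int
format_error_context(const ScanState *st, char *buf, size_t buflen)
{
	assert(buf != NULL && buflen > 0);

	for (int i = 0; i < st->ncols; i++)
	{
		const CachedColumn *c = &st->cols[i];

		if (c->attno != st->cur_attno)
			continue;

		if (c->attname == NULL)
			snprintf(buf, buflen,
					 "whole-row reference to foreign table \"%s\"",
					 c->relname);
		else
			snprintf(buf, buflen,
					 "column \"%s\" of foreign table \"%s\"",
					 c->attname, c->relname);
		return 1;
	}
	return 0;
}
```

A caller would invoke scan_cache_column once per retrieved column while
setting up the scan, set cur_attno before converting each value, and call
format_error_context only from the error path. The flat array keeps the
sketch short; a hash table makes more sense when many relations take part
in the join.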

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
From a65e13d591bb151eb71eb8ec42834ce28a3db304 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Wed, 14 Apr 2021 13:25:16 +0530
Subject: [PATCH v6] Avoid Catalogue Accesses In slot_store_error_callback

Avoid accessing system catalogues inside the slot_store_error_callback
error context callback, because the entire transaction might
have been broken at this point. logicalrep_typmap_gettypname()
and format_type_be() could search the sys cache.

As per a suggestion from Tom, a simple solution is to avoid
printing the column type names in the message; the column
name alone is enough to know on which column the error occurred.

This makes logicalrep_typmap_gettypname() redundant, so
remove it.
---
 src/backend/replication/logical/relation.c | 49 ----------------------
 src/backend/replication/logical/worker.c   | 25 +----------
 src/include/replication/logicalrelation.h  |  2 -
 3 files changed, 2 insertions(+), 74 deletions(-)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e861c0ff80..cbc5612dbe 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -490,55 +490,6 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
 	MemoryContextSwitchTo(oldctx);
 }
 
-/*
- * Fetch type name from the cache by remote type OID.
- *
- * Return a substitute value if we cannot find the data type; no message is
- * sent to the log in that case, because this is used by error callback
- * already.
- */
-char *
-logicalrep_typmap_gettypname(Oid remoteid)
-{
-	LogicalRepTyp *entry;
-	bool		found;
-
-	/* Internal types are mapped directly. */
-	if (remoteid < FirstGenbkiObjectId)
-	{
-		if (!get_typisdefined(remoteid))
-		{
-			/*
-			 * This can be caused by having a publisher with a higher
-			 * PostgreSQL major version than the subscriber.
-			 */
-			return psprintf("unrecognized %u", remoteid);
-		}
-
-		return format_type_be(remoteid);
-	}
-
-	if (LogicalRepTypMap == NULL)
-	{
-		/*
-		 * If the typemap is not initialized yet, we cannot possibly attempt
-		 * to search the hash table; but there's no way we know the type
-		 * locally yet, since we haven't received a message about this type,
-		 * so this is the best we can do.
-		 */
-		return psprintf("unrecognized %u", remoteid);
-	}
-
-	/* search the mapping */
-	entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
-						HASH_FIND, &found);
-	if (!found)
-		return psprintf("unrecognized %u", remoteid);
-
-	Assert(OidIsValid(entry->remoteid));
-	return psprintf("%s.%s", entry->nspname, entry->typname);
-}
-
 /*
  * Partition cache: look up partition LogicalRepRelMapEntry's
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fb3ba5c415..6494bef1d9 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -132,7 +132,6 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
 typedef struct SlotErrCallbackArg
 {
 	LogicalRepRelMapEntry *rel;
-	int			local_attnum;
 	int			remote_attnum;
 } SlotErrCallbackArg;
 
@@ -431,29 +430,15 @@ slot_store_error_callback(void *arg)
 {
 	SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
 	LogicalRepRelMapEntry *rel;
-	char	   *remotetypname;
-	Oid			remotetypoid,
-				localtypoid;
 
 	/* Nothing to do if remote attribute number is not set */
 	if (errarg->remote_attnum < 0)
 		return;
 
 	rel = errarg->rel;
-	remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
-
-	/* Fetch remote type name from the LogicalRepTypMap cache */
-	remotetypname = logicalrep_typmap_gettypname(remotetypoid);
-
-	/* Fetch local type OID from the local sys cache */
-	localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
-
-	errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
-			   "remote type %s, local type %s",
+	errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
 			   rel->remoterel.nspname, rel->remoterel.relname,
-			   rel->remoterel.attnames[errarg->remote_attnum],
-			   remotetypname,
-			   format_type_be(localtypoid));
+			   rel->remoterel.attnames[errarg->remote_attnum]);
 }
 
 /*
@@ -474,7 +459,6 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 
 	/* Push callback + info on the error context stack */
 	errarg.rel = rel;
-	errarg.local_attnum = -1;
 	errarg.remote_attnum = -1;
 	errcallback.callback = slot_store_error_callback;
 	errcallback.arg = (void *) &errarg;
@@ -494,7 +478,6 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 
 			Assert(remoteattnum < tupleData->ncols);
 
-			errarg.local_attnum = i;
 			errarg.remote_attnum = remoteattnum;
 
 			if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
@@ -543,7 +526,6 @@ slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 				slot->tts_isnull[i] = true;
 			}
 
-			errarg.local_attnum = -1;
 			errarg.remote_attnum = -1;
 		}
 		else
@@ -600,7 +582,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 
 	/* For error reporting, push callback + info on the error context stack */
 	errarg.rel = rel;
-	errarg.local_attnum = -1;
 	errarg.remote_attnum = -1;
 	errcallback.callback = slot_store_error_callback;
 	errcallback.arg = (void *) &errarg;
@@ -623,7 +604,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 		{
 			StringInfo	colvalue = &tupleData->colvalues[remoteattnum];
 
-			errarg.local_attnum = i;
 			errarg.remote_attnum = remoteattnum;
 
 			if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
@@ -668,7 +648,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 				slot->tts_isnull[i] = true;
 			}
 
-			errarg.local_attnum = -1;
 			errarg.remote_attnum = -1;
 		}
 	}
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 3f0b3deefb..0f67a64bb3 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -47,6 +47,4 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
 
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern char *logicalrep_typmap_gettypname(Oid remoteid);
-
 #endif							/* LOGICALRELATION_H */
-- 
2.25.1

From 79b495cb17baad45ba903cd60587c2c94a2350b2 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Sun, 16 May 2021 16:52:27 +0530
Subject: [PATCH v6] Avoid Catalogue Accesses In conversion_error_callback

Avoid accessing system catalogs inside the conversion_error_callback
error context callback, because the entire transaction might
have been broken at this point.

In the case of foreign joins, conversion_error_callback does
system catalog lookups via get_attname() and get_rel_name().
So store the required attribute and relation information in the
PgFdwScanState of the ForeignScanState before beginning the foreign
scan. This is a one-time job, so there is no per-row performance
hit. The stored information is used in conversion_error_callback.
---
 contrib/postgres_fdw/postgres_fdw.c | 167 ++++++++++++++++++++++++++--
 1 file changed, 157 insertions(+), 10 deletions(-)

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index c48a421e88..acbb331c65 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -171,6 +171,12 @@ typedef struct PgFdwScanState
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 
 	int			fetch_size;		/* number of tuples per fetch */
+
+	/*
+	 * Hash Table to store information that is used to identify the attribute
+	 * when data conversion fails in foreign joins.
+	 */
+	HTAB 		*fj_htab;
 } PgFdwScanState;
 
 /*
@@ -297,6 +303,18 @@ typedef struct
 	int64		offset_est;
 } PgFdwPathExtraData;
 
+/*
+ * Information used to identify the attribute when data conversion fails in
+ * foreign joins.
+ */
+typedef struct ForeignJoinRelInfo
+{
+	Oid		relid;		/* relid of the foreign table */
+	char    *relname;	/* name of the foreign table */
+	List    *attnos;	/* list of attribute numbers */
+	List    *attnames;	/* list of foreign table attribute names */
+} ForeignJoinRelInfo;
+
 /*
  * Identify the attribute where data conversion fails.
  */
@@ -546,7 +564,7 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_o,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int	get_batch_size_option(Relation rel);
-
+static void fill_foreign_join_att_info(ForeignScanState *node);
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -1439,6 +1457,107 @@ postgresGetForeignPlan(PlannerInfo *root,
 							outer_plan);
 }
 
+/*
+ * Capture foreign table and attribute info into a hash table in case of
+ * foreign joins which will be used in conversion error callback avoiding
+ * system catalog lookups.
+ */
+static void
+fill_foreign_join_att_info(ForeignScanState *node)
+{
+	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+	EState	   *estate = node->ss.ps.state;
+	PgFdwScanState *fsstate;
+	ListCell   *lc;
+
+	fsstate = (PgFdwScanState *) node->fdw_state;
+
+	/*
+	 * For base foreign relations, the required information can be fetched
+	 * without system catalog lookups, so no need of doing this extra work,
+	 * just return.
+	 */
+	if (fsplan->scan.scanrelid > 0 && fsstate->rel)
+		return;
+
+	/* If there are no foreign relation attributes retrieved, just return. */
+	if (!fsstate->retrieved_attrs)
+		return;
+
+	Assert(fsplan->scan.scanrelid == 0 && !fsstate->rel &&
+		   list_length(fsstate->retrieved_attrs) > 0);
+
+	/* Capture the attribute info in case of foreign joins. */
+	foreach(lc, fsstate->retrieved_attrs)
+	{
+		TargetEntry *tle;
+		int	i;
+
+		i = lfirst_int(lc);
+		tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, i - 1);
+
+		if (IsA(tle->expr, Var))
+		{
+			RangeTblEntry *rte;
+			Var		*var = (Var *) tle->expr;
+			char 	*attname;
+			char    *relname;
+			ForeignJoinRelInfo *fj_rel_info;
+			bool	found;
+
+			rte = exec_rt_fetch(var->varno, estate);
+
+			/*
+			 * Even if var->varattno is 0/invalid, we add attname as NULL to
+			 * the list, so that in the error callback we could detect the
+			 * whole row with it.
+			 */
+			if (AttributeNumberIsValid(var->varattno))
+				attname = get_attname(rte->relid, var->varattno, false);
+			else
+				attname = NULL;
+
+			relname = get_rel_name(rte->relid);
+
+			/*
+			 * First time through, initialize the hashtable for storing
+			 * attribute info in case of foreign joins.
+			 */
+			if (!fsstate->fj_htab)
+			{
+				HASHCTL		hctl;
+
+				memset(&hctl, 0, sizeof(HASHCTL));
+				hctl.keysize = sizeof(Oid);
+				hctl.entrysize = sizeof(ForeignJoinRelInfo);
+				hctl.hcxt = CurrentMemoryContext;
+
+				fsstate->fj_htab = hash_create("Attribute Info for Foreign Table Joins",
+												32,	/* start small and extend */
+												&hctl,
+												HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+			}
+
+			/* Find or create cached entry for the foreign table */
+			fj_rel_info = hash_search(fsstate->fj_htab,
+									  &rte->relid,
+									  HASH_ENTER,
+									  &found);
+
+			if (!found)
+			{
+				fj_rel_info->relid = rte->relid;
+				fj_rel_info->relname = relname;
+				fj_rel_info->attnames = NULL;
+				fj_rel_info->attnos = NULL;
+			}
+
+			fj_rel_info->attnames = lappend(fj_rel_info->attnames, makeString(attname));
+			fj_rel_info->attnos = lappend_int(fj_rel_info->attnos, i);
+		}
+	}
+}
+
 /*
  * postgresBeginForeignScan
  *		Initiate an executor scan of a foreign PostgreSQL table.
@@ -1543,6 +1662,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 
 	/* Set the async-capable flag */
 	fsstate->async_capable = node->ss.ps.async_capable;
+
+	/* Capture the attribute info in case of foreign joins. */
+	fill_foreign_join_att_info(node);
 }
 
 /*
@@ -7154,7 +7276,6 @@ conversion_error_callback(void *arg)
 		/* error occurred in a scan against a foreign join */
 		ForeignScanState *fsstate = errpos->fsstate;
 		ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
-		EState	   *estate = fsstate->ss.ps.state;
 		TargetEntry *tle;
 
 		tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
@@ -7167,17 +7288,43 @@ conversion_error_callback(void *arg)
 		 */
 		if (IsA(tle->expr, Var))
 		{
-			RangeTblEntry *rte;
-			Var		   *var = (Var *) tle->expr;
-
+			RangeTblEntry	*rte;
+			Var		*var;
+			EState	   *estate;
+			ListCell	*lc1;
+			ListCell 	*lc2;
+			ForeignJoinRelInfo *fj_rel_info;
+			bool	found;
+			PgFdwScanState	*fdwstate;
+
+			fdwstate = (PgFdwScanState *) fsstate->fdw_state;
+			estate = fsstate->ss.ps.state;
+			var = (Var *) tle->expr;
 			rte = exec_rt_fetch(var->varno, estate);
 
-			if (var->varattno == 0)
-				is_wholerow = true;
-			else
-				attname = get_attname(rte->relid, var->varattno, false);
+			Assert(fdwstate && fdwstate->fj_htab);
 
-			relname = get_rel_name(rte->relid);
+			fj_rel_info = (ForeignJoinRelInfo *) hash_search(fdwstate->fj_htab,
+															 (void *) &rte->relid,
+															 HASH_FIND,
+															 &found);
+
+			Assert(found && fj_rel_info && fj_rel_info->relid == rte->relid);
+
+			relname = fj_rel_info->relname;
+
+			forboth(lc1, fj_rel_info->attnos, lc2, fj_rel_info->attnames)
+			{
+				if (lfirst_int(lc1) == errpos->cur_attno)
+				{
+					attname = strVal(lfirst(lc2));
+
+					if (!attname)
+						is_wholerow = true;
+
+					break;
+				}
+			}
 		}
 		else
 			errcontext("processing expression at position %d in select list",
-- 
2.25.1
