From 1d2873a9e33fe2784aa6131e8b3973be9afe81b3 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 25 Feb 2025 13:58:58 -0800
Subject: [PATCH v33 2/2] Refactor COPY FROM to use format callback functions.

This commit introduces a new CopyFromRoutine struct, which is a set of
callback routines to read tuples in a specific format. It also makes
COPY FROM with the existing formats (text, CSV, and binary) utilize
these format callbacks.

This change is a preliminary step towards making the COPY TO command
extensible in terms of output formats.

Similar to XXXX, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when sending field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.

Author: Sutou Kouhei <kou@clear-code.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com
---
 src/backend/commands/copyfrom.c          | 192 +++++++---
 src/backend/commands/copyfromparse.c     | 443 +++++++++++++----------
 src/include/commands/copy.h              |   2 -
 src/include/commands/copyapi.h           |  48 ++-
 src/include/commands/copyfrom_internal.h |  11 +
 src/tools/pgindent/typedefs.list         |   1 +
 6 files changed, 453 insertions(+), 244 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 8875d79d59a..4c2479974c4 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -28,7 +28,7 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
-#include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "commands/trigger.h"
@@ -106,6 +106,145 @@ typedef struct CopyMultiInsertInfo
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+/*
+ * Built-in format-specific routines. One-row callbacks are defined in
+ * copyfromparse.c
+ */
+static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
+								   Oid *typioparam);
+static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
+static void CopyFromTextLikeEnd(CopyFromState cstate);
+static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+								 FmgrInfo *finfo, Oid *typioparam);
+static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
+static void CopyFromBinaryEnd(CopyFromState cstate);
+
+
+/*
+ * COPY FROM routines for built-in formats.
+ *
+ * CSV and text formats share the same TextLike routines except for the
+ * one-row callback.
+ */
+
+/* text format */
+static const CopyFromRoutine CopyFromRoutineText = {
+	.CopyFromInFunc = CopyFromTextLikeInFunc,
+	.CopyFromStart = CopyFromTextLikeStart,
+	.CopyFromOneRow = CopyFromTextOneRow,
+	.CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+/* CSV format */
+static const CopyFromRoutine CopyFromRoutineCSV = {
+	.CopyFromInFunc = CopyFromTextLikeInFunc,
+	.CopyFromStart = CopyFromTextLikeStart,
+	.CopyFromOneRow = CopyFromCSVOneRow,
+	.CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+/* binary format */
+static const CopyFromRoutine CopyFromRoutineBinary = {
+	.CopyFromInFunc = CopyFromBinaryInFunc,
+	.CopyFromStart = CopyFromBinaryStart,
+	.CopyFromOneRow = CopyFromBinaryOneRow,
+	.CopyFromEnd = CopyFromBinaryEnd,
+};
+
+/* Return a COPY FROM routine for the given options */
+static const CopyFromRoutine *
+CopyFromGetRoutine(CopyFormatOptions opts)
+{
+	if (opts.csv_mode)
+		return &CopyFromRoutineCSV;
+	else if (opts.binary)
+		return &CopyFromRoutineBinary;
+
+	/* default is text */
+	return &CopyFromRoutineText;
+}
+
+/* Implementation of the start callback for text and CSV formats */
+static void
+CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+	AttrNumber	attr_count;
+
+	/*
+	 * If encoding conversion is needed, we need another buffer to hold the
+	 * converted input data.  Otherwise, we can just point input_buf to the
+	 * same buffer as raw_buf.
+	 */
+	if (cstate->need_transcoding)
+	{
+		cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+		cstate->input_buf_index = cstate->input_buf_len = 0;
+	}
+	else
+		cstate->input_buf = cstate->raw_buf;
+	cstate->input_reached_eof = false;
+
+	initStringInfo(&cstate->line_buf);
+
+	/*
+	 * Create workspace for CopyReadAttributes results; used by CSV and text
+	 * format.
+	 */
+	attr_count = list_length(cstate->attnumlist);
+	cstate->max_fields = attr_count;
+	cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+/*
+ * Implementation of the infunc callback for text and CSV formats. Assign
+ * the input function data to the given *finfo.
+ */
+static void
+CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
+					   Oid *typioparam)
+{
+	Oid			func_oid;
+
+	getTypeInputInfo(atttypid, &func_oid, typioparam);
+	fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the end callback for text and CSV formats */
+static void
+CopyFromTextLikeEnd(CopyFromState cstate)
+{
+	/* nothing to do */
+}
+
+/* Implementation of the start callback for binary format */
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+	/* Read and verify binary header */
+	ReceiveCopyBinaryHeader(cstate);
+}
+
+/*
+ * Implementation of the infunc callback for binary format. Assign
+ * the binary input function to the given *finfo.
+ */
+static void
+CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+					 FmgrInfo *finfo, Oid *typioparam)
+{
+	Oid			func_oid;
+
+	getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
+	fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the end callback for binary format */
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+	/* nothing to do */
+}
+
 /*
  * error context callback for COPY FROM
  *
@@ -1403,7 +1542,6 @@ BeginCopyFrom(ParseState *pstate,
 				num_defaults;
 	FmgrInfo   *in_functions;
 	Oid		   *typioparams;
-	Oid			in_func_oid;
 	int		   *defmap;
 	ExprState **defexprs;
 	MemoryContext oldcontext;
@@ -1435,6 +1573,9 @@ BeginCopyFrom(ParseState *pstate,
 	/* Extract options from the statement node tree */
 	ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
 
+	/* Set the format routine */
+	cstate->routine = CopyFromGetRoutine(cstate->opts);
+
 	/* Process the target relation */
 	cstate->rel = rel;
 
@@ -1590,25 +1731,6 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->raw_buf_index = cstate->raw_buf_len = 0;
 	cstate->raw_reached_eof = false;
 
-	if (!cstate->opts.binary)
-	{
-		/*
-		 * If encoding conversion is needed, we need another buffer to hold
-		 * the converted input data.  Otherwise, we can just point input_buf
-		 * to the same buffer as raw_buf.
-		 */
-		if (cstate->need_transcoding)
-		{
-			cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-			cstate->input_buf_index = cstate->input_buf_len = 0;
-		}
-		else
-			cstate->input_buf = cstate->raw_buf;
-		cstate->input_reached_eof = false;
-
-		initStringInfo(&cstate->line_buf);
-	}
-
 	initStringInfo(&cstate->attribute_buf);
 
 	/* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1641,13 +1763,9 @@ BeginCopyFrom(ParseState *pstate,
 			continue;
 
 		/* Fetch the input function and typioparam info */
-		if (cstate->opts.binary)
-			getTypeBinaryInputInfo(att->atttypid,
-								   &in_func_oid, &typioparams[attnum - 1]);
-		else
-			getTypeInputInfo(att->atttypid,
-							 &in_func_oid, &typioparams[attnum - 1]);
-		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+		cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+										&in_functions[attnum - 1],
+										&typioparams[attnum - 1]);
 
 		/* Get default info if available */
 		defexprs[attnum - 1] = NULL;
@@ -1782,20 +1900,7 @@ BeginCopyFrom(ParseState *pstate,
 
 	pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-	if (cstate->opts.binary)
-	{
-		/* Read and verify binary header */
-		ReceiveCopyBinaryHeader(cstate);
-	}
-
-	/* create workspace for CopyReadAttributes results */
-	if (!cstate->opts.binary)
-	{
-		AttrNumber	attr_count = list_length(cstate->attnumlist);
-
-		cstate->max_fields = attr_count;
-		cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-	}
+	cstate->routine->CopyFromStart(cstate, tupDesc);
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -1808,6 +1913,9 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+	/* Invoke the end callback */
+	cstate->routine->CopyFromEnd(cstate);
+
 	/* No COPY FROM related resources except memory. */
 	if (cstate->is_program)
 	{
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index caccdc8563c..d5d5fda0ae5 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -62,7 +62,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
-#include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
@@ -140,8 +140,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static bool CopyReadLine(CopyFromState cstate);
-static bool CopyReadLineText(CopyFromState cstate);
+static bool CopyReadLine(CopyFromState cstate, bool is_csv);
+static bool CopyReadLineText(CopyFromState cstate, bool is_csv);
 static int	CopyReadAttributesText(CopyFromState cstate);
 static int	CopyReadAttributesCSV(CopyFromState cstate);
 static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
@@ -740,9 +740,11 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
  * in the relation.
  *
  * NOTE: force_not_null option are not applied to the returned fields.
+ *
+ * We use pg_attribute_always_inline to reduce function call overheads.
  */
-bool
-NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
+static pg_attribute_always_inline bool
+NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
 {
 	int			fldct;
 	bool		done;
@@ -759,13 +761,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
 		tupDesc = RelationGetDescr(cstate->rel);
 
 		cstate->cur_lineno++;
-		done = CopyReadLine(cstate);
+		done = CopyReadLine(cstate, is_csv);
 
 		if (cstate->opts.header_line == COPY_HEADER_MATCH)
 		{
 			int			fldnum;
 
-			if (cstate->opts.csv_mode)
+			if (is_csv)
 				fldct = CopyReadAttributesCSV(cstate);
 			else
 				fldct = CopyReadAttributesText(cstate);
@@ -809,7 +811,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
 	cstate->cur_lineno++;
 
 	/* Actually read the line into memory here */
-	done = CopyReadLine(cstate);
+	done = CopyReadLine(cstate, is_csv);
 
 	/*
 	 * EOF at start of line means we're done.  If we see EOF after some
@@ -819,8 +821,10 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
 	if (done && cstate->line_buf.len == 0)
 		return false;
 
-	/* Parse the line into de-escaped field values */
-	if (cstate->opts.csv_mode)
+	/*
+	 * Parse the line into de-escaped field values.
+	 */
+	if (is_csv)
 		fldct = CopyReadAttributesCSV(cstate);
 	else
 		fldct = CopyReadAttributesText(cstate);
@@ -847,233 +851,274 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 {
 	TupleDesc	tupDesc;
 	AttrNumber	num_phys_attrs,
-				attr_count,
 				num_defaults = cstate->num_defaults;
-	FmgrInfo   *in_functions = cstate->in_functions;
-	Oid		   *typioparams = cstate->typioparams;
 	int			i;
 	int		   *defmap = cstate->defmap;
 	ExprState **defexprs = cstate->defexprs;
 
 	tupDesc = RelationGetDescr(cstate->rel);
 	num_phys_attrs = tupDesc->natts;
-	attr_count = list_length(cstate->attnumlist);
 
 	/* Initialize all values for row to NULL */
 	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
 	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
 	MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
 
-	if (!cstate->opts.binary)
+	/* Get one row from source */
+	if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+		return false;
+
+	/*
+	 * Now compute and insert any defaults available for the columns not
+	 * provided by the input data.  Anything not processed here or above will
+	 * remain NULL.
+	 */
+	for (i = 0; i < num_defaults; i++)
 	{
-		char	  **field_strings;
-		ListCell   *cur;
-		int			fldct;
-		int			fieldno;
-		char	   *string;
+		/*
+		 * The caller must supply econtext and have switched into the
+		 * per-tuple memory context in it.
+		 */
+		Assert(econtext != NULL);
+		Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
 
-		/* read raw fields in the next line */
-		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-			return false;
+		values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext,
+										 &nulls[defmap[i]]);
+	}
+
+	return true;
+}
+
+/* Implementation of the per-row callback for text format */
+bool
+CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+				   bool *nulls)
+{
+	return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false);
+}
+
+/* Implementation of the per-row callback for CSV format */
+bool
+CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+				  bool *nulls)
+{
+	return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true);
+}
+
+/*
+ * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow().
+ *
+ * We use pg_attribute_always_inline to reduce function call overheads.
+ */
+static pg_attribute_always_inline bool
+CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
+					   Datum *values, bool *nulls, bool is_csv)
+{
+	TupleDesc	tupDesc;
+	AttrNumber	attr_count;
+	FmgrInfo   *in_functions = cstate->in_functions;
+	Oid		   *typioparams = cstate->typioparams;
+	ExprState **defexprs = cstate->defexprs;
+	char	  **field_strings;
+	ListCell   *cur;
+	int			fldct;
+	int			fieldno;
+	char	   *string;
+
+	tupDesc = RelationGetDescr(cstate->rel);
+	attr_count = list_length(cstate->attnumlist);
+
+	/* read raw fields in the next line */
+	if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv))
+		return false;
+
+	/* check for overflowing fields */
+	if (attr_count > 0 && fldct > attr_count)
+		ereport(ERROR,
+				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+				 errmsg("extra data after last expected column")));
+
+	fieldno = 0;
+
+	/* Loop to read the user attributes on the line. */
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		int			m = attnum - 1;
+		Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
-		/* check for overflowing fields */
-		if (attr_count > 0 && fldct > attr_count)
+		if (fieldno >= fldct)
 			ereport(ERROR,
 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("extra data after last expected column")));
-
-		fieldno = 0;
+					 errmsg("missing data for column \"%s\"",
+							NameStr(att->attname))));
+		string = field_strings[fieldno++];
 
-		/* Loop to read the user attributes on the line. */
-		foreach(cur, cstate->attnumlist)
+		if (cstate->convert_select_flags &&
+			!cstate->convert_select_flags[m])
 		{
-			int			attnum = lfirst_int(cur);
-			int			m = attnum - 1;
-			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-			if (fieldno >= fldct)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for column \"%s\"",
-								NameStr(att->attname))));
-			string = field_strings[fieldno++];
+			/* ignore input field, leaving column as NULL */
+			continue;
+		}
 
-			if (cstate->convert_select_flags &&
-				!cstate->convert_select_flags[m])
+		if (is_csv)
+		{
+			if (string == NULL &&
+				cstate->opts.force_notnull_flags[m])
 			{
-				/* ignore input field, leaving column as NULL */
-				continue;
+				/*
+				 * FORCE_NOT_NULL option is set and column is NULL - convert
+				 * it to the NULL string.
+				 */
+				string = cstate->opts.null_print;
 			}
-
-			if (cstate->opts.csv_mode)
+			else if (string != NULL && cstate->opts.force_null_flags[m]
+					 && strcmp(string, cstate->opts.null_print) == 0)
 			{
-				if (string == NULL &&
-					cstate->opts.force_notnull_flags[m])
-				{
-					/*
-					 * FORCE_NOT_NULL option is set and column is NULL -
-					 * convert it to the NULL string.
-					 */
-					string = cstate->opts.null_print;
-				}
-				else if (string != NULL && cstate->opts.force_null_flags[m]
-						 && strcmp(string, cstate->opts.null_print) == 0)
-				{
-					/*
-					 * FORCE_NULL option is set and column matches the NULL
-					 * string. It must have been quoted, or otherwise the
-					 * string would already have been set to NULL. Convert it
-					 * to NULL as specified.
-					 */
-					string = NULL;
-				}
+				/*
+				 * FORCE_NULL option is set and column matches the NULL
+				 * string. It must have been quoted, or otherwise the string
+				 * would already have been set to NULL. Convert it to NULL as
+				 * specified.
+				 */
+				string = NULL;
 			}
+		}
 
-			cstate->cur_attname = NameStr(att->attname);
-			cstate->cur_attval = string;
+		cstate->cur_attname = NameStr(att->attname);
+		cstate->cur_attval = string;
 
-			if (string != NULL)
-				nulls[m] = false;
+		if (string != NULL)
+			nulls[m] = false;
 
-			if (cstate->defaults[m])
-			{
-				/*
-				 * The caller must supply econtext and have switched into the
-				 * per-tuple memory context in it.
-				 */
-				Assert(econtext != NULL);
-				Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+		if (cstate->defaults[m])
+		{
+			/* We must have switched into the per-tuple memory context */
+			Assert(econtext != NULL);
+			Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
 
-				values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
-			}
+			values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+		}
 
-			/*
-			 * If ON_ERROR is specified with IGNORE, skip rows with soft
-			 * errors
-			 */
-			else if (!InputFunctionCallSafe(&in_functions[m],
-											string,
-											typioparams[m],
-											att->atttypmod,
-											(Node *) cstate->escontext,
-											&values[m]))
-			{
-				Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
+		/*
+		 * If ON_ERROR is specified with IGNORE, skip rows with soft errors
+		 */
+		else if (!InputFunctionCallSafe(&in_functions[m],
+										string,
+										typioparams[m],
+										att->atttypmod,
+										(Node *) cstate->escontext,
+										&values[m]))
+		{
+			Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
 
-				cstate->num_errors++;
+			cstate->num_errors++;
 
-				if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
-				{
-					/*
-					 * Since we emit line number and column info in the below
-					 * notice message, we suppress error context information
-					 * other than the relation name.
-					 */
-					Assert(!cstate->relname_only);
-					cstate->relname_only = true;
+			if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+			{
+				/*
+				 * Since we emit line number and column info in the below
+				 * notice message, we suppress error context information other
+				 * than the relation name.
+				 */
+				Assert(!cstate->relname_only);
+				cstate->relname_only = true;
 
-					if (cstate->cur_attval)
-					{
-						char	   *attval;
-
-						attval = CopyLimitPrintoutLength(cstate->cur_attval);
-						ereport(NOTICE,
-								errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
-									   (unsigned long long) cstate->cur_lineno,
-									   cstate->cur_attname,
-									   attval));
-						pfree(attval);
-					}
-					else
-						ereport(NOTICE,
-								errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
-									   (unsigned long long) cstate->cur_lineno,
-									   cstate->cur_attname));
-
-					/* reset relname_only */
-					cstate->relname_only = false;
+				if (cstate->cur_attval)
+				{
+					char	   *attval;
+
+					attval = CopyLimitPrintoutLength(cstate->cur_attval);
+					ereport(NOTICE,
+							errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
+								   (unsigned long long) cstate->cur_lineno,
+								   cstate->cur_attname,
+								   attval));
+					pfree(attval);
 				}
+				else
+					ereport(NOTICE,
+							errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
+								   (unsigned long long) cstate->cur_lineno,
+								   cstate->cur_attname));
 
-				return true;
+				/* reset relname_only */
+				cstate->relname_only = false;
 			}
 
-			cstate->cur_attname = NULL;
-			cstate->cur_attval = NULL;
+			return true;
 		}
 
-		Assert(fieldno == attr_count);
+		cstate->cur_attname = NULL;
+		cstate->cur_attval = NULL;
 	}
-	else
-	{
-		/* binary */
-		int16		fld_count;
-		ListCell   *cur;
 
-		cstate->cur_lineno++;
+	Assert(fieldno == attr_count);
 
-		if (!CopyGetInt16(cstate, &fld_count))
-		{
-			/* EOF detected (end of file, or protocol-level EOF) */
-			return false;
-		}
+	return true;
+}
 
-		if (fld_count == -1)
-		{
-			/*
-			 * Received EOF marker.  Wait for the protocol-level EOF, and
-			 * complain if it doesn't come immediately.  In COPY FROM STDIN,
-			 * this ensures that we correctly handle CopyFail, if client
-			 * chooses to send that now.  When copying from file, we could
-			 * ignore the rest of the file like in text mode, but we choose to
-			 * be consistent with the COPY FROM STDIN case.
-			 */
-			char		dummy;
+/* Implementation of the per-row callback for binary format */
+bool
+CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+					 bool *nulls)
+{
+	TupleDesc	tupDesc;
+	AttrNumber	attr_count;
+	FmgrInfo   *in_functions = cstate->in_functions;
+	Oid		   *typioparams = cstate->typioparams;
+	int16		fld_count;
+	ListCell   *cur;
 
-			if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("received copy data after EOF marker")));
-			return false;
-		}
+	tupDesc = RelationGetDescr(cstate->rel);
+	attr_count = list_length(cstate->attnumlist);
 
-		if (fld_count != attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("row field count is %d, expected %d",
-							(int) fld_count, attr_count)));
+	cstate->cur_lineno++;
 
-		foreach(cur, cstate->attnumlist)
-		{
-			int			attnum = lfirst_int(cur);
-			int			m = attnum - 1;
-			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-			cstate->cur_attname = NameStr(att->attname);
-			values[m] = CopyReadBinaryAttribute(cstate,
-												&in_functions[m],
-												typioparams[m],
-												att->atttypmod,
-												&nulls[m]);
-			cstate->cur_attname = NULL;
-		}
+	if (!CopyGetInt16(cstate, &fld_count))
+	{
+		/* EOF detected (end of file, or protocol-level EOF) */
+		return false;
 	}
 
-	/*
-	 * Now compute and insert any defaults available for the columns not
-	 * provided by the input data.  Anything not processed here or above will
-	 * remain NULL.
-	 */
-	for (i = 0; i < num_defaults; i++)
+	if (fld_count == -1)
 	{
 		/*
-		 * The caller must supply econtext and have switched into the
-		 * per-tuple memory context in it.
+		 * Received EOF marker.  Wait for the protocol-level EOF, and complain
+		 * if it doesn't come immediately.  In COPY FROM STDIN, this ensures
+		 * that we correctly handle CopyFail, if client chooses to send that
+		 * now.  When copying from file, we could ignore the rest of the file
+		 * like in text mode, but we choose to be consistent with the COPY
+		 * FROM STDIN case.
 		 */
-		Assert(econtext != NULL);
-		Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+		char		dummy;
 
-		values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext,
-										 &nulls[defmap[i]]);
+		if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					 errmsg("received copy data after EOF marker")));
+		return false;
+	}
+
+	if (fld_count != attr_count)
+		ereport(ERROR,
+				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+				 errmsg("row field count is %d, expected %d",
+						(int) fld_count, attr_count)));
+
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		int			m = attnum - 1;
+		Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+		cstate->cur_attname = NameStr(att->attname);
+		values[m] = CopyReadBinaryAttribute(cstate,
+											&in_functions[m],
+											typioparams[m],
+											att->atttypmod,
+											&nulls[m]);
+		cstate->cur_attname = NULL;
 	}
 
 	return true;
@@ -1087,7 +1132,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
  * in the final value of line_buf.
  */
 static bool
-CopyReadLine(CopyFromState cstate)
+CopyReadLine(CopyFromState cstate, bool is_csv)
 {
 	bool		result;
 
@@ -1095,7 +1140,7 @@ CopyReadLine(CopyFromState cstate)
 	cstate->line_buf_valid = false;
 
 	/* Parse data and transfer into line_buf */
-	result = CopyReadLineText(cstate);
+	result = CopyReadLineText(cstate, is_csv);
 
 	if (result)
 	{
@@ -1163,7 +1208,7 @@ CopyReadLine(CopyFromState cstate)
  * CopyReadLineText - inner loop of CopyReadLine for text mode
  */
 static bool
-CopyReadLineText(CopyFromState cstate)
+CopyReadLineText(CopyFromState cstate, bool is_csv)
 {
 	char	   *copy_input_buf;
 	int			input_buf_ptr;
@@ -1178,7 +1223,7 @@ CopyReadLineText(CopyFromState cstate)
 	char		quotec = '\0';
 	char		escapec = '\0';
 
-	if (cstate->opts.csv_mode)
+	if (is_csv)
 	{
 		quotec = cstate->opts.quote[0];
 		escapec = cstate->opts.escape[0];
@@ -1255,7 +1300,7 @@ CopyReadLineText(CopyFromState cstate)
 		prev_raw_ptr = input_buf_ptr;
 		c = copy_input_buf[input_buf_ptr++];
 
-		if (cstate->opts.csv_mode)
+		if (is_csv)
 		{
 			/*
 			 * If character is '\r', we may need to look ahead below.  Force
@@ -1294,7 +1339,7 @@ CopyReadLineText(CopyFromState cstate)
 		}
 
 		/* Process \r */
-		if (c == '\r' && (!cstate->opts.csv_mode || !in_quote))
+		if (c == '\r' && (!is_csv || !in_quote))
 		{
 			/* Check for \r\n on first line, _and_ handle \r\n. */
 			if (cstate->eol_type == EOL_UNKNOWN ||
@@ -1322,10 +1367,10 @@ CopyReadLineText(CopyFromState cstate)
 					if (cstate->eol_type == EOL_CRNL)
 						ereport(ERROR,
 								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-								 !cstate->opts.csv_mode ?
+								 !is_csv ?
 								 errmsg("literal carriage return found in data") :
 								 errmsg("unquoted carriage return found in data"),
-								 !cstate->opts.csv_mode ?
+								 !is_csv ?
 								 errhint("Use \"\\r\" to represent carriage return.") :
 								 errhint("Use quoted CSV field to represent carriage return.")));
 
@@ -1339,10 +1384,10 @@ CopyReadLineText(CopyFromState cstate)
 			else if (cstate->eol_type == EOL_NL)
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 !cstate->opts.csv_mode ?
+						 !is_csv ?
 						 errmsg("literal carriage return found in data") :
 						 errmsg("unquoted carriage return found in data"),
-						 !cstate->opts.csv_mode ?
+						 !is_csv ?
 						 errhint("Use \"\\r\" to represent carriage return.") :
 						 errhint("Use quoted CSV field to represent carriage return.")));
 			/* If reach here, we have found the line terminator */
@@ -1350,15 +1395,15 @@ CopyReadLineText(CopyFromState cstate)
 		}
 
 		/* Process \n */
-		if (c == '\n' && (!cstate->opts.csv_mode || !in_quote))
+		if (c == '\n' && (!is_csv || !in_quote))
 		{
 			if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 !cstate->opts.csv_mode ?
+						 !is_csv ?
 						 errmsg("literal newline found in data") :
 						 errmsg("unquoted newline found in data"),
-						 !cstate->opts.csv_mode ?
+						 !is_csv ?
 						 errhint("Use \"\\n\" to represent newline.") :
 						 errhint("Use quoted CSV field to represent newline.")));
 			cstate->eol_type = EOL_NL;	/* in case not set yet */
@@ -1370,7 +1415,7 @@ CopyReadLineText(CopyFromState cstate)
 		 * Process backslash, except in CSV mode where backslash is a normal
 		 * character.
 		 */
-		if (c == '\\' && !cstate->opts.csv_mode)
+		if (c == '\\' && !is_csv)
 		{
 			char		c2;
 
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 06dfdfef721..7bc044e2816 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -107,8 +107,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
 extern void EndCopyFrom(CopyFromState cstate);
 extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 						 Datum *values, bool *nulls);
-extern bool NextCopyFromRawFields(CopyFromState cstate,
-								  char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
 extern char *CopyLimitPrintoutLength(const char *str);
 
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index eb09271d94b..fc114079d5a 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * copyapi.h
- *	  API for COPY TO handlers
+ *	  API for COPY TO/FROM handlers
  *
  *
  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
@@ -52,4 +52,50 @@ typedef struct CopyToRoutine
 	void		(*CopyToEnd) (CopyToState cstate);
 } CopyToRoutine;
 
+/*
+ * API structure for a COPY FROM format implementation. Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyFromRoutine
+{
+	/*
+	 * Set input function information. This callback is called once at the
+	 * beginning of COPY FROM.
+	 *
+	 * 'finfo' can be optionally filled to provide the catalog information of
+	 * the input function.
+	 *
+	 * 'typioparam' can be optionally filled to define the OID of the type to
+	 * pass to the input function.'atttypid' is the OID of data type used by
+	 * the relation's attribute.
+	 */
+	void		(*CopyFromInFunc) (CopyFromState cstate, Oid atttypid,
+								   FmgrInfo *finfo, Oid *typioparam);
+
+	/*
+	 * Start a COPY FROM. This callback is called once at the beginning of
+	 * COPY FROM.
+	 *
+	 * 'tupDesc' is the tuple descriptor of the relation where the data needs
+	 * to be copied. This can be used for any initialization steps required by
+	 * a format.
+	 */
+	void		(*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+	/*
+	 * Read one row from the source and fill *values and *nulls.
+	 *
+	 * 'econtext' is used to evaluate default expression for each column that
+	 * is either not read from the file or is using the DEFAULT option of COPY
+	 * FROM. It is NULL if no default values are used.
+	 *
+	 * Returns false if there are no more tuples to read.
+	 */
+	bool		(*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext,
+								   Datum *values, bool *nulls);
+
+	/* End a COPY FROM. This callback is called once at the end of COPY FROM */
+	void		(*CopyFromEnd) (CopyFromState cstate);
+} CopyFromRoutine;
+
 #endif							/* COPYAPI_H */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 1d8ac8f62e6..c8b22af22d8 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -58,6 +58,9 @@ typedef enum CopyInsertMethod
  */
 typedef struct CopyFromStateData
 {
+	/* format routine */
+	const struct CopyFromRoutine *routine;
+
 	/* low-level state data */
 	CopySource	copy_src;		/* type of copy source */
 	FILE	   *copy_file;		/* used if copy_src == COPY_FILE */
@@ -183,4 +186,12 @@ typedef struct CopyFromStateData
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
+/* One-row callbacks for built-in formats defined in copyfromparse.c */
+extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext,
+							   Datum *values, bool *nulls);
+extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext,
+							  Datum *values, bool *nulls);
+extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
+								 Datum *values, bool *nulls);
+
 #endif							/* COPYFROM_INTERNAL_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b0c97b0fb85..eca93ab9553 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -501,6 +501,7 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyFormatOptions
+CopyFromRoutine
 CopyFromState
 CopyFromStateData
 CopyHeaderChoice
-- 
2.43.5

