On Wed, Dec 11, 2024 at 7:41 PM Nishant Sharma
<nishant.sha...@enterprisedb.com> wrote:
>
> Thanks for the v3 patch!
>
> Please find review comments on v3:-
>
> 1) I think no need to change the below if condition, we can keep
> it the way it was before i.e with
> "cstate->opts.on_error != COPY_ON_ERROR_STOP" and we
> add a new error ereport the way v3 has. Because for
> cstate->opts.on_error as COPY_ON_ERROR_STOP cases we
> can avoid two if conditions inside upper if.
>
> +    if (cstate->num_errors > 0 &&
>          cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)

> 2) No need for the below "if" check for maxattnum. We can simply
> increment it with "++maxattnum" and later check if we have exactly
> 10 attributes for the error table. Because even if we drop any
> attribute and maxattnum is 10 in pg_attribute for that rel, we should
> still error out. Maybe we can rename it to "totalatts"?
>
> +                       if (maxattnum <= attForm->attnum)
> +                               maxattnum = attForm->attnum;
>
> 3) #define would be better, also as mentioned by Kirill switch
> condition with proper #define would be better.
>
> +               if (maxattnum != 10)
> +                       on_error_tbl_ok = false;
>
> 4)

Hi, thanks for the review.
The attached v4 patch addresses these two issues.
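
To make review points 2) and 3) concrete, here is a rough standalone sketch of the idea: count every column and compare the layout against a named constant instead of a bare 10. This is not the patch code. `ExpectedCol`, `expected_cols`, and `error_tbl_layout_ok` are hypothetical names, and plain type-name strings stand in for pg_attribute rows and type OIDs. Only ERROR_TBL_COLUMNS and the column names and types are taken from the patch.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define ERROR_TBL_COLUMNS 10	/* same constant name the patch defines */

/* Hypothetical stand-in for one pg_attribute row: column name and type name. */
typedef struct ExpectedCol
{
	const char *name;
	const char *type;
} ExpectedCol;

/* Layout the patch documents for the error saving table, in ordinal order. */
static const ExpectedCol expected_cols[ERROR_TBL_COLUMNS] = {
	{"userid", "oid"},
	{"copy_tbl", "oid"},
	{"filename", "text"},
	{"lineno", "bigint"},
	{"line", "text"},
	{"colname", "text"},
	{"raw_field_value", "text"},
	{"err_message", "text"},
	{"err_detail", "text"},
	{"errorcode", "text"},
};

/*
 * Return true only if the candidate has exactly ERROR_TBL_COLUMNS columns
 * and each one matches the expected name and type at the same position.
 */
bool
error_tbl_layout_ok(const ExpectedCol *cols, size_t ncols)
{
	if (ncols != ERROR_TBL_COLUMNS)
		return false;

	for (size_t i = 0; i < ncols; i++)
	{
		if (strcmp(cols[i].name, expected_cols[i].name) != 0 ||
			strcmp(cols[i].type, expected_cols[i].type) != 0)
			return false;
	}
	return true;
}
```

With this shape, a table with nine or eleven columns is rejected by the count check alone, and a column with the right name but the wrong type (say, lineno declared as integer) is rejected by the per-position comparison.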

> > that would be more work.
> > so i stick to if there is a table can use to
> > error saving then use it, otherwise error out.
> >
> YES. but that would lead to a better design with an error table.
> Also, I think Kirill mentions the same. That is to auto create, if it
> does not exist.
>
I decided not to auto-create the table.
The main reasons:
1. Running a utility CREATE TABLE command via SPI from within the utility
COPY command may work, but there is no precedent for it.

2. If we auto-create the on_error table in BeginCopyFrom, we later have to
call get_relname_relid to get the newly created table's OID. I think that
counts as a repeated name lookup; see the relevant links [1], [2].

[1] https://postgr.es/m/20240808171351.a9.nmi...@google.com
[2] https://postgr.es/m/CA+TgmobHYix=nn8d4ruha6fhuvpr88kgamq1pbfngfofejr...@mail.gmail.com
From 4e38c800a7481c1a4fad1b6beb3c11dd24235001 Mon Sep 17 00:00:00 2001
From: jian he <jian.universal...@gmail.com>
Date: Fri, 13 Dec 2024 16:23:17 +0800
Subject: [PATCH v4 1/1] introduce on_error table option for COPY FROM.

The syntax is {on_error table, table error_saving_tbl}.
We first check that table error_saving_tbl exists and has the expected
definition. If it does not meet our criteria, we error out immediately.

We also do a preliminary check of the lock on the error saving table, so that
inserts into it won't get stuck.

When an error happens, we save the error metadata, insert it into the
error_saving_table, and continue with the next row.  That means that for one
row, we can only catch the first field that has an error.

discussion: https://postgr.es/m/CACJufxH_OJpVra%3D0c4ow8fbxHj7heMcVaTNEPa5vAurSeNA-6Q%40mail.gmail.com
context:    https://www.postgresql.org/message-id/752672.1699474336%40sss.pgh.pa.us
commitfest: https://commitfest.postgresql.org/51/4817/
---
 doc/src/sgml/ref/copy.sgml               | 118 +++++++++++++-
 src/backend/commands/copy.c              |  31 ++++
 src/backend/commands/copyfrom.c          | 193 ++++++++++++++++++++++-
 src/backend/commands/copyfromparse.c     |  43 +++++
 src/include/commands/copy.h              |   8 +
 src/include/commands/copyfrom_internal.h |   1 +
 src/test/regress/expected/copy2.out      | 107 +++++++++++++
 src/test/regress/sql/copy2.sql           |  89 +++++++++++
 8 files changed, 583 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 8394402f09..e66474c74f 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     ON_ERROR <replaceable class="parameter">error_action</replaceable>
+    TABLE '<replaceable class="parameter">error_saving_table</replaceable>'
     REJECT_LIMIT <replaceable class="parameter">maxerror</replaceable>
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
     LOG_VERBOSITY <replaceable class="parameter">verbosity</replaceable>
@@ -395,11 +396,13 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
       input value into its data type.
       An <replaceable class="parameter">error_action</replaceable> value of
       <literal>stop</literal> means fail the command, while
-      <literal>ignore</literal> means discard the input row and continue with the next one.
+      <literal>ignore</literal> means discard the input row and continue with the next one, and
+      <literal>table</literal> means insert error-related information into <replaceable class="parameter">error_saving_table</replaceable>
+      and continue with the next row.
       The default is <literal>stop</literal>.
      </para>
      <para>
-      The <literal>ignore</literal> option is applicable only for <command>COPY FROM</command>
+      The <literal>ignore</literal> and <literal>table</literal> options are applicable only for <command>COPY FROM</command>
       when the <literal>FORMAT</literal> is <literal>text</literal> or <literal>csv</literal>.
      </para>
      <para>
@@ -463,6 +466,117 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>TABLE</literal></term>
+    <listitem>
+      <para>
+        Save error context details to the table <replaceable class="parameter">error_saving_table</replaceable>
+        when <literal>ON_ERROR</literal> is set to <literal>TABLE</literal>.
+      </para>
+
+      <para>
+        The <replaceable class="parameter">error_saving_table</replaceable> must exist in the current database
+        and be in the same schema as the destination table of the <command>COPY FROM</command> operation.
+        This option is allowed only in <command>COPY FROM</command> and only when
+        <literal>ON_ERROR</literal> is set to <literal>TABLE</literal>.
+
+        The user performing the <command>COPY FROM</command>
+        operation must have <literal>INSERT</literal> privileges for all columns
+        in the <replaceable class="parameter">error_saving_table</replaceable>.
+        If this option is not specified, the <literal>ON_ERROR</literal> parameter cannot be set to <literal>TABLE</literal>.
+      </para>
+
+   <para>
+    If table <replaceable class="parameter">error_saving_table</replaceable> does not meet the following definition
+    (column ordinal positions must match the table below), an error will be raised.
+
+<informaltable>
+    <tgroup cols="3">
+     <thead>
+      <row>
+       <entry>Column name</entry>
+       <entry>Data type</entry>
+       <entry>Description</entry>
+      </row>
+     </thead>
+
+      <tbody>
+       <row>
+       <entry> <literal>userid</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The user that generated the error.
+       Reference <link linkend="catalog-pg-authid"><structname>pg_authid</structname></link>.<structfield>oid</structfield>,
+       however there is no hard dependency with catalog <literal>pg_authid</literal>.
+       If the corresponding row on <literal>pg_authid</literal> is deleted, this value becomes stale.
+    </entry>
+       </row>
+
+       <row>
+       <entry> <literal>copy_tbl</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The <command>COPY FROM</command> operation destination table oid.
+        Reference <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>oid</structfield>,
+        however there is no hard dependency with catalog <literal>pg_class</literal>.
+        If the corresponding row on <literal>pg_class</literal> is deleted, this value becomes stale.
+        </entry>
+       </row>
+
+       <row>
+       <entry> <literal>filename</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The path name of the <command>COPY FROM</command> input</entry>
+       </row>
+
+       <row>
+       <entry> <literal>lineno</literal> </entry>
+       <entry><type>bigint</type></entry>
+       <entry>Line number where the error occurred, counting from 1</entry>
+       </row>
+
+       <row>
+       <entry> <literal>line</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the line where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>colname</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>raw_field_value</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_message</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The error message</entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_detail</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Detailed error message</entry>
+       </row>
+
+       <row>
+       <entry> <literal>errorcode</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The error code</entry>
+       </row>
+
+      </tbody>
+     </tgroup>
+   </informaltable>
+
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>WHERE</literal></term>
     <listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 2d98ecf3f4..a63989223c 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -410,6 +410,8 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
 	if (pg_strcasecmp(sval, "ignore") == 0)
 		return COPY_ON_ERROR_IGNORE;
 
+	if (pg_strcasecmp(sval, "table") == 0)
+		return COPY_ON_ERROR_TABLE;
 	ereport(ERROR,
 			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 	/*- translator: first %s is the name of a COPY option, e.g. ON_ERROR */
@@ -502,6 +504,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		freeze_specified = false;
 	bool		header_specified = false;
 	bool		on_error_specified = false;
+	bool		on_error_tbl_specified = false;
 	bool		log_verbosity_specified = false;
 	bool		reject_limit_specified = false;
 	ListCell   *option;
@@ -677,6 +680,14 @@ ProcessCopyOptions(ParseState *pstate,
 			reject_limit_specified = true;
 			opts_out->reject_limit = defGetCopyRejectLimitOption(defel);
 		}
+		else if (strcmp(defel->defname, "table") == 0)
+		{
+			if (on_error_tbl_specified)
+				errorConflictingDefElem(defel, pstate);
+			on_error_tbl_specified = true;
+
+			opts_out->on_error_tbl = defGetString(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -685,6 +696,26 @@ ProcessCopyOptions(ParseState *pstate,
 					 parser_errposition(pstate, defel->location)));
 	}
 
+	if (opts_out->on_error == COPY_ON_ERROR_TABLE && opts_out->on_error_tbl == NULL)
+		ereport(ERROR,
+				 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("COPY %s \"table\" requires a custom specified error saving table", "ON_ERROR"),
+				 errhint("You need also specify \"TABLE\" option."));
+
+	if (opts_out->on_error == COPY_ON_ERROR_TABLE && opts_out->reject_limit)
+		ereport(ERROR,
+				 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("cannot specify %s option when %s option is specified as \"table\"", "REJECT_LIMIT", "ON_ERROR"));
+
+	if (opts_out->on_error == COPY_ON_ERROR_TABLE && opts_out->log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+		ereport(ERROR,
+				 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("cannot specify %s as \"verbose\" when %s option is specified as \"table\"", "log_verbosity", "ON_ERROR"));
+
+	if (opts_out->on_error != COPY_ON_ERROR_TABLE && opts_out->on_error_tbl != NULL)
+		ereport(ERROR,
+				 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("COPY \"TABLE\" option can only be used when %s option is specified as \"table\"", "ON_ERROR"));
 	/*
 	 * Check for incompatible options (must do these three before inserting
 	 * defaults)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 4d52c93c30..423944860c 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -28,6 +28,7 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
@@ -35,6 +36,7 @@
 #include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
+#include "executor/spi.h"
 #include "executor/tuptable.h"
 #include "foreign/fdwapi.h"
 #include "mb/pg_wchar.h"
@@ -44,7 +46,10 @@
 #include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/portal.h"
@@ -1029,6 +1034,16 @@ CopyFrom(CopyFromState cstate)
 			continue;
 		}
 
+		if (cstate->opts.on_error == COPY_ON_ERROR_TABLE &&
+			cstate->escontext->error_occurred)
+		{
+			cstate->escontext->error_occurred = false;
+			cstate->escontext->details_wanted = true;
+			memset(cstate->escontext->error_data, 0, sizeof(ErrorData));
+			/* Repeat NextCopyFrom() until no soft error occurs */
+			continue;
+		}
+
 		ExecStoreVirtualTuple(myslot);
 
 		/*
@@ -1324,11 +1339,29 @@ CopyFrom(CopyFromState cstate)
 	if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
 		cstate->num_errors > 0 &&
 		cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
-		ereport(NOTICE,
-				errmsg_plural("%llu row was skipped due to data type incompatibility",
-							  "%llu rows were skipped due to data type incompatibility",
-							  (unsigned long long) cstate->num_errors,
-							  (unsigned long long) cstate->num_errors));
+	{
+		if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
+			ereport(NOTICE,
+					errmsg_plural("%llu row was skipped due to data type incompatibility",
+								  "%llu rows were skipped due to data type incompatibility",
+								  (unsigned long long) cstate->num_errors,
+								  (unsigned long long) cstate->num_errors));
+		else
+			ereport(NOTICE,
+					errmsg_plural("%llu row was saved to table \"%s\" due to data type incompatibility",
+								  "%llu rows were saved to table \"%s\" due to data type incompatibility",
+								  (unsigned long long) cstate->num_errors,
+								  (unsigned long long) cstate->num_errors,
+								  RelationGetRelationName(cstate->error_saving_rel)));
+	}
+
+	/*
+	 * Similar to
+	 * (https://postgr.es/m/7bcfc39d4176faf85ab317d0c26786953646a411.ca...@cybertec.at):
+	 * in COPY FROM, keep the error saving table's lock until transaction end.
+	 */
+	if (cstate->error_saving_rel != NULL)
+		table_close(cstate->error_saving_rel, NoLock);
 
 	if (bistate != NULL)
 		FreeBulkInsertState(bistate);
@@ -1483,6 +1516,156 @@ BeginCopyFrom(ParseState *pstate,
 	else
 		cstate->escontext = NULL;
 
+	if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+	{
+		const char *copy_nspname;
+		Datum		ins_prev;
+		Oid			err_tbl_oid;
+		Oid			copy_nspoid;
+		Relation	arel;
+		ScanKeyData akey;
+		SysScanDesc ascan;
+		HeapTuple	atup;
+		Form_pg_attribute attForm;
+		int			attcnt = 0;
+
+		Assert(cstate->escontext != NULL);
+		Assert(cstate->opts.on_error_tbl != NULL);
+
+		copy_nspname = get_namespace_name(RelationGetNamespace(cstate->rel));
+		copy_nspoid = get_namespace_oid(copy_nspname, false);
+		err_tbl_oid = get_relname_relid(cstate->opts.on_error_tbl, copy_nspoid);
+
+		if (!OidIsValid(err_tbl_oid))
+			ereport(ERROR,
+					 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("error saving table \"%s\".\"%s\" does not exist",
+							 copy_nspname, cstate->opts.on_error_tbl));
+
+		/* error saving table must be a normal relation kind */
+		if (get_rel_relkind(err_tbl_oid) != RELKIND_RELATION)
+			ereport(ERROR,
+					errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					errmsg("COPY %s cannot use relation \"%s\" for error saving",
+							"ON_ERROR", cstate->opts.on_error_tbl),
+					errdetail_relkind_not_supported(get_rel_relkind(err_tbl_oid)));
+
+		/* current user should have INSERT privilege on error_saving table */
+		ins_prev = DirectFunctionCall3(has_table_privilege_id_id,
+										ObjectIdGetDatum(GetUserId()),
+										ObjectIdGetDatum(err_tbl_oid),
+										CStringGetTextDatum("INSERT"));
+		if (!DatumGetBool(ins_prev))
+			ereport(ERROR,
+						errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						errmsg("permission denied to set table \"%s\".\"%s\" for COPY FROM error saving",
+								copy_nspname, cstate->opts.on_error_tbl),
+						errhint("Ensure current user have enough privilege on \"%s\".\"%s\" for COPY FROM error saving",
+								copy_nspname, cstate->opts.on_error_tbl));
+
+		/*
+		 * We may insert tuples into the error saving table, so first check
+		 * its lock status. If it is already under a heavy lock, our COPY
+		 * operation would get stuck. Instead of letting COPY block, just
+		 * report an ERROR that the error saving table is locked.
+		 */
+		if (!ConditionalLockRelationOid(err_tbl_oid, RowExclusiveLock))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("table \"%s\".\"%s\" was locked, cannot be used for error saving",
+							copy_nspname, cstate->opts.on_error_tbl));
+		cstate->error_saving_rel = table_open(err_tbl_oid, NoLock);
+
+		/*
+		 * Can this error saving table (cstate->error_saving_rel) be used for
+		 * error saving? To decide, we check the table's definition: column
+		 * names, column data types, and the number of columns, in ordinal
+		 * position order.
+		 */
+		arel = table_open(AttributeRelationId, AccessShareLock);
+		ScanKeyInit(&akey,
+					Anum_pg_attribute_attrelid,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(err_tbl_oid));
+
+		ascan = systable_beginscan(arel, AttributeRelidNumIndexId, true,
+								SnapshotSelf, 1, &akey);
+		while (HeapTupleIsValid(atup = systable_getnext(ascan)))
+		{
+			attForm = (Form_pg_attribute) GETSTRUCT(atup);
+			if (attForm->attnum < 1 || attForm->attisdropped)
+				continue;
+
+			switch (attForm->attnum)
+			{
+				case 1:
+					if (attForm->atttypid == OIDOID &&
+						strcmp(NameStr(attForm->attname), "userid") == 0)
+						attcnt++;
+					break;
+				case 2:
+					if (attForm->atttypid == OIDOID &&
+						strcmp(NameStr(attForm->attname), "copy_tbl") == 0)
+						attcnt++;
+					break;
+				case 3:
+					if (attForm->atttypid == TEXTOID &&
+						strcmp(NameStr(attForm->attname), "filename") == 0)
+						attcnt++;
+					break;
+				case 4:
+					if (attForm->atttypid == INT8OID &&
+						strcmp(NameStr(attForm->attname), "lineno") == 0)
+						attcnt++;
+					break;
+				case 5:
+					if (attForm->atttypid == TEXTOID &&
+						strcmp(NameStr(attForm->attname), "line") == 0)
+						attcnt++;
+					break;
+				case 6:
+					if (attForm->atttypid == TEXTOID &&
+						strcmp(NameStr(attForm->attname), "colname") == 0)
+						attcnt++;
+					break;
+				case 7:
+					if (attForm->atttypid == TEXTOID &&
+						strcmp(NameStr(attForm->attname), "raw_field_value") == 0)
+						attcnt++;
+					break;
+				case 8:
+					if (attForm->atttypid == TEXTOID &&
+						strcmp(NameStr(attForm->attname), "err_message") == 0)
+						attcnt++;
+					break;
+				case 9:
+					if (attForm->atttypid == TEXTOID &&
+						strcmp(NameStr(attForm->attname), "err_detail") == 0)
+						attcnt++;
+					break;
+				case 10:
+					if (attForm->atttypid == TEXTOID &&
+						strcmp(NameStr(attForm->attname), "errorcode") == 0)
+						attcnt++;
+					break;
+				default:
+					attcnt++;
+					break;
+			}
+		}
+		systable_endscan(ascan);
+		table_close(arel, AccessShareLock);
+
+		if (attcnt != ERROR_TBL_COLUMNS)
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("table \"%s\".\"%s\" cannot be used for COPY FROM error saving",
+							copy_nspname, cstate->opts.on_error_tbl),
+					errdetail("Table \"%s\".\"%s\" data definition cannot be used for error saving",
+							copy_nspname, cstate->opts.on_error_tbl));
+
+		cstate->escontext->details_wanted = true;
+	}
 	/* Convert FORCE_NULL name list to per-column flags, check validity */
 	cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
 	if (cstate->opts.force_null_all)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index d1d43b53d8..e1606db94c 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -66,6 +66,7 @@
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
+#include "access/heapam.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
@@ -959,7 +960,49 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 											&values[m]))
 			{
 				Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
+				Assert(cstate->escontext->error_occurred);
 
+				if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+				{
+					/*
+					 * we mostly use ErrorSaveContext's info to form a tuple and
+					 * insert it to the error saving table. we already acquired
+					 * lock on error_saving_rel in BeginCopyFrom.
+					*/
+					HeapTuple	on_error_tup;
+					TupleDesc	on_error_tupDesc;
+					char		*err_detail;
+					char		*err_code;
+					Datum	t_values[ERROR_TBL_COLUMNS] = {0};
+					bool	t_isnull[ERROR_TBL_COLUMNS] = {0};
+					int		j = 0;
+
+					Assert(cstate->rel != NULL);
+					t_values[j++] = ObjectIdGetDatum(GetUserId());
+					t_values[j++] = ObjectIdGetDatum(cstate->rel->rd_rel->oid);
+					t_values[j++] = CStringGetTextDatum(cstate->filename ? cstate->filename : "STDIN");
+					t_values[j++] = Int64GetDatum((long long) cstate->cur_lineno);
+					t_values[j++] = CStringGetTextDatum(cstate->line_buf.data);
+					t_values[j++] = CStringGetTextDatum(cstate->cur_attname);
+					t_values[j++] = CStringGetTextDatum(string);
+					t_values[j++] = CStringGetTextDatum(cstate->escontext->error_data->message);
+
+					if (!cstate->escontext->error_data->detail)
+						err_detail = NULL;
+					else
+						err_detail = cstate->escontext->error_data->detail;
+					t_values[j]   = err_detail ? CStringGetTextDatum(err_detail) : (Datum) 0;
+					t_isnull[j++] = err_detail ? false : true;
+
+					err_code = unpack_sql_state(cstate->escontext->error_data->sqlerrcode);
+					t_values[j++] = CStringGetTextDatum(err_code);
+
+					Assert(j == ERROR_TBL_COLUMNS);
+
+					on_error_tupDesc = RelationGetDescr(cstate->error_saving_rel);
+					on_error_tup = heap_form_tuple(on_error_tupDesc, t_values, t_isnull);
+					simple_heap_insert(cstate->error_saving_rel, on_error_tup);
+				}
 				cstate->num_errors++;
 
 				if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 4002a7f538..40ab35e4a3 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -38,8 +38,15 @@ typedef enum CopyOnErrorChoice
 {
 	COPY_ON_ERROR_STOP = 0,		/* immediately throw errors, default */
 	COPY_ON_ERROR_IGNORE,		/* ignore errors */
+	COPY_ON_ERROR_TABLE,		/* save error info to a table */
 } CopyOnErrorChoice;
 
+/*
+ * Used for COPY (on_error 'table'): the table that stores error info must
+ * have exactly 10 columns.
+ */
+#define ERROR_TBL_COLUMNS   10
+
 /*
  * Represents verbosity of logged messages by COPY command.
  */
@@ -86,6 +93,7 @@ typedef struct CopyFormatOptions
 	CopyOnErrorChoice on_error; /* what to do when error happened */
 	CopyLogVerbosityChoice log_verbosity;	/* verbosity of logged messages */
 	int64		reject_limit;	/* maximum tolerable number of errors */
+	char	   *on_error_tbl;	/* name of the table that stores error info */
 	List	   *convert_select; /* list of column names (can be NIL) */
 } CopyFormatOptions;
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..779f86d1ce 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -70,6 +70,7 @@ typedef struct CopyFromStateData
 
 	/* parameters from the COPY command */
 	Relation	rel;			/* relation to copy from */
+	Relation	error_saving_rel; /* relation for copy from error saving */
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDIN */
 	bool		is_program;		/* is 'filename' a program to popen? */
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 64ea33aeae..1b477ad753 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -813,6 +813,109 @@ ERROR:  skipped more than REJECT_LIMIT (3) rows due to data type incompatibility
 CONTEXT:  COPY check_ign_err, line 5, column n: ""
 COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
 NOTICE:  4 rows were skipped due to data type incompatibility
+create table err_tbl(
+  userid oid,   -- the user oid while copy generated this entry
+  copy_tbl oid, --copy table
+  filename text,
+  lineno  bigint,
+  line    text,
+  colname text,
+  raw_field_value text,
+  err_message text,
+  err_detail text,
+  errorcode text
+);
+--cannot use for error saving.
+create table err_tbl_1(
+userid    oid,
+copy_tbl  oid,
+filename  text,
+lineno    bigint,
+line      text,
+colname   text,
+raw_field_value text,
+err_message     text,
+err_detail      text
+);
+--cannot use for error saving.
+create table err_tbl_2(
+  userid oid, copy_tbl oid, filename text, lineno bigint,line text,
+  colname text, raw_field_value text, err_message text,
+  err_detail text,
+  errorcode text,
+  errorcode1 text
+);
+create table t_copy_tbl(a int, b int, c int);
+create view s1 as select 1 as a;
+----invalid options, the below all should  fails
+COPY t_copy_tbl FROM STDIN WITH (on_error 'table');
+ERROR:  COPY ON_ERROR "table" requires a custom specified error saving table
+HINT:  You need also specify "TABLE" option.
+COPY t_copy_tbl FROM STDIN WITH (table err_tbl);
+ERROR:  COPY "TABLE" option can only be used when ON_ERROR option is specified as "table"
+COPY t_copy_tbl TO STDIN WITH (on_error 'table');
+ERROR:  COPY ON_ERROR cannot be used with COPY TO
+LINE 1: COPY t_copy_tbl TO STDIN WITH (on_error 'table');
+                                       ^
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', reject_limit 10, table err_tbl);
+ERROR:  cannot specify REJECT_LIMIT option when ON_ERROR option is specified as "table"
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', log_verbosity verbose, table err_tbl);
+ERROR:  cannot specify log_verbosity as "verbose" when ON_ERROR option is specified as "table"
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table not_exists);
+ERROR:  error saving table "public"."not_exists" does not exist
+COPY t_copy_tbl(a) FROM STDIN WITH (on_error 'table', table s1);
+ERROR:  COPY ON_ERROR cannot use relation "s1" for error saving
+DETAIL:  This operation is not supported for views.
+--should fail. err_tbl_1 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table err_tbl_1);
+ERROR:  table "public"."err_tbl_1" cannot be used for COPY FROM error saving
+DETAIL:  Table "public"."err_tbl_1" data definition cannot be used for error saving
+--should fail. err_tbl_2 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table err_tbl_2);
+ERROR:  table "public"."err_tbl_2" cannot be used for COPY FROM error saving
+DETAIL:  Table "public"."err_tbl_2" data definition cannot be used for error saving
+----invalid options, the above all should fails
+--should fail, copied data have extra columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+ERROR:  extra data after last expected column
+CONTEXT:  COPY t_copy_tbl, line 1: "1,2,3,4"
+--should fail, copied data have less columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+ERROR:  extra data after last expected column
+CONTEXT:  COPY t_copy_tbl, line 1: "1,2,"
+--ok cases.
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+NOTICE:  4 rows were saved to table "err_tbl" due to data type incompatibility
+--should fail. lack privilege
+begin;
+create user regress_user20;
+grant insert(userid,copy_tbl,filename,lineno,line) on table err_tbl to regress_user20;
+grant insert on table t_copy_tbl to regress_user20;
+set role regress_user20;
+COPY t_copy_tbl FROM STDIN WITH (delimiter ',', on_error 'table', table err_tbl);
+ERROR:  permission denied to set table "public"."err_tbl" for COPY FROM error saving
+HINT:  Ensure current user have enough privilege on "public"."err_tbl" for COPY FROM error saving
+ROLLBACK;
+select	pg_class.relname as copy_destination
+        ,filename,lineno ,line
+        ,colname,raw_field_value,err_message
+        ,err_detail,errorcode
+from err_tbl join pg_class on copy_tbl = pg_class.oid;
+ copy_destination | filename | lineno |           line           | colname |   raw_field_value   |                         err_message                          | err_detail | errorcode 
+------------------+----------+--------+--------------------------+---------+---------------------+--------------------------------------------------------------+------------+-----------
+ t_copy_tbl       | STDIN    |      1 | 1,2,a                    | c       | a                   | invalid input syntax for type integer: "a"                   |            | 22P02
+ t_copy_tbl       | STDIN    |      3 | 1,_junk,test             | b       | _junk               | invalid input syntax for type integer: "_junk"               |            | 22P02
+ t_copy_tbl       | STDIN    |      4 | cola,colb,colc           | a       | cola                | invalid input syntax for type integer: "cola"                |            | 22P02
+ t_copy_tbl       | STDIN    |      6 | 1,11,4238679732489879879 | c       | 4238679732489879879 | value "4238679732489879879" is out of range for type integer |            | 22003
+(4 rows)
+
+select * from t_copy_tbl;
+ a | b | c 
+---+---+---
+ 1 | 2 | 3
+ 4 | 5 | 6
+(2 rows)
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
@@ -831,6 +934,10 @@ DROP TABLE check_ign_err;
 DROP TABLE check_ign_err2;
 DROP DOMAIN dcheck_ign_err2;
 DROP TABLE hard_err;
+DROP TABLE err_tbl;
+DROP TABLE err_tbl_1;
+DROP TABLE err_tbl_2;
+DROP TABLE t_copy_tbl CASCADE;
 --
 -- COPY FROM ... DEFAULT
 --
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index 45273557ce..655a768945 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -588,6 +588,91 @@ a	{7}	7
 10	{10}	10
 \.
 
+create table err_tbl(
+  userid oid,   -- the user oid while copy generated this entry
+  copy_tbl oid, --copy table
+  filename text,
+  lineno  bigint,
+  line    text,
+  colname text,
+  raw_field_value text,
+  err_message text,
+  err_detail text,
+  errorcode text
+);
+--cannot use for error saving.
+create table err_tbl_1(
+userid    oid,
+copy_tbl  oid,
+filename  text,
+lineno    bigint,
+line      text,
+colname   text,
+raw_field_value text,
+err_message     text,
+err_detail      text
+);
+--cannot use for error saving.
+create table err_tbl_2(
+  userid oid, copy_tbl oid, filename text, lineno bigint,line text,
+  colname text, raw_field_value text, err_message text,
+  err_detail text,
+  errorcode text,
+  errorcode1 text
+);
+create table t_copy_tbl(a int, b int, c int);
+create view s1 as select 1 as a;
+
+----invalid options, the below all should  fails
+COPY t_copy_tbl FROM STDIN WITH (on_error 'table');
+COPY t_copy_tbl FROM STDIN WITH (table err_tbl);
+COPY t_copy_tbl TO STDIN WITH (on_error 'table');
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', reject_limit 10, table err_tbl);
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', log_verbosity verbose, table err_tbl);
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table not_exists);
+COPY t_copy_tbl(a) FROM STDIN WITH (on_error 'table', table s1);
+--should fail. err_tbl_1 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table err_tbl_1);
+--should fail. err_tbl_2 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table err_tbl_2);
+----invalid options, the above all should fails
+
+
+--should fail, copied data have extra columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+1,2,3,4
+\.
+
+--should fail, copied data have less columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+1,2,
+\.
+
+--ok cases.
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+1,2,a
+1,2,3
+1,_junk,test
+cola,colb,colc
+4,5,6
+1,11,4238679732489879879
+\.
+
+--should fail. lack privilege
+begin;
+create user regress_user20;
+grant insert(userid,copy_tbl,filename,lineno,line) on table err_tbl to regress_user20;
+grant insert on table t_copy_tbl to regress_user20;
+set role regress_user20;
+COPY t_copy_tbl FROM STDIN WITH (delimiter ',', on_error 'table', table err_tbl);
+ROLLBACK;
+
+select	pg_class.relname as copy_destination
+        ,filename,lineno ,line
+        ,colname,raw_field_value,err_message
+        ,err_detail,errorcode
+from err_tbl join pg_class on copy_tbl = pg_class.oid;
+select * from t_copy_tbl;
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
@@ -606,6 +691,10 @@ DROP TABLE check_ign_err;
 DROP TABLE check_ign_err2;
 DROP DOMAIN dcheck_ign_err2;
 DROP TABLE hard_err;
+DROP TABLE err_tbl;
+DROP TABLE err_tbl_1;
+DROP TABLE err_tbl_2;
+DROP TABLE t_copy_tbl CASCADE;
 
 --
 -- COPY FROM ... DEFAULT
-- 
2.34.1
