On Mon, Dec 16, 2024 at 7:50 PM Nishant Sharma
<nishant.sha...@enterprisedb.com> wrote:
>
>
> 1) The new switch logic does not look correct to me. It will pass for
> a failing scenario. I think you can use v3's logic instead with below
> changes:-
>
> a)
> while (HeapTupleIsValid(atup = systable_getnext(ascan))) -->
> while (HeapTupleIsValid(atup = systable_getnext(ascan)) && on_error_tbl_ok)
>
> b)
> attcnt++; --> just before the "switch (attForm->attnum)".
>
> Thats it.
>
You are right about this.
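
To make the agreed logic concrete, here is a minimal standalone sketch of the column check. It is not the patch code: the Column struct, the ColType enum, and error_table_ok() are hypothetical stand-ins for the pg_attribute scan, and only the expected column names and types are taken from the patch. Live columns are counted before the per-position check, and the scan stops at the first mismatch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for the pg_attribute fields the patch inspects. */
typedef enum { T_OID, T_TEXT, T_INT8 } ColType;

typedef struct
{
	int			attnum;		/* 1-based column position */
	bool		dropped;	/* models attisdropped */
	const char *name;
	ColType		type;
} Column;

/* Expected definition of the error saving table, in ordinal order. */
static const struct { const char *name; ColType type; } expected[] = {
	{"userid", T_OID}, {"copy_tbl", T_OID}, {"filename", T_TEXT},
	{"lineno", T_INT8}, {"line", T_TEXT}, {"colname", T_TEXT},
	{"raw_field_value", T_TEXT}, {"err_message", T_TEXT},
	{"err_detail", T_TEXT}, {"errorcode", T_TEXT},
};
#define N_EXPECTED (sizeof(expected) / sizeof(expected[0]))

/* Returns true only if the live columns match the expected definition. */
static bool
error_table_ok(const Column *cols, size_t ncols)
{
	size_t		attcnt = 0;

	assert(cols != NULL || ncols == 0);

	for (size_t i = 0; i < ncols; i++)
	{
		const Column *c = &cols[i];

		/* skip system and dropped columns, as the patch does */
		if (c->attnum < 1 || c->dropped)
			continue;

		attcnt++;				/* count before checking the position */

		if ((size_t) c->attnum > N_EXPECTED)
			return false;		/* extra column */
		if (c->type != expected[c->attnum - 1].type ||
			strcmp(c->name, expected[c->attnum - 1].name) != 0)
			return false;		/* wrong name or type at this position */
	}

	/* too few columns is also a failure */
	return attcnt == N_EXPECTED;
}
```

A table-driven check like this gives the same result as the ten-case switch, but keeps the expected definition in one place.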

On Tue, Dec 17, 2024 at 12:31 PM Kirill Reshke <reshkekir...@gmail.com> wrote:
>
> On Mon, 16 Dec 2024 at 16:50, Nishant Sharma
> <nishant.sha...@enterprisedb.com> wrote:
> > Also, I think Andrew's suggestion can resolve the concern me and Krill
> > had on forcing users to create tables with correct column names and
> > numbers. Also, will make error table checking simpler. No need for the
> > above kind of checks.
>
> +1 on that.
>

The syntax COPY (on_error table, table error_saving_tbl) does not
seem ideal.

But auto-creating the on_error table when it does not exist seems
much harder.

Since we cannot use the SPI interface here, maybe we could use
DefineRelation to auto-create the table. But if the table already
exists, our operation would be stuck for a reason unrelated to COPY.
Auto-creation also means we would need to come up with a magic table
name for all COPY (on_error table) operations, which does not seem
ideal, IMO.

I realized we should error out in a case like:
COPY err_tbl FROM STDIN WITH (DELIMITER ',', on_error table, table err_tbl);

Also, by changing copy_generic_opt_arg, we can now write
COPY err_tbl FROM STDIN WITH (DELIMITER ',', on_error table, table  x);
whereas previously we could only write
COPY err_tbl FROM STDIN WITH (DELIMITER ',', on_error 'table', table  x);
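
As a rough summary of the option rules the patch enforces, here is a standalone sketch. It is a simplified model, not the patch code: CopyOpts, OnError, and check_on_error_opts() are hypothetical names, the returned strings are illustrative only, and the destination check compares table names where the patch compares relation OIDs.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical model of the relevant COPY options. */
typedef enum { ON_ERROR_STOP, ON_ERROR_IGNORE, ON_ERROR_TABLE } OnError;

typedef struct
{
	OnError		on_error;
	const char *on_error_tbl;	/* NULL when TABLE was not given */
	long		reject_limit;	/* 0 when REJECT_LIMIT was not given */
} CopyOpts;

/*
 * Returns NULL when the options are consistent, otherwise a short reason.
 * dest_tbl is the COPY FROM destination table name.
 */
static const char *
check_on_error_opts(const CopyOpts *o, const char *dest_tbl)
{
	assert(o != NULL && dest_tbl != NULL);

	if (o->on_error == ON_ERROR_TABLE)
	{
		if (o->on_error_tbl == NULL)
			return "ON_ERROR table requires the TABLE option";
		if (o->reject_limit != 0)
			return "REJECT_LIMIT cannot be combined with ON_ERROR table";
		/* simplification: the patch compares OIDs, not names */
		if (strcmp(o->on_error_tbl, dest_tbl) == 0)
			return "error saving table cannot be the COPY destination";
	}
	else if (o->on_error_tbl != NULL)
		return "TABLE requires ON_ERROR table";

	return NULL;
}
```

For example, {ON_ERROR_TABLE, "err_tbl", 0} with destination "err_tbl" is rejected, which models the first COPY command above.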
From 4d5458c1c5de85b9f03c22727b92aec45f0cd73d Mon Sep 17 00:00:00 2001
From: jian he <jian.universal...@gmail.com>
Date: Fri, 25 Apr 2025 21:43:39 +0800
Subject: [PATCH v5 1/1] COPY FROM (on_error table)

The syntax is {on_error table, table error_saving_tbl}.  We first check that
table error_saving_tbl exists and has the expected definition.  If it does not
meet our criteria, we error out immediately.

We also do a preliminary check of the lock on the error saving table, so that
the insert into the error saving table won't get stuck.

Once an error happens, we save the error metadata, insert it into
error_saving_tbl, and continue with the next row.  That means that for one
row, we can only catch the first field that has an error.

discussion: https://postgr.es/m/CACJufxH_OJpVra%3D0c4ow8fbxHj7heMcVaTNEPa5vAurSeNA-6Q%40mail.gmail.com
context:    https://www.postgresql.org/message-id/752672.1699474336%40sss.pgh.pa.us
commitfest: https://commitfest.postgresql.org/51/4817/
---
 doc/src/sgml/ref/copy.sgml               | 119 ++++++++++++-
 src/backend/commands/copy.c              |  29 ++++
 src/backend/commands/copyfrom.c          | 202 ++++++++++++++++++++++-
 src/backend/commands/copyfromparse.c     |  50 +++++-
 src/backend/parser/gram.y                |   1 +
 src/include/commands/copy.h              |   8 +
 src/include/commands/copyfrom_internal.h |   1 +
 src/test/regress/expected/copy2.out      | 101 ++++++++++++
 src/test/regress/sql/copy2.sql           |  84 ++++++++++
 9 files changed, 582 insertions(+), 13 deletions(-)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 8433344e5b6..b627d422b38 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     ON_ERROR <replaceable class="parameter">error_action</replaceable>
+    TABLE <replaceable class="parameter">error_saving_table</replaceable>
     REJECT_LIMIT <replaceable class="parameter">maxerror</replaceable>
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
     LOG_VERBOSITY <replaceable class="parameter">verbosity</replaceable>
@@ -395,15 +396,25 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
       input value into its data type.
       An <replaceable class="parameter">error_action</replaceable> value of
       <literal>stop</literal> means fail the command, while
-      <literal>ignore</literal> means discard the input row and continue with the next one.
+      <literal>ignore</literal> means discard the input row and continue with the next one, and
+      <literal>table</literal> means save error details to <replaceable class="parameter">error_saving_table</replaceable>
+      and continue with the next row.
       The default is <literal>stop</literal>.
      </para>
      <para>
-      The <literal>ignore</literal> option is applicable only for <command>COPY FROM</command>
+      The <literal>ignore</literal> and <literal>table</literal> options are applicable only for <command>COPY FROM</command>
       when the <literal>FORMAT</literal> is <literal>text</literal> or <literal>csv</literal>.
      </para>
      <para>
-      A <literal>NOTICE</literal> message containing the ignored row count is
+      If <literal>ON_ERROR</literal>=<literal>table</literal>,
+      a <literal>NOTICE</literal> message containing the number of rows saved to
+      <replaceable class="parameter">error_saving_table</replaceable> is
+      emitted at the end of the <command>COPY FROM</command> if at least one
+      row was saved.
+     </para>
+
+     <para>
+      If <literal>ON_ERROR</literal>=<literal>ignore</literal>, a <literal>NOTICE</literal> message containing the ignored row count is
       emitted at the end of the <command>COPY FROM</command> if at least one
       row was discarded. When <literal>LOG_VERBOSITY</literal> option is set to
       <literal>verbose</literal>, a <literal>NOTICE</literal> message
@@ -463,6 +474,108 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>TABLE</literal></term>
+    <listitem>
+      <para>
+        Save error context details to the table <replaceable class="parameter">error_saving_table</replaceable>.
+        This option is allowed only in <command>COPY FROM</command>, and only when
+        <literal>ON_ERROR</literal> is set to <literal>TABLE</literal>.
+        It also requires the user to have <literal>INSERT</literal> privileges on all columns
+        in the <replaceable class="parameter">error_saving_table</replaceable>.
+      </para>
+
+   <para>
+    If table <replaceable class="parameter">error_saving_table</replaceable> does not meet the following definition
+    (column ordinal positions must match the order below), an error will be raised.
+
+<informaltable>
+    <tgroup cols="3">
+     <thead>
+      <row>
+       <entry>Column name</entry>
+       <entry>Data type</entry>
+       <entry>Description</entry>
+      </row>
+     </thead>
+
+      <tbody>
+       <row>
+       <entry> <literal>userid</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The user that generated the error.
+       References <link linkend="catalog-pg-authid"><structname>pg_authid</structname></link>.<structfield>oid</structfield>,
+       however there is no hard dependency on the catalog <literal>pg_authid</literal>.
+       If the corresponding row in <literal>pg_authid</literal> is deleted, this value becomes stale.
+    </entry>
+       </row>
+
+       <row>
+       <entry> <literal>copy_tbl</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The OID of the destination table of the <command>COPY FROM</command> operation.
+        References <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>oid</structfield>,
+        however there is no hard dependency on the catalog <literal>pg_class</literal>.
+        If the corresponding row in <literal>pg_class</literal> is deleted, this value becomes stale.
+        </entry>
+       </row>
+
+       <row>
+       <entry> <literal>filename</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The path name of the <command>COPY FROM</command> input</entry>
+       </row>
+
+       <row>
+       <entry> <literal>lineno</literal> </entry>
+       <entry><type>bigint</type></entry>
+       <entry>Line number where the error occurred, counting from 1</entry>
+       </row>
+
+       <row>
+       <entry> <literal>line</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the line where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>colname</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>raw_field_value</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_message</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The error message</entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_detail</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Detailed error message</entry>
+       </row>
+
+       <row>
+       <entry> <literal>errorcode</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The error code</entry>
+       </row>
+
+      </tbody>
+     </tgroup>
+   </informaltable>
+
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>WHERE</literal></term>
     <listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 74ae42b19a7..a5492214117 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -410,6 +410,8 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
 	if (pg_strcasecmp(sval, "ignore") == 0)
 		return COPY_ON_ERROR_IGNORE;
 
+	if (pg_strcasecmp(sval, "table") == 0)
+		return COPY_ON_ERROR_TABLE;
 	ereport(ERROR,
 			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 	/*- translator: first %s is the name of a COPY option, e.g. ON_ERROR */
@@ -502,6 +504,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		freeze_specified = false;
 	bool		header_specified = false;
 	bool		on_error_specified = false;
+	bool		on_error_tbl_specified = false;
 	bool		log_verbosity_specified = false;
 	bool		reject_limit_specified = false;
 	ListCell   *option;
@@ -677,6 +680,13 @@ ProcessCopyOptions(ParseState *pstate,
 			reject_limit_specified = true;
 			opts_out->reject_limit = defGetCopyRejectLimitOption(defel);
 		}
+		else if (strcmp(defel->defname, "table") == 0)
+		{
+			if (on_error_tbl_specified)
+				errorConflictingDefElem(defel, pstate);
+			on_error_tbl_specified = true;
+			opts_out->on_error_tbl = defGetString(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -685,6 +695,25 @@ ProcessCopyOptions(ParseState *pstate,
 					 parser_errposition(pstate, defel->location)));
 	}
 
+	if (opts_out->on_error == COPY_ON_ERROR_TABLE)
+	{
+		if (opts_out->on_error_tbl == NULL)
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("cannot specify %s option value to \"%s\" when %s is not specified", "ON_ERROR", "TABLE", "TABLE"),
+					errhint("You may need also specify \"%s\" option.", "TABLE"));
+
+		if (opts_out->reject_limit)
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("cannot specify %s option when %s option is specified as \"%s\"", "REJECT_LIMIT", "ON_ERROR", "TABLE"));
+	}
+
+	if (opts_out->on_error != COPY_ON_ERROR_TABLE && opts_out->on_error_tbl != NULL)
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("COPY %s option can only be used when %s option is specified as \"%s\"", "TABLE", "ON_ERROR", "TABLE"));
+
 	/*
 	 * Check for incompatible options (must do these three before inserting
 	 * defaults)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index fbbbc09a97b..91816a7f781 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -44,7 +44,10 @@
 #include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/portal.h"
@@ -1175,6 +1178,16 @@ CopyFrom(CopyFromState cstate)
 			continue;
 		}
 
+		if (cstate->opts.on_error == COPY_ON_ERROR_TABLE &&
+			cstate->escontext->error_occurred)
+		{
+			cstate->escontext->error_occurred = false;
+			cstate->escontext->details_wanted = true;
+			memset(cstate->escontext->error_data, 0, sizeof(ErrorData));
+			/* Repeat NextCopyFrom() until no soft error occurs */
+			continue;
+		}
+
 		ExecStoreVirtualTuple(myslot);
 
 		/*
@@ -1467,14 +1480,31 @@ CopyFrom(CopyFromState cstate)
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
-		cstate->num_errors > 0 &&
+	if (cstate->num_errors > 0 &&
 		cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
-		ereport(NOTICE,
-				errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
-							  "%" PRIu64 " rows were skipped due to data type incompatibility",
-							  cstate->num_errors,
-							  cstate->num_errors));
+	{
+		if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
+			ereport(NOTICE,
+					errmsg_plural("%" PRIu64 " row was skipped due to data type incompatibility",
+								"%" PRIu64 " rows were skipped due to data type incompatibility",
+								cstate->num_errors,
+								cstate->num_errors));
+		else if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+			ereport(NOTICE,
+					errmsg_plural("%" PRIu64 " row was saved to table \"%s\" due to data type incompatibility",
+								  "%" PRIu64 " rows were saved to table \"%s\" due to data type incompatibility",
+								  cstate->num_errors,
+								  cstate->num_errors,
+								  RelationGetRelationName(cstate->error_saving_rel)));
+	}
+
+	/*
+	 * Similar to
+	 * (https://postgr.es/m/7bcfc39d4176faf85ab317d0c26786953646a411.ca...@cybertec.at),
+	 * COPY FROM keeps the lock on the error saving table until transaction end.
+	 */
+	if (cstate->error_saving_rel != NULL)
+		table_close(cstate->error_saving_rel, NoLock);
 
 	if (bistate != NULL)
 		FreeBulkInsertState(bistate);
@@ -1622,15 +1652,169 @@ BeginCopyFrom(ParseState *pstate,
 		cstate->escontext->error_occurred = false;
 
 		/*
-		 * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
-		 * options later
+		 * Currently we only support COPY_ON_ERROR_IGNORE and
+		 * COPY_ON_ERROR_TABLE.  We'll add other options later.
 		 */
 		if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
 			cstate->escontext->details_wanted = false;
+		else if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+			cstate->escontext->details_wanted = true;
 	}
 	else
 		cstate->escontext = NULL;
 
+	if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+	{
+		Datum		ins_prev;
+		Oid			err_tbl_oid;
+		Relation	pg_attribute;
+		ScanKeyData scankey;
+		SysScanDesc scan;
+		HeapTuple	atup;
+		Form_pg_attribute attForm;
+		int			attcnt = 0;
+		bool		on_error_tbl_ok = true;
+
+		Assert(cstate->opts.on_error_tbl != NULL);
+
+		err_tbl_oid = RelnameGetRelid(cstate->opts.on_error_tbl);
+		if (!OidIsValid(err_tbl_oid))
+			ereport(ERROR,
+					errcode(ERRCODE_UNDEFINED_TABLE),
+					errmsg("relation \"%s\" does not exist",
+						   cstate->opts.on_error_tbl));
+
+		if (RelationGetRelid(cstate->rel) == err_tbl_oid)
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("cannot use relation \"%s\" for COPY error saving while copying data to it",
+						   cstate->opts.on_error_tbl));
+
+		/* error saving table must be a regular relation */
+		if (get_rel_relkind(err_tbl_oid) != RELKIND_RELATION)
+			ereport(ERROR,
+					errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					errmsg("cannot use relation \"%s\" for COPY error saving",
+						   cstate->opts.on_error_tbl),
+					errdetail_relkind_not_supported(get_rel_relkind(err_tbl_oid)));
+
+		/*
+		 * We may insert tuples into the error saving table later, so first
+		 * check its lock situation.  If it is already under a heavy lock, our
+		 * COPY operation would get stuck.  Instead of letting COPY FROM hang,
+		 * report an error saying the error saving table is locked.
+		 */
+		if (!ConditionalLockRelationOid(err_tbl_oid, RowExclusiveLock))
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_IN_USE),
+					errmsg("can not use table \"%s\" for error saving because it was being locked",
+						   cstate->opts.on_error_tbl));
+
+		cstate->error_saving_rel = table_open(err_tbl_oid, RowExclusiveLock);
+
+		/* current user should have INSERT privilege on error_saving table */
+		ins_prev = DirectFunctionCall3(has_table_privilege_id_id,
+									   ObjectIdGetDatum(GetUserId()),
+									   ObjectIdGetDatum(err_tbl_oid),
+									   CStringGetTextDatum("INSERT"));
+		if (!DatumGetBool(ins_prev))
+			ereport(ERROR,
+					errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					errmsg("permission denied to set table \"%s\" for COPY FROM error saving",
+						   RelationGetRelationName(cstate->error_saving_rel)),
+					errhint("Ensure current user have enough privilege on \"%s\" for COPY FROM error saving",
+							RelationGetRelationName(cstate->error_saving_rel)));
+
+		/*
+		 * Verify that the definition of the table (cstate->error_saving_rel),
+		 * including column names, data types, and the number of columns, is
+		 * suitable for error saving.
+		 */
+		pg_attribute = table_open(AttributeRelationId, AccessShareLock);
+		ScanKeyInit(&scankey,
+					Anum_pg_attribute_attrelid,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(err_tbl_oid));
+
+		scan = systable_beginscan(pg_attribute, AttributeRelidNumIndexId, true,
+								  SnapshotSelf, 1, &scankey);
+		while (HeapTupleIsValid(atup = systable_getnext(scan)) && on_error_tbl_ok)
+		{
+			attForm = (Form_pg_attribute) GETSTRUCT(atup);
+
+			if (attForm->attnum < 1 || attForm->attisdropped)
+				continue;
+
+			attcnt++;
+			switch (attForm->attnum)
+			{
+				case 1:
+					if (attForm->atttypid != OIDOID ||
+						strcmp(NameStr(attForm->attname), "userid") != 0)
+						on_error_tbl_ok = false;
+					break;
+				case 2:
+					if (attForm->atttypid != OIDOID ||
+						strcmp(NameStr(attForm->attname), "copy_tbl") != 0)
+						on_error_tbl_ok = false;
+					break;
+				case 3:
+					if (attForm->atttypid != TEXTOID ||
+						strcmp(NameStr(attForm->attname), "filename") != 0)
+						on_error_tbl_ok = false;
+					break;
+				case 4:
+					if (attForm->atttypid != INT8OID ||
+						strcmp(NameStr(attForm->attname), "lineno") != 0)
+						on_error_tbl_ok = false;
+					break;
+				case 5:
+					if (attForm->atttypid != TEXTOID ||
+						strcmp(NameStr(attForm->attname), "line") != 0)
+						on_error_tbl_ok = false;
+					break;
+				case 6:
+					if (attForm->atttypid != TEXTOID ||
+						strcmp(NameStr(attForm->attname), "colname") != 0)
+						on_error_tbl_ok = false;
+					break;
+				case 7:
+					if (attForm->atttypid != TEXTOID ||
+						strcmp(NameStr(attForm->attname), "raw_field_value") != 0)
+						on_error_tbl_ok = false;
+					break;
+				case 8:
+					if (attForm->atttypid != TEXTOID ||
+						strcmp(NameStr(attForm->attname), "err_message") != 0)
+						on_error_tbl_ok = false;
+					break;
+				case 9:
+					if (attForm->atttypid != TEXTOID ||
+						strcmp(NameStr(attForm->attname), "err_detail") != 0)
+						on_error_tbl_ok = false;
+					break;
+				case 10:
+					if (attForm->atttypid != TEXTOID ||
+						strcmp(NameStr(attForm->attname), "errorcode") != 0)
+						on_error_tbl_ok = false;
+					break;
+				default:
+					on_error_tbl_ok = false;
+					break;
+			}
+		}
+		systable_endscan(scan);
+		table_close(pg_attribute, AccessShareLock);
+
+		if (attcnt != ERROR_TBL_COLUMNS || !on_error_tbl_ok)
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("table \"%s\" cannot be used for COPY error saving",
+						   RelationGetRelationName(cstate->error_saving_rel)),
+					errdetail("Table \"%s\" data definition is not suitable for error saving",
+							  RelationGetRelationName(cstate->error_saving_rel)));
+	}
+
 	/* Convert FORCE_NULL name list to per-column flags, check validity */
 	cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
 	if (cstate->opts.force_null_all)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index f5fc346e201..34acacf8660 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -62,6 +62,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
+#include "access/heapam.h"
 #include "commands/copyapi.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
@@ -78,6 +79,8 @@
 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
 #define OCTVALUE(c) ((c) - '0')
 
+#define ERROR_TBL_COLUMNS   10
+
 /*
  * These macros centralize code used to process line_buf and input_buf buffers.
  * They are macros because they often do continue/break control and to avoid
@@ -1035,9 +1038,54 @@ CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
 		{
 			Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
 
+			if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+			{
+				/*
+				 * We use ErrorSaveContext to form a tuple and insert it into the
+				 * error saving table.  A RowExclusiveLock on error_saving_rel
+				 * was already acquired in BeginCopyFrom.
+				 */
+				HeapTuple	tuple;
+				TupleDesc	tupdesc;
+				char		*err_detail;
+				char		*err_code;
+				Datum	values[ERROR_TBL_COLUMNS] = {0};
+				bool	isnull[ERROR_TBL_COLUMNS] = {0};
+				int		j = 0;
+
+				Assert(cstate->rel != NULL);
+				Assert(cstate->escontext->error_occurred);
+
+				values[j++] = ObjectIdGetDatum(GetUserId());
+				values[j++] = ObjectIdGetDatum(cstate->rel->rd_rel->oid);
+				values[j++] = CStringGetTextDatum(cstate->filename ? cstate->filename : "STDIN");
+				values[j++] = Int64GetDatum((long long) cstate->cur_lineno);
+				values[j++] = CStringGetTextDatum(cstate->line_buf.data);
+				values[j++] = CStringGetTextDatum(cstate->cur_attname);
+				values[j++] = CStringGetTextDatum(string);
+				values[j++] = CStringGetTextDatum(cstate->escontext->error_data->message);
+
+				if (!cstate->escontext->error_data->detail)
+					err_detail = NULL;
+				else
+					err_detail = cstate->escontext->error_data->detail;
+				values[j]   = err_detail ? CStringGetTextDatum(err_detail) : (Datum) 0;
+				isnull[j++] = err_detail ? false : true;
+
+				err_code = unpack_sql_state(cstate->escontext->error_data->sqlerrcode);
+				values[j++] = CStringGetTextDatum(err_code);
+
+				Assert(j == ERROR_TBL_COLUMNS);
+
+				tupdesc =  RelationGetDescr(cstate->error_saving_rel);
+				tuple = heap_form_tuple(tupdesc, values, isnull);
+				simple_heap_insert(cstate->error_saving_rel, tuple);
+			}
+
 			cstate->num_errors++;
 
-			if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+			if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE &&
+				cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
 			{
 				/*
 				 * Since we emit line number and column info in the below
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3c4268b271a..0a45305c4b4 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -3577,6 +3577,7 @@ copy_generic_opt_arg:
 			| NumericOnly					{ $$ = (Node *) $1; }
 			| '*'							{ $$ = (Node *) makeNode(A_Star); }
 			| DEFAULT                       { $$ = (Node *) makeString("default"); }
+			| TABLE                         { $$ = (Node *) makeString("table"); }
 			| '(' copy_generic_opt_arg_list ')'		{ $$ = (Node *) $2; }
 			| /* EMPTY */					{ $$ = NULL; }
 		;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 06dfdfef721..0c7a2defc51 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -38,8 +38,15 @@ typedef enum CopyOnErrorChoice
 {
 	COPY_ON_ERROR_STOP = 0,		/* immediately throw errors, default */
 	COPY_ON_ERROR_IGNORE,		/* ignore errors */
+	COPY_ON_ERROR_TABLE,		/* save error info to a table */
 } CopyOnErrorChoice;
 
+/*
+ * Used for COPY (on_error table); the error saving table must have exactly
+ * 10 columns.
+ */
+#define ERROR_TBL_COLUMNS   10
+
 /*
  * Represents verbosity of logged messages by COPY command.
  */
@@ -86,6 +93,7 @@ typedef struct CopyFormatOptions
 	CopyOnErrorChoice on_error; /* what to do when error happened */
 	CopyLogVerbosityChoice log_verbosity;	/* verbosity of logged messages */
 	int64		reject_limit;	/* maximum tolerable number of errors */
+	char		*on_error_tbl;	/* name of the table that error info is saved to */
 	List	   *convert_select; /* list of column names (can be NIL) */
 } CopyFormatOptions;
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index c8b22af22d8..c974311f6bf 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -73,6 +73,7 @@ typedef struct CopyFromStateData
 
 	/* parameters from the COPY command */
 	Relation	rel;			/* relation to copy from */
+	Relation	error_saving_rel; /* relation for copy from error saving */
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDIN */
 	bool		is_program;		/* is 'filename' a program to popen? */
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 64ea33aeae8..67c15d8849d 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -813,6 +813,103 @@ ERROR:  skipped more than REJECT_LIMIT (3) rows due to data type incompatibility
 CONTEXT:  COPY check_ign_err, line 5, column n: ""
 COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
 NOTICE:  4 rows were skipped due to data type incompatibility
+create table err_tbl(
+userid oid,   -- the user oid while copy generated this entry
+copy_tbl oid, --copy table
+filename text,
+lineno  bigint,
+line    text,
+colname text,
+raw_field_value text,
+err_message text,
+err_detail text,
+errorcode text
+);
+--cannot use for error saving.
+create table err_tbl_1(
+userid oid, copy_tbl oid, filename text, lineno bigint, line text,
+colname text, raw_field_value text,
+err_message text,
+err_detail text);
+--cannot use for error saving.
+create table err_tbl_2(
+userid oid, copy_tbl oid, filename text, lineno bigint,line text,
+colname text, raw_field_value text, err_message text,
+err_detail text,
+errorcode text,
+errorcode1 text
+);
+create table t_copy_tbl(a int, b int, c int);
+create view s1 as select 1 as a;
+----invalid options, the below all should  fails
+COPY err_tbl FROM STDIN WITH (DELIMITER ',', on_error table, table err_tbl);
+ERROR:  cannot use relation "err_tbl" for COPY error saving while copying data to it
+COPY t_copy_tbl FROM STDIN WITH (on_error table);
+ERROR:  cannot specify ON_ERROR option value to "TABLE" when TABLE is not specified
+HINT:  You may need also specify "TABLE" option.
+COPY t_copy_tbl FROM STDIN WITH (table err_tbl);
+ERROR:  COPY TABLE option can only be used when ON_ERROR option is specified as "TABLE"
+COPY t_copy_tbl TO STDIN WITH (on_error table);
+ERROR:  COPY ON_ERROR cannot be used with COPY TO
+LINE 1: COPY t_copy_tbl TO STDIN WITH (on_error table);
+                                       ^
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error table, reject_limit 10, table err_tbl);
+ERROR:  cannot specify REJECT_LIMIT option when ON_ERROR option is specified as "TABLE"
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error table, table not_exists);
+ERROR:  relation "not_exists" does not exist
+COPY t_copy_tbl(a) FROM STDIN WITH (on_error table, table s1);
+ERROR:  cannot use relation "s1" for COPY error saving
+DETAIL:  This operation is not supported for views.
+--should fail. err_tbl_1 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error table, table err_tbl_1);
+ERROR:  table "err_tbl_1" cannot be used for COPY error saving
+DETAIL:  Table "err_tbl_1" data definition is not suitable for error saving
+--should fail. err_tbl_2 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error table, table err_tbl_2);
+ERROR:  table "err_tbl_2" cannot be used for COPY error saving
+DETAIL:  Table "err_tbl_2" data definition is not suitable for error saving
+----invalid options, the above all should fails
+--should fail, copied data have extra columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error table, table err_tbl);
+ERROR:  extra data after last expected column
+CONTEXT:  COPY t_copy_tbl, line 1: "1,2,3,4"
+--should fail, copied data have less columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error table, table err_tbl);
+ERROR:  extra data after last expected column
+CONTEXT:  COPY t_copy_tbl, line 1: "1,2,"
+--ok cases.
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', on_error table, table err_tbl);
+NOTICE:  4 rows were saved to table "err_tbl" due to data type incompatibility
+--should fail. lack privilege
+begin;
+create user regress_user20;
+grant insert(userid,copy_tbl,filename,lineno,line) on table err_tbl to regress_user20;
+grant insert on table t_copy_tbl to regress_user20;
+set role regress_user20;
+COPY t_copy_tbl FROM STDIN WITH (delimiter ',', on_error table, table err_tbl);
+ERROR:  permission denied to set table "err_tbl" for COPY FROM error saving
+HINT:  Ensure current user have enough privilege on "err_tbl" for COPY FROM error saving
+ROLLBACK;
+select	pg_class.relname as copy_destination
+        ,filename,lineno ,line
+        ,colname,raw_field_value,err_message
+        ,err_detail,errorcode
+from err_tbl join pg_class on copy_tbl = pg_class.oid;
+ copy_destination | filename | lineno |           line           | colname |   raw_field_value   |                         err_message                          | err_detail | errorcode 
+------------------+----------+--------+--------------------------+---------+---------------------+--------------------------------------------------------------+------------+-----------
+ t_copy_tbl       | STDIN    |      1 | 1,2,a                    | c       | a                   | invalid input syntax for type integer: "a"                   |            | 22P02
+ t_copy_tbl       | STDIN    |      3 | 1,_junk,test             | b       | _junk               | invalid input syntax for type integer: "_junk"               |            | 22P02
+ t_copy_tbl       | STDIN    |      4 | cola,colb,colc           | a       | cola                | invalid input syntax for type integer: "cola"                |            | 22P02
+ t_copy_tbl       | STDIN    |      6 | 1,11,4238679732489879879 | c       | 4238679732489879879 | value "4238679732489879879" is out of range for type integer |            | 22003
+(4 rows)
+
+select * from t_copy_tbl;
+ a | b | c 
+---+---+---
+ 1 | 2 | 3
+ 4 | 5 | 6
+(2 rows)
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
@@ -831,6 +928,10 @@ DROP TABLE check_ign_err;
 DROP TABLE check_ign_err2;
 DROP DOMAIN dcheck_ign_err2;
 DROP TABLE hard_err;
+DROP TABLE err_tbl;
+DROP TABLE err_tbl_1;
+DROP TABLE err_tbl_2;
+DROP TABLE t_copy_tbl CASCADE;
 --
 -- COPY FROM ... DEFAULT
 --
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index 45273557ce0..a190432682c 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -588,6 +588,86 @@ a	{7}	7
 10	{10}	10
 \.
 
+create table err_tbl(
+userid oid,   -- the user oid while copy generated this entry
+copy_tbl oid, --copy table
+filename text,
+lineno  bigint,
+line    text,
+colname text,
+raw_field_value text,
+err_message text,
+err_detail text,
+errorcode text
+);
+--cannot use for error saving.
+create table err_tbl_1(
+userid oid, copy_tbl oid, filename text, lineno bigint, line text,
+colname text, raw_field_value text,
+err_message text,
+err_detail text);
+
+--cannot use for error saving.
+create table err_tbl_2(
+userid oid, copy_tbl oid, filename text, lineno bigint,line text,
+colname text, raw_field_value text, err_message text,
+err_detail text,
+errorcode text,
+errorcode1 text
+);
+create table t_copy_tbl(a int, b int, c int);
+create view s1 as select 1 as a;
+
+----invalid options, the below all should  fails
+COPY err_tbl FROM STDIN WITH (DELIMITER ',', on_error table, table err_tbl);
+COPY t_copy_tbl FROM STDIN WITH (on_error table);
+COPY t_copy_tbl FROM STDIN WITH (table err_tbl);
+COPY t_copy_tbl TO STDIN WITH (on_error table);
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error table, reject_limit 10, table err_tbl);
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error table, table not_exists);
+COPY t_copy_tbl(a) FROM STDIN WITH (on_error table, table s1);
+--should fail. err_tbl_1 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error table, table err_tbl_1);
+--should fail. err_tbl_2 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error table, table err_tbl_2);
+----invalid options, the above all should fails
+
+--should fail, copied data have extra columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error table, table err_tbl);
+1,2,3,4
+\.
+
+--should fail, copied data have less columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error table, table err_tbl);
+1,2,
+\.
+
+--ok cases.
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', on_error table, table err_tbl);
+1,2,a
+1,2,3
+1,_junk,test
+cola,colb,colc
+4,5,6
+1,11,4238679732489879879
+\.
+
+--should fail. lack privilege
+begin;
+create user regress_user20;
+grant insert(userid,copy_tbl,filename,lineno,line) on table err_tbl to regress_user20;
+grant insert on table t_copy_tbl to regress_user20;
+set role regress_user20;
+COPY t_copy_tbl FROM STDIN WITH (delimiter ',', on_error table, table err_tbl);
+ROLLBACK;
+
+select	pg_class.relname as copy_destination
+        ,filename,lineno ,line
+        ,colname,raw_field_value,err_message
+        ,err_detail,errorcode
+from err_tbl join pg_class on copy_tbl = pg_class.oid;
+select * from t_copy_tbl;
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
@@ -606,6 +686,10 @@ DROP TABLE check_ign_err;
 DROP TABLE check_ign_err2;
 DROP DOMAIN dcheck_ign_err2;
 DROP TABLE hard_err;
+DROP TABLE err_tbl;
+DROP TABLE err_tbl_1;
+DROP TABLE err_tbl_2;
+DROP TABLE t_copy_tbl CASCADE;
 
 --
 -- COPY FROM ... DEFAULT
-- 
2.34.1
