On Tue, Nov 5, 2024 at 6:30 PM Nishant Sharma
<nishant.sha...@enterprisedb.com> wrote:
>
> Thanks for the v2 patch!
>
> I see v1 review comments got addressed in v2 along with some
> further improvements.
>
> 1) v2 Patch again needs re-base.
>
> 2) I think we need not worry whether table name is unique or not,
> table name can be provided by user and we can check if it does
> not exists then simply we can create it with appropriate columns,
> if it exists we use it to check if its correct on_error table and
> proceed.

"simply we can create it with appropriate columns,"
that would be more work.
So I'll stick with this: if there is an existing table that can be used for
error saving, then use it; otherwise error out.


>
> 3) Using #define in between the code? I don't see that style in
> copyfromparse.c file. I do see such style in other src file. So, not
> sure if committer would allow it or not.
> #define ERROR_TBL_COLUMNS   10
>
let's wait and see.

> 4) Below appears redundant to me, it was not the case in v1 patch
> set, where it had only one return and one increment of error as new
> added code was at the end of the block:-
> +                   cstate->num_errors++;
> +                   return true;
> +               }
>                 cstate->num_errors++;
>
changed per your advice.

> I was not able to test the v2 due to conflicts in v2, I did attempt to
> resolve but I saw many failures in make world.
>
I got rid of all the SPI code.

Instead, I now iterate through the pg_attribute catalog (AttributeRelationId)
to check whether the error saving table is OK or not,
and use DirectFunctionCall3 to do the privilege check.
I also removed the gram.y change, since it turns out not to be necessary,
and did some other refactoring.

please check attached.
From b62dfa333cb0bc7efd45f221662c01a5db6d5c24 Mon Sep 17 00:00:00 2001
From: jian he <jian.universal...@gmail.com>
Date: Tue, 3 Dec 2024 12:13:29 +0800
Subject: [PATCH v3 1/1] introduce on_error table option for COPY FROM.

the syntax is {on_error table, table error_saving_tbl}.

we first check table error_saving_tbl's existence and data definition.  if it
does not meet our criteria, then we quickly error out.

we also do a preliminary check of the lock on the error saving table, so that
the insert into the error saving table won't get stuck.

once an error happens, we save the error metadata, insert it into the
error_saving_table, and continue to the next row.  That means for one row, we
can only catch the first field that has an error.

discussion: https://postgr.es/m/CACJufxH_OJpVra%3D0c4ow8fbxHj7heMcVaTNEPa5vAurSeNA-6Q%40mail.gmail.com
---
 doc/src/sgml/ref/copy.sgml               | 118 ++++++++++++-
 src/backend/commands/copy.c              |  31 ++++
 src/backend/commands/copyfrom.c          | 203 ++++++++++++++++++++++-
 src/backend/commands/copyfromparse.c     |  45 +++++
 src/include/commands/copy.h              |   2 +
 src/include/commands/copyfrom_internal.h |   1 +
 src/test/regress/expected/copy2.out      | 108 ++++++++++++
 src/test/regress/sql/copy2.sql           |  91 ++++++++++
 8 files changed, 590 insertions(+), 9 deletions(-)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 8394402f09..72ad4461ac 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     ON_ERROR <replaceable class="parameter">error_action</replaceable>
+    TABLE '<replaceable class="parameter">error_saving_table</replaceable>'
     REJECT_LIMIT <replaceable class="parameter">maxerror</replaceable>
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
     LOG_VERBOSITY <replaceable class="parameter">verbosity</replaceable>
@@ -395,11 +396,13 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
       input value into its data type.
       An <replaceable class="parameter">error_action</replaceable> value of
       <literal>stop</literal> means fail the command, while
-      <literal>ignore</literal> means discard the input row and continue with the next one.
+      <literal>ignore</literal> means discard the input row and continue with the next one, and
+      <literal>table</literal> means save error-related information to <replaceable class="parameter">error_saving_table</replaceable>
+      and continue with the next row.
       The default is <literal>stop</literal>.
      </para>
      <para>
-      The <literal>ignore</literal> option is applicable only for <command>COPY FROM</command>
+      The <literal>ignore</literal> and <literal>table</literal> options are applicable only for <command>COPY FROM</command>
       when the <literal>FORMAT</literal> is <literal>text</literal> or <literal>csv</literal>.
      </para>
      <para>
@@ -463,6 +466,117 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>TABLE</literal></term>
+    <listitem>
+      <para>
+        Save error context details to the table <replaceable class="parameter">error_saving_table</replaceable>
+        when errors occur during the conversion of a column's input value into its data type in a <command>COPY FROM</command> operation.
+      </para>
+
+      <para>
+        The <replaceable class="parameter">error_saving_table</replaceable> must exist in the current database
+        and reside in the same schema (namespace) as the destination table of the <command>COPY FROM</command> operation.
+        This option is allowed only in <command>COPY FROM</command>, and only when
+        <literal>ON_ERROR</literal> is set to <literal>TABLE</literal>.
+
+        The user performing the <command>COPY FROM</command>
+        operation must have <literal>INSERT</literal> privileges for all columns
+        in the <replaceable class="parameter">error_saving_table</replaceable>.
+        If this option is not specified, the <literal>ON_ERROR</literal> parameter cannot be set to <literal>TABLE</literal>.
+      </para>
+
+   <para>
+    If table <replaceable class="parameter">error_saving_table</replaceable> does not meet the following definition
+    (column ordinal positions must match the table below), an error will be raised.
+
+<informaltable>
+    <tgroup cols="3">
+     <thead>
+      <row>
+       <entry>Column name</entry>
+       <entry>Data type</entry>
+       <entry>Description</entry>
+      </row>
+     </thead>
+
+      <tbody>
+       <row>
+       <entry> <literal>userid</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The user who generated the error.
+       Reference <link linkend="catalog-pg-authid"><structname>pg_authid</structname></link>.<structfield>oid</structfield>,
+       however there is no hard dependency with catalog <literal>pg_authid</literal>.
+       If the corresponding row on <literal>pg_authid</literal> is deleted, this value becomes stale.
+    </entry>
+       </row>
+
+       <row>
+       <entry> <literal>copy_tbl</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The <command>COPY FROM</command> operation destination table oid.
+        Reference <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>oid</structfield>,
+        however there is no hard dependency with catalog <literal>pg_class</literal>.
+        If the corresponding row on <literal>pg_class</literal> is deleted, this value becomes stale.
+        </entry>
+       </row>
+
+       <row>
+       <entry> <literal>filename</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The path name of the <command>COPY FROM</command> input</entry>
+       </row>
+
+       <row>
+       <entry> <literal>lineno</literal> </entry>
+       <entry><type>bigint</type></entry>
+       <entry>Line number where the error occurred, counting from 1</entry>
+       </row>
+
+       <row>
+       <entry> <literal>line</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the line where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>colname</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>raw_field_value</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_message </literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The error message</entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_detail</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Detailed error message </entry>
+       </row>
+
+       <row>
+       <entry> <literal>errorcode </literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The error code </entry>
+       </row>
+
+      </tbody>
+     </tgroup>
+   </informaltable>
+
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>WHERE</literal></term>
     <listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 2d98ecf3f4..f159502a9b 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -410,6 +410,8 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
 	if (pg_strcasecmp(sval, "ignore") == 0)
 		return COPY_ON_ERROR_IGNORE;
 
+	if (pg_strcasecmp(sval, "table") == 0)
+		return COPY_ON_ERROR_TABLE;
 	ereport(ERROR,
 			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 	/*- translator: first %s is the name of a COPY option, e.g. ON_ERROR */
@@ -502,6 +504,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		freeze_specified = false;
 	bool		header_specified = false;
 	bool		on_error_specified = false;
+	bool		on_error_tbl_specified = false;
 	bool		log_verbosity_specified = false;
 	bool		reject_limit_specified = false;
 	ListCell   *option;
@@ -677,6 +680,14 @@ ProcessCopyOptions(ParseState *pstate,
 			reject_limit_specified = true;
 			opts_out->reject_limit = defGetCopyRejectLimitOption(defel);
 		}
+		else if (strcmp(defel->defname, "table") == 0)
+		{
+			if (on_error_tbl_specified)
+				errorConflictingDefElem(defel, pstate);
+			on_error_tbl_specified = true;
+
+			opts_out->on_error_tbl = defGetString(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -685,6 +696,26 @@ ProcessCopyOptions(ParseState *pstate,
 					 parser_errposition(pstate, defel->location)));
 	}
 
+	if (opts_out->on_error == COPY_ON_ERROR_TABLE && opts_out->on_error_tbl == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("COPY %s \"table\" requires a custom specified error saving table", "ON_ERROR"),
+				 errhint("You need also specify \"TABLE\" option.")));
+
+	if (opts_out->on_error == COPY_ON_ERROR_TABLE && opts_out->reject_limit)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("cannot specify %s option when %s option is specified as \"table\"", "REJECT_LIMIT", "ON_ERROR")));
+
+	if (opts_out->on_error == COPY_ON_ERROR_TABLE && opts_out->log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("cannot specify %s as \"verbose\" when %s option is specified as \"table\"", "log_verbosity", "ON_ERROR")));
+
+	if (opts_out->on_error != COPY_ON_ERROR_TABLE && opts_out->on_error_tbl != NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("COPY \"TABLE\" option can only be used when %s option is specified as \"table\"", "ON_ERROR")));
 	/*
 	 * Check for incompatible options (must do these three before inserting
 	 * defaults)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 4d52c93c30..376d3f04c9 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -28,6 +28,7 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
@@ -44,7 +45,10 @@
 #include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/portal.h"
@@ -1029,6 +1033,16 @@ CopyFrom(CopyFromState cstate)
 			continue;
 		}
 
+		if (cstate->opts.on_error == COPY_ON_ERROR_TABLE &&
+			cstate->escontext->error_occurred)
+		{
+			cstate->escontext->error_occurred = false;
+			cstate->escontext->details_wanted = true;
+			memset(cstate->escontext->error_data, 0, sizeof(ErrorData));
+			/* Repeat NextCopyFrom() until no soft error occurs */
+			continue;
+		}
+
 		ExecStoreVirtualTuple(myslot);
 
 		/*
@@ -1321,14 +1335,31 @@ CopyFrom(CopyFromState cstate)
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
-		cstate->num_errors > 0 &&
+	if (cstate->num_errors > 0 &&
 		cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
-		ereport(NOTICE,
-				errmsg_plural("%llu row was skipped due to data type incompatibility",
-							  "%llu rows were skipped due to data type incompatibility",
-							  (unsigned long long) cstate->num_errors,
-							  (unsigned long long) cstate->num_errors));
+	{
+		if(cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
+			ereport(NOTICE,
+					errmsg_plural("%llu row was skipped due to data type incompatibility",
+								"%llu rows were skipped due to data type incompatibility",
+								(unsigned long long) cstate->num_errors,
+								(unsigned long long) cstate->num_errors));
+		else if(cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+			ereport(NOTICE,
+					errmsg_plural("%llu row was saved to table \"%s\" due to data type incompatibility",
+								  "%llu rows were saved to table \"%s\" due to data type incompatibility",
+								  (unsigned long long) cstate->num_errors,
+								  (unsigned long long) cstate->num_errors,
+								  RelationGetRelationName(cstate->error_saving_rel)));
+	}
+
+	/*
+	 * similar to commit a9cf48a
+	 * (https://postgr.es/m/7bcfc39d4176faf85ab317d0c26786953646a411.ca...@cybertec.at)
+	 * in COPY FROM keep error saving table locks until the transaction end.
+	*/
+	if (cstate->error_saving_rel != NULL)
+		table_close(cstate->error_saving_rel, NoLock);
 
 	if (bistate != NULL)
 		FreeBulkInsertState(bistate);
@@ -1483,6 +1514,164 @@ BeginCopyFrom(ParseState *pstate,
 	else
 		cstate->escontext = NULL;
 
+	if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+	{
+		const char *copy_nspname;
+		Datum		ins_prev;
+		Oid			err_tbl_oid;
+		Oid			copy_nspoid;
+		bool		on_error_tbl_ok = true;
+		Relation	arel;
+		ScanKeyData akey;
+		SysScanDesc ascan;
+		HeapTuple	atup;
+		Form_pg_attribute attForm;
+		int			maxattnum = 0;
+
+		Assert(cstate->escontext != NULL);
+		Assert(cstate->opts.on_error_tbl != NULL);
+
+		copy_nspname = get_namespace_name(RelationGetNamespace(cstate->rel));
+		copy_nspoid = get_namespace_oid(copy_nspname, false);
+		err_tbl_oid = get_relname_relid(cstate->opts.on_error_tbl, copy_nspoid);
+
+		if (!OidIsValid(err_tbl_oid))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("error saving table \"%s\".\"%s\" does not exist",
+							copy_nspname, cstate->opts.on_error_tbl)));
+
+		/* error saving table must be a normal realtion kind */
+		if (get_rel_relkind(err_tbl_oid) != RELKIND_RELATION)
+			ereport(ERROR,
+					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					 errmsg("COPY %s cannot use relation \"%s\" for error saving",
+							"ON_ERROR", cstate->opts.on_error_tbl),
+					 errdetail_relkind_not_supported(get_rel_relkind(err_tbl_oid))));
+
+		/* current user should have INSERT privilege on error_saving table */
+		ins_prev = DirectFunctionCall3(has_table_privilege_id_id,
+										ObjectIdGetDatum(GetUserId()),
+										ObjectIdGetDatum(err_tbl_oid),
+										CStringGetTextDatum("INSERT"));
+		if (!DatumGetBool(ins_prev))
+			ereport(ERROR,
+						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("permission denied to set table \"%s\".\"%s\" for COPY FROM error saving",
+								 copy_nspname, cstate->opts.on_error_tbl),
+						 errhint("Ensure current user have enough privilege on \"%s\".\"%s\" for COPY FROM error saving",
+								  copy_nspname, cstate->opts.on_error_tbl)));
+
+		/*
+		 * we may insert tuples to error-saving table, to do that we need first
+		 * check it's lock situation. If it is already under heavy lock, then
+		 * our COPY operation would be stuck. instead of let COPY stuck, just
+		 * error report the error-saving table is in heavy lock.
+		*/
+		if (!ConditionalLockRelationOid(err_tbl_oid, RowExclusiveLock))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("table \"%s\".\"%s\" was locked, cannot be used for error saving",
+							 copy_nspname, cstate->opts.on_error_tbl)));
+		cstate->error_saving_rel = table_open(err_tbl_oid, NoLock);
+
+		/*
+		 * can this error saving table (cstate->error_saving_rel) be used for
+		 * error saving or not? for that we need to check the table's (column
+		 * name, column data types, number of column)
+		 *
+		*/
+		arel = table_open(AttributeRelationId, AccessShareLock);
+		ScanKeyInit(&akey,
+					Anum_pg_attribute_attrelid,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(err_tbl_oid));
+
+		ascan = systable_beginscan(arel, AttributeRelidNumIndexId, true,
+								   SnapshotSelf, 1, &akey);
+		while (HeapTupleIsValid(atup = systable_getnext(ascan)))
+		{
+			attForm = (Form_pg_attribute) GETSTRUCT(atup);
+			if (attForm->attnum < 1 || attForm->attisdropped)
+				continue;
+
+			if (maxattnum <= attForm->attnum)
+				maxattnum = attForm->attnum;
+
+			switch (attForm->attnum)
+			{
+				case 1:
+					if((attForm->atttypid != OIDOID) ||
+						(strcmp(NameStr(attForm->attname), "userid") != 0))
+						on_error_tbl_ok = false;
+					break;
+				case 2:
+					if ((attForm->atttypid != OIDOID) ||
+						(strcmp(NameStr(attForm->attname), "copy_tbl") != 0))
+						on_error_tbl_ok = false;
+					break;
+				case 3:
+					if ((attForm->atttypid != TEXTOID) ||
+						(strcmp(NameStr(attForm->attname), "filename") != 0))
+						on_error_tbl_ok = false;
+					break;
+				case 4:
+					if ((attForm->atttypid != INT8OID) ||
+						(strcmp(NameStr(attForm->attname), "lineno") != 0))
+						on_error_tbl_ok = false;
+					break;
+				case 5:
+					if ((attForm->atttypid != TEXTOID) ||
+						(strcmp(NameStr(attForm->attname), "line") != 0))
+						on_error_tbl_ok = false;
+					break;
+				case 6:
+					if ((attForm->atttypid != TEXTOID) ||
+						(strcmp(NameStr(attForm->attname), "colname") != 0))
+						on_error_tbl_ok = false;
+					break;
+				case 7:
+					if ((attForm->atttypid != TEXTOID) ||
+						(strcmp(NameStr(attForm->attname), "raw_field_value") != 0))
+						on_error_tbl_ok = false;
+					break;
+				case 8:
+					if ((attForm->atttypid != TEXTOID) ||
+						(strcmp(NameStr(attForm->attname), "err_message") != 0))
+						on_error_tbl_ok = false;
+					break;
+				case 9:
+					if ((attForm->atttypid != TEXTOID) ||
+						(strcmp(NameStr(attForm->attname), "err_detail") != 0))
+						on_error_tbl_ok = false;
+					break;
+				case 10:
+					if ((attForm->atttypid != TEXTOID) ||
+						(strcmp(NameStr(attForm->attname), "errorcode") != 0))
+						on_error_tbl_ok = false;
+					break;
+				default:
+					on_error_tbl_ok = false;
+					break;
+			}
+		}
+		if (maxattnum != 10)
+			on_error_tbl_ok = false;
+
+		systable_endscan(ascan);
+		table_close(arel, AccessShareLock);
+
+		if(!on_error_tbl_ok)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("table \"%s\".\"%s\" cannot be used for COPY FROM error saving",
+							copy_nspname, cstate->opts.on_error_tbl),
+					 errdetail("Table \"%s\".\"%s\" data definition cannot be used for error saving",
+							   copy_nspname, cstate->opts.on_error_tbl)));
+
+		cstate->escontext->details_wanted = true;
+	}
+
 	/* Convert FORCE_NULL name list to per-column flags, check validity */
 	cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
 	if (cstate->opts.force_null_all)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index d1d43b53d8..1f1bf24788 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -66,6 +66,7 @@
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
+#include "access/heapam.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
@@ -959,7 +960,51 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 											&values[m]))
 			{
 				Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
+				Assert(cstate->escontext->error_occurred);
 
+				if (cstate->opts.on_error == COPY_ON_ERROR_TABLE)
+				{
+					/*
+					 * we mostly use ErrorSaveContext's info to form a tuple and
+					 * insert it to the error saving table. we already acquired
+					 * lock on error_saving_rel in BeginCopyFrom.
+					*/
+					#define ERROR_TBL_COLUMNS	10
+					HeapTuple	on_error_tup;
+					TupleDesc	on_error_tupDesc;
+					char	*err_detail;
+					char	*err_code;
+					Datum	t_values[ERROR_TBL_COLUMNS] = {0};
+					bool	t_isnull[ERROR_TBL_COLUMNS] = {0};
+					int		j = 0;
+
+					Assert(cstate->rel != NULL);
+					t_values[j++] = ObjectIdGetDatum(GetUserId());
+					t_values[j++] = ObjectIdGetDatum(cstate->rel->rd_rel->oid);
+					t_values[j++] = CStringGetTextDatum(cstate->filename ? cstate->filename : "STDIN");
+					t_values[j++] = Int64GetDatum((long long) cstate->cur_lineno);
+					t_values[j++] = CStringGetTextDatum(cstate->line_buf.data);
+					t_values[j++] = CStringGetTextDatum(cstate->cur_attname);
+					t_values[j++] = CStringGetTextDatum(string);
+					t_values[j++] = CStringGetTextDatum(cstate->escontext->error_data->message);
+
+					if (!cstate->escontext->error_data->detail)
+						err_detail = NULL;
+					else
+						err_detail = cstate->escontext->error_data->detail;
+					t_values[j]   = err_detail ? CStringGetTextDatum(err_detail) : (Datum) 0;
+					t_isnull[j++] = err_detail ? false : true;
+
+					err_code = unpack_sql_state(cstate->escontext->error_data->sqlerrcode);
+					t_values[j++] = CStringGetTextDatum(err_code);
+
+					Assert(j == ERROR_TBL_COLUMNS);
+					#undef ERROR_TBL_COLUMNS
+
+					on_error_tupDesc =  cstate->error_saving_rel->rd_att;
+					on_error_tup = heap_form_tuple(on_error_tupDesc, t_values, t_isnull);
+					simple_heap_insert(cstate->error_saving_rel, on_error_tup);
+				}
 				cstate->num_errors++;
 
 				if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 4002a7f538..59365c81da 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -38,6 +38,7 @@ typedef enum CopyOnErrorChoice
 {
 	COPY_ON_ERROR_STOP = 0,		/* immediately throw errors, default */
 	COPY_ON_ERROR_IGNORE,		/* ignore errors */
+	COPY_ON_ERROR_TABLE,		/* saving errors info to table */
 } CopyOnErrorChoice;
 
 /*
@@ -86,6 +87,7 @@ typedef struct CopyFormatOptions
 	CopyOnErrorChoice on_error; /* what to do when error happened */
 	CopyLogVerbosityChoice log_verbosity;	/* verbosity of logged messages */
 	int64		reject_limit;	/* maximum tolerable number of errors */
+	char		*on_error_tbl; /* on error, save error info to the table, table name */
 	List	   *convert_select; /* list of column names (can be NIL) */
 } CopyFormatOptions;
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..779f86d1ce 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -70,6 +70,7 @@ typedef struct CopyFromStateData
 
 	/* parameters from the COPY command */
 	Relation	rel;			/* relation to copy from */
+	Relation	error_saving_rel; /* relation for copy from error saving */
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDIN */
 	bool		is_program;		/* is 'filename' a program to popen? */
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 64ea33aeae..6a5290b53f 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -813,6 +813,110 @@ ERROR:  skipped more than REJECT_LIMIT (3) rows due to data type incompatibility
 CONTEXT:  COPY check_ign_err, line 5, column n: ""
 COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
 NOTICE:  4 rows were skipped due to data type incompatibility
+create table err_tbl(
+  userid oid,   -- the user oid while copy generated this entry
+  copy_tbl oid, --copy table
+  filename text,
+  lineno  bigint,
+  line    text,
+  colname text,
+  raw_field_value text,
+  err_message text,
+  err_detail text,
+  errorcode text
+);
+--cannot use for error saving.
+create table err_tbl_1(
+userid    oid,
+copy_tbl  oid,
+filename  text,
+lineno    bigint,
+line      text,
+colname   text,
+raw_field_value text,
+err_message     text,
+err_detail      text
+);
+--cannot use for error saving.
+create table err_tbl_2(
+  userid oid, copy_tbl oid, filename text, lineno bigint,line text,
+  colname text, raw_field_value text, err_message text,
+  err_detail text,
+  errorcode text,
+  errorcode1 text
+);
+create table t_copy_tbl(a int, b int, c int);
+create view s1 as select 1 as a;
+----invalid options, the below all should  fails
+COPY t_copy_tbl FROM STDIN WITH (on_error 'table');
+ERROR:  COPY ON_ERROR "table" requires a custom specified error saving table
+HINT:  You need also specify "TABLE" option.
+COPY t_copy_tbl FROM STDIN WITH (table err_tbl);
+ERROR:  COPY "TABLE" option can only be used when ON_ERROR option is specified as "table"
+COPY t_copy_tbl TO STDIN WITH (on_error 'table');
+ERROR:  COPY ON_ERROR cannot be used with COPY TO
+LINE 1: COPY t_copy_tbl TO STDIN WITH (on_error 'table');
+                                       ^
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', reject_limit 10, table err_tbl);
+ERROR:  cannot specify REJECT_LIMIT option when ON_ERROR option is specified as "table"
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', log_verbosity verbose, table err_tbl);
+ERROR:  cannot specify log_verbosity as "verbose" when ON_ERROR option is specified as "table"
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table not_exists);
+ERROR:  error saving table "public"."not_exists" does not exist
+COPY t_copy_tbl(a) FROM STDIN WITH (on_error 'table', table s1);
+ERROR:  COPY ON_ERROR cannot use relation "s1" for error saving
+DETAIL:  This operation is not supported for views.
+----invalid options, the above all should fails
+drop view s1;
+--should fail. err_tbl_1 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table err_tbl_1);
+ERROR:  table "public"."err_tbl_1" cannot be used for COPY FROM error saving
+DETAIL:  Table "public"."err_tbl_1" data definition cannot be used for error saving
+--should fail. err_tbl_2 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table err_tbl_2);
+ERROR:  table "public"."err_tbl_2" cannot be used for COPY FROM error saving
+DETAIL:  Table "public"."err_tbl_2" data definition cannot be used for error saving
+--should fail, copied data have extra columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+ERROR:  extra data after last expected column
+CONTEXT:  COPY t_copy_tbl, line 1: "1,2,3,4"
+--should fail, copied data have less columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+ERROR:  extra data after last expected column
+CONTEXT:  COPY t_copy_tbl, line 1: "1,2,"
+--ok cases.
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+NOTICE:  4 rows were saved to table "err_tbl" due to data type incompatibility
+--should fail. lack privilege
+begin;
+create user regress_user20;
+grant insert(userid,copy_tbl,filename,lineno,line) on table err_tbl to regress_user20;
+grant insert on table t_copy_tbl to regress_user20;
+set role regress_user20;
+COPY t_copy_tbl FROM STDIN WITH (delimiter ',', on_error 'table', table err_tbl);
+ERROR:  permission denied to set table "public"."err_tbl" for COPY FROM error saving
+HINT:  Ensure current user have enough privilege on "public"."err_tbl" for COPY FROM error saving
+ROLLBACK;
+select	pg_class.relname as copy_destination
+        ,filename,lineno ,line
+        ,colname,raw_field_value,err_message
+        ,err_detail,errorcode
+from err_tbl join pg_class on copy_tbl = pg_class.oid;
+ copy_destination | filename | lineno |           line           | colname |   raw_field_value   |                         err_message                          | err_detail | errorcode 
+------------------+----------+--------+--------------------------+---------+---------------------+--------------------------------------------------------------+------------+-----------
+ t_copy_tbl       | STDIN    |      1 | 1,2,a                    | c       | a                   | invalid input syntax for type integer: "a"                   |            | 22P02
+ t_copy_tbl       | STDIN    |      3 | 1,_junk,test             | b       | _junk               | invalid input syntax for type integer: "_junk"               |            | 22P02
+ t_copy_tbl       | STDIN    |      4 | cola,colb,colc           | a       | cola                | invalid input syntax for type integer: "cola"                |            | 22P02
+ t_copy_tbl       | STDIN    |      6 | 1,11,4238679732489879879 | c       | 4238679732489879879 | value "4238679732489879879" is out of range for type integer |            | 22003
+(4 rows)
+
+select * from t_copy_tbl;
+ a | b | c 
+---+---+---
+ 1 | 2 | 3
+ 4 | 5 | 6
+(2 rows)
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
@@ -831,6 +935,10 @@ DROP TABLE check_ign_err;
 DROP TABLE check_ign_err2;
 DROP DOMAIN dcheck_ign_err2;
 DROP TABLE hard_err;
+DROP TABLE err_tbl;
+DROP TABLE err_tbl_1;
+DROP TABLE err_tbl_2;
+DROP TABLE t_copy_tbl;
 --
 -- COPY FROM ... DEFAULT
 --
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index 45273557ce..aa5fc12cb3 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -588,6 +588,93 @@ a	{7}	7
 10	{10}	10
 \.
 
+create table err_tbl(
+  userid oid,   -- the user oid while copy generated this entry
+  copy_tbl oid, --copy table
+  filename text,
+  lineno  bigint,
+  line    text,
+  colname text,
+  raw_field_value text,
+  err_message text,
+  err_detail text,
+  errorcode text
+);
+--cannot use for error saving.
+create table err_tbl_1(
+userid    oid,
+copy_tbl  oid,
+filename  text,
+lineno    bigint,
+line      text,
+colname   text,
+raw_field_value text,
+err_message     text,
+err_detail      text
+);
+--cannot use for error saving.
+create table err_tbl_2(
+  userid oid, copy_tbl oid, filename text, lineno bigint,line text,
+  colname text, raw_field_value text, err_message text,
+  err_detail text,
+  errorcode text,
+  errorcode1 text
+);
+create table t_copy_tbl(a int, b int, c int);
+create view s1 as select 1 as a;
+
+----invalid options, the below all should  fails
+COPY t_copy_tbl FROM STDIN WITH (on_error 'table');
+COPY t_copy_tbl FROM STDIN WITH (table err_tbl);
+COPY t_copy_tbl TO STDIN WITH (on_error 'table');
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', reject_limit 10, table err_tbl);
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', log_verbosity verbose, table err_tbl);
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table not_exists);
+COPY t_copy_tbl(a) FROM STDIN WITH (on_error 'table', table s1);
+----invalid options, the above all should fails
+
+drop view s1;
+--should fail. err_tbl_1 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table err_tbl_1);
+
+--should fail. err_tbl_2 does not meet criteria
+COPY t_copy_tbl(a,b) FROM STDIN WITH (on_error 'table', table err_tbl_2);
+
+--should fail, copied data have extra columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+1,2,3,4
+\.
+
+--should fail, copied data have less columns
+COPY t_copy_tbl(a,b) FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+1,2,
+\.
+
+--ok cases.
+COPY t_copy_tbl FROM STDIN WITH (DELIMITER ',', on_error 'table', table err_tbl);
+1,2,a
+1,2,3
+1,_junk,test
+cola,colb,colc
+4,5,6
+1,11,4238679732489879879
+\.
+
+--should fail. lack privilege
+begin;
+create user regress_user20;
+grant insert(userid,copy_tbl,filename,lineno,line) on table err_tbl to regress_user20;
+grant insert on table t_copy_tbl to regress_user20;
+set role regress_user20;
+COPY t_copy_tbl FROM STDIN WITH (delimiter ',', on_error 'table', table err_tbl);
+ROLLBACK;
+
+select	pg_class.relname as copy_destination
+        ,filename,lineno ,line
+        ,colname,raw_field_value,err_message
+        ,err_detail,errorcode
+from err_tbl join pg_class on copy_tbl = pg_class.oid;
+select * from t_copy_tbl;
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
@@ -606,6 +693,10 @@ DROP TABLE check_ign_err;
 DROP TABLE check_ign_err2;
 DROP DOMAIN dcheck_ign_err2;
 DROP TABLE hard_err;
+DROP TABLE err_tbl;
+DROP TABLE err_tbl_1;
+DROP TABLE err_tbl_2;
+DROP TABLE t_copy_tbl;
 
 --
 -- COPY FROM ... DEFAULT
-- 
2.34.1

Reply via email to