On Tue, Dec 19, 2023 at 9:14 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
>
>
> The error table hub idea is still unclear to me. I assume that there
> are error tables at least on each database. And an error table can
> have error data that happened during COPY FROM, including malformed
> lines. Do the error tables grow without bounds and the users have to
> delete rows at some point? If so, who can do that? How can we achieve
> that the users can see only errored rows they generated? And the issue
> with logical replication also needs to be resolved. Anyway, if we go
> this direction, we need to discuss the overall design.
>
> Regards,
>
> --
> Masahiko Sawada
> Amazon Web Services: https://aws.amazon.com

Please check my latest attached POC.
The main content is building the SPI query, executing the SPI query,
the regression test, and the regression test's expected output.
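
To make the "build the SPI query" part concrete, here is a standalone
sketch (plain C, outside the backend) of how the INSERT text for an
"extra data" error is put together. build_error_insert and
dollar_quote_safe are hypothetical helper names, not functions in the
patch; the patch itself uses appendStringInfo and SPI_execute. The safety
check is an added assumption, not something the patch does: a value that
contains "$$" or ends with '$' would terminate a $$...$$ literal early.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Returns 1 if s can be wrapped as $$s$$: it must not contain "$$" and must
 * not end with '$', otherwise the dollar-quoted literal would end early.
 */
static int
dollar_quote_safe(const char *s)
{
	size_t		len = strlen(s);

	if (strstr(s, "$$") != NULL)
		return 0;
	if (len > 0 && s[len - 1] == '$')
		return 0;
	return 1;
}

/*
 * Hypothetical helper: format the INSERT for an "extra data" style error
 * into buf.  Returns the statement length, or -1 if a value is unsafe to
 * dollar-quote or the buffer is too small.
 */
static int
build_error_insert(char *buf, size_t buflen, const char *nspname,
				   unsigned int userid, unsigned int relid,
				   const char *filename, unsigned long long lineno,
				   const char *line, const char *errmsg,
				   const char *sqlstate)
{
	int			n;

	assert(buf != NULL && nspname != NULL);

	if (!dollar_quote_safe(filename) || !dollar_quote_safe(line) ||
		!dollar_quote_safe(errmsg) || !dollar_quote_safe(sqlstate))
		return -1;

	n = snprintf(buf, buflen,
				 "INSERT INTO %s.copy_errors(userid, copy_destination, "
				 "filename, lineno, line, err_message, errorcode) "
				 "SELECT %u, %u, $$%s$$, %llu, $$%s$$, $$%s$$, $$%s$$",
				 nspname, userid, relid, filename, lineno,
				 line, errmsg, sqlstate);

	/* snprintf reports truncation by returning the untruncated length */
	if (n < 0 || (size_t) n >= buflen)
		return -1;
	return n;
}
```

Calling build_error_insert with schema "foo" and a raw line such as
"a$$b" returns -1 instead of producing a broken statement. Passing the
values as query parameters would avoid the quoting question entirely.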

There is one copy_errors table per schema.
foo.copy_errors will be owned by the owner of schema foo.

If you can insert into a table in a specific schema, let's say foo,
then you will get the privilege to INSERT/DELETE/SELECT
on foo.copy_errors.
If you are not a superuser, you are only allowed to do
INSERT/DELETE/SELECT on foo.copy_errors rows where USERID =
current_user::regrole::oid.
This is done via row level security.
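
As a rough standalone sketch (plain C, not backend code), these are the
four statements the POC runs through SPI to set up the table for a schema
and the role running COPY. build_copy_errors_setup is a hypothetical
helper name; the policy name copyerror and the column list are taken from
the patch. The sketch assumes the schema and role names are plain
identifiers that need no quoting.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

enum { COPY_ERRORS_NSTMT = 4, COPY_ERRORS_STMTLEN = 512 };

/*
 * Hypothetical helper: fill stmts[] with the setup statements for
 * nspname.copy_errors.  Returns 0 on success, -1 if any statement did not
 * fit.  Assumes nspname and rolname need no identifier quoting.
 */
static int
build_copy_errors_setup(char stmts[][COPY_ERRORS_STMTLEN],
						const char *nspname, const char *rolname)
{
	int			n[COPY_ERRORS_NSTMT];
	int			i;

	assert(nspname != NULL && rolname != NULL);

	/* 1. the error table itself */
	n[0] = snprintf(stmts[0], COPY_ERRORS_STMTLEN,
					"CREATE TABLE %s.copy_errors("
					"userid oid, copy_destination oid, filename text, "
					"lineno bigint, line text, colname text, "
					"raw_field_value text, err_message text, "
					"err_detail text, errorcode text)", nspname);

	/* 2. each role only sees and modifies rows carrying its own oid */
	n[1] = snprintf(stmts[1], COPY_ERRORS_STMTLEN,
					"CREATE POLICY copyerror ON %s.copy_errors "
					"FOR ALL TO PUBLIC "
					"USING (userid = current_user::regrole::oid)", nspname);

	/* 3. the policy has no effect until row level security is enabled */
	n[2] = snprintf(stmts[2], COPY_ERRORS_STMTLEN,
					"ALTER TABLE %s.copy_errors ENABLE ROW LEVEL SECURITY",
					nspname);

	/* 4. table-level privileges for the role running COPY */
	n[3] = snprintf(stmts[3], COPY_ERRORS_STMTLEN,
					"GRANT SELECT, DELETE, INSERT ON TABLE %s.copy_errors TO %s",
					nspname, rolname);

	for (i = 0; i < COPY_ERRORS_NSTMT; i++)
		if (n[i] < 0 || n[i] >= COPY_ERRORS_STMTLEN)
			return -1;
	return 0;
}
```

The first three statements are only needed when the table does not exist
yet; the GRANT is issued on every COPY ... SAVE_ERROR in the POC.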

Since foo.copy_errors mainly sees INSERT operations, if copy_errors grows
too much, that means your source file has many errors, and the whole COPY
will take a very long time to finish. Maybe we can capture how many
errors have been encountered so far from another client.

I don't know how to deal with logical replication. Looking for ideas.
From 9affaf6d94eb4afe26fc7181e38e53eed14e0216 Mon Sep 17 00:00:00 2001
From: pgaddict <jian.universal...@gmail.com>
Date: Wed, 20 Dec 2023 11:26:25 +0800
Subject: [PATCH v12 1/1] Make COPY FROM more error tolerant

Currently COPY FROM has 3 types of error while processing the source file:
* extra data after last expected column
* missing data for column "%s"
* data type conversion error

Instead of throwing errors while copying, the SAVE_ERROR option saves
errors to the table copy_errors, which is shared by all COPY FROM operations in the same schema.

We check the existing copy_errors table definition by column name and column data type.
If the table already exists and meets the criteria, then errors will be saved to it.
If the table does not exist, then we create one.

copy_errors is per schema and is owned by the schema's owner.
For non-superusers, if you can insert into a table in that schema, then you can insert into copy_errors,
but you are only allowed to select/delete your own rows, which are identified by comparing current_user
with the userid column of copy_errors. The privilege restriction is implemented via ROW LEVEL SECURITY.

Only works for COPY FROM, non-BINARY mode.

While copying, if no error ever happened, the error saving table will be dropped at the end of COPY FROM.
If the error saving table exists, meaning at least one COPY FROM error has happened before,
then all future errors will be saved to that table.
We save the error-related meta info to the error saving table using SPI,
that is, we construct a query string and then execute the query.
---
 contrib/file_fdw/file_fdw.c              |   4 +-
 doc/src/sgml/ref/copy.sgml               | 122 ++++++++++++++-
 src/backend/commands/copy.c              |  12 ++
 src/backend/commands/copyfrom.c          | 168 ++++++++++++++++++++-
 src/backend/commands/copyfromparse.c     | 179 +++++++++++++++++++++--
 src/backend/parser/gram.y                |   8 +-
 src/bin/psql/tab-complete.c              |   3 +-
 src/include/commands/copy.h              |   3 +-
 src/include/commands/copyfrom_internal.h |   5 +
 src/include/parser/kwlist.h              |   1 +
 src/test/regress/expected/copy2.out      | 160 ++++++++++++++++++++
 src/test/regress/sql/copy2.sql           | 142 ++++++++++++++++++
 12 files changed, 787 insertions(+), 20 deletions(-)

diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 2189be8a..2d3eb34f 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -751,7 +751,7 @@ fileIterateForeignScan(ForeignScanState *node)
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	found = NextCopyFrom(festate->cstate, econtext,
-						 slot->tts_values, slot->tts_isnull);
+						 slot->tts_values, slot->tts_isnull, NULL);
 	if (found)
 		ExecStoreVirtualTuple(slot);
 
@@ -1183,7 +1183,7 @@ file_acquire_sample_rows(Relation onerel, int elevel,
 		MemoryContextReset(tupcontext);
 		MemoryContextSwitchTo(tupcontext);
 
-		found = NextCopyFrom(cstate, NULL, values, nulls);
+		found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 18ecc69c..3dbf70ee 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
+    SAVE_ERROR [ <replaceable class="parameter">boolean</replaceable> ]
 </synopsis>
  </refsynopsisdiv>
 
@@ -411,6 +412,18 @@ WHERE <replaceable class="parameter">condition</replaceable>
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>SAVE_ERROR</literal></term>
+    <listitem>
+     <para>
+      Specifies that any data conversion errors while copying will be automatically saved in the table <literal>COPY_ERRORS</literal> and that the <command>COPY FROM</command> operation will not be interrupted by conversion errors.
+      This option is not allowed when using <literal>binary</literal> format. Note that this
+      is only supported in current <command>COPY FROM</command> syntax.
+      If this option is omitted, any data type conversion errors will be raised immediately.
+     </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect1>
 
@@ -564,6 +577,7 @@ COPY <replaceable class="parameter">count</replaceable>
     amount to a considerable amount of wasted disk space if the failure
     happened well into a large copy operation. You might wish to invoke
     <command>VACUUM</command> to recover the wasted space.
+    To continue copying while skipping conversion errors in a <command>COPY FROM</command>, you might wish to specify <literal>SAVE_ERROR</literal>.
    </para>
 
    <para>
@@ -572,6 +586,19 @@ COPY <replaceable class="parameter">count</replaceable>
     null strings to null values and unquoted null strings to empty strings.
    </para>
 
+   <para>
+
+    If the <literal>SAVE_ERROR</literal> option is specified and conversion errors occur while copying,
+    <productname>PostgreSQL</productname> will first check whether the table <literal>COPY_ERRORS</literal> exists, then save the conversion error information to it.
+    If it exists but its definition cannot be used to save the error information, an error is raised and the <command>COPY FROM</command> operation stops.
+    If it does not exist, <productname>PostgreSQL</productname> will try to create it before doing the actual copy operation.
+    The owner of the <literal>COPY_ERRORS</literal> table is the owner of the schema that contains it.
+    All future error information generated while copying data to the same schema will automatically be saved to the same <literal>COPY_ERRORS</literal> table.
+    Copy conversion errors are privileged information: non-superusers are only allowed to <literal>SELECT</literal>, <literal>DELETE</literal> or <literal>INSERT</literal> their own rows in the <literal>COPY_ERRORS</literal> table.
+    Conversion errors include data type conversion failures and extra or missing data in the source file.
+    The <literal>COPY_ERRORS</literal> table is described in detail in <xref linkend="copy-errors-table"/>.
+
+   </para>
  </refsect1>
 
  <refsect1>
@@ -588,7 +615,7 @@ COPY <replaceable class="parameter">count</replaceable>
     output function, or acceptable to the input function, of each
     attribute's data type.  The specified null string is used in
     place of columns that are null.
-    <command>COPY FROM</command> will raise an error if any line of the
+    By default, if <literal>SAVE_ERROR</literal> is not specified, <command>COPY FROM</command> will raise an error if any line of the
     input file contains more or fewer columns than are expected.
    </para>
 
@@ -962,6 +989,99 @@ versions of <productname>PostgreSQL</productname>.
      check against somehow getting out of sync with the data.
     </para>
    </refsect3>
+
+   <refsect3>
+    <title> TABLE COPY_ERRORS </title>
+    <para>
+        If <literal>SAVE_ERROR</literal> is specified, all the data type conversion errors while copying will automatically be saved in <literal>COPY_ERRORS</literal>.
+        <xref linkend="copy-errors-table"/> shows the column names, data types, and descriptions of the <literal>COPY_ERRORS</literal> table.
+    </para>
+
+   <table id="copy-errors-table">
+    <title>Error Saving table description </title>
+
+    <tgroup cols="3">
+     <thead>
+      <row>
+       <entry>Column name</entry>
+       <entry>Data type</entry>
+       <entry>Description</entry>
+      </row>
+     </thead>
+
+      <tbody>
+       <row>
+       <entry> <literal>userid</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The user who generated the conversion error.
+       References <link linkend="catalog-pg-authid"><structname>pg_authid</structname></link>.<structfield>oid</structfield>.
+       There is no hard dependency on <literal>pg_authid</literal>: if the corresponding <structfield>oid</structfield> is deleted from <literal>pg_authid</literal>, this value becomes stale.
+    </entry>
+       </row>
+
+       <row>
+       <entry> <literal>copy_destination</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The OID of the destination table of the <command>COPY FROM</command> operation.
+        References <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>oid</structfield>.
+        There is no hard dependency on <literal>pg_class</literal>: if the corresponding <structfield>oid</structfield> is deleted from <literal>pg_class</literal>, this value becomes stale.
+
+        </entry>
+       </row>
+
+       <row>
+       <entry> <literal>filename</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The path name of the input file</entry>
+       </row>
+
+       <row>
+       <entry> <literal>lineno</literal> </entry>
+       <entry><type>bigint</type></entry>
+       <entry>Line number where the error occurred, counting from 1</entry>
+       </row>
+
+       <row>
+       <entry> <literal>line</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the line where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>colname</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>raw_field_value</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_message </literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The error message text </entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_detail</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Detailed error message </entry>
+       </row>
+
+       <row>
+       <entry> <literal>errorcode </literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The error code for the copying error</entry>
+       </row>
+
+      </tbody>
+     </tgroup>
+    </table>
+   </refsect3>
+
   </refsect2>
  </refsect1>
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b5..bc4af10a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -419,6 +419,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
+	bool		save_error_specified = false;
 	ListCell   *option;
 
 	/* Support external use for option sanity checking */
@@ -458,6 +459,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "save_error") == 0)
+		{
+			if (save_error_specified)
+				errorConflictingDefElem(defel, pstate);
+			save_error_specified = true;
+			opts_out->save_error = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
@@ -598,6 +606,10 @@ ProcessCopyOptions(ParseState *pstate,
 				(errcode(ERRCODE_SYNTAX_ERROR),
 				 errmsg("cannot specify DEFAULT in BINARY mode")));
 
+	if (opts_out->binary && opts_out->save_error)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify SAVE_ERROR in BINARY mode")));
 	/* Set defaults for omitted options */
 	if (!opts_out->delim)
 		opts_out->delim = opts_out->csv_mode ? "," : "\t";
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index f4861652..a84080b4 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -29,7 +29,9 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "catalog/pg_authid.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_namespace.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
@@ -38,6 +40,7 @@
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
 #include "executor/tuptable.h"
+#include "executor/spi.h"
 #include "foreign/fdwapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -52,6 +55,7 @@
 #include "utils/portal.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
+#include "utils/syscache.h"
 
 /*
  * No more than this many tuples per CopyMultiInsertBuffer
@@ -652,10 +656,12 @@ CopyFrom(CopyFromState cstate)
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
+	StringInfo	err_save_buf;
 
 	Assert(cstate->rel);
 	Assert(list_length(cstate->range_table) == 1);
-
+	if (cstate->opts.save_error)
+		Assert(cstate->escontext);
 	/*
 	 * The target must be a plain, foreign, or partitioned relation, or have
 	 * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
@@ -952,6 +958,7 @@ CopyFrom(CopyFromState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
+	err_save_buf = makeStringInfo();
 	for (;;)
 	{
 		TupleTableSlot *myslot;
@@ -989,9 +996,13 @@ CopyFrom(CopyFromState cstate)
 		ExecClearTuple(myslot);
 
 		/* Directly store the values/nulls array in the slot */
-		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull, err_save_buf))
 			break;
 
+		/* Soft error occurred, skip this tuple. */
+		if (cstate->opts.save_error && cstate->line_error_occured)
+			continue;
+
 		ExecStoreVirtualTuple(myslot);
 
 		/*
@@ -1297,6 +1308,20 @@ CopyFrom(CopyFromState cstate)
 
 	ExecResetTupleTable(estate->es_tupleTable, false);
 
+	if (cstate->opts.save_error)
+	{
+		Assert(cstate->copy_errors_nspname);
+
+		if (cstate->error_rows_cnt > 0)
+		{
+			ereport(NOTICE,
+					errmsg("%llu rows were skipped because of conversion error."
+							" Skipped rows saved to table %s.copy_errors",
+							(unsigned long long) cstate->error_rows_cnt,
+							cstate->copy_errors_nspname));
+		}
+	}
+
 	/* Allow the FDW to shut down */
 	if (target_resultRelInfo->ri_FdwRoutine != NULL &&
 		target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
@@ -1444,6 +1469,145 @@ BeginCopyFrom(ParseState *pstate,
 		}
 	}
 
+	/* Set up soft error handler for SAVE_ERROR */
+	if (cstate->opts.save_error)
+	{
+		StringInfoData 	querybuf;
+		bool		isnull;
+		bool		copy_erros_table_ok;
+		Oid			nsp_oid;
+		Oid			save_userid;
+		Oid			ownerId;
+		int			save_sec_context;
+		const char	*copy_errors_nspname;
+		HeapTuple	utup;
+		HeapTuple	tuple;
+		const char	*rname;
+
+		cstate->escontext = makeNode(ErrorSaveContext);
+		cstate->escontext->type = T_ErrorSaveContext;
+		cstate->escontext->details_wanted = true;
+		cstate->escontext->error_occurred = false;
+
+		copy_errors_nspname = get_namespace_name(RelationGetNamespace(cstate->rel));
+		nsp_oid = get_namespace_oid(copy_errors_nspname, false);
+
+		initStringInfo(&querybuf);
+		/*
+		*
+		* Verify whether the nsp_oid.COPY_ERRORS table already exists, and if so,
+		* examine its column names and data types.
+		*/
+		appendStringInfo(&querybuf,
+						"SELECT (array_agg(pa.attname ORDER BY pa.attnum) "
+							"= '{ctid,userid,copy_destination,filename,lineno, "
+							"line,colname,raw_field_value,err_message,err_detail,errorcode}') "
+							"AND (ARRAY_AGG(pt.typname ORDER BY pa.attnum) "
+							"= '{tid,oid,oid,text,int8,text,text,text,text,text,text}') "
+							"FROM pg_catalog.pg_attribute pa "
+							"JOIN pg_catalog.pg_class	pc ON pc.oid = pa.attrelid "
+							"JOIN pg_catalog.pg_type 	pt ON pt.oid = pa.atttypid "
+							"JOIN pg_catalog.pg_namespace pn "
+							"ON pn.oid = pc.relnamespace WHERE ");
+		appendStringInfo(&querybuf,
+							"relname = $$copy_errors$$ AND pn.nspname = $$%s$$ "
+							" AND pa.attnum >= -1 AND NOT attisdropped ",
+							copy_errors_nspname);
+
+		if (SPI_connect() != SPI_OK_CONNECT)
+			elog(ERROR, "SPI_connect failed");
+
+		if (SPI_execute(querybuf.data, false, 0) != SPI_OK_SELECT)
+			elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+		copy_erros_table_ok = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0],
+									   SPI_tuptable->tupdesc,
+									   1, &isnull));
+		/*
+		* Switch to the schema owner's userid, so that the COPY_ERRORS table is owned by
+		* that user. Also record the current userid.
+		*/
+		GetUserIdAndSecContext(&save_userid, &save_sec_context);
+
+		utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(save_userid));
+		if (!HeapTupleIsValid(utup))
+			elog(ERROR, "cache lookup failed for role %u", save_userid);
+
+		rname = pstrdup(NameStr(((Form_pg_authid) GETSTRUCT(utup))->rolname));
+		ReleaseSysCache(utup);
+
+		tuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(nsp_oid));
+		if (!HeapTupleIsValid(tuple))
+				ereport(ERROR,
+						(errcode(ERRCODE_UNDEFINED_SCHEMA),
+						errmsg("schema with OID %u does not exist", nsp_oid)));
+		ownerId = ((Form_pg_namespace) GETSTRUCT(tuple))->nspowner;
+		ReleaseSysCache(tuple);
+
+		/* not sure the flag is correct */
+		SetUserIdAndSecContext(ownerId,
+							save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
+							SECURITY_NOFORCE_RLS);
+
+		/* If there is no copy_errors_nspname.COPY_ERRORS table, create it to hold all potential errors. */
+		if (isnull)
+		{
+			resetStringInfo(&querybuf);
+			appendStringInfo(&querybuf,
+				"CREATE TABLE %s.COPY_ERRORS( "
+					"USERID OID, COPY_DESTINATION OID, FILENAME TEXT,LINENO BIGINT "
+					",LINE TEXT, COLNAME text, RAW_FIELD_VALUE TEXT "
+					",ERR_MESSAGE TEXT, ERR_DETAIL TEXT, ERRORCODE TEXT)", copy_errors_nspname);
+
+			if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+				elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+			resetStringInfo(&querybuf);
+			appendStringInfo(&querybuf,
+				"CREATE POLICY copyerror ON %s.COPY_ERRORS "
+					"FOR ALL TO PUBLIC USING (USERID = current_user::regrole::oid)",
+					 copy_errors_nspname);
+
+			if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+				elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+			resetStringInfo(&querybuf);
+			appendStringInfo(&querybuf,
+			"ALTER TABLE %s.COPY_ERRORS ENABLE ROW LEVEL SECURITY", copy_errors_nspname);
+
+			if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+				elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+		}
+		else if(!copy_erros_table_ok)
+			ereport(ERROR,
+					(errmsg("table %s.COPY_ERRORS already exists. "
+								 "cannot use it for COPY FROM error saving",
+								 copy_errors_nspname)));
+
+		/* grant SELECT/DELETE/INSERT on copy_errors to the copy operation user now */
+		resetStringInfo(&querybuf);
+		appendStringInfo(&querybuf,
+				"GRANT SELECT, DELETE, INSERT ON TABLE %s.COPY_ERRORS TO %s", copy_errors_nspname, rname);
+
+		if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+			elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+		if (SPI_finish() != SPI_OK_FINISH)
+			elog(ERROR, "SPI_finish failed");
+
+		/* Restore userid and security context */
+		SetUserIdAndSecContext(save_userid, save_sec_context);
+		cstate->copy_errors_nspname = pstrdup(copy_errors_nspname);
+	}
+	else
+	{
+		cstate->copy_errors_nspname = NULL;
+		cstate->escontext = NULL;
+	}
+
+	cstate->error_rows_cnt = 0;  		/* set the default to 0 */
+	cstate->line_error_occured = false;	/* by default, assume conversion is OK */
+
 	/* Convert convert_selectively name list to per-column flags */
 	if (cstate->opts.convert_selectively)
 	{
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index f5537345..f1a6f9dc 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -66,10 +66,12 @@
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "nodes/miscnodes.h"
 #include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "utils/builtins.h"
@@ -852,7 +854,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
  */
 bool
 NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
-			 Datum *values, bool *nulls)
+			 Datum *values, bool *nulls, StringInfo err_save_buf)
 {
 	TupleDesc	tupDesc;
 	AttrNumber	num_phys_attrs,
@@ -880,16 +882,60 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 		int			fldct;
 		int			fieldno;
 		char	   *string;
+		char		*errmsg_extra;
+		Oid			save_userid;
+		int			save_sec_context;
 
 		/* read raw fields in the next line */
 		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
 			return false;
 
+		/* reset line_error_occured to false for next new line. */
+		if (cstate->line_error_occured)
+			cstate->line_error_occured = false;
+
+		/* we need to get the current userid for the SPI queries */
+		if (cstate->opts.save_error)
+			GetUserIdAndSecContext(&save_userid, &save_sec_context);
+
 		/* check for overflowing fields */
 		if (attr_count > 0 && fldct > attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("extra data after last expected column")));
+		{
+			if(cstate->opts.save_error)
+			{
+				errmsg_extra = pstrdup("extra data after last expected column");
+
+				resetStringInfo(err_save_buf);
+				appendStringInfo(err_save_buf,
+								"INSERT INTO %s.copy_errors(userid, copy_destination, filename,lineno,line, "
+								"err_message, errorcode) "
+								"SELECT %u, %u,$$%s$$, %llu,$$%s$$, $$%s$$, $$%s$$",
+								cstate->copy_errors_nspname,
+								save_userid,
+								cstate->rel->rd_rel->oid,
+								cstate->filename ? cstate->filename : "STDIN",
+								(unsigned long long) cstate->cur_lineno,
+								cstate->line_buf.data,
+								errmsg_extra,
+								unpack_sql_state(ERRCODE_BAD_COPY_FILE_FORMAT));
+				if (SPI_connect() != SPI_OK_CONNECT)
+					elog(ERROR, "SPI_connect failed");
+
+				if (SPI_execute(err_save_buf->data, false, 0) != SPI_OK_INSERT)
+					elog(ERROR, "SPI_exec failed: %s", err_save_buf->data);
+
+				if (SPI_finish() != SPI_OK_FINISH)
+					elog(ERROR, "SPI_finish failed");
+
+				cstate->line_error_occured = true;
+				cstate->error_rows_cnt++;
+				return true;
+			}
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						errmsg("extra data after last expected column")));
+		}
 
 		fieldno = 0;
 
@@ -901,10 +947,50 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
 			if (fieldno >= fldct)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for column \"%s\"",
-								NameStr(att->attname))));
+			{
+				if(cstate->opts.save_error)
+				{
+					char	errmsg[128];
+					snprintf(errmsg, sizeof(errmsg),
+							"missing data for column \"%s\"",
+							NameStr(att->attname));
+
+					resetStringInfo(err_save_buf);
+					appendStringInfo(err_save_buf,
+								"INSERT INTO %s.copy_errors( "
+									"userid,copy_destination,filename, "
+									"lineno,line,COLNAME, err_message, errorcode) "
+									"SELECT %u, %u, $$%s$$, %llu, $$%s$$, $$%s$$, $$%s$$, $$%s$$ ",
+									cstate->copy_errors_nspname,
+									save_userid,
+									cstate->rel->rd_rel->oid,
+									cstate->filename ? cstate->filename : "STDIN",
+									(unsigned long long) cstate->cur_lineno,
+									cstate->line_buf.data,
+									NameStr(att->attname),
+									errmsg,
+									unpack_sql_state(ERRCODE_BAD_COPY_FILE_FORMAT));
+
+					if (SPI_connect() != SPI_OK_CONNECT)
+						elog(ERROR, "SPI_connect failed");
+
+					if (SPI_execute(err_save_buf->data, false, 0) != SPI_OK_INSERT)
+						elog(ERROR, "SPI_exec failed: %s", err_save_buf->data);
+
+					if (SPI_finish() != SPI_OK_FINISH)
+						elog(ERROR, "SPI_finish failed");
+
+					cstate->line_error_occured = true;
+					cstate->error_rows_cnt++;
+					return true;
+				}
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("missing data for column \"%s\"",
+									NameStr(att->attname))));
+			}
+
 			string = field_strings[fieldno++];
 
 			if (cstate->convert_select_flags &&
@@ -956,15 +1042,84 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 				values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
 			}
 			else
-				values[m] = InputFunctionCall(&in_functions[m],
-											  string,
-											  typioparams[m],
-											  att->atttypmod);
+			{
+				/*
+				*
+				* InputFunctionCall is faster than InputFunctionCallSafe.
+				*
+				*/
+				if(!cstate->opts.save_error)
+					values[m] = InputFunctionCall(&in_functions[m],
+												string,
+												typioparams[m],
+												att->atttypmod);
+				else
+				{
+					if (!InputFunctionCallSafe(&in_functions[m],
+											string,
+											typioparams[m],
+											att->atttypmod,
+											(Node *) cstate->escontext,
+											&values[m]))
+					{
+						char	*err_detail;
 
+						if (!cstate->escontext->error_data->detail)
+							err_detail = NULL;
+						else
+							err_detail = cstate->escontext->error_data->detail;
+
+						resetStringInfo(err_save_buf);
+						appendStringInfo(err_save_buf,
+										"INSERT INTO %s.copy_errors(userid,copy_destination, "
+										"filename, lineno,line,COLNAME, "
+										"raw_field_value, err_message,errorcode, err_detail) "
+										"SELECT %u, %u, $$%s$$, %llu, $$%s$$, $$%s$$, $$%s$$, $$%s$$, $$%s$$, ",
+										cstate->copy_errors_nspname,
+										save_userid,
+										cstate->rel->rd_rel->oid,
+										cstate->filename ? cstate->filename : "STDIN",
+										(unsigned long long) cstate->cur_lineno,
+										cstate->line_buf.data,
+										cstate->cur_attname,
+										string,
+										cstate->escontext->error_data->message,
+										unpack_sql_state(cstate->escontext->error_data->sqlerrcode));
+
+						if (!err_detail)
+							appendStringInfo(err_save_buf, "NULL::text");
+						else
+							appendStringInfo(err_save_buf,"$$%s$$", err_detail);
+
+						if (SPI_connect() != SPI_OK_CONNECT)
+							elog(ERROR, "SPI_connect failed");
+
+						if (SPI_execute(err_save_buf->data, false, 0) != SPI_OK_INSERT)
+							elog(ERROR, "SPI_execute failed: %s", err_save_buf->data);
+
+						if (SPI_finish() != SPI_OK_FINISH)
+							elog(ERROR, "SPI_finish failed");
+
+						/* line error occurred, set it once per line */
+						if (!cstate->line_error_occured)
+							cstate->line_error_occured = true;
+						/* reset ErrorSaveContext */
+						cstate->escontext->error_occurred = false;
+						cstate->escontext->details_wanted = true;
+						memset(cstate->escontext->error_data,0, sizeof(ErrorData));
+					}
+				}
+			}
 			cstate->cur_attname = NULL;
 			cstate->cur_attval = NULL;
 		}
 
+		/* record error rows count. */
+		if (cstate->line_error_occured)
+		{
+			cstate->error_rows_cnt++;
+			Assert(cstate->opts.save_error);
+		}
 		Assert(fieldno == attr_count);
 	}
 	else
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 63f172e1..f42e72aa 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -755,7 +755,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	RESET RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP
 	ROUTINE ROUTINES ROW ROWS RULE
 
-	SAVEPOINT SCALAR SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT
+	SAVEPOINT SAVE_ERROR SCALAR SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT
 	SEQUENCE SEQUENCES
 	SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW
 	SIMILAR SIMPLE SKIP SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P
@@ -3448,6 +3448,10 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("encoding", (Node *) makeString($2), @1);
 				}
+			| SAVE_ERROR
+				{
+					$$ = makeDefElem("save_error", (Node *) makeBoolean(true), @1);
+				}
 		;
 
 /* The following exist for backward compatibility with very old versions */
@@ -17346,6 +17350,7 @@ unreserved_keyword:
 			| ROWS
 			| RULE
 			| SAVEPOINT
+			| SAVE_ERROR
 			| SCALAR
 			| SCHEMA
 			| SCHEMAS
@@ -17954,6 +17959,7 @@ bare_label_keyword:
 			| ROWS
 			| RULE
 			| SAVEPOINT
+			| SAVE_ERROR
 			| SCALAR
 			| SCHEMA
 			| SCHEMAS
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 04980118..e6a358e0 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2890,7 +2890,8 @@ psql_completion(const char *text, int start, int end)
 	else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "("))
 		COMPLETE_WITH("FORMAT", "FREEZE", "DELIMITER", "NULL",
 					  "HEADER", "QUOTE", "ESCAPE", "FORCE_QUOTE",
-					  "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING", "DEFAULT");
+					  "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING", "DEFAULT",
+					  "SAVE_ERROR");
 
 	/* Complete COPY <sth> FROM|TO filename WITH (FORMAT */
 	else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(", "FORMAT"))
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b9..de47791a 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -43,6 +43,7 @@ typedef struct CopyFormatOptions
 	bool		binary;			/* binary format? */
 	bool		freeze;			/* freeze rows on loading? */
 	bool		csv_mode;		/* Comma Separated Value format? */
+	bool		save_error;		/*  save error to a table? */
 	CopyHeaderChoice header_line;	/* header line? */
 	char	   *null_print;		/* NULL marker string (server encoding!) */
 	int			null_print_len; /* length of same */
@@ -82,7 +83,7 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
 								   bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options);
 extern void EndCopyFrom(CopyFromState cstate);
 extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
-						 Datum *values, bool *nulls);
+						 Datum *values, bool *nulls, StringInfo err_save_buf);
 extern bool NextCopyFromRawFields(CopyFromState cstate,
 								  char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 5ec41589..65e34e89 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -16,6 +16,7 @@
 
 #include "commands/copy.h"
 #include "commands/trigger.h"
+#include "nodes/miscnodes.h"
 
 /*
  * Represents the different source cases we need to worry about at
@@ -94,6 +95,10 @@ typedef struct CopyFromStateData
 								 * default value */
 	FmgrInfo   *in_functions;	/* array of input functions for each attrs */
 	Oid		   *typioparams;	/* array of element types for in_functions */
+	ErrorSaveContext *escontext; 	/* traps soft errors raised by in_functions */
+	uint64		error_rows_cnt; /* total number of rows that have errors */
+	const char 	*copy_errors_nspname;		/* namespace of the copy_errors table */
+	bool	 	line_error_occured;	/* did a conversion error occur on this line? */
 	int		   *defmap;			/* array of default att numbers related to
 								 * missing att */
 	ExprState **defexprs;		/* array of default att expressions for all
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 5984dcfa..d0988a4c 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -377,6 +377,7 @@ PG_KEYWORD("routines", ROUTINES, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("row", ROW, COL_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("rows", ROWS, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("rule", RULE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("save_error", SAVE_ERROR, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("savepoint", SAVEPOINT, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("scalar", SCALAR, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("schema", SCHEMA, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index c4178b9c..f5a84487 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -564,6 +564,142 @@ ERROR:  conflicting or redundant options
 LINE 1: ... b, c) FROM STDIN WITH (FORMAT csv, FORCE_NULL *, FORCE_NULL...
                                                              ^
 ROLLBACK;
+--
+-- tests for SAVE_ERROR option with force_not_null, force_null
+\pset null NULL
+CREATE TABLE save_error_csv(
+    a INT NOT NULL,
+    b TEXT NOT NULL,
+    c TEXT,
+    d TEXT
+);
+--save_error not allowed in binary mode
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT binary);
+ERROR:  cannot specify SAVE_ERROR in BINARY mode
+-- redundant options not allowed.
+COPY save_error_csv FROM STDIN WITH (save_error, save_error off);
+ERROR:  conflicting or redundant options
+LINE 1: COPY save_error_csv FROM STDIN WITH (save_error, save_error ...
+                                                         ^
+create table COPY_ERRORS();
+--should fail. since table COPY_ERRORS already exists.
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error);
+ERROR:  table public.COPY_ERRORS already exists. cannot use it for COPY FROM error saving
+drop table COPY_ERRORS;
+--with FORCE_NOT_NULL and FORCE_NULL.
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c));
+NOTICE:  2 rows were skipped because of conversion error. Skipped rows saved to table public.copy_errors
+SELECT *, b is null as b_null, b = '' as b_empty FROM save_error_csv;
+ a | b |  c   |  d   | b_null | b_empty 
+---+---+------+------+--------+---------
+ 2 |   | NULL | NULL | f      | t
+(1 row)
+
+DROP TABLE save_error_csv;
+-- save error with extra data and missing data some column.
+---normal data type conversion error case.
+CREATE TABLE check_ign_err (n int, m int[], k bigint, l text);
+COPY check_ign_err FROM STDIN WITH (save_error);
+NOTICE:  10 rows were skipped because of conversion error. Skipped rows saved to table public.copy_errors
+select	pc.relname, ce.filename,ce.lineno,ce.line,ce.colname,
+		ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode
+from	copy_errors ce	join pg_class pc on pc.oid = ce.copy_destination
+where 	pc.relname = 'check_ign_err';
+    relname    | filename | lineno |                    line                    | colname |     raw_field_value     |                           err_message                           |        err_detail         | errorcode 
+---------------+----------+--------+--------------------------------------------+---------+-------------------------+-----------------------------------------------------------------+---------------------------+-----------
+ check_ign_err | STDIN    |      1 | 1       {1}     1       1       extra      | NULL    | NULL                    | extra data after last expected column                           | NULL                      | 22P04
+ check_ign_err | STDIN    |      2 | 2                                          | m       | NULL                    | missing data for column "m"                                     | NULL                      | 22P04
+ check_ign_err | STDIN    |      3 | \n      {1}     1       \-                 | n       |                        +| invalid input syntax for type integer: "                       +| NULL                      | 22P02
+               |          |        |                                            |         |                         | "                                                               |                           | 
+ check_ign_err | STDIN    |      4 | a       {2}     2       \r                 | n       | a                       | invalid input syntax for type integer: "a"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |      5 | 3       {\3}    3333333333      \n         | m       | {\x03}                  | invalid input syntax for type integer: "\x03"                   | NULL                      | 22P02
+ check_ign_err | STDIN    |      6 | 0x11    {3,}    3333333333      \\.        | m       | {3,}                    | malformed array literal: "{3,}"                                 | Unexpected "}" character. | 22P02
+ check_ign_err | STDIN    |      7 | d       {3,1/}  3333333333      \\0        | n       | d                       | invalid input syntax for type integer: "d"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |      7 | d       {3,1/}  3333333333      \\0        | m       | {3,1/}                  | invalid input syntax for type integer: "1/"                     | NULL                      | 22P02
+ check_ign_err | STDIN    |      8 | e       {3,\1}  -3323879289873933333333 \n | n       | e                       | invalid input syntax for type integer: "e"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |      8 | e       {3,\1}  -3323879289873933333333 \n | m       | {3,\x01}                | invalid input syntax for type integer: "\x01"                   | NULL                      | 22P02
+ check_ign_err | STDIN    |      8 | e       {3,\1}  -3323879289873933333333 \n | k       | -3323879289873933333333 | value "-3323879289873933333333" is out of range for type bigint | NULL                      | 22003
+ check_ign_err | STDIN    |      9 | f       {3,1}   3323879289873933333333  \r | n       | f                       | invalid input syntax for type integer: "f"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |      9 | f       {3,1}   3323879289873933333333  \r | k       | 3323879289873933333333  | value "3323879289873933333333" is out of range for type bigint  | NULL                      | 22003
+ check_ign_err | STDIN    |     10 | b       {a, 4}  1.1     h                  | n       | b                       | invalid input syntax for type integer: "b"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |     10 | b       {a, 4}  1.1     h                  | m       | {a, 4}                  | invalid input syntax for type integer: "a"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |     10 | b       {a, 4}  1.1     h                  | k       | 1.1                     | invalid input syntax for type bigint: "1.1"                     | NULL                      | 22P02
+(16 rows)
+
+DROP TABLE check_ign_err;
+DROP TABLE COPY_ERRORS;
+--(type textrange was already made in test_setup.sql)
+--using textrange doing test
+begin;
+CREATE USER test_copy_errors1;
+CREATE USER test_copy_errors2;
+CREATE USER test_copy_errors3;
+CREATE SCHEMA IF NOT EXISTS copy_errors_test AUTHORIZATION test_copy_errors3;
+SET LOCAL search_path TO copy_errors_test;
+GRANT USAGE on schema copy_errors_test to test_copy_errors1,test_copy_errors2,test_copy_errors3;
+GRANT CREATE on schema copy_errors_test to test_copy_errors3;
+set role test_copy_errors3;
+CREATE TABLE textrange_input(a public.textrange, b public.textrange, c public.textrange);
+GRANT insert on textrange_input to test_copy_errors1;
+GRANT insert on textrange_input to test_copy_errors2;
+set role test_copy_errors1;
+COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *);
+NOTICE:  2 rows were skipped because of conversion error. Skipped rows saved to table copy_errors_test.copy_errors
+---each user is only allowed to see their own rows.
+--based on userid is the same as current_user.
+select 	count(*) as should_be_zero
+from 	copy_errors_test.copy_errors ce
+join	pg_class	pc	on pc.oid = ce.copy_destination
+join	pg_roles	pr 	on pr.oid = ce.userid
+where 	ce.userid != current_user::regrole::oid;
+ should_be_zero 
+----------------
+              0
+(1 row)
+
+SELECT	pc.relname,pr.rolname,ce.filename,ce.lineno,ce.line,ce.colname,
+		ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode
+FROM	copy_errors_test.copy_errors ce
+JOIN 	pg_class pc ON pc.oid = ce.copy_destination
+JOIN 	pg_roles pr ON pr.oid = ce.userid;
+     relname     |      rolname      | filename | lineno |         line          | colname | raw_field_value |                            err_message                            |                err_detail                | errorcode 
+-----------------+-------------------+----------+--------+-----------------------+---------+-----------------+-------------------------------------------------------------------+------------------------------------------+-----------
+ textrange_input | test_copy_errors1 | STDIN    |      1 | ,-[a\","z),[a","-inf) | b       | -[a\,z)         | malformed range literal: "-[a\,z)"                                | Missing left parenthesis or bracket.     | 22P02
+ textrange_input | test_copy_errors1 | STDIN    |      1 | ,-[a\","z),[a","-inf) | c       | [a,-inf)        | range lower bound must be less than or equal to range upper bound | NULL                                     | 22000
+ textrange_input | test_copy_errors1 | STDIN    |      2 | (",a),(",",a),()",a); | a       | (,a),(          | malformed range literal: "(,a),("                                 | Junk after right parenthesis or bracket. | 22P02
+ textrange_input | test_copy_errors1 | STDIN    |      2 | (",a),(",",a),()",a); | b       | ,a),()          | malformed range literal: ",a),()"                                 | Missing left parenthesis or bracket.     | 22P02
+ textrange_input | test_copy_errors1 | STDIN    |      2 | (",a),(",",a),()",a); | c       | a);             | malformed range literal: "a);"                                    | Missing left parenthesis or bracket.     | 22P02
+(5 rows)
+
+set role test_copy_errors2;
+COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *);
+NOTICE:  2 rows were skipped because of conversion error. Skipped rows saved to table copy_errors_test.copy_errors
+SAVEPOINT s1;
+--current user (non-super user) are not allowed to update)
+update copy_errors_test.copy_errors set userid = 0;
+ERROR:  permission denied for table copy_errors
+ROLLBACK to s1;
+--current user (non-super user) are allowed to delete all the record they created.
+delete from copy_errors_test.copy_errors;
+set role test_copy_errors1;
+SELECT	pc.relname,pr.rolname,ce.filename,ce.lineno,ce.line,ce.colname,
+		ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode
+FROM	copy_errors_test.copy_errors ce
+JOIN 	pg_class pc ON pc.oid = ce.copy_destination
+JOIN 	pg_roles pr ON pr.oid = ce.userid;
+     relname     |      rolname      | filename | lineno |         line          | colname | raw_field_value |                            err_message                            |                err_detail                | errorcode 
+-----------------+-------------------+----------+--------+-----------------------+---------+-----------------+-------------------------------------------------------------------+------------------------------------------+-----------
+ textrange_input | test_copy_errors1 | STDIN    |      1 | ,-[a\","z),[a","-inf) | b       | -[a\,z)         | malformed range literal: "-[a\,z)"                                | Missing left parenthesis or bracket.     | 22P02
+ textrange_input | test_copy_errors1 | STDIN    |      1 | ,-[a\","z),[a","-inf) | c       | [a,-inf)        | range lower bound must be less than or equal to range upper bound | NULL                                     | 22000
+ textrange_input | test_copy_errors1 | STDIN    |      2 | (",a),(",",a),()",a); | a       | (,a),(          | malformed range literal: "(,a),("                                 | Junk after right parenthesis or bracket. | 22P02
+ textrange_input | test_copy_errors1 | STDIN    |      2 | (",a),(",",a),()",a); | b       | ,a),()          | malformed range literal: ",a),()"                                 | Missing left parenthesis or bracket.     | 22P02
+ textrange_input | test_copy_errors1 | STDIN    |      2 | (",a),(",",a),()",a); | c       | a);             | malformed range literal: "a);"                                    | Missing left parenthesis or bracket.     | 22P02
+(5 rows)
+
+set role test_copy_errors3;
+--owner allowed to drop the table.
+drop table copy_errors;
+ROLLBACK;
 \pset null ''
 -- test case with whole-row Var in a check constraint
 create table check_con_tbl (f1 int);
@@ -822,3 +958,27 @@ truncate copy_default;
 -- DEFAULT cannot be used in COPY TO
 copy (select 1 as test) TO stdout with (default '\D');
 ERROR:  COPY DEFAULT only available using COPY FROM
+-- DEFAULT WITH SAVE_ERROR.
+create table copy_default_error_save (
+	id integer,
+	text_value text not null default 'test',
+	ts_value timestamp without time zone not null default '2022-07-05'
+);
+copy copy_default_error_save from stdin with (save_error, default '\D');
+NOTICE:  3 rows were skipped because of conversion error. Skipped rows saved to table public.copy_errors
+select 	ce.filename,ce.lineno,ce.line,
+		ce.colname, ce.raw_field_value,
+		ce.err_message, ce.err_detail,ce.errorcode
+from 	public.copy_errors ce
+join	pg_class	pc	on pc.oid = ce.copy_destination
+where	pc.relname = 'copy_default_error_save';
+ filename | lineno |               line               | colname  | raw_field_value  |                         err_message                         | err_detail | errorcode 
+----------+--------+----------------------------------+----------+------------------+-------------------------------------------------------------+------------+-----------
+ STDIN    |      1 | k       value   '2022-07-04'     | id       | k                | invalid input syntax for type integer: "k"                  |            | 22P02
+ STDIN    |      2 | z       \D      '2022-07-03ASKL' | id       | z                | invalid input syntax for type integer: "z"                  |            | 22P02
+ STDIN    |      2 | z       \D      '2022-07-03ASKL' | ts_value | '2022-07-03ASKL' | invalid input syntax for type timestamp: "'2022-07-03ASKL'" |            | 22007
+ STDIN    |      3 | s       \D      \D               | id       | s                | invalid input syntax for type integer: "s"                  |            | 22P02
+(4 rows)
+
+drop table copy_default_error_save;
+truncate copy_default;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index a5486f60..a4ef06d9 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -374,6 +374,126 @@ BEGIN;
 COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NULL *, FORCE_NULL(b));
 ROLLBACK;
 
+--
+-- tests for SAVE_ERROR option with force_not_null, force_null
+\pset null NULL
+CREATE TABLE save_error_csv(
+    a INT NOT NULL,
+    b TEXT NOT NULL,
+    c TEXT,
+    d TEXT
+);
+
+--save_error not allowed in binary mode
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT binary);
+
+-- redundant options not allowed.
+COPY save_error_csv FROM STDIN WITH (save_error, save_error off);
+
+create table COPY_ERRORS();
+--should fail. since table COPY_ERRORS already exists.
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error);
+
+drop table COPY_ERRORS;
+
+--with FORCE_NOT_NULL and FORCE_NULL.
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c));
+z,,""
+\0,,
+2,,
+\.
+
+SELECT *, b is null as b_null, b = '' as b_empty FROM save_error_csv;
+DROP TABLE save_error_csv;
+
+-- save error with extra data and missing data some column.
+---normal data type conversion error case.
+CREATE TABLE check_ign_err (n int, m int[], k bigint, l text);
+COPY check_ign_err FROM STDIN WITH (save_error);
+1	{1}	1	1	extra
+2
+\n	{1}	1	\-
+a	{2}	2	\r
+3	{\3}	3333333333	\n
+0x11	{3,}	3333333333	\\.
+d	{3,1/}	3333333333	\\0
+e	{3,\1}	-3323879289873933333333	\n
+f	{3,1}	3323879289873933333333	\r
+b	{a, 4}	1.1	h
+5	{5}	5	\\
+\.
+
+select	pc.relname, ce.filename,ce.lineno,ce.line,ce.colname,
+		ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode
+from	copy_errors ce	join pg_class pc on pc.oid = ce.copy_destination
+where 	pc.relname = 'check_ign_err';
+
+DROP TABLE check_ign_err;
+DROP TABLE COPY_ERRORS;
+
+--(type textrange was already made in test_setup.sql)
+--using textrange doing test
+begin;
+
+CREATE USER test_copy_errors1;
+CREATE USER test_copy_errors2;
+CREATE USER test_copy_errors3;
+CREATE SCHEMA IF NOT EXISTS copy_errors_test AUTHORIZATION test_copy_errors3;
+SET LOCAL search_path TO copy_errors_test;
+
+GRANT USAGE on schema copy_errors_test to test_copy_errors1,test_copy_errors2,test_copy_errors3;
+GRANT CREATE on schema copy_errors_test to test_copy_errors3;
+set role test_copy_errors3;
+CREATE TABLE textrange_input(a public.textrange, b public.textrange, c public.textrange);
+GRANT insert on textrange_input to test_copy_errors1;
+GRANT insert on textrange_input to test_copy_errors2;
+
+set role test_copy_errors1;
+COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *);
+,-[a\","z),[a","-inf)
+(",a),(",",a),()",a);
+\.
+
+---each user is only allowed to see their own rows.
+--based on userid is the same as current_user.
+select 	count(*) as should_be_zero
+from 	copy_errors_test.copy_errors ce
+join	pg_class	pc	on pc.oid = ce.copy_destination
+join	pg_roles	pr 	on pr.oid = ce.userid
+where 	ce.userid != current_user::regrole::oid;
+
+SELECT	pc.relname,pr.rolname,ce.filename,ce.lineno,ce.line,ce.colname,
+		ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode
+FROM	copy_errors_test.copy_errors ce
+JOIN 	pg_class pc ON pc.oid = ce.copy_destination
+JOIN 	pg_roles pr ON pr.oid = ce.userid;
+
+set role test_copy_errors2;
+COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *);
+(a",")),(]","a),(a","])
+[z","a],[z","2],[(","",")]
+\.
+
+SAVEPOINT s1;
+--current user (non-super user) are not allowed to update)
+update copy_errors_test.copy_errors set userid = 0;
+ROLLBACK to s1;
+
+--current user (non-super user) are allowed to delete all the record they created.
+delete from copy_errors_test.copy_errors;
+
+set role test_copy_errors1;
+
+SELECT	pc.relname,pr.rolname,ce.filename,ce.lineno,ce.line,ce.colname,
+		ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode
+FROM	copy_errors_test.copy_errors ce
+JOIN 	pg_class pc ON pc.oid = ce.copy_destination
+JOIN 	pg_roles pr ON pr.oid = ce.userid;
+
+set role test_copy_errors3;
+--owner allowed to drop the table.
+drop table copy_errors;
+ROLLBACK;
 \pset null ''
 
 -- test case with whole-row Var in a check constraint
@@ -609,3 +729,25 @@ truncate copy_default;
 
 -- DEFAULT cannot be used in COPY TO
 copy (select 1 as test) TO stdout with (default '\D');
+
+-- DEFAULT WITH SAVE_ERROR.
+create table copy_default_error_save (
+	id integer,
+	text_value text not null default 'test',
+	ts_value timestamp without time zone not null default '2022-07-05'
+);
+copy copy_default_error_save from stdin with (save_error, default '\D');
+k	value	'2022-07-04'
+z	\D	'2022-07-03ASKL'
+s	\D	\D
+\.
+
+select 	ce.filename,ce.lineno,ce.line,
+		ce.colname, ce.raw_field_value,
+		ce.err_message, ce.err_detail,ce.errorcode
+from 	public.copy_errors ce
+join	pg_class	pc	on pc.oid = ce.copy_destination
+where	pc.relname = 'copy_default_error_save';
+
+drop table copy_default_error_save;
+truncate copy_default;
\ No newline at end of file
-- 
2.34.1
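
For readers who want to reproduce the per-user visibility that the copy_errors_test regression tests above exercise, here is a rough SQL sketch of how such a table could be secured by hand. It is only an illustration and is not taken from the patch, which builds its own statements through SPI. The column types and the policy name copy_errors_own_rows are assumptions inferred from the regression output, and the sketch expects the schema and roles created in the test to exist already.

```sql
-- Hypothetical sketch, not the patch's actual DDL.
-- Column names come from the regression output; the types are assumptions.
CREATE TABLE copy_errors_test.copy_errors (
    userid           oid,
    copy_destination oid,
    filename         text,
    lineno           bigint,
    line             text,
    colname          text,
    raw_field_value  text,
    err_message      text,
    err_detail       text,
    errorcode        text
);

ALTER TABLE copy_errors_test.copy_errors ENABLE ROW LEVEL SECURITY;

-- Non-owner, non-superuser roles can only see, insert and delete rows
-- whose userid matches their own role OID.
CREATE POLICY copy_errors_own_rows ON copy_errors_test.copy_errors
    FOR ALL
    USING (userid = current_user::regrole::oid)
    WITH CHECK (userid = current_user::regrole::oid);

-- UPDATE is deliberately not granted, so an UPDATE by these roles fails
-- with "permission denied" before any policy is consulted.
GRANT SELECT, INSERT, DELETE ON copy_errors_test.copy_errors
    TO test_copy_errors1, test_copy_errors2;
```

With this setup the table owner and superusers still bypass the policy, since FORCE ROW LEVEL SECURITY is not set. That matches the test, where the schema owner is the one who drops copy_errors.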
