On 2024-01-17 14:47, Masahiko Sawada wrote:
On Wed, Jan 17, 2024 at 2:22 PM torikoshia <torikos...@oss.nttdata.com> wrote:

Hi,

132de9968840c introduced the SAVE_ERROR_TO option to COPY, which makes it
possible to skip malformed data, but there is no way to watch the number of
skipped rows during COPY.

The attached patch adds tuples_skipped to pg_stat_progress_copy, which
counts the number of tuples skipped because the source data is malformed.
If SAVE_ERROR_TO is not specified, this column remains zero.
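The counting rule can be modeled in a few lines of self-contained C. The model uses the stop/ignore values of ON_ERROR, the name the option later received.

- A malformed row either aborts the run (stop) or is skipped and counted (ignore).
- Under the default behavior the counter therefore never leaves zero.

The names `OnErrorChoice`, `CopyResult`, and `simulate_copy` are hypothetical simplifications, not the copyfrom.c code. In the attached patch, the increment is the `pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED, ++skipped)` call.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the COPY error-handling choice. */
typedef enum
{
	ON_ERROR_STOP,				/* abort the COPY on the first malformed row */
	ON_ERROR_IGNORE				/* skip malformed rows and keep going */
} OnErrorChoice;

/* Hypothetical result of a simplified COPY FROM run. */
typedef struct
{
	bool		failed;			/* true if the COPY was aborted */
	int64_t		processed;		/* rows that were loaded */
	int64_t		skipped;		/* what tuples_skipped would report */
} CopyResult;

/*
 * Walk over rows, where row_ok[i] says whether row i parsed cleanly.
 * The skipped counter only moves when malformed rows are tolerated;
 * with ON_ERROR_STOP the first bad row aborts the run and skipped stays 0.
 */
static CopyResult
simulate_copy(const bool *row_ok, size_t nrows, OnErrorChoice on_error)
{
	CopyResult	res = {false, 0, 0};

	for (size_t i = 0; i < nrows; i++)
	{
		if (!row_ok[i])
		{
			if (on_error == ON_ERROR_STOP)
			{
				res.failed = true;
				return res;
			}
			/* the patch reports this via pgstat_progress_update_param() */
			res.skipped++;
			continue;
		}
		res.processed++;
	}
	/* every row was either loaded or skipped */
	assert(res.processed + res.skipped == (int64_t) nrows);
	return res;
}
```

With rows shaped like the regression data in 0002 (two valid, four malformed), the ignore mode loads 2 rows and reports 4 skipped. The stop mode fails with the counter still at 0.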

The advantage would be that users can quickly notice and stop COPYing
when, for example, much more data is being skipped than expected.

As described in the commit log, more choices such as 'log' are expected to
be added to SAVE_ERROR_TO, and using such options may let us learn the
number of skipped tuples during COPY, but exposing the count in
pg_stat_progress_copy would make it easier to monitor.


What do you think?

+1

The patch is pretty simple. Here is a comment:

+ (if <literal>SAVE_ERROR_TO</literal> is specified, otherwise zero).
+      </para></entry>
+     </row>

To be precise, this counter only advances when a value other than
'ERROR' is specified for the SAVE_ERROR_TO option.

Thanks for your comment and review!

Updated the patch according to your comment and to the option name change in b725b7eec.


BTW, based on this patch, I think we can add another option that specifies the maximum tolerable number of malformed rows. I remember this was discussed in [1], and I feel it would be useful when loading 'dirty' data where there is a limit to how dirty it can be.
The attached 0002 is a WIP patch for this (I haven't added docs yet).

This may be better discussed in another thread, but any comments (e.g. on the necessity of this option or its name) are welcome.
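The REJECT_LIMIT semantics proposed in 0002 can be sketched as a simplified model. The COPY fails as soon as the skipped count exceeds the limit, so a limit of N tolerates exactly N malformed rows. A limit of 0 stands for "not specified", matching the zero-initialized `reject_limit` field. The function name `copy_within_reject_limit` is hypothetical and this is not the patch's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Returns true if a COPY with ON_ERROR ignore would finish under the given
 * reject limit. row_ok[i] says whether row i parsed cleanly; a limit of 0
 * means "no limit". *skipped receives the number of rows skipped before
 * the run finished or failed.
 */
static bool
copy_within_reject_limit(const bool *row_ok, size_t nrows,
						 int64_t reject_limit, int64_t *skipped)
{
	assert(reject_limit >= 0);
	*skipped = 0;
	for (size_t i = 0; i < nrows; i++)
	{
		if (row_ok[i])
			continue;
		(*skipped)++;
		/* same comparison as 0002: fail once skipped exceeds the limit */
		if (reject_limit > 0 && *skipped > reject_limit)
			return false;
	}
	return true;
}
```

Fed the same six rows as the regression test (four of them malformed), the model reproduces the expected output:

- A limit of 3 fails on the fourth bad row.
- A limit of 4 succeeds with 4 rows skipped.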


[1] https://www.postgresql.org/message-id/752672.1699474336%40sss.pgh.pa.us

--
Regards,

--
Atsushi Torikoshi
NTT DATA Group Corporation
From 571ada768bdb68a31f295cbcb28f4348f253989d Mon Sep 17 00:00:00 2001
From: Atsushi Torikoshi <torikos...@oss.nttdata.com>
Date: Mon, 22 Jan 2024 23:57:24 +0900
Subject: [PATCH v2 1/2] Add tuples_skipped to pg_stat_progress_copy

132de9968840c enabled COPY to skip malformed data, but there is no way to watch
the number of skipped rows during COPY.

This patch adds tuples_skipped to pg_stat_progress_copy, which counts the
number of tuples skipped because the source data is malformed.
This column only advances when a value other than stop is specified for ON_ERROR.

Needs catalog bump.
---
 doc/src/sgml/monitoring.sgml         | 11 +++++++++++
 src/backend/catalog/system_views.sql |  3 ++-
 src/backend/commands/copyfrom.c      |  5 +++++
 src/include/commands/progress.h      |  1 +
 src/test/regress/expected/rules.out  |  3 ++-
 5 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 6e74138a69..cfc13b3580 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -5780,6 +5780,17 @@ FROM pg_stat_get_backend_idset() AS backendid;
        <command>WHERE</command> clause of the <command>COPY</command> command.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>tuples_skipped</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of tuples skipped because they contain malformed data.
+       This counter only advances when a value other than
+       <literal>stop</literal> is specified to <literal>ON_ERROR</literal>.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index e43e36f5ac..6288270e2b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1318,7 +1318,8 @@ CREATE VIEW pg_stat_progress_copy AS
         S.param1 AS bytes_processed,
         S.param2 AS bytes_total,
         S.param3 AS tuples_processed,
-        S.param4 AS tuples_excluded
+        S.param4 AS tuples_excluded,
+        S.param7 AS tuples_skipped
     FROM pg_stat_get_progress_info('COPY') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 173a736ad5..8ab3777664 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -650,6 +650,7 @@ CopyFrom(CopyFromState cstate)
 	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
 	int64		processed = 0;
 	int64		excluded = 0;
+	int64		skipped = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
@@ -1012,6 +1013,10 @@ CopyFrom(CopyFromState cstate)
 				 */
 				cstate->escontext->error_occurred = false;
 
+			/* Report that this tuple was skipped by the ON_ERROR clause */
+			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
+											 ++skipped);
+
 			continue;
 		}
 
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index a458c8c50a..73afa77a9c 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -142,6 +142,7 @@
 #define PROGRESS_COPY_TUPLES_EXCLUDED 3
 #define PROGRESS_COPY_COMMAND 4
 #define PROGRESS_COPY_TYPE 5
+#define PROGRESS_COPY_TUPLES_SKIPPED 6
 
 /* Commands of COPY (as advertised via PROGRESS_COPY_COMMAND) */
 #define PROGRESS_COPY_COMMAND_FROM 1
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 55f2e95352..5e846b01e6 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1988,7 +1988,8 @@ pg_stat_progress_copy| SELECT s.pid,
     s.param1 AS bytes_processed,
     s.param2 AS bytes_total,
     s.param3 AS tuples_processed,
-    s.param4 AS tuples_excluded
+    s.param4 AS tuples_excluded,
+    s.param7 AS tuples_skipped
    FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,

base-commit: b0f0a9432d0b6f53634a96715f2666f6d4ea25a1
-- 
2.39.2
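The view change in 0001 relies on a fixed offset between two numbering schemes:

- `pgstat_progress_update_param()` takes a 0-based slot index.
- `pg_stat_get_progress_info()` exposes slot i as column param(i+1).

Hence PROGRESS_COPY_TUPLES_SKIPPED, defined as 6, surfaces as `S.param7`, just as PROGRESS_COPY_TUPLES_EXCLUDED (3) surfaces as `S.param4`. The toy model below illustrates the mapping; `update_param` and `view_column` are hypothetical names, not the backend implementation.

```c
#include <assert.h>
#include <stdint.h>

/* 20 slots, matching param1..param20 in the rules.out output */
#define NUM_PROGRESS_PARAM 20

/* Slot indexes as defined in progress.h by this patch */
#define PROGRESS_COPY_TUPLES_EXCLUDED 3
#define PROGRESS_COPY_TUPLES_SKIPPED 6

/* Simplified stand-in for a backend's 0-based progress slots */
static int64_t progress_param[NUM_PROGRESS_PARAM];

/* Hypothetical analogue of pgstat_progress_update_param() */
static void
update_param(int index, int64_t val)
{
	assert(index >= 0 && index < NUM_PROGRESS_PARAM);
	progress_param[index] = val;
}

/* The view exposes slot i as column param(i + 1), so paramN is slot N-1 */
static int64_t
view_column(int n)
{
	assert(n >= 1 && n <= NUM_PROGRESS_PARAM);
	return progress_param[n - 1];
}
```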

From 26af14449d9b235f33f0b48c9741f874a6b6ac20 Mon Sep 17 00:00:00 2001
From: Atsushi Torikoshi <torikos...@oss.nttdata.com>
Date: Tue, 23 Jan 2024 00:04:25 +0900
Subject: [PATCH v2 2/2] Add new COPY option REJECT_LIMIT

REJECT_LIMIT specifies the maximum tolerable number of malformed rows.
If the input data contains more malformed rows than this value, the entire
COPY fails. This option requires ON_ERROR to be set to a value other than stop.
---
 src/backend/commands/copy.c         | 16 ++++++++++++++++
 src/backend/commands/copyfrom.c     |  5 +++++
 src/include/commands/copy.h         |  1 +
 src/test/regress/expected/copy2.out | 17 +++++++++++++++++
 src/test/regress/sql/copy2.sql      | 29 +++++++++++++++++++++++++++++
 5 files changed, 68 insertions(+)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cc0786c6f4..ca5263d588 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -615,6 +615,22 @@ ProcessCopyOptions(ParseState *pstate,
 			on_error_specified = true;
 			opts_out->on_error = defGetCopyOnErrorChoice(defel, pstate, is_from);
 		}
+		else if (strcmp(defel->defname, "reject_limit") == 0)
+		{
+			int64	reject_limit = defGetInt64(defel);
+
+			if (!opts_out->on_error)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("REJECT_LIMIT requires ON_ERROR to be set to other than stop")));
+			if (opts_out->reject_limit > 0)
+				errorConflictingDefElem(defel, pstate);
+			if (reject_limit <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("REJECT_LIMIT must be greater than zero")));
+			opts_out->reject_limit = reject_limit;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 8ab3777664..a5902f3887 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1017,6 +1017,11 @@ CopyFrom(CopyFromState cstate)
 			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
 											 ++skipped);
 
+			if (cstate->opts.reject_limit > 0 && skipped > cstate->opts.reject_limit)
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("exceeded the number specified by REJECT LIMIT \"%d\"",
+								cstate->opts.reject_limit)));
 			continue;
 		}
 
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index b3da3cb0be..8f8dab9524 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -73,6 +73,7 @@ typedef struct CopyFormatOptions
 	bool	   *force_null_flags;	/* per-column CSV FN flags */
 	bool		convert_selectively;	/* do selective binary conversion? */
 	CopyOnErrorChoice on_error; /* what to do when error happened */
+	int			reject_limit;	/* tolerable number of malformed rows */
 	List	   *convert_select; /* list of column names (can be NIL) */
 } CopyFormatOptions;
 
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 25c401ce34..4f12a4f2cb 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -751,6 +751,23 @@ CONTEXT:  COPY check_ign_err, line 1: "1	{1}"
 COPY check_ign_err FROM STDIN WITH (on_error ignore);
 ERROR:  extra data after last expected column
 CONTEXT:  COPY check_ign_err, line 1: "1	{1}	3	abc"
+-- tests for reject_limit option
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 3);
+ERROR:  exceeded the number specified by REJECT LIMIT "3"
+CONTEXT:  COPY check_ign_err, line 5, column n: ""
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
+NOTICE:  4 rows were skipped due to data type incompatibility
+-- test reject_limit without on_error option: should fail
+COPY check_ign_err FROM STDIN WITH (reject_limit 3);
+ERROR:  REJECT_LIMIT requires ON_ERROR to be set to other than stop
+-- test reject_limit specified string value: should fail
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit foo);
+ERROR:  reject_limit requires a numeric value
+-- test reject_limit specified less than or equal to 0: should fail
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit -3);
+ERROR:  REJECT_LIMIT must be greater than zero
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 0);
+ERROR:  REJECT_LIMIT must be greater than zero
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index b5e549e856..0b3fd59ba0 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -534,6 +534,35 @@ COPY check_ign_err FROM STDIN WITH (on_error ignore);
 1	{1}	3	abc
 \.
 
+-- tests for reject_limit option
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 3);
+6	{6}	6
+a	{7}	7
+7	{7}	7777777777
+8	{a, 8}	8
+
+9	{9}	9
+\.
+
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
+6	{6}	6
+a	{7}	7
+7	{7}	7777777777
+8	{a, 8}	8
+
+9	{9}	9
+\.
+
+-- test reject_limit without on_error option: should fail
+COPY check_ign_err FROM STDIN WITH (reject_limit 3);
+
+-- test reject_limit specified string value: should fail
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit foo);
+
+-- test reject_limit specified less than or equal to 0: should fail
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit -3);
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 0);
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
-- 
2.39.2
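In 0002, the ON_ERROR check inside the REJECT_LIMIT branch of ProcessCopyOptions only sees options that precede REJECT_LIMIT in the list. As a result, `WITH (reject_limit 3, on_error ignore)` would be rejected while the reverse order is accepted. One way to remove the order dependence is to run cross-option validation after the loop.

This is a sketch with hypothetical simplified types (`Option`, `CopyOpts`, `process_options`), not the actual ProcessCopyOptions code. Duplicate-option detection is omitted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef enum { ON_ERROR_STOP = 0, ON_ERROR_IGNORE } OnErrorChoice;

/* Hypothetical, simplified option: a name plus an already-parsed value */
typedef struct
{
	const char *name;
	int64_t		value;			/* for on_error: an OnErrorChoice */
} Option;

typedef struct
{
	OnErrorChoice on_error;
	int64_t		reject_limit;	/* 0 = not specified */
} CopyOpts;

/*
 * Parse options in any order; the REJECT_LIMIT/ON_ERROR check runs only
 * after every option has been seen. Returns NULL on success, otherwise an
 * error message.
 */
static const char *
process_options(const Option *opts, int nopts, CopyOpts *out)
{
	assert(nopts >= 0);
	out->on_error = ON_ERROR_STOP;
	out->reject_limit = 0;

	for (int i = 0; i < nopts; i++)
	{
		if (strcmp(opts[i].name, "on_error") == 0)
			out->on_error = (OnErrorChoice) opts[i].value;
		else if (strcmp(opts[i].name, "reject_limit") == 0)
		{
			if (opts[i].value <= 0)
				return "REJECT_LIMIT must be greater than zero";
			out->reject_limit = opts[i].value;
		}
		else
			return "unrecognized option";
	}

	/* Deferred cross-option check: all options are known at this point */
	if (out->reject_limit > 0 && out->on_error == ON_ERROR_STOP)
		return "REJECT_LIMIT requires ON_ERROR to be set to other than stop";

	return NULL;
}
```

Both orderings of `on_error ignore` and `reject_limit 3` are accepted, while `reject_limit 3` alone is still rejected. The existing regression cases would be unaffected.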
