On 2024-07-05 00:27, torikoshia wrote:
Hi,

With the current file_fdw, if even one line of data fails to convert, none of the file's contents can be read:

  =# \! cat data/test.data
  1,a
  2,b
  a,c

  =# create foreign table f_fdw_test_1 (i int, t text) server f_fdw options (filename 'test.data', format 'csv');
  CREATE FOREIGN TABLE

  =# table f_fdw_test_1;
  ERROR:  invalid input syntax for type integer: "a"
  CONTEXT:  COPY f_fdw_test_1, line 3, column i: "a"

Since v17 will support the ON_ERROR option, which tolerates data conversion errors in COPY FROM, and the LOG_VERBOSITY option[1], how about supporting them in file_fdw as well?
This idea comes from Fujii-san[2], and I think it would be useful when reading somewhat dirty data.

The attached PoC patch works as follows:

  =# create foreign table f_fdw_test_2 (i int, t text) server f_fdw options (filename 'test.data', format 'csv', on_error 'ignore');
  CREATE FOREIGN TABLE

  =# table f_fdw_test_2;
  NOTICE:  1 row was skipped due to data type incompatibility
   i | t
  ---+---
   1 | a
   2 | b
  (2 rows)

  =# create foreign table f_fdw_test_3 (i int, t text) server f_fdw options (filename 'test.data', format 'csv', on_error 'ignore', log_verbosity 'verbose');
  CREATE FOREIGN TABLE

  =# table f_fdw_test_3;
  NOTICE:  skipping row due to data type incompatibility at line 3 for column i: "a"
  NOTICE:  1 row was skipped due to data type incompatibility
   i | t
  ---+---
   1 | a
   2 | b
  (2 rows)

I'm going to continue developing the patch (e.g. adding documentation and measuring any performance degradation) if others also think this feature is worth adding.

What do you think?

[1] https://www.postgresql.org/docs/devel/sql-copy.html
[2] https://x.com/fujii_masao/status/1808178032219509041
Updated the patch, since the v1 patch caused a compiler warning.

-- 
Regards,

--
Atsushi Torikoshi
NTT DATA Group Corporation
From b6295590977479ffb601c9848c6194a51d75e932 Mon Sep 17 00:00:00 2001
From: Atsushi Torikoshi <torikos...@oss.nttdata.com>
Date: Fri, 19 Jul 2024 10:12:20 +0900
Subject: [PATCH v2] Add on_error and log_verbosity options to file_fdw

9e2d870, b725b7e and f5a2278 introduced on_error and log_verbosity to
COPY. Since these options seem useful for file_fdw when accessing dirty
data, this patch adds them to file_fdw.

Rows skipped because of on_error 'ignore' are counted in the COPY state
and reported in a single NOTICE when the scan ends. file_fdw scans do
not run under a COPY progress command, so skipped rows are not reported
through pg_stat_progress_copy. While skipping a run of bad rows, the
scan stays interruptible and frees the per-tuple memory used by each
skipped row.
---
 contrib/file_fdw/expected/file_fdw.out | 20 +++++++++
 contrib/file_fdw/file_fdw.c            | 56 ++++++++++++++++++++++----
 contrib/file_fdw/sql/file_fdw.sql      |  6 +++
 3 files changed, 75 insertions(+), 7 deletions(-)

diff --git a/contrib/file_fdw/expected/file_fdw.out b/contrib/file_fdw/expected/file_fdw.out
index 86c148a86b..1af79af20f 100644
--- a/contrib/file_fdw/expected/file_fdw.out
+++ b/contrib/file_fdw/expected/file_fdw.out
@@ -206,6 +206,26 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
 SELECT * FROM agg_bad;               -- ERROR
 ERROR:  invalid input syntax for type real: "aaa"
 CONTEXT:  COPY agg_bad, line 3, column b: "aaa"
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+NOTICE:  1 row was skipped due to data type incompatibility
+  a  |   b    
+-----+--------
+ 100 | 99.097
+  42 | 324.78
+(2 rows)
+
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'verbose');
+SELECT * FROM agg_bad;
+NOTICE:  skipping row due to data type incompatibility at line 3 for column b: "aaa"
+NOTICE:  1 row was skipped due to data type incompatibility
+  a  |   b    
+-----+--------
+ 100 | 99.097
+  42 | 324.78
+(2 rows)
+
 -- misc query tests
 \t on
 SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 249d82d3a0..2dc2744393 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -22,6 +22,7 @@
 #include "catalog/pg_authid.h"
 #include "catalog/pg_foreign_table.h"
 #include "commands/copy.h"
+#include "commands/copyfrom_internal.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
@@ -74,6 +75,8 @@ static const struct FileFdwOption valid_options[] = {
 	{"null", ForeignTableRelationId},
 	{"default", ForeignTableRelationId},
 	{"encoding", ForeignTableRelationId},
+	{"on_error", ForeignTableRelationId},
+	{"log_verbosity", ForeignTableRelationId},
 	{"force_not_null", AttributeRelationId},
 	{"force_null", AttributeRelationId},
 
@@ -724,12 +727,12 @@ fileIterateForeignScan(ForeignScanState *node)
 	ExprContext *econtext;
 	MemoryContext oldcontext;
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
-	bool		found;
+	CopyFromState cstate = festate->cstate;
 	ErrorContextCallback errcallback;
 
 	/* Set up callback to identify error line number. */
 	errcallback.callback = CopyFromErrorCallback;
-	errcallback.arg = (void *) festate->cstate;
+	errcallback.arg = (void *) cstate;
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
@@ -750,10 +753,39 @@ fileIterateForeignScan(ForeignScanState *node)
 	 * switch in case we are doing that.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	found = NextCopyFrom(festate->cstate, econtext,
-						 slot->tts_values, slot->tts_isnull);
-	if (found)
+
+	for (;;)
+	{
+		if (!NextCopyFrom(cstate, econtext,
+						  slot->tts_values, slot->tts_isnull))
+			break;
+
+		if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+			cstate->escontext->error_occurred)
+		{
+			/*
+			 * Soft error occurred with ON_ERROR 'ignore', so skip this row.
+			 * Just make ErrorSaveContext ready for the next NextCopyFrom().
+			 * Since we don't set details_wanted and error_data is not to be
+			 * filled, just resetting error_occurred is enough.
+			 */
+			cstate->escontext->error_occurred = false;
+
+			/*
+			 * A file may contain a long run of bad rows: stay interruptible,
+			 * and free whatever the skipped row left in the per-tuple memory
+			 * context before reading the next one.
+			 */
+			CHECK_FOR_INTERRUPTS();
+			ResetPerTupleExprContext(estate);
+
+			/* Repeat NextCopyFrom() until no soft error occurs */
+			continue;
+		}
+
 		ExecStoreVirtualTuple(slot);
+		break;
+	}
 
 	/* Switch back to original memory context */
 	MemoryContextSwitchTo(oldcontext);
@@ -795,8 +827,18 @@ fileEndForeignScan(ForeignScanState *node)
 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
 
 	/* if festate is NULL, we are in EXPLAIN; nothing to do */
-	if (festate)
-		EndCopyFrom(festate->cstate);
+	if (!festate)
+		return;
+
+	if (festate->cstate->opts.on_error != COPY_ON_ERROR_STOP &&
+		festate->cstate->num_errors > 0)
+		ereport(NOTICE,
+				errmsg_plural("%llu row was skipped due to data type incompatibility",
+							  "%llu rows were skipped due to data type incompatibility",
+							  (unsigned long long) festate->cstate->num_errors,
+							  (unsigned long long) festate->cstate->num_errors));
+
+	EndCopyFrom(festate->cstate);
 }
 
 /*
diff --git a/contrib/file_fdw/sql/file_fdw.sql b/contrib/file_fdw/sql/file_fdw.sql
index f0548e14e1..5eae01d0f2 100644
--- a/contrib/file_fdw/sql/file_fdw.sql
+++ b/contrib/file_fdw/sql/file_fdw.sql
@@ -150,6 +150,12 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
 -- error context report tests
 SELECT * FROM agg_bad;               -- ERROR
 
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'verbose');
+SELECT * FROM agg_bad;
+
 -- misc query tests
 \t on
 SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
-- 
2.39.2