On Fri, Mar 6, 2020 at 11:30 AM asaba.takan...@fujitsu.com < asaba.takan...@fujitsu.com> wrote:
> Hello Surafel, > > Sorry for my late reply. > > From: Surafel Temesgen <surafel3...@gmail.com> > >On Thu, Dec 12, 2019 at 7:51 AM mailto:asaba.takan...@fujitsu.com > <mailto:asaba.takan...@fujitsu.com> wrote: > >>2. I have a question about the copy meta-command. > >>When I executed the copy meta-command, the output wasn't displayed. > >>Does it correspond to the copy meta-command? > > > >Fixed > Thank you. > > I think we need a regression test checking that a constraint-violating row is returned > back to the caller. > How about this? > > Okay, attached is a rebased patch that includes it. Regards, Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index a99f8155e4..845902b824 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable FORCE_NOT_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] ) FORCE_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ENCODING '<replaceable class="parameter">encoding_name</replaceable>' + ERROR_LIMIT <replaceable class="parameter">limit_number</replaceable> </synopsis> </refsynopsisdiv> @@ -355,6 +356,28 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable </listitem> </varlistentry> + <varlistentry> + <term><literal>ERROR_LIMIT</literal></term> + <listitem> + <para> + Enables ignoring of errored out rows up to <replaceable + class="parameter">limit_number</replaceable>. If <replaceable + class="parameter">limit_number</replaceable> is set + to -1, then all errors will be ignored. + </para> + + <para> + Currently, only unique or exclusion constraint violations + and row formatting errors are ignored. Malformed + rows will raise warnings, while constraint-violating rows + will be returned back to the caller unless any error is raised; + i.e. if any error is raised because error_limit is exceeded, no rows + will be returned back to the caller. 
+ </para> + + </listitem> + </varlistentry> + <varlistentry> <term><literal>WHERE</literal></term> <listitem> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index e79ede4cb8..4184e2e755 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -20,6 +20,7 @@ #include "access/heapam.h" #include "access/htup_details.h" +#include "access/printtup.h" #include "access/sysattr.h" #include "access/tableam.h" #include "access/xact.h" @@ -48,6 +49,8 @@ #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" +#include "storage/lmgr.h" +#include "tcop/pquery.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -153,6 +156,7 @@ typedef struct CopyStateData List *convert_select; /* list of column names (can be NIL) */ bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ Node *whereClause; /* WHERE condition (or NULL) */ + int error_limit; /* total number of error to ignore */ /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -182,6 +186,9 @@ typedef struct CopyStateData bool volatile_defexprs; /* is any of defexprs volatile? */ List *range_table; ExprState *qualexpr; + bool ignore_error; /* is ignore error specified? */ + bool ignore_all_error; /* is error_limit -1 (ignore all error) + * specified? 
*/ TransitionCaptureState *transition_capture; @@ -836,7 +843,7 @@ CopyLoadRawBuf(CopyState cstate) void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, - uint64 *processed) + uint64 *processed, DestReceiver *dest) { CopyState cstate; bool is_from = stmt->is_from; @@ -1068,7 +1075,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); cstate->whereClause = whereClause; - *processed = CopyFrom(cstate); /* copy from file to database */ + *processed = CopyFrom(cstate, dest); /* copy from file to database */ EndCopyFrom(cstate); } else @@ -1290,6 +1297,18 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "error_limit") == 0) + { + if (cstate->ignore_error) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + cstate->error_limit = defGetInt64(defel); + cstate->ignore_error = true; + if (cstate->error_limit == -1) + cstate->ignore_all_error = true; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1440,6 +1459,10 @@ ProcessCopyOptions(ParseState *pstate, ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); + if (cstate->ignore_error && !cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ERROR LIMIT only available using COPY FROM"))); } /* @@ -2653,7 +2676,7 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, * Copy FROM file to relation. 
*/ uint64 -CopyFrom(CopyState cstate) +CopyFrom(CopyState cstate, DestReceiver *dest) { ResultRelInfo *resultRelInfo; ResultRelInfo *target_resultRelInfo; @@ -2675,6 +2698,7 @@ CopyFrom(CopyState cstate) bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; + Portal portal = NULL; Assert(cstate->rel); @@ -2838,7 +2862,19 @@ CopyFrom(CopyState cstate) /* Verify the named relation is a valid target for INSERT */ CheckValidResultRel(resultRelInfo, CMD_INSERT); - ExecOpenIndices(resultRelInfo, false); + if (cstate->ignore_error) + { + TupleDesc tupDesc; + + ExecOpenIndices(resultRelInfo, true); + tupDesc = RelationGetDescr(cstate->rel); + + portal = GetPortalByName(""); + SetRemoteDestReceiverParams(dest, portal); + dest->rStartup(dest, (int) CMD_SELECT, tupDesc); + } + else + ExecOpenIndices(resultRelInfo, false); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; @@ -2943,6 +2979,13 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } + else if (cstate->ignore_error) + { + /* + * Can't support speculative insertion in multi-inserts. + */ + insertMethod = CIM_SINGLE; + } else { /* @@ -3286,6 +3329,63 @@ CopyFrom(CopyState cstate) */ myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } + else if ((cstate->error_limit > 0 || cstate->ignore_all_error) && resultRelInfo->ri_NumIndices > 0) + { + /* Perform a speculative insertion. */ + uint32 specToken; + ItemPointerData conflictTid; + bool specConflict; + + /* + * Do a non-conclusive check for conflicts first. + */ + specConflict = false; + + if (!ExecCheckIndexConstraints(myslot, estate, &conflictTid, + NIL)) + { + (void) dest->receiveSlot(myslot, dest); + cstate->error_limit--; + continue; + } + + /* + * Acquire our speculative insertion lock. 
+ */ + specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId()); + + /* insert the tuple, with the speculative token */ + table_tuple_insert_speculative(resultRelInfo->ri_RelationDesc, myslot, + estate->es_output_cid, + 0, + NULL, + specToken); + + /* insert index entries for tuple */ + recheckIndexes = ExecInsertIndexTuples(myslot, estate, true, + &specConflict, + NIL); + + /* adjust the tuple's state accordingly */ + table_tuple_complete_speculative(resultRelInfo->ri_RelationDesc, myslot, + specToken, !specConflict); + + /* + * Wake up anyone waiting for our decision. + */ + SpeculativeInsertionLockRelease(GetCurrentTransactionId()); + + /* + * If there was a conflict, return it and preceded to + * the next record if there are any. + */ + if (specConflict) + { + (void) dest->receiveSlot(myslot, dest); + cstate->error_limit--; + continue; + } + } else { /* OK, store the tuple and create index entries for it */ @@ -3703,7 +3803,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - +next_line: if (!cstate->binary) { char **field_strings; @@ -3718,9 +3818,21 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* check for overflowing fields */ if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- extra data after last expected column", + cstate->line_buf.data))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + } fieldno = 0; @@ -3732,10 +3844,22 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, Form_pg_attribute att = 
TupleDescAttr(tupDesc, m); if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- missing data for column \"%s\"", + cstate->line_buf.data, NameStr(att->attname)))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + } string = field_strings[fieldno++]; if (cstate->convert_select_flags && @@ -3822,10 +3946,23 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, } if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- row field count is %d, expected %d", + cstate->line_buf.data, (int) fld_count, attr_count))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + } i = 0; foreach(cur, cstate->attnumlist) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f8183cd488..817d0af002 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -784,7 +784,7 @@ copy_table(Relation rel) cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL); /* Do the copy */ - (void) CopyFrom(cstate); + (void) CopyFrom(cstate, NULL); logicalrep_rel_close(relmapentry, NoLock); } diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 
b1f7f6e2d0..59d7fed099 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -720,7 +720,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, DoCopy(pstate, (CopyStmt *) parsetree, pstmt->stmt_location, pstmt->stmt_len, - &processed); + &processed, dest); if (qc) SetQueryCompletion(qc, CMDTAG_COPY, processed); } diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index 67df0cd2c7..34869aaec6 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -892,6 +892,7 @@ ProcessResult(PGresult **results) { bool success = true; bool first_cycle = true; + bool is_copy_in = false; for (;;) { @@ -1015,6 +1016,7 @@ ProcessResult(PGresult **results) copystream, PQbinaryTuples(*results), ©_result) && success; + is_copy_in = true; } ResetCancelConn(); @@ -1045,6 +1047,11 @@ ProcessResult(PGresult **results) first_cycle = false; } + /* Print returned result for COPY FROM with error_limit. */ + if (is_copy_in && !success && PQresultStatus(*results) != + PGRES_FATAL_ERROR) + (void) PrintQueryTuples(*results); + SetResultVariables(*results, success); /* may need this to recover from conn loss during COPY */ diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index c639833565..addd8054d6 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -25,7 +25,7 @@ typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); extern void DoCopy(ParseState *state, const CopyStmt *stmt, int stmt_location, int stmt_len, - uint64 *processed); + uint64 *processed, DestReceiver *dest); extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options); extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, @@ -37,7 +37,7 @@ extern bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields); extern void CopyFromErrorCallback(void *arg); -extern uint64 CopyFrom(CopyState cstate); +extern uint64 CopyFrom(CopyState cstate, DestReceiver 
*dest); extern DestReceiver *CreateCopyDestReceiver(void); diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out index e40287d25a..37d973cb20 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -1,5 +1,5 @@ CREATE TEMP TABLE x ( - a serial, + a serial UNIQUE, b int, c text not null default 'stuff', d text, @@ -55,6 +55,16 @@ LINE 1: COPY x TO stdout WHERE a = 1; ^ COPY x from stdin WHERE a = 50004; COPY x from stdin WHERE a > 60003; +COPY x from stdin WITH(ERROR_LIMIT 5); +WARNING: skipping "70001 22 32" --- missing data for column "d" +WARNING: skipping "70002 23 33 43 53 54" --- extra data after last expected column +WARNING: skipping "70003 24 34 44" --- missing data for column "e" + + a | b | c | d | e +-------+----+----+----+---------------------- + 70005 | 27 | 36 | 46 | before trigger fired +(1 row) + COPY x from stdin WHERE f > 60003; ERROR: column "f" does not exist LINE 1: COPY x from stdin WHERE f > 60003; @@ -102,12 +112,14 @@ SELECT * FROM x; 50004 | 25 | 35 | 45 | before trigger fired 60004 | 25 | 35 | 45 | before trigger fired 60005 | 26 | 36 | 46 | before trigger fired + 70004 | 25 | 35 | 45 | before trigger fired + 70005 | 26 | 36 | 46 | before trigger fired 1 | 1 | stuff | test_1 | after trigger fired 2 | 2 | stuff | test_2 | after trigger fired 3 | 3 | stuff | test_3 | after trigger fired 4 | 4 | stuff | test_4 | after trigger fired 5 | 5 | stuff | test_5 | after trigger fired -(28 rows) +(30 rows) -- check copy out COPY x TO stdout; @@ -134,6 +146,8 @@ COPY x TO stdout; 50004 25 35 45 before trigger fired 60004 25 35 45 before trigger fired 60005 26 36 46 before trigger fired +70004 25 35 45 before trigger fired +70005 26 36 46 before trigger fired 1 1 stuff test_1 after trigger fired 2 2 stuff test_2 after trigger fired 3 3 stuff test_3 after trigger fired @@ -163,6 +177,8 @@ Delimiter before trigger fired 35 before trigger fired 35 before trigger fired 36 before trigger 
fired +35 before trigger fired +36 before trigger fired stuff after trigger fired stuff after trigger fired stuff after trigger fired @@ -192,6 +208,8 @@ I'm null before trigger fired 25 before trigger fired 25 before trigger fired 26 before trigger fired +25 before trigger fired +26 before trigger fired 1 after trigger fired 2 after trigger fired 3 after trigger fired diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql index 902f4fac19..64b9a51947 100644 --- a/src/test/regress/sql/copy2.sql +++ b/src/test/regress/sql/copy2.sql @@ -1,5 +1,5 @@ CREATE TEMP TABLE x ( - a serial, + a serial UNIQUE, b int, c text not null default 'stuff', d text, @@ -110,6 +110,15 @@ COPY x from stdin WHERE a > 60003; 60005 26 36 46 56 \. +COPY x from stdin WITH(ERROR_LIMIT 5); +70001 22 32 +70002 23 33 43 53 54 +70003 24 34 44 +70004 25 35 45 55 +70005 26 36 46 56 +70005 27 36 46 56 +\. + COPY x from stdin WHERE f > 60003; COPY x from stdin WHERE a = max(x.b);