Hello, Currently we cannot move data from a file into a table based on a condition on a certain column, but I think there are many use cases for this that are worth supporting. Attached is a patch that skips any row that does not satisfy the WHEN condition instead of inserting it into the table. It applies on top of commit b68ff3ea672c06,
and the syntax is COPY table_name [ ( column_name [, ...] ) ] FROM { 'filename' | PROGRAM 'command' | STDIN } [ [ WITH ] ( option [, ...] ) ] [ WHEN condition ]. Comments? Regards, Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 13a8b68d95..8088e779b6 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -25,6 +25,7 @@ PostgreSQL documentation COPY <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ] FROM { '<replaceable class="parameter">filename</replaceable>' | PROGRAM '<replaceable class="parameter">command</replaceable>' | STDIN } [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ] + [ WHEN <replaceable class="parameter">condition</replaceable> ] COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ] | ( <replaceable class="parameter">query</replaceable> ) } TO { '<replaceable class="parameter">filename</replaceable>' | PROGRAM '<replaceable class="parameter">command</replaceable>' | STDOUT } @@ -364,6 +365,24 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable </listitem> </varlistentry> + <varlistentry> + <term><literal>WHEN Clause</literal></term> + <listitem> + <para> + The optional <literal>WHEN</literal> clause has the general form +<synopsis> +WHEN <replaceable class="parameter">condition</replaceable> +</synopsis> + where <replaceable class="parameter">condition</replaceable> is + any expression that evaluates to a result of type + <type>boolean</type>. Any row that does not satisfy this + condition will not be inserted to the table. A row satisfies the + condition if it returns true when the actual row values are + substituted for any variable references. 
+ </para> + </listitem> + </varlistentry> + </variablelist> </refsect1> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 9bc67ce60f..1ce7d46058 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -38,7 +38,10 @@ #include "miscadmin.h" #include "optimizer/clauses.h" #include "optimizer/planner.h" +#include "optimizer/prep.h" #include "nodes/makefuncs.h" +#include "parser/parse_coerce.h" +#include "parser/parse_expr.h" #include "parser/parse_relation.h" #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" @@ -147,6 +150,7 @@ typedef struct CopyStateData bool convert_selectively; /* do selective binary conversion? */ List *convert_select; /* list of column names (can be NIL) */ bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whenClause; /* qualifications */ /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -178,6 +182,7 @@ typedef struct CopyStateData ExprState **defexprs; /* array of default att expressions */ bool volatile_defexprs; /* is any of defexprs volatile? */ List *range_table; + ExprState *qualexpr; TransitionCaptureState *transition_capture; @@ -797,6 +802,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, Relation rel; Oid relid; RawStmt *query = NULL; + Node *when_cluase= NULL; /* * Disallow COPY to/from file or program except to users with the @@ -849,6 +855,21 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false); rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT); + if(stmt->whenClause) + { + /* add rte to column namespace */ + addRTEtoQuery(pstate, rte, false, false, true); + + /* Transform the raw expression tree */ + when_cluase = transformExpr(pstate, stmt->whenClause, EXPR_KIND_OTHER); + + /* Make sure it yields a boolean result. 
*/ + when_cluase = coerce_to_boolean(pstate, when_cluase, "WHEN"); + + when_cluase = (Node *) canonicalize_qual((Expr *) when_cluase, false); + when_cluase = (Node *) make_ands_implicit((Expr *) when_cluase); + } + tupDesc = RelationGetDescr(rel); attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist); foreach(cur, attnums) @@ -997,6 +1018,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); + cstate->whenClause=when_cluase; *processed = CopyFrom(cstate); /* copy from file to database */ EndCopyFrom(cstate); } @@ -2534,6 +2556,10 @@ CopyFrom(CopyState cstate) ExecSetupChildParentMapForLeaf(proute); } + if(cstate->whenClause ) + cstate->qualexpr = ExecInitQual(castNode(List, cstate->whenClause), + &mtstate->ps); + /* * It's more efficient to prepare a bunch of tuples for insertion, and * insert them in one heap_multi_insert() call, than call heap_insert() @@ -2664,6 +2690,8 @@ CopyFrom(CopyState cstate) /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); +next_record: + if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid)) break; @@ -2686,6 +2714,13 @@ CopyFrom(CopyState cstate) slot = myslot; ExecStoreTuple(tuple, slot, InvalidBuffer, false); + if(cstate->whenClause) + { + econtext->ecxt_scantuple = myslot; + if (!ExecQual(cstate->qualexpr, econtext)) + goto next_record; + } + /* Determine the partition to heap_insert the tuple into */ if (proute) { diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 7c8220cf65..b3fe529619 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -3312,6 +3312,7 @@ _copyCopyStmt(const CopyStmt *from) COPY_SCALAR_FIELD(is_program); COPY_STRING_FIELD(filename); COPY_NODE_FIELD(options); + COPY_NODE_FIELD(whenClause); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 
378f2facb8..471b628157 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -1221,6 +1221,7 @@ _equalCopyStmt(const CopyStmt *a, const CopyStmt *b) COMPARE_SCALAR_FIELD(is_program); COMPARE_STRING_FIELD(filename); COMPARE_NODE_FIELD(options); + COMPARE_NODE_FIELD(whenClause); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 87f5e95827..3c24c9dd18 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -509,6 +509,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <defelt> copy_generic_opt_elem %type <list> copy_generic_opt_list copy_generic_opt_arg_list %type <list> copy_options +%type <node> opt_when_clause %type <typnam> Typename SimpleTypename ConstTypename GenericType Numeric opt_float @@ -2970,7 +2971,7 @@ ClosePortalStmt: *****************************************************************************/ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids - copy_from opt_program copy_file_name copy_delimiter opt_with copy_options + copy_from opt_program copy_file_name copy_delimiter opt_with copy_options opt_when_clause { CopyStmt *n = makeNode(CopyStmt); n->relation = $3; @@ -2979,6 +2980,7 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids n->is_from = $6; n->is_program = $7; n->filename = $8; + n->whenClause = $12; if (n->is_program && n->filename == NULL) ereport(ERROR, @@ -3178,6 +3180,11 @@ copy_generic_opt_arg_list: } ; +opt_when_clause: + WHEN a_expr { $$ = $2; } + | /*EMPTY*/ { $$ = NULL; } + ; + /* beware of emitting non-string list elements here; see commands/define.c */ copy_generic_opt_arg_list_item: opt_boolean_or_string { $$ = (Node *) makeString($1); } diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 07ab1a3dde..9318a9e7b8 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1959,6 +1959,7 @@ typedef struct CopyStmt bool 
is_program; /* is 'filename' a program to popen? */ char *filename; /* filename, or NULL for STDIN/STDOUT */ List *options; /* List of DefElem nodes */ + Node *whenClause; /* qualifications */ } CopyStmt; /* ---------------------- diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons index ca3efadc48..707d47b385 100644 --- a/src/interfaces/ecpg/preproc/ecpg.addons +++ b/src/interfaces/ecpg/preproc/ecpg.addons @@ -192,7 +192,7 @@ ECPG: where_or_current_clauseWHERECURRENT_POFcursor_name block char *cursor_marker = $4[0] == ':' ? mm_strdup("$0") : $4; $$ = cat_str(2,mm_strdup("where current of"), cursor_marker); } -ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromopt_programcopy_file_namecopy_delimiteropt_withcopy_options addon +ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromopt_programcopy_file_namecopy_delimiteropt_withcopy_optionsopt_when_clause addon if (strcmp($6, "from") == 0 && (strcmp($7, "stdin") == 0 || strcmp($7, "stdout") == 0)) mmerror(PARSE_ERROR, ET_WARNING, "COPY FROM STDIN is not implemented");