Hello, The SQL-MED specification defines the IMPORT FOREIGN SCHEMA statement.
This adds discoverability to foreign servers. The structure of the statement as I understand it is simple enough: IMPORT FOREIGN SCHEMA remote_schema FROM SERVER some_server [ (LIMIT TO | EXCEPT) table_list ] INTO local_schema. Is anyone working on this? I found a reference to this from 2010 in the archive, stating that work should be focused on core functionality, but nothing more recent. This would be very useful for postgres_fdw and other RDBMS-backed fdws, but I think even file_fdw could benefit from it if it was able to create a foreign table for every csv-with-header file in a directory. I can see a simple API working for that. A new function would be added to the fdw routine, which is responsible for crafting CreateForeignTableStmt. It could have the following signature: typedef List *(*ImportForeignSchema_function) (ForeignServer *server, ImportForeignSchemaStmt * parsetree); I experimented with this idea, and came up with the attached two patches: one for the core, and the other for actually implementing the API in postgres_fdw. Maybe those can serve as a proof-of-concept for discussing the design? -- Ronan Dunklau http://dalibo.com - http://dalibo.org
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 024a477..40a2540 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -250,7 +250,8 @@ check_ddl_tag(const char *tag) pg_strcasecmp(tag, "REFRESH MATERIALIZED VIEW") == 0 || pg_strcasecmp(tag, "ALTER DEFAULT PRIVILEGES") == 0 || pg_strcasecmp(tag, "ALTER LARGE OBJECT") == 0 || - pg_strcasecmp(tag, "DROP OWNED") == 0) + pg_strcasecmp(tag, "DROP OWNED") == 0 || + pg_strcasecmp(tag, "IMPORT FOREIGN SCHEMA") == 0) return EVENT_TRIGGER_COMMAND_TAG_OK; /* diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c index 7f007d7..719c674 100644 --- a/src/backend/commands/foreigncmds.c +++ b/src/backend/commands/foreigncmds.c @@ -27,7 +27,9 @@ #include "catalog/pg_type.h" #include "catalog/pg_user_mapping.h" #include "commands/defrem.h" +#include "commands/tablecmds.h" #include "foreign/foreign.h" +#include "foreign/fdwapi.h" #include "miscadmin.h" #include "parser/parse_func.h" #include "utils/acl.h" @@ -1427,3 +1429,48 @@ CreateForeignTable(CreateForeignTableStmt *stmt, Oid relid) heap_close(ftrel, RowExclusiveLock); } + +/* + * Import a foreign schema + */ +void +ImportForeignSchema(ImportForeignSchemaStmt * stmt) +{ + Oid ownerId; + ForeignDataWrapper *fdw; + ForeignServer *server; + FdwRoutine *fdw_routine; + AclResult aclresult; + List *table_list = NULL; + ListCell *lc; + char * local_schema = strdup(stmt->local_schema); + + ownerId = GetUserId(); + server = GetForeignServerByName(stmt->servername, false); + aclresult = pg_foreign_server_aclcheck(server->serverid, ownerId, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, server->servername); + + /* Check the permissions on the schema */ + LookupCreationNamespace(local_schema); + + fdw = GetForeignDataWrapper(server->fdwid); + fdw_routine = GetFdwRoutine(fdw->fdwhandler); + if (fdw_routine->ImportForeignSchema == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_FDW_NO_SCHEMAS), + errmsg("This FDW does not support schema importation"))); + } + table_list = fdw_routine->ImportForeignSchema(server, stmt); + foreach(lc, table_list) + { + CreateForeignTableStmt *create_stmt = lfirst(lc); + Oid relOid = DefineRelation((CreateStmt *) create_stmt, + RELKIND_FOREIGN_TABLE, + InvalidOid); + /* Override whatever the fdw set for the schema */ + create_stmt->base.relation->schemaname = local_schema; + CreateForeignTable(create_stmt, relOid); + } +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 81169a4..80c18cc 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -235,8 +235,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); DropAssertStmt DropTrigStmt DropRuleStmt DropCastStmt DropRoleStmt DropUserStmt DropdbStmt DropTableSpaceStmt DropFdwStmt DropForeignServerStmt DropUserMappingStmt ExplainStmt FetchStmt - GrantStmt GrantRoleStmt IndexStmt InsertStmt ListenStmt LoadStmt - LockStmt NotifyStmt ExplainableStmt PreparableStmt + GrantStmt GrantRoleStmt IndexStmt ImportForeignSchemaStmt InsertStmt + ListenStmt LoadStmt LockStmt NotifyStmt ExplainableStmt PreparableStmt CreateFunctionStmt AlterFunctionStmt ReindexStmt RemoveAggrStmt RemoveFuncStmt RemoveOperStmt RenameStmt RevokeStmt RevokeRoleStmt RuleActionStmt RuleActionStmtOrEmpty RuleStmt @@ -319,6 +319,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <ival> defacl_privilege_target %type <defelt> DefACLOption %type <list> DefACLOptionList +%type <node> OptImportForeignSchemaRestriction +%type <ival> ImportForeignSchemaRestrictionType %type <list> stmtblock stmtmulti OptTableElementList TableElementList OptInherit definition @@ -552,7 +554,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); HANDLER HAVING HEADER_P HOLD HOUR_P - IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IN_P + IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IN_P IMPORT INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION @@ -797,6 +799,7 @@ stmt : | FetchStmt | GrantStmt | GrantRoleStmt + | ImportForeignSchemaStmt | IndexStmt | InsertStmt | ListenStmt @@ -1286,6 +1289,40 @@ schema_stmt: ; +/* + * IMPORT FOREIGN SCHEMA statement + */ +ImportForeignSchemaStmt: + IMPORT FOREIGN SCHEMA name FROM SERVER name + OptImportForeignSchemaRestriction INTO name { + ImportForeignSchemaStmt *n = makeNode(ImportForeignSchemaStmt); + n->remote_schema = $4; + n->servername = $7; + n->restriction = (ImportForeignSchemaRestriction * ) $8; + n->local_schema = $10; + $$ = (Node *) n; + }; + +ImportForeignSchemaRestrictionType: + LIMIT TO { $$ = IMPORT_LIMIT_TO; } + | EXCEPT { $$ = IMPORT_EXCEPT; }; + +OptImportForeignSchemaRestriction: + ImportForeignSchemaRestrictionType '(' relation_expr_list ')' { + ImportForeignSchemaRestriction * restriction = makeNode(ImportForeignSchemaRestriction); + restriction->restriction_type = $1; + restriction->table_list = $3; + $$ = (Node *) restriction; + } + | /*EMPTY*/ { + ImportForeignSchemaRestriction * restriction = makeNode(ImportForeignSchemaRestriction); + restriction->restriction_type = IMPORT_ALL; + restriction->table_list = NULL; + $$ = (Node *) restriction; + }; + + + /***************************************************************************** * * Set PG internal variable @@ -2701,6 +2738,7 @@ CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')' } ; + /* * Redundancy here is needed to avoid shift/reduce conflicts, * since TEMP is not a reserved word. See also OptTempTableName. @@ -12881,6 +12919,7 @@ unreserved_keyword: | IMMEDIATE | IMMUTABLE | IMPLICIT_P + | IMPORT | INCLUDING | INCREMENT | INDEX diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index d1621ad..8d8b9b9 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -202,6 +202,7 @@ check_xact_readonly(Node *parsetree) case T_AlterTableSpaceOptionsStmt: case T_AlterTableSpaceMoveStmt: case T_CreateForeignTableStmt: + case T_ImportForeignSchemaStmt: case T_SecLabelStmt: PreventCommandIfReadOnly(CreateCommandTag(parsetree)); break; @@ -1316,6 +1317,10 @@ ProcessUtilitySlow(Node *parsetree, ExecAlterDefaultPrivilegesStmt((AlterDefaultPrivilegesStmt *) parsetree); break; + case T_ImportForeignSchemaStmt: + ImportForeignSchema((ImportForeignSchemaStmt *) parsetree); + break; + default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(parsetree)); @@ -1853,6 +1858,10 @@ CreateCommandTag(Node *parsetree) tag = "CREATE FOREIGN TABLE"; break; + case T_ImportForeignSchemaStmt: + tag = "IMPORT FOREIGN SCHEMA"; + break; + case T_DropStmt: switch (((DropStmt *) parsetree)->removeType) { @@ -2518,6 +2527,7 @@ GetCommandLogLevel(Node *parsetree) case T_CreateUserMappingStmt: case T_AlterUserMappingStmt: case T_DropUserMappingStmt: + case T_ImportForeignSchemaStmt: lev = LOGSTMT_DDL; break; diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h index 5ec9374..ea7967f 100644 --- a/src/include/commands/defrem.h +++ b/src/include/commands/defrem.h @@ -124,6 +124,7 @@ extern Oid AlterUserMapping(AlterUserMappingStmt *stmt); extern Oid RemoveUserMapping(DropUserMappingStmt *stmt); extern void RemoveUserMappingById(Oid umId); extern void CreateForeignTable(CreateForeignTableStmt *stmt, Oid relid); +extern void ImportForeignSchema(ImportForeignSchemaStmt * stmt); extern Datum transformGenericOptions(Oid catalogId, Datum oldOptions, List *options, diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 1b735da..adc9569 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -14,6 +14,7 @@ #include "nodes/execnodes.h" #include "nodes/relation.h" +#include "foreign/foreign.h" /* To avoid including explain.h here, reference ExplainState thus: */ struct ExplainState; @@ -100,6 +101,10 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages); +typedef List *(*ImportForeignSchema_function) (ForeignServer *server, + ImportForeignSchemaStmt * parsetree); + + /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -144,6 +149,9 @@ typedef struct FdwRoutine /* Support functions for ANALYZE */ AnalyzeForeignTable_function AnalyzeForeignTable; + + /* Support functions for IMPORT FOREIGN SCHEMA */ + ImportForeignSchema_function ImportForeignSchema; } FdwRoutine; diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 5b8df59..474f25d 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -365,6 +365,7 @@ typedef enum NodeTag T_RefreshMatViewStmt, T_ReplicaIdentityStmt, T_AlterSystemStmt, + T_ImportForeignSchemaStmt, /* * TAGS FOR PARSE TREE NODES (parsenodes.h) @@ -406,6 +407,8 @@ typedef enum NodeTag T_XmlSerialize, T_WithClause, T_CommonTableExpr, + T_ImportForeignSchemaRestriction, + T_ImportForeignSchemaRestrictionType, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index b5011af..39044ef 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1248,6 +1248,34 @@ typedef enum DropBehavior } DropBehavior; /* ---------------------- + * Import Foreign Schema statement + * ---------------------- + */ + +typedef enum ImportForeignSchemaRestrictionType +{ + IMPORT_ALL, + IMPORT_LIMIT_TO, + IMPORT_EXCEPT +} ImportForeignSchemaRestrictionType; + +typedef struct ImportForeignSchemaRestriction +{ + ImportForeignSchemaRestrictionType restriction_type; + List *table_list; +} ImportForeignSchemaRestriction; + +typedef struct ImportForeignSchemaStmt +{ + NodeTag type; + char *remote_schema; + char *servername; + char *local_schema; + ImportForeignSchemaRestriction * restriction; + List *table_names; +} ImportForeignSchemaStmt; + +/* ---------------------- * Alter Table * ---------------------- */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 61fae22..74a94e4 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -184,6 +184,7 @@ PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD) PG_KEYWORD("immediate", IMMEDIATE, UNRESERVED_KEYWORD) PG_KEYWORD("immutable", IMMUTABLE, UNRESERVED_KEYWORD) PG_KEYWORD("implicit", IMPLICIT_P, UNRESERVED_KEYWORD) +PG_KEYWORD("import", IMPORT, UNRESERVED_KEYWORD) PG_KEYWORD("in", IN_P, RESERVED_KEYWORD) PG_KEYWORD("including", INCLUDING, UNRESERVED_KEYWORD) PG_KEYWORD("increment", INCREMENT, UNRESERVED_KEYWORD)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index fde1ec1..6c522ae 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -288,6 +288,7 @@ static void postgresExplainForeignModify(ModifyTableState *mtstate, static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages); +static List *postgresImportForeignSchema(ForeignServer *server, ImportForeignSchemaStmt * parsetree); /* * Helper functions @@ -328,6 +329,7 @@ static HeapTuple make_tuple_from_result_row(PGresult *res, List *retrieved_attrs, MemoryContext temp_context); static void conversion_error_callback(void *arg); +static PGresult *fetch_remote_tables(PGconn *conn, ImportForeignSchemaStmt * stmt); /* @@ -365,6 +367,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for ANALYZE */ routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; + /* Support functions for IMPORT FOREIGN SCHEMA */ + routine->ImportForeignSchema = postgresImportForeignSchema; + PG_RETURN_POINTER(routine); } @@ -2300,6 +2305,170 @@ postgresAnalyzeForeignTable(Relation relation, return true; } +static PGresult * +fetch_remote_tables(PGconn *conn, ImportForeignSchemaStmt * stmt) +{ + StringInfoData buf; + int numparams; + const char **params = palloc0(sizeof(char *) * 2); + PGresult *res = NULL; + + initStringInfo(&buf); + params[0] = strdup((stmt->remote_schema)); + + // Check that the schema really exists + appendStringInfo(&buf, "SELECT 1 FROM pg_namespace WHERE nspname = $1"); + res = PQexecParams(conn, buf.data, 1, NULL, params, NULL, NULL, 0); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, true, buf.data); + if(PQntuples(res) != 1) + { + ereport(ERROR, + (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND), + errmsg("The schema %s does not exist on the server", params[0]))); + } + + + // Fetch all tables from this schema + resetStringInfo(&buf); + appendStringInfo(&buf, + "SELECT relname, " + "array_agg(attname::name) as colnames, " + "array_agg(atttypid::regtype) as coltypes, " + "array_agg(atttypmod::int4) as coltypmod " + "FROM pg_class " + "INNER JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid " + "LEFT JOIN pg_attribute ON pg_class.oid = pg_attribute.attrelid " + " AND pg_attribute.attnum >= 0 AND NOT pg_attribute.attisdropped " + "WHERE relkind IN ('r', 'v', 'f') " + "AND pg_namespace.nspname = $1 "); + if (stmt->restriction->restriction_type != IMPORT_ALL) + { + /* Add conditions */ + Oid outfuncoid; + bool isvarlena; + Datum *elems = palloc0(list_length(stmt->restriction->table_list) * sizeof(Datum)); + ArrayType *array; + int i = 0; + ListCell *lc; + FmgrInfo *fmout = palloc0(sizeof(FmgrInfo)); + getTypeOutputInfo(CSTRINGARRAYOID, &outfuncoid, &isvarlena); + fmgr_info(outfuncoid, fmout); + foreach(lc, stmt->restriction->table_list) + { + elems[i] = CStringGetDatum(((RangeVar *) lfirst(lc))->relname); + i++; + } + array = construct_array(elems, i, CSTRINGOID, -2, false, 'c'); + params[1] = OutputFunctionCall(fmout, PointerGetDatum(array)); + appendStringInfo(&buf, "AND "); + if (stmt->restriction->restriction_type == IMPORT_EXCEPT) + { + appendStringInfo(&buf, "NOT "); + } + appendStringInfo(&buf, "pg_class.relname = ANY($2)"); + numparams = 2; + pfree(fmout); + pfree(elems); + } + else + { + numparams = 1; + } + appendStringInfo(&buf, "GROUP BY pg_class.oid, pg_class.relname"); + res = PQexecParams(conn, buf.data, numparams, NULL, params, NULL, NULL, 0); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, true, buf.data); + pfree(params); + return res; +} + +/* + * Map a remote schema to a local one. + */ +static List * +postgresImportForeignSchema(ForeignServer *server, + ImportForeignSchemaStmt * parsetree) +{ + List *tables = NULL; + Oid userid = GetUserId(); + UserMapping *mapping = GetUserMapping(userid, server->serverid); + PGconn *conn = GetConnection(server, mapping, false); + PGresult *res; + int numrows, + i; + + /* Initialize FmgrInfo for parsing arrays */ + FmgrInfo *fmgr_infos = palloc0(sizeof(FmgrInfo) * 3); + Oid typoid, + *typioparam = palloc0(sizeof(Oid) * 3); + ArrayIterator *arrays = palloc0(sizeof(ArrayIterator) * 3); + Oid column_types[3] = {NAMEARRAYOID, REGTYPEARRAYOID, INT4ARRAYOID}; + Datum *array_item = palloc0(sizeof(Datum) * 3); + bool *isnull = palloc0(sizeof(bool) * 3); + + for (i = 0; i < 3; i++) + { + getTypeInputInfo(column_types[i], &typoid, &typioparam[i]); + fmgr_info(typoid, &fmgr_infos[i]); + } + res = fetch_remote_tables(conn, parsetree); + numrows = PQntuples(res); + for (i = 0; i < numrows; i++) + { + CreateForeignTableStmt *stmt = makeNode(CreateForeignTableStmt); + char *tablename; + int colindex; + + tablename = PQgetvalue(res, i, 0); + /* setup the base relation information */ + stmt->base.relation = makeRangeVar(NULL, tablename, 0); + stmt->base.relation->schemaname = strdup(parsetree->local_schema); + stmt->servername = server->servername; + /* Parse arrays of columns from the result */ + for (colindex = 0; colindex < 3; colindex++) + { + Datum array_datum = InputFunctionCall(&fmgr_infos[colindex], + PQgetvalue(res, i, colindex + 1), + typioparam[colindex], -1); + + arrays[colindex] = array_create_iterator(DatumGetArrayTypeP(array_datum), 0); + } + /* add the individual columns */ + while (array_iterate(arrays[0], &array_item[0], &isnull[0]) && + array_iterate(arrays[1], &array_item[1], &isnull[1]) && + array_iterate(arrays[2], &array_item[2], &isnull[2])) + { + ColumnDef *new_column = makeNode(ColumnDef); + + new_column->colname = DatumGetCString(array_item[0]); + new_column->typeName = makeTypeNameFromOid(DatumGetObjectId(array_item[1]), + DatumGetInt32(array_item[2])); + stmt->base.tableElts = lappend(stmt->base.tableElts, new_column); + } + + /* + * Add schema_name and table_name options table_name is added to + * survive a foreign table rename. + */ + stmt->options = lappend(stmt->options, + makeDefElem("schema_name", (Node *) makeString(parsetree->remote_schema))); + stmt->options = lappend(stmt->options, + makeDefElem("table_name", (Node *) makeString(tablename))); + tables = lappend(tables, stmt); + } + /* Cleanup */ + PQclear(res); + ReleaseConnection(conn); + pfree(array_item); + pfree(fmgr_infos); + pfree(arrays); + pfree(typioparam); + pfree(isnull); + + return tables; +} + /* * Acquire a random sample of rows from foreign table managed by postgres_fdw. * diff --git a/src/include/catalog/pg_type.h b/src/include/catalog/pg_type.h index db18a23..47efb2f 100644 --- a/src/include/catalog/pg_type.h +++ b/src/include/catalog/pg_type.h @@ -445,6 +445,7 @@ DATA(insert OID = 1000 ( _bool PGNSP PGUID -1 f b A f t \054 0 16 0 array_in DATA(insert OID = 1001 ( _bytea PGNSP PGUID -1 f b A f t \054 0 17 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ )); DATA(insert OID = 1002 ( _char PGNSP PGUID -1 f b A f t \054 0 18 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ )); DATA(insert OID = 1003 ( _name PGNSP PGUID -1 f b A f t \054 0 19 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ )); +#define NAMEARRAYOID 1003 DATA(insert OID = 1005 ( _int2 PGNSP PGUID -1 f b A f t \054 0 21 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ )); #define INT2ARRAYOID 1005 DATA(insert OID = 1006 ( _int2vector PGNSP PGUID -1 f b A f t \054 0 22 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
signature.asc
Description: This is a digitally signed message part.