JingsongLi commented on a change in pull request #11336: [FLINK-16471][jdbc] develop PostgresCatalog URL: https://github.com/apache/flink/pull/11336#discussion_r393408643
########## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.types.DataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; 
+import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Catalog for PostgreSQL. + */ +@Internal +public class PostgresCatalog extends AbstractJDBCCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class); + + public static final String DEFAULT_DATABASE = "postgres"; + + // ------ Postgres default objects that shouldn't be exposed to users ------ + + private static final Set<String> builtinDatabases = new HashSet<String>() {{ + add("template0"); + add("template1"); + }}; + + private static final Set<String> builtinSchemas = new HashSet<String>() {{ + add("pg_toast"); + add("pg_temp_1"); + add("pg_toast_temp_1"); + add("pg_catalog"); + add("information_schema"); + }}; + + protected PostgresCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { + super(catalogName, defaultDatabase, username, pwd, baseUrl); + } + + // ------ databases ------ + + @Override + public List<String> listDatabases() throws CatalogException { + List<String> pgDatabases = new ArrayList<>(); + + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + + PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;"); + + ResultSet rs = ps.executeQuery(); + + while (rs.next()) { + String dbName = rs.getString(1); + if (!builtinDatabases.contains(dbName)) { + pgDatabases.add(rs.getString(1)); + } + } + + return pgDatabases; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", getName()), e); + } + } + + @Override + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + if (listDatabases().contains(databaseName)) { + return new CatalogDatabaseImpl(Collections.emptyMap(), null); + } else { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + // ------ 
tables ------ + + @Override + public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + // get all schemas + try (Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd)) { + PreparedStatement ps = conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;"); + + ResultSet rs = ps.executeQuery(); + + List<String> schemas = new ArrayList<>(); + + while (rs.next()) { + String pgSchema = rs.getString(1); + if (!builtinSchemas.contains(pgSchema)) { + schemas.add(pgSchema); + } + } + + List<String> tables = new ArrayList<>(); + + for (String schema : schemas) { + PreparedStatement stmt = conn.prepareStatement( + "SELECT * \n" + + "FROM information_schema.tables \n" + + "WHERE table_type = 'BASE TABLE' \n" + + " AND table_schema = ? \n" + + "ORDER BY table_type, table_name;"); + + stmt.setString(1, schema); + + ResultSet rstables = stmt.executeQuery(); + + while (rstables.next()) { + // position 1 is database name, position 2 is schema name, position 3 is table name + tables.add(schema + "." 
+ rstables.getString(3)); + } + } + + return tables; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", getName()), e); + } + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + PostgresTablePath pgPath = PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()); + + try (Connection conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), username, pwd)) { + + PreparedStatement ps = conn.prepareStatement( + String.format("SELECT * FROM %s;", pgPath.getFullPath())); + + ResultSetMetaData rsmd = ps.getMetaData(); + + String[] names = new String[rsmd.getColumnCount()]; + DataType[] types = new DataType[rsmd.getColumnCount()]; + + for (int i = 1; i <= rsmd.getColumnCount(); i++) { + names[i - 1] = rsmd.getColumnName(i); + types[i - 1] = fromJDBCType(rsmd, i); + } + + TableSchema tableSchema = new TableSchema.Builder().fields(names, types).build(); + + return new CatalogTableImpl( + tableSchema, + new HashMap<>(), + "" + ); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } + } + + public static final String PG_BYTEA = "bytea"; + public static final String PG_BYTEA_ARRAY = "_bytea"; + public static final String PG_SMALLINT = "int2"; + public static final String PG_SMALLINT_ARRAY = "_int2"; + public static final String PG_INTEGER = "int4"; + public static final String PG_INTEGER_ARRAY = "_int4"; + public static final String PG_BIGINT = "int8"; + public static final String PG_BIGINT_ARRAY = "_int8"; + public static final String PG_REAL = "float4"; + public static final String PG_REAL_ARRAY = "_float4"; + public static final String PG_DOUBLE_PRECISION = "float8"; + public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8"; + public 
static final String PG_NUMERIC = "numeric"; + public static final String PG_NUMERIC_ARRAY = "_numeric"; + public static final String PG_BOOLEAN = "bool"; + public static final String PG_BOOLEAN_ARRAY = "_bool"; + public static final String PG_TIMESTAMP = "timestamp"; + public static final String PG_TIMESTAMP_ARRAY = "_timestamp"; + public static final String PG_TIMESTAMPTZ = "timestamptz"; + public static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz"; + public static final String PG_DATE = "date"; + public static final String PG_DATE_ARRAY = "_date"; + public static final String PG_TIME = "time"; + public static final String PG_TIME_ARRAY = "_time"; + public static final String PG_TEXT = "text"; + public static final String PG_TEXT_ARRAY = "_text"; + public static final String PG_CHAR = "bpchar"; + public static final String PG_CHAR_ARRAY = "_bpchar"; + public static final String PG_CHARACTER = "character"; + public static final String PG_CHARACTER_ARRAY = "_character"; + public static final String PG_CHARACTER_VARYING = "varchar"; + public static final String PG_CHARACTER_VARYING_ARRAY = "_varchar"; + + // TODO: set precision for timestamp, timestamptz, and time Review comment: Why not do it now? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services