FANNG1 commented on code in PR #6543: URL: https://github.com/apache/gravitino/pull/6543#discussion_r1989377074
########## flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.Preconditions; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; +import org.apache.gravitino.flink.connector.utils.FactoryUtils; + +/** + * Factory for creating instances of {@link GravitinoJdbcCatalog}. It will be created by SPI + * discovery in Flink. 
+ */ +public abstract class GravitinoJdbcCatalogFactory implements BaseCatalogFactory { + + @Override + public org.apache.flink.table.catalog.Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtils.createCatalogFactoryHelper(this, context); + String defaultDatabase = + helper.getOptions().get(GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE); + Preconditions.checkNotNull( + defaultDatabase, + GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE.key() + " should not be null."); + return new GravitinoJdbcCatalog( + context, defaultDatabase, propertiesConverter(), partitionConverter()); + } + + @Override + public Catalog.Type gravitinoCatalogType() { + return Catalog.Type.RELATIONAL; + } + + @Override + public PartitionConverter partitionConverter() { + return null; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { Review Comment: Should we place the following properties in requiredOptions? ``` public static final String FLINK_JDBC_URL = "base-url"; public static final String FLINK_JDBC_USER = "username"; public static final String FLINK_JDBC_PASSWORD = "password"; public static final String FLINK_JDBC_DEFAULT_DATABASE = "default-database"; ``` ########## flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java: ########## @@ -284,8 +285,11 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig String comment = table.getComment(); Map<String, String> properties = propertiesConverter.toGravitinoTableProperties(table.getOptions()); + Transform[] partitions = - partitionConverter.toGravitinoPartitions(((CatalogTable) table).getPartitionKeys()); + partitionConverter != null Review Comment: It's a little odd to make partitionConverter null. How about creating a specific partitionConverter for JDBC that explicitly throws an exception when partition keys are specified?
########## flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/MysqlPropertiesConverter.java: ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +public class MysqlPropertiesConverter extends JdbcPropertiesConverter { + + public static final MysqlPropertiesConverter INSTANCE = new MysqlPropertiesConverter(); + + private MysqlPropertiesConverter() {} + + @Override + public String driverName() { + return "com.mysql.jdbc.Driver"; Review Comment: There may be multiple MySQL drivers with different names; it seems we need a more general solution to handle the JDBC driver. ########## flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java: ########## @@ -48,14 +49,20 @@ public static void assertTableResult( } } - public static void assertColumns(Column[] expected, Column[] actual) { + public static void assertColumns(Column[] expected, Column[] actual, String provider) { Review Comment: It's a little odd to pass `provider` to `assertColumns`. How about moving `assertColumns` to `FlinkCommonIT` and adding a method `bool defaultValueWithNullLiterals` (or a better name) that
defaults to false, with JDBC overriding it to true. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@gravitino.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org