FANNG1 commented on code in PR #6543:
URL: https://github.com/apache/gravitino/pull/6543#discussion_r1989377074


##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.jdbc;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
+import org.apache.gravitino.flink.connector.utils.FactoryUtils;
+
+/**
+ * Factory for creating instances of {@link GravitinoJdbcCatalog}. It will be created by SPI
+ * discovery in Flink.
+ */
+public abstract class GravitinoJdbcCatalogFactory implements BaseCatalogFactory {
+
+  @Override
+  public org.apache.flink.table.catalog.Catalog createCatalog(Context context) {
+    final FactoryUtil.CatalogFactoryHelper helper =
+        FactoryUtils.createCatalogFactoryHelper(this, context);
+    String defaultDatabase =
+        helper.getOptions().get(GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE);
+    Preconditions.checkNotNull(
+        defaultDatabase,
+        GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE.key() + " should not be null.");
+    return new GravitinoJdbcCatalog(
+        context, defaultDatabase, propertiesConverter(), partitionConverter());
+  }
+
+  @Override
+  public Catalog.Type gravitinoCatalogType() {
+    return Catalog.Type.RELATIONAL;
+  }
+
+  @Override
+  public PartitionConverter partitionConverter() {
+    return null;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {

Review Comment:
   Should we place the following properties in `requiredOptions`?
   ```
     public static final String FLINK_JDBC_URL = "base-url";
     public static final String FLINK_JDBC_USER = "username";
     public static final String FLINK_JDBC_PASSWORD = "password";
  public static final String FLINK_JDBC_DEFAULT_DATABASE = "default-database";
   ```
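
   A rough sketch of how those could be declared and returned, assuming Flink's `ConfigOptions` builder API. The `BASE_URL`, `USERNAME`, and `PASSWORD` option fields below are hypothetical; only `DEFAULT_DATABASE` appears in this PR:
   ```
  // Hypothetical ConfigOption declarations for the keys quoted above.
  public static final ConfigOption<String> BASE_URL =
      ConfigOptions.key("base-url").stringType().noDefaultValue();
  public static final ConfigOption<String> USERNAME =
      ConfigOptions.key("username").stringType().noDefaultValue();
  public static final ConfigOption<String> PASSWORD =
      ConfigOptions.key("password").stringType().noDefaultValue();

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(BASE_URL);
    options.add(USERNAME);
    options.add(PASSWORD);
    options.add(GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE);
    return options;
  }
   ```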



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java:
##########
@@ -284,8 +285,11 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
     String comment = table.getComment();
     Map<String, String> properties =
         propertiesConverter.toGravitinoTableProperties(table.getOptions());
+
     Transform[] partitions =
-        partitionConverter.toGravitinoPartitions(((CatalogTable) table).getPartitionKeys());
+        partitionConverter != null

Review Comment:
   It's a little odd to make `partitionConverter` null. How about creating a JDBC-specific `PartitionConverter` that explicitly throws an exception for partition keys?
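
   For illustration, a minimal sketch of such a converter. It assumes `PartitionConverter` declares the `toGravitinoPartitions` method used in the diff above plus a reverse mapping; both signatures here are guesses at the interface, not the actual one:
   ```
  public class JdbcPartitionConverter implements PartitionConverter {

    public static final JdbcPartitionConverter INSTANCE = new JdbcPartitionConverter();

    private JdbcPartitionConverter() {}

    @Override
    public Transform[] toGravitinoPartitions(List<String> partitionKeys) {
      // JDBC tables have no partition support, so reject partition keys
      // loudly instead of silently dropping them.
      if (partitionKeys != null && !partitionKeys.isEmpty()) {
        throw new UnsupportedOperationException(
            "Partition keys are not supported by the JDBC catalog.");
      }
      return new Transform[0];
    }

    @Override
    public List<String> toFlinkPartitionKeys(Transform[] partitions) {
      // Hypothetical reverse mapping; a JDBC table never has partitions.
      return Collections.emptyList();
    }
  }
   ```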



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/MysqlPropertiesConverter.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.jdbc;
+
+public class MysqlPropertiesConverter extends JdbcPropertiesConverter {
+
+  public static final MysqlPropertiesConverter INSTANCE = new MysqlPropertiesConverter();
+
+  private MysqlPropertiesConverter() {}
+
+  @Override
+  public String driverName() {
+    return "com.mysql.jdbc.Driver";

Review Comment:
   There may be multiple MySQL driver classes with different names (`com.mysql.jdbc.Driver` is the legacy Connector/J 5.x class; Connector/J 8.x renamed it to `com.mysql.cj.jdbc.Driver`). It seems we need a more general solution for handling the JDBC driver.
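
   One hedged option: let users configure the driver class explicitly, and otherwise fall back to `java.sql.DriverManager`, which resolves a registered driver from the URL. The `jdbc-driver` property key and this helper are assumptions for illustration, not existing API:
   ```
  public static String resolveDriverName(Map<String, String> properties, String jdbcUrl) {
    // Prefer an explicitly configured driver class if one is provided.
    String configured = properties.get("jdbc-driver");
    if (configured != null && !configured.isEmpty()) {
      return configured;
    }
    try {
      // Otherwise let DriverManager pick whichever registered driver
      // accepts the URL (JDBC drivers self-register via SPI).
      return DriverManager.getDriver(jdbcUrl).getClass().getName();
    } catch (SQLException e) {
      throw new IllegalArgumentException("No JDBC driver found for URL: " + jdbcUrl, e);
    }
  }
   ```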



##########
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java:
##########
@@ -48,14 +49,20 @@ public static void assertTableResult(
     }
   }
 
-  public static void assertColumns(Column[] expected, Column[] actual) {
+  public static void assertColumns(Column[] expected, Column[] actual, String provider) {

Review Comment:
   It's a little odd to pass `provider` to `assertColumns`. How about moving `assertColumns` to `FlinkCommonIT` and adding a method like `boolean defaultValueWithNullLiterals()` (or a better name) that defaults to false, with the JDBC tests overriding it to return true?
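
   Roughly like this; a sketch only, and the `Column` accessors and assertion details are assumptions about the test code rather than the real implementation:
   ```
  // In FlinkCommonIT: JDBC-backed subclasses override this to return true.
  protected boolean defaultValueWithNullLiterals() {
    return false;
  }

  protected void assertColumns(Column[] expected, Column[] actual) {
    Assertions.assertEquals(expected.length, actual.length);
    for (int i = 0; i < expected.length; i++) {
      Assertions.assertEquals(expected[i].name(), actual[i].name());
      Assertions.assertEquals(expected[i].dataType(), actual[i].dataType());
      // Branch on the flag instead of on a provider string passed by each caller.
      if (!defaultValueWithNullLiterals()) {
        Assertions.assertEquals(expected[i].defaultValue(), actual[i].defaultValue());
      }
    }
  }
   ```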


