This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 9695c28cc [#5867] feat(flink-connector): Improve flink connector (#5868)
9695c28cc is described below

commit 9695c28ccf5ebb8e7e0f2f17553a63270b88adef
Author: Xiaojian Sun <sunxiaojian...@163.com>
AuthorDate: Thu Dec 19 10:01:42 2024 +0800

    [#5867] feat(flink-connector): Improve flink connector (#5868)
    
    ### What changes were proposed in this pull request?
    
    1. Remove unused code.
    2. Optimize the catalog store logic so that adding support for a new
    connector no longer requires modifying the store (see the usage sketch
    after the diff below).
    
    ### Why are the changes needed?
    
    Fix: [#5867](https://github.com/apache/gravitino/issues/5867)
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    FlinkHiveCatalogIT
---
 .../flink/connector/PartitionConverter.java        |  4 +-
 .../flink/connector/catalog/BaseCatalog.java       | 14 ++--
 .../connector/catalog/BaseCatalogFactory.java      | 56 +++++++++++++
 .../flink/connector/hive/GravitinoHiveCatalog.java | 15 +---
 .../hive/GravitinoHiveCatalogFactory.java          | 49 ++++++++++-
 .../connector/store/GravitinoCatalogStore.java     | 96 ++++++++++++++--------
 6 files changed, 176 insertions(+), 58 deletions(-)

diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java
index 5464c705a..e8029e567 100644
--- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java
@@ -34,7 +34,7 @@ public interface PartitionConverter {
    * @param partitions The partition keys in Gravitino.
    * @return The partition keys in Flink.
    */
-  public abstract List<String> toFlinkPartitionKeys(Transform[] partitions);
+  List<String> toFlinkPartitionKeys(Transform[] partitions);
 
   /**
    * Convert the partition keys to Gravitino partition keys.
@@ -42,5 +42,5 @@ public interface PartitionConverter {
    * @param partitionsKey The partition keys in Flink.
    * @return The partition keys in Gravitino.
    */
-  public abstract Transform[] toGravitinoPartitions(List<String> partitionsKey);
+  Transform[] toGravitinoPartitions(List<String> partitionsKey);
 }
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
index 6b76e31b8..149674217 100644
--- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
@@ -85,10 +85,14 @@ public abstract class BaseCatalog extends AbstractCatalog {
   private final PropertiesConverter propertiesConverter;
   private final PartitionConverter partitionConverter;
 
-  protected BaseCatalog(String catalogName, String defaultDatabase) {
+  protected BaseCatalog(
+      String catalogName,
+      String defaultDatabase,
+      PropertiesConverter propertiesConverter,
+      PartitionConverter partitionConverter) {
     super(catalogName, defaultDatabase);
-    this.propertiesConverter = getPropertiesConverter();
-    this.partitionConverter = getPartitionConverter();
+    this.propertiesConverter = propertiesConverter;
+    this.partitionConverter = partitionConverter;
   }
 
   protected abstract AbstractCatalog realCatalog();
@@ -508,10 +512,6 @@ public abstract class BaseCatalog extends AbstractCatalog {
     throw new UnsupportedOperationException();
   }
 
-  protected abstract PropertiesConverter getPropertiesConverter();
-
-  protected abstract PartitionConverter getPartitionConverter();
-
   protected CatalogBaseTable toFlinkTable(Table table) {
     org.apache.flink.table.api.Schema.Builder builder =
         org.apache.flink.table.api.Schema.newBuilder();
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java
new file mode 100644
index 000000000..5086b5325
--- /dev/null
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.catalog;
+
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+
+public interface BaseCatalogFactory extends CatalogFactory {
+
+  /**
+   * Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}.
+   *
+   * @return The requested gravitino catalog provider.
+   */
+  String gravitinoCatalogProvider();
+
+  /**
+   * Define gravitino catalog type {@link Catalog.Type}.
+   *
+   * @return The requested gravitino catalog type.
+   */
+  Catalog.Type gravitinoCatalogType();
+
+  /**
+   * Define properties converter {@link PropertiesConverter}.
+   *
+   * @return The requested property converter.
+   */
+  PropertiesConverter propertiesConverter();
+
+  /**
+   * Define partition converter.
+   *
+   * @return The requested partition converter.
+   */
+  PartitionConverter partitionConverter();
+}
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
index b47545968..3e5d31fd3 100644
--- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.factories.Factory;
-import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
 import org.apache.gravitino.flink.connector.PartitionConverter;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
 import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
@@ -41,9 +40,11 @@ public class GravitinoHiveCatalog extends BaseCatalog {
   GravitinoHiveCatalog(
       String catalogName,
       String defaultDatabase,
+      PropertiesConverter propertiesConverter,
+      PartitionConverter partitionConverter,
       @Nullable HiveConf hiveConf,
       @Nullable String hiveVersion) {
-    super(catalogName, defaultDatabase);
+    super(catalogName, defaultDatabase, propertiesConverter, partitionConverter);
     this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf, hiveVersion);
   }
 
@@ -68,16 +69,6 @@ public class GravitinoHiveCatalog extends BaseCatalog {
     return hiveCatalog.getFactory();
   }
 
-  @Override
-  protected PropertiesConverter getPropertiesConverter() {
-    return HivePropertiesConverter.INSTANCE;
-  }
-
-  @Override
-  protected PartitionConverter getPartitionConverter() {
-    return DefaultPartitionConverter.INSTANCE;
-  }
-
   @Override
   protected AbstractCatalog realCatalog() {
     return hiveCatalog;
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java
index 91eaa4e16..23607ebb4 100644
--- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java
@@ -28,8 +28,11 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
 import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
-import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
+import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
 import org.apache.gravitino.flink.connector.utils.FactoryUtils;
 import org.apache.gravitino.flink.connector.utils.PropertyUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -38,7 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
  * Factory for creating instances of {@link GravitinoHiveCatalog}. It will be created by SPI
  * discovery in Flink.
  */
-public class GravitinoHiveCatalogFactory implements CatalogFactory {
+public class GravitinoHiveCatalogFactory implements BaseCatalogFactory {
   private HiveCatalogFactory hiveCatalogFactory;
 
   @Override
@@ -60,6 +63,8 @@ public class GravitinoHiveCatalogFactory implements CatalogFactory {
     return new GravitinoHiveCatalog(
         context.getName(),
         helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
+        propertiesConverter(),
+        partitionConverter(),
         hiveConf,
         helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
   }
@@ -81,4 +86,44 @@ public class GravitinoHiveCatalogFactory implements CatalogFactory {
   public Set<ConfigOption<?>> optionalOptions() {
     return hiveCatalogFactory.optionalOptions();
   }
+
+  /**
+   * Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}.
+   *
+   * @return The requested gravitino catalog provider.
+   */
+  @Override
+  public String gravitinoCatalogProvider() {
+    return "hive";
+  }
+
+  /**
+   * Define gravitino catalog type {@link org.apache.gravitino.Catalog.Type}.
+   *
+   * @return The requested gravitino catalog type.
+   */
+  @Override
+  public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
+    return org.apache.gravitino.Catalog.Type.RELATIONAL;
+  }
+
+  /**
+   * Define properties converter {@link PropertiesConverter}.
+   *
+   * @return The requested property converter.
+   */
+  @Override
+  public PropertiesConverter propertiesConverter() {
+    return HivePropertiesConverter.INSTANCE;
+  }
+
+  /**
+   * Define partition converter.
+   *
+   * @return The requested partition converter.
+   */
+  @Override
+  public PartitionConverter partitionConverter() {
+    return DefaultPartitionConverter.INSTANCE;
+  }
 }
diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java
index 2c210f21c..92e778ce2 100644
--- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java
+++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java
@@ -19,20 +19,25 @@
 
 package org.apache.gravitino.flink.connector.store;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.ServiceLoader;
 import java.util.Set;
+import java.util.function.Predicate;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.AbstractCatalogStore;
 import org.apache.flink.table.catalog.CatalogDescriptor;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.factories.Factory;
 import org.apache.flink.util.Preconditions;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
 import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
-import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
-import org.apache.gravitino.flink.connector.hive.HivePropertiesConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,11 +54,15 @@ public class GravitinoCatalogStore extends AbstractCatalogStore {
   public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
       throws CatalogException {
     Configuration configuration = descriptor.getConfiguration();
-    String provider = getGravitinoCatalogProvider(configuration);
-    Catalog.Type type = getGravitinoCatalogType(configuration);
+    BaseCatalogFactory catalogFactory = getCatalogFactory(configuration.toMap());
     Map<String, String> gravitinoProperties =
-        getPropertiesConverter(provider).toGravitinoCatalogProperties(configuration);
-    gravitinoCatalogManager.createCatalog(catalogName, type, null, provider, gravitinoProperties);
+        catalogFactory.propertiesConverter().toGravitinoCatalogProperties(configuration);
+    gravitinoCatalogManager.createCatalog(
+        catalogName,
+        catalogFactory.gravitinoCatalogType(),
+        null,
+        catalogFactory.gravitinoCatalogProvider(),
+        gravitinoProperties);
   }
 
   @Override
@@ -69,8 +78,8 @@ public class GravitinoCatalogStore extends AbstractCatalogStore {
   public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
     try {
       Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName);
-      String provider = catalog.provider();
-      PropertiesConverter propertiesConverter = getPropertiesConverter(provider);
+      BaseCatalogFactory catalogFactory = getCatalogFactory(catalog.provider());
+      PropertiesConverter propertiesConverter = catalogFactory.propertiesConverter();
       Map<String, String> flinkCatalogProperties =
           propertiesConverter.toFlinkCatalogProperties(catalog.properties());
       CatalogDescriptor descriptor =
@@ -96,43 +105,60 @@ public class GravitinoCatalogStore extends AbstractCatalogStore {
     return gravitinoCatalogManager.contains(catalogName);
   }
 
-  private String getGravitinoCatalogProvider(Configuration configuration) {
+  private BaseCatalogFactory getCatalogFactory(Map<String, String> configuration) {
     String catalogType =
         Preconditions.checkNotNull(
-            configuration.get(CommonCatalogOptions.CATALOG_TYPE),
+            configuration.get(CommonCatalogOptions.CATALOG_TYPE.key()),
             "%s should not be null.",
             CommonCatalogOptions.CATALOG_TYPE);
 
-    switch (catalogType) {
-      case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
-        return "hive";
-      default:
-        throw new IllegalArgumentException(
-            String.format("The catalog type is not supported:%s", catalogType));
-    }
+    return discoverFactories(
+        catalogFactory -> (catalogFactory.factoryIdentifier().equalsIgnoreCase(catalogType)),
+        String.format(
+            "Flink catalog type [%s] matched multiple flink catalog factories, it should only match one.",
+            catalogType));
   }
 
-  private Catalog.Type getGravitinoCatalogType(Configuration configuration) {
-    String catalogType =
-        Preconditions.checkNotNull(
-            configuration.get(CommonCatalogOptions.CATALOG_TYPE),
-            "%s should not be null.",
-            CommonCatalogOptions.CATALOG_TYPE);
+  private BaseCatalogFactory getCatalogFactory(String provider) {
+    return discoverFactories(
+        catalogFactory ->
+            ((BaseCatalogFactory) catalogFactory)
+                .gravitinoCatalogProvider()
+                .equalsIgnoreCase(provider),
+        String.format(
+            "Gravitino catalog provider [%s] matched multiple flink catalog factories, it should only match one.",
+            provider));
+  }
 
-    switch (catalogType) {
-      case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
-        return Catalog.Type.RELATIONAL;
-      default:
-        throw new IllegalArgumentException(
-            String.format("The catalog type is not supported:%s", catalogType));
+  private BaseCatalogFactory discoverFactories(Predicate<Factory> predicate, String errorMessage) {
+    Iterator<Factory> serviceLoaderIterator = ServiceLoader.load(Factory.class).iterator();
+    final List<Factory> factories = new ArrayList<>();
+    while (true) {
+      try {
+        if (!serviceLoaderIterator.hasNext()) {
+          break;
+        }
+        Factory catalogFactory = serviceLoaderIterator.next();
+        if (catalogFactory instanceof BaseCatalogFactory && predicate.test(catalogFactory)) {
+          factories.add(catalogFactory);
+        }
+      } catch (Throwable t) {
+        if (t instanceof NoClassDefFoundError) {
+          LOG.debug(
+              "NoClassDefFoundError when loading a " + Factory.class.getCanonicalName() + ".", t);
+        } else {
+          throw new RuntimeException("Unexpected error when trying to load service provider.", t);
+        }
+      }
     }
-  }
 
-  private PropertiesConverter getPropertiesConverter(String provider) {
-    switch (provider) {
-      case "hive":
-        return HivePropertiesConverter.INSTANCE;
+    if (factories.isEmpty()) {
+      throw new RuntimeException("Failed to correctly match the Flink catalog factory.");
+    }
+    // It should only match one.
+    if (factories.size() > 1) {
+      throw new RuntimeException(errorMessage);
     }
-    throw new IllegalArgumentException("The provider is not supported:" + provider);
+    return (BaseCatalogFactory) factories.get(0);
   }
 }
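
A hedged sketch to make the second point of the commit message concrete; it is not part of this commit. With BaseCatalogFactory and the ServiceLoader-based discovery added to GravitinoCatalogStore, a new connector should only need to ship a factory like the one below and register it through Flink's Factory SPI. The package, the class name, the "gravitino-example" and "example" identifiers, and the omitted properties converter are assumptions made purely for illustration; BaseCatalogFactory, DefaultPartitionConverter, and the Gravitino/Flink types are the ones that appear in the diff above.

// Hypothetical illustration only, not part of this commit. Names containing
// "Example", "gravitino-example", and "example" are invented for the sketch.
package org.apache.gravitino.flink.connector.example;

import java.util.Collections;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;

public class ExampleCatalogFactory implements BaseCatalogFactory {

  // Flink-side identifier, i.e. the value of the catalog 'type' option
  // (CommonCatalogOptions.CATALOG_TYPE); GravitinoCatalogStore#getCatalogFactory(Map)
  // matches factories against it when storing a catalog.
  @Override
  public String factoryIdentifier() {
    return "gravitino-example";
  }

  // Gravitino provider name; GravitinoCatalogStore#getCatalogFactory(String)
  // matches against it when reading a catalog back from Gravitino.
  @Override
  public String gravitinoCatalogProvider() {
    return "example";
  }

  @Override
  public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
    return org.apache.gravitino.Catalog.Type.RELATIONAL;
  }

  @Override
  public PropertiesConverter propertiesConverter() {
    // A real connector would return its own singleton here, analogous to
    // HivePropertiesConverter.INSTANCE; omitted in this sketch.
    throw new UnsupportedOperationException("ExamplePropertiesConverter is not implemented");
  }

  @Override
  public PartitionConverter partitionConverter() {
    return DefaultPartitionConverter.INSTANCE;
  }

  @Override
  public Catalog createCatalog(Context context) {
    // A real factory would build its BaseCatalog subclass here, passing
    // propertiesConverter() and partitionConverter() to the new constructor,
    // as GravitinoHiveCatalogFactory does for GravitinoHiveCatalog.
    throw new UnsupportedOperationException("Illustration only");
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    return Collections.emptySet();
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    return Collections.emptySet();
  }
}

Registering the class in META-INF/services/org.apache.flink.table.factories.Factory is the only remaining step; ServiceLoader.load(Factory.class) in GravitinoCatalogStore#discoverFactories would then find it by factoryIdentifier() or gravitinoCatalogProvider(), with no change to the store itself.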
