sunxiaojian commented on code in PR #5868:
URL: https://github.com/apache/gravitino/pull/5868#discussion_r1888447088


##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java:
##########
@@ -96,43 +102,37 @@ public boolean contains(String catalogName) throws CatalogException {
     return gravitinoCatalogManager.contains(catalogName);
   }
 
-  private String getGravitinoCatalogProvider(Configuration configuration) {
+  private BaseCatalogFactory catalogFactory(Map<String, String> configuration) {
     String catalogType =
         Preconditions.checkNotNull(
-            configuration.get(CommonCatalogOptions.CATALOG_TYPE),
+            configuration.get(CommonCatalogOptions.CATALOG_TYPE.key()),
             "%s should not be null.",
             CommonCatalogOptions.CATALOG_TYPE);
 
-    switch (catalogType) {
-      case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
-        return "hive";
-      default:
-        throw new IllegalArgumentException(
-            String.format("The catalog type is not supported:%s", catalogType));
-    }
-  }
-
-  private Catalog.Type getGravitinoCatalogType(Configuration configuration) {
-    String catalogType =
-        Preconditions.checkNotNull(
-            configuration.get(CommonCatalogOptions.CATALOG_TYPE),
-            "%s should not be null.",
-            CommonCatalogOptions.CATALOG_TYPE);
-
-    switch (catalogType) {
-      case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
-        return Catalog.Type.RELATIONAL;
-      default:
-        throw new IllegalArgumentException(
-            String.format("The catalog type is not supported:%s", catalogType));
-    }
+    ServiceLoader<Factory> serviceLoader = ServiceLoader.load(Factory.class);
+    // Should be only one.
+    return (BaseCatalogFactory)
+        Streams.stream(serviceLoader.iterator())
+            .filter(
+                catalogFactory ->
+                    (catalogFactory instanceof BaseCatalogFactory)
+                        && (catalogFactory.factoryIdentifier().equalsIgnoreCase(catalogType)))
+            .findFirst()
+            .get();
   }
 
-  private PropertiesConverter getPropertiesConverter(String provider) {
-    switch (provider) {
-      case "hive":
-        return HivePropertiesConverter.INSTANCE;
-    }
-    throw new IllegalArgumentException("The provider is not supported:" + provider);
+  private BaseCatalogFactory catalogFactory(String provider) {

Review Comment:
   This overload of `catalogFactory` is used by `getCatalog()`.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java:
##########
@@ -96,43 +102,37 @@ public boolean contains(String catalogName) throws CatalogException {
     return gravitinoCatalogManager.contains(catalogName);
   }
 
-  private String getGravitinoCatalogProvider(Configuration configuration) {
+  private BaseCatalogFactory catalogFactory(Map<String, String> configuration) {
     String catalogType =
         Preconditions.checkNotNull(
-            configuration.get(CommonCatalogOptions.CATALOG_TYPE),
+            configuration.get(CommonCatalogOptions.CATALOG_TYPE.key()),
             "%s should not be null.",
             CommonCatalogOptions.CATALOG_TYPE);
 
-    switch (catalogType) {
-      case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
-        return "hive";
-      default:
-        throw new IllegalArgumentException(
-            String.format("The catalog type is not supported:%s", catalogType));
-    }
-  }
-
-  private Catalog.Type getGravitinoCatalogType(Configuration configuration) {
-    String catalogType =
-        Preconditions.checkNotNull(
-            configuration.get(CommonCatalogOptions.CATALOG_TYPE),
-            "%s should not be null.",
-            CommonCatalogOptions.CATALOG_TYPE);
-
-    switch (catalogType) {
-      case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
-        return Catalog.Type.RELATIONAL;
-      default:
-        throw new IllegalArgumentException(
-            String.format("The catalog type is not supported:%s", catalogType));
-    }
+    ServiceLoader<Factory> serviceLoader = ServiceLoader.load(Factory.class);
+    // Should be only one.
+    return (BaseCatalogFactory)
+        Streams.stream(serviceLoader.iterator())
+            .filter(
+                catalogFactory ->
+                    (catalogFactory instanceof BaseCatalogFactory)
+                        && (catalogFactory.factoryIdentifier().equalsIgnoreCase(catalogType)))
+            .findFirst()
+            .get();
   }
 
-  private PropertiesConverter getPropertiesConverter(String provider) {
-    switch (provider) {
-      case "hive":
-        return HivePropertiesConverter.INSTANCE;
-    }
-    throw new IllegalArgumentException("The provider is not supported:" + provider);
+  private BaseCatalogFactory catalogFactory(String provider) {

Review Comment:
   This overload of `catalogFactory` is used by `getCatalog()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@gravitino.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to