This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 6ae509a41d [#8163] feat(catalog): Support paimon latest version-1.2.0
(#8213)
6ae509a41d is described below
commit 6ae509a41d2b7a14c570750d84d747f38f89ae30
Author: liujinhui <[email protected]>
AuthorDate: Mon Sep 15 15:00:17 2025 +0800
[#8163] feat(catalog): Support paimon latest version-1.2.0 (#8213)
### What changes were proposed in this pull request?
1. upgrade paimon from 0.8 to 1.2
2. add Paimon to the Spark connector only when Scala is 2.12, because the
paimon-spark connector doesn't support Scala 2.13
### Why are the changes needed?
Fix: #8163
### Does this PR introduce _any_ user-facing change?
Yes, users must set `spark.sql.gravitino.enablePaimonSupport` to
`true` to use Paimon with the Spark connector.
### How was this patch tested?
Existing tests, plus testing Paimon operation in a local environment.
---------
Co-authored-by: fanng <[email protected]>
---
build.gradle.kts | 25 ++++++++++++++++++++++
catalogs/catalog-lakehouse-paimon/build.gradle.kts | 2 +-
.../lakehouse/paimon/ops/PaimonCatalogOps.java | 2 +-
.../integration/test/CatalogPaimonBaseIT.java | 3 ++-
.../test/CatalogPaimonFileSystemIT.java | 2 +-
.../integration/test/CatalogPaimonHiveIT.java | 2 +-
.../lakehouse/paimon/ops/TestPaimonCatalogOps.java | 6 ++++--
.../lakehouse/paimon/utils/TestCatalogUtils.java | 2 ++
.../lakehouse/paimon/utils/TestTableOpsUtils.java | 14 ++++++------
docs/lakehouse-paimon-catalog.md | 4 ++--
docs/spark-connector/spark-catalog-iceberg.md | 7 +++++-
docs/spark-connector/spark-catalog-jdbc.md | 6 +++++-
docs/spark-connector/spark-catalog-paimon.md | 7 +++++-
docs/spark-connector/spark-connector.md | 1 +
gradle/libs.versions.toml | 2 +-
spark-connector/spark-common/build.gradle.kts | 24 +++++++++++++++------
.../spark/connector/GravitinoSparkConfig.java | 2 ++
.../connector/plugin/GravitinoDriverPlugin.java | 16 ++++++++++++++
.../connector/integration/test/SparkEnvIT.java | 1 +
.../integration/test/sql/SparkQueryRunner.java | 1 +
.../integration/test/util/SparkTableInfo.java | 5 ++---
spark-connector/v3.3/spark/build.gradle.kts | 21 ++++++++++++++----
.../connector/version/TestCatalogNameAdaptor.java | 5 +++--
spark-connector/v3.4/spark/build.gradle.kts | 21 ++++++++++++++----
.../connector/version/TestCatalogNameAdaptor.java | 5 +++--
spark-connector/v3.5/spark/build.gradle.kts | 21 ++++++++++++++----
.../connector/version/TestCatalogNameAdaptor.java | 5 +++--
27 files changed, 165 insertions(+), 47 deletions(-)
diff --git a/build.gradle.kts b/build.gradle.kts
index bf45d6d889..8c4813c798 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -244,6 +244,30 @@ nexusPublishing {
packageGroup.set("org.apache.gravitino")
}
+fun excludePackagesForSparkConnector(project: Project) {
+ project.afterEvaluate {
+ if (scalaVersion != "2.12") {
+ val excludedPackages = listOf(
+ "org/apache/gravitino/spark/connector/paimon/**",
+ "org/apache/gravitino/spark/connector/integration/test/paimon/**"
+ )
+
+ sourceSets {
+ main {
+ java {
+ exclude(excludedPackages)
+ }
+ }
+ test {
+ java {
+ exclude(excludedPackages)
+ }
+ }
+ }
+ }
+ }
+}
+
subprojects {
// Gravitino Python client project didn't need to apply the java plugin
if (project.name == "client-python") {
@@ -284,6 +308,7 @@ subprojects {
return false
}
+ extensions.extraProperties.set("excludePackagesForSparkConnector",
::excludePackagesForSparkConnector)
tasks.register("printJvm") {
group = "help"
diff --git a/catalogs/catalog-lakehouse-paimon/build.gradle.kts
b/catalogs/catalog-lakehouse-paimon/build.gradle.kts
index 6839cc163b..de7e6d0c8a 100644
--- a/catalogs/catalog-lakehouse-paimon/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-paimon/build.gradle.kts
@@ -25,7 +25,7 @@ plugins {
}
val scalaVersion: String = project.properties["scalaVersion"] as? String ?:
extra["defaultScalaVersion"].toString()
-val sparkVersion: String = libs.versions.spark34.get()
+val sparkVersion: String = libs.versions.spark35.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val paimonVersion: String = libs.versions.paimon.get()
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
index a829b322a3..02996a2f5f 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
@@ -64,7 +64,7 @@ public class PaimonCatalogOps implements AutoCloseable {
}
public Map<String, String> loadDatabase(String databaseName) throws
DatabaseNotExistException {
- return catalog.loadDatabaseProperties(databaseName);
+ return catalog.getDatabase(databaseName).options();
}
public void createDatabase(String databaseName, Map<String, String>
properties)
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
index f83a850ef7..c77cca1b97 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
@@ -195,7 +195,7 @@ public abstract class CatalogPaimonBaseIT extends BaseIT {
Assertions.assertThrows(
DatabaseNotExistException.class,
() -> {
- paimonCatalog.loadDatabaseProperties(schemaIdent.name());
+ paimonCatalog.getDatabase(schemaIdent.name());
});
schemaNames = new HashSet<>(Arrays.asList(schemas.listSchemas()));
@@ -1118,6 +1118,7 @@ public abstract class CatalogPaimonBaseIT extends BaseIT {
Map<String, String> properties = Maps.newHashMap();
properties.put("key1", "val1");
properties.put("key2", "val2");
+ properties.put("alter-column-null-to-not-null.disabled", "false");
return properties;
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java
index 442fe331b8..e699f8d396 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java
@@ -69,6 +69,6 @@ public class CatalogPaimonFileSystemIT extends
CatalogPaimonBaseIT {
// load schema check, database properties is empty for Paimon
FilesystemCatalog.
Schema schema = schemas.loadSchema(schemaIdent.name());
Assertions.assertTrue(schema.properties().isEmpty());
-
Assertions.assertTrue(paimonCatalog.loadDatabaseProperties(schemaIdent.name()).isEmpty());
+
Assertions.assertTrue(paimonCatalog.getDatabase(schemaIdent.name()).options().isEmpty());
}
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonHiveIT.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonHiveIT.java
index fcb220a880..fdebf6839f 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonHiveIT.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonHiveIT.java
@@ -77,7 +77,7 @@ public class CatalogPaimonHiveIT extends CatalogPaimonBaseIT {
// load schema check.
Schema schema = schemas.loadSchema(schemaIdent.name());
Assertions.assertEquals(schema.properties().get("key"), "hive");
- Map<String, String> loadedProps =
paimonCatalog.loadDatabaseProperties(schemaIdent.name());
+ Map<String, String> loadedProps =
paimonCatalog.getDatabase(schemaIdent.name()).options();
Assertions.assertEquals(loadedProps.get("key"), "hive");
}
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
index f8dacfd5fd..ffdf2d5980 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
@@ -218,7 +218,7 @@ public class TestPaimonCatalogOps {
IllegalArgumentException.class, () -> assertUpdateColumnPosition(5,
defaultPos()));
// Test NullPointerException with UpdateColumnPosition for after
non-existent column.
assertThrowsExactly(
- NullPointerException.class, () -> assertUpdateColumnPosition(1,
after("col_5")));
+ IllegalArgumentException.class, () -> assertUpdateColumnPosition(1,
after("col_5")));
}
@Test
@@ -227,7 +227,7 @@ public class TestPaimonCatalogOps {
table -> {
DataField dataField = table.rowType().getFields().get(0);
assertEquals("col_1", dataField.name());
- assertEquals(DataTypes.BIGINT(), dataField.type());
+ assertEquals(DataTypes.BIGINT().notNull(), dataField.type());
},
updateColumnType(getFieldName("col_1"), Types.LongType.get()));
assertColumnNotExist(updateColumnType(getFieldName("col_5"),
Types.ShortType.get()));
@@ -420,6 +420,8 @@ public class TestPaimonCatalogOps {
.build()),
ArrayType.class.getSimpleName())
.comment(COMMENT)
+ .primaryKey("col_1")
+ .option("alter-column-null-to-not-null.disabled", "false")
.options(OPTIONS)
.build());
paimonCatalogOps.createTable(tableInfo.getKey(), tableInfo.getValue());
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java
index 3495245eaa..0554c482d0 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java
@@ -74,6 +74,8 @@ public class TestCatalogUtils {
"paimon_catalog_warehouse"),
PaimonConfig.CATALOG_URI.getKey(),
generateUri(metastore),
+ "cache-enabled",
+ "false",
PaimonConfig.CATALOG_JDBC_USER.getKey(),
"user",
PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(),
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestTableOpsUtils.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestTableOpsUtils.java
index 99edac07ed..62147df0e4 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestTableOpsUtils.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestTableOpsUtils.java
@@ -87,7 +87,7 @@ public class TestTableOpsUtils {
AddColumn.class,
schemaChange -> {
AddColumn addColumn = (AddColumn) schemaChange;
- assertEquals("col_1", addColumn.fieldName());
+ assertEquals("col_1", addColumn.fieldNames()[0]);
assertEquals(DataTypeRoot.INTEGER,
addColumn.dataType().getTypeRoot());
assertEquals(AddColumn.class.getSimpleName(),
addColumn.description());
assertNotNull(addColumn.move());
@@ -111,7 +111,7 @@ public class TestTableOpsUtils {
AddColumn.class,
schemaChange -> {
AddColumn addColumn = (AddColumn) schemaChange;
- assertEquals("col_2", addColumn.fieldName());
+ assertEquals("col_2", addColumn.fieldNames()[0]);
assertEquals(DataTypeRoot.FLOAT, addColumn.dataType().getTypeRoot());
assertEquals(AddColumn.class.getSimpleName(),
addColumn.description());
assertNotNull(addColumn.move());
@@ -135,7 +135,7 @@ public class TestTableOpsUtils {
AddColumn.class,
schemaChange -> {
AddColumn addColumn = (AddColumn) schemaChange;
- assertEquals("col_3", addColumn.fieldName());
+ assertEquals("col_3", addColumn.fieldNames()[0]);
assertEquals(DataTypeRoot.ARRAY, addColumn.dataType().getTypeRoot());
assertEquals(AddColumn.class.getSimpleName(),
addColumn.description());
assertNull(addColumn.move());
@@ -156,7 +156,7 @@ public class TestTableOpsUtils {
AddColumn.class,
schemaChange -> {
AddColumn addColumn = (AddColumn) schemaChange;
- assertEquals("col_4", addColumn.fieldName());
+ assertEquals("col_4", addColumn.fieldNames()[0]);
assertEquals(DataTypeRoot.MAP, addColumn.dataType().getTypeRoot());
assertEquals(AddColumn.class.getSimpleName(),
addColumn.description());
assertNull(addColumn.move());
@@ -196,7 +196,7 @@ public class TestTableOpsUtils {
UpdateColumnType.class,
schemaChange -> {
UpdateColumnType updateColumnType = (UpdateColumnType) schemaChange;
- assertEquals("col_4", updateColumnType.fieldName());
+ assertEquals("col_4", updateColumnType.fieldNames()[0]);
assertEquals(DataTypeRoot.DOUBLE,
updateColumnType.newDataType().getTypeRoot());
});
}
@@ -208,7 +208,7 @@ public class TestTableOpsUtils {
RenameColumn.class,
schemaChange -> {
RenameColumn renameColumn = (RenameColumn) schemaChange;
- assertEquals("col_1", renameColumn.fieldName());
+ assertEquals("col_1", renameColumn.fieldNames()[0]);
assertEquals("col_5", renameColumn.newName());
});
}
@@ -220,7 +220,7 @@ public class TestTableOpsUtils {
DropColumn.class,
schemaChange -> {
DropColumn dropColumn = (DropColumn) schemaChange;
- assertEquals("col_2", dropColumn.fieldName());
+ assertEquals("col_2", dropColumn.fieldNames()[0]);
});
}
diff --git a/docs/lakehouse-paimon-catalog.md b/docs/lakehouse-paimon-catalog.md
index 88f2f35796..52b8ba5036 100644
--- a/docs/lakehouse-paimon-catalog.md
+++ b/docs/lakehouse-paimon-catalog.md
@@ -15,7 +15,7 @@ Apache Gravitino provides the ability to manage Apache Paimon
metadata.
### Requirements
:::info
-Builds with Apache Paimon `0.8.0`.
+Builds with Apache Paimon `1.2`.
:::
## Catalog
@@ -43,7 +43,7 @@ Builds with Apache Paimon `0.8.0`.
| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of
retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`.
| 60
| No
[...]
| `oss-endpoint` | The endpoint of the
Aliyun OSS.
| (none)
| required if the value of `warehouse` is a OSS path
[...]
| `oss-access-key-id` | The access key of the
Aliyun OSS.
| (none)
| required if the value of `warehouse` is a OSS path
[...]
-| `oss-access-key-secret` | The secret key the
Aliyun OSS.
| (none)
| required if the value of `warehouse` is a OSS path
[...]
+| `oss-access-key-secret` | The secret key the
Aliyun OSS.
| (none)
| required if the value of `warehouse` is a OSS path
[...]
| `s3-endpoint` | The endpoint of the AWS
S3.
| (none)
| required if the value of `warehouse` is a S3 path
[...]
| `s3-access-key-id` | The access key of the
AWS S3.
| (none)
| required if the value of `warehouse` is a S3 path
[...]
| `s3-secret-access-key` | The secret key of the
AWS S3.
| (none)
| required if the value of `warehouse` is a S3 path
[...]
diff --git a/docs/spark-connector/spark-catalog-iceberg.md
b/docs/spark-connector/spark-catalog-iceberg.md
index 9c3e931556..826a11d4bd 100644
--- a/docs/spark-connector/spark-catalog-iceberg.md
+++ b/docs/spark-connector/spark-catalog-iceberg.md
@@ -5,7 +5,12 @@ keyword: spark connector iceberg catalog
license: "This software is licensed under the Apache License version 2."
---
-The Apache Gravitino Spark connector offers the capability to read and write
Iceberg tables, with the metadata managed by the Gravitino server. To enable
the use of the Iceberg catalog within the Spark connector, you must set the
configuration `spark.sql.gravitino.enableIcebergSupport` to `true` and download
Iceberg Spark runtime jar to Spark classpath.
+The Apache Gravitino Spark connector offers the capability to read and write
Iceberg tables, with the metadata managed by the Gravitino server.
+
+## Preparation
+
+1. Set `spark.sql.gravitino.enableIcebergSupport` to `true` in Spark
configuration.
+2. Download Iceberg Spark runtime jar to Spark classpath.
## Capabilities
diff --git a/docs/spark-connector/spark-catalog-jdbc.md
b/docs/spark-connector/spark-catalog-jdbc.md
index 7805d80266..dba50e0f65 100644
--- a/docs/spark-connector/spark-catalog-jdbc.md
+++ b/docs/spark-connector/spark-catalog-jdbc.md
@@ -5,7 +5,11 @@ keyword: spark connector jdbc catalog
license: "This software is licensed under the Apache License version 2."
---
-The Apache Gravitino Spark connector offers the capability to read JDBC
tables, with the metadata managed by the Gravitino server. To enable the use of
the JDBC catalog within the Spark connector, you must download the jdbc driver
jar which you used to Spark classpath.
+The Apache Gravitino Spark connector offers the capability to read JDBC
tables, with the metadata managed by the Gravitino server.
+
+## Preparation
+
+1. Download the corresponding jdbc driver jar to Spark classpath.
## Capabilities
diff --git a/docs/spark-connector/spark-catalog-paimon.md
b/docs/spark-connector/spark-catalog-paimon.md
index 2cae203e5d..f1f33ffc4a 100644
--- a/docs/spark-connector/spark-catalog-paimon.md
+++ b/docs/spark-connector/spark-catalog-paimon.md
@@ -5,7 +5,12 @@ keyword: spark connector paimon catalog
license: "This software is licensed under the Apache License version 2."
---
-The Apache Gravitino Spark connector offers the capability to read and write
Paimon tables, with the metadata managed by the Gravitino server. To enable the
use of the Paimon catalog within the Spark connector now, you must set download
[Paimon Spark runtime
jar](https://paimon.apache.org/docs/0.8/spark/quick-start/#preparation) to
Spark classpath.
+The Apache Gravitino Spark connector offers the capability to read and write
Paimon tables, with the metadata managed by the Gravitino server.
+
+## Preparation
+
+1. Set `spark.sql.gravitino.enablePaimonSupport` to `true` in Spark
configuration.
+2. Download Paimon Spark runtime jar to Spark classpath.
## Capabilities
diff --git a/docs/spark-connector/spark-connector.md
b/docs/spark-connector/spark-connector.md
index 3c88a9b678..34e2eb1295 100644
--- a/docs/spark-connector/spark-connector.md
+++ b/docs/spark-connector/spark-connector.md
@@ -32,6 +32,7 @@ The Apache Gravitino Spark connector leverages the Spark
DataSourceV2 interface
| spark.sql.gravitino.metalake | string | (none) | The
metalake name that spark connector used to request to Gravitino.
| Yes | 0.5.0 |
| spark.sql.gravitino.uri | string | (none) | The uri
of Gravitino server address.
| Yes | 0.5.0 |
| spark.sql.gravitino.enableIcebergSupport | string | `false` | Set to
`true` to use Iceberg catalog.
| No | 0.5.1 |
+| spark.sql.gravitino.enablePaimonSupport | string | `false` | Set to
`true` to use Paimon catalog.
| No | 1.0.0 |
| spark.sql.gravitino.client. | string | (none) | The
configuration key prefix for the Gravitino client config.
| No | 1.0.0 |
To configure the Gravitino client, use properties prefixed with
`spark.sql.gravitino.client.`. These properties will be passed to the Gravitino
client after removing the `spark.sql.` prefix.
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 0de35e4118..9bf8fc916e 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -59,7 +59,7 @@ commons-dbcp2 = "2.11.0"
caffeine = "2.9.3"
iceberg = '1.9.2' # used for Gravitino Iceberg catalog and Iceberg REST service
iceberg4connector = "1.6.1" # used for compile connectors like Spark, Flink,
etc
-paimon = '0.8.0'
+paimon = '1.2.0'
spark33 = "3.3.4"
spark34 = "3.4.3"
spark35 = "3.5.1"
diff --git a/spark-connector/spark-common/build.gradle.kts
b/spark-connector/spark-common/build.gradle.kts
index 432e3160f4..f17f3f0010 100644
--- a/spark-connector/spark-common/build.gradle.kts
+++ b/spark-connector/spark-common/build.gradle.kts
@@ -36,6 +36,11 @@ val kyuubiVersion: String = libs.versions.kyuubi4spark.get()
val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String =
libs.versions.scala.collection.compat.get()
+if (hasProperty("excludePackagesForSparkConnector")) {
+ val configureFunc = properties["excludePackagesForSparkConnector"] as?
(Project) -> Unit
+ configureFunc?.invoke(project)
+}
+
dependencies {
implementation(project(":catalogs:catalog-common")) {
exclude("org.apache.logging.log4j")
@@ -46,14 +51,15 @@ dependencies {
compileOnly(project(":clients:client-java-runtime", configuration =
"shadow"))
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
-
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
- exclude("org.apache.spark")
- }
-
compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
compileOnly("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion")
+ if (scalaVersion == "2.12") {
+
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
+ }
annotationProcessor(libs.lombok)
compileOnly(libs.lombok)
@@ -121,8 +127,10 @@ dependencies {
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
-
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
- exclude("org.apache.spark")
+ if (scalaVersion == "2.12") {
+
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
}
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
// include spark-sql,spark-catalyst,hive-common,hdfs-client
@@ -174,3 +182,7 @@ configurations {
artifacts {
add("testArtifacts", testJar)
}
+
+tasks.named<Jar>("sourcesJar") {
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+}
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
index f54fb7f3c0..30e5b990d3 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
@@ -28,6 +28,8 @@ public class GravitinoSparkConfig {
public static final String GRAVITINO_METALAKE = GRAVITINO_PREFIX +
"metalake";
public static final String GRAVITINO_ENABLE_ICEBERG_SUPPORT =
GRAVITINO_PREFIX + "enableIcebergSupport";
+ public static final String GRAVITINO_ENABLE_PAIMON_SUPPORT =
+ GRAVITINO_PREFIX + "enablePaimonSupport";
public static final String GRAVITINO_CLIENT_CONFIG_PREFIX = GRAVITINO_PREFIX
+ "client.";
public static final String GRAVITINO_AUTH_TYPE =
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
index cc0a6ccbb4..7f0f31ab6c 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
@@ -66,6 +66,10 @@ public class GravitinoDriverPlugin implements DriverPlugin {
private static final Logger LOG =
LoggerFactory.getLogger(GravitinoDriverPlugin.class);
+ @VisibleForTesting
+ static final String PAIMON_SPARK_EXTENSIONS =
+ "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions";
+
@VisibleForTesting
static final String ICEBERG_SPARK_EXTENSIONS =
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
@@ -74,8 +78,11 @@ public class GravitinoDriverPlugin implements DriverPlugin {
private final List<String> gravitinoIcebergExtensions =
Arrays.asList(
GravitinoIcebergSparkSessionExtensions.class.getName(),
ICEBERG_SPARK_EXTENSIONS);
+ private final List<String> gravitinoPaimonExtensions =
Arrays.asList(PAIMON_SPARK_EXTENSIONS);
+
private final List<String> gravitinoDriverExtensions = new ArrayList<>();
private boolean enableIcebergSupport = false;
+ private boolean enablePaimonSupport = false;
@Override
public Map<String, String> init(SparkContext sc, PluginContext
pluginContext) {
@@ -94,6 +101,11 @@ public class GravitinoDriverPlugin implements DriverPlugin {
this.enableIcebergSupport =
conf.getBoolean(GravitinoSparkConfig.GRAVITINO_ENABLE_ICEBERG_SUPPORT,
false);
+ this.enablePaimonSupport =
+ conf.getBoolean(GravitinoSparkConfig.GRAVITINO_ENABLE_PAIMON_SUPPORT,
false);
+ if (enablePaimonSupport) {
+ gravitinoDriverExtensions.addAll(gravitinoPaimonExtensions);
+ }
if (enableIcebergSupport) {
gravitinoDriverExtensions.addAll(gravitinoIcebergExtensions);
}
@@ -129,6 +141,10 @@ public class GravitinoDriverPlugin implements DriverPlugin
{
&& !enableIcebergSupport) {
return;
}
+ if ("lakehouse-paimon".equals(provider.toLowerCase(Locale.ROOT))
+ && !enablePaimonSupport) {
+ return;
+ }
try {
registerCatalog(sparkConf, catalogName, provider);
} catch (Exception e) {
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
index 5bcdc9a2cb..273d6d3bcd 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
@@ -201,6 +201,7 @@ public abstract class SparkEnvIT extends SparkUtilIT {
.set(GravitinoSparkConfig.GRAVITINO_URI, gravitinoUri)
.set(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName)
.set(GravitinoSparkConfig.GRAVITINO_ENABLE_ICEBERG_SUPPORT, "true")
+ .set(GravitinoSparkConfig.GRAVITINO_ENABLE_PAIMON_SUPPORT, "true")
.set("hive.exec.dynamic.partition.mode", "nonstrict")
.set("spark.sql.warehouse.dir", warehouse)
.set("spark.sql.session.timeZone", TIME_ZONE_UTC);
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SparkQueryRunner.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SparkQueryRunner.java
index f367b24dfb..b0961a0878 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SparkQueryRunner.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SparkQueryRunner.java
@@ -328,6 +328,7 @@ public class SparkQueryRunner {
.config(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName)
.config(GravitinoSparkConfig.GRAVITINO_ENABLE_ICEBERG_SUPPORT,
"true")
.config("spark.gravitino.test.data.dir", dataDir)
+ .config(GravitinoSparkConfig.GRAVITINO_ENABLE_PAIMON_SUPPORT,
"true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// produce old parquet format
.config("spark.sql.parquet.writeLegacyFormat", "true")
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
index 74b3ea0968..058508c1bd 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
@@ -32,7 +32,6 @@ import
org.apache.gravitino.spark.connector.ConnectorConstants;
import org.apache.gravitino.spark.connector.hive.SparkHiveTable;
import org.apache.gravitino.spark.connector.iceberg.SparkIcebergTable;
import org.apache.gravitino.spark.connector.jdbc.SparkJdbcTable;
-import org.apache.gravitino.spark.connector.paimon.SparkPaimonTable;
import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -192,8 +191,8 @@ public class SparkTableInfo {
return ((SparkHiveTable) baseTable).schema();
} else if (baseTable instanceof SparkIcebergTable) {
return ((SparkIcebergTable) baseTable).schema();
- } else if (baseTable instanceof SparkPaimonTable) {
- return ((SparkPaimonTable) baseTable).schema();
+ } else if
(baseTable.getClass().getSimpleName().equals("SparkPaimonTable")) {
+ return baseTable.schema();
} else if (baseTable instanceof SparkJdbcTable) {
return ((SparkJdbcTable) baseTable).schema();
} else {
diff --git a/spark-connector/v3.3/spark/build.gradle.kts
b/spark-connector/v3.3/spark/build.gradle.kts
index f7ed5393ee..9023404980 100644
--- a/spark-connector/v3.3/spark/build.gradle.kts
+++ b/spark-connector/v3.3/spark/build.gradle.kts
@@ -37,6 +37,11 @@ val scalaJava8CompatVersion: String =
libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String =
libs.versions.scala.collection.compat.get()
val artifactName =
"${rootProject.name}-spark-${sparkMajorVersion}_$scalaVersion"
+if (hasProperty("excludePackagesForSparkConnector")) {
+ val configureFunc = properties["excludePackagesForSparkConnector"] as?
(Project) -> Unit
+ configureFunc?.invoke(project)
+}
+
dependencies {
implementation(project(":spark-connector:spark-common"))
compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
@@ -44,8 +49,10 @@ dependencies {
exclude("com.fasterxml.jackson")
}
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
-
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
- exclude("org.apache.spark")
+ if (scalaVersion == "2.12") {
+
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
}
testImplementation(project(":api")) {
@@ -130,8 +137,10 @@ dependencies {
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
-
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
- exclude("org.apache.spark")
+ if (scalaVersion == "2.12") {
+
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
}
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
// include spark-sql,spark-catalyst,hive-common,hdfs-client
@@ -189,3 +198,7 @@ tasks.clean {
delete("metastore_db")
delete("spark-warehouse")
}
+
+tasks.named<Jar>("sourcesJar") {
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+}
diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
index 37c95e4789..c600844506 100644
--- a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
@@ -20,7 +20,6 @@ package org.apache.gravitino.spark.connector.version;
import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33;
 import org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33;
-import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -34,6 +33,8 @@ public class TestCatalogNameAdaptor {
     Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(), icebergCatalogName);
     String paimonCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-paimon");
-    Assertions.assertEquals(GravitinoPaimonCatalogSpark33.class.getName(), paimonCatalogName);
+    Assertions.assertEquals(
+        "org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33",
+        paimonCatalogName);
}
}
diff --git a/spark-connector/v3.4/spark/build.gradle.kts
b/spark-connector/v3.4/spark/build.gradle.kts
index 91885a3e4c..8a9b527dce 100644
--- a/spark-connector/v3.4/spark/build.gradle.kts
+++ b/spark-connector/v3.4/spark/build.gradle.kts
@@ -37,6 +37,11 @@ val scalaJava8CompatVersion: String =
libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String =
libs.versions.scala.collection.compat.get()
val artifactName =
"${rootProject.name}-spark-${sparkMajorVersion}_$scalaVersion"
+if (hasProperty("excludePackagesForSparkConnector")) {
+  val configureFunc = properties["excludePackagesForSparkConnector"] as? (Project) -> Unit
+  configureFunc?.invoke(project)
+}
+
dependencies {
implementation(project(":spark-connector:spark-common"))
compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
@@ -45,8 +50,10 @@ dependencies {
}
compileOnly(project(":clients:client-java-runtime", configuration =
"shadow"))
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
-  compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") {
-    exclude("org.apache.spark")
+  if (scalaVersion == "2.12") {
+    compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") {
+      exclude("org.apache.spark")
+    }
   }
testImplementation(project(":api")) {
@@ -130,8 +137,10 @@ dependencies {
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
-  testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") {
-    exclude("org.apache.spark")
+  if (scalaVersion == "2.12") {
+    testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") {
+      exclude("org.apache.spark")
+    }
   }
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
// include spark-sql,spark-catalyst,hive-common,hdfs-client
@@ -189,3 +198,7 @@ tasks.clean {
delete("metastore_db")
delete("spark-warehouse")
}
+
+tasks.named<Jar>("sourcesJar") {
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+}
diff --git a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
index af9e67fab8..1924a2c4e8 100644
--- a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
@@ -20,7 +20,6 @@ package org.apache.gravitino.spark.connector.version;
import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34;
 import org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34;
-import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -34,6 +33,8 @@ public class TestCatalogNameAdaptor {
     Assertions.assertEquals(GravitinoIcebergCatalogSpark34.class.getName(), icebergCatalogName);
     String paimonCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-paimon");
-    Assertions.assertEquals(GravitinoPaimonCatalogSpark34.class.getName(), paimonCatalogName);
+    Assertions.assertEquals(
+        "org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34",
+        paimonCatalogName);
}
}
diff --git a/spark-connector/v3.5/spark/build.gradle.kts
b/spark-connector/v3.5/spark/build.gradle.kts
index 4c84641332..edbed1ca2d 100644
--- a/spark-connector/v3.5/spark/build.gradle.kts
+++ b/spark-connector/v3.5/spark/build.gradle.kts
@@ -37,6 +37,11 @@ val scalaJava8CompatVersion: String =
libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String =
libs.versions.scala.collection.compat.get()
val artifactName =
"${rootProject.name}-spark-${sparkMajorVersion}_$scalaVersion"
+if (hasProperty("excludePackagesForSparkConnector")) {
+  val configureFunc = properties["excludePackagesForSparkConnector"] as? (Project) -> Unit
+  configureFunc?.invoke(project)
+}
+
dependencies {
implementation(project(":spark-connector:spark-3.4"))
implementation(project(":spark-connector:spark-common"))
@@ -46,8 +51,10 @@ dependencies {
}
compileOnly(project(":clients:client-java-runtime", configuration =
"shadow"))
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
-  compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") {
-    exclude("org.apache.spark")
+  if (scalaVersion == "2.12") {
+    compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") {
+      exclude("org.apache.spark")
+    }
   }
testImplementation(project(":api")) {
@@ -132,8 +139,10 @@ dependencies {
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
-  testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") {
-    exclude("org.apache.spark")
+  if (scalaVersion == "2.12") {
+    testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") {
+      exclude("org.apache.spark")
+    }
   }
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
// include spark-sql,spark-catalyst,hive-common,hdfs-client
@@ -191,3 +200,7 @@ tasks.clean {
delete("metastore_db")
delete("spark-warehouse")
}
+
+tasks.named<Jar>("sourcesJar") {
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+}
diff --git a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
index f02584cd61..119817d39f 100644
--- a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
@@ -20,7 +20,6 @@ package org.apache.gravitino.spark.connector.version;
import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
 import org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
-import org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -34,6 +33,8 @@ public class TestCatalogNameAdaptor {
     Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(), icebergCatalogName);
     String paimonCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-paimon");
-    Assertions.assertEquals(GravitinoPaimonCatalogSpark35.class.getName(), paimonCatalogName);
+    Assertions.assertEquals(
+        "org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35",
+        paimonCatalogName);
}
}