This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 79d9a937ee [Fix][Connector-V2] Fix kafka database name (#9201)
79d9a937ee is described below
commit 79d9a937eecb430d451aaf2d094e7c3a2d720370
Author: corgy-w <[email protected]>
AuthorDate: Tue Apr 22 18:11:59 2025 +0800
[Fix][Connector-V2] Fix kafka database name (#9201)
---
.../seatunnel/kafka/source/KafkaSourceConfig.java | 22 +++++++++++++--
.../kafka/source/KafkaSourceConfigTest.java | 31 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 2 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index ef04d747dc..ddc67ebe17 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.options.table.TableIdentifierOptions;
+import org.apache.seatunnel.api.options.table.TableSchemaOptions;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
@@ -150,7 +152,9 @@ public class KafkaSourceConfig implements Serializable {
return consumerMetadataList.stream()
.collect(
Collectors.toMap(
- consumerMetadata -> TablePath.of(null, consumerMetadata.getTopic()),
+ consumerMetadata ->
+ getTablePathFromSchema(
+ readonlyConfig, consumerMetadata.getTopic()),
consumerMetadata -> consumerMetadata));
}
@@ -221,7 +225,7 @@ public class KafkaSourceConfig implements Serializable {
private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
Optional<Map<String, Object>> schemaOptions =
readonlyConfig.getOptional(KafkaSourceOptions.SCHEMA);
- TablePath tablePath = TablePath.of(null, readonlyConfig.get(TOPIC));
+
TableSchema tableSchema;
MessageFormat format = readonlyConfig.get(FORMAT);
@@ -237,6 +241,8 @@ public class KafkaSourceConfig implements Serializable {
"content", BasicType.STRING_TYPE,
0, false, null, null))
.build();
}
+ TablePath tablePath = getTablePathFromSchema(readonlyConfig, readonlyConfig.get(TOPIC));
+
return CatalogTable.of(
TableIdentifier.of("", tablePath),
tableSchema,
@@ -253,6 +259,18 @@ public class KafkaSourceConfig implements Serializable {
null);
}
+ private TablePath getTablePathFromSchema(ReadonlyConfig readonlyConfig, String topicName) {
+ ReadonlyConfig schema =
+ readonlyConfig
+ .getOptional(TableSchemaOptions.SCHEMA)
+ .map(ReadonlyConfig::fromMap)
+ .orElse(ReadonlyConfig.fromMap(Collections.emptyMap()));
+
+ return schema.getOptional(TableIdentifierOptions.TABLE)
+ .map(TablePath::of)
+ .orElseGet(() -> TablePath.of(null, topicName));
+ }
+
private DeserializationSchema<SeaTunnelRow> createDeserializationSchema(
CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
index 97b2406cf6..adde5bc861 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
import static org.apache.seatunnel.api.options.ConnectorCommonOptions.DATABASE_NAME;
import static org.apache.seatunnel.api.options.ConnectorCommonOptions.SCHEMA_NAME;
import static org.apache.seatunnel.api.options.ConnectorCommonOptions.TABLE_NAME;
+import static org.apache.seatunnel.api.options.table.TableIdentifierOptions.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER;
public class KafkaSourceConfigTest {
@@ -71,4 +72,34 @@ public class KafkaSourceConfigTest {
.getTableDeserializationMap()
.get(TablePath.of("test.test.test")));
}
+
+ @Test
+ void testDeserializationWithSchema() {
+ Map<String, Object> schemaFields = new HashMap<>();
+ schemaFields.put("id", "int");
+ schemaFields.put("name", "string");
+ schemaFields.put("description", "string");
+ schemaFields.put("weight", "string");
+
+ Map<String, Object> schema = new HashMap<>();
+ schema.put("fields", schemaFields);
+ schema.put(TABLE.key(), "db1.table1");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("bootstrap.servers", "localhost:9092");
+ configMap.put("group.id", "test");
+ configMap.put("topic", "test");
+ configMap.put("schema", schema);
+ configMap.put("format", "text");
+
+ KafkaSourceConfig sourceConfig = new KafkaSourceConfig(ReadonlyConfig.fromMap(configMap));
+
+ DeserializationSchema<SeaTunnelRow> deserializationSchema =
+ sourceConfig
+ .getMapMetadata()
+ .get(TablePath.of("db1.table1"))
+ .getDeserializationSchema();
+
+ Assertions.assertNotNull(deserializationSchema);
+ }
}