This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 79d9a937ee  [Fix][Connector-V2] Fix kafka database name (#9201)
79d9a937ee is described below

commit 79d9a937eecb430d451aaf2d094e7c3a2d720370
Author: corgy-w <[email protected]>
AuthorDate: Tue Apr 22 18:11:59 2025 +0800

     [Fix][Connector-V2] Fix kafka database name (#9201)
---
 .../seatunnel/kafka/source/KafkaSourceConfig.java  | 22 +++++++++++++--
 .../kafka/source/KafkaSourceConfigTest.java        | 31 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index ef04d747dc..ddc67ebe17 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.options.table.TableIdentifierOptions;
+import org.apache.seatunnel.api.options.table.TableSchemaOptions;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
@@ -150,7 +152,9 @@ public class KafkaSourceConfig implements Serializable {
         return consumerMetadataList.stream()
                 .collect(
                         Collectors.toMap(
-                                consumerMetadata -> TablePath.of(null, consumerMetadata.getTopic()),
+                                consumerMetadata ->
+                                        getTablePathFromSchema(
+                                                readonlyConfig, consumerMetadata.getTopic()),
                                 consumerMetadata -> consumerMetadata));
     }
 
@@ -221,7 +225,7 @@ public class KafkaSourceConfig implements Serializable {
     private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
         Optional<Map<String, Object>> schemaOptions =
                 readonlyConfig.getOptional(KafkaSourceOptions.SCHEMA);
-        TablePath tablePath = TablePath.of(null, readonlyConfig.get(TOPIC));
+
         TableSchema tableSchema;
         MessageFormat format = readonlyConfig.get(FORMAT);
 
@@ -237,6 +241,8 @@ public class KafkaSourceConfig implements Serializable {
                                             "content", BasicType.STRING_TYPE, 0, false, null, null))
                             .build();
         }
+        TablePath tablePath = getTablePathFromSchema(readonlyConfig, readonlyConfig.get(TOPIC));
+
         return CatalogTable.of(
                 TableIdentifier.of("", tablePath),
                 tableSchema,
@@ -253,6 +259,18 @@ public class KafkaSourceConfig implements Serializable {
                 null);
     }
 
+    private TablePath getTablePathFromSchema(ReadonlyConfig readonlyConfig, String topicName) {
+        ReadonlyConfig schema =
+                readonlyConfig
+                        .getOptional(TableSchemaOptions.SCHEMA)
+                        .map(ReadonlyConfig::fromMap)
+                        .orElse(ReadonlyConfig.fromMap(Collections.emptyMap()));
+
+        return schema.getOptional(TableIdentifierOptions.TABLE)
+                .map(TablePath::of)
+                .orElseGet(() -> TablePath.of(null, topicName));
+    }
+
     private DeserializationSchema<SeaTunnelRow> createDeserializationSchema(
             CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
         SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
index 97b2406cf6..adde5bc861 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import static org.apache.seatunnel.api.options.ConnectorCommonOptions.DATABASE_NAME;
 import static org.apache.seatunnel.api.options.ConnectorCommonOptions.SCHEMA_NAME;
 import static org.apache.seatunnel.api.options.ConnectorCommonOptions.TABLE_NAME;
+import static org.apache.seatunnel.api.options.table.TableIdentifierOptions.TABLE;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER;
 
 public class KafkaSourceConfigTest {
@@ -71,4 +72,34 @@ public class KafkaSourceConfigTest {
                         .getTableDeserializationMap()
                         .get(TablePath.of("test.test.test")));
     }
+
+    @Test
+    void testDeserializationWithSchema() {
+        Map<String, Object> schemaFields = new HashMap<>();
+        schemaFields.put("id", "int");
+        schemaFields.put("name", "string");
+        schemaFields.put("description", "string");
+        schemaFields.put("weight", "string");
+
+        Map<String, Object> schema = new HashMap<>();
+        schema.put("fields", schemaFields);
+        schema.put(TABLE.key(), "db1.table1");
+
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("bootstrap.servers", "localhost:9092");
+        configMap.put("group.id", "test");
+        configMap.put("topic", "test");
+        configMap.put("schema", schema);
+        configMap.put("format", "text");
+
+        KafkaSourceConfig sourceConfig = new KafkaSourceConfig(ReadonlyConfig.fromMap(configMap));
+
+        DeserializationSchema<SeaTunnelRow> deserializationSchema =
+                sourceConfig
+                        .getMapMetadata()
+                        .get(TablePath.of("db1.table1"))
+                        .getDeserializationSchema();
+
+        Assertions.assertNotNull(deserializationSchema);
+    }
 }

Reply via email to