This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2cb88c6ffd5badcdca63c563cfac33e1d05a48bb
Author: Enrique Fernández <[email protected]>
AuthorDate: Wed May 7 07:38:30 2025 +0200

    [improve][io][kca] support fully-qualified topic names in source records (#24248)
    
    (cherry picked from commit de1d4c96c7b77df87a1ab5be2ea87a8d570cd572)
---
 .../kafka/connect/AbstractKafkaConnectSource.java  | 13 ++++++-
 .../io/kafka/connect/KafkaConnectSourceTest.java   | 43 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index ad2e3d8001b..270decd8ab5 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -46,6 +46,7 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
@@ -243,7 +244,17 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
         KafkaSchemaWrappedSchema valueSchema;
 
         AbstractKafkaSourceRecord(SourceRecord srcRecord) {
-            this.destinationTopic = Optional.of("persistent://" + 
topicNamespace + "/" + srcRecord.topic());
+            String topic = srcRecord.topic();
+            if (topic.contains("://")) {
+                try {
+                    TopicName.get(topic);
+                    this.destinationTopic = Optional.of(topic);
+                } catch (IllegalArgumentException e) {
+                    this.destinationTopic = Optional.of("persistent://" + 
topicNamespace + "/" + topic);
+                }
+            } else {
+                this.destinationTopic = Optional.of("persistent://" + 
topicNamespace + "/" + topic);
+            }
             this.partitionIndex = 
Optional.ofNullable(srcRecord.kafkaPartition());
         }
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index f00749ba7df..7347d07ee0e 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -30,6 +30,7 @@ import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.file.FileStreamSourceConnector;
 import org.apache.kafka.connect.runtime.TaskConfig;
@@ -39,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.kafka.connect.KafkaConnectSource.KafkaSourceRecord;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -122,6 +124,47 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase  {
         runTransformTest(config, false);
     }
 
+    @Test
+    void testShortTopicNames() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, 
"org.apache.kafka.connect.file.FileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, 
"default-tenant/default-ns");
+
+        runTopicNameTest(config, "a-topic", 
"persistent://default-tenant/default-ns/a-topic");
+    }
+
+    @Test
+    void testFullyQualifiedTopicNames() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, 
"org.apache.kafka.connect.file.FileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, 
"default-tenant/default-ns");
+
+        runTopicNameTest(config, "persistent://a-tenant/a-ns/a-topic", 
"persistent://a-tenant/a-ns/a-topic");
+    }
+
+    private void runTopicNameTest(Map<String, Object> config, String 
topicName, String expectedDestinationTopicName) throws Exception {
+        config.put(TaskConfig.TASK_CLASS_CONFIG, 
"org.apache.kafka.connect.file.FileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, 
"default-tenant/default-ns");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        Map<String, Object> sourcePartition = new HashMap<>();
+        Map<String, Object> sourceOffset = new HashMap<>();
+        Map<String, Object> value = new HashMap<>();
+        sourcePartition.put("test", "test");
+        sourceOffset.put("test", 0);
+        value.put("myField", "42");
+        SourceRecord srcRecord = new SourceRecord(
+            sourcePartition, sourceOffset, topicName, null,
+            null, null, null, value
+        );
+
+        KafkaSourceRecord record = 
kafkaConnectSource.processSourceRecord(srcRecord);
+
+        assertEquals(Optional.of(expectedDestinationTopicName), 
record.destinationTopic);
+    }
+
     private Map<String, Object> setupTransformConfig(boolean withPredicate, 
boolean negated) {
         Map<String, Object> config = getConfig();
         config.put(TaskConfig.TASK_CLASS_CONFIG, 
"org.apache.kafka.connect.file.FileStreamSourceTask");

Reply via email to