This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2cb88c6ffd5badcdca63c563cfac33e1d05a48bb
Author: Enrique Fernández <[email protected]>
AuthorDate: Wed May 7 07:38:30 2025 +0200

    [improve][io][kca] support fully-qualified topic names in source records (#24248)

    (cherry picked from commit de1d4c96c7b77df87a1ab5be2ea87a8d570cd572)
---
 .../kafka/connect/AbstractKafkaConnectSource.java  | 13 ++++++-
 .../io/kafka/connect/KafkaConnectSourceTest.java   | 43 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index ad2e3d8001b..270decd8ab5 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -46,6 +46,7 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
@@ -243,7 +244,17 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
         KafkaSchemaWrappedSchema valueSchema;
 
         AbstractKafkaSourceRecord(SourceRecord srcRecord) {
-            this.destinationTopic = Optional.of("persistent://" + topicNamespace + "/" + srcRecord.topic());
+            String topic = srcRecord.topic();
+            if (topic.contains("://")) {
+                try {
+                    TopicName.get(topic);
+                    this.destinationTopic = Optional.of(topic);
+                } catch (IllegalArgumentException e) {
+                    this.destinationTopic = Optional.of("persistent://" + topicNamespace + "/" + topic);
+                }
+            } else {
+                this.destinationTopic = Optional.of("persistent://" + topicNamespace + "/" + topic);
+            }
             this.partitionIndex = Optional.ofNullable(srcRecord.kafkaPartition());
         }
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index f00749ba7df..7347d07ee0e 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -30,6 +30,7 @@ import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.file.FileStreamSourceConnector;
 import org.apache.kafka.connect.runtime.TaskConfig;
@@ -39,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.kafka.connect.KafkaConnectSource.KafkaSourceRecord;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -122,6 +124,47 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase {
         runTransformTest(config, false);
     }
 
+    @Test
+    void testShortTopicNames() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, "default-tenant/default-ns");
+
+        runTopicNameTest(config, "a-topic", "persistent://default-tenant/default-ns/a-topic");
+    }
+
+    @Test
+    void testFullyQualifiedTopicNames() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, "default-tenant/default-ns");
+
+        runTopicNameTest(config, "persistent://a-tenant/a-ns/a-topic", "persistent://a-tenant/a-ns/a-topic");
+    }
+
+    private void runTopicNameTest(Map<String, Object> config, String topicName, String expectedDestinationTopicName) throws Exception {
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, "default-tenant/default-ns");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        Map<String, Object> sourcePartition = new HashMap<>();
+        Map<String, Object> sourceOffset = new HashMap<>();
+        Map<String, Object> value = new HashMap<>();
+        sourcePartition.put("test", "test");
+        sourceOffset.put("test", 0);
+        value.put("myField", "42");
+        SourceRecord srcRecord = new SourceRecord(
+                sourcePartition, sourceOffset, topicName, null,
+                null, null, null, value
+        );
+
+        KafkaSourceRecord record = kafkaConnectSource.processSourceRecord(srcRecord);
+
+        assertEquals(Optional.of(expectedDestinationTopicName), record.destinationTopic);
+    }
+
     private Map<String, Object> setupTransformConfig(boolean withPredicate, boolean negated) {
         Map<String, Object> config = getConfig();
         config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
