AHeise commented on code in PR #109:
URL: https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1699525877
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -144,14 +155,41 @@ public void open(
         valueSerialization.open(context);
     }
+    private String getTargetTopic(RowData element) {
+        if (topics != null && topics.size() == 1) {
+            // If topics is a singleton list, we only return the provided topic.
+            return topics.get(0);
+        }
+        final String targetTopic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC);
+        if (targetTopic == null) {
+            throw new IllegalArgumentException(
+                    "The topic of the sink record is not valid. Expected a single topic but no topic is set.");
+        } else if (topics != null && !topics.contains(targetTopic)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The topic of the sink record is not valid. Expected topic to be in: %s but was: %s",
+                            topics, targetTopic));
+        } else if (topicPattern != null && !topicPattern.matcher(targetTopic).matches()) {

Review Comment:
   We should add a cache here that remembers valid topics because these matchers are expensive.

##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -115,14 +126,14 @@ public ProducerRecord<byte[], byte[]> serialize(
                             consumedRow, kind, valueFieldGetters);
             valueSerialized = valueSerialization.serialize(valueRow);
         }
-
+        final String targetTopic = getTargetTopic(consumedRow);
         return new ProducerRecord<>(
-                topic,
+                targetTopic,
                 extractPartition(

Review Comment:
   Pass `targetTopic` to `extractPartition` so we don't need to call `getTargetTopic` twice.

##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##########
@@ -636,21 +618,25 @@ public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject(
     private static Map<String, String> autoCompleteSchemaRegistrySubject(
             Map<String, String> options) {
         Configuration configuration = Configuration.fromMap(options);
-        // the subject autoComplete should only be used in sink, check the topic first
-        validateSinkTopic(configuration);
-        final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
-        final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
-        final Optional<String> format = configuration.getOptional(FORMAT);
-        final String topic = configuration.get(TOPIC).get(0);
-
-        if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
-            autoCompleteSubject(configuration, format.get(), topic + "-value");
-        } else if (valueFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
-            autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
-        }
+        // the subject autoComplete should only be used in sink with a single topic, check the topic

Review Comment:
   Why wouldn't we be able to register it for all topics? I probably haven't understood this feature entirely. Obviously it won't work with `topicPattern`.
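On the caching suggestion for `getTargetTopic` above, a minimal sketch of what that could look like, reusing the names from the diff. The `validatedTopics` field (a plain `java.util.HashSet`) and the pattern-mismatch error message are illustrative additions, not part of the PR:

```java
// Illustrative field: remembers topics that already passed validation so the
// regex / List.contains checks run at most once per distinct topic.
private final Set<String> validatedTopics = new HashSet<>();

private String getTargetTopic(RowData element) {
    if (topics != null && topics.size() == 1) {
        // If topics is a singleton list, we only return the provided topic.
        return topics.get(0);
    }
    final String targetTopic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC);
    if (targetTopic == null) {
        throw new IllegalArgumentException(
                "The topic of the sink record is not valid. Expected a single topic but no topic is set.");
    }
    // Cheap set lookup first; only topics we have not seen yet pay for the expensive checks.
    if (!validatedTopics.contains(targetTopic)) {
        if (topics != null && !topics.contains(targetTopic)) {
            throw new IllegalArgumentException(
                    String.format(
                            "The topic of the sink record is not valid. Expected topic to be in: %s but was: %s",
                            topics, targetTopic));
        } else if (topicPattern != null && !topicPattern.matcher(targetTopic).matches()) {
            // Message is illustrative; the diff above is cut off at this branch.
            throw new IllegalArgumentException(
                    String.format(
                            "The topic of the sink record is not valid. Expected topic to match: %s but was: %s",
                            topicPattern, targetTopic));
        }
        validatedTopics.add(targetTopic);
    }
    return targetTopic;
}
```

Calling this once in `serialize` and handing the result to `extractPartition`, as the second comment suggests, also keeps the cache lookup to a single call per record.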
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java:
##########
@@ -251,6 +258,12 @@ public DataStreamSink<?> consumeDataStream(
     public Map<String, DataType> listWritableMetadata() {
         final Map<String, DataType> metadataMap = new LinkedHashMap<>();
         Stream.of(WritableMetadata.values())
+                // When `topic` is a singleton list, TOPIC metadata is not writable
+                .filter(
+                        m ->
+                                topics == null
+                                        || topics.size() > 1
+                                        || !WritableMetadata.TOPIC.key.equals(m.key))

Review Comment:
   This might be easier to read if we switch to a for-loop. This whole section is a bit of an anti-pattern: we use streams to write to the map through side effects, which is neither imperative nor functional.

##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link DynamicKafkaRecordSerializationSchema}. */
+public class DynamicKafkaRecordSerializationSchemaTest extends TestLogger {
+    private static final List<String> TOPICS = Arrays.asList("topic1;topic2".split(";"));
+    private static final String TOPIC = "topic";

Review Comment:
   ```suggestion
       private static final String SINGLE_TOPIC = "topic";
   ```

##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link DynamicKafkaRecordSerializationSchema}. */
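For the for-loop suggestion on `listWritableMetadata` above, a rough sketch of the imperative version, assuming the `WritableMetadata` enum exposes `key` and `dataType` fields as the stream version implies:

```java
@Override
public Map<String, DataType> listWritableMetadata() {
    final Map<String, DataType> metadataMap = new LinkedHashMap<>();
    for (WritableMetadata m : WritableMetadata.values()) {
        // Mirrors the stream filter: when `topic` is a singleton list,
        // the TOPIC metadata column is not writable.
        if (topics != null && topics.size() == 1 && WritableMetadata.TOPIC.key.equals(m.key)) {
            continue;
        }
        metadataMap.put(m.key, m.dataType);
    }
    return metadataMap;
}
```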
+public class DynamicKafkaRecordSerializationSchemaTest extends TestLogger {
+    private static final List<String> TOPICS = Arrays.asList("topic1;topic2".split(";"));

Review Comment:
   ```suggestion
       private static final List<String> MULTIPLE_TOPICS = Arrays.asList("topic1", "topic2");
   ```

##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java:
##########
@@ -393,6 +409,20 @@ private RowData.FieldGetter[] getFieldGetters(
     // --------------------------------------------------------------------------------------------

    enum WritableMetadata {
+        TOPIC(
+                "topic",
+                DataTypes.STRING().nullable(),

Review Comment:
   We allow TOPIC as a metadata column only iff there is more than one topic or a topicPattern. At this point, isn't it a required metadata column? Is there any case where I can set 'topic' = null?

##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link DynamicKafkaRecordSerializationSchema}. */
+public class DynamicKafkaRecordSerializationSchemaTest extends TestLogger {
+    private static final List<String> TOPICS = Arrays.asList("topic1;topic2".split(";"));
+    private static final String TOPIC = "topic";
+    private static final Pattern TOPIC_PATTERN = Pattern.compile("topic*");
+
+    @ParameterizedTest
+    @MethodSource("provideTopicMetadataTestParameters")
+    public void testTopicMetadata(
+            List<String> topics, Pattern topicPattern, String rowTopic, String expectedTopic) {
+        GenericRowData rowData = createRowData(rowTopic);
+        DynamicKafkaRecordSerializationSchema schema = createSchema(topics, topicPattern);
+        KafkaRecordSerializationSchema.KafkaSinkContext context = createContext();
+
+        // Call serialize method
+        ProducerRecord<byte[], byte[]> record = schema.serialize(rowData, context, null);
+
+        // Assert the returned ProducerRecord is routed to the correct topic
+        assertEquals(record.topic(), expectedTopic);

Review Comment:
   I'm not sure what the default is now, but you mix AssertJ and JUnit 5 assertions in this test. How about using just AssertJ? `assertThat(record.topic()).isEqualTo(expectedTopic)`

##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -30,12 +30,16 @@ import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;

 /** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}. */
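On the AssertJ remark above: staying with AssertJ also sidesteps the argument order of JUnit 5's `assertEquals(expected, actual)`, which the current line has reversed. A minimal sketch:

```java
// Static import: org.assertj.core.api.Assertions.assertThat
// The actual value goes into assertThat(...), the expected one into isEqualTo(...),
// so there is no expected/actual ordering to mix up.
assertThat(record.topic()).isEqualTo(expectedTopic);
```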
 class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {
-    private final String topic;
+    private final List<String> topics;

Review Comment:
   This should be a (hash) set since the lookup is on the hot path.

##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java:
##########
@@ -1002,39 +1081,22 @@ public void testExactlyOnceGuaranteeWithoutTransactionalIdPrefix() {
     }

     @Test
-    public void testSinkWithTopicListOrTopicPattern() {
+    public void testSinkWithTopicListAndTopicPattern() {
         Map<String, String> modifiedOptions =
                 getModifiedOptions(
                         getBasicSinkOptions(),
                         options -> {
                             options.put("topic", TOPICS);
                             options.put("scan.startup.mode", "earliest-offset");
                             options.remove("specific-offsets");
+                            options.put("topic-pattern", TOPIC_REGEX);
                         });
         final String errorMessageTemp =
-                "Flink Kafka sink currently only supports single topic, but got %s: %s.";
-
+                "Option 'topic' and 'topic-pattern' shouldn't be set together.";
         try {
             createTableSink(SCHEMA, modifiedOptions);
         } catch (Throwable t) {
-            assertThat(t.getCause().getMessage())
-                    .isEqualTo(
-                            String.format(
-                                    errorMessageTemp,
-                                    "'topic'",
-                                    String.format("[%s]", String.join(", ", TOPIC_LIST))));
-        }
-
-        modifiedOptions =
-                getModifiedOptions(
-                        getBasicSinkOptions(),
-                        options -> options.put("topic-pattern", TOPIC_REGEX));
-
-        try {
-            createTableSink(SCHEMA, modifiedOptions);
-        } catch (Throwable t) {
-            assertThat(t.getCause().getMessage())
-                    .isEqualTo(String.format(errorMessageTemp, "'topic-pattern'", TOPIC_REGEX));
+            assertThat(t.getCause().getMessage()).isEqualTo(errorMessageTemp);
         }

Review Comment:
   Can we switch to `assertThatThrownBy`?
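For the `assertThatThrownBy` question above, one possible shape, assuming the same helpers as in the diff and an AssertJ version that has `havingCause()`; unlike the try/catch form, this also fails when no exception is thrown at all:

```java
// Assert on the thrown exception directly instead of catching it manually.
assertThatThrownBy(() -> createTableSink(SCHEMA, modifiedOptions))
        .havingCause()
        .hasMessage("Option 'topic' and 'topic-pattern' shouldn't be set together.");
```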
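And on the earlier remark in this thread about making `topics` a (hash) set: a minimal sketch, assuming the constructor keeps accepting the `List<String>` parsed from the table options. The singleton shortcut in `getTargetTopic` would then need `topics.iterator().next()` (or a dedicated single-topic field) instead of `get(0)`; everything except the field type change is elided:

```java
// O(1) membership checks on the hot path instead of List.contains.
@Nullable private final Set<String> topics;

DynamicKafkaRecordSerializationSchema(
        @Nullable List<String> topics /* other constructor parameters elided */) {
    // Convert once here; the per-record validation then only does cheap Set lookups.
    this.topics = topics == null ? null : new HashSet<>(topics);
    // other field assignments elided
}
```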