AHeise commented on code in PR #109:
URL: https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1699525877


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -144,14 +155,41 @@ public void open(
         valueSerialization.open(context);
     }
 
+    private String getTargetTopic(RowData element) {
+        if (topics != null && topics.size() == 1) {
+            // If topics is a singleton list, we only return the provided topic.
+            return topics.get(0);
+        }
+        final String targetTopic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC);
+        if (targetTopic == null) {
+            throw new IllegalArgumentException(
+                    "The topic of the sink record is not valid. Expected a single topic but no topic is set.");
+        } else if (topics != null && !topics.contains(targetTopic)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The topic of the sink record is not valid. Expected topic to be in: %s but was: %s",
+                            topics, targetTopic));
+        } else if (topicPattern != null && !topicPattern.matcher(targetTopic).matches()) {

Review Comment:
   We should add a cache here that remembers valid topics because these matchers are expensive.
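   A sketch of what that could look like, assuming a simple per-subtask set is enough; the class and field names here are illustrative, not from the PR:
   ```java
   import java.util.HashSet;
   import java.util.Set;
   import java.util.regex.Pattern;

   /** Illustrative cache so the regex only runs once per distinct topic, not once per record. */
   final class TopicPatternCache {
       private final Pattern topicPattern;
       private final Set<String> validatedTopics = new HashSet<>();

       TopicPatternCache(Pattern topicPattern) {
           this.topicPattern = topicPattern;
       }

       boolean matches(String targetTopic) {
           // Fast path: this topic already passed validation on this subtask.
           if (validatedTopics.contains(targetTopic)) {
               return true;
           }
           boolean ok = topicPattern.matcher(targetTopic).matches();
           if (ok) {
               validatedTopics.add(targetTopic);
           }
           return ok;
       }
   }
   ```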



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -115,14 +126,14 @@ public ProducerRecord<byte[], byte[]> serialize(
                             consumedRow, kind, valueFieldGetters);
             valueSerialized = valueSerialization.serialize(valueRow);
         }
-
+        final String targetTopic = getTargetTopic(consumedRow);
         return new ProducerRecord<>(
-                topic,
+                targetTopic,
                 extractPartition(

Review Comment:
   Pass `targetTopic` to `extractPartition` so we don't need to call `getTargetTopic` twice.
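   Roughly, as a simplified sketch (the extra `extractPartition` parameter is hypothetical, and `keySerialized` is assumed to be the local computed above the shown hunk):
   ```java
   // Resolve the topic once per record and reuse it for both the partitioner
   // and the ProducerRecord, instead of calling getTargetTopic(...) twice.
   final String targetTopic = getTargetTopic(consumedRow);
   final Integer partition = extractPartition(consumedRow, targetTopic, keySerialized, valueSerialized);
   return new ProducerRecord<>(targetTopic, partition, keySerialized, valueSerialized);
   ```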



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##########
@@ -636,21 +618,25 @@ public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject(
     private static Map<String, String> autoCompleteSchemaRegistrySubject(
             Map<String, String> options) {
         Configuration configuration = Configuration.fromMap(options);
-        // the subject autoComplete should only be used in sink, check the topic first
-        validateSinkTopic(configuration);
-        final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
-        final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
-        final Optional<String> format = configuration.getOptional(FORMAT);
-        final String topic = configuration.get(TOPIC).get(0);
-
-        if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
-            autoCompleteSubject(configuration, format.get(), topic + "-value");
-        } else if (valueFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
-            autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
-        }
+        // the subject autoComplete should only be used in sink with a single topic, check the topic

Review Comment:
   Why wouldn't we be able to register it for all topics? I probably haven't understood this feature entirely.
   Obviously it won't work with topicPattern.



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java:
##########
@@ -251,6 +258,12 @@ public DataStreamSink<?> consumeDataStream(
     public Map<String, DataType> listWritableMetadata() {
         final Map<String, DataType> metadataMap = new LinkedHashMap<>();
         Stream.of(WritableMetadata.values())
+                // When `topic` is a singleton list, TOPIC metadata is not writable
+                .filter(
+                        m ->
+                                topics == null
+                                        || topics.size() > 1
+                                        || !WritableMetadata.TOPIC.key.equals(m.key))

Review Comment:
   This might be easier to read if we switch to a for-loop. This whole section is a bit of an anti-pattern: we use streams to write to the map through side effects, which is neither imperative nor functional.
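   As a sketch, the for-loop variant could look like this (it relies on the `key`/`dataType` fields of `WritableMetadata` the way the existing stream does, and mirrors the filter condition from the diff):
   ```java
   final Map<String, DataType> metadataMap = new LinkedHashMap<>();
   for (WritableMetadata m : WritableMetadata.values()) {
       // When a single fixed topic is configured, TOPIC is not writable metadata.
       if (topics != null && topics.size() <= 1 && WritableMetadata.TOPIC.key.equals(m.key)) {
           continue;
       }
       metadataMap.put(m.key, m.dataType);
   }
   return metadataMap;
   ```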



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link DynamicKafkaRecordSerializationSchema}. */
+public class DynamicKafkaRecordSerializationSchemaTest extends TestLogger {
+    private static final List<String> TOPICS = Arrays.asList("topic1;topic2".split(";"));
+    private static final String TOPIC = "topic";

Review Comment:
   ```suggestion
       private static final String SINGLE_TOPIC = "topic";
   ```



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link DynamicKafkaRecordSerializationSchema}. */
+public class DynamicKafkaRecordSerializationSchemaTest extends TestLogger {
+    private static final List<String> TOPICS = Arrays.asList("topic1;topic2".split(";"));

Review Comment:
   ```suggestion
       private static final List<String> MULTIPLE_TOPICS = Arrays.asList("topic1", "topic2");
   ```



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java:
##########
@@ -393,6 +409,20 @@ private RowData.FieldGetter[] getFieldGetters(
     // --------------------------------------------------------------------------------------------
 
     enum WritableMetadata {
+        TOPIC(
+                "topic",
+                DataTypes.STRING().nullable(),

Review Comment:
   We allow TOPIC as a metadata column only if there are >1 topics or a topicPattern. At this point, isn't it a required metadata column?
   
   Is there any case where I can set 'topic'=null?



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link DynamicKafkaRecordSerializationSchema}. */
+public class DynamicKafkaRecordSerializationSchemaTest extends TestLogger {
+    private static final List<String> TOPICS = Arrays.asList("topic1;topic2".split(";"));
+    private static final String TOPIC = "topic";
+    private static final Pattern TOPIC_PATTERN = Pattern.compile("topic*");
+
+    @ParameterizedTest
+    @MethodSource("provideTopicMetadataTestParameters")
+    public void testTopicMetadata(
+            List<String> topics, Pattern topicPattern, String rowTopic, String expectedTopic) {
+        GenericRowData rowData = createRowData(rowTopic);
+        DynamicKafkaRecordSerializationSchema schema = createSchema(topics, topicPattern);
+        KafkaRecordSerializationSchema.KafkaSinkContext context = createContext();
+
+        // Call serialize method
+        ProducerRecord<byte[], byte[]> record = schema.serialize(rowData, context, null);
+
+        // Assert the returned ProducerRecord is routed to the correct topic
+        assertEquals(record.topic(), expectedTopic);

Review Comment:
   I'm not sure what the default is now, but you mix AssertJ and JUnit 5 assertions in this test. How about using just AssertJ?
   
   `assertThat(record.topic()).isEqualTo(expectedTopic)`



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -30,12 +30,16 @@
 
 import javax.annotation.Nullable;
 
+import java.util.List;
+import java.util.regex.Pattern;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}. */
 class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {
 
-    private final String topic;
+    private final List<String> topics;

Review Comment:
   This should be a (hash) set since the lookup is on the hot path.
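   Minimal standalone sketch of the idea (class name and constructor shape are illustrative only):
   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   final class TopicLookup {
       // Copy the configured topics into a Set; null means "no fixed topic list".
       private final Set<String> topics;

       TopicLookup(List<String> topicList) {
           this.topics = topicList == null ? null : new HashSet<>(topicList);
       }

       boolean contains(String targetTopic) {
           // O(1) membership check on the per-record hot path instead of List#contains.
           return topics != null && topics.contains(targetTopic);
       }
   }
   ```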



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java:
##########
@@ -1002,39 +1081,22 @@ public void testExactlyOnceGuaranteeWithoutTransactionalIdPrefix() {
     }
 
     @Test
-    public void testSinkWithTopicListOrTopicPattern() {
+    public void testSinkWithTopicListAndTopicPattern() {
         Map<String, String> modifiedOptions =
                 getModifiedOptions(
                         getBasicSinkOptions(),
                         options -> {
                             options.put("topic", TOPICS);
                             options.put("scan.startup.mode", 
"earliest-offset");
                             options.remove("specific-offsets");
+                            options.put("topic-pattern", TOPIC_REGEX);
                         });
         final String errorMessageTemp =
-                "Flink Kafka sink currently only supports single topic, but got %s: %s.";
-
+                "Option 'topic' and 'topic-pattern' shouldn't be set together.";
         try {
             createTableSink(SCHEMA, modifiedOptions);
         } catch (Throwable t) {
-            assertThat(t.getCause().getMessage())
-                    .isEqualTo(
-                            String.format(
-                                    errorMessageTemp,
-                                    "'topic'",
-                                    String.format("[%s]", String.join(", ", TOPIC_LIST))));
-        }
-
-        modifiedOptions =
-                getModifiedOptions(
-                        getBasicSinkOptions(),
-                        options -> options.put("topic-pattern", TOPIC_REGEX));
-
-        try {
-            createTableSink(SCHEMA, modifiedOptions);
-        } catch (Throwable t) {
-            assertThat(t.getCause().getMessage())
-                    .isEqualTo(String.format(errorMessageTemp, "'topic-pattern'", TOPIC_REGEX));
+            assertThat(t.getCause().getMessage()).isEqualTo(errorMessageTemp);
         }

Review Comment:
   Can we switch to `assertThatThrownBy`?
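   Something like this, as a sketch (it assumes the validation error is the root cause, matching the current `t.getCause()`-based assertion):
   ```java
   assertThatThrownBy(() -> createTableSink(SCHEMA, modifiedOptions))
           .hasRootCauseMessage("Option 'topic' and 'topic-pattern' shouldn't be set together.");
   ```
   That also removes the pitfall of the current try/catch, which passes silently when no exception is thrown at all.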



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
