dmariassy commented on code in PR #109:
URL: https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1688736888


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -144,14 +147,34 @@ public void open(
         valueSerialization.open(context);
     }
 
+    private String getTargetTopic(RowData element) {
+        final String topic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC);
+        if (topic == null && topics == null) {
+            throw new IllegalArgumentException(
+                    "The topic of the sink record is not valid. Expected a single topic but no topic is set.");
+        } else if (topic == null && topics.size() == 1) {
+            return topics.get(0);

Review Comment:
   Can we start with the `topics.size() == 1` check and immediately return if the connector is used with a static target topic? This could save the processing in `readMetadata`.



##########
docs/content/docs/connectors/table/kafka.md:
##########
@@ -196,11 +196,11 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required for sink</td>
+      <td>optional</td>
       <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks.</td>
+      <td>Topic name(s) to read data from when the table is used as source, or allowed topics for writing when the table is used as sink. It also supports topic list for source by separating topic by semicolon like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. For sinks, the topic or list of topics provided are they only valid values (whitelist) that can be written to the `topic` metadata column for determining what topic to produce to. 
If only a single topic is provided, this is the default topic to produce to. If not provided, any value of `topic` metadata is permitted.</td>

Review Comment:
   Nit: `allow list` might be a more inclusive term than `whitelist`
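
   To illustrate the reordering suggested in the first comment, here is a minimal, self-contained sketch. It is not the PR's code: `TargetTopicSketch` and `resolveTargetTopic` are hypothetical names, and the metadata lookup is passed in as a `Supplier` standing in for `readMetadata`, so the sketch can show that the lookup is skipped when a single static topic is configured. The handling of a missing topic with several configured topics, and of a topic outside the list, is an assumption based on the documentation change above.

```java
import java.util.List;
import java.util.function.Supplier;

public class TargetTopicSketch {

    /**
     * Hypothetical stand-in for getTargetTopic(RowData).
     *
     * @param topics configured sink topics, or null if the option is not set
     * @param metadataTopic stand-in for readMetadata(element, TOPIC); may return null
     */
    static String resolveTargetTopic(List<String> topics, Supplier<String> metadataTopic) {
        // Fast path: a single configured topic is treated as a static target,
        // so the metadata column is never read.
        if (topics != null && topics.size() == 1) {
            return topics.get(0);
        }

        final String topic = metadataTopic.get();
        if (topic == null) {
            // Assumption: with no static topic, every record must carry a topic.
            throw new IllegalArgumentException(
                    "The topic of the sink record is not valid. No topic is set.");
        }
        if (topics != null && !topics.contains(topic)) {
            // Assumption: a configured list acts as an allow list.
            throw new IllegalArgumentException(
                    "The topic '" + topic + "' is not in the allowed topics " + topics + ".");
        }
        return topic;
    }

    public static void main(String[] args) {
        // The supplier throws to demonstrate that it is not invoked on the fast path.
        String target = resolveTargetTopic(
                List.of("orders"),
                () -> {
                    throw new IllegalStateException("metadata should not be read");
                });
        System.out.println(target);
    }
}
```

   One trade-off of this ordering: with a single configured topic, a value in the `topic` metadata column is ignored rather than validated. Whether that matches the intended semantics is a question for the PR.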