dmariassy commented on code in PR #109:
URL: 
https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1688736888


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##########
@@ -144,14 +147,34 @@ public void open(
         valueSerialization.open(context);
     }
 
+    private String getTargetTopic(RowData element) {
+        final String topic = readMetadata(element, 
KafkaDynamicSink.WritableMetadata.TOPIC);
+        if (topic == null && topics == null) {
+            throw new IllegalArgumentException(
+                    "The topic of the sink record is not valid. Expected a 
single topic but no topic is set.");
+        } else if (topic == null && topics.size() == 1) {
+            return topics.get(0);

Review Comment:
   Can we start wit the `topics.size == 1` check and immediately return if the 
connector is used with a static target topic? This could save the processing in 
`readMetadata`.



##########
docs/content/docs/connectors/table/kafka.md:
##########
@@ -196,11 +196,11 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required for sink</td>
+      <td>optional</td>
       <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name(s) to read data from when the table is used as source. It 
also supports topic list for source by separating topic by semicolon like 
<code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" 
can be specified for sources. When the table is used as sink, the topic name is 
the topic to write data to. Note topic list is not supported for sinks.</td>
+      <td>Topic name(s) to read data from when the table is used as source, or 
allowed topics for writing when the table is used as sink. It also supports 
topic list for source by separating topic by semicolon like 
<code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" 
can be specified for sources. For sinks, the topic or list of topics provided 
are they only valid values (whitelist) that can be written to the `topic` 
metadata column for determining what topic to produce to. If only a single 
topic is provided, this is the default topic to produce to. If not provided, 
any value of `topic` metadata is permitted.</td>

Review Comment:
   Nit: `allow list` might be a more inclusive term than `whitelist`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to