twalthr commented on code in PR #258:
URL:
https://github.com/apache/flink-connector-kafka/pull/258#discussion_r3363244449
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java:
##########
@@ -274,6 +275,13 @@ public Map<String, DataType> listWritableMetadata() {
@Override
public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
+ if (metadataKeys.contains(WritableMetadata.HEADERS.key)
+ && metadataKeys.contains(WritableMetadata.HEADER_LIST.key)) {
+ throw new ValidationException(
+ String.format(
+ "Use either metadata keys '%s' or '%s'.",
Review Comment:
```suggestion
"The writable metadata is ambiguous. Please use either metadata key '%s' or '%s'.",
```
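To illustrate how the suggested wording would surface to a user, here is a minimal stand-alone sketch of the mutual-exclusion check. It is not the connector's code: `MetadataKeyCheck` is a hypothetical class, the key strings `headers` and `header-list` are assumed from the diff context, and `IllegalArgumentException` stands in for Flink's `ValidationException` so the example compiles without Flink on the classpath.

```java
import java.util.List;

/** Hypothetical stand-alone sketch; not the connector's actual class. */
final class MetadataKeyCheck {
    // Key names assumed from the diff context.
    static final String HEADERS_KEY = "headers";
    static final String HEADER_LIST_KEY = "header-list";

    /** Rejects a selection that contains both header representations. */
    static void validate(List<String> metadataKeys) {
        if (metadataKeys.contains(HEADERS_KEY) && metadataKeys.contains(HEADER_LIST_KEY)) {
            // IllegalArgumentException stands in for Flink's ValidationException.
            throw new IllegalArgumentException(
                    String.format(
                            "The writable metadata is ambiguous. "
                                    + "Please use either metadata key '%s' or '%s'.",
                            HEADERS_KEY, HEADER_LIST_KEY));
        }
    }

    public static void main(String[] args) {
        // A selection with only one header representation passes.
        validate(List.of("headers", "timestamp"));
        try {
            // Selecting both representations is rejected.
            validate(List.of("headers", "header-list"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Stating the problem first ("ambiguous") and the remedy second keeps the message readable even when it is wrapped in a longer planner error.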
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java:
##########
@@ -447,16 +455,61 @@ public Object read(RowData row, int pos) {
final ArrayData valueArray = map.valueArray();
final List<Header> headers = new ArrayList<>();
for (int i = 0; i < keyArray.size(); i++) {
- if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) {
+ if (!keyArray.isNullAt(i)) {
+ // StringData.toString() decodes UTF-8; invalid bytes produce
+ // replacement characters (see FLIP-568).
final String key = keyArray.getString(i).toString();
- final byte[] value = valueArray.getBinary(i);
+ final byte[] value =
+ valueArray.isNullAt(i) ? null : valueArray.getBinary(i);
headers.add(new KafkaHeader(key, value));
}
}
return headers;
}
}),
+ /**
+ * Wire-faithful alternative to {@code headers}: preserves duplicate keys and insertion
+ * order using {@code ARRAY<ROW<key STRING, value BYTES>>} instead of a lossy MAP.
+ */
+ HEADER_LIST(
+ "header-list",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "key", DataTypes.STRING().nullable()),
+ DataTypes.FIELD(
+ "value", DataTypes.BYTES().nullable()))
+ .notNull())
Review Comment:
nit: I would also make this nullable. I don't know how well the SQL planner
can handle it otherwise. Take e.g. `header[x].key`: if `header` is nullable,
the call chain needs to be nullable anyway.
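
If the row element does become nullable, the sink-side conversion would have to tolerate null at every level. A rough sketch in plain Java, assuming the same policy the map-based `headers` branch in this hunk uses (skip null keys, keep null values). `HeaderListSketch` and `HeaderRow` are hypothetical stand-ins for the converter and for one `ROW<key STRING, value BYTES>` element; treating a null array as "no headers" and skipping null rows are assumptions, not something the PR shows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of null-tolerant header-list conversion. */
final class HeaderListSketch {

    /** Stand-in for one ROW<key STRING, value BYTES> element. */
    static final class HeaderRow {
        final String key;   // may be null
        final byte[] value; // may be null

        HeaderRow(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Keeps order and duplicate keys; drops null rows and rows with a null key. */
    static List<HeaderRow> toHeaders(List<HeaderRow> rows) {
        if (rows == null) {
            // Assumption: a null array means "no headers".
            return Collections.emptyList();
        }
        final List<HeaderRow> headers = new ArrayList<>();
        for (HeaderRow row : rows) {
            if (row == null || row.key == null) {
                continue; // assumption: nothing sensible to write without a key
            }
            headers.add(row); // a null value is kept, as in the map-based branch
        }
        return headers;
    }

    public static void main(String[] args) {
        List<HeaderRow> rows = Arrays.asList(
                new HeaderRow("trace", new byte[] {1}),
                null,
                new HeaderRow(null, new byte[] {2}),
                new HeaderRow("trace", null));
        for (HeaderRow h : toHeaders(rows)) {
            String size = h.value == null ? "null" : h.value.length + " byte(s)";
            System.out.println(h.key + " -> " + size);
        }
    }
}
```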
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.