[
https://issues.apache.org/jira/browse/BEAM-14297?focusedWorklogId=774209&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774209
]
ASF GitHub Bot logged work on BEAM-14297:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 24/May/22 20:09
Start Date: 24/May/22 20:09
Worklog Time Spent: 10m
Work Description: chamikaramj commented on code in PR #17742:
URL: https://github.com/apache/beam/pull/17742#discussion_r880907978
##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -203,6 +210,47 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch()
throws IOException {
}
}
+ // This test roundtrips a single KV<Null,Null> to verify that
externalWithMetadata
+ // can handle null keys and values correctly.
+ @Test
+ public void testKafkaIOExternalRoundtripWithMetadataAndNullKeysAndValues() {
+
+ List<byte[]> nullList = new ArrayList<>();
+ nullList.add(null);
+ writePipeline
+ .apply(Create.of(nullList))
+ .apply(
+ ParDo.of(
+ new DoFn<byte[], KV<byte[], byte[]>>() {
+ @ProcessElement
+ public void processElement(
+ @Element byte[] element, OutputReceiver<KV<byte[],
byte[]>> receiver) {
+ receiver.output(KV.of(element, element));
+ }
+ }))
+ .apply(
+ KafkaIO.<byte[], byte[]>write()
+
.withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+ .withTopic(options.getKafkaTopic())
+ .withKeySerializer(ByteArraySerializer.class)
+ .withValueSerializer(ByteArraySerializer.class));
+
+ PipelineResult writeResult = writePipeline.run();
+ writeResult.waitUntilFinish();
+
+ readPipeline.apply(
+ KafkaIO.<byte[], byte[]>read()
+ .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+ .withTopic(options.getKafkaTopic())
+ .withKeyDeserializerAndCoder(
+ ByteArrayDeserializer.class,
NullableCoder.of(ByteArrayCoder.of()))
+ .withValueDeserializerAndCoder(
+ ByteArrayDeserializer.class,
NullableCoder.of(ByteArrayCoder.of()))
+ .externalWithMetadata());
+ PipelineResult readResult = readPipeline.run();
+ readResult.waitUntilFinish();
Review Comment:
I don't think you need additional changes in this PR for that. Thanks.
Issue Time Tracking
-------------------
Worklog Id: (was: 774209)
Time Spent: 4h (was: 3h 50m)
> Enable null key and value for metadata kafka xlang
> --------------------------------------------------
>
> Key: BEAM-14297
> URL: https://issues.apache.org/jira/browse/BEAM-14297
> Project: Beam
> Issue Type: Bug
> Components: io-py-kafka
> Reporter: John Casey
> Assignee: John Casey
> Priority: P2
> Fix For: 2.39.0
>
> Time Spent: 4h
> Remaining Estimate: 0h
>
> Enable nullable keys and values for byte array kafka records when using kafka
> xlang with metadata.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)