[ 
https://issues.apache.org/jira/browse/BEAM-14297?focusedWorklogId=774207&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774207
 ]

ASF GitHub Bot logged work on BEAM-14297:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/May/22 20:05
            Start Date: 24/May/22 20:05
    Worklog Time Spent: 10m 
      Work Description: johnjcasey commented on code in PR #17742:
URL: https://github.com/apache/beam/pull/17742#discussion_r880905357


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -203,6 +210,47 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() 
throws IOException {
     }
   }
 
+  // This test roundtrips a single KV<Null,Null> to verify that 
externalWithMetadata
+  // can handle null keys and values correctly.
+  @Test
+  public void testKafkaIOExternalRoundtripWithMetadataAndNullKeysAndValues() {
+
+    List<byte[]> nullList = new ArrayList<>();
+    nullList.add(null);
+    writePipeline
+        .apply(Create.of(nullList))
+        .apply(
+            ParDo.of(
+                new DoFn<byte[], KV<byte[], byte[]>>() {
+                  @ProcessElement
+                  public void processElement(
+                      @Element byte[] element, OutputReceiver<KV<byte[], 
byte[]>> receiver) {
+                    receiver.output(KV.of(element, element));
+                  }
+                }))
+        .apply(
+            KafkaIO.<byte[], byte[]>write()
+                
.withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(options.getKafkaTopic())
+                .withKeySerializer(ByteArraySerializer.class)
+                .withValueSerializer(ByteArraySerializer.class));
+
+    PipelineResult writeResult = writePipeline.run();
+    writeResult.waitUntilFinish();
+
+    readPipeline.apply(
+        KafkaIO.<byte[], byte[]>read()
+            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+            .withTopic(options.getKafkaTopic())
+            .withKeyDeserializerAndCoder(
+                ByteArrayDeserializer.class, 
NullableCoder.of(ByteArrayCoder.of()))
+            .withValueDeserializerAndCoder(
+                ByteArrayDeserializer.class, 
NullableCoder.of(ByteArrayCoder.of()))
+            .externalWithMetadata());
+    PipelineResult readResult = readPipeline.run();
+    readResult.waitUntilFinish();

Review Comment:
   how should I go about adding it to the internal suite?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 774207)
    Time Spent: 3h 50m  (was: 3h 40m)

> Enable null key and value for metadata kafka xlang
> --------------------------------------------------
>
>                 Key: BEAM-14297
>                 URL: https://issues.apache.org/jira/browse/BEAM-14297
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-kafka
>            Reporter: John Casey
>            Assignee: John Casey
>            Priority: P2
>             Fix For: 2.39.0
>
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Enable nullable keys and values for byte array kafka records when using kafka 
> xlang with metadata.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to