[ https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323718#comment-15323718 ]
Jeff Klukas commented on KAFKA-3801:
------------------------------------

Let me give more context for the example. We have an application that produces JSON messages to a Kafka topic interleaved with occasional checkpoint messages that are of {{Long}} type. If I want to create a KStream of just the checkpoint messages, I need to filter out the JSON messages before deserializing. Here's what it looks like:

{{KStream<Long, Long> checkpointStream = builder.stream(Serdes.Long(), Serdes.ByteArray(), inputTopicName)}}
{{.filter((key, bytes) -> bytes.length == 8).mapValues(LongDeserializer::deserialize)}}

I need to use ByteArraySerde when calling {{stream}}, then I do the deserialization in a {{mapValues}} invocation after filtering out messages of the wrong type. Another option would be to materialize the stream to a topic after the filter and then call {{builder.stream(Serdes.Long(), Serdes.Long(), newTopicName)}}, but I'd like to avoid unnecessary materialization.

So in the current scheme, I need to create an instance of {{LongDeserializer}} separately so that I can then call its {{deserialize}} method in {{mapValues}}.

This situation probably won't occur frequently, so I understand if it's decided not to bother considering this change.

> Provide static serialize() and deserialize() for use as method references
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-3801
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3801
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, streams
>            Reporter: Jeff Klukas
>            Assignee: Guozhang Wang
>            Priority: Minor
>             Fix For: 0.10.1.0
>
>
> While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}}
> are abstracted away in Kafka Streams through the use of `Serdes` classes,
> there are some instances where developers may want to call them directly. The
> serializers and deserializers for simple types don't require any
> configuration and could be static, but currently it's necessary to create an
> instance to use those methods.
> I'd propose moving serialization logic into a {{static public byte[]
> serialize(? data)}} method and deserialization logic into a {{static public ?
> deserialize(byte[] data)}} method. The existing instance methods would simply
> call the static versions.
> See a full example for LongSerializer and LongDeserializer here:
> https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1
> In Java 8, these static methods then become available for method references
> in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the
> user needing to create an instance of {{LongDeserializer}}.
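
For illustration only, here is a rough sketch of the shape the proposed static methods might take for the {{Long}} case. It is not the code from the linked branch and does not depend on Kafka. The class name {{StaticLongSerde}} and the helper {{checkpoints}} are hypothetical, the 8-byte big-endian encoding is assumed to match what {{LongSerializer}} writes, and a plain Java stream stands in for the {{KStream}} filter-then-map pipeline.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for LongSerializer/LongDeserializer with static methods.
final class StaticLongSerde {

    private StaticLongSerde() {
    }

    // Encodes a Long as 8 big-endian bytes; null stays null.
    static byte[] serialize(Long data) {
        if (data == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES).putLong(data).array();
    }

    // Decodes 8 big-endian bytes back into a Long; null stays null.
    static Long deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException(
                "Expected " + Long.BYTES + " bytes but received " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    // Plain-Java analogue of the filter/mapValues chain from the comment:
    // keep only 8-byte payloads, then decode them through a static method
    // reference, with no deserializer instance created.
    static List<Long> checkpoints(List<byte[]> rawValues) {
        return rawValues.stream()
            .filter(bytes -> bytes.length == Long.BYTES)
            .map(StaticLongSerde::deserialize)
            .collect(Collectors.toList());
    }
}
```

Because {{deserialize}} is static and takes a single {{byte[]}} argument, the method reference fits anywhere a one-argument function is expected, which is what the {{mapValues}} call in the comment relies on.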