[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Qingsheng Ren updated FLINK-21160:
----------------------------------
    Affects Version/s:     (was: 1.13.0)

> ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-21160
>                 URL: https://issues.apache.org/jira/browse/FLINK-21160
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.12.3
>            Reporter: Qingsheng Ren
>            Priority: Major
>              Labels: pull-request-available, stale-major
>             Fix For: 1.13.0
>
>
> The variable {{deserializer}} in class {{ValueDeserializerWrapper}} is not
> instantiated until method {{deserialize()}} is invoked at runtime, so when
> {{getProducedType()}} is invoked during the job compilation stage, an NPE is
> thrown because it dereferences the still-uninstantiated variable
> {{deserializer}}.
>
> A user reported that the new {{KafkaSource}} fails with a
> {{NullPointerException}}:
> {code}
> Exception in thread "main" java.lang.NullPointerException
>     at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
>     at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
> {code}
> when setting it up like this:
> {code}
> val kafkaSource = buildKafkaSource(params)
> val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")
>
> private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
>     val builder = KafkaSource.builder<String>()
>         .setBootstrapServers(params.get("bootstrapServers"))
>         .setGroupId(params.get("groupId"))
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setTopics("topic")
>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
>     if (params.getBoolean("boundedSource", false)) {
>         builder.setBounded(OffsetsInitializer.latest())
>     }
>     return builder.build()
> }
> {code}
> The problem seems to be that the {{ValueDeserializerWrapper}} does not set
> the deserializer until the {{deserialize}} method is called, but
> {{getProducedType}} is actually called first, resulting in the
> {{NullPointerException}}.
> https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
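
For readers who want to see the failure mode in isolation, here is a minimal Java sketch of the pattern described in the issue: a wrapper whose delegate is created only on the first {{deserialize()}} call, and whose {{getProducedType()}} dereferences that delegate. It does not use Flink or Kafka. All names in it ({{SimpleDeserializer}}, {{Utf8Deserializer}}, {{LazyWrapper}}, {{SafeWrapper}}, {{LazyInitSketch}}) are hypothetical stand-ins, and {{SafeWrapper}} shows only one possible remedy under the assumption that the produced type can be known up front; it is not necessarily how the issue was fixed in Flink.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical stand-in for a Kafka-style deserializer (not the real Kafka interface).
interface SimpleDeserializer<T> {
    T deserialize(byte[] data);
    Class<T> targetType();
}

final class Utf8Deserializer implements SimpleDeserializer<String> {
    public String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
    public Class<String> targetType() {
        return String.class;
    }
}

// Mirrors the reported pattern: the delegate exists only after deserialize() has run.
final class LazyWrapper<T> {
    private final Supplier<SimpleDeserializer<T>> factory;
    private SimpleDeserializer<T> deserializer; // null until first deserialize()

    LazyWrapper(Supplier<SimpleDeserializer<T>> factory) {
        this.factory = factory;
    }

    T deserialize(byte[] data) {
        if (deserializer == null) {
            deserializer = factory.get();
        }
        return deserializer.deserialize(data);
    }

    Class<T> getProducedType() {
        // Throws NullPointerException when called before deserialize().
        return deserializer.targetType();
    }
}

// One possible remedy (an assumption, not the Flink patch): record the type at
// construction time so that type queries never touch the lazily created delegate.
final class SafeWrapper<T> {
    private final Class<T> producedType;
    private final Supplier<SimpleDeserializer<T>> factory;
    private SimpleDeserializer<T> deserializer;

    SafeWrapper(Class<T> producedType, Supplier<SimpleDeserializer<T>> factory) {
        this.producedType = producedType;
        this.factory = factory;
    }

    T deserialize(byte[] data) {
        if (deserializer == null) {
            deserializer = factory.get();
        }
        return deserializer.deserialize(data);
    }

    Class<T> getProducedType() {
        return producedType;
    }
}

final class LazyInitSketch {
    public static void main(String[] args) {
        LazyWrapper<String> lazy = new LazyWrapper<>(Utf8Deserializer::new);
        try {
            lazy.getProducedType(); // type query before any record was deserialized
        } catch (NullPointerException e) {
            System.out.println("NPE before first deserialize()");
        }

        SafeWrapper<String> safe = new SafeWrapper<>(String.class, Utf8Deserializer::new);
        System.out.println(safe.getProducedType().getSimpleName());
    }
}
```

In this sketch the type query on {{LazyWrapper}} fails for the same reason as in the stack trace above: it happens while the job is being assembled, before any record has been deserialized. {{SafeWrapper}} avoids the problem by not depending on the delegate for type information.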