[ https://issues.apache.org/jira/browse/FLINK-9627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dominik Wosiński updated FLINK-9627: ------------------------------------ Description: According to the comments what is needed to extend the 'KafkaJsonTableSource' looks as follows: {code:java} A version-agnostic Kafka JSON {@link StreamTableSource}. * * <p>The version-specific Kafka consumers need to extend this class and * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}. * * <p>The field names are used to parse the JSON file and so are the types.{code} This will cause an NPE, since there is no default value for startupMode in the abstract class itself only in the builder of this class. For the 'getKafkaConsumer' method the switch statement will be executed on non-initialized 'startupMode' field: {code:java} switch (startupMode) { case EARLIEST: kafkaConsumer.setStartFromEarliest(); break; case LATEST: kafkaConsumer.setStartFromLatest(); break; case GROUP_OFFSETS: kafkaConsumer.setStartFromGroupOffsets(); break; case SPECIFIC_OFFSETS: kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets); break; }{code} > Extending 'KafkaJsonTableSource' according to comments will result in NPE > ------------------------------------------------------------------------- > > Key: FLINK-9627 > URL: https://issues.apache.org/jira/browse/FLINK-9627 > Project: Flink > Issue Type: Bug > Reporter: Dominik Wosiński > Priority: Major > > According to the comments what is needed to extend the 'KafkaJsonTableSource' > looks as follows: > > {code:java} > A version-agnostic Kafka JSON {@link StreamTableSource}. > * > * <p>The version-specific Kafka consumers need to extend this class and > * override {@link #createKafkaConsumer(String, Properties, > DeserializationSchema)}}. > * > * <p>The field names are used to parse the JSON file and so are the > types.{code} > This will cause an NPE, since there is no default value for startupMode in > the abstract class itself only in the builder of this class. > For the 'getKafkaConsumer' method the switch statement will be executed on > non-initialized 'startupMode' field: > {code:java} > switch (startupMode) { > case EARLIEST: > kafkaConsumer.setStartFromEarliest(); > break; > case LATEST: > kafkaConsumer.setStartFromLatest(); > break; > case GROUP_OFFSETS: > kafkaConsumer.setStartFromGroupOffsets(); > break; > case SPECIFIC_OFFSETS: > kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets); > break; > }{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)