Hi sunfulin,

It looks like the error happens in the sink rather than in the source.

Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.
    at DataStreamSinkConversion$5.map(Unknown Source)

So the question is how you write to the sink. Can you share that code?

Best,
Jingsong Lee

On Fri, Jan 10, 2020 at 2:58 PM godfrey he <godfre...@gmail.com> wrote:

> Hi sunfulin,
>
> Which Flink version are you using?
>
> Best,
> godfrey
>
> On Fri, Jan 10, 2020 at 1:50 PM sunfulin <sunfulin0...@163.com> wrote:
>
>> Hi,
>>
>> I am running a Flink app that reads Kafka records in JSON format. The
>> connect code looks like the following:
>>
>> tableEnv.connect(
>>     new Kafka()
>>         .version(kafkaInstance.getVersion())
>>         .topic(chooseKafkaTopic(initPack.clusterMode))
>>         .property("bootstrap.servers", kafkaInstance.getBrokerList())
>>         .property("group.id", initPack.jobName)
>>         .startFromEarliest()
>> ).withSchema(
>>     new Schema()
>>         // EVENT_TIME
>>         .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
>>             new Rowtime()
>>                 .timestampsFromField("time")
>>                 .watermarksPeriodicBounded(1000)
>>         )
>>         .field("type", Types.STRING)
>>         .field("event", Types.STRING)
>>         .field("user_id", Types.STRING)
>>         .field("distinct_id", Types.STRING)
>>         .field("project", Types.STRING)
>>         .field("recv_time", Types.SQL_TIMESTAMP)
>>         .field("properties", Types.ROW_NAMED(
>>             new String[] { "BROWSER_VERSION", "pathname", "search",
>>                 "eventType", "message", "stack", "componentStack" },
>>             Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>>             Types.STRING, Types.STRING, Types.STRING)
>>         )
>> ).withFormat(
>>     new Json().failOnMissingField(false)
>>         .deriveSchema()
>> )
>> .inAppendMode()
>> .registerTableSource(getTableName());
>>
>> However, the application throws the following exception, which really
>> confuses me. From the code above, the field types are only *Types.STRING*
>> or *Types.SQL_TIMESTAMP*.
>>
>> *I am not sure which data field could lead to this. I would appreciate
>> some help from the community.*
>>
>> Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.
>>     at DataStreamSinkConversion$5.map(Unknown Source)
>>     at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)
>>     at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)
>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>     at DataStreamSourceConversion$2.processElement(Unknown Source)
>>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>     at org.apache.flink.streaming.

--
Best, Jingsong Lee
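
For context on the diagnosis at the top of the thread: the message "Null result cannot be used for atomic types" most likely means that a single-column result is being converted to an atomic output type (for example a plain String) and the value is null. With failOnMissingField(false), a JSON record that lacks a field yields null for that field, so any of the STRING columns could trigger it. The following is a minimal plain-Java sketch of that behavior, not Flink's generated code; the class AtomicSinkSketch and the methods toAtomic and toRow are hypothetical names used only for illustration.

```java
import java.util.Arrays;

// Simplified stand-in for the sink-side conversion. This is NOT Flink code;
// all names here are hypothetical and exist only to illustrate the idea.
final class AtomicSinkSketch {

    // Models converting a single-column result to an atomic type such as String.
    // There is no wrapper object that could carry a null field, so null is rejected.
    static String toAtomic(Object field) {
        if (field == null) {
            throw new NullPointerException("Null result cannot be used for atomic types.");
        }
        return field.toString();
    }

    // Models converting to a row-like composite type, where individual
    // fields are allowed to be null.
    static Object[] toRow(Object... fields) {
        return Arrays.copyOf(fields, fields.length);
    }

    public static void main(String[] args) {
        // Assumption: the "event" field was missing in the JSON record,
        // so the source produced null for it.
        Object event = null;

        // A row-like output keeps the null field without failing.
        System.out.println(Arrays.toString(toRow("track", event)));

        // An atomic output cannot represent the null and fails.
        try {
            toAtomic(event);
        } catch (NullPointerException e) {
            System.out.println("atomic conversion failed: " + e.getMessage());
        }
    }
}
```

If this is indeed the cause, two options worth trying are to convert the result table to a row type rather than an atomic type, or to filter out null values in the query before writing to the sink. Which one applies depends on the sink code that has not been shared yet.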