Hi sunfulin, which Flink version are you using?

Best,
godfrey

sunfulin <sunfulin0...@163.com> wrote on Fri, Jan 10, 2020 at 1:50 PM:

> Hi, I am running a Flink app that reads Kafka records in JSON format.
> The connect code looks like the following:
>
> > tableEnv.connect(
> >     new Kafka()
> >         .version(kafkaInstance.getVersion())
> >         .topic(chooseKafkaTopic(initPack.clusterMode))
> >         .property("bootstrap.servers", kafkaInstance.getBrokerList())
> >         .property("group.id", initPack.jobName)
> >         .startFromEarliest()
> > ).withSchema(
> >     new Schema()
> >         // EVENT_TIME
> >         .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
> >             new Rowtime()
> >                 .timestampsFromField("time")
> >                 .watermarksPeriodicBounded(1000)
> >         )
> >         .field("type", Types.STRING)
> >         .field("event", Types.STRING)
> >         .field("user_id", Types.STRING)
> >         .field("distinct_id", Types.STRING)
> >         .field("project", Types.STRING)
> >         .field("recv_time", Types.SQL_TIMESTAMP)
> >         .field("properties", Types.ROW_NAMED(
> >             new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" },
> >             Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)
> >         )
> > ).withFormat(
> >     new Json().failOnMissingField(false)
> >         .deriveSchema()
> > )
> > .inAppendMode()
> > .registerTableSource(getTableName());
>
> However, the application throws the following exception, which really
> confuses me. In the code above, the field types are only *Types.STRING*
> or *Types.SQL_TIMESTAMP*.
>
> *I am not sure which data field could lead to this. I would appreciate
> some help from the community.*
>
> > Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.
> > at DataStreamSinkConversion$5.map(Unknown Source)
> > at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)
> > at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)
> > at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> > at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> > at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
> > at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
> > at DataStreamSourceConversion$2.processElement(Unknown Source)
> > at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
> > at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> > at org.apache.flink.streaming.