Hi, I am running a Flink app while reading Kafka records with JSON format. And
the connect code is like the following:
tableEnv.connect(
new Kafka()
.version(kafkaInstance.getVersion())
.topic(chooseKafkaTopic(initPack.clusterMode))
.property("bootstrap.servers", kafkaInstance.getBrokerList())
.property("group.id", initPack.jobName)
.startFromEarliest()
).withSchema(
new Schema()
// EVENT_TIME
.field("rowtime", Types.SQL_TIMESTAMP).rowtime(
new Rowtime()
.timestampsFromField("time")
.watermarksPeriodicBounded(1000)
)
.field("type", Types.STRING)
.field("event", Types.STRING)
.field("user_id", Types.STRING)
.field("distinct_id", Types.STRING)
.field("project", Types.STRING)
.field("recv_time", Types.SQL_TIMESTAMP)
.field("properties", Types.ROW_NAMED(
new String[] { "BROWSER_VERSION", "pathname", "search",
"eventType", "message", "stack", "componentStack" },
Types.STRING, Types.STRING, Types.STRING, Types.STRING,
Types.STRING, Types.STRING, Types.STRING)
)
).withFormat(
new Json().failOnMissingField(false)
.deriveSchema()
)
.inAppendMode()
.registerTableSource(getTableName());
However, the application throws the following Exception which really confused
me. From the code above, the field types are only Types.STRING or
Types.SQL_TIMESTAMP.
Not sure which data field can run to this. Wanner some help from community.
Caused by: java.lang.NullPointerException: Null result cannot be used for
atomic types.
at DataStreamSinkConversion$5.map(Unknown Source)
at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)
at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at
org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamSourceConversion$2.processElement(Unknown Source)
at
org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.