hi sunfulin,

which flink version are you using ?

best,
godfrey

sunfulin <sunfulin0...@163.com> 于2020年1月10日周五 下午1:50写道:

> Hi, I am running a Flink app while reading Kafka records with JSON format.
> And the connect code is like the following:
>
>
> tableEnv.connect(
>
>     new Kafka()
>
>                 .version(kafkaInstance.getVersion())
>
>                 .topic(chooseKafkaTopic(initPack.clusterMode))
>
>                 .property("bootstrap.servers",
> kafkaInstance.getBrokerList())
>
>                 .property("group.id", initPack.jobName)
>
>                 .startFromEarliest()
>
> ).withSchema(
>
>     new Schema()
>
>             // EVENT_TIME
>
>             .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
>
>                 new Rowtime()
>
>                     .timestampsFromField("time")
>
>                     .watermarksPeriodicBounded(1000)
>
>             )
>
>             .field("type", Types.STRING)
>
>             .field("event", Types.STRING)
>
>             .field("user_id", Types.STRING)
>
>             .field("distinct_id", Types.STRING)
>
>             .field("project", Types.STRING)
>
>             .field("recv_time", Types.SQL_TIMESTAMP)
>
>             .field("properties", Types.ROW_NAMED(
>
>                     new String[] { "BROWSER_VERSION", "pathname",
> "search", "eventType", "message", "stack", "componentStack" },
>
>                     Types.STRING, Types.STRING, Types.STRING,
> Types.STRING, Types.STRING, Types.STRING, Types.STRING)
>
>             )
>
> ).withFormat(
>
>     new Json().failOnMissingField(false)
>
>             .deriveSchema()
>
> )
>
> .inAppendMode()
>
> .registerTableSource(getTableName());
>
>
>
> However, the application throws the following Exception which really
> confused me. From the code above, the field types are only *Types.STRING*
> or *Types.SQL_TIMESTAMP. *
>
> *Not sure which data field can run to this. Wanner some help from
> community.*
>
>
> Caused by: java.lang.NullPointerException: Null result cannot be used for
> atomic types.
>
>  at DataStreamSinkConversion$5.map(Unknown Source)
>
>  at
> org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)
>
>  at
> org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)
>
>  at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>
>  at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>
>  at
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>
>  at
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>
>  at DataStreamSourceConversion$2.processElement(Unknown Source)
>
>  at
> org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>
>  at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>
>  at org.apache.flink.streaming.
>
>
>
>
>

Reply via email to