Hi sunfulin,

It looks like the error occurs in the sink rather than the source.

Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.
    at DataStreamSinkConversion$5.map(Unknown Source)


So the question is how you write to the sink. Can you share that code?
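
For context, as far as I understand it, this message tends to appear when a single-column result is converted to an atomic type (such as String) at the sink and the value turns out to be null, for example when failOnMissingField(false) lets a missing JSON field become null. The plain-Java sketch below only mimics that failure mode and one possible guard. The class and method names are hypothetical, and this is not Flink's generated code.

```java
public class AtomicSinkNullDemo {

    // Hypothetical stand-in for a sink conversion: takes the first field of a
    // row (modelled here as Object[]) and returns it as an atomic String.
    // A null field cannot be represented, so the conversion fails.
    static String toAtomic(Object[] row) {
        Object value = row[0];
        if (value == null) {
            throw new NullPointerException("Null result cannot be used for atomic types.");
        }
        return (String) value;
    }

    // One possible guard (an assumption, not the thread's confirmed fix):
    // replace a null field with a fallback before the conversion.
    static String toAtomicOrDefault(Object[] row, String fallback) {
        Object value = row[0];
        return value == null ? fallback : (String) value;
    }

    public static void main(String[] args) {
        // A non-null field converts without problems.
        System.out.println(toAtomic(new Object[] {"click"}));

        // A null field is replaced by the fallback value.
        System.out.println(toAtomicOrDefault(new Object[] {null}, "unknown"));

        // Without the guard, a null field raises the exception.
        try {
            toAtomic(new Object[] {null});
        } catch (NullPointerException e) {
            System.out.println("NPE: " + e.getMessage());
        }
    }
}
```

In a real job, the equivalent guard would be to filter out or default the nullable column in the query, or to emit a Row type rather than an atomic type. Which of these applies depends on the sink code, which has not been shared yet.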


Best,

Jingsong Lee

On Fri, Jan 10, 2020 at 2:58 PM godfrey he <godfre...@gmail.com> wrote:

> Hi sunfulin,
>
> Which Flink version are you using?
>
> best,
> godfrey
>
> On Fri, Jan 10, 2020 at 1:50 PM sunfulin <sunfulin0...@163.com> wrote:
>
>> Hi, I am running a Flink app that reads Kafka records in JSON
>> format. The connect code looks like the following:
>>
>>
>> tableEnv.connect(
>>     new Kafka()
>>         .version(kafkaInstance.getVersion())
>>         .topic(chooseKafkaTopic(initPack.clusterMode))
>>         .property("bootstrap.servers", kafkaInstance.getBrokerList())
>>         .property("group.id", initPack.jobName)
>>         .startFromEarliest()
>> ).withSchema(
>>     new Schema()
>>         // EVENT_TIME
>>         .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
>>             new Rowtime()
>>                 .timestampsFromField("time")
>>                 .watermarksPeriodicBounded(1000)
>>         )
>>         .field("type", Types.STRING)
>>         .field("event", Types.STRING)
>>         .field("user_id", Types.STRING)
>>         .field("distinct_id", Types.STRING)
>>         .field("project", Types.STRING)
>>         .field("recv_time", Types.SQL_TIMESTAMP)
>>         .field("properties", Types.ROW_NAMED(
>>             new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" },
>>             Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)
>>         )
>> ).withFormat(
>>     new Json().failOnMissingField(false)
>>         .deriveSchema()
>> )
>> .inAppendMode()
>> .registerTableSource(getTableName());
>>
>>
>>
>> However, the application throws the following exception, which really
>> confuses me. In the code above, the field types are only Types.STRING
>> or Types.SQL_TIMESTAMP.
>>
>> I am not sure which data field could lead to this. I would appreciate
>> some help from the community.
>>
>>
>> Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.
>>     at DataStreamSinkConversion$5.map(Unknown Source)
>>     at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)
>>     at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)
>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>     at DataStreamSourceConversion$2.processElement(Unknown Source)
>>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>     at org.apache.flink.streaming.
>

-- 
Best, Jingsong Lee
