Thanks Jake. But can I just want to  implement the ouput-tag function in my 
flatmap function not in the process function. I check the parameters for the 
flatmap ,there is no ‘context’, so is it means I’ve to use process to rewrite 
my flatmap function?

发件人: Jake [mailto:ft20...@qq.com]
发送时间: 2020年8月25日 星期二 11:06
收件人: 范超 <fanc...@mgtv.com>
抄送: user <user@flink.apache.org>
主题: Re: How to sink invalid data from flatmap

Hi fanchao

use side output, see[1]

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html

Jake


On Aug 25, 2020, at 10:54 AM, 范超 <fanc...@mgtv.com<mailto:fanc...@mgtv.com>> 
wrote:

Hi,
I’m using the custom flatmap function to validate the kafka json string 
message, if the kafka message is valid to transform to a pojo  (using GSON), 
then go on with the next sink step.
If it can not be parsed as a POJO, the GSON will throw the 
“com.google.gson.JsonSyntaxException”, and in my custom flatmap function , I 
just catch this exception, and then go on, but this invalidated json message is 
just omitted.

But now, I want to save the invalidated json message to sink the original kafka 
json string to another table, but don’t know how to implement in my custom 
flatmap function, because the richmapfucntion has limited the collect type.
Could someone give me some advice please?
Thanks in advance!
Chao Fan

Reply via email to