Hi fanchao,

Yes, I suggest that: rewrite your flatmap function as a process function.
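
A minimal sketch of that rewrite, assuming Flink 1.11 (flink-streaming-java) and Gson are on the classpath. The `Event` POJO, the tag id `invalid-json`, and the class names are hypothetical placeholders for your own types, and `print` stands in for your real sinks:

```java
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class InvalidJsonSideOutput {

    // Hypothetical POJO standing in for the real message type.
    public static class Event {
        public String id;
        public long value;

        public Event() {}
    }

    // Created as an anonymous subclass so Flink can capture the element type.
    static final OutputTag<String> INVALID = new OutputTag<String>("invalid-json") {};

    // Replaces the flatmap: valid records go to the main output,
    // the raw string of anything unparseable goes to the side output.
    public static class ParseJson extends ProcessFunction<String, Event> {
        // Gson is not Serializable, so it is created in open() on the task.
        private transient Gson gson;

        @Override
        public void open(Configuration parameters) {
            gson = new Gson();
        }

        @Override
        public void processElement(String raw, Context ctx, Collector<Event> out) {
            try {
                Event event = gson.fromJson(raw, Event.class);
                if (event != null) {
                    out.collect(event);
                } else {
                    // Gson returns null for empty input; treat it as invalid too.
                    ctx.output(INVALID, raw);
                }
            } catch (JsonSyntaxException e) {
                ctx.output(INVALID, raw);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka source.
        DataStream<String> source = env.fromElements("{\"id\":\"a\",\"value\":1}", "{not json");

        SingleOutputStreamOperator<Event> parsed = source.process(new ParseJson());

        parsed.print("valid");                          // replace with the existing sink
        parsed.getSideOutput(INVALID).print("invalid"); // replace with the sink for the other table

        env.execute("invalid-json-side-output");
    }
}
```

The main output keeps the `Event` type, so the existing sink step should not need to change; only the side output stream needs a second sink for the original JSON strings.
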
Jake

> On Aug 25, 2020, at 11:20 AM, 范超 <fanc...@mgtv.com> wrote:
>
> Thanks Jake. But can I implement the output-tag function in my flatmap
> function rather than in a process function? I checked the parameters of
> flatmap and there is no 'context', so does that mean I have to use process
> to rewrite my flatmap function?
>
> From: Jake [mailto:ft20...@qq.com]
> Sent: Tuesday, August 25, 2020 11:06
> To: 范超 <fanc...@mgtv.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: How to sink invalid data from flatmap
>
> Hi fanchao
>
> Use a side output, see [1].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html
>
> Jake
>
> On Aug 25, 2020, at 10:54 AM, 范超 <fanc...@mgtv.com> wrote:
>
> Hi,
> I'm using a custom flatmap function to validate Kafka JSON string
> messages. If a message can be parsed into a POJO (using GSON), it goes on
> to the next sink step.
> If it cannot be parsed into a POJO, GSON throws
> "com.google.gson.JsonSyntaxException". My custom flatmap function just
> catches this exception and moves on, so the invalid JSON message is
> simply dropped.
>
> Now I want to keep the invalid messages and sink the original Kafka JSON
> string to another table, but I don't know how to implement this in my
> custom flatmap function, because the RichMapFunction restricts the
> collect type.
> Could someone give me some advice, please?
> Thanks in advance!
> Chao Fan