答复: How to sink invalid data from flatmap

2020-08-24 Thread 范超
Thanks a lot Jake for the quick response 发件人: Jake [mailto:ft20...@qq.com] 发送时间: 2020年8月25日 星期二 11:31 收件人: 范超 抄送: user 主题: Re: How to sink invalid data from flatmap Hi fanchao Yes. I suggest that. Jake On Aug 25, 2020, at 11:20 AM, 范超 mailto:fanc...@mgtv.com>> wrote: Thanks Jake. B

Re: How to sink invalid data from flatmap

2020-08-24 Thread Jake
s it means I’ve to use process to rewrite > my flatmap function? > > 发件人: Jake [mailto:ft20...@qq.com] > 发送时间: 2020年8月25日 星期二 11:06 > 收件人: 范超 > 抄送: user > 主题: Re: How to sink invalid data from flatmap > > Hi fanchao > > use side output, see[1] > &

答复: How to sink invalid data from flatmap

2020-08-24 Thread 范超
: 2020年8月25日 星期二 11:06 收件人: 范超 抄送: user 主题: Re: How to sink invalid data from flatmap Hi fanchao use side output, see[1] [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html Jake On Aug 25, 2020, at 10:54 AM, 范超 mailto:fanc...@mgtv.com>> wrote: H

答复: How to sink invalid data from flatmap

2020-08-24 Thread 范超
Thanks , Using the ctx.output() inside the process method solved my problem, but my custom flatmap function has to be retired? 发件人: Yun Tang [mailto:myas...@live.com] 发送时间: 2020年8月25日 星期二 10:58 收件人: 范超 ; user 主题: Re: How to sink invalid data from flatmap Hi Chao I think side output [1] might

Re: How to sink invalid data from flatmap

2020-08-24 Thread Jake
Hi fanchao use side output, see[1] [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html Jake > On Aug 25, 2020, at 10:54 AM, 范超 wrote: > > Hi, > I’m using the c

Re: How to sink invalid data from flatmap

2020-08-24 Thread Yun Tang
Hi Chao I think side output [1] might meet your requirements. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html Best Yun Tang From: 范超 Sent: Tuesday, August 25, 2020 10:54 To: user Subject: How to sink invalid data from

How to sink invalid data from flatmap

2020-08-24 Thread 范超
Hi, I’m using the custom flatmap function to validate the kafka json string message, if the kafka message is valid to transform to a pojo (using GSON), then go on with the next sink step. If it can not be parsed as a POJO, the GSON will throw the “com.google.gson.JsonSyntaxException”, and in my