Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-26 Stephan Ewen
That makes sense, thanks for clarifying. Best, Stephan On Fri, Apr 24, 2020 at 2:15 PM Dawid Wysakowicz wrote: > Hi Stephan, > > I fully agree with what you said. Also, as far as I can tell, what was > suggested in FLIP-124 does not contradict what you are saying. Let > me clarify it a

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-24 Dawid Wysakowicz
Hi Stephan, I fully agree with what you said. Also, as far as I can tell, what was suggested in FLIP-124 does not contradict what you are saying. Let me clarify it a bit in case it is not clear in the document. Current implementations of Kafka and Kinesis do the deserialization outside of the c

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-23 Stephan Ewen
Hi! Sorry for being a bit late to the party. One very important thing to consider for "serialization under checkpoint lock or not" is: - If you do it under checkpoint lock, things are automatically correct. Checkpoint barriers go between original records that correspond to offsets in the source
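A minimal Java sketch of the "deserialize under the checkpoint lock" approach described above, assuming a single reader thread and using hypothetical names (`LockedSourceSketch`, `processRecord`, `snapshotOffset`) rather than Flink's SourceFunction API. Deserialization, emission, and the offset update all happen inside one synchronized block, so a snapshot taken under the same lock can never see an offset that disagrees with what has been emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not Flink's API: everything derived from one raw record
// is emitted together with the offset update while holding the checkpoint lock.
final class LockedSourceSketch<T> {
    private final Object checkpointLock = new Object();
    private final List<T> emitted = new ArrayList<>(); // stands in for downstream output
    private long offset = 0L; // number of raw records fully processed

    void processRecord(byte[] raw, Function<byte[], List<T>> deserializer) {
        synchronized (checkpointLock) {
            // Deserialization runs inside the lock: simple and correct, but a
            // slow deserializer delays any checkpoint waiting for this lock.
            for (T record : deserializer.apply(raw)) {
                emitted.add(record);
            }
            offset++;
        }
    }

    // A checkpoint reads the offset under the same lock, so it always falls
    // between raw records, never in the middle of one record's output.
    long snapshotOffset() {
        synchronized (checkpointLock) {
            return offset;
        }
    }

    List<T> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }
}
```

The correctness comes for free here; the cost is that the lock is held for the whole deserialization of each record.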

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-14 Dawid Wysakowicz
Hi Xiaogang, I very much agree with Jark's and Aljoscha's responses. On 10/04/2020 17:35, Jark Wu wrote: > Hi Xiaogang, > > I think this proposal doesn't conflict with your use case, you can still > chain a ProcessFunction after a source which emits raw data. > But I'm not in favor of chaining P

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-14 Aljoscha Krettek
On 10.04.20 17:35, Jark Wu wrote: 1) For correctness, it is necessary to perform the watermark generation as early as possible in order to be close to the actual data generation within a source's data partition. This is also the purpose of per-partition watermark and event-time alignment. Man

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-10 Jark Wu
Hi Xiaogang, I think this proposal doesn't conflict with your use case, you can still chain a ProcessFunction after a source which emits raw data. But I'm not in favor of chaining ProcessFunction after source, and we should avoid that, because: 1) For correctness, it is necessary to perform the w
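To illustrate why watermarks are best generated per partition inside the source, here is a small Java sketch. The class `PerPartitionWatermarks` and its simple bounded-out-of-orderness rule are hypothetical and not Flink's watermark API: each partition tracks its own maximum timestamp, and the source's watermark is the minimum across partitions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink's watermark API: one watermark per source
// partition, combined by taking the minimum across partitions.
final class PerPartitionWatermarks {
    private final long maxOutOfOrderness;
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    PerPartitionWatermarks(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called for every record right where it is read, while its partition is still known.
    void onEvent(int partition, long eventTimestamp) {
        maxTimestampPerPartition.merge(partition, eventTimestamp, (oldMax, ts) -> Math.max(oldMax, ts));
    }

    // Watermark of one partition: highest timestamp seen minus the allowed out-of-orderness.
    long partitionWatermark(int partition) {
        Long max = maxTimestampPerPartition.get(partition);
        return max == null ? Long.MIN_VALUE : max - maxOutOfOrderness;
    }

    // Combined watermark: a fast partition cannot move it past a slow one.
    long currentWatermark() {
        if (maxTimestampPerPartition.isEmpty()) {
            return Long.MIN_VALUE; // nothing read yet, so no progress can be claimed
        }
        long min = Long.MAX_VALUE;
        for (int partition : maxTimestampPerPartition.keySet()) {
            min = Math.min(min, partitionWatermark(partition));
        }
        return min;
    }
}
```

With a bound of 5, events at 100 and 120 on partition 0 and at 40 on partition 1 give partition watermarks of 115 and 35 and a combined watermark of 35. Once records from several partitions are interleaved into one output stream, a generator further downstream no longer knows which partition each record came from and cannot compute this minimum. Partitions that have not produced any record yet are simply ignored in this sketch.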

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-09 SHI Xiaogang
Hi, I don't think the proposal is a good solution to the problems. I am in favour of using a ProcessFunction chained to the source/sink function to serialize/deserialize the records, instead of embedding (de)serialization schema in source/sink function. Message packing is heavily used in our prod

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-07 Aljoscha Krettek
On 07.04.20 08:45, Dawid Wysakowicz wrote: @Jark I was aware of the implementation of SinkFunction, but it was a conscious choice to not do it that way. Personally I am against giving a default implementation to both the new and old methods. This results in an interface that by default does not

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-07 wenlong.lwl
Hi Dawid, thanks for driving this. Big +1 for this FLIP. Currently we have to implement an internal flat map in connectors, which makes it very inconvenient to support customized parsers in DDL. Here are my minor suggestions: 1. Can we support serialize/deserialize from/to a message with a specified type

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-06 Jark Wu
Thanks for the explanation. Sounds good to me. Best, Jark On Tue, 7 Apr 2020 at 14:45, Dawid Wysakowicz wrote: > Hi all, > > @Timo I'm fine with OpenContext. > > @Timo @Seth Sure we can combine all the parameters in a single object. > Will update the FLIP > > @Jark I was aware of the implementa

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-06 Dawid Wysakowicz
Hi all, @Timo I'm fine with OpenContext. @Timo @Seth Sure we can combine all the parameters in a single object. Will update the FLIP @Jark I was aware of the implementation of SinkFunction, but it was a conscious choice to not do it that way. Personally I am against giving a default implementat
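One concrete way to see the risk of giving default implementations to both the old and the new method (our own illustration; the message above is cut off before its conclusion) is the Java sketch below. The interfaces are hypothetical and use `Consumer<T>` in place of a Collector. Design A keeps the single-record method abstract, so implementers are forced to provide it. Design B gives both methods defaults that call each other, so an implementation overriding neither compiles but overflows the stack at runtime.

```java
import java.util.function.Consumer;

// Design A (hypothetical): only the new multi-record method has a default,
// delegating to the old single-record method, which stays abstract.
interface SingleDefaultSchema<T> {
    T deserialize(byte[] message);

    default void deserialize(byte[] message, Consumer<T> out) {
        T record = deserialize(message);
        if (record != null) { // a null result means "skip this message"
            out.accept(record);
        }
    }
}

// Design B (hypothetical): both methods have defaults that delegate to each
// other. An implementation that overrides neither still compiles, but calling
// either method recurses until a StackOverflowError is thrown.
interface MutualDefaultSchema<T> {
    default T deserialize(byte[] message) {
        Object[] holder = new Object[1];
        deserialize(message, record -> holder[0] = record);
        @SuppressWarnings("unchecked")
        T result = (T) holder[0];
        return result;
    }

    default void deserialize(byte[] message, Consumer<T> out) {
        T record = deserialize(message);
        if (record != null) {
            out.accept(record);
        }
    }
}
```

With Design A, an existing single-record implementation keeps working unchanged and automatically supports the multi-record call; with Design B the compiler no longer tells the implementer that something is missing.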

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-06 Jark Wu
Hi Dawid, Thanks for driving this. This is a blocker for supporting the Debezium CDC format (FLIP-105). So big +1 from my side. Regarding emitting multiple records and checkpointing, I'm also in favor of option #1: buffer all the records outside of the checkpoint lock. I think most of the use cases wil
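A minimal sketch of option #1, buffering outside the checkpoint lock, with hypothetical names (`BufferedSourceSketch`, `processRecord`) and a `BiConsumer` standing in for a Collector-based deserializer. The possibly expensive deserialization runs without the lock; the lock is held only to emit the buffered records and advance the offset together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical sketch, not Flink's API: deserialize into a local buffer
// outside the lock, then publish the buffer and the offset atomically.
final class BufferedSourceSketch<T> {
    private final Object checkpointLock = new Object();
    private final List<T> emitted = new ArrayList<>(); // stands in for downstream output
    private long offset = 0L;

    void processRecord(byte[] raw, BiConsumer<byte[], Consumer<T>> deserializer) {
        // 1) Outside the lock: a checkpoint can proceed while this runs.
        List<T> buffer = new ArrayList<>();
        deserializer.accept(raw, buffer::add);

        // 2) Under the lock: all records derived from `raw` and the offset that
        //    covers them become visible together, so a checkpoint never splits them.
        synchronized (checkpointLock) {
            emitted.addAll(buffer);
            offset++;
        }
    }

    long snapshotOffset() {
        synchronized (checkpointLock) {
            return offset;
        }
    }

    List<T> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }
}
```

The buffer only ever holds the records produced from a single raw message, which is why the memory concern discussed in the thread is usually small; a message that unpacks into a very large number of records is still held in memory in full before it is emitted.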

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-06 Seth Wiesman
I would be in favor of buffering data outside of the checkpoint lock. In my experience, serialization is always the biggest performance killer in user code and I have a hard time believing in practice that anyone is going to buffer so many records that it causes real memory concerns. To add to Tim

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-06 Timo Walther
Hi Dawid, thanks for this FLIP. This solves a lot of issues with the current design for both the Flink contributors and users. +1 for this. Some minor suggestions from my side: - How about finding something shorter for `InitializationContext`? Maybe just `OpenContext`? - While introducing def

[DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

2020-04-06 Dawid Wysakowicz
Hi devs, When working on improving the Table API/SQL connectors we faced a few shortcomings of the DeserializationSchema and SerializationSchema interfaces. Similar features were also mentioned by other users in the past. The shortcomings I would like to address with the FLIP include: * Emittin
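To make the proposed lifecycle concrete, here is a Java sketch of a schema with `open`/`close` and a Collector-style method. Everything in it is hypothetical: `LifecycleSchema`; the single-accessor `OpenContext` (the name follows the suggestion made in the thread, while the `recordDelimiter()` method is invented for the example); and `PackedStringSchema`, which unpacks one message into several records, the message-packing case raised in the thread. `Consumer<T>` stands in for a Collector.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.regex.Pattern;

// Hypothetical context object bundling open-time parameters.
interface OpenContext {
    String recordDelimiter();
}

// Hypothetical schema with an explicit lifecycle; not Flink's actual interface.
interface LifecycleSchema<T> {
    default void open(OpenContext context) throws Exception {}

    // Collector-style method: one message may yield zero, one, or many records.
    void deserialize(byte[] message, Consumer<T> out);

    default void close() throws Exception {}
}

// Unpacks a "packed" message holding several records separated by a delimiter.
final class PackedStringSchema implements LifecycleSchema<String> {
    private Pattern splitter; // built once in open() instead of on every message

    @Override
    public void open(OpenContext context) {
        splitter = Pattern.compile(Pattern.quote(context.recordDelimiter()));
    }

    @Override
    public void deserialize(byte[] message, Consumer<String> out) {
        for (String record : splitter.split(new String(message, StandardCharsets.UTF_8))) {
            if (!record.isEmpty()) { // skip empty segments between delimiters
                out.accept(record);
            }
        }
    }

    @Override
    public void close() {
        splitter = null; // release per-instance state
    }
}
```

Opening the schema with a `|` delimiter and then deserializing `a|b||c` emits `a`, `b`, and `c`; the empty segment is skipped. The point of `open` is that per-instance setup, here compiling the split pattern, happens once rather than for every message.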