Hi Dawid, thanks for driving this. Big +1 for this FLIP. Currently we have to implement an internal flat map in the connectors, which makes supporting customized parsers in DDL very inconvenient.
Here are my minor suggestions:

1. Can we support serializing/deserializing to/from a message of a specified type instead of byte[]? Otherwise we would need many more parser interfaces similar to KafkaDeserializationSchema/KafkaSerializationSchema when we want to support other kinds of sources, such as RocketMQ, RabbitMQ, etc.
2. How can we support serializing multiple output records into one compacted message? Maybe we need a method such as `void flush(Collector output)` to flush buffered records when checkpointing.

On Tue, 7 Apr 2020 at 14:57, Jark Wu <imj...@gmail.com> wrote:

> Thanks for the explanation. Sounds good to me.
>
> Best,
> Jark
>
> On Tue, 7 Apr 2020 at 14:45, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
> > Hi all,
> >
> > @Timo I'm fine with OpenContext.
> >
> > @Timo @Seth Sure we can combine all the parameters in a single object.
> > Will update the FLIP.
> >
> > @Jark I was aware of the implementation of SinkFunction, but it was a
> > conscious choice to not do it that way.
> >
> > Personally I am against giving a default implementation to both the new
> > and old methods. This results in an interface that by default does
> > nothing or notifies the user only at runtime that he/she has not
> > implemented a method of the interface, which does not sound like a good
> > practice to me. Moreover I believe the method without a Collector will
> > still be the preferred method by many users. Plus it communicates
> > explicitly what is the minimal functionality required by the interface.
> > Nevertheless I am happy to hear other opinions.
> >
> > @all I also prefer the buffering approach. Let's wait a day or two more
> > to see if others think differently.
> >
> > Best,
> >
> > Dawid
> >
> > On 07/04/2020 06:11, Jark Wu wrote:
> > > Hi Dawid,
> > >
> > > Thanks for driving this. This is a blocker to support Debezium CDC format
> > > (FLIP-105). So big +1 from my side.
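A minimal sketch of the interface shape Dawid argues for, using plain-Java stand-ins rather than Flink's real classes: the single-record `deserialize(byte[])` stays the one abstract method, and the Collector variant is a default method layered on top of it. `Collector`, `SketchDeserializationSchema`, `Utf8Schema`, and `SplittingSchema` are hypothetical names defined only for this illustration. With the Collector variant, "emit nothing" and "emit a record" are distinct outcomes, whereas the single-record method can only signal "nothing" by returning null; a schema that needs 0 to m records per message overrides the Collector variant.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Declared first so the single-file launcher (java File.java) finds main().
class DeserializationSketchDemo {
    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        Collector<String> collector = out::add;

        // Single-record schema: the default Collector method adapts it.
        Utf8Schema utf8 = new Utf8Schema();
        utf8.deserialize("hello".getBytes(StandardCharsets.UTF_8), collector);
        utf8.deserialize(new byte[0], collector); // returns null -> nothing emitted

        // Multi-record schema: overrides the Collector method directly.
        new SplittingSchema().deserialize("a,b,c".getBytes(StandardCharsets.UTF_8), collector);

        System.out.println(out); // [hello, a, b, c]
    }
}

/** Hypothetical stand-in for Flink's Collector. */
interface Collector<T> {
    void collect(T record);
}

/** Hypothetical sketch: one abstract method plus a Collector-based default. */
interface SketchDeserializationSchema<T> {
    // The minimal contract every schema must fulfil.
    T deserialize(byte[] message);

    // Layered on top of the single-record method; null means "no record".
    default void deserialize(byte[] message, Collector<T> out) {
        T record = deserialize(message);
        if (record != null) {
            out.collect(record);
        }
    }
}

class Utf8Schema implements SketchDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        // An empty payload is treated as "no value" instead of an empty string.
        return message.length == 0 ? null : new String(message, StandardCharsets.UTF_8);
    }
}

class SplittingSchema implements SketchDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        // Still required by the interface, although this schema only fits the Collector variant.
        throw new UnsupportedOperationException("use deserialize(byte[], Collector)");
    }

    @Override
    public void deserialize(byte[] message, Collector<String> out) {
        // One record per non-empty comma-separated field: 0 to m records per message.
        for (String field : new String(message, StandardCharsets.UTF_8).split(",")) {
            if (!field.isEmpty()) {
                out.collect(field);
            }
        }
    }
}
```

The trade-off Jark raises is visible in `SplittingSchema`: it must still implement the single-record method, here by throwing, even though only the Collector variant makes sense for it.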
> > >
> > > Regarding emitting multiple records and checkpointing, I'm also in favor
> > > of option #1: buffer all the records outside of the checkpoint lock.
> > > I think in most use cases the buffered records will not be larger than
> > > the byte[] they were deserialized from.
> > >
> > > I have a minor suggestion on DeserializationSchema: could we have a default
> > > implementation (maybe throwing an exception) for `T deserialize(byte[] message)`?
> > > I think this will not break compatibility, and users won't have to
> > > implement this deprecated method if they want to use the new
> > > Collector-based method.
> > > I think SinkFunction did the same: it introduced a new invoke
> > > method with a Context parameter and gave the old invoke method an
> > > empty implementation.
> > >
> > > Best,
> > > Jark
> > >
> > > On Mon, 6 Apr 2020 at 23:51, Seth Wiesman <sjwies...@gmail.com> wrote:
> > >
> > >> I would be in favor of buffering data outside of the checkpoint lock. In my
> > >> experience, serialization is always the biggest performance killer in user
> > >> code and I have a hard time believing in practice that anyone is going to
> > >> buffer so many records that it causes real memory concerns.
> > >>
> > >> To add to Timo's point, Statefun actually did that on its Kinesis ser/de
> > >> interfaces [1,2].
> > >>
> > >> Seth
> > >>
> > >> [1] https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
> > >> [2] https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
> > >>
> > >> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther <twal...@apache.org> wrote:
> > >>
> > >>> Hi Dawid,
> > >>>
> > >>> Thanks for this FLIP.
> > >>> This solves a lot of issues with the current
> > >>> design for both the Flink contributors and users. +1 for this.
> > >>>
> > >>> Some minor suggestions from my side:
> > >>> - How about finding something shorter for `InitializationContext`? Maybe
> > >>>   just `OpenContext`?
> > >>> - While introducing default methods for existing interfaces, shall we
> > >>>   also create contexts for those methods? I see the following method in
> > >>>   your FLIP and wonder if we can reduce the number of parameters while
> > >>>   introducing a new method:
> > >>>
> > >>> deserialize(
> > >>>     byte[] recordValue,
> > >>>     String partitionKey,
> > >>>     String seqNum,
> > >>>     long approxArrivalTimestamp,
> > >>>     String stream,
> > >>>     String shardId,
> > >>>     Collector<T> out)
> > >>>
> > >>> to:
> > >>>
> > >>> deserialize(
> > >>>     byte[] recordValue,
> > >>>     Context c,
> > >>>     Collector<T> out)
> > >>>
> > >>> What do you think?
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>> On 06.04.20 11:08, Dawid Wysakowicz wrote:
> > >>>> Hi devs,
> > >>>>
> > >>>> When working on improving the Table API/SQL connectors we faced a few
> > >>>> shortcomings of the DeserializationSchema and SerializationSchema
> > >>>> interfaces. Similar features were also mentioned by other users in the
> > >>>> past.
> > >>>> The shortcomings I would like to address with the FLIP include:
> > >>>>
> > >>>> * Emitting 0 to m records from the deserialization schema with
> > >>>>   per-partition watermarks
> > >>>>      o https://github.com/apache/flink/pull/3314#issuecomment-376237266
> > >>>>      o differentiate null value from no value
> > >>>>      o support for Debezium CDC format
> > >>>>        (https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL)
> > >>>> * A way to initialize the schema
> > >>>>      o establish external connections
> > >>>>      o generate code on startup
> > >>>>      o no need for lazy initialization
> > >>>> * Access to metrics
> > >>>>   [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329]
> > >>>>
> > >>>> One important aspect I would like to hear your opinion on is how to
> > >>>> support the Collector interface in the Kafka source, assuming we agree
> > >>>> to add the Collector to the DeserializationSchema.
> > >>>>
> > >>>> The FLIP can be found here:
> > >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode
> > >>>>
> > >>>> Looking forward to your feedback.
> > >>>>
> > >>>> Best,
> > >>>>
> > >>>> Dawid
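Option #1 from this thread, buffering outside the checkpoint lock, can be sketched under a few assumptions: a single fetcher thread, a plain `Object` used as the checkpoint lock, and an in-memory list standing in for the downstream output. `BufferingFetcherSketch`, `processMessage`, and `snapshotOffset` are hypothetical names, not Kafka connector code. Deserialization, the expensive part Seth mentions, runs without the lock; only handing over the already-deserialized records and advancing the offset happen under it, so a checkpoint sees either all records of a message together with its offset, or none of them.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class BufferingFetcherSketch {
    private final Object checkpointLock = new Object();
    private final List<String> emitted = new ArrayList<>(); // stands in for the downstream output
    private long offset = -1; // offset of the last fully emitted message

    // Hypothetical deserializer producing 0 to m records per message.
    static void deserialize(byte[] message, List<String> out) {
        for (String field : new String(message, StandardCharsets.UTF_8).split(",")) {
            if (!field.isEmpty()) {
                out.add(field);
            }
        }
    }

    void processMessage(byte[] message, long messageOffset) {
        // 1. Deserialize into a per-message buffer without holding the lock.
        List<String> buffer = new ArrayList<>();
        deserialize(message, buffer);

        // 2. Emit the buffered records and advance the offset atomically w.r.t. checkpoints.
        synchronized (checkpointLock) {
            emitted.addAll(buffer);
            offset = messageOffset;
        }
    }

    // A checkpoint takes the same lock, so it observes whole messages only.
    long snapshotOffset() {
        synchronized (checkpointLock) {
            return offset;
        }
    }

    List<String> emittedSnapshot() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }

    public static void main(String[] args) {
        BufferingFetcherSketch fetcher = new BufferingFetcherSketch();
        fetcher.processMessage("a,b".getBytes(StandardCharsets.UTF_8), 0);
        fetcher.processMessage(new byte[0], 1); // emits nothing but still advances the offset
        fetcher.processMessage("c".getBytes(StandardCharsets.UTF_8), 2);
        System.out.println(fetcher.emittedSnapshot() + " @ offset " + fetcher.snapshotOffset());
        // [a, b, c] @ offset 2
    }
}
```

The buffer never holds more than the records produced from one message, which is why Jark and Seth expect the memory overhead of this option to stay small.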
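Timo's two suggestions, a shorter `OpenContext` for one-time initialization and a single `Context` object in place of the long Kinesis parameter list, could combine roughly as follows. Everything here is hypothetical and not the FLIP's final API: `OpenContext`, `KinesisRecordContext`, `SimpleRecordContext`, `ContextualDeserializer`, and `ShardTaggingDeserializer` are illustrative names. `open()` does setup that would otherwise happen lazily (here it compiles a pattern once), and adding a field to the context later would not change the `deserialize` signature, which is the main benefit of the parameter object.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Declared first so the single-file launcher finds main().
class ContextSketchDemo {
    public static void main(String[] args) {
        ShardTaggingDeserializer deserializer = new ShardTaggingDeserializer();
        deserializer.open(new OpenContext() {}); // one-time setup before any record

        KinesisRecordContext ctx =
                new SimpleRecordContext("pk-1", "seq-42", 1586160000000L, "orders", "shard-0");
        List<String> out = new ArrayList<>();
        deserializer.deserialize("a;b".getBytes(StandardCharsets.UTF_8), ctx, out::add);
        System.out.println(out); // [shard-0/a, shard-0/b]
    }
}

/** Hypothetical stand-in for Flink's Collector. */
interface Collector<T> {
    void collect(T record);
}

/** Hypothetical initialization context; a natural place to expose e.g. metrics. */
interface OpenContext {}

/** Hypothetical parameter object bundling the five Kinesis-specific arguments. */
interface KinesisRecordContext {
    String partitionKey();
    String seqNum();
    long approxArrivalTimestamp();
    String stream();
    String shardId();
}

final class SimpleRecordContext implements KinesisRecordContext {
    private final String partitionKey, seqNum, stream, shardId;
    private final long approxArrivalTimestamp;

    SimpleRecordContext(String partitionKey, String seqNum, long approxArrivalTimestamp,
                        String stream, String shardId) {
        this.partitionKey = partitionKey;
        this.seqNum = seqNum;
        this.approxArrivalTimestamp = approxArrivalTimestamp;
        this.stream = stream;
        this.shardId = shardId;
    }

    public String partitionKey() { return partitionKey; }
    public String seqNum() { return seqNum; }
    public long approxArrivalTimestamp() { return approxArrivalTimestamp; }
    public String stream() { return stream; }
    public String shardId() { return shardId; }
}

interface ContextualDeserializer<T> {
    // Optional one-time initialization (external connections, code generation, ...).
    default void open(OpenContext context) {}

    // Mirrors the suggested deserialize(byte[] recordValue, Context c, Collector<T> out).
    void deserialize(byte[] recordValue, KinesisRecordContext c, Collector<T> out);
}

class ShardTaggingDeserializer implements ContextualDeserializer<String> {
    private Pattern separator;

    @Override
    public void open(OpenContext context) {
        // Built once here instead of lazily on the first record.
        separator = Pattern.compile(";");
    }

    @Override
    public void deserialize(byte[] recordValue, KinesisRecordContext c, Collector<String> out) {
        // One output record per ';'-separated part, tagged with its shard.
        for (String part : separator.split(new String(recordValue, StandardCharsets.UTF_8))) {
            out.collect(c.shardId() + "/" + part);
        }
    }
}
```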
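The `void flush(Collector output)` suggestion for packing several records into one compacted message could look roughly like this. It is a sketch under assumptions: `CompactingSerializer` is a hypothetical class, records are `String`s joined with newlines, and the caller is assumed to invoke `flush` whenever a checkpoint is taken so that no record is left sitting in the buffer outside the checkpointed state.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Declared first so the single-file launcher finds main().
class CompactingSerializerDemo {
    public static void main(String[] args) {
        CompactingSerializer serializer = new CompactingSerializer(3);
        List<byte[]> messages = new ArrayList<>();
        Collector<byte[]> out = messages::add;

        serializer.serialize("a", out);
        serializer.serialize("b", out);
        serializer.serialize("c", out); // batch full: one compacted message emitted
        serializer.serialize("d", out);
        serializer.flush(out);          // e.g. on checkpoint: emit the partial batch

        for (byte[] m : messages) {
            System.out.println(new String(m, StandardCharsets.UTF_8).replace("\n", "|"));
        }
        // a|b|c
        // d
    }
}

/** Hypothetical stand-in for Flink's Collector. */
interface Collector<T> {
    void collect(T record);
}

/** Hypothetical serializer packing up to maxBatch records into one newline-delimited message. */
class CompactingSerializer {
    private final int maxBatch;
    private final List<String> buffer = new ArrayList<>();

    CompactingSerializer(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    void serialize(String record, Collector<byte[]> out) {
        buffer.add(record);
        if (buffer.size() >= maxBatch) {
            flush(out);
        }
    }

    // Emits everything buffered so far as a single compacted message.
    void flush(Collector<byte[]> out) {
        if (buffer.isEmpty()) {
            return;
        }
        out.collect(String.join("\n", buffer).getBytes(StandardCharsets.UTF_8));
        buffer.clear();
    }
}
```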