Hi all,

@Timo I'm fine with OpenContext.

@Timo @Seth Sure, we can combine all the parameters in a single object. I will update the FLIP.

@Jark I was aware of the implementation of SinkFunction, but it was a conscious choice not to do it that way. Personally, I am against giving a default implementation to both the new and old methods. This results in an interface that by default does nothing, or that notifies users only at runtime that they have not implemented a method of the interface, which does not sound like good practice to me. Moreover, I believe the method without a Collector will still be the preferred method for many users. Plus, it communicates explicitly what the minimal functionality required by the interface is. Nevertheless, I am happy to hear other opinions.

@all I also prefer the buffering approach. Let's wait a day or two more to see if others think differently.

Best,

Dawid

On 07/04/2020 06:11, Jark Wu wrote:
> Hi Dawid,
>
> Thanks for driving this. This is a blocker to support Debezium CDC format
> (FLIP-105). So big +1 from my side.
>
> Regarding emitting multiple records and checkpointing, I'm also in favor
> of option #1: buffer all the records outside of the checkpoint lock.
> I think most of the use cases will not buffer larger data than
> its deserialized byte[].
>
> I have a minor suggestion on DeserializationSchema: could we have a default
> implementation (maybe throw exception) for `T deserialize(byte[] message)`?
> I think this will not break compatibility, and users don't have to
> implement this deprecated interface if they want to use the new
> collector interface.
> I think SinkFunction also did this in the same way: introduce a new invoke
> method with Context parameter, and give the old invoke method an
> empty implementation.
>
> Best,
> Jark
>
> On Mon, 6 Apr 2020 at 23:51, Seth Wiesman <sjwies...@gmail.com> wrote:
>
>> I would be in favor of buffering data outside of the checkpoint lock.
>> In my experience, serialization is always the biggest performance killer in user
>> code and I have a hard time believing in practice that anyone is going to
>> buffer so many records that it causes real memory concerns.
>>
>> To add to Timo's point,
>>
>> Statefun actually did that on its Kinesis ser/de interfaces[1,2].
>>
>> Seth
>>
>> [1]
>> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
>> [2]
>> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
>>
>>
>> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Dawid,
>>>
>>> thanks for this FLIP. This solves a lot of issues with the current
>>> design for both the Flink contributors and users. +1 for this.
>>>
>>> Some minor suggestions from my side:
>>> - How about finding something shorter for `InitializationContext`? Maybe
>>> just `OpenContext`?
>>> - While introducing default methods for existing interfaces, shall we
>>> also create contexts for those methods? I see the following method in
>>> your FLIP and wonder if we can reduce the number of parameters while
>>> introducing a new method:
>>>
>>> deserialize(
>>>     byte[] recordValue,
>>>     String partitionKey,
>>>     String seqNum,
>>>     long approxArrivalTimestamp,
>>>     String stream,
>>>     String shardId,
>>>     Collector<T> out)
>>>
>>> to:
>>>
>>> deserialize(
>>>     byte[] recordValue,
>>>     Context c,
>>>     Collector<T> out)
>>>
>>> What do you think?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>>
>>> On 06.04.20 11:08, Dawid Wysakowicz wrote:
>>>> Hi devs,
>>>>
>>>> When working on improving the Table API/SQL connectors we faced a few
>>>> shortcomings of the DeserializationSchema and SerializationSchema
>>>> interfaces. Similar features were also mentioned by other users in the
>>>> past.
>>>> The shortcomings I would like to address with the FLIP include:
>>>>
>>>>   * Emitting 0 to m records from the deserialization schema with per
>>>>     partition watermarks
>>>>       o https://github.com/apache/flink/pull/3314#issuecomment-376237266
>>>>       o differentiate null value from no value
>>>>       o support for Debezium CDC format
>>>>         (https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL)
>>>>   * A way to initialize the schema
>>>>       o establish external connections
>>>>       o generate code on startup
>>>>       o no need for lazy initialization
>>>>
>>>>   * Access to metrics
>>>>     [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329]
>>>>
>>>> One important aspect I would like to hear your opinion on is how to
>>>> support the Collector interface in Kafka source. Of course if we agree
>>>> to add the Collector to the DeserializationSchema.
>>>>
>>>> The FLIP can be found here:
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> Best,
>>>>
>>>> Dawid
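
For readers following the thread, here is a rough sketch of the two design points discussed above: keeping the single-record `deserialize` abstract while adding a default Collector-based method, and buffering emitted records outside the checkpoint lock. Everything in it is an assumption made for illustration. The `Collector` and `DeserializationSchema` types are simplified stand-ins rather than the real Flink interfaces, and `LineSplittingSchema`, `deserializeBuffered` and `emitUnderLock` are hypothetical names, not part of the FLIP.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class CollectorSchemaSketch {

    /** Simplified stand-in for a collector; not the real Flink interface. */
    public interface Collector<T> {
        void collect(T record);
    }

    /** Simplified stand-in for the schema discussed in the thread. */
    public interface DeserializationSchema<T> {
        // Stays abstract, so the minimal contract of the interface is explicit.
        T deserialize(byte[] message);

        // New method with a default: delegates to the single-record method
        // and emits nothing when that method returns null.
        default void deserialize(byte[] message, Collector<T> out) {
            T record = deserialize(message);
            if (record != null) {
                out.collect(record);
            }
        }
    }

    /** Hypothetical schema that emits 0..m records per message, one per non-empty line. */
    public static final class LineSplittingSchema implements DeserializationSchema<String> {
        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public void deserialize(byte[] message, Collector<String> out) {
            for (String line : new String(message, StandardCharsets.UTF_8).split("\n")) {
                if (!line.isEmpty()) {
                    out.collect(line);
                }
            }
        }
    }

    /** Step 1 of the buffering approach: deserialize into a local buffer, no lock held. */
    public static <T> List<T> deserializeBuffered(DeserializationSchema<T> schema, byte[] message) {
        List<T> buffer = new ArrayList<>();
        schema.deserialize(message, buffer::add);
        return buffer;
    }

    /** Step 2: emit the whole buffer downstream while holding the (hypothetical) lock. */
    public static <T> void emitUnderLock(Object checkpointLock, List<T> buffer, Collector<T> downstream) {
        synchronized (checkpointLock) {
            for (T record : buffer) {
                downstream.collect(record);
            }
        }
    }

    public static void main(String[] args) {
        byte[] message = "a\nb\n\nc".getBytes(StandardCharsets.UTF_8);
        List<String> buffered = deserializeBuffered(new LineSplittingSchema(), message);
        emitUnderLock(new Object(), buffered, System.out::println);
    }
}
```

With this shape, existing schemas that only implement the single-record method keep working unchanged, and the potentially expensive deserialization runs before the lock is taken. Whether the buffer can grow large enough to matter is the open question the thread raises.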