Thanks for the explanation. Sounds good to me.

Best,
Jark

On Tue, 7 Apr 2020 at 14:45, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi all,
>
> @Timo I'm fine with OpenContext.
>
> @Timo @Seth Sure we can combine all the parameters in a single object.
> Will update the FLIP
>
> @Jark I was aware of the implementation of SinkFunction, but it was a
> conscious choice to not do it that way.
>
> Personally I am against giving a default implementation to both the new
> and old methods. This results in an interface that by default does
> nothing or notifies the user only in the runtime, that he/she has not
> implemented a method of the interface, which does not sound like a good
> practice to me. Moreover I believe the method without a Collector will
> still be the preferred method by many users. Plus it communicates
> explicitly what is the minimal functionality required by the interface.
> Nevertheless I am happy to hear other opinions.
>
> @all I also prefer the buffering approach. Let's wait a day or two more
> to see if others think differently.
>
> Best,
>
> Dawid
>
> On 07/04/2020 06:11, Jark Wu wrote:
> > Hi Dawid,
> >
> > Thanks for driving this. This is a blocker to support Debezium CDC format
> > (FLIP-105). So big +1 from my side.
> >
> > Regarding to emitting multiple records and checkpointing, I'm also in
> favor
> > of option#1: buffer all the records outside of the checkpoint lock.
> > I think most of the use cases will not buffer larger data than
> > it's deserialized byte[].
> >
> > I have a minor suggestion on DeserializationSchema: could we have a
> default
> > implementation (maybe throw exception) for `T deserialize(byte[]
> message)`?
> > I think this will not break compatibility, and users don't have to
> > implement this deprecated interface if he/she wants to use the new
> > collector interface.
> > I think SinkFunction also did this in the same way: introduce a new
> invoke
> > method with Context parameter, and give the old invoke method an
> > empty implemention.
> >
> > Best,
> > Jark
> >
> > On Mon, 6 Apr 2020 at 23:51, Seth Wiesman <sjwies...@gmail.com> wrote:
> >
> >> I would be in favor of buffering data outside of the checkpoint lock.
> In my
> >> experience, serialization is always the biggest performance killer in
> user
> >> code and I have a hard time believing in practice that anyone is going
> to
> >> buffer so many records that is causes real memory concerns.
> >>
> >> To add to Timo's point,
> >>
> >> Statefun actually did that on its Kinesis ser/de interfaces[1,2].
> >>
> >> Seth
> >>
> >> [1]
> >>
> >>
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
> >> [2]
> >>
> >>
> https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
> >>
> >>
> >> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther <twal...@apache.org> wrote:
> >>
> >>> Hi Dawid,
> >>>
> >>> thanks for this FLIP. This solves a lot of issues with the current
> >>> design for both the Flink contributors and users. +1 for this.
> >>>
> >>> Some minor suggestions from my side:
> >>> - How about finding something shorter for `InitializationContext`?
> Maybe
> >>> just `OpenContext`?
> >>> - While introducing default methods for existing interfaces, shall we
> >>> also create contexts for those methods? I see the following method in
> >>> your FLIP and wonder if we can reduce the number of parameters while
> >>> introducing a new method:
> >>>
> >>> deserialize(
> >>>              byte[] recordValue,
> >>>              String partitionKey,
> >>>              String seqNum,
> >>>              long approxArrivalTimestamp,
> >>>              String stream,
> >>>              String shardId,
> >>>              Collector<T> out)
> >>>
> >>> to:
> >>>
> >>> deserialize(
> >>>              byte[] recordValue,
> >>>              Context c,
> >>>              Collector<T> out)
> >>>
> >>> What do you think?
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>>
> >>> On 06.04.20 11:08, Dawid Wysakowicz wrote:
> >>>> Hi devs,
> >>>>
> >>>> When working on improving the Table API/SQL connectors we faced a few
> >>>> shortcomings of the DeserializationSchema and SerializationSchema
> >>>> interfaces. Similar features were also mentioned by other users in the
> >>>> past. The shortcomings I would like to address with the FLIP include:
> >>>>
> >>>>   * Emitting 0 to m records from the deserialization schema with per
> >>>>     partition watermarks
> >>>>       o
> >> https://github.com/apache/flink/pull/3314#issuecomment-376237266
> >>>>       o differentiate null value from no value
> >>>>       o support for Debezium CDC format
> >>>>         (
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL
> >>> )
> >>>>   * A way to initialize the schema
> >>>>       o establish external connections
> >>>>       o generate code on startup
> >>>>       o no need for lazy initialization
> >>>>
> >>>>   * Access to metrics
> >>>>     [
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329
> >>> ]
> >>>> One important aspect I would like to hear your opinion on is how to
> >>>> support the Collector interface in Kafka source. Of course if we agree
> >>>> to add the Collector to the DeserializationSchema.
> >>>>
> >>>> The FLIP can be found here:
> >>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode
> >>>> Looking forward to your feedback.
> >>>>
> >>>> Best,
> >>>>
> >>>> Dawid
> >>>>
> >>>
>
>

Reply via email to