Hi Dawid, thanks for driving this. Big +1 for this FLIP. Currently we have
to implement an internal flat map in the connectors, which makes it very
inconvenient to support customized parsers in DDL.

Here are my minor suggestions:
1. Can we support serializing to / deserializing from a message of a
specified type instead of byte[]? Otherwise we would need many other parser
interfaces similar to KafkaDeserializationSchema/KafkaSerializationSchema
when we want to support other kinds of sources, such as RocketMQ, RabbitMQ,
etc.
2. How can we support serializing multiple output records into one compacted
message? Maybe we need a method such as `void flush(Collector output)`
to flush the buffered records when checkpointing?
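
To make both points a bit more concrete, here is a rough sketch of the kind
of interface I have in mind. It is only an illustration, not a proposal for
the final API: `BufferingSerializationSchema`, `CompactingStringSchema` and
the simplified local `Collector` are hypothetical names made up for this
mail, and I use String as the "specified message type" only to keep the
example short.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for a collector; declared locally so the
// sketch compiles without any Flink dependency.
interface Collector<T> {
    void collect(T record);
}

// Hypothetical interface: OUT is a connector-specific message type instead of
// byte[], and flush() lets the sink drain buffered records, e.g. on checkpoint.
interface BufferingSerializationSchema<IN, OUT> {
    void serialize(IN element, Collector<OUT> out);

    void flush(Collector<OUT> out);
}

// Example implementation: packs up to batchSize input records into a single
// newline-separated message.
class CompactingStringSchema implements BufferingSerializationSchema<String, String> {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    CompactingStringSchema(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void serialize(String element, Collector<String> out) {
        buffer.add(element);
        // Emit one compacted message as soon as the batch is full.
        if (buffer.size() >= batchSize) {
            flush(out);
        }
    }

    @Override
    public void flush(Collector<String> out) {
        // Nothing buffered means nothing to emit.
        if (buffer.isEmpty()) {
            return;
        }
        out.collect(String.join("\n", buffer));
        buffer.clear();
    }
}

class FlushSketchDemo {
    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        CompactingStringSchema schema = new CompactingStringSchema(2);
        schema.serialize("a", messages::add);
        schema.serialize("b", messages::add); // batch full, one message emitted
        schema.serialize("c", messages::add); // stays buffered
        schema.flush(messages::add);          // what a sink could call on checkpoint
        System.out.println(messages.size() + " messages emitted");
    }
}
```

The sink would call flush() before the checkpoint completes, so that no
buffered record is lost. Whether that call happens inside or outside the
checkpoint lock is exactly the open question of this thread, and the sketch
does not try to answer it.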

On Tue, 7 Apr 2020 at 14:57, Jark Wu <imj...@gmail.com> wrote:

> Thanks for the explanation. Sounds good to me.
>
> Best,
> Jark
>
> On Tue, 7 Apr 2020 at 14:45, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
> > Hi all,
> >
> > @Timo I'm fine with OpenContext.
> >
> > @Timo @Seth Sure we can combine all the parameters in a single object.
> > Will update the FLIP
> >
> > @Jark I was aware of the implementation of SinkFunction, but it was a
> > conscious choice to not do it that way.
> >
> > Personally I am against giving a default implementation to both the new
> > and old methods. This results in an interface that by default does
> > nothing or notifies the user only at runtime that he/she has not
> > implemented a method of the interface, which does not sound like a good
> > practice to me. Moreover I believe the method without a Collector will
> > still be the preferred method by many users. Plus it communicates
> > explicitly what is the minimal functionality required by the interface.
> > Nevertheless I am happy to hear other opinions.
> >
> > @all I also prefer the buffering approach. Let's wait a day or two more
> > to see if others think differently.
> >
> > Best,
> >
> > Dawid
> >
> > On 07/04/2020 06:11, Jark Wu wrote:
> > > Hi Dawid,
> > >
> > > Thanks for driving this. This is a blocker to support Debezium CDC format
> > > (FLIP-105). So big +1 from my side.
> > >
> > > Regarding emitting multiple records and checkpointing, I'm also in favor
> > > of option#1: buffer all the records outside of the checkpoint lock.
> > > I think most of the use cases will not buffer larger data than
> > > its deserialized byte[].
> > >
> > > I have a minor suggestion on DeserializationSchema: could we have a default
> > > implementation (maybe throw exception) for `T deserialize(byte[] message)`?
> > > I think this will not break compatibility, and users don't have to
> > > implement this deprecated interface if they want to use the new
> > > collector interface.
> > > I think SinkFunction also did this in the same way: introduce a new invoke
> > > method with Context parameter, and give the old invoke method an
> > > empty implementation.
> > >
> > > Best,
> > > Jark
> > >
> > > On Mon, 6 Apr 2020 at 23:51, Seth Wiesman <sjwies...@gmail.com> wrote:
> > >
> > >> I would be in favor of buffering data outside of the checkpoint lock. In my
> > >> experience, serialization is always the biggest performance killer in user
> > >> code and I have a hard time believing in practice that anyone is going to
> > >> buffer so many records that it causes real memory concerns.
> > >>
> > >> To add to Timo's point,
> > >>
> > >> Statefun actually did that on its Kinesis ser/de interfaces[1,2].
> > >>
> > >> Seth
> > >>
> > >> [1] https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
> > >> [2] https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
> > >>
> > >>
> > >> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther <twal...@apache.org> wrote:
> > >>
> > >>> Hi Dawid,
> > >>>
> > >>> thanks for this FLIP. This solves a lot of issues with the current
> > >>> design for both the Flink contributors and users. +1 for this.
> > >>>
> > >>> Some minor suggestions from my side:
> > >>> - How about finding something shorter for `InitializationContext`?
> > >>> Maybe just `OpenContext`?
> > >>> - While introducing default methods for existing interfaces, shall we
> > >>> also create contexts for those methods? I see the following method in
> > >>> your FLIP and wonder if we can reduce the number of parameters while
> > >>> introducing a new method:
> > >>>
> > >>> deserialize(
> > >>>              byte[] recordValue,
> > >>>              String partitionKey,
> > >>>              String seqNum,
> > >>>              long approxArrivalTimestamp,
> > >>>              String stream,
> > >>>              String shardId,
> > >>>              Collector<T> out)
> > >>>
> > >>> to:
> > >>>
> > >>> deserialize(
> > >>>              byte[] recordValue,
> > >>>              Context c,
> > >>>              Collector<T> out)
> > >>>
> > >>> What do you think?
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>>
> > >>>
> > >>> On 06.04.20 11:08, Dawid Wysakowicz wrote:
> > >>>> Hi devs,
> > >>>>
> > >>>> When working on improving the Table API/SQL connectors we faced a few
> > >>>> shortcomings of the DeserializationSchema and SerializationSchema
> > >>>> interfaces. Similar features were also mentioned by other users in the
> > >>>> past. The shortcomings I would like to address with the FLIP include:
> > >>>>
> > >>>>   * Emitting 0 to m records from the deserialization schema with per
> > >>>>     partition watermarks
> > >>>>       o https://github.com/apache/flink/pull/3314#issuecomment-376237266
> > >>>>       o differentiate null value from no value
> > >>>>       o support for Debezium CDC format
> > >>>>         (https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL)
> > >>>>   * A way to initialize the schema
> > >>>>       o establish external connections
> > >>>>       o generate code on startup
> > >>>>       o no need for lazy initialization
> > >>>>
> > >>>>   * Access to metrics
> > >>>>     [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329]
> > >>>>
> > >>>> One important aspect I would like to hear your opinion on is how to
> > >>>> support the Collector interface in the Kafka source, assuming of course
> > >>>> that we agree to add the Collector to the DeserializationSchema.
> > >>>>
> > >>>> The FLIP can be found here:
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode
> > >>>>
> > >>>> Looking forward to your feedback.
> > >>>>
> > >>>> Best,
> > >>>>
> > >>>> Dawid
> > >>>>
> > >>>
> >
> >
>
