Hi all,

@Timo I'm fine with OpenContext.

@Timo @Seth Sure, we can combine all the parameters into a single object.
I will update the FLIP.

@Jark I was aware of the implementation of SinkFunction, but it was a
conscious choice to not do it that way.

Personally I am against giving a default implementation to both the new
and old methods. This results in an interface that by default either does
nothing or tells the user only at runtime that he/she has not implemented
a method of the interface, which does not sound like good practice to me.
Moreover, I believe the method without a Collector will still be the
preferred method for many users. Keeping it abstract also communicates
explicitly what minimal functionality the interface requires.
Nevertheless, I am happy to hear other opinions.
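
A minimal sketch of that option, for illustration only: the old
single-record method stays abstract and only the new Collector-based
method gets a default that delegates to it. All type names here
(SketchCollector, SketchDeserializationSchema, Utf8StringSchema,
ListCollector) are hypothetical simplifications, not the signatures from
the FLIP, and treating a null result as "emit nothing" is an assumption
of the sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's Collector, reduced to the one method used here.
interface SketchCollector<T> {
    void collect(T record);
}

// Simplified, hypothetical schema interface; not the signature proposed in the FLIP.
interface SketchDeserializationSchema<T> {

    // Stays abstract: the minimal functionality every implementation must provide.
    T deserialize(byte[] message);

    // Only the new method has a default, and it delegates to the method above.
    // Assumption for this sketch: a null result means "emit nothing".
    default void deserialize(byte[] message, SketchCollector<T> out) {
        T record = deserialize(message);
        if (record != null) {
            out.collect(record);
        }
    }
}

// A user schema that implements only the single-record method.
class Utf8StringSchema implements SketchDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// Helper that stores collected records in a list so they can be inspected.
class ListCollector<T> implements SketchCollector<T> {
    final List<T> records = new ArrayList<>();

    @Override
    public void collect(T record) {
        records.add(record);
    }
}

class DefaultMethodSketch {
    public static void main(String[] args) {
        ListCollector<String> out = new ListCollector<>();
        // Calls the inherited default method, which delegates to deserialize(byte[]).
        new Utf8StringSchema().deserialize("hello".getBytes(StandardCharsets.UTF_8), out);
        System.out.println(out.records);
    }
}
```

With this shape, a schema that forgets to implement the single-record
method fails at compile time rather than at runtime.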

@all I also prefer the buffering approach. Let's wait a day or two more
to see if others think differently.
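
For readers following along, the buffering approach can be sketched
roughly as below: user deserialization runs without the checkpoint lock
and fills a buffer, and only the emit plus the offset update happen under
the lock. BufferingFetcherSketch and all of its members are hypothetical
names; this is not the Kafka connector's actual code, just a
single-threaded illustration of the idea.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical, heavily simplified fetcher loop; names are illustrative only.
class BufferingFetcherSketch<T> {
    private final Object checkpointLock = new Object();
    private final List<T> buffer = new ArrayList<>();
    private final List<T> emitted = new ArrayList<>(); // stands in for the downstream output
    private long offset = -1L;                          // stands in for checkpointed source state

    void processRecord(byte[] raw, long recordOffset,
                       BiConsumer<byte[], Consumer<T>> deserializer) {
        // Step 1: run the (potentially slow) user code without holding the lock.
        buffer.clear();
        deserializer.accept(raw, buffer::add);

        // Step 2: emit all buffered records and advance the offset under the lock,
        // so a checkpoint sees either none or all records of this message.
        synchronized (checkpointLock) {
            emitted.addAll(buffer);
            offset = recordOffset;
        }
    }

    // Returns a copy of everything emitted so far.
    List<T> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }

    long offset() {
        synchronized (checkpointLock) {
            return offset;
        }
    }

    public static void main(String[] args) {
        BufferingFetcherSketch<String> fetcher = new BufferingFetcherSketch<>();
        // One raw message that deserializes into two records.
        fetcher.processRecord("a,b".getBytes(StandardCharsets.UTF_8), 7L, (raw, out) -> {
            for (String part : new String(raw, StandardCharsets.UTF_8).split(",")) {
                out.accept(part);
            }
        });
        System.out.println(fetcher.emitted() + " @ offset " + fetcher.offset());
    }
}
```

The cost of this option is that the buffer holds every record produced
from one message until the lock is taken, which is the memory concern
raised in the thread.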

Best,

Dawid

On 07/04/2020 06:11, Jark Wu wrote:
> Hi Dawid,
>
> Thanks for driving this. This is a blocker to support Debezium CDC format
> (FLIP-105). So big +1 from my side.
>
> Regarding emitting multiple records and checkpointing, I'm also in favor
> of option #1: buffer all the records outside of the checkpoint lock.
> I think most use cases will not buffer more data than the byte[] being
> deserialized.
>
> I have a minor suggestion on DeserializationSchema: could we have a default
> implementation (maybe one that throws an exception) for
> `T deserialize(byte[] message)`? I think this will not break compatibility,
> and users won't have to implement this deprecated method if they want to
> use the new collector interface.
> I think SinkFunction did the same thing: it introduced a new invoke
> method with a Context parameter and gave the old invoke method an
> empty implementation.
>
> Best,
> Jark
>
> On Mon, 6 Apr 2020 at 23:51, Seth Wiesman <sjwies...@gmail.com> wrote:
>
>> I would be in favor of buffering data outside of the checkpoint lock. In my
>> experience, serialization is always the biggest performance killer in user
>> code and I have a hard time believing in practice that anyone is going to
>> buffer so many records that it causes real memory concerns.
>>
>> To add to Timo's point,
>>
>> Statefun actually did that on its Kinesis ser/de interfaces[1,2].
>>
>> Seth
>>
>> [1] https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
>> [2] https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java
>>
>>
>> On Mon, Apr 6, 2020 at 4:49 AM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Dawid,
>>>
>>> thanks for this FLIP. This solves a lot of issues with the current
>>> design for both the Flink contributors and users. +1 for this.
>>>
>>> Some minor suggestions from my side:
>>> - How about finding something shorter for `InitializationContext`? Maybe
>>> just `OpenContext`?
>>> - While introducing default methods for existing interfaces, shall we
>>> also create contexts for those methods? I see the following method in
>>> your FLIP and wonder if we can reduce the number of parameters while
>>> introducing a new method:
>>>
>>> deserialize(
>>>              byte[] recordValue,
>>>              String partitionKey,
>>>              String seqNum,
>>>              long approxArrivalTimestamp,
>>>              String stream,
>>>              String shardId,
>>>              Collector<T> out)
>>>
>>> to:
>>>
>>> deserialize(
>>>              byte[] recordValue,
>>>              Context c,
>>>              Collector<T> out)
>>>
>>> What do you think?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>>
>>> On 06.04.20 11:08, Dawid Wysakowicz wrote:
>>>> Hi devs,
>>>>
>>>> When working on improving the Table API/SQL connectors we faced a few
>>>> shortcomings of the DeserializationSchema and SerializationSchema
>>>> interfaces. Similar features were also mentioned by other users in the
>>>> past. The shortcomings I would like to address with the FLIP include:
>>>>
>>>>   * Emitting 0 to m records from the deserialization schema with per
>>>>     partition watermarks
>>>>       o https://github.com/apache/flink/pull/3314#issuecomment-376237266
>>>>       o differentiate null value from no value
>>>>       o support for Debezium CDC format
>>>>         (https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL)
>>>>   * A way to initialize the schema
>>>>       o establish external connections
>>>>       o generate code on startup
>>>>       o no need for lazy initialization
>>>>   * Access to metrics
>>>>     [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329]
>>>>
>>>> One important aspect I would like to hear your opinion on is how to
>>>> support the Collector interface in the Kafka source, assuming we agree
>>>> to add the Collector to the DeserializationSchema.
>>>>
>>>> The FLIP can be found here:
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>
