That makes sense, thanks for clarifying.

Best,
Stephan


On Fri, Apr 24, 2020 at 2:15 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Stephan,
>
> I fully agree with what you said. Also, as far as I can tell, what was
> suggested in FLIP-124 does not contradict what you are saying. Let me
> clarify it a bit in case it is not clear from the document.
>
> The current Kafka and Kinesis implementations already do the
> deserialization outside of the checkpoint lock, in threads separate from the
> main processing thread. The approach described as option 1, which had the
> most supporters, is to keep that behavior. The way I would like to support
> emitting multiple results in this setup is to let the DeserializationSchema
> deserialize records into a list (via a collector) that will then be emitted
> atomically, all at once.
>
> Currently the behavior can be modelled as:
> T record = deserializationSchema.deserialize(...);
> synchronized (checkpointLock) {
>     sourceContext.collect(record);
>     updateSourceState(...);
> }
>
> and I was suggesting to change it to:
> Collector out = new Collector();
> deserializationSchema.deserialize(..., out);
> List<T> deserializedRecords = out.getRecords();
> synchronized (checkpointLock) {
>     for (T record : deserializedRecords) {
>         sourceContext.collect(record);
>     }
>     updateSourceState(...);
> }
>
> I think that is aligned with your reply to Seth's comment that the
> "batch" of records originating from one source record must be emitted atomically.
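A minimal, runnable sketch of the buffering approach described above. It assumes simplified stand-ins: `SimpleCollector`, `ListCollector`, `MultiRecordSchema`, `BufferingSourceLoop`, the in-memory `emitted` list (standing in for `sourceContext.collect(...)`) and the `long` offset (standing in for the source state) are all hypothetical, not Flink classes. Deserialization runs outside the lock; emitting the whole batch and updating the offset happen in one synchronized block, so a checkpoint taken under the same lock sees either none or all of a batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a collector.
interface SimpleCollector<T> {
    void collect(T record);
}

// Buffers everything handed to collect() so it can be emitted later as one batch.
final class ListCollector<T> implements SimpleCollector<T> {
    private final List<T> records = new ArrayList<>();

    @Override
    public void collect(T record) {
        records.add(record);
    }

    List<T> getRecords() {
        return records;
    }
}

// Hypothetical schema that may produce several records per source record.
interface MultiRecordSchema<T> {
    void deserialize(byte[] message, SimpleCollector<T> out);
}

final class BufferingSourceLoop<T> {
    private final Object checkpointLock = new Object();
    private final List<T> emitted = new ArrayList<>(); // stands in for sourceContext.collect(...)
    private long offset = -1L;                          // stands in for the source state

    void processSourceRecord(byte[] message, long recordOffset, MultiRecordSchema<T> schema) {
        // 1) Deserialize outside the checkpoint lock.
        ListCollector<T> out = new ListCollector<>();
        schema.deserialize(message, out);
        List<T> deserializedRecords = out.getRecords();

        // 2) Emit the whole batch and advance the state atomically.
        synchronized (checkpointLock) {
            for (T record : deserializedRecords) {
                emitted.add(record);
            }
            offset = recordOffset;
        }
    }

    // A checkpoint takes the same lock, so it never observes half a batch.
    long snapshotOffset() {
        synchronized (checkpointLock) {
            return offset;
        }
    }

    List<T> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }
}
```

The only difference from the single-record variant is that the critical section now loops over a pre-built list; no deserialization work is moved under the lock.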
>
> Best,
>
> Dawid
>
>
>
> On 23/04/2020 14:55, Stephan Ewen wrote:
>
> Hi!
>
> Sorry for being a bit late to the party.
>
> One very important thing to consider for "serialization under checkpoint
> lock or not" is:
>   - If you do it under the checkpoint lock, things are automatically correct.
> Checkpoint barriers go between original records that correspond to offsets
> in the source.
>   - If you deserialize outside the checkpoint lock, then you may read a record
> from the source but only partially emit it. In that case you need to store
> the difference (the part not yet emitted) in the checkpoint.
>
> ==> I would advise against trying to emit partial records, i.e. doing
> things outside the checkpoint lock. FLIP-27 will by default also not do
> partial emission of unnested events. Also, it is questionable whether
> optimizing this in the source makes sense when no other operator supports
> that (flatMap, etc.).
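To illustrate the second bullet: if a source may emit only part of what one source record produced before a checkpoint is taken, the snapshot has to carry the unemitted remainder in addition to the offset. The sketch below is hypothetical (`PartiallyEmittingSource` and `PartialEmissionSnapshot` are not Flink classes) and omits locking and restore logic for brevity; it only shows the extra state that partial emission forces into the checkpoint.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical checkpoint state for a source that may stop halfway through the
// records produced from one source record.
final class PartialEmissionSnapshot<T> {
    final long offsetOfCurrentRecord; // source record the pending records came from
    final List<T> pending;            // deserialized but not yet emitted

    PartialEmissionSnapshot(long offsetOfCurrentRecord, List<T> pending) {
        this.offsetOfCurrentRecord = offsetOfCurrentRecord;
        this.pending = pending;
    }
}

final class PartiallyEmittingSource<T> {
    private final Deque<T> pending = new ArrayDeque<>();
    private final List<T> emitted = new ArrayList<>();
    private long currentOffset = -1L;

    // Reads one source record; its records are queued, not emitted yet.
    void read(long offset, List<T> deserialized) {
        currentOffset = offset;
        pending.addAll(deserialized);
    }

    // Emits a single record; a checkpoint may be taken between two calls.
    boolean emitOne() {
        T next = pending.pollFirst();
        if (next == null) {
            return false;
        }
        emitted.add(next);
        return true;
    }

    // The offset alone is no longer enough: the remainder must be checkpointed too.
    PartialEmissionSnapshot<T> snapshot() {
        return new PartialEmissionSnapshot<>(currentOffset, new ArrayList<>(pending));
    }

    List<T> emitted() {
        return emitted;
    }
}
```

With atomic emission, `pending` is always empty at checkpoint time and the snapshot collapses to the offset alone, which is the simplification argued for above.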
>
> Regarding Seth's comment about performance:
>   - It probably makes little difference whether this happens under the lock
> or not; what matters more is whether it can be pushed to another thread (the
> source's I/O thread), so that it does not add load to the main task
> processing thread.
>
> ==> This means that the I/O thread deserializes the "batch" that it hands
> over.
> ==> Still, it is important that all records coming from one original
> source record are emitted atomically, otherwise we have the same issue as
> above.
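A minimal sketch of that handover, assuming a bounded `BlockingQueue` between the I/O thread and the task thread. `Batch` and `HandoverSource` are hypothetical names, and the in-memory list and `long` offset stand in for the real output and source state. The I/O thread does the deserialization and enqueues one `Batch` per source record; the task thread dequeues a whole batch and emits it under the checkpoint lock together with the offset update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// One unit of handover: all records deserialized from a single source record.
final class Batch<T> {
    final long offset;
    final List<T> records;

    Batch(long offset, List<T> records) {
        this.offset = offset;
        this.records = records;
    }
}

final class HandoverSource<T> {
    private final BlockingQueue<Batch<T>> handover = new ArrayBlockingQueue<>(16);
    private final Object checkpointLock = new Object();
    private final List<T> emitted = new ArrayList<>();
    private long offset = -1L;

    // Called on the I/O thread, after it has deserialized the batch off the task thread.
    void handOver(Batch<T> batch) throws InterruptedException {
        handover.put(batch); // blocks if the task thread falls behind
    }

    // Called on the task thread: the whole batch is emitted under the lock together
    // with the offset update, so no checkpoint falls inside a batch.
    void emitNextBatch() throws InterruptedException {
        Batch<T> batch = handover.take();
        synchronized (checkpointLock) {
            emitted.addAll(batch.records);
            offset = batch.offset;
        }
    }

    long snapshotOffset() {
        synchronized (checkpointLock) {
            return offset;
        }
    }

    List<T> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }
}
```

The bounded queue also gives natural backpressure: the I/O thread stops reading ahead once 16 batches are waiting.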
>
> Best,
> Stephan
>
>
> On Tue, Apr 14, 2020 at 10:35 AM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Xiaogang,
>>
>> I very much agree with Jark's and Aljoscha's responses.
>>
>>
>> On 10/04/2020 17:35, Jark Wu wrote:
>> > Hi Xiaogang,
>> >
>> > I think this proposal doesn't conflict with your use case; you can still
>> > chain a ProcessFunction after a source that emits raw data.
>> > However, I'm not in favor of chaining a ProcessFunction after the source,
>> > and we should avoid that, because:
>> >
>> > 1) For correctness, it is necessary to perform the watermark generation as
>> >  early as possible, in order to be close to the actual data generation
>> >  within a source's data partition. This is also the purpose of
>> >  per-partition watermarks and event-time alignment. Many ongoing FLIPs
>> >  (e.g. FLIP-27, FLIP-95) put a lot of effort into this. Deserializing
>> >  records and generating watermarks in a chained ProcessFunction makes it
>> >  difficult to do per-partition watermarking in the future.
>> > 2) In Flink SQL, a source should emit the deserialized row instead of raw
>> >  data. Otherwise, users have to define raw byte[] as the single column of
>> >  the defined table and parse them in queries, which is very inconvenient.
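Point 1 can be illustrated with a minimal sketch of per-partition watermarking: each partition tracks its own maximum event timestamp, and the combined watermark is the minimum across partitions, so a lagging partition holds the watermark back. `PerPartitionWatermarks` and the fixed out-of-orderness bound are assumptions for illustration, not Flink's watermark API. The key detail is that `onEvent` needs the partition a record came from, which only the source knows; a ProcessFunction chained after it sees an already merged stream.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-partition watermark tracker with a bounded out-of-orderness.
final class PerPartitionWatermarks {
    private final long maxOutOfOrdernessMillis;
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    PerPartitionWatermarks(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Must be called with the partition a record came from, i.e. inside the source.
    void onEvent(int partition, long eventTimestamp) {
        maxTimestampPerPartition.merge(partition, eventTimestamp, Math::max);
    }

    // The combined watermark is the minimum of the per-partition watermarks;
    // Long.MIN_VALUE until any partition has seen an event.
    long currentWatermark() {
        if (maxTimestampPerPartition.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long maxTs : maxTimestampPerPartition.values()) {
            min = Math.min(min, maxTs - maxOutOfOrdernessMillis);
        }
        return min;
    }
}
```

For example, with a 5 ms bound, partition 0 at timestamp 120 and partition 1 at 40 give a watermark of 35; it only advances once partition 1 catches up.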
>> >
>> > Best,
>> > Jark
>> >
>> > On Fri, 10 Apr 2020 at 09:18, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> I don't think the proposal is a good solution to the problems. I am in
>> >> favour of using a ProcessFunction chained to the source/sink function to
>> >> serialize/deserialize the records, instead of embedding the
>> >> (de)serialization schema in the source/sink function.
>> >>
>> >> Message packing is heavily used in our production environment to allow
>> >> compression and improve throughput. As buffered messages have to be
>> >> delivered once the time exceeds the limit, timers are also required in
>> >> our case. I think this is a common need for other users as well.
>> >>
>> >> In this proposal, as more components are added to the context, we will
>> >> in the end find that the serialization/deserialization schema is just
>> >> another wrapper of ProcessFunction.
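The packing requirement described above (flush on size, or once buffered messages have waited too long) can be sketched as follows. `MessagePacker`, its size and delay parameters, and passing the current time in explicitly instead of registering a real timer are all assumptions for illustration; in a Flink job, the `onTimer` call would correspond to a processing-time timer, which is the extra capability the argument says a (de)serialization schema would eventually need.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical packer: buffers messages and flushes a packed batch either when the
// batch is full or when the oldest buffered message has waited at least maxDelayMillis.
final class MessagePacker<T> {
    private final int maxBatchSize;
    private final long maxDelayMillis;
    private final Consumer<List<T>> downstream;
    private final List<T> buffer = new ArrayList<>();
    private long firstBufferedAt = -1L;

    MessagePacker(int maxBatchSize, long maxDelayMillis, Consumer<List<T>> downstream) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMillis = maxDelayMillis;
        this.downstream = downstream;
    }

    void add(T message, long nowMillis) {
        if (buffer.isEmpty()) {
            firstBufferedAt = nowMillis;
        }
        buffer.add(message);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Would be driven by a processing-time timer; the time is passed in explicitly here.
    void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstBufferedAt >= maxDelayMillis) {
            flush();
        }
    }

    private void flush() {
        downstream.accept(new ArrayList<>(buffer));
        buffer.clear();
        firstBufferedAt = -1L;
    }
}
```

Without the timer path, a partially filled batch would sit in the buffer indefinitely during quiet periods, which is why size-based flushing alone is not enough.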
>> >>
>> >> Regards,
>> >> Xiaogang
>> >>
>> >> Aljoscha Krettek <aljos...@apache.org> wrote on Tue, 7 Apr 2020 at 18:34:
>> >>
>> >>> On 07.04.20 08:45, Dawid Wysakowicz wrote:
>> >>>
>> >>>> @Jark I was aware of the implementation of SinkFunction, but it was a
>> >>>> conscious choice not to do it that way.
>> >>>>
>> >>>> Personally, I am against giving a default implementation to both the
>> >>>> new and the old methods. This results in an interface that, by default,
>> >>>> either does nothing or notifies the user only at runtime that they have
>> >>>> not implemented a method of the interface, which does not sound like
>> >>>> good practice to me. Moreover, I believe the method without a Collector
>> >>>> will still be the preferred method for many users. Plus, it explicitly
>> >>>> communicates the minimal functionality required by the interface.
>> >>>> Nevertheless, I am happy to hear other opinions.
>> >>> Dawid and I discussed this before. I did the extension of the
>> >>> SinkFunction, but by now I think it's better to do it this way, because
>> >>> otherwise you can implement the interface without implementing any
>> >>> methods.
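The design trade-off discussed here can be sketched with hypothetical interfaces (`Out` and `RecordSchema` are illustrative names, not Flink's actual `DeserializationSchema`). The single-record method stays abstract and the collector variant gets a default that delegates to it, so every implementation must provide at least one real method. If both methods had defaults, as in the SinkFunction extension mentioned above, an empty implementing class would compile and only misbehave at runtime.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical minimal collector.
interface Out<T> {
    void collect(T record);
}

interface RecordSchema<T> {
    // Minimal functionality every implementation must provide (stays abstract).
    T deserialize(byte[] message);

    // Optional override for emitting zero or several records per message;
    // by default it delegates to the single-record method and skips nulls.
    default void deserialize(byte[] message, Out<T> out) {
        T record = deserialize(message);
        if (record != null) {
            out.collect(record);
        }
    }
}

// Implementing only the single-record method is enough to get both.
final class Utf8Schema implements RecordSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

Users who only need one record per message keep writing the simple method, while a schema that unnests records overrides the collector variant.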
>> >>>> @all I also prefer the buffering approach. Let's wait another day or
>> >>>> two to see if others think differently.
>> >>> I'm also in favour of buffering outside the lock.
>> >>>
>> >>> Also, +1 to this FLIP.
>> >>>
>> >>> Best,
>> >>> Aljoscha
>> >>>
>>
>>
