That makes sense, thanks for clarifying.

Best,
Stephan
On Fri, Apr 24, 2020 at 2:15 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Stephan,
>
> I fully agree with what you said. Also, as far as I can tell, what was
> suggested in FLIP-124 does not contradict what you are saying. Let me
> clarify it a bit if it is not clear in the document.
>
> The current implementations of Kafka and Kinesis already do the
> deserialization outside of the checkpoint lock, in threads separate from
> the main processing thread. The approach described as option 1, which had
> the most supporters, is to keep that behavior. The way I would like to
> support emitting multiple results in this setup is to let the
> DeserializationSchema deserialize records into a list (via a collector)
> that will be emitted atomically all at once.
>
> Currently the behavior can be modelled as:
>
>     T record = deserializationSchema.deserialize(...);
>     synchronized (checkpointLock) {
>         sourceContext.collect(record);
>         updateSourceState(...);
>     }
>
> and I was suggesting to change it to:
>
>     Collector<T> out = new Collector<>();   // pseudocode: a list-backed collector
>     deserializationSchema.deserialize(..., out);
>     List<T> deserializedRecords = out.getRecords();
>     synchronized (checkpointLock) {
>         for (T record : deserializedRecords) {
>             sourceContext.collect(record);
>         }
>         updateSourceState(...);
>     }
>
> I think that is aligned with your reply to Seth's comment, namely that the
> "batch" of records originating from one source record is emitted
> atomically.
>
> Best,
>
> Dawid
>
>
> On 23/04/2020 14:55, Stephan Ewen wrote:
>
> Hi!
>
> Sorry for being a bit late to the party.
>
> One very important thing to consider for "deserialization under the
> checkpoint lock or not" is:
>   - If you do it under the checkpoint lock, things are automatically
>     correct: checkpoint barriers go between the original records that
>     correspond to offsets in the source.
>   - If you deserialize outside the checkpoint lock, then you may read a
>     record from the source but only partially emit it. In that case you
>     need to store the difference (the not-yet-emitted part) in the
>     checkpoint.
>
> ==> I would advise against trying to emit partial records, i.e. doing
> things outside the checkpoint lock. FLIP-27 will by default also not do
> partial emission of unnested events. Also, it is questionable whether
> optimizing this in the source makes sense when no other operator
> (flatMap, etc.) supports it.
>
> Regarding Seth's comment about performance:
>   - For that it probably does not make much of a difference whether this
>     happens under the lock or not, but rather whether it can be pushed to
>     another thread (the source's I/O thread), so that it does not add load
>     to the main task processing thread.
>
> ==> This means that the I/O thread deserializes the "batch" that it hands
> over.
> ==> Still, it is important that all records coming from one original
> source record are emitted atomically; otherwise we have the same issue as
> above.
>
> Best,
> Stephan
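
To make the pattern discussed above concrete, here is a small, self-contained sketch of "deserialize outside the lock, emit the batch atomically". Everything in it is illustrative only: the class, field, and method names are plain-Java stand-ins, not connector or FLIP-124 code, and the comma-splitting method merely mimics DeserializationSchema#deserialize(byte[], Collector<T>) producing several records from one raw record.

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: plain-Java stand-ins instead of Flink classes.
    public class AtomicBatchEmissionSketch {

        private static final Object checkpointLock = new Object();        // stand-in for the source's checkpoint lock
        private static final List<String> downstream = new ArrayList<>(); // stand-in for SourceContext#collect
        private static long currentOffset = 0;                            // stand-in for the partition offset state

        // Stand-in for DeserializationSchema#deserialize(byte[], Collector<T>):
        // one raw source record may produce several deserialized records.
        private static void deserialize(byte[] message, List<String> out) {
            for (String part : new String(message, StandardCharsets.UTF_8).split(",")) {
                out.add(part);
            }
        }

        public static void main(String[] args) {
            byte[] rawRecord = "a,b,c".getBytes(StandardCharsets.UTF_8);

            // 1) Deserialize outside the checkpoint lock (e.g. in the source's I/O thread).
            List<String> batch = new ArrayList<>();
            deserialize(rawRecord, batch);

            // 2) Emit the whole batch and advance the offset under the lock, so a
            //    checkpoint barrier can never split the output of one source record.
            synchronized (checkpointLock) {
                downstream.addAll(batch);
                currentOffset++;
            }

            System.out.println(downstream + " @ offset " + currentOffset);
        }
    }

Because the batch and the offset update happen together inside the synchronized block, a checkpoint can never separate part of a source record's output from its offset, which is the atomicity requirement stated above.
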
> On Tue, Apr 14, 2020 at 10:35 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>> Hi Xiaogang,
>>
>> I very much agree with Jark's and Aljoscha's responses.
>>
>> On 10/04/2020 17:35, Jark Wu wrote:
>> > Hi Xiaogang,
>> >
>> > I think this proposal doesn't conflict with your use case: you can still
>> > chain a ProcessFunction after a source which emits raw data.
>> > But I'm not in favor of chaining a ProcessFunction after the source, and
>> > we should avoid that, because:
>> >
>> > 1) For correctness, it is necessary to perform watermark generation as
>> > early as possible, in order to be close to the actual data generation
>> > within a source's data partition. This is also the purpose of
>> > per-partition watermarks and event-time alignment. Many ongoing FLIPs
>> > (e.g. FLIP-27, FLIP-95) put a lot of work into this effort.
>> > Deserializing records and generating watermarks in a chained
>> > ProcessFunction makes it difficult to do per-partition watermarks in
>> > the future.
>> > 2) In Flink SQL, a source should emit the deserialized row instead of
>> > raw data. Otherwise, users have to define raw byte[] as the single
>> > column of the defined table and parse it in queries, which is very
>> > inconvenient.
>> >
>> > Best,
>> > Jark
>> >
>> > On Fri, 10 Apr 2020 at 09:18, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> I don't think the proposal is a good solution to the problems. I am in
>> >> favour of using a ProcessFunction chained to the source/sink function
>> >> to serialize/deserialize the records, instead of embedding the
>> >> (de)serialization schema in the source/sink function.
>> >>
>> >> Message packing is heavily used in our production environment to allow
>> >> compression and improve throughput. As buffered messages have to be
>> >> delivered when the time exceeds the limit, timers are also required in
>> >> our cases. I think it's also a common need for other users.
>> >>
>> >> In this proposal, with more components added into the context, in the
>> >> end we will find that the serialization/deserialization schema is just
>> >> another wrapper around ProcessFunction.
>> >>
>> >> Regards,
>> >> Xiaogang
>> >>
>> >> Aljoscha Krettek <aljos...@apache.org> wrote on Tue, Apr 7, 2020, at 6:34 PM:
>> >>
>> >>> On 07.04.20 08:45, Dawid Wysakowicz wrote:
>> >>>
>> >>>> @Jark I was aware of the implementation of SinkFunction, but it was a
>> >>>> conscious choice not to do it that way.
>> >>>>
>> >>>> Personally I am against giving a default implementation to both the
>> >>>> new and the old methods. This results in an interface that by default
>> >>>> does nothing, or notifies the user only at runtime that he/she has not
>> >>>> implemented a method of the interface, which does not sound like good
>> >>>> practice to me. Moreover, I believe the method without a Collector
>> >>>> will still be the preferred method for many users. Plus, it
>> >>>> communicates explicitly what the minimal functionality required by the
>> >>>> interface is. Nevertheless, I am happy to hear other opinions.
>> >>>
>> >>> Dawid and I discussed this before. I did the extension of the
>> >>> SinkFunction, but by now I think it's better to do it this way, because
>> >>> otherwise you can implement the interface without implementing any
>> >>> methods.
>> >>>
>> >>>> @all I also prefer the buffering approach. Let's wait a day or two
>> >>>> more to see if others think differently.
>> >>>
>> >>> I'm also in favour of buffering outside the lock.
>> >>>
>> >>> Also, +1 to this FLIP.
>> >>>
>> >>> Best,
>> >>> Aljoscha
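
For readers who want to see the interface shape the last few messages argue about, below is a minimal sketch of the asymmetry Dawid and Aljoscha converge on: the single-record method stays abstract while only the new Collector-based method gets a delegating default. This is not the actual Flink DeserializationSchema; the interface names and the local Collector are stand-ins for illustration.

    import java.io.IOException;
    import java.io.Serializable;

    // Sketch only: a trimmed-down stand-in for the interface shape under
    // discussion, not the real Flink API. Collector is a minimal local interface.
    interface Collector<T> {
        void collect(T record);
    }

    interface SketchDeserializationSchema<T> extends Serializable {

        // The minimal contract: every implementation must at least provide this.
        T deserialize(byte[] message) throws IOException;

        // The Collector-based variant gets a delegating default, so existing
        // single-record schemas keep working unchanged, while the interface can
        // never be implemented without implementing any method at all.
        default void deserialize(byte[] message, Collector<T> out) throws IOException {
            T record = deserialize(message);
            if (record != null) {
                out.collect(record);
            }
        }
    }

A schema that needs to unnest one raw record into several results would then override only the Collector variant, while simple schemas keep implementing just the single-record method.
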