Hi Arvid,

I spent time in the last few days reading through the existing KafkaSource-related code and thinking about the best possible solution. Now I no longer think it is a good idea to let users specify this logic in the de-serializer and pass this information via the Collector. I also thought more about the solution you suggested (e.g. merging the message-payload-based stopping with the existing offset-based stopping logic into one class) and prefer not to do this (yet) for the reasons listed here
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records#FLIP208:UpdateKafkaSourcetodetectEOFbasedondeserializedrecords-RejectedAlternatives>.
I have documented another possible solution in FLIP-208 (link
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records#FLIP208:UpdateKafkaSourcetodetectEOFbasedondeserializedrecords-RejectedAlternatives>)
and have opened the discussion thread. Maybe we can continue the discussion there.

Cheers,
Dong

On Wed, Dec 29, 2021 at 12:38 PM Dong Lin <lindon...@gmail.com> wrote:

> Hi Arvid,
>
> After discussing with Jiangjie offline, I agree that using Collector::close() is not appropriate, because we generally prefer close() to be called by one entity, in this case the Flink runtime. Having close() called by both the user and the Flink runtime could be error-prone, even though we could make it work with some extra effort.
>
> I am now thinking about adding a public class (e.g. SourceCollector) that extends the existing Collector. SourceCollector::endOfStream() could be invoked by users to signal EOF. And users could optionally implement a KafkaRecordDeserializationSchema::deserialize(SourceCollector) if they want to do dynamic EOF.
>
> To make our discussion more efficient and possibly involve more people for comments, I will create a FLIP and open a discussion thread for the FLIP.
>
> Thanks,
> Dong
>
> On Tue, Dec 28, 2021 at 7:40 PM Dong Lin <lindon...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Thanks a lot for the detailed reply.
>>
>> Just to clarify, I don't plan to ask users to implement KafkaRecordDeserializationSchema#isEndOfStream(T). My current idea is to not add any public API, but to expect users to re-use the existing Collector::close() API inside KafkaRecordDeserializationSchema::deserialize(...). And if a message with the user-specified pattern has arrived, the user can invoke Collector::close(), which signals Flink to stop reading from the corresponding source split.
>>
>> Here are a few clarifications in response to the discussion:
>>
>> 1) The boundedness of the source and execution.runtime-mode would not be affected by this proposal. Users can keep using the existing settings with or without the dynamic EOF.
>> 2) The dynamic EOF works independently of the stoppingOffsetsInitializer. When reading from the KafkaSource, users can rely on just the dynamic EOF without specifying a stoppingOffsetsInitializer. And if users specify both the dynamic EOF and a stoppingOffsetsInitializer, the job stops reading from the source split when either condition is met.
>> 3) Suppose users can specify the dynamic EOF in KafkaRecordDeserializationSchema::deserialize(...), then users have access to the entire ConsumerRecord. This approach could address Ayush's use-case.
>> 4) Suppose we choose to do it in KafkaRecordDeserializationSchema::deserialize(...), then the dynamic EOF happens inside the RecordEmitter. Yes, we will need to be able to close the split.
>> 5) The majority of users, who do not want dynamic EOF, can keep using the existing out-of-the-box support for Avro/Json/Protobuf. Advanced users who want dynamic EOF anyway need to encode the dynamic EOF logic in a method similar to KafkaRecordDeserializationSchema (with access to the raw message), so adding the dynamic EOF support would not make their life harder.
>>
>> Based on the discussion so far, it looks like two approaches have been mentioned:
>>
>> 1) Let users call the Collector::close() API inside KafkaRecordDeserializationSchema::deserialize(...) to signal the EOF.
>> 2) Add the API KafkaSourceBuilder::setBoundedStopCursor(StopCursor), where StopCursor subsumes all existing functionalities of the stoppingOffsetsInitializer. And StopCursor::shouldStop needs to take both the raw and the deserialized message.
>>
>> It seems that the second approach involves much more API change than the first one (including deprecation of some existing APIs).
>>
>> Regarding the first approach, could you help explain why "close is the completely wrong method for that"? My understanding is that the close() method indicates that the caller no longer needs to read from this source split and the associated network resources could be released. Why is it wrong for a user to call this method?
>>
>> On Tue, Dec 28, 2021 at 5:15 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Dong,
>>>
>>>> Could you help explain why we cannot dynamically stop reading from a source in batch mode?
>>>>
>>> We can, but we cannot easily determine if the source is supposed to run in batch or streaming mode. A user would need to implement a special KafkaRecordDeserializationSchema and still provide an OffsetsInitializer for the end offset to trigger batch mode.
>>>
>>> How are both concepts supposed to interact? Are we only stopping if either concept states that this is the end?
>>>
>>> We could ofc offer some KafkaSourceBuilder#setBounded() without parameters so that a user can implement a special KafkaRecordDeserializationSchema and notify the builder, but this looks awkward to me and is quite error-prone: when a user uses setBounded without overriding isEndOfStream, the application would never emit anything.
>>>
>>>> My understanding is that when a message with the particular pattern (specified by the user) is encountered, we can have the source operator emit the high-watermark in such a way as if the particular partition of this source has reached EOF. And this must have worked since users have been using KafkaDeserializationSchema::isEndOfStream with the legacy FlinkKafkaConsumer. Did I miss something here?
>>>>
>>> Yes, batch mode is different from bounded streaming. [1] We can only fully leverage a statically bounded source by statically defining it as such with the FLIP-27 Source interface. [2]
>>>
>>>> Hmm.. users already need to provide a KafkaRecordDeserializationSchema via KafkaSourceBuilder::setDeserializer(...) today even if we don't support dynamic EOF. Do you mean that if we support dynamic EOF, then it will be harder for users to implement KafkaRecordDeserializationSchema?
>>>>
>>> Users mostly use the factory methods that adapt to Flink's DeserializationSchema. We should also offer a builder similar to KafkaRecordSerializationSchemaBuilder.
>>>
>>>> Regarding "how to use it from Table/SQL", suppose we allow users to encode this dynamic EOF logic inside KafkaRecordDeserializationSchema.
>>>>
>>> I'm not sure if we can/should expose dynamic EOF in SQL, but at the very least we should properly support end offsets (as it's now possible). We must avoid removing the current end offsets in favor of KafkaRecordDeserializationSchema#isEndOfStream to enable Table/SQL to use bounded Kafka sources.
>>>
>>>> e.g. call Collector::close() if the message content matches a user-specified pattern
>>>>
>>> No, close is the completely wrong method for that. This method should have never been exposed to the user as it will close the network resources.
>>> However, we need a fully functional network stack for proper shutdown.
>>>
>>>> It appears that StopCursor::shouldStop(...) takes a raw Message. While users could implement the dynamic EOF logic in this method, I am worried that this approach would lead to inferior performance due to double message deserialization.
>>>>
>>> That is a fair point. In Ayush's case, however, it's the only way to determine that the pipeline should stop (you pretty much check whether the 5th byte in the message has changed). If you deserialize into a SpecificRecord, then the writer schema version is lost for isEndOfStream(T deserialized).
>>>
>>> Another concern I have for KafkaRecordDeserializationSchema#isEndOfStream(T) is where it is supposed to be called then. If it's in the RecordEmitter, we need to extend the RecordEmitter to support closing the split. If it's in the SplitReader, we probably also need double-deserialization because of FLINK-25132 (the record needs to be deserialized in the RecordEmitter). Maybe you can encode it in the SplitState, but this sounds rather cumbersome if it needs to be done for all sources.
>>>
>>>> The reason is that the user's logic will likely depend on the de-serialized message (as opposed to the raw bytes in org.apache.pulsar.client.api.Message.getData()). In this case, users will need to deserialize the message inside StopCursor::shouldStop(...) first, and then the message would be de-serialized again by the PulsarDeserializationSchema, which is specified via the PulsarSourceBuilder::setDeserializationSchema.
>>>>
>>> As written before, this is not the case for this specific user. Having the raw message makes it much easier to determine a writer schema change. I'm sure that there are cases where you need to look into the data, though. To avoid double-deserialization, a better way may be to pass both the raw and the deserialized message to `shouldStop`, but then we should move the stop logic to the RecordEmitter, as written before.
>>>
>>>> Do you mean that you prefer to replace KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>>>
>>> Ideally, yes. But that needs to be backward compatible as it's a PublicEvolving interface.
>>>
>>>> I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient only without using KafkaClient. On the other hand, it seems that there is no performance/correctness concern with the existing approach? Is this issue related to the discussion of dynamic EOF?
>>>>
>>> I just meant that a user probably only needs access to the adminClient to retrieve the offsets of a topic, and that Kafka's PartitionOffsetsRetriever nicely hides the client from the user. I'm sure a user can easily mess up with the admin client in Pulsar (what happens if it is closed, is the client internally used somewhere else?).
>>>
>>> TL;DR
>>> In general, I like the separation of concerns: KafkaRecordDeserializationSchema is for deserializing and StopCondition (or however we call it) is for stopping. Then a user can reuse a pre-defined KafkaRecordDeserializationSchema and mix in the stopping logic when needed (remember this is a rare case).
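>>>
>>> To illustrate the idea (purely a sketch; the name, the factory method and the exact signature are all up for discussion, and nothing like this exists today), such a stop condition could look roughly like:
>>>
>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>
>>> import java.io.Serializable;
>>>
>>> public interface StopCondition<T> extends Serializable {
>>>
>>>     // Return true once the source should stop reading the split that produced this record.
>>>     boolean shouldStop(ConsumerRecord<byte[], byte[]> rawRecord, T deserializedRecord);
>>>
>>>     // Default condition: never stop, i.e. keep the current unbounded behavior.
>>>     static <T> StopCondition<T> never() {
>>>         return (raw, deserialized) -> false;
>>>     }
>>> }
>>>
>>> It could then be wired in via something like KafkaSourceBuilder#setBounded(StopCondition), while the user keeps reusing a pre-defined deserialization schema.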
>>>
>>> In most cases, a user will use Avro/Json/Protobuf + schema registry, so if we provide out-of-the-box support for these formats, the user doesn't need to touch KafkaRecordDeserializationSchema at all. Then it would be nice to have a single interface that determines when the source stops (e.g. StopCondition) with pre-defined implementations (see the factory methods in OffsetsInitializer) for Table/SQL. We could even provide a predefined strategy for schema changes when the schema registry is used.
>>>
>>> If you already have use-cases that rely on the deserialized data, then let's move the stopping logic to the RecordEmitter. At this point, I'd propose to pass the raw and deserialized data to the StopCondition.
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BatchandStreamingUnification
>>>
>>> On Mon, Dec 27, 2021 at 5:01 PM Dong Lin <lindon...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> Thanks for the suggestion! Sorry for the late reply. I just finished investigating the PulsarSource/StopCursor as you suggested. Please see my reply inline.
>>>>
>>>> On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Dong,
>>>>>
>>>>> I see your point. The main issue with dynamic EOF is that we can't run in batch mode. That may be desired in the case of Ayush, but there may be other use cases where it's not.
>>>>>
>>>> Could you help explain why we cannot dynamically stop reading from a source in batch mode?
>>>>
>>>> My understanding is that when a message with the particular pattern (specified by the user) is encountered, we can have the source operator emit the high-watermark in such a way as if the particular partition of this source has reached EOF. And this must have worked since users have been using KafkaDeserializationSchema::isEndOfStream with the legacy FlinkKafkaConsumer. Did I miss something here?
>>>>
>>>>> Additionally, it's quite a bit of code if you'd implement a KafkaRecordDeserializationSchema from scratch. There is also no obvious way to use it from Table/SQL.
>>>>>
>>>> Hmm.. users already need to provide a KafkaRecordDeserializationSchema via KafkaSourceBuilder::setDeserializer(...) today even if we don't support dynamic EOF. Do you mean that if we support dynamic EOF, then it will be harder for users to implement KafkaRecordDeserializationSchema?
>>>>
>>>> Regarding "how to use it from Table/SQL", suppose we allow users to encode this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g. call Collector::close() if the message content matches a user-specified pattern). Then the effect of this change is the same as if the partition had reached EOF, and Table/SQL can handle this effect as they do now without any extra change. Does this make sense?
>>>>
>>>>> I think we should get inspired by how PulsarSource is solving it. They have an orthogonal interface StopCursor (we could call it StopCondition) [1]. It has some default values (I wonder if we could implement them as enums for easier Table integration).
>>>>>
>>>> It appears that StopCursor::shouldStop(...) takes a raw Message.
>>>> While users could implement the dynamic EOF logic in this method, I am worried that this approach would lead to inferior performance due to double message deserialization.
>>>>
>>>> The reason is that the user's logic will likely depend on the de-serialized message (as opposed to the raw bytes in org.apache.pulsar.client.api.Message.getData()). In this case, users will need to deserialize the message inside StopCursor::shouldStop(...) first, and then the message would be de-serialized again by the PulsarDeserializationSchema, which is specified via the PulsarSourceBuilder::setDeserializationSchema.
>>>>
>>>> In comparison, messages can be deserialized only once if we allow users to specify the dynamic EOF logic inside KafkaRecordDeserializationSchema/PulsarDeserializationSchema.
>>>>
>>>>> Ideally, this interface would subsume OffsetsInitializer on the stopping side. I think it was not wise to use OffsetsInitializer also for stop offsets, as things like OffsetResetStrategy do not make any sense there.
>>>>>
>>>> Do you mean that you prefer to replace KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>>>
>>>> Without digging into the details of whether this replacement is feasible, I agree StopCursor seems to be cleaner than OffsetsInitializer. On the other hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g. due to the double deserialization issue described above), I guess it is probably simpler to separate this from the discussion of the dynamic EOF?
>>>>
>>>>> Compared to Pulsar, I like the PartitionOffsetsRetriever, which avoids having to hand in the KafkaClient (as we do in Pulsar).
>>>>>
>>>> Do you mean that you prefer to remove KafkaClient from PartitionOffsetsRetrieverImpl?
>>>>
>>>> I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient only without using KafkaClient. On the other hand, it seems that there is no performance/correctness concern with the existing approach? Is this issue related to the discussion of dynamic EOF?
>>>>
>>>>> I hope I gave some pointers.
>>>>>
>>>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>>>>>
>>>>> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <lindon...@gmail.com> wrote:
>>>>>
>>>>>> Yep, dynamic schema change could be a good solution for the particular use-case mentioned by Ayush.
>>>>>>
>>>>>> On the other hand, I have heard of valid use-cases where we want to stop the job based on a control message. For example, let's say we have a Flink job that keeps processing stock transaction data fetched from Kafka in real time. Suppose the stock market closes at 4pm; we probably want the Flink job to stop after it has processed all the transaction data of that day, instead of running it for the whole day, in order to save CPU cost.
>>>>>>
>>>>>> As of Flink 1.13, users can achieve this goal by sending a special message to the Kafka topic and encoding logic in the deserializer such that the Flink job stops when this message is observed. IMO, this seems like a reasonable approach to support the above use-case.
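>>>>>>
>>>>>> Concretely, with the legacy FlinkKafkaConsumer this pattern looks roughly like the following sketch (the class name and the "EOD" marker value are made up for illustration):
>>>>>>
>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>>>> import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
>>>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>>>>
>>>>>> import java.nio.charset.StandardCharsets;
>>>>>>
>>>>>> public class TransactionDeserializationSchema implements KafkaDeserializationSchema<String> {
>>>>>>
>>>>>>     // Control message that the producer sends after the last transaction of the day.
>>>>>>     private static final String END_OF_DAY_MARKER = "EOD";
>>>>>>
>>>>>>     @Override
>>>>>>     public String deserialize(ConsumerRecord<byte[], byte[]> record) {
>>>>>>         return new String(record.value(), StandardCharsets.UTF_8);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public boolean isEndOfStream(String nextElement) {
>>>>>>         // Stop reading the partition once the control message has been deserialized.
>>>>>>         return END_OF_DAY_MARKER.equals(nextElement);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public TypeInformation<String> getProducedType() {
>>>>>>         return TypeInformation.of(String.class);
>>>>>>     }
>>>>>> }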
>>>>>>
>>>>>> One possible approach to keep supporting this use-case in Flink 1.15 is to allow users to signal the "end of stream" by calling Collector::close(...) in KafkaRecordDeserializationSchema::deserialize(...).
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Wouldn't it be better to ask the Iceberg maintainers to support dynamic schema change?
>>>>>>>
>>>>>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <lindon...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Ayush,
>>>>>>>>
>>>>>>>> Your use-case should be supported. Sorry, we don't have a good way to support this in Flink 1.14.
>>>>>>>>
>>>>>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dong
>>>>>>>>
>>>>>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <ayush.chau...@zomato.com> wrote:
>>>>>>>>
>>>>>>>>> My use case is that as soon as the Avro message version is changed, I want to reload the job graph so that I can update the downstream Iceberg table.
>>>>>>>>>
>>>>>>>>> The Iceberg FlinkSink takes the table schema at job start, and it cannot be updated at runtime. So, I want to trigger a graceful shutdown and restart the job.
>>>>>>>>>
>>>>>>>>> Can I reload the job graph to achieve that?
>>>>>>>>>
>>>>>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ayush,
>>>>>>>>>>
>>>>>>>>>> DeserializationSchema.isEndOfStream was only ever supported by Kafka. For the new KafkaSource, the recommended way is to use the bounded mode like this:
>>>>>>>>>>
>>>>>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>>>>>     KafkaSource.<PartitionAndValue>builder()
>>>>>>>>>>         ...
>>>>>>>>>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>>>>>         .setBounded(OffsetsInitializer.latest())
>>>>>>>>>>         .build();
>>>>>>>>>>
>>>>>>>>>> You can implement your own OffsetsInitializer or use a provided one.
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> There is no way to end the Kafka stream from the deserializer.
>>>>>>>>>>>
>>>>>>>>>>> When would you want to end the stream? Could you explain why you need to end the Kafka stream without using the offset?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Dec 8, 2021 at 15:29, Ayush Chauhan <ayush.chau...@zomato.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <metrob...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Ayush,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I couldn't find the documentation you've mentioned. Can you send me a link to it?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <ayush.chau...@zomato.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you please let me know the alternatives to isEndOfStream(), as according to the docs this method will no longer be used to determine the end of the stream.