Hi Arvid,

Thanks for the suggestion, and sorry for the late reply. I have just
finished investigating the PulsarSource/StopCursor approach you suggested.
Please see my replies inline.
On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Dong,
>
> I see your point. The main issue with dynamic EOF is that we can't run in
> batch mode. That may be desired in the case of Ayush, but there may be
> other use cases where it's not.

Could you help explain why we cannot dynamically stop reading from a
source in batch mode? My understanding is that when a message with the
particular pattern (specified by the user) is encountered, we can have the
source operator emit a high watermark, as if the particular partition of
this source had reached EOF. This must have worked before, since users
have been using KafkaDeserializationSchema::isEndOfStream with the legacy
FlinkKafkaConsumer. Did I miss something here?

> Additionally, it's quite a bit of code if you'd implement a
> KafkaRecordDeserializationSchema from scratch. There is also no obvious
> way on how to use it from Table/SQL.

Hmm.. users already need to provide a KafkaRecordDeserializationSchema via
KafkaSourceBuilder::setDeserializer(...) today, even if we don't support
dynamic EOF. Do you mean that if we support dynamic EOF, it will be harder
for users to implement KafkaRecordDeserializationSchema?

Regarding "how to use it from Table/SQL": suppose we allow users to encode
the dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g. call
Collector::close() if the message content matches a user-specified
pattern; see the sketch below). The effect of this change is the same as
if the partition had reached EOF, and Table/SQL can handle this effect as
they do today, without any extra change. Does this make sense?

> I think we should get inspired on how PulsarSource is solving it. They
> have an orthogonal interface StopCursor (we could call it StopCondition)
> [1]. It has some default values (I wonder if we could implement them as
> enums for easier Table integration).

It appears that StopCursor::shouldStop(...) takes a raw Message. While
users could implement the dynamic EOF logic in this method, I am worried
that this approach would lead to inferior performance due to double
message deserialization. The reason is that the user's logic will likely
depend on the deserialized message (as opposed to the raw bytes returned
by org.apache.pulsar.client.api.Message.getData()). In this case, users
would need to deserialize the message inside StopCursor::shouldStop(...)
first, and then the message would be deserialized again by the
PulsarDeserializationSchema, which is specified via
PulsarSourceBuilder::setDeserializationSchema. In comparison, messages
need to be deserialized only once if we allow users to specify the dynamic
EOF logic inside KafkaRecordDeserializationSchema/PulsarDeserializationSchema.

> Ideally, this interface would subsume OffsetsInitializer on the stopping
> side. I think it was not wise to use OffsetsInitializer also for stop
> offsets, as things like OffsetResetStrategy do not make any sense.

Do you mean that you prefer to replace
KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
PulsarSourceBuilder::setBoundedStopCursor(StopCursor)? Without digging
into the details of whether this replacement is feasible, I agree that
StopCursor seems cleaner than OffsetsInitializer. On the other hand, if we
don't plan to put the dynamic EOF logic inside StopCursor (e.g. due to the
double-deserialization issue described above), it is probably simpler to
keep that change separate from the discussion of dynamic EOF.
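To make the Collector::close() idea above more concrete, here is a rough
sketch of what a user's deserializer could look like. Note that this only
sketches the *proposed* semantics: EndOfStreamDeserializer and the
"END_OF_STREAM" marker are made-up names, and the current KafkaSource does
not yet treat Collector::close() as an EOF signal.

  import java.io.IOException;
  import java.nio.charset.StandardCharsets;

  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
  import org.apache.flink.util.Collector;
  import org.apache.kafka.clients.consumer.ConsumerRecord;

  public class EndOfStreamDeserializer
          implements KafkaRecordDeserializationSchema<String> {

      @Override
      public void deserialize(
              ConsumerRecord<byte[], byte[]> record, Collector<String> out)
              throws IOException {
          String value = new String(record.value(), StandardCharsets.UTF_8);
          if ("END_OF_STREAM".equals(value)) {
              // Proposed semantics: closing the collector marks this
              // partition as finished, as if its bounded offset was reached.
              out.close();
              return;
          }
          out.collect(value);
      }

      @Override
      public TypeInformation<String> getProducedType() {
          return Types.STRING;
      }
  }

Since the marker check runs on the already-deserialized value, each message
is deserialized exactly once, which avoids the double-deserialization
problem with StopCursor described above.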
> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid having
> to hand in the KafkaClient (as we do in Pulsar).

Do you mean that you prefer to remove the KafkaClient from
PartitionOffsetsRetrieverImpl? I agree it is cleaner to let
PartitionOffsetsRetrieverImpl use only the adminClient, without the
KafkaClient. On the other hand, there seems to be no
performance/correctness concern with the existing approach. Is this issue
related to the discussion of dynamic EOF?

> I hope I gave some pointers.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>
> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <lindon...@gmail.com> wrote:
>
>> Yep, dynamic schema change could be a good solution for the particular
>> use-case mentioned by Ayush.
>>
>> On the other hand, I have heard of valid use-cases where we want to stop
>> the job based on a control message. For example, let's say we have a
>> Flink job that keeps processing stock transaction data fetched from
>> Kafka in real time. Suppose the stock market closes at 4pm; we probably
>> want the Flink job to stop after it has processed all the transaction
>> data of that day, instead of running for the whole day, in order to save
>> CPU cost.
>>
>> As of Flink 1.13, users can achieve this goal by sending a special
>> message to the Kafka topic and encoding logic in the deserializer such
>> that the Flink job stops when this message is observed. IMO, this seems
>> like a reasonable approach to support the above use-case.
>>
>> One possible approach to keep supporting this use-case in Flink 1.15 is
>> to allow users to signal the "end of stream" by calling
>> Collector::close(...) in
>> KafkaRecordDeserializationSchema::deserialize(...).
>>
>> What do you think?
>>
>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Wouldn't it be better to ask the Iceberg maintainers to support dynamic
>>> schema change?
>>>
>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <lindon...@gmail.com> wrote:
>>>
>>>> Hi Ayush,
>>>>
>>>> Your use-case should be supported. Sorry, we don't have a good way to
>>>> support this in Flink 1.14.
>>>>
>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>
>>>> Thanks,
>>>> Dong
>>>>
>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <ayush.chau...@zomato.com>
>>>> wrote:
>>>>
>>>>> My use case is that as soon as the avro message version is changed, I
>>>>> want to reload the job graph so that I can update the downstream
>>>>> iceberg table.
>>>>>
>>>>> Iceberg FlinkSink takes the table schema at job start, and it cannot
>>>>> be updated during runtime. So, I want to trigger a graceful shutdown
>>>>> and restart the job.
>>>>>
>>>>> Can I reload the job graph to achieve that?
>>>>>
>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Ayush,
>>>>>>
>>>>>> DeserializationSchema.isEndOfStream was only ever supported by Kafka.
>>>>>> For the new Kafka source, the recommended way is to use the bounded
>>>>>> mode like this:
>>>>>>
>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>     KafkaSource.<PartitionAndValue>builder()
>>>>>>         ...
>>>>>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>         .setBounded(OffsetsInitializer.latest())
>>>>>>         .build();
>>>>>>
>>>>>> You can implement your own OffsetsInitializer or use a provided one.
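As a concrete illustration of the "use a provided one" suggestion quoted
above: for the fixed-time variant of the stock-market scenario, the
provided timestamp-based initializer should already be enough (a minimal
sketch; marketCloseEpochMillis is a placeholder value, not an existing
constant):

  // Placeholder: epoch millis of the market close on the trading day.
  long marketCloseEpochMillis = 1640277600000L;

  KafkaSource<String> source =
      KafkaSource.<String>builder()
          ... // topic, bootstrap servers, deserializer, etc.
          .setStartingOffsets(OffsetsInitializer.earliest())
          // Each partition stops before the first record whose timestamp
          // is at or after the given one.
          .setBounded(OffsetsInitializer.timestamp(marketCloseEpochMillis))
          .build();

This only covers the case where the stop point is known up front, though;
it does not help with the dynamic, content-based EOF discussed above.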
>>>>>>
>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ruanhang1...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> There is no way to end the kafka stream from the deserializer.
>>>>>>>
>>>>>>> When would you want to end the stream? Could you explain why you
>>>>>>> need to end the kafka stream without using the offset?
>>>>>>>
>>>>>>> Ayush Chauhan <ayush.chau...@zomato.com> wrote on Wed, Dec 8, 2021
>>>>>>> at 15:29:
>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>
>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <metrob...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Ayush,
>>>>>>>>>
>>>>>>>>> I couldn't find the documentation you've mentioned. Can you send
>>>>>>>>> me a link to it?
>>>>>>>>>
>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>>> ayush.chau...@zomato.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Can you please let me know the alternatives of isEndOfStream()?
>>>>>>>>>> According to the docs, this method will no longer be used to
>>>>>>>>>> determine the end of the stream.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Ayush Chauhan
>>>>>>>>>> Data Platform
>>>>>>>>>> +91 9990747111