Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2022-01-03 Thread Dong Lin
Hi Arvid, I spent time reading through the existing KafkaSource-related code and thinking about the best possible solution over the last few days. Now I no longer think it is a good idea to let users specify this logic in the deserializer and pass this information via the Collector. I also thought more …

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-28 Thread Dong Lin
Hi Arvid, After discussing with Jiangjie offline, I agree that using Collector::close() is not appropriate, because in general we prefer close() to be called by one entity, in this case the Flink runtime. Having close() called by both the user and the Flink runtime could be error-prone, even though we can make …

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-28 Thread Dong Lin
Hi Arvid, Thanks a lot for the detailed reply. Just to clarify, I don't plan to ask users to implement KafkaRecordDeserializationSchema#isEndOfStream(T). My current idea is to not add any public API, but to expect users to re-use the existing Collector::close() API inside KafkaRecordDeserializationSchema …
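For illustration, a minimal sketch of the Collector-based idea described here (and reconsidered in the later replies above, since close() is normally invoked by the Flink runtime). The control payload END_OF_STREAM_MARKER is an assumption for the example, not part of any Flink API:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch only: signal end-of-stream from inside the deserializer by closing the Collector.
public class EndOfStreamAwareDeserializer implements KafkaRecordDeserializationSchema<String> {

    // Hypothetical control payload that marks the end of the stream.
    private static final String END_OF_STREAM_MARKER = "EOS";

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out)
            throws IOException {
        String value = new String(record.value(), StandardCharsets.UTF_8);
        if (END_OF_STREAM_MARKER.equals(value)) {
            // The idea discussed here: closing the collector acts as the EOF signal.
            out.close();
            return;
        }
        out.collect(value);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}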

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-28 Thread Arvid Heise
Hi Dong, > Could you help explain why we can not dynamically stop reading from a source in batch mode? We can, but we cannot easily determine whether the source is supposed to run in batch or streaming mode. A user would need to implement a special KafkaRecordDeserializationSchema and still provide an OffsetsInitializer …

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-27 Thread Dong Lin
Hi Arvid, Thanks for the suggestion! Sorry for the late reply. I just finished investigating the PulsarSource/StopCursor approach you suggested. Please see my reply inline. On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise wrote: > Hi Dong, > I see your point. The main issue with dynamic EOF is that we can't run in batch mode. …
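For context, a rough sketch of the PulsarSource/StopCursor pattern referenced here. The service/admin URLs, topic, and subscription names are placeholders, and the builder and cursor method names are taken from the Flink Pulsar connector around the 1.14 line; they may differ in other versions:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;

// Sketch only: the stop condition is evaluated against the messages being read,
// which is the "dynamic EOF" behaviour discussed in this thread.
public class PulsarStopCursorSketch {
    public static PulsarSource<String> buildSource(long stopTimestampMillis) {
        return PulsarSource.<String>builder()
                .setServiceUrl("pulsar://localhost:6650")   // placeholder
                .setAdminUrl("http://localhost:8080")       // placeholder
                .setTopics("my-topic")                      // placeholder
                .setSubscriptionName("my-subscription")     // placeholder
                .setStartCursor(StartCursor.earliest())
                .setDeserializationSchema(
                        PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                // Stop once a message's event time passes the given threshold.
                .setUnboundedStopCursor(StopCursor.atEventTime(stopTimestampMillis))
                .build();
    }
}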

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-19 Thread Arvid Heise
Hi Dong, I see your point. The main issue with dynamic EOF is that we can't run in batch mode. That may be desired in Ayush's case, but there may be other use cases where it's not. Additionally, it's quite a bit of code if you implement a KafkaRecordDeserializationSchema from scratch. There …

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-17 Thread Dong Lin
Yep, a dynamic schema change could be a good solution for the particular use case mentioned by Ayush. On the other hand, I have heard of valid use cases where we want to stop the job based on a control message. For example, let's say we have a Flink job that keeps processing stock transaction data …

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-16 Thread Arvid Heise
Wouldn't it be better to ask the Iceberg maintainers to support dynamic schema change? On Fri, Dec 17, 2021 at 3:03 AM Dong Lin wrote: > Hi Ayush, > Your use case should be supported. Sorry, we don't have a good way to support this in Flink 1.14. I am going to propose a FLIP to fix it in Flink 1.15. …

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-16 Thread Dong Lin
Hi Ayush, Your use case should be supported. Sorry, we don't have a good way to support this in Flink 1.14. I am going to propose a FLIP to fix it in Flink 1.15. Thanks, Dong On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan wrote: > My use case is that as soon as the avro message version is changed, …

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-09 Thread Ayush Chauhan
My use case is that as soon as the avro message version is changed, I want to reload the job graph so that I can update the downstream Iceberg table. The Iceberg FlinkSink takes the table schema at job start, and it cannot be updated at runtime. So, I want to trigger a graceful shutdown and restart the …

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-08 Thread Arvid Heise
Hi Ayush, DeserializationSchema.isEndOfStream was only ever supported by Kafka. For the new Kafka source, the recommended way is to use the bounded mode, like this: KafkaSource source = KafkaSource.builder() ... .setStartingOffsets(OffsetsInitializer.earliest()) …
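A fuller, self-contained version of the builder snippet above might look as follows; the bootstrap servers, topic, and group id are placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("input-topic")                // placeholder
                .setGroupId("my-group")                  // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Bounded mode: stop at the offsets that are the latest when the job starts.
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();
        env.execute("Bounded Kafka read");
    }
}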

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-08 Thread Hang Ruan
There is no way to end the Kafka stream from the deserializer. When would you want to end the stream? Could you explain why you need to end the Kafka stream without using the offset? Ayush Chauhan wrote on Wed, Dec 8, 2021 at 15:29: > https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/fl

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-07 Thread Ayush Chauhan
https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69 On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger wrote: > Hi Ayush, > I couldn't find the documentation you've mentioned. …

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-07 Thread Robert Metzger
Hi Ayush, I couldn't find the documentation you've mentioned. Can you send me a link to it? On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan wrote: > Hi, > Can you please let me know the alternatives of isEndOfStream(), as now according to the docs this method will no longer be used to determine the end of the stream. …

Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-07 Thread Ayush Chauhan
Hi, Can you please let me know the alternatives of isEndOfStream(), as now according to the docs this method will no longer be used to determine the end of the stream. -- Ayush Chauhan, Data Platform, +91 9990747111
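For reference, a minimal sketch of the legacy pattern the question refers to: the older KafkaDeserializationSchema let the deserializer decide end-of-stream per record via isEndOfStream(). The end-of-stream marker below is an illustrative assumption:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch of the legacy pattern this thread discusses replacing.
public class LegacyEndOfStreamSchema implements KafkaDeserializationSchema<String> {

    // Hypothetical control payload that marks the end of the stream.
    private static final String END_OF_STREAM_MARKER = "EOS";

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        return new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // The legacy consumer stopped reading once this returned true.
        return END_OF_STREAM_MARKER.equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}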