Hi Sagar,

I rechecked and found that the new Kafka source is not formally published yet, so a more stable approach for now may be to try adding the FlinkKafkaConsumer as a BOUNDED source first. Sorry for the inconvenience.
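In code, the suggestion above might look roughly like the sketch below, modeled on the bounded-source hack referenced as [1] in the quoted mail further down: it wraps the FlinkKafkaConsumer in a DataStreamSource that is explicitly declared BOUNDED. The constructor taking a Boundedness argument is internal API in Flink 1.12 (an assumption based on the linked test, not a documented public interface), and the broker address, topic, and group id are placeholders. Note that the legacy consumer has no stop-at-offset option, so even when declared BOUNDED the job only finishes when the source is stopped.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class BoundedLegacyKafkaSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "my-group");                // placeholder group id

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Wrap the legacy SourceFunction in a DataStreamSource that is explicitly
        // declared BOUNDED, mirroring the test linked as [1] below. This relies on an
        // internal constructor (assumed available in 1.12) and is not a supported API.
        DataStream<String> stream =
                new DataStreamSource<>(
                        env,
                        Types.STRING,
                        new StreamSource<>(consumer),
                        true,
                        "Bounded Kafka Source",
                        Boundedness.BOUNDED);

        stream.print();
        env.execute("bounded-legacy-kafka-sketch");
    }
}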
Best,
Yun

------------------------------------------------------------------
Sender: Yun Gao <yungao...@aliyun.com>
Date: 2021/01/14 15:26:54
Recipient: Ardhani Narasimha <ardhani.narasi...@razorpay.com>; sagar <sagarban...@gmail.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

Hi Sagar,

I think the problem is that the legacy sources implemented by extending SourceFunction are all defined as CONTINUOUS_UNBOUNDED when added via env.addSource(). Although there is a hacky way to add a legacy source as a BOUNDED source [1], you may want to first try the new version of the KafkaSource [2]. The new KafkaSource is implemented with the new Source API [3], which provides unified support for streaming and batch mode.

Best,
Yun

[1] https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
[2] https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

------------------ Original Mail ------------------
Sender: Ardhani Narasimha <ardhani.narasi...@razorpay.com>
Send Date: Thu Jan 14 15:11:35 2021
Recipients: sagar <sagarban...@gmail.com>
CC: Flink User Mail List <user@flink.apache.org>
Subject: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

Interesting use case. Can you please elaborate more on this? On what criteria do you want to batch: time, count, or size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar <sagarban...@gmail.com> wrote:

Hi Team,

I am getting the following error while running the DataStream API in batch mode with a Kafka source. I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0]

In my batch program I want to work with four to five different streams in batch mode, since the data source is bounded, but I cannot find any clear example of how to do this with a Kafka source in Flink 1.12. I don't want to use the JDBC source, as the underlying database table may change. Please give me an example of how to achieve the above use case. Also, for any large bounded source, are there alternatives to achieve this?

--
Regards,
Sagar Bandal
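For reference, the new KafkaSource approach recommended in the quoted mail above ([2], [3]) might look roughly like the following sketch, assuming the Flink 1.12 builder API. The broker address, topic, and group id are placeholders, and the valueOnly deserializer helper is an assumption whose name may differ between releases. Setting setBounded(OffsetsInitializer.latest()) makes the source stop at the offsets present when the job starts, which is what allows it to run with 'execution.runtime-mode' set to BATCH.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BoundedKafkaSourceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // FLIP-27 KafkaSource: reads from the earliest offset and stops at the
        // latest offset seen at startup, which makes the source BOUNDED.
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder broker address
                        .setTopics("my-topic")                 // placeholder topic
                        .setGroupId("my-group")                // placeholder group id
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setBounded(OffsetsInitializer.latest())
                        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                        .build();

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Bounded Kafka Source");

        stream.print();
        env.execute("bounded-kafka-source-sketch");
    }
}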