I had a similar need recently and ended up using KafkaDeserializationSchemaWrapper to wrap a given DeserializationSchema. The resulting wrapper is itself a KafkaDeserializationSchema, so it can be passed directly to the `FlinkKafkaConsumer` constructor; overriding `isEndOfStream()` makes the consumer stop after a fixed number of records:
```
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class BoundingDeserializationSchema extends KafkaDeserializationSchemaWrapper<Row> {

    private static final long serialVersionUID = 1858204203663583785L;

    /** Maximum number of records to emit before signalling end-of-stream. */
    private final long maxRecords_;
    /** Number of records deserialized so far. */
    private long numRecords_ = 0;

    public BoundingDeserializationSchema(
            DeserializationSchema<Row> deserializationSchema, long maxRecords) {
        super(deserializationSchema);
        maxRecords_ = maxRecords;
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<Row> out)
            throws Exception {
        super.deserialize(message, out);
        numRecords_++;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        // Tell the consumer to stop once the record budget is exhausted.
        return numRecords_ >= maxRecords_;
    }
}
```
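For reference, wiring it up looks roughly like this (an untested sketch; the topic name, properties, record limit, and the `myRowSchema()` helper are placeholders for your own setup):

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
props.setProperty("group.id", "bounded-reader");          // placeholder

// myRowSchema() is a hypothetical helper returning whatever
// DeserializationSchema<Row> you already use for this topic.
DeserializationSchema<Row> rowSchema = myRowSchema();

// Stop consuming after 10,000 records.
FlinkKafkaConsumer<Row> consumer = new FlinkKafkaConsumer<>(
        "my-topic", // placeholder
        new BoundingDeserializationSchema(rowSchema, 10_000),
        props);

DataStream<Row> stream = env.addSource(consumer);
```

One caveat, as far as I can tell: this doesn't change how the source is classified, so the job still runs in STREAMING mode; it just makes the stream finite rather than satisfying the BATCH-mode check.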
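And for anyone landing on the thread below: once the new KafkaSource that Yun mentions is usable, a genuinely BOUNDED read should look roughly like this (a sketch modeled on the KafkaSourceITCase he links as [2]; the builder and deserializer APIs may differ between releases, and the servers, topic, and group id are placeholders):

```
import java.util.Arrays;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.serialization.StringDeserializer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092") // placeholder
        .setGroupId("bounded-reader")          // placeholder
        .setTopics(Arrays.asList("my-topic"))  // placeholder
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
        .setStartingOffsets(OffsetsInitializer.earliest())
        // Stop at the offsets that are latest when the job starts; this is
        // what marks the source as BOUNDED and makes it legal in BATCH mode.
        .setBounded(OffsetsInitializer.latest())
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka");
```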
On Thu, Jan 14, 2021 at 6:15 AM sagar <sagarban...@gmail.com> wrote:
>
> Thanks Yun
>
> On Thu, Jan 14, 2021 at 1:58 PM Yun Gao <yungao...@aliyun.com> wrote:
>>
>> Hi Sagar,
>>
>> I rechecked and found that the new Kafka source is not formally published
>> yet; a stable approach may be to first try adding the FlinkKafkaConsumer
>> as a BOUNDED source. Sorry for the inconvenience.
>>
>> Best,
>> Yun
>>
>> ------------------------------------------------------------------
>> Sender: Yun Gao <yungao...@aliyun.com>
>> Date: 2021/01/14 15:26:54
>> Recipient: Ardhani Narasimha <ardhani.narasi...@razorpay.com>;
>> sagar <sagarban...@gmail.com>
>> Cc: Flink User Mail List <user@flink.apache.org>
>> Theme: Re: Re: Using Kafka as bounded source with DataStream API in batch
>> mode (Flink 1.12)
>>
>> Hi Sagar,
>>
>> I think the problem is that legacy sources implemented by extending
>> SourceFunction are all defined as CONTINUOUS_UNBOUNDED when added via
>> env.addSource(). Although there is a hacky way to add legacy sources as
>> BOUNDED sources [1], you may first want to try the new version of
>> KafkaSource [2]. The new KafkaSource is implemented with the new Source
>> API [3], which provides unified support for streaming and batch mode.
>>
>> Best,
>> Yun
>>
>> [1]
>> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
>> [2]
>> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
>> [3]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>
>> ------------------Original Mail ------------------
>> Sender: Ardhani Narasimha <ardhani.narasi...@razorpay.com>
>> Send Date: Thu Jan 14 15:11:35 2021
>> Recipients: sagar <sagarban...@gmail.com>
>> CC: Flink User Mail List <user@flink.apache.org>
>> Subject: Re: Using Kafka as bounded source with DataStream API in batch mode
>> (Flink 1.12)
>>>
>>> Interesting use case.
>>>
>>> Can you please elaborate more on this?
>>> On what criteria do you want to batch? Time? Count? Or size?
>>>
>>> On Thu, 14 Jan 2021 at 12:15 PM, sagar <sagarban...@gmail.com> wrote:
>>>>
>>>> Hi Team,
>>>>
>>>> I am getting the following error while running the DataStream API in
>>>> batch mode with a Kafka source. I am using FlinkKafkaConsumer to
>>>> consume the data.
>>>>
>>>> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source
>>>> with the 'execution.runtime-mode' set to 'BATCH'. This combination is not
>>>> allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
>>>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>>>> ~[flink-core-1.12.0.jar:1.12.0]
>>>>
>>>> In my batch program I want to work with four or five different streams
>>>> in batch mode, since the data sources are bounded.
>>>>
>>>> I can't find any clear example of how to do this with a Kafka source in
>>>> Flink 1.12.
>>>>
>>>> I don't want to use a JDBC source, as the underlying database tables may
>>>> change. Please give me an example of how to achieve the above use case.
>>>>
>>>> Also, for any large bounded source, are there alternatives to achieve
>>>> this?
>>>>
>>>> --
>>>> ---Regards---
>>>>
>>>> Sagar Bandal
>
> --
> ---Regards---
>
> Sagar Bandal