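I had a similar need recently and ended up using
`KafkaDeserializationSchemaWrapper` to wrap a given
`DeserializationSchema`.  The resulting `KafkaDeserializationSchema`
can be passed directly to the `FlinkKafkaConsumer` constructor. The
subclass below counts the records it deserializes and reports end of
stream from `isEndOfStream` once `maxRecords` have been read.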

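```
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Wraps a DeserializationSchema and reports end of stream after maxRecords records. */
class BoundingDeserializationSchema
    extends KafkaDeserializationSchemaWrapper<Row> {
 private static final long serialVersionUID = 1858204203663583785L;
 private final long maxRecords_;
 // Records deserialized so far by this instance; each parallel source
 // subtask works on its own copy of the schema.
 private long numRecords_ = 0;

 public BoundingDeserializationSchema(
     DeserializationSchema<Row> deserializationSchema,
     long maxRecords) {
  super(deserializationSchema);
  maxRecords_ = maxRecords;
 }

 @Override
 public void deserialize(
     ConsumerRecord<byte[], byte[]> message, Collector<Row> out)
     throws Exception {
  // Delegate to the wrapped schema, then count the record.
  super.deserialize(message, out);
  numRecords_++;
 }

 @Override
 public boolean isEndOfStream(Row nextElement) {
  // The element itself is ignored; only the running count matters.
  return numRecords_ >= maxRecords_;
 }
}
```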
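
In case it helps to see how the counter and `isEndOfStream` interact, here is a small dependency-free sketch of the same logic. Everything in it is a stand-in I made up for illustration: `Schema`, `Sink`, `CountingSchema` and `drain` are not Flink classes, and `drain` only models my assumption that the consumer asks `isEndOfStream` about each element before emitting it and stops once it returns true. Treat it as a rough model, not as a description of the connector's internals.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BoundedSchemaModel {
    /** Hypothetical stand-in for a collector; not a Flink interface. */
    interface Sink<T> {
        void collect(T element);
    }

    /** Hypothetical stand-in for a deserialization schema; not a Flink interface. */
    interface Schema<T> {
        void deserialize(byte[] message, Sink<T> out);
        boolean isEndOfStream(T nextElement);
    }

    /** Same counting logic as BoundingDeserializationSchema, for String payloads. */
    static final class CountingSchema implements Schema<String> {
        private final long maxRecords;
        private long numRecords = 0;

        CountingSchema(long maxRecords) {
            this.maxRecords = maxRecords;
        }

        @Override
        public void deserialize(byte[] message, Sink<String> out) {
            out.collect(new String(message, StandardCharsets.UTF_8));
            numRecords++; // counted after the element was handed to the sink
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return numRecords >= maxRecords;
        }
    }

    /** Assumed consumer loop: check isEndOfStream before emitting, stop on true. */
    static <T> List<T> drain(List<byte[]> messages, Schema<T> schema) {
        List<T> emitted = new ArrayList<>();
        boolean[] ended = {false};
        for (byte[] message : messages) {
            schema.deserialize(message, element -> {
                if (ended[0] || schema.isEndOfStream(element)) {
                    ended[0] = true; // drop the element and remember the signal
                } else {
                    emitted.add(element);
                }
            });
            if (ended[0]) {
                break;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<byte[]> messages = new ArrayList<>();
        for (String s : new String[] {"a", "b", "c", "d", "e"}) {
            messages.add(s.getBytes(StandardCharsets.UTF_8));
        }
        // Five messages, limit of three.
        System.out.println(drain(messages, new CountingSchema(3))); // prints [a, b, c]
    }
}
```

Under that assumed ordering, the first `maxRecords` elements are emitted and the end-of-stream signal is only raised when one more record arrives after the limit, so a topic holding exactly `maxRecords` messages would not trigger it in this model.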

On Thu, Jan 14, 2021 at 6:15 AM sagar <sagarban...@gmail.com> wrote:
>
> Thanks Yun
>
>
>
> On Thu, Jan 14, 2021 at 1:58 PM Yun Gao <yungao...@aliyun.com> wrote:
>>
>> Hi Sagar,
>>
>>   I rechecked and found that the new Kafka source has not been formally 
>> published yet, so I think a stable approach may be to try adding the 
>> FlinkKafkaConsumer as a BOUNDED source first. Sorry for the inconvenience.
>>
>> Best,
>>  Yun
>>
>> ------------------------------------------------------------------
>> Sender: Yun Gao <yungao...@aliyun.com>
>> Date: 2021/01/14 15:26:54
>> Recipient: Ardhani Narasimha <ardhani.narasi...@razorpay.com>; 
>> sagar <sagarban...@gmail.com>
>> Cc: Flink User Mail List <user@flink.apache.org>
>> Subject: Re: Re: Using Kafka as bounded source with DataStream API in batch 
>> mode (Flink 1.12)
>>
>> Hi Sagar,
>>
>>       I think the problem is that legacy sources implemented by extending 
>> SourceFunction are all defined as CONTINUOUS_UNBOUNDED when added with 
>> env.addSource(). Although there is a hacky way to add legacy sources as 
>> BOUNDED sources [1], I think you could first try the new version of 
>> KafkaSource [2]. The new KafkaSource is implemented with the new 
>> Source API [3], which provides unified support for streaming and batch 
>> mode.
>>
>> Best,
>>  Yun
>>
>>
>>
>>
>> [1] 
>> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
>> [2]  
>> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
>> [3] 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>
>>
>>
>> ------------------Original Mail ------------------
>> Sender:Ardhani Narasimha <ardhani.narasi...@razorpay.com>
>> Send Date:Thu Jan 14 15:11:35 2021
>> Recipients:sagar <sagarban...@gmail.com>
>> CC:Flink User Mail List <user@flink.apache.org>
>> Subject:Re: Using Kafka as bounded source with DataStream API in batch mode 
>> (Flink 1.12)
>>>
>>> Interesting use case.
>>>
>>> Could you please elaborate on this?
>>> On what criteria do you want to batch? Time? Count? Or Size?
>>>
>>> On Thu, 14 Jan 2021 at 12:15 PM, sagar <sagarban...@gmail.com> wrote:
>>>>
>>>> Hi Team,
>>>>
>>>> I am getting the following error while running the DataStream API in 
>>>> batch mode with a Kafka source.
>>>> I am using FlinkKafkaConsumer to consume the data.
>>>>
>>>> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source 
>>>> with the 'execution.runtime-mode' set to 'BATCH'. This combination is not 
>>>> allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
>>>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
>>>> ~[flink-core-1.12.0.jar:1.12.0]
>>>>
>>>> In my batch program I want to work with four to five different streams in 
>>>> batch mode, as the data sources are bounded.
>>>>
>>>> I can't find any clear example of how to do this with a Kafka source in 
>>>> Flink 1.12.
>>>>
>>>> I don't want to use a JDBC source, as the underlying database table may 
>>>> change. Please give me an example of how to achieve the above use case.
>>>>
>>>> Also, for any large bounded source, are there any alternatives to achieve 
>>>> this?
>>>>
>>>>
>>>>
>>>> --
>>>> ---Regards---
>>>>
>>>>   Sagar Bandal
>>>>
>>>> This is confidential mail. All rights are reserved. If you are not the 
>>>> intended recipient, please ignore this email.
>>>
>>>
>>
>>
>
>
