Hi Jary,

The easiest and simplest solution is to pass a different config to each consumer, based on your requirements.

Example: for the consumer on topic A you can pass

    max.poll.records: "1"
    max.poll.interval.ms: "1000"

and for the consumer on topic B you can pass

    max.poll.records: "3600"
    max.poll.interval.ms: "1000"
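In code that could look roughly like this (a rough sketch, not a tested snippet: the broker address, group ids and topic names are placeholders, and it assumes the universal FlinkKafkaConsumer connector with a plain string schema):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties propsA = new Properties();
    propsA.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
    propsA.setProperty("group.id", "consumer-a");              // placeholder group id
    propsA.setProperty("max.poll.records", "1");
    propsA.setProperty("max.poll.interval.ms", "1000");
    FlinkKafkaConsumer<String> consumerA =
            new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), propsA);

    Properties propsB = new Properties();
    propsB.setProperty("bootstrap.servers", "localhost:9092");
    propsB.setProperty("group.id", "consumer-b");
    propsB.setProperty("max.poll.records", "3600");
    propsB.setProperty("max.poll.interval.ms", "1000");
    FlinkKafkaConsumer<String> consumerB =
            new FlinkKafkaConsumer<>("topicB", new SimpleStringSchema(), propsB);

Properties that Flink does not interpret itself are forwarded to the underlying Kafka consumer, which is how max.poll.records and max.poll.interval.ms should get through.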
1. But your configuration actually has a flaw when you use setStartFromTimestamp: if topic B generates 3600 events every second and you set setStartFromTimestamp to consume data from the last 24 hours, the consumer on topic B will always lag by one day, because it can fetch at most 3600 records per second while the topic produces 3600 per second, so it can never catch up. It will never consume the real-time data, which is not what we want in streaming.

2. With Flink you don't need to pass these settings (max.poll.records, max.poll.interval.ms) at all. Flink consumes the data in real time by design. If your job is consuming data slowly (back pressure), you have to increase parallelism instead. There are several ways to do that (operator level, job level); see the parallelism sketch at the very bottom of this mail, below the quoted thread. A sketch of the event-time pipeline you described, with setStartFromTimestamp, is appended there as well.

I hope I explained it clearly. Please let me know if you need further clarification.

Thanks,
Dinesh

On Mon, May 25, 2020 at 12:34 PM Jary Zhen <jaryz...@gmail.com> wrote:

> Hi Dinesh, thanks for your reply.
>
> For example, there are two topics: topic A produces 1 record per second
> and topic B produces 3600 records per second. If I set the Kafka consumer
> config like this:
>     max.poll.records: "3600"
>     max.poll.interval.ms: "1000"
> it means I can get all the records from these two topics every second, in
> real time.
> But if I want to consume the data from the last day or earlier by using
> FlinkKafkaConsumer.setStartFromTimestamp(timestamp), I will get 3600
> records within one second from *topic A*, which were produced over *an
> hour* in the production environment, and at the same time I will get 3600
> records within one second from *topic B*, which were produced in *a single
> second*. So with *event-time* semantics, the watermark assigned from
> topic A will always make the data from topic B "late data" in the window
> operator. What I want is 1 record from A and 3600 records from B per
> second when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp), so
> that I can simulate consuming data as in the real production environment.
>
> Best
>
> On Sat, 23 May 2020 at 23:42, C DINESH <dinesh.kitt...@gmail.com> wrote:
>
>> Hi Jary,
>>
>> What do you mean by "step balance"? Could you please provide a concrete
>> example.
>>
>> On Fri, May 22, 2020 at 3:46 PM Jary Zhen <jaryz...@gmail.com> wrote:
>>
>>> Hello everyone,
>>>
>>> First, a brief pipeline introduction:
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>     consume multiple Kafka topics
>>>     -> union them
>>>     -> assignTimestampsAndWatermarks
>>>     -> keyBy
>>>     -> window() and so on ...
>>> It's a very normal way to use Flink to process data like this in a
>>> production environment.
>>> But if I want to test the pipeline above, I need to use the API
>>> FlinkKafkaConsumer.setStartFromTimestamp(timestamp) to consume 'past'
>>> data.
>>> So my question is how to control the 'step' balance, as one topic
>>> produces 3 records per second while another produces 30000 per second.
>>>
>>> I don't know if I have described this clearly, so if anything is unclear
>>> please let me know.
>>>
>>> Tks
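The parallelism sketch referenced in point 2 above (the map operator here is just a do-nothing stand-in for whatever operator is your real bottleneck; consumerB is the consumer from the first sketch):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Job level: the default parallelism for every operator in the job.
    env.setParallelism(4);

    // Operator level: override the job default for one heavy operator.
    env.addSource(consumerB)
       .map(new MapFunction<String, String>() {
           @Override
           public String map(String value) {
               return value; // stand-in for a real, expensive transformation
           }
       })
       .setParallelism(8) // run just this map with 8 parallel subtasks
       .print();

Note that a Kafka source cannot usefully run with more parallel subtasks than the topic has partitions; the extra source subtasks just sit idle.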
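And a rough, untested outline of the union + event-time pipeline from the original question, reusing consumerA/consumerB from the first sketch. The record format (epoch-millis timestamp and key as comma-separated fields), the 5-second out-of-orderness bound and the 10-second window are all made-up placeholders:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Replay both topics from the same past point in time.
    long startTimestamp = System.currentTimeMillis() - 24 * 60 * 60 * 1000L; // e.g. 24h ago
    consumerA.setStartFromTimestamp(startTimestamp);
    consumerB.setStartFromTimestamp(startTimestamp);

    DataStream<String> streamA = env.addSource(consumerA);
    DataStream<String> streamB = env.addSource(consumerB);

    streamA.union(streamB)
        // Extract event time; tolerate events up to 5 seconds out of order.
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
                @Override
                public long extractTimestamp(String record) {
                    return Long.parseLong(record.split(",")[0]); // assumed epoch-millis field
                }
            })
        .keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String record) {
                return record.split(",")[1]; // assumed key field
            }
        })
        .timeWindow(Time.seconds(10))
        .reduce((a, b) -> a + "|" + b) // any window function goes here
        .print();

    env.execute("union-two-topics");

The watermark is computed over the *union*, so when one topic's replay covers event time much faster than the other's, the slower topic drives the watermark and the faster topic's records fall behind it and are dropped as late, which is exactly the problem described in this thread.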