Hi,

I haven’t heard about Flink specific problem like that. Have you checked that 
the records are not changing over time? That they are not for example twice as 
large or twice as heavy to process? Especially that you are using rate limiter 
with 12MB/s. If your records grew to 60KB in size, that would probably explain 
the problem.

If that’s not it, you would have to analyse what is changing in your 
job/cluster. Starting from CPU usage, Memory/GC, Network, IO. You can also 
attach some profiler to help pinpoint what is changing.

Piotrek 

> On 31 Mar 2020, at 14:03, Arpith techy <arpithte...@gmail.com> wrote:
> 
> Currently I've Flink consumer with following properties, Flink consumes 
> record at around 400 messages/sec at start of program but later on as 
> numBuffersOut  exceeds 1000000, data rate falls to 200messages/sec. I've set 
> parallelism to only 1, it's Avro based consumer and checkpointing is 
> disabled. Does anyone else facing same issue?
> 
> Properties kafkaProps = new Properties();
> kafkaProps.setProperty("bootstrap.servers", bootstrapAddress);
> kafkaProps.setProperty("zookeeper.connect", zookeeperConnect);
> kafkaProps.setProperty("group.id <http://group.id/>", groupId);
> kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
> kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
> ConfluentAvroDeserializationSchema deserializer = new 
> ConfluentAvroDeserializationSchema(schemaRegURL);
> FlinkKafkaConsumer010<GenericRecord> flinkKafkaConsumer = new 
> FlinkKafkaConsumer010<GenericRecord>(topic, deserializer, kafkaProps);
> flinkKafkaConsumer.setStartFromLatest();
> FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
> rateLimiter.setRate(12000000L);
> flinkKafkaConsumer.setRateLimiter(rateLimiter);
> flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
> // ConfluentAvroDeserialzier
> @Override
> public GenericRecord deserialize(byte[] message) {
>     if (kafkaAvroDecoder == null) {
>        System.out.println("Kafka serizlier");
>         SchemaRegistryClient schemaRegistry = new 
> CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
>         this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
>     }
>     return (GenericRecord) this.kafkaAvroDecoder.fromBytes(message);
> }

Reply via email to