I have a Flink Kafka consumer configured with the properties below. At the start of the program it consumes around 400 messages/sec, but later, once numBuffersOut exceeds 1,000,000, the rate falls to about 200 messages/sec. Parallelism is set to 1, the consumer is Avro-based, and checkpointing is disabled. Is anyone else facing the same issue?
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", bootstrapAddress);
kafkaProps.setProperty("zookeeper.connect", zookeeperConnect);
kafkaProps.setProperty("group.id", groupId);
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

ConfluentAvroDeserializationSchema deserializer = new ConfluentAvroDeserializationSchema(schemaRegURL);
FlinkKafkaConsumer010<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer010<GenericRecord>(topic, deserializer, kafkaProps);
flinkKafkaConsumer.setStartFromLatest();

FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(12000000L);
flinkKafkaConsumer.setRateLimiter(rateLimiter);
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);

// ConfluentAvroDeserializationSchema
@Override
public GenericRecord deserialize(byte[] message) {
    if (kafkaAvroDecoder == null) {
        System.out.println("Kafka serizlier");
        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
        this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
    }
    return (GenericRecord) this.kafkaAvroDecoder.fromBytes(message);
}
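One thing that may be worth ruling out is the rate limiter. As far as I can tell, GuavaFlinkConnectorRateLimiter treats the value passed to setRate as a global bytes-per-second budget that is divided across the parallel source subtasks, not as a message count. Below is a minimal sketch of that arithmetic in plain Java. The class RateLimitCeiling and the average message sizes are hypothetical and only for illustration; they are not part of Flink, and the real message size on the topic is not known here.

```java
// Hypothetical helper, NOT part of Flink: estimates the message-rate ceiling
// implied by a byte-based rate limit such as rateLimiter.setRate(12000000L).
final class RateLimitCeiling {

    /**
     * @param globalBytesPerSecond value passed to setRate(...), assumed to be bytes/sec
     * @param parallelism          number of parallel source subtasks (1 in the question)
     * @param avgMessageBytes      assumed average serialized message size in bytes
     * @return approximate maximum messages per second for a single subtask
     */
    static double maxMessagesPerSecond(long globalBytesPerSecond, int parallelism, long avgMessageBytes) {
        if (parallelism <= 0 || avgMessageBytes <= 0) {
            throw new IllegalArgumentException("parallelism and avgMessageBytes must be positive");
        }
        // The global budget is split evenly across subtasks.
        double perSubtaskBytesPerSecond = (double) globalBytesPerSecond / parallelism;
        return perSubtaskBytesPerSecond / avgMessageBytes;
    }

    public static void main(String[] args) {
        long rate = 12_000_000L;                    // value from the question
        long[] assumedSizes = {1_000L, 30_000L, 60_000L}; // assumed sizes, not measured
        for (long size : assumedSizes) {
            System.out.println(size + " bytes/msg -> "
                    + maxMessagesPerSecond(rate, 1, size) + " msg/sec ceiling");
        }
    }
}
```

Under these assumptions, messages of about 1 KB would allow roughly 12,000 messages/sec, so the limiter would not explain the observed 200 to 400 messages/sec; it would only become the bottleneck if messages averaged tens of kilobytes. Measuring the actual average record size would settle this either way.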