I currently have a Flink consumer with the properties below. At the start of the job, Flink consumes around 400 messages/sec, but later on, once numBuffersOut exceeds 1,000,000, the rate drops to about 200 messages/sec. Parallelism is set to 1, the consumer is Avro based, and checkpointing is disabled. Is anyone else facing the same issue?

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", bootstrapAddress);
kafkaProps.setProperty("zookeeper.connect", zookeeperConnect);
kafkaProps.setProperty("group.id", groupId);
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
ConfluentAvroDeserializationSchema deserializer =
        new ConfluentAvroDeserializationSchema(schemaRegURL);
FlinkKafkaConsumer010<GenericRecord> flinkKafkaConsumer =
        new FlinkKafkaConsumer010<GenericRecord>(topic, deserializer, kafkaProps);
flinkKafkaConsumer.setStartFromLatest();
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(12000000L);
flinkKafkaConsumer.setRateLimiter(rateLimiter);
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);

// ConfluentAvroDeserializationSchema#deserialize

@Override
public GenericRecord deserialize(byte[] message) {
    // Lazily create the Avro decoder on first use.
    if (kafkaAvroDecoder == null) {
        System.out.println("Kafka deserializer");
        SchemaRegistryClient schemaRegistry =
                new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
        this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
    }
    return (GenericRecord) this.kafkaAvroDecoder.fromBytes(message);
}
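
In case the job wiring matters, the consumer is attached to the environment essentially as sketched below; the print() sink and the "avro-consumer-job" name are just placeholders standing in for the real downstream operators:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);   // parallelism is 1 and checkpointing is not enabled, as described above

// consumer built as shown above
DataStream<GenericRecord> records = env.addSource(flinkKafkaConsumer);
records.print();         // placeholder sink instead of the real downstream operators

env.execute("avro-consumer-job");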
