Echo Lee created FLINK-28576:
--------------------------------

             Summary: Kafka's two source api performance differences
                 Key: FLINK-28576
                 URL: https://issues.apache.org/jira/browse/FLINK-28576
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
            Reporter: Echo Lee

I recently found that the new Kafka source API (KafkaSource) is roughly 10 times faster than the old one (FlinkKafkaConsumer), but I don't know what causes the difference.

{code:java}
// new source api
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("benchmark")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .map(new MapFunction<String, Object>() {
            private int count = 0;
            private long lastTime = System.currentTimeMillis();

            @Override
            public Object map(String value) throws Exception {
                count++;
                // Print the elapsed milliseconds for every 100,000 records
                if (count % 100000 == 0) {
                    long currentTime = System.currentTimeMillis();
                    System.out.println(currentTime - lastTime);
                    lastTime = currentTime;
                }
                return null;
            }
        });{code}

{code:java}
// old source api
FlinkKafkaConsumer<String> flinkKafkaConsumer =
        new FlinkKafkaConsumer<String>("benchmark", new SimpleStringSchema(), properties);
flinkKafkaConsumer.setStartFromEarliest();

env.addSource(flinkKafkaConsumer)
        .map(new MapFunction<String, Object>() {
            private int count = 0;
            private long lastTime = System.currentTimeMillis();

            @Override
            public Object map(String value) throws Exception {
                count++;
                // Print the elapsed milliseconds for every 100,000 records
                if (count % 100000 == 0) {
                    long currentTime = System.currentTimeMillis();
                    System.out.println(currentTime - lastTime);
                    lastTime = currentTime;
                }
                return null;
            }
        });{code}

Both jobs read the same data from the same topic.

Flink version: 1.14.x
Size of a single record: 1k
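
To make the two runs easier to compare, the timing logic duplicated in both map functions could be pulled into a small helper that reports records per second instead of raw milliseconds per 100,000 records. The following is only a sketch under stated assumptions: `ThroughputMeter` and its injected millisecond clock are hypothetical names introduced here for illustration and are not part of Flink or the Kafka connector.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not a Flink class): counts records and reports
 * throughput each time a full batch of `interval` records has been seen.
 */
public class ThroughputMeter {
    private final long interval;            // records per reporting batch
    private final LongSupplier clockMillis; // injected clock, e.g. System::currentTimeMillis
    private long count = 0;
    private long lastTime;

    public ThroughputMeter(long interval, LongSupplier clockMillis) {
        this.interval = interval;
        this.clockMillis = clockMillis;
        this.lastTime = clockMillis.getAsLong();
    }

    /**
     * Call once per record.
     * Returns records per second when a batch completes, otherwise -1.
     */
    public double record() {
        count++;
        if (count % interval != 0) {
            return -1;
        }
        long now = clockMillis.getAsLong();
        // Guard against a zero-millisecond batch to avoid dividing by zero
        long elapsed = Math.max(1, now - lastTime);
        lastTime = now;
        return interval * 1000.0 / elapsed;
    }
}
```

In either job, the body of `map` would then reduce to calling `record()` and printing the result when it is not -1. Because the helper is not serializable as written, it would probably have to be created inside the function at runtime (for example in `open()` of a `RichMapFunction`) rather than held as a field of an anonymous `MapFunction`.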