Hi, I’m investigating slowness in our Beam pipelines, and as part of that I compared a very simple Beam pipeline with an equivalent native Flink pipeline. Both pipelines read strings from one Kafka topic and write them to another topic. I’m using Beam 2.38.0 and Flink 1.13. I ran each pipeline separately, on a single task manager with a single slot and parallelism 1. What I saw is that the native Flink pipeline runs about 5 times faster than the Beam one (150,000 strings per second in Flink compared to 30,000 in Beam).
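In case it matters: the Beam job below does not set parallelism in its pipeline options, so it just takes whatever the cluster provides (parallelism 1 in this test). An explicit equivalent would be roughly the following sketch using the FlinkRunner's FlinkPipelineOptions; the values only mirror the setup described above, it is not exactly what the job below does:

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);  // execute on the Flink cluster via the FlinkRunner
options.setStreaming(true);            // unbounded Kafka source, so streaming mode
options.setParallelism(1);             // match the single-slot, parallelism-1 setup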
I’ll be happy if you can help me figure out why there is such a difference. Maybe the pipelines are not really equivalent, or the Beam configuration is wrong?

Flink native pipeline:

public void process() throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaAddress);
    properties.setProperty("group.id", KAFKA_GROUP_ID);

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>(INPUT_TOPIC, new SimpleStringSchema(), properties);
    consumer.setStartFromEarliest();
    consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

    FlinkKafkaProducer<String> producer =
        new FlinkKafkaProducer<>(kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());

    DataStream<String> inputMessagesStream = environment.addSource(consumer);
    inputMessagesStream.addSink(producer);

    environment.execute();
}

Beam pipeline:

public static void main(String[] args) {
    try {
        StreamingOptions options = PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
        options.setStreaming(true);
        options.setRunner(FlinkRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        PTransform<PBegin, PCollection<KV<String, String>>> transform =
            KafkaIO.<String, String>read()
                .withBootstrapServers(bootstrapServers)
                .withTopic(inputTopic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(ImmutableMap.of(
                    "auto.offset.reset", "earliest",
                    "group.id", consumerGroup))
                .withoutMetadata();

        PCollection<KV<String, String>> input = pipeline.apply("readFromKafka", transform);

        PCollection<ProducerRecord<String, String>> convertedInput =
            input.apply("ConvertToStringRecord", ParDo.of(new ConvertToStringRecord(outputTopic) {}))
                .setCoder(new ProducerRecordCoder<>(StringUtf8Coder.of(), StringUtf8Coder.of()));

        KafkaIO.WriteRecords<String, String> writeRecords =
            KafkaIO.<String, String>writeRecords()
                .withBootstrapServers(bootstrapServers)
                .withTopic(outputTopic)
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class);

        convertedInput.apply("writeToKafka", writeRecords);

        pipeline.run();
    } catch (Exception e) {
        log.atError().withThrowable(e).log("Exception thrown while running pipeline PipelineStringToString");
    }
}

@Log4j2
@AllArgsConstructor
public class ConvertToStringRecord extends DoFn<KV<String, String>, ProducerRecord<String, String>> {
    private String topic;

    private static ProducerRecord<String, String> getRecord(KV<String, String> message, String topic) {
        String string = message.getValue();
        ProducerRecord<String, String> pr = new ProducerRecord<>(topic, message.getKey(), string) {};
        pr.headers().add("__TypeId__", String.class.getName().getBytes(StandardCharsets.UTF_8));
        return pr;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            ProducerRecord<String, String> pr = getRecord(Objects.requireNonNull(c.element()), topic);
            c.output(pr);
        } catch (Exception e) {
            log.atError().withThrowable(e).log("exception thrown while processing string");
        }
    }
}

Thanks,
Ifat
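P.S. To rule out the extra ConvertToStringRecord ParDo and the ProducerRecordCoder as the source of the difference, one variant I'd still like to benchmark is writing the KVs straight back to Kafka with KafkaIO.write(), which accepts a PCollection<KV<K, V>> directly. This is only a sketch against the pipeline above, and it does not set the __TypeId__ header, so it is a measurement variant rather than a drop-in replacement:

// Sketch: write the KV pairs from "readFromKafka" directly, skipping the conversion ParDo.
input.apply("writeToKafkaDirect",
    KafkaIO.<String, String>write()
        .withBootstrapServers(bootstrapServers)
        .withTopic(outputTopic)
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));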