Hi,

Thanks for the tip. I enabled the fasterCopy parameter and throughput improved from 30,000 strings per second to 35,000-40,000. There is still a large gap compared to native Flink (150,000 strings per second), which I don’t understand. I opened a bug for that: https://issues.apache.org/jira/browse/BEAM-14438
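For the archives, enabling it looks roughly like this; a minimal sketch of the options setup at the top of the pipeline’s main method, assuming the Flink runner’s FlinkPipelineOptions interface (the flag can equivalently be passed on the command line as --fasterCopy=true):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
// Skip the defensive deep copy of elements between chained operators (see BEAM-11146).
options.setFasterCopy(true);
Pipeline pipeline = Pipeline.create(options);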
Thanks,
Ifat

From: Talat Uyarer <tuya...@paloaltonetworks.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, 11 May 2022 at 3:04
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Beam slowness compared to flink-native

Hi Ifat,

Did you enable the fasterCopy parameter? Please look at this issue: https://issues.apache.org/jira/browse/BEAM-11146

Thanks

On Mon, May 2, 2022 at 12:57 AM Afek, Ifat (Nokia - IL/Kfar Sava) <ifat.a...@nokia.com> wrote:

Hi,

I’m investigating a slowness in our Beam pipelines, and as part of that I tried to compare a very simple Beam pipeline with an equivalent Flink-native pipeline. Both pipelines read strings from one Kafka topic and write them to another topic. I’m using Beam 2.38.0 and Flink 1.13.

I tried running each pipeline separately, on a single task manager with a single slot and parallelism 1. What I saw is that native Flink runs 5 times faster than Beam (150,000 strings per second in Flink compared to 30,000 in Beam). I’ll be happy if you can help me figure out why there is such a difference. Maybe the pipelines are not really equivalent, or the Beam configuration is wrong?

Flink native pipeline:

public void process() throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaAddress);
    properties.setProperty("group.id", KAFKA_GROUP_ID);

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>(INPUT_TOPIC, new SimpleStringSchema(), properties);
    consumer.setStartFromEarliest();
    consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

    FlinkKafkaProducer<String> producer =
            new FlinkKafkaProducer<>(kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());

    DataStream<String> inputMessagesStream = environment.addSource(consumer);
    inputMessagesStream.addSink(producer);
    environment.execute();
}

Beam pipeline:

public static void main(String[] args) {
    try {
        StreamingOptions options = PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
        options.setStreaming(true);
        options.setRunner(FlinkRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        PTransform<PBegin, PCollection<KV<String, String>>> transform = KafkaIO.<String, String>read()
                .withBootstrapServers(bootstrapServers)
                .withTopic(inputTopic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(ImmutableMap.of(
                        "auto.offset.reset", "earliest",
                        "group.id", consumerGroup))
                .withoutMetadata();

        PCollection<KV<String, String>> input = pipeline.apply("readFromKafka", transform);

        PCollection<ProducerRecord<String, String>> convertedInput =
                input.apply("ConvertToStringRecord", ParDo.of(new ConvertToStringRecord(outputTopic) {}))
                        .setCoder(new ProducerRecordCoder<>(StringUtf8Coder.of(), StringUtf8Coder.of()));

        KafkaIO.WriteRecords<String, String> writeToKafka = KafkaIO.<String, String>writeRecords()
                .withBootstrapServers(bootstrapServers)
                .withTopic(outputTopic)
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class);

        convertedInput.apply("writeToKafka", writeToKafka);
        pipeline.run();
    } catch (Exception e) {
        log.atError().withThrowable(e).log("Exception thrown while running pipeline PipelineStringToString");
    }
}

@Log4j2
@AllArgsConstructor
public class ConvertToStringRecord extends DoFn<KV<String, String>, ProducerRecord<String, String>> {

    private String topic;

    private static ProducerRecord<String, String> getRecord(KV<String, String> message, String topic) {
        String value = message.getValue();
        ProducerRecord<String, String> pr = new ProducerRecord<>(topic, message.getKey(), value) {};
        pr.headers().add("__TypeId__", String.class.getName().getBytes(StandardCharsets.UTF_8));
        return pr;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            ProducerRecord<String, String> pr = getRecord(Objects.requireNonNull(c.element()), topic);
            c.output(pr);
        } catch (Exception e) {
            log.atError().withThrowable(e).log("exception thrown while processing string");
        }
    }
}

Thanks,
Ifat
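P.S. For anyone trying to reproduce this: the Beam job takes the usual pipeline options on the command line; an illustrative submit command (the jar and main class names here are placeholders, not our real ones):

flink run -c com.example.PipelineStringToString beam-pipeline.jar \
    --runner=FlinkRunner --parallelism=1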