Hi,

Thanks for the tip. I enabled the fasterCopy parameter and the throughput improved from 30,000 strings per second to 35,000-40,000. There is still a huge gap compared to native Flink (150,000 strings per second), which I don’t understand.
I opened a bug for that: https://issues.apache.org/jira/browse/BEAM-14438

Thanks,
Ifat

From: Talat Uyarer <tuya...@paloaltonetworks.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, 11 May 2022 at 3:04
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Beam slowness compared to flink-native

Hi Ifat,

Did you enable the fasterCopy parameter?

Please look at this issue: https://issues.apache.org/jira/browse/BEAM-11146
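
For reference, a minimal way to turn it on (a sketch assuming the Flink runner's FlinkPipelineOptions; the flag can also be passed on the command line as --fasterCopy=true):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class EnableFasterCopy {
        public static void main(String[] args) {
            FlinkPipelineOptions options =
                    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
            options.setRunner(FlinkRunner.class);
            options.setStreaming(true);
            // Skip the defensive deep copy Beam inserts between chained Flink operators
            // (BEAM-11146); only safe if the pipeline never mutates emitted elements.
            options.setFasterCopy(true);

            Pipeline pipeline = Pipeline.create(options);
            // ... build the Kafka-to-Kafka pipeline here as usual ...
            pipeline.run();
        }
    }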

Thanks

On Mon, May 2, 2022 at 12:57 AM Afek, Ifat (Nokia - IL/Kfar Sava) 
<ifat.a...@nokia.com> wrote:
Hi,

I’m investigating slowness in our Beam pipelines, and as part of that I compared a very simple Beam pipeline with an equivalent Flink-native pipeline. Both pipelines read strings from one Kafka topic and write them to another topic. I’m using Beam 2.38.0 and Flink 1.13.
I ran each pipeline separately, on a single task manager with a single slot and parallelism 1. What I saw is that native Flink runs 5 times faster than Beam (150,000 strings per second in Flink compared to 30,000 in Beam).

I’ll be happy if you can help me figure out why there is such a difference. Maybe the pipelines are not really equivalent, or the Beam configuration is wrong?
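
For reference, parallelism 1 could also be pinned from the Beam side through the Flink runner's pipeline options (a sketch assuming FlinkPipelineOptions; the actual pipeline code is below):

    FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);
    options.setParallelism(1);   // match the single-task-manager, single-slot setup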


Flink native pipeline:

    public void process() throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consume strings from the input topic, starting from the earliest offset
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("group.id", KAFKA_GROUP_ID);
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(INPUT_TOPIC, new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

        // Produce the same strings to the output topic
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>(kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());

        DataStream<String> inputMessagesStream = environment.addSource(consumer);
        inputMessagesStream.addSink(producer);

        environment.execute();
    }


Beam pipeline:

    public static void main(String[] args) {
        try {
            StreamingOptions options = PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
            options.setStreaming(true);
            options.setRunner(FlinkRunner.class);
            Pipeline pipeline = Pipeline.create(options);

            // Read strings from the input topic, starting from the earliest offset
            PTransform<PBegin, PCollection<KV<String, String>>> transform = KafkaIO.<String, String>read()
                    .withBootstrapServers(bootstrapServers)
                    .withTopic(inputTopic)
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withConsumerConfigUpdates(ImmutableMap.of(
                            "auto.offset.reset", "earliest",
                            "group.id", consumerGroup))
                    .withoutMetadata();

            PCollection<KV<String, String>> input = pipeline.apply("readFromKafka", transform);

            // Wrap each element in a ProducerRecord for the output topic
            PCollection<ProducerRecord<String, String>> convertedInput =
                    input.apply("ConvertToStringRecord",
                            ParDo.of(new ConvertToStringRecord(outputTopic) {}))
                            .setCoder(new ProducerRecordCoder<>(StringUtf8Coder.of(), StringUtf8Coder.of()));

            // Write the records back to Kafka
            KafkaIO.WriteRecords<String, String> writeRecords = KafkaIO.<String, String>writeRecords()
                    .withBootstrapServers(bootstrapServers)
                    .withTopic(outputTopic)
                    .withKeySerializer(StringSerializer.class)
                    .withValueSerializer(StringSerializer.class);

            convertedInput.apply("writeToKafka", writeRecords);
            pipeline.run();

        } catch (Exception e) {
            log.atError().withThrowable(e).log("Exception thrown while running pipeline PipelineStringToString");
        }
    }

@Log4j2
@AllArgsConstructor
public class ConvertToStringRecord extends DoFn<KV<String, String>, ProducerRecord<String, String>> {
    private String topic;

    // Build a ProducerRecord for the output topic, carrying the original key and value
    private static ProducerRecord<String, String> getRecord(KV<String, String> message, String topic) {
        String string = message.getValue();
        ProducerRecord<String, String> pr = new ProducerRecord<>(topic, message.getKey(), string) {};
        pr.headers().add("__TypeId__", String.class.getName().getBytes(StandardCharsets.UTF_8));
        return pr;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            ProducerRecord<String, String> pr = getRecord(Objects.requireNonNull(c.element()), topic);
            c.output(pr);
        } catch (Exception e) {
            log.atError().withThrowable(e).log("Exception thrown while processing string");
        }
    }
}


Thanks,
Ifat
