Hi Jan,

I tried it, but didn’t see any significant effect on performance.
Thanks for the suggestion.

Ifat

From: Jan Lukavský <je...@seznam.cz>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, 12 May 2022 at 10:22
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Beam slowness compared to flink-native


Hi Ifat,

can you try adding the 'use_deprecated_read' experiment to the PipelineOptions? 
IIRC the default expansion for KafkaIO now uses a splittable DoFn, which could be 
the cause of the performance difference you see. You can add the option on the 
command line using "--experiments=use_deprecated_read".
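
The experiment can also be set on the options object in code. A minimal sketch, assuming 
ExperimentalOptions.addExperiment is available in your Beam version (the class name is 
only illustrative):

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UseDeprecatedReadExample {
    public static void main(String[] args) {
        // Parse the options as usual, then add the experiment before creating the pipeline.
        ExperimentalOptions options =
                PipelineOptionsFactory.fromArgs(args).as(ExperimentalOptions.class);
        // Equivalent to passing --experiments=use_deprecated_read on the command line.
        ExperimentalOptions.addExperiment(options, "use_deprecated_read");
    }
}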

 Jan
On 5/11/22 16:20, Afek, Ifat (Nokia - IL/Kfar Sava) wrote:
Hi,

Thanks for the tip. I enabled the fasterCopy parameter, and the performance 
improved from 30,000 strings per second to 35,000-40,000. There is still a huge 
difference compared to native Flink (150,000 strings per second), which I don’t 
understand.
I opened a bug for that: https://issues.apache.org/jira/browse/BEAM-14438

Thanks,
Ifat

From: Talat Uyarer <tuya...@paloaltonetworks.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, 11 May 2022 at 3:04
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Beam slowness compared to flink-native

Hi Ifat,

Did you enable the fasterCopy parameter?

Please look at this issue: https://issues.apache.org/jira/browse/BEAM-11146
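
A minimal sketch of enabling it, assuming the fasterCopy flag on FlinkPipelineOptions 
from that ticket is available in your Beam version (the class name is only illustrative); 
it can also be passed on the command line as --fasterCopy=true:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FasterCopyExample {
    public static void main(String[] args) {
        // Parse the options as FlinkPipelineOptions and set the flag before creating the pipeline.
        FlinkPipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        // Skips the defensive copy of elements between chained operators in the Flink runner.
        options.setFasterCopy(true);
    }
}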

Thanks

On Mon, May 2, 2022 at 12:57 AM Afek, Ifat (Nokia - IL/Kfar Sava) <ifat.a...@nokia.com> wrote:
Hi,

I’m investigating the slowness of our Beam pipelines, and as part of that I tried 
to compare a very simple Beam pipeline with an equivalent Flink-native pipeline. 
Both pipelines read strings from one Kafka topic and write them to another topic. 
I’m using Beam 2.38.0 and Flink 1.13.
I ran each pipeline separately, on a single task manager with a single slot and 
parallelism 1. What I saw is that native Flink runs 5 times faster than Beam 
(150,000 strings per second in Flink compared to 30,000 in Beam).

I’ll be happy if you can help me figure out why there is such a difference. 
Maybe the pipelines are not really equivalent, or the Beam configuration is 
wrong?


Flink native pipeline:

    public void process() throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer reading strings from the input topic, starting from the earliest offset
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("group.id", KAFKA_GROUP_ID);
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(INPUT_TOPIC, new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

        // Kafka producer writing the strings unchanged to the output topic
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>(kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());

        // Source -> sink, no intermediate transformation
        DataStream<String> inputMessagesStream = environment.addSource(consumer);
        inputMessagesStream.addSink(producer);

        environment.execute();
    }


Beam pipeline:

    public static void main(String[] args) {
        try {
            StreamingOptions options = PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
            options.setStreaming(true);
            options.setRunner(FlinkRunner.class);
            Pipeline pipeline = Pipeline.create(options);

            // Read strings from the input topic, starting from the earliest offset
            PTransform<PBegin, PCollection<KV<String, String>>> transform =
                    KafkaIO.<String, String>read()
                            .withBootstrapServers(bootstrapServers)
                            .withTopic(inputTopic)
                            .withKeyDeserializer(StringDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            .withConsumerConfigUpdates(ImmutableMap.of(
                                    "auto.offset.reset", "earliest",
                                    "group.id", consumerGroup))
                            .withoutMetadata();

            PCollection<KV<String, String>> input = pipeline.apply("readFromKafka", transform);

            // Wrap each element in a ProducerRecord targeting the output topic
            PCollection<ProducerRecord<String, String>> convertedInput =
                    input.apply("ConvertToStringRecord",
                            ParDo.of(new ConvertToStringRecord(outputTopic) {}))
                            .setCoder(new ProducerRecordCoder<>(StringUtf8Coder.of(), StringUtf8Coder.of()));

            // Write the records back to Kafka
            KafkaIO.WriteRecords<String, String> writeToKafka =
                    KafkaIO.<String, String>writeRecords()
                            .withBootstrapServers(bootstrapServers)
                            .withTopic(outputTopic)
                            .withKeySerializer(StringSerializer.class)
                            .withValueSerializer(StringSerializer.class);

            convertedInput.apply("writeToKafka", writeToKafka);
            pipeline.run();

        } catch (Exception e) {
            log.atError().withThrowable(e).log("Exception thrown while running pipeline PipelineStringToString");
        }
    }

@Log4j2
@AllArgsConstructor
public class ConvertToStringRecord extends DoFn<KV<String, String>, ProducerRecord<String, String>> {
    private String topic;

    private static ProducerRecord<String, String> getRecord(KV<String, String> message, String topic) {
        String string = message.getValue();
        ProducerRecord<String, String> pr = new ProducerRecord<>(topic, message.getKey(), string) {};
        // Attach a __TypeId__ header carrying the payload class name
        pr.headers().add("__TypeId__", String.class.getName().getBytes(StandardCharsets.UTF_8));
        return pr;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            ProducerRecord<String, String> pr = getRecord(Objects.requireNonNull(c.element()), topic);
            c.output(pr);
        } catch (Exception e) {
            log.atError().withThrowable(e).log("exception thrown while processing string");
        }
    }
}


Thanks,
Ifat
