Hi Ifat,

can you try adding the 'use_deprecated_read' experiment to the PipelineOptions? IIRC the default expansion for KafkaIO uses a splittable DoFn now, which could be the cause of the performance difference you see. You can add the option on the command line using "--experiments=use_deprecated_read".
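
If you prefer to set it programmatically instead of on the command line, something along these lines should work (a minimal, untested sketch that assumes you build your options from the command-line args):

    ExperimentalOptions options =
        PipelineOptionsFactory.fromArgs(args).as(ExperimentalOptions.class);
    ExperimentalOptions.addExperiment(options, "use_deprecated_read");
    Pipeline pipeline = Pipeline.create(options);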

 Jan

On 5/11/22 16:20, Afek, Ifat (Nokia - IL/Kfar Sava) wrote:

Hi,

Thanks for the tip. I enabled the fasterCopy parameter and the performance improved from 30,000 strings per second to 35,000-40,000. There is still a huge difference compared to native Flink (150,000 strings per second), which I don’t understand.

I opened a bug for that: https://issues.apache.org/jira/browse/BEAM-14438

Thanks,

Ifat

*From: *Talat Uyarer <tuya...@paloaltonetworks.com>
*Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
*Date: *Wednesday, 11 May 2022 at 3:04
*To: *"user@beam.apache.org" <user@beam.apache.org>
*Subject: *Re: Beam slowness compared to flink-native

Hi Ifat,

Did you enable the fasterCopy parameter?

Please look at this issue: https://issues.apache.org/jira/browse/BEAM-11146
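
If not, it can be enabled either with --fasterCopy=true on the command line or in code, roughly like this (a minimal sketch, not tested against your setup):

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // skips the defensive copy of elements between chained operators
    // (safe as long as elements are not mutated downstream)
    options.setFasterCopy(true);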

Thanks

On Mon, May 2, 2022 at 12:57 AM Afek, Ifat (Nokia - IL/Kfar Sava) <ifat.a...@nokia.com> wrote:

    Hi,

    I’m investigating the slowness of our Beam pipelines, and as part of that I tried to compare a very simple Beam pipeline with an equivalent Flink-native pipeline. Both pipelines should read strings from one Kafka topic and write them to another topic. I’m using Beam 2.38.0 and Flink 1.13.

    I tried running each pipeline separately, on a single task manager with a single slot and parallelism 1. What I saw is that Flink native runs 5 times faster than Beam (150,000 strings per second in Flink compared to 30,000 in Beam).

    I’ll be happy if you can help me figure out why there is such a difference. Maybe the pipelines are not really equivalent, or the Beam configuration is wrong?

    Flink native pipeline:

    public void process() throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("group.id", KAFKA_GROUP_ID);

        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>(INPUT_TOPIC, new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

        FlinkKafkaProducer<String> producer =
            new FlinkKafkaProducer<>(kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());

        DataStream<String> inputMessagesStream = environment.addSource(consumer);
        inputMessagesStream.addSink(producer);

        environment.execute();
    }

    Beam pipeline:

    public static void main(String[] args) {
        try {
            StreamingOptions options = PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
            options.setStreaming(true);
            options.setRunner(FlinkRunner.class);
            Pipeline pipeline = Pipeline.create(options);

            PTransform<PBegin, PCollection<KV<String, String>>> transform =
                KafkaIO.<String, String>read()
                    .withBootstrapServers(bootstrapServers)
                    .withTopic(inputTopic)
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withConsumerConfigUpdates(ImmutableMap.of(
                        "auto.offset.reset", "earliest",
                        "group.id", consumerGroup))
                    .withoutMetadata();

            PCollection<KV<String, String>> input = pipeline.apply("readFromKafka", transform);

            PCollection<ProducerRecord<String, String>> convertedInput =
                input.apply("ConvertToStringRecord",
                        ParDo.of(new ConvertToStringRecord(outputTopic) {}))
                    .setCoder(new ProducerRecordCoder<>(StringUtf8Coder.of(), StringUtf8Coder.of()));

            KafkaIO.WriteRecords<String, String> writeToAvro =
                KafkaIO.<String, String>writeRecords()
                    .withBootstrapServers(bootstrapServers)
                    .withTopic(outputTopic)
                    .withKeySerializer(StringSerializer.class)
                    .withValueSerializer(StringSerializer.class);

            convertedInput.apply("writeToKafka", writeToAvro);
            pipeline.run();
        } catch (Exception e) {
            log.atError().withThrowable(e).log("Exception thrown while running pipeline PipelineStringToString");
        }
    }

    @Log4j2
    @AllArgsConstructor
    public class ConvertToStringRecord extends DoFn<KV<String, String>, ProducerRecord<String, String>> {

        private String topic;

        private static ProducerRecord<String, String> getRecord(KV<String, String> message, String topic) {
            String string = message.getValue();
            ProducerRecord<String, String> pr = new ProducerRecord<>(topic, message.getKey(), string) {};
            pr.headers().add("__TypeId__", String.class.getName().getBytes(StandardCharsets.UTF_8));
            return pr;
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            try {
                ProducerRecord<String, String> pr = getRecord(Objects.requireNonNull(c.element()), topic);
                c.output(pr);
            } catch (Exception e) {
                log.atError().withThrowable(e).log("exception thrown while processing string");
            }
        }
    }

    Thanks,

    Ifat
