Hi Ifat,
can you try adding the 'use_deprecated_read' experiment to the
PipelineOptions? IIRC the default expansion of KafkaIO uses a splittable
DoFn now, which could be the cause of the performance difference you
see. You can add the option on the command line using
"--experiments=use_deprecated_read".
Jan
On 5/11/22 16:20, Afek, Ifat (Nokia - IL/Kfar Sava) wrote:
Hi,
Thanks for the tip. I enabled the fasterCopy parameter and the
performance improved from 30,000 strings per second to 35,000-40,000.
There is still a huge difference compared to native Flink (150,000
strings per second), which I don't understand.
I opened a bug for that: https://issues.apache.org/jira/browse/BEAM-14438
Thanks,
Ifat
*From: *Talat Uyarer <tuya...@paloaltonetworks.com>
*Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
*Date: *Wednesday, 11 May 2022 at 3:04
*To: *"user@beam.apache.org" <user@beam.apache.org>
*Subject: *Re: Beam slowness compared to flink-native
Hi Ifat,
Did you enable the fasterCopy parameter?
Please look at this issue:
https://issues.apache.org/jira/browse/BEAM-11146
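A minimal sketch of enabling it, assuming the Flink runner's
FlinkPipelineOptions (from beam-runners-flink):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // Skips the defensive deep copy between chained operators;
    // safe as long as no operator mutates its input elements.
    options.setFasterCopy(true);

Or equivalently, pass "--fasterCopy=true" on the command line.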
Thanks
On Mon, May 2, 2022 at 12:57 AM Afek, Ifat (Nokia - IL/Kfar Sava)
<ifat.a...@nokia.com> wrote:
Hi,
I'm investigating slowness in our Beam pipelines, and as part of
that I tried to compare a very simple Beam pipeline with an
equivalent native Flink pipeline. Both pipelines read
strings from one Kafka topic and write them to another topic. I'm
using Beam 2.38.0 and Flink 1.13.
I ran each pipeline separately, on a single task manager
with a single slot and parallelism 1. What I saw is that native
Flink runs 5 times faster than Beam (150,000 strings per second
in Flink compared to 30,000 in Beam).
I'll be happy if you can help me figure out why there is such a
difference. Maybe the pipelines are not really equivalent, or the
Beam configuration is wrong?
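For reference, a sketch of how the parallelism-1 setup can be pinned on
the Beam side (assuming the Flink runner's FlinkPipelineOptions; this is
equivalent to passing --parallelism=1):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // Match the single-slot, parallelism-1 setup used for the comparison.
    options.setParallelism(1);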
Flink native pipeline:
public void process() throws Exception {
    StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaAddress);
    properties.setProperty("group.id", KAFKA_GROUP_ID);

    // Consume strings from the input topic, starting at the earliest offset.
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            INPUT_TOPIC, new SimpleStringSchema(), properties);
    consumer.setStartFromEarliest();
    consumer.assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps());

    // Produce the same strings, unchanged, to the output topic.
    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
            kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());

    DataStream<String> inputMessagesStream = environment.addSource(consumer);
    inputMessagesStream.addSink(producer);
    environment.execute();
}
Beam pipeline:
public static void main(String[] args) {
    try {
        StreamingOptions options =
                PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
        options.setStreaming(true);
        options.setRunner(FlinkRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        // Read KV<String, String> records from the input topic,
        // starting at the earliest offset.
        PTransform<PBegin, PCollection<KV<String, String>>> transform =
                KafkaIO.<String, String>read()
                        .withBootstrapServers(bootstrapServers)
                        .withTopic(inputTopic)
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withConsumerConfigUpdates(ImmutableMap.of(
                                "auto.offset.reset", "earliest",
                                "group.id", consumerGroup))
                        .withoutMetadata();

        PCollection<KV<String, String>> input =
                pipeline.apply("readFromKafka", transform);

        // Wrap each element in a ProducerRecord for the output topic.
        PCollection<ProducerRecord<String, String>> convertedInput =
                input.apply("ConvertToStringRecord",
                                ParDo.of(new ConvertToStringRecord(outputTopic) {}))
                        .setCoder(new ProducerRecordCoder<>(
                                StringUtf8Coder.of(), StringUtf8Coder.of()));

        KafkaIO.WriteRecords<String, String> writeToKafka =
                KafkaIO.<String, String>writeRecords()
                        .withBootstrapServers(bootstrapServers)
                        .withTopic(outputTopic)
                        .withKeySerializer(StringSerializer.class)
                        .withValueSerializer(StringSerializer.class);

        convertedInput.apply("writeToKafka", writeToKafka);
        pipeline.run();
    } catch (Exception e) {
        log.atError().withThrowable(e).log(
                "Exception thrown while running pipeline PipelineStringToString");
    }
}
@Log4j2
@AllArgsConstructor
public class ConvertToStringRecord
        extends DoFn<KV<String, String>, ProducerRecord<String, String>> {

    private String topic;

    private static ProducerRecord<String, String> getRecord(
            KV<String, String> message, String topic) {
        ProducerRecord<String, String> pr = new ProducerRecord<>(
                topic, message.getKey(), message.getValue()) {};
        // Type header read by downstream consumers (e.g. Spring Kafka).
        pr.headers().add("__TypeId__",
                String.class.getName().getBytes(StandardCharsets.UTF_8));
        return pr;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            c.output(getRecord(Objects.requireNonNull(c.element()), topic));
        } catch (Exception e) {
            log.atError().withThrowable(e).log(
                    "Exception thrown while processing string");
        }
    }
}
Thanks,
Ifat