Hi Jan, I tried it, but didn’t see any significant effect on performance. Thanks for the suggestion.

Ifat
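[For reference, the 'use_deprecated_read' experiment Jan suggests below can also be enabled programmatically instead of on the command line. A minimal sketch, assuming Beam's Java SDK; the helper class name is hypothetical:]

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical helper; same effect as --experiments=use_deprecated_read.
public class DeprecatedReadOptions {
    public static PipelineOptions create(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        // Forces KafkaIO.read() to expand to the legacy UnboundedSource path
        // instead of the splittable-DoFn expansion Jan mentions below.
        ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_deprecated_read");
        return options;
    }
}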
From: Jan Lukavský <je...@seznam.cz>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, 12 May 2022 at 10:22
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Beam slowness compared to flink-native

Hi Ifat,

can you try adding the 'use_deprecated_read' experiment to the PipelineOptions? IIRC the default expansion for KafkaIO uses splittable DoFn now, which could be the cause of the performance difference you see. You can add the option on the command line using "--experiments=use_deprecated_read".

Jan

On 5/11/22 16:20, Afek, Ifat (Nokia - IL/Kfar Sava) wrote:

Hi,

Thanks for the tip. I enabled the fasterCopy parameter and the performance improved from 30,000 strings per second to 35,000-40,000. There is still a huge difference compared to native Flink (150,000 strings per second), which I don’t understand. I opened a bug for that: https://issues.apache.org/jira/browse/BEAM-14438

Thanks,
Ifat

From: Talat Uyarer <tuya...@paloaltonetworks.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, 11 May 2022 at 3:04
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Beam slowness compared to flink-native

Hi Ifat,

Did you enable the fasterCopy parameter? Please look at this issue: https://issues.apache.org/jira/browse/BEAM-11146

Thanks

On Mon, May 2, 2022 at 12:57 AM Afek, Ifat (Nokia - IL/Kfar Sava) <ifat.a...@nokia.com> wrote:

Hi,

I’m investigating slowness in our Beam pipelines, and as part of that I tried to compare a very simple Beam pipeline with an equivalent Flink-native pipeline. Both pipelines read strings from one Kafka topic and write them to another topic. I’m using Beam 2.38.0 and Flink 1.13. I ran each pipeline separately, on a single task manager with a single slot and parallelism 1. What I saw is that Flink native runs five times faster than Beam (150,000 strings per second in Flink compared to 30,000 in Beam). I’ll be happy if you can help me figure out why there is such a difference. Maybe the pipelines are not really equivalent, or the Beam configuration is wrong?
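[A minimal sketch of making that configuration explicit through the Flink runner's FlinkPipelineOptions; the helper class name is hypothetical:]

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical helper mirroring the setup described above.
public class SingleSlotOptions {
    public static FlinkPipelineOptions create(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setStreaming(true);
        options.setParallelism(1); // single task manager, single slot
        return options;
    }
}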
Flink native pipeline:

public void process() throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaAddress);
    properties.setProperty("group.id", KAFKA_GROUP_ID);

    // Read from the earliest offset, with monotonous-timestamp watermarks.
    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>(INPUT_TOPIC, new SimpleStringSchema(), properties);
    consumer.setStartFromEarliest();
    consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

    FlinkKafkaProducer<String> producer =
            new FlinkKafkaProducer<>(kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());

    DataStream<String> inputMessagesStream = environment.addSource(consumer);
    inputMessagesStream.addSink(producer);
    environment.execute();
}

Beam pipeline:

public static void main(String[] args) {
    try {
        StreamingOptions options = PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
        options.setStreaming(true);
        options.setRunner(FlinkRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        PTransform<PBegin, PCollection<KV<String, String>>> transform =
                KafkaIO.<String, String>read()
                        .withBootstrapServers(bootstrapServers)
                        .withTopic(inputTopic)
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withConsumerConfigUpdates(ImmutableMap.of(
                                "auto.offset.reset", "earliest",
                                "group.id", consumerGroup))
                        .withoutMetadata();

        PCollection<KV<String, String>> input = pipeline.apply("readFromKafka", transform);

        PCollection<ProducerRecord<String, String>> convertedInput =
                input.apply("ConvertToStringRecord", ParDo.of(new ConvertToStringRecord(outputTopic)))
                        .setCoder(new ProducerRecordCoder<>(StringUtf8Coder.of(), StringUtf8Coder.of()));

        KafkaIO.WriteRecords<String, String> writeToKafka =
                KafkaIO.<String, String>writeRecords()
                        .withBootstrapServers(bootstrapServers)
                        .withTopic(outputTopic)
                        .withKeySerializer(StringSerializer.class)
                        .withValueSerializer(StringSerializer.class);

        convertedInput.apply("writeToKafka", writeToKafka);
        pipeline.run();
    } catch (Exception e) {
        log.atError().withThrowable(e).log("Exception thrown while running pipeline PipelineStringToString");
    }
}

@Log4j2
@AllArgsConstructor
public class ConvertToStringRecord extends DoFn<KV<String, String>, ProducerRecord<String, String>> {
    private final String topic;

    private static ProducerRecord<String, String> getRecord(KV<String, String> message, String topic) {
        ProducerRecord<String, String> pr =
                new ProducerRecord<>(topic, message.getKey(), message.getValue());
        pr.headers().add("__TypeId__", String.class.getName().getBytes(StandardCharsets.UTF_8));
        return pr;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            ProducerRecord<String, String> pr = getRecord(Objects.requireNonNull(c.element()), topic);
            c.output(pr);
        } catch (Exception e) {
            log.atError().withThrowable(e).log("exception thrown while processing string");
        }
    }
}

Thanks,
Ifat
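[For completeness, the fasterCopy switch Talat points to (BEAM-11146) is a Flink runner pipeline option. A minimal sketch of enabling it, assuming the Beam Flink runner's FlinkPipelineOptions; the helper class name is hypothetical:]

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical helper; same effect as passing --fasterCopy=true on the command line.
public class FasterCopyOptions {
    public static FlinkPipelineOptions create(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        // Skips the defensive deep copy between chained Flink operators (BEAM-11146).
        // Only safe when elements are not mutated after being emitted.
        options.setFasterCopy(true);
        return options;
    }
}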