Hi Ifat,

Did you enable the fasterCopy parameter?

Please look at this issue: https://issues.apache.org/jira/browse/BEAM-11146
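
If not, it may be worth trying: by default the Flink runner deep-copies elements
between chained operators, and fasterCopy disables that. A rough, untested sketch,
assuming FlinkPipelineOptions in Beam 2.38.0 exposes setFasterCopy (you can also
just pass --fasterCopy=true on the command line):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Sketch: build the pipeline from FlinkPipelineOptions (instead of
    // StreamingOptions) so that fasterCopy can be set programmatically.
    FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setStreaming(true);
    options.setRunner(FlinkRunner.class);
    options.setFasterCopy(true);   // skip the defensive deep copy between operators
    Pipeline pipeline = Pipeline.create(options);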

Thanks

On Mon, May 2, 2022 at 12:57 AM Afek, Ifat (Nokia - IL/Kfar Sava) <
ifat.a...@nokia.com> wrote:

> Hi,
>
>
>
> I’m investigating slowness in our Beam pipelines, and as part of that I
> compared a very simple Beam pipeline with an equivalent Flink-native
> pipeline. Both pipelines read strings from one Kafka topic and write them
> to another topic. I’m using Beam 2.38.0 and Flink 1.13.
>
> I ran each pipeline separately, on a single task manager with a single
> slot and parallelism 1. Flink native runs 5 times faster than Beam
> (150,000 strings per second in Flink compared to 30,000 in Beam).
>
>
>
> I’ll be happy if you can help me figure out why there is such a
> difference. Maybe the pipelines are not really equivalent, or the Beam
> configuration is wrong?
>
>
>
>
>
> Flink native pipeline:
>
>
>
>     public void process() throws Exception {
>         StreamExecutionEnvironment environment =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", kafkaAddress);
>         properties.setProperty("group.id", KAFKA_GROUP_ID);
>         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
>                 INPUT_TOPIC, new SimpleStringSchema(), properties);
>         consumer.setStartFromEarliest();
>         consumer.assignTimestampsAndWatermarks(
>                 WatermarkStrategy.forMonotonousTimestamps());
>
>         FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
>                 kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());
>
>         DataStream<String> inputMessagesStream = environment.addSource(consumer);
>         inputMessagesStream.addSink(producer);
>
>         environment.execute();
>     }
>
>
>
>
>
> Beam pipeline:
>
>
>
>     public static void main(String[] args) {
>         try {
>             StreamingOptions options =
>                     PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
>             options.setStreaming(true);
>             options.setRunner(FlinkRunner.class);
>             Pipeline pipeline = Pipeline.create(options);
>
>             PTransform<PBegin, PCollection<KV<String, String>>> transform =
>                     KafkaIO.<String, String>read()
>                             .withBootstrapServers(bootstrapServers)
>                             .withTopic(inputTopic)
>                             .withKeyDeserializer(StringDeserializer.class)
>                             .withValueDeserializer(StringDeserializer.class)
>                             .withConsumerConfigUpdates(ImmutableMap.of(
>                                     "auto.offset.reset", "earliest",
>                                     "group.id", consumerGroup))
>                             .withoutMetadata();
>
>             PCollection<KV<String, String>> input =
>                     pipeline.apply("readFromKafka", transform);
>
>             PCollection<ProducerRecord<String, String>> convertedInput =
>                     input.apply("ConvertToStringRecord",
>                                     ParDo.of(new ConvertToStringRecord(outputTopic) {}))
>                             .setCoder(new ProducerRecordCoder<>(
>                                     StringUtf8Coder.of(), StringUtf8Coder.of()));
>
>             KafkaIO.WriteRecords<String, String> writeToAvro =
>                     KafkaIO.<String, String>writeRecords()
>                             .withBootstrapServers(bootstrapServers)
>                             .withTopic(outputTopic)
>                             .withKeySerializer(StringSerializer.class)
>                             .withValueSerializer(StringSerializer.class);
>
>             convertedInput.apply("writeToKafka", writeToAvro);
>             pipeline.run();
>         } catch (Exception e) {
>             log.atError().withThrowable(e).log(
>                     "Exception thrown while running pipeline PipelineStringToString");
>         }
>     }
>
>
>
> @Log4j2
> @AllArgsConstructor
> public class ConvertToStringRecord
>         extends DoFn<KV<String, String>, ProducerRecord<String, String>> {
>
>     private String topic;
>
>     private static ProducerRecord<String, String> getRecord(
>             KV<String, String> message, String topic) {
>         String string = message.getValue();
>         ProducerRecord<String, String> pr =
>                 new ProducerRecord<>(topic, message.getKey(), string) {};
>         pr.headers().add("__TypeId__",
>                 String.class.getName().getBytes(StandardCharsets.UTF_8));
>         return pr;
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         try {
>             ProducerRecord<String, String> pr =
>                     getRecord(Objects.requireNonNull(c.element()), topic);
>             c.output(pr);
>         } catch (Exception e) {
>             log.atError().withThrowable(e).log("exception thrown while processing string");
>         }
>     }
> }
>
>
>
>
>
> Thanks,
>
> Ifat
>
>
>
