Hi Ifat,

Did you enable the fasterCopy parameter? Please take a look at this issue: https://issues.apache.org/jira/browse/BEAM-11146
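If not, here is a minimal sketch of how it can be enabled, assuming the Flink runner's FlinkPipelineOptions in Beam 2.38 (the option was introduced by the fix in that issue; this is an untested illustration, not your exact setup):

    // Sketch only: switch to FlinkPipelineOptions (which extends StreamingOptions)
    // so the fasterCopy flag is available; the rest of the pipeline is unchanged.
    // Requires org.apache.beam.runners.flink.FlinkPipelineOptions.
    FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setStreaming(true);
    options.setRunner(FlinkRunner.class);
    options.setFasterCopy(true); // skip the defensive deep copy between chained operators
    Pipeline pipeline = Pipeline.create(options);

The same flag can also be passed on the command line as --fasterCopy=true.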
Thanks

On Mon, May 2, 2022 at 12:57 AM Afek, Ifat (Nokia - IL/Kfar Sava) <ifat.a...@nokia.com> wrote:

> Hi,
>
> I’m investigating slowness in our Beam pipelines, and as part of that I
> tried to compare a very simple Beam pipeline with an equivalent
> Flink-native pipeline. Both pipelines should read strings from one Kafka
> topic and write them to another topic. I’m using Beam 2.38.0 and Flink 1.13.
>
> I tried running each pipeline separately, on a single task manager with a
> single slot and parallelism 1. What I saw is that the Flink-native pipeline
> runs 5 times faster than Beam (150,000 strings per second in Flink
> compared to 30,000 in Beam).
>
> I’ll be happy if you can help me figure out why there is such a
> difference. Maybe the pipelines are not really equivalent, or the Beam
> configuration is wrong?
>
> Flink native pipeline:
>
> public void process() throws Exception {
>     StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", kafkaAddress);
>     properties.setProperty("group.id", KAFKA_GROUP_ID);
>
>     FlinkKafkaConsumer<String> consumer =
>             new FlinkKafkaConsumer<>(INPUT_TOPIC, new SimpleStringSchema(), properties);
>     consumer.setStartFromEarliest();
>     consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
>
>     FlinkKafkaProducer<String> producer =
>             new FlinkKafkaProducer<>(kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());
>
>     DataStream<String> inputMessagesStream = environment.addSource(consumer);
>     inputMessagesStream.addSink(producer);
>
>     environment.execute();
> }
>
> Beam pipeline:
>
> public static void main(String[] args) {
>     try {
>         StreamingOptions options = PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
>         options.setStreaming(true);
>         options.setRunner(FlinkRunner.class);
>         Pipeline pipeline = Pipeline.create(options);
>
>         PTransform<PBegin, PCollection<KV<String, String>>> transform =
>                 KafkaIO.<String, String>read()
>                         .withBootstrapServers(bootstrapServers)
>                         .withTopic(inputTopic)
>                         .withKeyDeserializer(StringDeserializer.class)
>                         .withValueDeserializer(StringDeserializer.class)
>                         .withConsumerConfigUpdates(ImmutableMap.of(
>                                 "auto.offset.reset", "earliest",
>                                 "group.id", consumerGroup))
>                         .withoutMetadata();
>
>         PCollection<KV<String, String>> input = pipeline.apply("readFromKafka", transform);
>
>         PCollection<ProducerRecord<String, String>> convertedInput =
>                 input.apply("ConvertToStringRecord", ParDo.of(new ConvertToStringRecord(outputTopic)))
>                         .setCoder(new ProducerRecordCoder<>(StringUtf8Coder.of(), StringUtf8Coder.of()));
>
>         KafkaIO.WriteRecords<String, String> writeRecords =
>                 KafkaIO.<String, String>writeRecords()
>                         .withBootstrapServers(bootstrapServers)
>                         .withTopic(outputTopic)
>                         .withKeySerializer(StringSerializer.class)
>                         .withValueSerializer(StringSerializer.class);
>
>         convertedInput.apply("writeToKafka", writeRecords);
>         pipeline.run();
>     } catch (Exception e) {
>         log.atError().withThrowable(e).log("Exception thrown while running pipeline PipelineStringToString");
>     }
> }
>
> @Log4j2
> @AllArgsConstructor
> public class ConvertToStringRecord extends DoFn<KV<String, String>, ProducerRecord<String, String>> {
>     private String topic;
>
>     private static ProducerRecord<String, String> getRecord(KV<String, String> message, String topic) {
>         ProducerRecord<String, String> pr =
>                 new ProducerRecord<>(topic, message.getKey(), message.getValue());
>         pr.headers().add("__TypeId__", String.class.getName().getBytes(StandardCharsets.UTF_8));
>         return pr;
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         try {
>             ProducerRecord<String, String> pr = getRecord(Objects.requireNonNull(c.element()), topic);
>             c.output(pr);
>         } catch (Exception e) {
>             log.atError().withThrowable(e).log("Exception thrown while processing string");
>         }
>     }
> }
>
> Thanks,
> Ifat