Sorry for cross-posting.

---------- Forwarded message ---------
From: Talat Uyarer <tuya...@paloaltonetworks.com>
Date: Fri, May 19, 2023, 2:25 AM
Subject: Local Combiner for GroupByKey on Flink Streaming jobs
To: <u...@beam.apache.org>
Hi,

I have a stream aggregation job running on Flink 1.13, and I generate the DAG using Beam SQL. My SQL query has a TUMBLE window. Basically, my pipeline reads from Kafka, counts/sums some values with a streaming aggregation, and writes to a sink.

Beam SQL uses GroupByKey for the aggregation part. When I read the translation code for the GroupByKey class in the Flink runner [1], I could not see any local combiner. I see a ReducerFunction, but I think it runs on the reducer side. If that is true, how can I implement a local reducer in the source step to improve shuffling performance? Or am I missing something?

In case you need more information, I share some details about my pipeline below.

Thanks

[1] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L905

This is my SQL query:

"SELECT log_source_id, SUM(size) AS total_size FROM PCOLLECTION GROUP BY log_source_id, TUMBLE(log_time, INTERVAL '1' MINUTE)"

When I submit the job, Flink generates two fused steps (a Source step and a Sink step). The task names are below.
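For context, the query groups by log_source_id plus a 1-minute tumbling window, so each row is effectively keyed by (log_source_id, window). The sketch below shows only the window-assignment arithmetic, assuming event times in epoch milliseconds and a zero window offset; the class and method names are hypothetical and this is not Beam's own windowing code.

```java
/**
 * Hypothetical sketch: which 1-minute tumbling window a row's log_time falls into.
 * Assumes epoch-millisecond timestamps and zero window offset.
 */
public class TumbleWindowSketch {

    static final long ONE_MINUTE_MILLIS = 60_000L;

    /** Start (inclusive) of the tumbling window containing tsMillis. */
    static long windowStart(long tsMillis, long sizeMillis) {
        // floorMod keeps the result correct for timestamps before the epoch.
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    /** End (exclusive) of that window. */
    static long windowEnd(long tsMillis, long sizeMillis) {
        return windowStart(tsMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long ts = 125_000L; // 2 min 5 s after the epoch
        System.out.println(windowStart(ts, ONE_MINUTE_MILLIS)); // 120000
        System.out.println(windowEnd(ts, ONE_MINUTE_MILLIS));   // 180000
    }
}
```

Rows with the same log_source_id whose log_time lands in the same [start, end) interval share a grouping key, which is what the GroupByKey has to bring together after the shuffle.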
First step (Source step):
Source: Kafka_IO/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource) -> Flat Map -> ParMultiDo(AvroBytesToRowConverter) -> BeamCalcRel_47/ParDo(Calc)/ParMultiDo(Calc) -> BeamAggregationRel_48/assignEventTimestamp/AddTimestamps/ParMultiDo(AddTimestamps) -> BeamAggregationRel_48/Window.Into()/Window.Assign.out -> BeamAggregationRel_48/Group.CombineFieldsByFields/ToKvs/selectKeys/AddKeys/Map/ParMultiDo(Anonymous) -> ToBinaryKeyedWorkItem

Second step (Aggregation and Sink step):
BeamAggregationRel_48/Group.CombineFieldsByFields/ToKvs/GroupByKey -> ToGBKResult -> BeamAggregationRel_48/Group.CombineFieldsByFields/Combine/ParDo(Anonymous)/ParMultiDo(Anonymous) -> BeamAggregationRel_48/Group.CombineFieldsByFields/ToRow/ParMultiDo(Anonymous) -> BeamAggregationRel_48/mergeRecord/ParMultiDo(Anonymous) -> BeamCalcRel_49/ParDo(Calc)/ParMultiDo(Calc) -> ParMultiDo(RowToOutputFormat) -> ParMultiDo(SinkProcessor)
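To make the question concrete: a local (map-side) combiner would sit just before ToBinaryKeyedWorkItem in the first step and emit one partial SUM per (log_source_id, window) instead of one record per input row. The sketch below is plain Java and only illustrates that idea under stated assumptions; LocalSumCombiner, Key, Partial and the maxKeys bound are hypothetical names, not Beam or Flink runner APIs. It relies on SUM being associative, so the reducer side can add partial sums to get the same total.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical map-side combiner for SUM(size) grouped by (log_source_id, window). */
public class LocalSumCombiner {

    /** Composite grouping key: the GROUP BY column plus the window start. */
    public static final class Key {
        final String logSourceId;
        final long windowStartMillis;

        Key(String logSourceId, long windowStartMillis) {
            this.logSourceId = logSourceId;
            this.windowStartMillis = windowStartMillis;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return windowStartMillis == k.windowStartMillis && logSourceId.equals(k.logSourceId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(logSourceId, windowStartMillis);
        }
    }

    /** One pre-aggregated record that would be sent across the shuffle. */
    public static final class Partial {
        final Key key;
        final long partialSum;

        Partial(Key key, long partialSum) {
            this.key = key;
            this.partialSum = partialSum;
        }
    }

    private final Map<Key, Long> buffer = new HashMap<>();
    private final int maxKeys;

    public LocalSumCombiner(int maxKeys) {
        this.maxKeys = maxKeys;
    }

    /** Adds one row; flushes early when the buffer reaches maxKeys to bound memory. */
    public List<Partial> add(String logSourceId, long windowStartMillis, long size) {
        buffer.merge(new Key(logSourceId, windowStartMillis), size, Long::sum);
        return buffer.size() >= maxKeys ? flush() : new ArrayList<>();
    }

    /** Emits one partial sum per buffered key (e.g. at bundle end) and clears the buffer. */
    public List<Partial> flush() {
        List<Partial> out = new ArrayList<>();
        for (Map.Entry<Key, Long> e : buffer.entrySet()) {
            out.add(new Partial(e.getKey(), e.getValue()));
        }
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        LocalSumCombiner c = new LocalSumCombiner(1000);
        c.add("src-a", 0L, 10);
        c.add("src-a", 0L, 5);
        c.add("src-b", 0L, 7);
        c.add("src-a", 60_000L, 3);
        // 4 input rows become 3 records to shuffle.
        System.out.println(c.flush().size());
    }
}
```

In a real streaming runner, the flush points would also have to respect checkpoints and watermarks so buffered partials are not lost or emitted late; this sketch deliberately ignores that.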