Hi Shammon,

Are you suggesting that I use OVER with PARTITION BY? If so, I must define an aggregate function on a specific column. For example, I have a product table.
Before partition by:

    select user,product,amount FROM product

After partition by:

    select user,product,amount, FIRST_VALUE(user) OVER( partition by product) AS meanless_col FROM product

This could cause a state-too-large problem. Do I need to define a range like this?

    select user,product,amount, FIRST_VALUE(user) OVER( partition by product) AS meanless_col RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW FROM product

--
Best,
Hjw

At 2023-04-04 12:23:26, "Shammon FY" <zjur...@gmail.com> wrote:

Hi hjw,

To rescale data for the dim join, I think you can use `partition by` in SQL before the `dim join`, which will redistribute data by a specific column. In addition, you can add a cache for the `dim table` to improve performance.

Best,
Shammon FY

On Tue, Apr 4, 2023 at 10:28 AM Hang Ruan <ruanhang1...@gmail.com> wrote:

Hi hjw,

IMO, a parallelism of 1 is enough for your job if we do not consider the sink. I do not know why you need to set the lookup join operator's parallelism to 6.
The SQL planner decides the type of the edge, and we cannot change it.
Maybe you could share the execution graph to provide more information.

Best,
Hang

hjw <hjw_em...@163.com> wrote on Tue, Apr 4, 2023 at 00:37:

For example, I create a Kafka source that subscribes to a topic with one partition, and I set the default parallelism of the job to 6. The operator after the Kafka source is a lookup join against a MySQL table. However, the edge between the Kafka source and the lookup join operator is Forward, so only one subtask of the lookup join operator receives data. I want the edge between the Kafka source and the lookup join operator to be Rebalance, so that all subtasks of the lookup join operator receive data.

Env:
Flink version: 1.15.1

--
Best,
Hjw
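
On the bounded OVER window asked about at the top of this thread: in Flink SQL the RANGE clause belongs inside the OVER (...) parentheses, and the window needs an ORDER BY on a time attribute. Below is a minimal sketch of how that might look, together with a dim table that uses the JDBC lookup cache. The column names come from the thread; the proctime column, the dim_product table, and every connector value (topic, URL, group id, cache sizes) are assumptions for illustration, not the poster's actual setup.

```sql
-- Sketch only. Column names follow the thread; the proctime column,
-- the connector values and the dim_product table are assumptions.
CREATE TABLE product (
  `user`   STRING,          -- USER is a reserved word, so it is quoted
  product  STRING,
  amount   INT,
  proctime AS PROCTIME()    -- processing-time attribute used by ORDER BY
) WITH (
  'connector' = 'kafka',
  'topic' = 'product',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'product-reader',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Hypothetical MySQL dim table with the JDBC lookup cache enabled.
CREATE TABLE dim_product (
  product      STRING,
  product_name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/shop',
  'table-name' = 'dim_product',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min'
);

-- Bounded OVER window: PARTITION BY keys the stream by product, and the
-- one-hour RANGE limits how much history each key keeps in state.
SELECT
  `user`,
  product,
  amount,
  FIRST_VALUE(`user`) OVER (
    PARTITION BY product
    ORDER BY proctime
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS meanless_col
FROM product;
```

Whether this actually changes the Forward edge in front of the lookup join depends on the plan the planner produces, so it is worth checking the execution graph after applying it.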