This join optimization sounds promising, but I'm wondering why Flink SQL isn't taking advantage of the N-Ary Stream Operator introduced in FLIP-92 [1][2] to implement an n-way join in a single operator. Is there something that makes this impossible or impractical?
[1] https://cwiki.apache.org/confluence/x/o4uvC
[2] https://issues.apache.org/jira/browse/FLINK-15688

On Sat, Aug 5, 2023 at 3:54 AM shuai xu <xushuai...@gmail.com> wrote:
>
> Hi, we are also paying attention to this issue and have completed the
> validation of the minibatch join optimization, including the intermediate
> message folding you mentioned. We plan to officially release it in Flink
> 1.19. This optimization could significantly improve the performance of join
> operations, and we are looking forward to the arrival of Flink 1.19 to help
> solve your problem.
>
> On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
> > Hello, Flink community!
> >
> > I have a use case that is important to me and that shows extremely bad
> > performance:
> >
> >   * Streaming application
> >   * SQL Table API
> >   * 10 regular joins (state should be kept forever)
> >
> > The join rules are simple: I have 10 tables that share the same primary
> > key, and I want to join the result table from these 10 pieces.
> >
> > But Flink joins sequentially, so I have a chain of 10 joins.
> >
> > What happens if I generate an update message for the first table in the
> > chain:
> >
> >   * the first join operator will produce 2 records: delete + insert
> >   * the second operator will double the incoming messages: 2*2 = 4 messages
> >   * ...
> >   * the last operator will produce 2**10 = 1024 messages.
> >
> > Performance becomes extremely slow and resources are wasted.
> >
> > I've made a simple compaction operator, which compacts records after
> > the join:
> >
> >   * after receiving a delete message, the join operator generates 2 messages
> >   * after receiving an insert message, it generates 2 more messages
> >   * but two of the four are compacted, so the operator receives 2
> >     messages and sends 2 messages
> >
> > I wonder if this approach is right? Why is it not implemented in Flink yet?
> >
> > I also have a problem: how should I implement it on a cluster, given that
> > I have changed some Flink sources, which are not pluggable?
> >
> > I have modified the StreamExecJoin class and added this code as a proof of
> > concept:
> >
> >     final OneInputTransformation<RowData, RowData> compactTransform =
> >             ExecNodeUtil.createOneInputTransformation(
> >                     transform,
> >                     "compact join results",
> >                     "description",
> >                     new ProcessOperator<>(new CompactStreamOperator(equaliser)),
> >                     InternalTypeInfo.of(returnType),
> >                     leftTransform.getParallelism());
> >
> >     return compactTransform;
> >
> > Transform operator:
> >
> >     @Override
> >     public void processElement(
> >             RowData value,
> >             ProcessFunction<RowData, RowData>.Context ctx,
> >             Collector<RowData> collector) throws Exception {
> >
> >         counter++;
> >
> >         boolean compacted = false;
> >         if (value.getRowKind() == RowKind.DELETE) {
> >             // Temporarily mark the row as INSERT so it can be compared
> >             // against the buffered INSERT rows.
> >             value.setRowKind(RowKind.INSERT);
> >             // Search newest-first for a buffered INSERT of the same row.
> >             for (int i = buffer.size() - 1; i >= 0; i--) {
> >                 RowData x = buffer.get(i);
> >                 if (x.getRowKind() == RowKind.INSERT
> >                         && recordEqualiser.equals(x, value)) {
> >                     // The INSERT and this DELETE cancel each other out.
> >                     buffer.remove(i);
> >                     compacted = true;
> >                     break;
> >                 }
> >             }
> >             value.setRowKind(RowKind.DELETE);
> >         }
> >
> >         if (!compacted) {
> >             buffer.add(value);
> >         }
> >
> >         // Flush the buffer downstream after every 10 input records.
> >         if (counter >= 10) {
> >             buffer.forEach(collector::collect);
> >             buffer.clear();
> >             counter = 0;
> >         }
> >     }
> >
> > Regards,
> > Artem
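
For anyone following the thread, the folding step in the quoted proof of concept can be sketched without any Flink dependencies. This is only a minimal illustration under stated assumptions, not Flink's implementation and not the planned 1.19 minibatch join: ChangelogFolding, Kind, Change and fold are hypothetical names, a row is modeled as a plain string instead of RowData, and a whole batch is folded at once rather than flushed after every 10 records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of the folding step; none of this is Flink API.
final class ChangelogFolding {
    enum Kind { INSERT, DELETE }

    // Stand-in for RowData: a row kind plus the row contents as a string.
    static final class Change {
        final Kind kind;
        final String row;

        Change(Kind kind, String row) {
            this.kind = kind;
            this.row = row;
        }

        @Override
        public String toString() {
            return (kind == Kind.INSERT ? "+I" : "-D") + "(" + row + ")";
        }
    }

    // Folds one batch: a DELETE cancels the most recent buffered INSERT of an
    // identical row; every other change is kept in arrival order.
    static List<Change> fold(List<Change> batch) {
        List<Change> buffer = new ArrayList<>();
        for (Change change : batch) {
            boolean folded = false;
            if (change.kind == Kind.DELETE) {
                // Search newest-first, as the quoted operator does.
                for (int i = buffer.size() - 1; i >= 0; i--) {
                    Change buffered = buffer.get(i);
                    if (buffered.kind == Kind.INSERT && buffered.row.equals(change.row)) {
                        buffer.remove(i);
                        folded = true;
                        break;
                    }
                }
            }
            if (!folded) {
                buffer.add(change);
            }
        }
        return buffer;
    }
}
```

As an assumed example, take an outer join whose left row changes from a1 to a2 while the right row b stays the same. The join output for that single update could be -D(a1, b), +I(null, b), -D(null, b), +I(a2, b). Passing those four changes to fold leaves only -D(a1, b) and +I(a2, b), because the intermediate +I(null, b) and -D(null, b) cancel. That is the "receives 2 messages, sends 2 messages" behavior described in the quoted message.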