Hi, we are also paying attention to this issue and have completed validation of the minibatch join optimization, including the intermediate-message folding you mentioned. We plan to officially release it in Flink 1.19. This optimization can significantly improve the performance of join operations, and we hope Flink 1.19 will help solve your problem.
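In the meantime, mini-batching can already be switched on through the Table API configuration. The new minibatch join is expected to respect the same options, but the exact switches may still change before the release, so please treat the sketch below as an assumption and double-check the 1.19 documentation once it is out:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MiniBatchConfigExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // Existing mini-batch options; whether the 1.19 minibatch join is
            // gated behind exactly these switches is an assumption on my side.
            tEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
            tEnv.getConfig().set("table.exec.mini-batch.allow-latency", "2 s");
            tEnv.getConfig().set("table.exec.mini-batch.size", "5000");
        }
    }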
On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
> Hello, Flink community!
>
> I have a use case that is important to me and shows extremely bad performance:
>
> * streaming application
> * SQL Table API
> * 10 regular joins (state must be kept forever)
>
> The join rules are simple: I have 10 tables that share the same primary key, and I
> want to build the result table by joining all 10 of them.
>
> But Flink joins sequentially, so I end up with a chain of 10 joins.
>
> Here is what happens when I generate an update message for the first table in the chain:
>
> * the first join operator produces 2 records: delete + insert
> * the second operator doubles the incoming messages: 2 * 2 = 4 messages
> * ...
> * the last operator produces 2**10 = 1024 messages
>
> Performance becomes extremely slow and resources are wasted.
>
> I have written a simple compaction operator that compacts records after each join:
>
> * after receiving a delete message, the join operator generates 2 messages
> * after receiving an insert message, it generates 2 more messages
> * but two of the four cancel each other out, so the operator receives 2
>   messages and sends 2 messages
>
> I wonder whether this approach is right, and why is it not implemented in Flink yet?
>
> I also have a problem with deploying this on a cluster, because I had to change some
> Flink sources that are not pluggable.
>
> I have modified the StreamExecJoin class and added this code as a proof of concept:
>
>     final OneInputTransformation<RowData, RowData> compactTransform =
>             ExecNodeUtil.createOneInputTransformation(
>                     transform,
>                     "compact join results",
>                     "description",
>                     new ProcessOperator<>(new CompactStreamOperator(equaliser)),
>                     InternalTypeInfo.of(returnType),
>                     leftTransform.getParallelism());
>
>     return compactTransform;
>
> Transform operator:
>
>     @Override
>     public void processElement(
>             RowData value,
>             ProcessFunction<RowData, RowData>.Context ctx,
>             Collector<RowData> collector) throws Exception {
>
>         counter++;
>
>         boolean compacted = false;
>         if (value.getRowKind() == RowKind.DELETE) {
>             value.setRowKind(RowKind.INSERT);
>             for (int i = buffer.size() - 1; i >= 0; i--) {
>                 RowData x = buffer.get(i);
>                 if (x.getRowKind() == RowKind.INSERT && recordEqualiser.equals(x, value)) {
>                     buffer.remove(i);
>                     compacted = true;
>                     break;
>                 }
>             }
>             value.setRowKind(RowKind.DELETE);
>         }
>
>         if (!compacted) {
>             buffer.add(value);
>         }
>
>         if (counter >= 10) {
>             buffer.forEach(collector::collect);
>             buffer.clear();
>             counter = 0;
>         }
>     }
>
> Regards,
> Artem
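P.S. For anyone who wants to experiment with the compaction idea before 1.19, here is a self-contained sketch of the buffer-and-cancel function described in the quoted mail. The class name and the flush threshold are illustrative, and the buffer lives only on the heap (no checkpointed state, no timer-based flush), so this is a proof of concept rather than a production operator:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.generated.RecordEqualiser;
    import org.apache.flink.types.RowKind;
    import org.apache.flink.util.Collector;

    /** Buffers join output and cancels matching DELETE/INSERT pairs before emitting. */
    public class CompactingJoinOutput extends ProcessFunction<RowData, RowData> {

        private static final int FLUSH_THRESHOLD = 10;

        private final RecordEqualiser equaliser;

        private transient List<RowData> buffer;
        private transient int counter;

        public CompactingJoinOutput(RecordEqualiser equaliser) {
            this.equaliser = equaliser;
        }

        @Override
        public void open(Configuration parameters) {
            buffer = new ArrayList<>();
            counter = 0;
        }

        @Override
        public void processElement(RowData value, Context ctx, Collector<RowData> out) {
            counter++;

            boolean compacted = false;
            if (value.getRowKind() == RowKind.DELETE) {
                // Flip the kind temporarily so the equaliser compares two INSERT-kind rows;
                // a DELETE then cancels a previously buffered INSERT for the same row.
                value.setRowKind(RowKind.INSERT);
                for (int i = buffer.size() - 1; i >= 0; i--) {
                    RowData buffered = buffer.get(i);
                    if (buffered.getRowKind() == RowKind.INSERT && equaliser.equals(buffered, value)) {
                        buffer.remove(i);
                        compacted = true;
                        break;
                    }
                }
                value.setRowKind(RowKind.DELETE);
            }

            if (!compacted) {
                buffer.add(value);
            }

            // Flush the surviving records once enough input has been seen.
            if (counter >= FLUSH_THRESHOLD) {
                buffer.forEach(out::collect);
                buffer.clear();
                counter = 0;
            }
        }
    }

It could be wrapped in a ProcessOperator and inserted right after each join transformation, as in the proof of concept quoted above.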