Hi, David

Regarding the N-way join: this feature is aimed at simplifying the join state, and it is on the roadmap. Technically there are no limitations, but we'll need some time to find a sensible solution.

Best,
Ron
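For context on the N-Ary Stream Operator David mentions below: here is a minimal sketch, assuming FLIP-92's MultipleInputStreamOperator API, of what a single-operator n-way join could look like. The class name, the join() helper, and all state handling are illustrative assumptions, not Flink's actual implementation.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.streaming.api.operators.AbstractInput;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
    import org.apache.flink.streaming.api.operators.Input;
    import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
    import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.table.data.RowData;

    public class NWayJoinOperatorSketch extends AbstractStreamOperatorV2<RowData>
            implements MultipleInputStreamOperator<RowData> {

        private final int numInputs;

        public NWayJoinOperatorSketch(
                StreamOperatorParameters<RowData> parameters, int numInputs) {
            super(parameters, numInputs);
            this.numInputs = numInputs;
        }

        @Override
        public List<Input> getInputs() {
            // One Input per joined table; input ids are 1-based in FLIP-92.
            List<Input> inputs = new ArrayList<>();
            for (int i = 1; i <= numInputs; i++) {
                final int inputId = i;
                inputs.add(new AbstractInput<RowData, RowData>(this, inputId) {
                    @Override
                    public void processElement(StreamRecord<RowData> element)
                            throws Exception {
                        // A real n-way join would update this input's state and
                        // emit the joined row once, instead of cascading -D/+I
                        // pairs through a chain of binary joins.
                        join(inputId, element.getValue());
                    }
                });
            }
            return inputs;
        }

        private void join(int inputId, RowData row) {
            // State lookup and result emission are elided in this sketch.
        }
    }

The attraction of the single operator is that each update touches one shared piece of per-key state and produces one result change, rather than being doubled by every downstream binary join.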
On Wed, Aug 9, 2023 at 10:38, David Anderson <dander...@apache.org> wrote:

> This join optimization sounds promising, but I'm wondering why Flink
> SQL isn't taking advantage of the N-Ary Stream Operator introduced in
> FLIP-92 [1][2] to implement an n-way join in a single operator. Is
> there something that makes this impossible/impractical?
>
> [1] https://cwiki.apache.org/confluence/x/o4uvC
> [2] https://issues.apache.org/jira/browse/FLINK-15688
>
> On Sat, Aug 5, 2023 at 3:54 AM shuai xu <xushuai...@gmail.com> wrote:
> >
> > Hi, we are also paying attention to this issue and have completed
> > validation of the minibatch join optimization, including the
> > intermediate message folding you mentioned. We plan to officially
> > release it in Flink 1.19. This optimization can significantly improve
> > the performance of join operations, and we look forward to the
> > arrival of Flink 1.19 helping to solve your problem.
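The minibatch join itself is slated for Flink 1.19, and the knobs below are Flink's existing minibatch options; that the new join optimization is controlled by these same keys is an assumption here. A minimal sketch of turning them on:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MiniBatchConfigSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Buffer input records for up to 5 s (or 5000 rows) so that
            // intermediate changelog messages can be folded before they are
            // emitted downstream; the values are illustrative.
            Configuration conf = tEnv.getConfig().getConfiguration();
            conf.setString("table.exec.mini-batch.enabled", "true");
            conf.setString("table.exec.mini-batch.allow-latency", "5 s");
            conf.setString("table.exec.mini-batch.size", "5000");
        }
    }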
> > On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
> > > Hello, Flink community!
> > >
> > > I have an important use case which shows extremely bad performance:
> > >
> > > * streaming application
> > > * SQL Table API
> > > * 10 regular joins (state must be kept forever)
> > >
> > > The join rules are simple: I have 10 tables that share the same
> > > primary key, and I want to assemble the result table from these 10
> > > pieces.
> > >
> > > But Flink joins sequentially, so I end up with a chain of 10 joins.
> > >
> > > Here is what happens when I generate an update message for the first
> > > table in the chain:
> > >
> > > * the first join operator produces 2 records: delete + insert
> > > * the second operator doubles the incoming messages: 2*2 = 4 messages
> > > * ...
> > > * the last operator produces 2**10 = 1024 messages
> > >
> > > Performance becomes extremely slow and resources are wasted.
> > >
> > > I've made a simple compaction operator which compacts records after
> > > the join:
> > >
> > > * after receiving a delete message, the join operator generates 2
> > >   messages
> > > * after receiving an insert message, it generates 2 more messages
> > > * but two of the four cancel out and are compacted, so the operator
> > >   receives 2 messages and sends 2 messages
> > >
> > > I wonder whether this approach is right, and why it is not
> > > implemented in Flink yet?
> > >
> > > I also have a problem with deploying this on a cluster, because I
> > > have changed some Flink sources which are not pluggable.
> > >
> > > I have modified the StreamExecJoin class and added this code as a
> > > proof of concept:
> > >
> > > final OneInputTransformation<RowData, RowData> compactTransform =
> > >         ExecNodeUtil.createOneInputTransformation(
> > >                 transform,
> > >                 "compact join results",
> > >                 "description",
> > >                 new ProcessOperator<>(new CompactStreamOperator(equaliser)),
> > >                 InternalTypeInfo.of(returnType),
> > >                 leftTransform.getParallelism());
> > >
> > > return compactTransform;
> > >
> > > The operator's processElement:
> > >
> > > @Override
> > > public void processElement(
> > >         RowData value,
> > >         ProcessFunction<RowData, RowData>.Context ctx,
> > >         Collector<RowData> collector) throws Exception {
> > >
> > >     counter++;
> > >
> > >     boolean compacted = false;
> > >     // A DELETE may cancel out a buffered INSERT with identical
> > >     // content: flip the kind, search the buffer for an equal INSERT,
> > >     // and drop it if found.
> > >     if (value.getRowKind() == RowKind.DELETE) {
> > >         value.setRowKind(RowKind.INSERT);
> > >         for (int i = buffer.size() - 1; i >= 0; i--) {
> > >             RowData x = buffer.get(i);
> > >             if (x.getRowKind() == RowKind.INSERT
> > >                     && recordEqualiser.equals(x, value)) {
> > >                 buffer.remove(i);
> > >                 compacted = true;
> > >                 break;
> > >             }
> > >         }
> > >         value.setRowKind(RowKind.DELETE);
> > >     }
> > >
> > >     if (!compacted) {
> > >         buffer.add(value);
> > >     }
> > >
> > >     // Flush the buffer every 10 incoming elements.
> > >     if (counter >= 10) {
> > >         buffer.forEach(collector::collect);
> > >         buffer.clear();
> > >         counter = 0;
> > >     }
> > > }
> > >
> > > Regards,
> > > Artem
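To make the scenario above concrete: a minimal sketch of the query shape Artem describes, with made-up tables t1..t10 sharing a primary key column `id` (registration of the source tables is elided). The planner turns this into a chain of nine binary join operators:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ChainedJoinSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Builds: SELECT * FROM t1 JOIN t2 ON t1.id = t2.id ... JOIN t10
            StringBuilder sql = new StringBuilder("SELECT * FROM t1");
            for (int i = 2; i <= 10; i++) {
                sql.append(" JOIN t").append(i)
                   .append(" ON t1.id = t").append(i).append(".id");
            }

            // An update on t1 enters the first join as a -D/+I pair and is
            // doubled by each subsequent join: 2, 4, 8, ..., 2^10 records.
            tEnv.executeSql(sql.toString()).print();
        }
    }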