Hi Artem!

Are your tables backed by Kafka? If yes, what happens if you use the upsert-kafka connector from the Table API <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/>? Does it help to reduce the number of records in each subsequent join operator? I wrote a blog post some time ago (see the joins part) about why upsert-kafka can be useful when joining event tables: https://www.ververica.com/blog/streaming-modes-of-flink-kafka-connectors
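For reference, the table declaration could look roughly like this (just a sketch; the topic, field names, and broker address below are placeholders, not your actual setup):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UpsertKafkaSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // upsert-kafka reads the topic as a changelog keyed by the primary
            // key, so the table reflects the latest value per key instead of a
            // growing append-only event stream.
            tEnv.executeSql(
                    "CREATE TABLE t1 (\n"
                            + "  id BIGINT,\n"
                            + "  payload STRING,\n"
                            + "  PRIMARY KEY (id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'upsert-kafka',\n"
                            + "  'topic' = 't1',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'key.format' = 'json',\n"
                            + "  'value.format' = 'json'\n"
                            + ")");
        }
    }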
Best regards,
Alexey

On Wed, Aug 9, 2023 at 5:05 AM liu ron <ron9....@gmail.com> wrote:

> Hi, David
>
> Regarding the N-way join, this feature aims to address the issue of state
> simplification, and it is on the roadmap. Technically there are no
> limitations, but we'll need some time to find a sensible solution.
>
> Best,
> Ron
>
> On Wed, Aug 9, 2023 at 10:38 AM David Anderson <dander...@apache.org> wrote:
>
>> This join optimization sounds promising, but I'm wondering why Flink
>> SQL isn't taking advantage of the N-Ary Stream Operator introduced in
>> FLIP-92 [1][2] to implement an n-way join in a single operator. Is
>> there something that makes this impossible/impractical?
>>
>> [1] https://cwiki.apache.org/confluence/x/o4uvC
>> [2] https://issues.apache.org/jira/browse/FLINK-15688
>>
>> On Sat, Aug 5, 2023 at 3:54 AM shuai xu <xushuai...@gmail.com> wrote:
>> >
>> > Hi, we are also paying attention to this issue and have completed the
>> > validation of the minibatch join optimization, including the
>> > intermediate message folding you mentioned. We plan to officially
>> > release it in Flink 1.19. This optimization significantly improves the
>> > performance of join operations, and we are looking forward to the
>> > arrival of Flink 1.19 to help solve your problem.
>> >
>> > On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
>> > > Hello, Flink community!
>> > >
>> > > I have an important use case which shows extremely bad performance:
>> > >
>> > > * Streaming application
>> > > * SQL Table API
>> > > * 10 regular joins (state should be kept forever)
>> > >
>> > > The join rules are simple: I have 10 tables which share the same
>> > > primary key, and I want to build the result table from those 10 pieces.
>> > >
>> > > But Flink joins sequentially, so I have a chain of 10 joins.
>> > >
>> > > What happens if I generate an update message for the first table in
>> > > the chain:
>> > >
>> > > * the first join operator will produce 2 records: delete + insert
>> > > * the second operator will double the incoming messages: 2*2 = 4
>> > > messages
>> > > * ...
>> > > * the last operator will produce 2^10 = 1024 messages.
>> > >
>> > > Performance becomes extremely slow and resources are wasted.
>> > >
>> > > I've made a simple compaction operator which compacts records after
>> > > the join:
>> > >
>> > > * after receiving a delete message, the join operator generates 2
>> > > messages
>> > > * after receiving an insert message, it generates 2 more messages
>> > > * but two of the four are compacted away, so the operator receives 2
>> > > messages and sends 2 messages
>> > >
>> > > I wonder whether this approach is right. Why is it not implemented in
>> > > Flink yet?
>> > >
>> > > I also have a question about how I should deploy this on a cluster,
>> > > because I have changed some Flink sources which are not pluggable.
>> > >
>> > > I have modified the StreamExecJoin class and added this code as a
>> > > proof of concept:
>> > >
>> > > final OneInputTransformation<RowData, RowData> compactTransform =
>> > >         ExecNodeUtil.createOneInputTransformation(
>> > >                 transform,
>> > >                 "compact join results",
>> > >                 "description",
>> > >                 new ProcessOperator<>(new CompactStreamOperator(equaliser)),
>> > >                 InternalTypeInfo.of(returnType),
>> > >                 leftTransform.getParallelism());
>> > >
>> > > return compactTransform;
>> > >
>> > > The transform operator (buffers records, cancels matching
>> > > delete/insert pairs, and flushes every 10 records):
>> > >
>> > > @Override
>> > > public void processElement(
>> > >         RowData value,
>> > >         ProcessFunction<RowData, RowData>.Context ctx,
>> > >         Collector<RowData> collector) throws Exception {
>> > >
>> > >     counter++;
>> > >
>> > >     boolean compacted = false;
>> > >     if (value.getRowKind() == RowKind.DELETE) {
>> > >         // Flip to INSERT temporarily so the equaliser can match a
>> > >         // buffered INSERT for the same row; if found, the pair cancels.
>> > >         value.setRowKind(RowKind.INSERT);
>> > >         for (int i = buffer.size() - 1; i >= 0; i--) {
>> > >             RowData x = buffer.get(i);
>> > >             if (x.getRowKind() == RowKind.INSERT
>> > >                     && recordEqualiser.equals(x, value)) {
>> > >                 buffer.remove(i);
>> > >                 compacted = true;
>> > >                 break;
>> > >             }
>> > >         }
>> > >         value.setRowKind(RowKind.DELETE);
>> > >     }
>> > >
>> > >     if (!compacted) {
>> > >         buffer.add(value);
>> > >     }
>> > >
>> > >     // Flush the buffer downstream every 10 incoming records.
>> > >     if (counter >= 10) {
>> > >         buffer.forEach(collector::collect);
>> > >         buffer.clear();
>> > >         counter = 0;
>> > >     }
>> > > }
>> > >
>> > > Regards,
>> > > Artem
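A rough, self-contained sketch of the kind of chained join described above (illustrative only; the tables t1..t10, the key column id, and the bounded datagen sources are placeholders, not the original job):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ChainedJoinSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Ten tables sharing the same key; bounded datagen sources are used
            // here only so the sketch runs without any external systems.
            for (int i = 1; i <= 10; i++) {
                tEnv.executeSql(
                        "CREATE TABLE t" + i + " (id BIGINT, v" + i + " STRING) WITH ("
                                + "'connector' = 'datagen', 'number-of-rows' = '10')");
            }

            // The planner translates this query into a chain of binary join
            // operators. As described in the thread, with changelog inputs an
            // update on the first table is forwarded as a delete+insert pair
            // and can be doubled at every join in the chain.
            StringBuilder sql = new StringBuilder("SELECT t1.id FROM t1");
            for (int i = 2; i <= 10; i++) {
                sql.append(" JOIN t").append(i)
                        .append(" ON t1.id = t").append(i).append(".id");
            }
            tEnv.executeSql(sql.toString()).print();
        }
    }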