This join optimization sounds promising, but I'm wondering why Flink
SQL isn't taking advantage of the N-Ary Stream Operator introduced in
FLIP-92 [1][2] to implement a n-way join in a single operator. Is
there something that makes this impossible/impractical?

[1] https://cwiki.apache.org/confluence/x/o4uvC
[2] https://issues.apache.org/jira/browse/FLINK-15688

On Sat, Aug 5, 2023 at 3:54 AM shuai xu <xushuai...@gmail.com> wrote:
>
> Hi, we are also paying attention to this issue and have completed the 
> validation of the minibatch join optimization including the intermediate 
> message folding you mentioned. We plan to officially release it in Flink 
> 1.19. This optimization could significantly improves the performance of join 
> operations and we are looking forward to the arrival of Flink 1.19 to help 
> solve your problem.
>
> On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
> > Hello, Flink community!
> >
> > I have some important use case for me, which shows extremely bad 
> > performance:
> >
> >   *   Streaming application
> >   *   sql table api
> >   *   10 normal joins (state should be kept forever)
> >
> > Join rules are simple, i have 10 ten tables, which have same primary key. I 
> > want to join result table from 10 pieces.
> >
> > But Flink joins sequentionally, so i have a chain with 10 joins.
> >
> > What happens if i generate update message for first table in chain:
> >
> >
> >   *   first join operator will produce 2 records: delete+insert
> >   *   second operator will double incoming messages. result=2*2=4 messages
> >   *   ...
> >   *   last operator will produce 2**10=1024 messages.
> >
> > Perfomance become extremely slow and resources are wasting away.
> >
> > I've made some simple compaction operator, which compacts records after 
> > join:
> >
> >
> >   *   join operator after receiving delete message, generates 2 messages
> >   *   after receiving insert message, generate 2 more messges
> >   *   but two of the four are compacted. So operator receives 2 
> > messages->sends 2 messages
> >
> > I wonder if this approach is right? Why it is not implemented in Flink yet?
> >
> > And i've got some problem how should i implement it on cluster, because i 
> > have changed some flink sources, which are not pluggable?
> >
> > I have modified StreamExecJoin class and added this code as a proof of 
> > concept:
> >
> >
> > final OneInputTransformation<RowData, RowData> compactTransform =
> >         ExecNodeUtil.createOneInputTransformation(
> >                 transform,
> >                 "compact join results",
> >                 "description",
> >                 new ProcessOperator<>(new CompactStreamOperator(equaliser)),
> >                 InternalTypeInfo.of(returnType),
> >                 leftTransform.getParallelism()
> >         );
> >
> > return compactTransform;
> >
> > Transform operator:
> > @Override
> >
> > public void processElement(
> >         RowData value,
> >         ProcessFunction<RowData, RowData>.Context ctx,
> >         Collector<RowData> collector) throws Exception {
> >
> >     counter++;
> >
> >     boolean compacted=false;
> >     if (value.getRowKind()==RowKind.DELETE) {
> >         value.setRowKind(RowKind.INSERT);
> >         for (int i = buffer.size() - 1; i >= 0; i--) {
> >             RowData x = buffer.get(i);
> >             if (x.getRowKind() == RowKind.INSERT && 
> > recordEqualiser.equals(x, value)) {
> >                 buffer.remove(i);
> >                 compacted = true;
> >                 break;
> >             }
> >         }
> >         value.setRowKind(RowKind.DELETE);
> >     }
> >
> >     if (!compacted) {
> >         buffer.add(value);
> >     }
> >
> >     if (counter>=10)
> >     {
> >         buffer.forEach(collector::collect);
> >         buffer.clear();
> >         counter=0;
> >     }
> > }
> >
> >
> > [cid:f886301c-4708-494e-a6df-d81137150774]
> >
> >
> >
> > Regards,
> > Artem
> >
> >
> >

Reply via email to