Hi, David

Regarding the N-way join: this feature aims to address the issue of state
simplification, and it is on the roadmap. Technically there are no limitations,
but we'll need some time to find a sensible solution.

Best,
Ron

David Anderson <dander...@apache.org> wrote on Wed, Aug 9, 2023 at 10:38:

> This join optimization sounds promising, but I'm wondering why Flink
> SQL isn't taking advantage of the N-Ary Stream Operator introduced in
> FLIP-92 [1][2] to implement an n-way join in a single operator. Is
> there something that makes this impossible/impractical?
>
> [1] https://cwiki.apache.org/confluence/x/o4uvC
> [2] https://issues.apache.org/jira/browse/FLINK-15688
>
> On Sat, Aug 5, 2023 at 3:54 AM shuai xu <xushuai...@gmail.com> wrote:
> >
> > Hi, we are also paying attention to this issue and have completed the
> validation of the minibatch join optimization including the intermediate
> message folding you mentioned. We plan to officially release it in Flink
> 1.19. This optimization could significantly improve the performance of
> join operations and we are looking forward to the arrival of Flink 1.19 to
> help solve your problem.
> >
> > On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
> > > Hello, Flink community!
> > >
> > > > I have a use case that is important to me and that shows extremely bad
> performance:
> > >
> > >   *   Streaming application
> > > >   *   SQL/Table API
> > >   *   10 normal joins (state should be kept forever)
> > >
> > > > Join rules are simple: I have 10 tables that share the same primary
> key, and I want to assemble the result table from these 10 pieces.
> > >
> > > > But Flink joins sequentially, so I end up with a chain of 10 joins.
> > >
> > > > What happens if I generate an update message for the first table in the chain:
> > >
> > >
> > >   *   first join operator will produce 2 records: delete+insert
> > >   *   second operator will double incoming messages. result=2*2=4
> messages
> > >   *   ...
> > >   *   last operator will produce 2**10=1024 messages.
> > >
> > > > Performance becomes extremely slow and resources are wasted.
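
The amplification described above can be checked with a few lines of Java. This is only a minimal sketch and not Flink code: the class name JoinAmplification and the method messagesAfter are hypothetical, and it assumes that every join in the chain emits exactly two messages (one delete plus one insert) for each message it receives.

```java
final class JoinAmplification {

    // Number of messages leaving the last join in a chain of `joins`
    // operators, assuming each operator emits `fanOut` messages for
    // every message it receives (assumption: fanOut = 2 for delete+insert).
    static long messagesAfter(int joins, int fanOut) {
        long messages = 1; // a single update enters the chain
        for (int i = 0; i < joins; i++) {
            messages *= fanOut;
        }
        return messages;
    }

    public static void main(String[] args) {
        // Print the message count after each join in a chain of 10.
        for (int joins = 1; joins <= 10; joins++) {
            System.out.println(joins + " joins -> " + messagesAfter(joins, 2) + " messages");
        }
    }
}
```

Under that assumption the count doubles at every join, which matches the 2, 4, ..., 1024 progression in the list.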
> > >
> > > > I've made a simple compaction operator, which compacts records
> after the join:
> > >
> > >
> > > >   *   after receiving a delete message, the join operator generates 2
> messages
> > > >   *   after receiving an insert message, it generates 2 more messages
> > > >   *   but two of the four are compacted, so the operator receives 2
> messages and sends 2 messages
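
The compaction idea in the list above can be sketched independently of Flink. The sketch below is an illustration under stated assumptions, not the author's operator: ChangelogCompactor, Change and Kind are hypothetical names, rows are modeled as plain strings instead of RowData, and the four-message sequence in main is made up for the example. A delete that matches a buffered insert cancels it, so neither is forwarded.

```java
import java.util.ArrayList;
import java.util.List;

final class ChangelogCompactor {

    enum Kind { INSERT, DELETE }

    // A simplified changelog record: a kind plus the row content.
    static final class Change {
        final Kind kind;
        final String row;

        Change(Kind kind, String row) {
            this.kind = kind;
            this.row = row;
        }

        @Override
        public String toString() {
            return (kind == Kind.INSERT ? "+I " : "-D ") + row;
        }
    }

    private final List<Change> buffer = new ArrayList<>();

    // Buffers a change. A DELETE that matches a buffered INSERT of the
    // same row removes that INSERT and is itself dropped.
    void add(Change change) {
        if (change.kind == Kind.DELETE) {
            // Search from the newest buffered change backwards.
            for (int i = buffer.size() - 1; i >= 0; i--) {
                Change buffered = buffer.get(i);
                if (buffered.kind == Kind.INSERT && buffered.row.equals(change.row)) {
                    buffer.remove(i);
                    return;
                }
            }
        }
        buffer.add(change);
    }

    // Returns the surviving changes in arrival order and empties the buffer.
    List<Change> flush() {
        List<Change> out = new ArrayList<>(buffer);
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        ChangelogCompactor compactor = new ChangelogCompactor();
        compactor.add(new Change(Kind.DELETE, "old"));
        compactor.add(new Change(Kind.INSERT, "tmp")); // intermediate result
        compactor.add(new Change(Kind.DELETE, "tmp")); // cancels the insert above
        compactor.add(new Change(Kind.INSERT, "new"));
        System.out.println(compactor.flush()); // prints [-D old, +I new]
    }
}
```

In this sketch the caller decides when to call flush(); a real operator would also have to flush on a timer or at checkpoints so that buffered rows are not held back indefinitely.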
> > >
> > > > I wonder whether this approach is right. Why is it not implemented in Flink
> yet?
> > >
> > > > I also have a problem: how should I deploy this on a cluster,
> given that I have changed some Flink sources that are not pluggable?
> > >
> > > > I have modified the StreamExecJoin class and added this code as a proof of
> concept:
> > >
> > >
> > > final OneInputTransformation<RowData, RowData> compactTransform =
> > >         ExecNodeUtil.createOneInputTransformation(
> > >                 transform,
> > >                 "compact join results",
> > >                 "description",
> > > >                 new ProcessOperator<>(new CompactStreamOperator(equaliser)),
> > >                 InternalTypeInfo.of(returnType),
> > >                 leftTransform.getParallelism()
> > >         );
> > >
> > > return compactTransform;
> > >
> > > > Transform operator:
> > > >
> > > > @Override
> > > > public void processElement(
> > > >         RowData value,
> > > >         ProcessFunction<RowData, RowData>.Context ctx,
> > > >         Collector<RowData> collector) throws Exception {
> > > >
> > > >     counter++;
> > > >
> > > >     boolean compacted = false;
> > > >     if (value.getRowKind() == RowKind.DELETE) {
> > > >         // Temporarily flip the kind to INSERT while comparing against buffered rows.
> > > >         value.setRowKind(RowKind.INSERT);
> > > >         // Search from the newest buffered row for a matching INSERT to cancel out.
> > > >         for (int i = buffer.size() - 1; i >= 0; i--) {
> > > >             RowData x = buffer.get(i);
> > > >             if (x.getRowKind() == RowKind.INSERT && recordEqualiser.equals(x, value)) {
> > > >                 buffer.remove(i);
> > > >                 compacted = true;
> > > >                 break;
> > > >             }
> > > >         }
> > > >         // Restore the original kind.
> > > >         value.setRowKind(RowKind.DELETE);
> > > >     }
> > > >
> > > >     if (!compacted) {
> > > >         buffer.add(value);
> > > >     }
> > > >
> > > >     // Flush the buffer once 10 input records have been counted.
> > > >     if (counter >= 10) {
> > > >         buffer.forEach(collector::collect);
> > > >         buffer.clear();
> > > >         counter = 0;
> > > >     }
> > > > }
> > >
> > >
> > >
> > >
> > >
> > > Regards,
> > > Artem
> > >
> > >
> > >
>
