Привет Артем!

Are your tables backed by Kafka? If - yes, what if you use upsert-kafka
connector from Table API
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/>,
does it help to reduce the number of records in each subsequent join
operator?
I wrote a blog-post some time ago (see joins part) why upsert-kafka can be
useful when joining event tables:
https://www.ververica.com/blog/streaming-modes-of-flink-kafka-connectors

Best regards,
Alexey

On Wed, Aug 9, 2023 at 5:05 AM liu ron <ron9....@gmail.com> wrote:

> Hi, David
>
> Regarding the N-way join, this feature aims to address the issue of state
> simplification, it is on the roadmap. Technically there are no limitations,
> but we'll need some time to find a sensible solution.
>
> Best,
> Ron
>
> David Anderson <dander...@apache.org> 于2023年8月9日周三 10:38写道:
>
>> This join optimization sounds promising, but I'm wondering why Flink
>> SQL isn't taking advantage of the N-Ary Stream Operator introduced in
>> FLIP-92 [1][2] to implement a n-way join in a single operator. Is
>> there something that makes this impossible/impractical?
>>
>> [1] https://cwiki.apache.org/confluence/x/o4uvC
>> [2] https://issues.apache.org/jira/browse/FLINK-15688
>>
>> On Sat, Aug 5, 2023 at 3:54 AM shuai xu <xushuai...@gmail.com> wrote:
>> >
>> > Hi, we are also paying attention to this issue and have completed the
>> validation of the minibatch join optimization including the intermediate
>> message folding you mentioned. We plan to officially release it in Flink
>> 1.19. This optimization could significantly improves the performance of
>> join operations and we are looking forward to the arrival of Flink 1.19 to
>> help solve your problem.
>> >
>> > On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
>> > > Hello, Flink community!
>> > >
>> > > I have some important use case for me, which shows extremely bad
>> performance:
>> > >
>> > >   *   Streaming application
>> > >   *   sql table api
>> > >   *   10 normal joins (state should be kept forever)
>> > >
>> > > Join rules are simple, i have 10 ten tables, which have same primary
>> key. I want to join result table from 10 pieces.
>> > >
>> > > But Flink joins sequentionally, so i have a chain with 10 joins.
>> > >
>> > > What happens if i generate update message for first table in chain:
>> > >
>> > >
>> > >   *   first join operator will produce 2 records: delete+insert
>> > >   *   second operator will double incoming messages. result=2*2=4
>> messages
>> > >   *   ...
>> > >   *   last operator will produce 2**10=1024 messages.
>> > >
>> > > Perfomance become extremely slow and resources are wasting away.
>> > >
>> > > I've made some simple compaction operator, which compacts records
>> after join:
>> > >
>> > >
>> > >   *   join operator after receiving delete message, generates 2
>> messages
>> > >   *   after receiving insert message, generate 2 more messges
>> > >   *   but two of the four are compacted. So operator receives 2
>> messages->sends 2 messages
>> > >
>> > > I wonder if this approach is right? Why it is not implemented in
>> Flink yet?
>> > >
>> > > And i've got some problem how should i implement it on cluster,
>> because i have changed some flink sources, which are not pluggable?
>> > >
>> > > I have modified StreamExecJoin class and added this code as a proof
>> of concept:
>> > >
>> > >
>> > > final OneInputTransformation<RowData, RowData> compactTransform =
>> > >         ExecNodeUtil.createOneInputTransformation(
>> > >                 transform,
>> > >                 "compact join results",
>> > >                 "description",
>> > >                 new ProcessOperator<>(new
>> CompactStreamOperator(equaliser)),
>> > >                 InternalTypeInfo.of(returnType),
>> > >                 leftTransform.getParallelism()
>> > >         );
>> > >
>> > > return compactTransform;
>> > >
>> > > Transform operator:
>> > > @Override
>> > >
>> > > public void processElement(
>> > >         RowData value,
>> > >         ProcessFunction<RowData, RowData>.Context ctx,
>> > >         Collector<RowData> collector) throws Exception {
>> > >
>> > >     counter++;
>> > >
>> > >     boolean compacted=false;
>> > >     if (value.getRowKind()==RowKind.DELETE) {
>> > >         value.setRowKind(RowKind.INSERT);
>> > >         for (int i = buffer.size() - 1; i >= 0; i--) {
>> > >             RowData x = buffer.get(i);
>> > >             if (x.getRowKind() == RowKind.INSERT &&
>> recordEqualiser.equals(x, value)) {
>> > >                 buffer.remove(i);
>> > >                 compacted = true;
>> > >                 break;
>> > >             }
>> > >         }
>> > >         value.setRowKind(RowKind.DELETE);
>> > >     }
>> > >
>> > >     if (!compacted) {
>> > >         buffer.add(value);
>> > >     }
>> > >
>> > >     if (counter>=10)
>> > >     {
>> > >         buffer.forEach(collector::collect);
>> > >         buffer.clear();
>> > >         counter=0;
>> > >     }
>> > > }
>> > >
>> > >
>> > > [cid:f886301c-4708-494e-a6df-d81137150774]
>> > >
>> > >
>> > >
>> > > Regards,
>> > > Artem
>> > >
>> > >
>> > >
>>
>

Reply via email to