Hi Abdelilah,

I think your approach is overly complicated (and probably slow) but I might
have misunderstood things. Naively, I'd assume that you just want to union
stream 1 and stream 2 instead of joining. Note that for union the events
must have the same schema, so you most likely want to have a select on each
stream before union. Summarizing:
Table3 = (select id, title, description from Table 1)
         union
         (select id, title, description from Table 2)

If you use a retract stream, you probably do not need the grouping and
last-value selection either.
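
To make the idea concrete, here is a minimal plain-Java sketch of the
semantics I have in mind. It does not use Flink at all; the class
UnionLatestSketch, the Event type and both helper methods are hypothetical
names made up for illustration, and a real stream would interleave events
by arrival time rather than concatenate two lists.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UnionLatestSketch {

    /** Hypothetical common shape both inputs are projected to before the union. */
    static final class Event {
        final String id;
        final String title;
        final String description;

        Event(String id, String title, String description) {
            this.id = id;
            this.title = title;
            this.description = description;
        }
    }

    /** Models the union of two projected inputs: rows of both, same schema. */
    static List<Event> unionAll(List<Event> first, List<Event> second) {
        List<Event> out = new ArrayList<>(first);
        out.addAll(second);
        return out;
    }

    /** Keeps only the most recently seen event per id (later rows overwrite earlier ones). */
    static Map<String, Event> latestById(List<Event> events) {
        Map<String, Event> latest = new LinkedHashMap<>();
        for (Event e : events) {
            latest.put(e.id, e);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Event> stream1 = List.of(
                new Event("1", "title-1", "inserted via stream 1"),
                new Event("2", "title-2", "inserted via stream 1"));
        List<Event> stream2 = List.of(
                new Event("1", "title-1b", "updated via stream 2"));

        Map<String, Event> latest = latestById(unionAll(stream1, stream2));
        for (Event e : latest.values()) {
            System.out.println(e.id + " | " + e.title + " | " + e.description);
        }
    }
}
```

The point is only that, once both inputs share one schema, a single keyed
"latest wins" view over the union gives one row per id without any join.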

On Mon, Feb 8, 2021 at 3:33 PM Abdelilah CHOUKRI <
abdelilah.chou...@prt.manomano.com> wrote:

> Hi,
>
> We're trying to use Flink 1.11 Java tables API to process a streaming use
> case:
>
> We have 2 streams, each one with different structures. Both events,
> coming from Kafka, can be:
> - A new event (not in the system already)
> - An updated event (updating an event that previously was inserted)
> so we only want to store the latest data in the Table.
>
> We need to join the 2 previous Tables to have all this data stored in the
> Flink system. We think that the best way is to store joined data as a
> Table.
> This is going to be a Flink Table, that will be a join of the 2 tables by
> a common key.
>
> To sum up, we have:
> - Stream 1 (coming from Kafka topic) -> Flink Table 1
> - Stream 2 (coming from Kafka topic) -> Flink Table 2
> - Table 3 = Table 1 join Table 2
> - DataStream using RetractStream of Table 3
>
> To get the last element in Table 1 and Table 2, we are using Functions
> (LastValueAggFunction):
>
> streamTableEnvironment.registerFunction("LAST_VALUE_STRING",
>         new LastValueAggFunction.StringLastValueAggFunction());
> ...
> streamTableEnvironment.fromDataStream(inputDataStream)
>         .groupBy($("id"))
>         .select(
>                 $("id").as("o_id"),
>                 call("LAST_VALUE_STRING", $("title")).as("o_title"),
>                 call("LAST_VALUE_STRING", $("description")).as("o_description")
>         );
>
>
> The questions are:
> - Is our approach correct to get the data stored in the Flink system?
> - Is it necessary to use the *LastValueAggFunction* in our case? We ask
> because we want to retract the stream to our custom Pojo instead of *Row*,
> but we're getting the attached error (attached: *stack_trace.log*).
>
>
> Abdelilah Choukdi,
> Backend dev at ManoMano.
>
