Hi Abdelilah,
At first glance, your logic seems correct. But Arvid is right
that your pipeline might not reach the optimal performance that Flink can
offer, due to the 3 groupBy operations. I'm wondering what the optimizer
produces out of this plan. Maybe you can share it with us using
`table.explain()` on the final table?
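For example, a minimal sketch (assuming the final table is held in a
variable, here called carWithBrand):

    // Print the plan that the planner/optimizer produces for the final table
    System.out.println(carWithBrand.explain());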
I think what Arvid meant is a UNION ALL in SQL. You would first normalize the
two streams into a common CarWithBrand schema (with nulls for the fields of
the other side), and then groupBy/aggregate to the last value and filter out
invalid CarWithBrands.
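A rough sketch of that normalization (all table and column names are
placeholders, and I'm assuming string-typed columns; the NULL casts make the
two sides type-compatible):

    // Sketch: normalize both inputs into one CarWithBrand-shaped table,
    // assuming "Cars" and "Brands" are registered with these columns.
    Table normalized = tableEnv.sqlQuery(
        "SELECT carId, brandId, carName, CAST(NULL AS STRING) AS brandName FROM Cars " +
        "UNION ALL " +
        "SELECT CAST(NULL AS STRING), brandId, CAST(NULL AS STRING), brandName FROM Brands");

After that, a single groupBy on the key with last-value aggregation (plus a
filter on rows where both sides are present) yields the valid CarWithBrands.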
If the DataStream API is an option for you, I would consider using the
`connect()` method. A connected co-process function can be stateful, and you
might reduce your state size further. In your current implementation, the join
operator will store both input tables in state, which means the car and
brand data is stored twice.
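A rough sketch of that shape (the Car/Brand/CarWithBrand POJOs, their fields,
and the CarWithBrand(Car, Brand) constructor are assumptions based on your
description; imports omitted):

    cars.keyBy(car -> car.brandId)
        .connect(brands.keyBy(brand -> brand.brandId))
        .process(new KeyedCoProcessFunction<String, Car, Brand, CarWithBrand>() {

            private transient MapState<String, Car> carsByCarId; // latest Car per carId
            private transient ValueState<Brand> lastBrand;       // latest Brand for this key

            @Override
            public void open(Configuration parameters) {
                carsByCarId = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("cars", String.class, Car.class));
                lastBrand = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("brand", Brand.class));
            }

            @Override
            public void processElement1(Car car, Context ctx,
                    Collector<CarWithBrand> out) throws Exception {
                carsByCarId.put(car.carId, car); // insert or update the car
                Brand brand = lastBrand.value();
                if (brand != null) {             // emit only if the brand already arrived
                    out.collect(new CarWithBrand(car, brand));
                }
            }

            @Override
            public void processElement2(Brand brand, Context ctx,
                    Collector<CarWithBrand> out) throws Exception {
                lastBrand.update(brand);               // insert or replace the brand
                for (Car car : carsByCarId.values()) { // re-emit all cars with the new brand
                    out.collect(new CarWithBrand(car, brand));
                }
            }
        });

This stores each car and brand only once, and an updated brand (your Time 5
case) naturally re-emits all matching cars.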
Regards,
Timo
On 11.02.21 16:06, Abdelilah CHOUKRI wrote:
Thank you, Arvid, for the reply.
In fact, it's not a union of the same data. I'll try to explain what we
want to achieve conceptually:
We have 2 data sources with two different schemas, but with a common
field/attribute (example: brandId):
- Cars: receives data entries at high frequency; one Car can only
be related to one Brand (via the field brandId).
- Brands: receives data entries at high frequency; one Brand can be
related to many Cars (via the field brandId).
And we need to "merge" this data into a single output: CarWithBrand.
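For illustration, the shapes involved might look like this (hypothetical
field names, reconstructed from the description above):

    // Hypothetical input/output shapes; the real classes have more fields.
    public class Car          { public String carId;   public String brandId; }
    public class Brand        { public String brandId; public String name;    }
    public class CarWithBrand { public String carId;   public String brandId; public String brandName; }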
I'll try to explain the behaviour we want to achieve with the attached
diagram (flink_flow.png):
- Time 1: we have a Car and a Brand matching by brandId, so the
output should return a corresponding CarWithBrand.
- Time 2: we have a new Car that also matches the previous Brand,
so we output a CarWithBrand.
- Time 3: we receive a new Car, but it does not match any existing
Brand, so there is no output.
- Time 4: we have a new Car that matches the previous Brand, and on the
other hand we receive a new Brand that matches the previous Car,
so we should have two outputs.
- Time 5: we receive an existing Brand, but with an updated field (in
this case the name), so we have to replace the previous Brand with that
brandId, and if there are any previously matching Cars, we have to output
all the corresponding CarWithBrands with the changed field.
So we're using Flink Tables during the process to maintain the latest
status of the data, regardless of time.
Furthermore, here's a simplified Java code example that represents
what we've achieved so far (attached: flink_join.java).
How would you recommend achieving this with Flink?
Is our approach adequate?
Thank you.
On Thu, Feb 11, 2021 at 11:50 AM Arvid Heise <ar...@apache.org> wrote:
Hi Abdelilah,
I think your approach is overly complicated (and probably slow), but
I might have misunderstood things. Naively, I'd assume that you just
want to union stream 1 and stream 2 instead of joining them. Note that
for a union the events must have the same schema, so you most likely
want a select on each stream before the union. Summarizing:
Table3 = (SELECT id, title, description FROM Table1)
         UNION (SELECT id, title, description FROM Table2)
If you use a retract stream, you probably do not need the
grouping and last-value selection either.
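In the Table API, that summary might look roughly like this (a sketch,
assuming table1 and table2 already exist with those columns):

    // Select the common schema on each side, then union the two tables
    Table table3 = table1
        .select($("id"), $("title"), $("description"))
        .union(table2.select($("id"), $("title"), $("description")));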
On Mon, Feb 8, 2021 at 3:33 PM Abdelilah CHOUKRI
<abdelilah.chou...@prt.manomano.com> wrote:
Hi,
We're trying to use the Flink 1.11 Java Table API to process a
streaming use case:
We have 2 streams, each one with a different structure. Both
events, coming from Kafka, can be:
- A new event (not in the system already)
- An updated event (updating an event that was previously inserted)
so we only want to store the latest data in the Table.
We need to join the 2 previous Tables to have all this data
stored in the Flink system. We think the best way is to store the
joined data as a Table: a Flink Table that will be a join of the 2
tables by a common key.
To sum up, we have:
- Stream 1 (coming from Kafka topic) -> Flink Table 1
- Stream 2 (coming from Kafka topic) -> Flink Table 2
- Table 3 = Table 1 join Table 2
- DataStream using RetractStream of Table 3
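As a minimal sketch of that last step (assuming CarWithBrand is a POJO whose
field names match the columns of Table 3):

    // true = insert/update, false = retraction of a previous result
    DataStream<Tuple2<Boolean, CarWithBrand>> retractStream =
        streamTableEnvironment.toRetractStream(table3, CarWithBrand.class);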
To get the last element in Table 1 and Table 2, we are using
Functions (LastValueAggFunction):
streamTableEnvironment.registerFunction("LAST_VALUE_STRING",
    new LastValueAggFunction.StringLastValueAggFunction());
...
streamTableEnvironment.fromDataStream(inputDataStream)
    .groupBy($("id"))
    .select(
        $("id").as("o_id"),
        call("LAST_VALUE_STRING", $("title")).as("o_title"),
        call("LAST_VALUE_STRING", $("description")).as("o_description"));
The questions are:
- Is our approach correct to get the data stored in the Flink
system?
- Is it necessary to use the LastValueAggFunction in our case?
We want to retract the stream to our custom POJO instead of Row,
but we're getting the attached error (attached: stack_trace.log).
Abdelilah Choukri,
Backend dev at ManoMano.