Thank you Arvid for the reply.
In fact, it's not a union of the same data. I'll try to explain the
concept we want to achieve:
We have two data sources, with two different schemas, but with a
common field/attribute (example: /brandId/):
- /Cars/: receives data entries at high frequency; one /Car/ can only
be related to one /Brand/ (via the field /brandId/).
- /Brands/: receives data entries at high frequency; one /Brand/ can
be related to many /Cars/ (via the field /brandId/).
And we need to "merge" these data into a single output: /CarWithBrand/.
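For concreteness, here is a minimal sketch of the shapes involved
(the class and field names are illustrative assumptions, not our real
schemas):

    // Hypothetical simplified schemas -- names are assumptions for illustration.
    public class Car {
        public String carId;
        public String brandId;   // each Car references exactly one Brand
    }

    public class Brand {
        public String brandId;   // one Brand can relate to many Cars
        public String name;
    }

    public class CarWithBrand {
        public String carId;
        public String brandId;
        public String brandName; // taken from the matching Brand
    }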
I'll try to explain the behaviour that we want to achieve with the
following diagram (attached: flink_flow.png):
- Time 1: we have a /Car/ and a /Brand/ matching by /brandId/, so the
output should return a corresponding /CarWithBrand/.
- Time 2: we have a new /Car/ that also matches the previous /Brand/,
so we output a /CarWithBrand/.
- Time 3: we receive a new /Car/, but it does not match any existing
/Brand/, so there is no output.
- Time 4: we have a new /Car/ that matches the previous /Brand/, and
at the same time we receive a new /Brand/ that matches the previous
/Car/, so we should have two outputs.
- Time 5: we receive an existing /Brand/ but with an updated field (in
this case the name), so we have to replace the previous /Brand/ with
that /brandId/, and if there are any previously matching /Cars/, we
have to output all the corresponding /CarWithBrand/ entries with the
changed field.
So we're using Flink Tables throughout the process, to maintain the
latest state of the data regardless of arrival time.
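As a rough sketch of that pattern (assuming /Cars/ and /Brands/ are
registered as views with a processing-time attribute on /Brands/, and
that /Cars/ would get the same deduplication treatment; this is how we
understand the standard approach, not necessarily our final code):

    // Keep only the latest Brand per brandId (Flink SQL deduplication
    // pattern), then join. On a Brand update, the regular join retracts
    // and re-emits the matching CarWithBrand rows -- the Time-5 behaviour.
    Table carWithBrand = streamTableEnvironment.sqlQuery(
        "SELECT c.carId, c.brandId, b.name AS brandName " +
        "FROM Cars c " +
        "JOIN ( " +
        "  SELECT brandId, name FROM ( " +
        "    SELECT *, ROW_NUMBER() OVER ( " +
        "      PARTITION BY brandId ORDER BY proctime DESC) AS rn " +
        "    FROM Brands) " +
        "  WHERE rn = 1 " +
        ") b ON c.brandId = b.brandId");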
Furthermore, here's a simplified Java code example that represents
what we've achieved so far (attached: *flink_join.java*).
How would you recommend achieving this with Flink?
Is our approach adequate?
Thank you.
On Thu, Feb 11, 2021 at 11:50 AM Arvid Heise <ar...@apache.org> wrote:
Hi Abdelilah,
I think your approach is overly complicated (and probably slow) but
I might have misunderstood things. Naively, I'd assume that you just
want to union stream 1 and stream 2 instead of joining. Note that
for union the events must have the same schema, so you most likely
want to have a select on each stream before union. Summarizing:
Table3 = (select id, title, description from Table 1) union (select
id, title, description from Table 2)
If you use a retract stream, you probably do not need to use the
grouping and last value selection as well.
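A minimal sketch of what that union could look like in the Table API
(assuming table1 and table2 are the Table handles for Table 1 and
Table 2):

    // Project each table to the common schema, then union (which also
    // deduplicates, like SQL UNION). table1/table2 are assumed handles.
    Table table3 = table1
        .select($("id"), $("title"), $("description"))
        .union(table2
            .select($("id"), $("title"), $("description")));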
On Mon, Feb 8, 2021 at 3:33 PM Abdelilah CHOUKRI
<abdelilah.chou...@prt.manomano.com> wrote:
Hi,
We're trying to use the Flink 1.11 Java Table API to process a
streaming use case:
We have 2 streams, each one with a different structure. Both kinds
of events, coming from Kafka, can be:
- A new event (not in the system already)
- An updated event (updating an event that was previously inserted)
so we only want to store the latest data in the Table.
We need to join the 2 previous Tables so that all this data is
stored in the Flink system. We think the best way is to store the
joined data as a Flink Table, built by joining the 2 tables on a
common key.
To sum up, we have:
- Stream 1 (coming from Kafka topic) -> Flink Table 1
- Stream 2 (coming from Kafka topic) -> Flink Table 2
- Table 3 = Table 1 join Table 2
- DataStream using RetractStream of Table 3
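For reference, that last step could look like this (assuming table3
holds the joined Table; each element of the retract stream is a
Tuple2 whose boolean flag distinguishes inserts from retractions):

    // Convert the joined table to a retract stream: true = insert,
    // false = retraction of a previously emitted row.
    DataStream<Tuple2<Boolean, Row>> retractStream =
        streamTableEnvironment.toRetractStream(table3, Row.class);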
To get the last element in Table 1 and Table 2, we are using
aggregate functions (LastValueAggFunction):

    // register the planner's last-value aggregate under an alias
    streamTableEnvironment.registerFunction("LAST_VALUE_STRING",
        new LastValueAggFunction.StringLastValueAggFunction());
    ...
    // keep only the latest title/description per id
    streamTableEnvironment.fromDataStream(inputDataStream)
        .groupBy($("id"))
        .select(
            $("id").as("o_id"),
            call("LAST_VALUE_STRING", $("title")).as("o_title"),
            call("LAST_VALUE_STRING", $("description")).as("o_description")
        );
The questions are:
- Is our approach correct for getting the data stored in the Flink
system?
- Is it necessary to use the /LastValueAggFunction/ in our case? We
want to retract the stream to our custom POJO instead of /Row/, but
we're getting the attached error (attached: *stack_trace.log*).
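In case it helps frame the second question, here is a sketch of
retracting into a custom POJO (the Output class and resultTable
variable are hypothetical; a mismatch between POJO fields and table
columns is a frequent cause of conversion errors):

    // Hypothetical POJO matching the o_id/o_title/o_description columns
    // above. Flink needs a public no-arg constructor and public fields
    // (or getters/setters), with names matching the table's columns.
    public class Output {
        public String o_id;
        public String o_title;
        public String o_description;
    }

    // The retraction can then target the POJO directly instead of Row:
    DataStream<Tuple2<Boolean, Output>> stream =
        streamTableEnvironment.toRetractStream(resultTable, Output.class);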
Abdelilah Choukri,
Backend dev at ManoMano.