Hi Abdelilah,

I think your approach is overly complicated (and probably slow), but I might have misunderstood things. Naively, I'd assume that you just want to union stream 1 and stream 2 instead of joining them. Note that for a union the events must have the same schema, so you most likely want a select on each stream before the union. Summarizing:

Table3 = (SELECT id, title, description FROM Table1) UNION (SELECT id, title, description FROM Table2)
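A minimal plain-Java model (no Flink dependencies) of the Table3 suggested above: each stream is projected onto the common (id, title, description) schema, and the projected rows are concatenated. The classes Event1, Event2, Common and UnionSketch and the extra fields price and stock are hypothetical stand-ins for the two differently shaped Kafka events. The model corresponds to UNION ALL, which keeps every row. Plain UNION also removes duplicate rows, which in a streaming job requires state. In the Flink Table API the rough equivalent would be a select(...) on each table followed by unionAll(...).

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical event shapes: only id, title and description come from the
// thread; price and stock stand in for the fields that differ per stream.
final class Event1 {
    final String id, title, description;
    final double price; // hypothetical, only in stream 1

    Event1(String id, String title, String description, double price) {
        this.id = id;
        this.title = title;
        this.description = description;
        this.price = price;
    }
}

final class Event2 {
    final String id, title, description;
    final int stock; // hypothetical, only in stream 2

    Event2(String id, String title, String description, int stock) {
        this.id = id;
        this.title = title;
        this.description = description;
        this.stock = stock;
    }
}

// The common schema both streams are projected onto before the union.
final class Common {
    final String id, title, description;

    Common(String id, String title, String description) {
        this.id = id;
        this.title = title;
        this.description = description;
    }

    @Override
    public String toString() {
        return id + "|" + title + "|" + description;
    }
}

final class UnionSketch {
    // Like SELECT id, title, description FROM Table1
    static Common fromTable1(Event1 e) {
        return new Common(e.id, e.title, e.description);
    }

    // Like SELECT id, title, description FROM Table2
    static Common fromTable2(Event2 e) {
        return new Common(e.id, e.title, e.description);
    }

    // Like UNION ALL: every projected row is kept, with no join and no per-key state.
    // Flink gives no ordering guarantee between the two inputs; the
    // concatenation order here is only for illustration.
    static List<Common> unionAll(List<Event1> table1, List<Event2> table2) {
        return Stream.concat(
                        table1.stream().map(UnionSketch::fromTable1),
                        table2.stream().map(UnionSketch::fromTable2))
                .collect(Collectors.toList());
    }
}
```

Both rows for id "a" survive the union. Keeping only the latest one is a separate step, which the LAST_VALUE grouping in the quoted mail performs.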
If you use a retract stream, you probably do not need the grouping and last-value selection either.

On Mon, Feb 8, 2021 at 3:33 PM Abdelilah CHOUKRI <abdelilah.chou...@prt.manomano.com> wrote:
> Hi,
>
> We're trying to use the Flink 1.11 Java Table API to process a streaming use
> case:
>
> We have 2 streams, each one with a different structure. Both events,
> coming from Kafka, can be:
> - A new event (not in the system already)
> - An updated event (updating an event that was previously inserted)
> so we only want to store the latest data in the Table.
>
> We need to join the 2 previous Tables to have all this data stored in the
> Flink system. We think that the best way is to store the joined data as a
> Table.
> This is going to be a Flink Table that will be a join of the 2 tables by
> a common key.
>
> To sum up, we have:
> - Stream 1 (coming from Kafka topic) -> Flink Table 1
> - Stream 2 (coming from Kafka topic) -> Flink Table 2
> - Table 3 = Table 1 join Table 2
> - DataStream using RetractStream of Table 3
>
> To get the last element in Table 1 and Table 2, we are using Functions
> (LastValueAggFunction):
>
> streamTableEnvironment.registerFunction("LAST_VALUE_STRING", new LastValueAggFunction.StringLastValueAggFunction());
> ...
> streamTableEnvironment.fromDataStream(inputDataStream)
>     .groupBy($("id"))
>     .select(
>         $("id").as("o_id"),
>         call("LAST_VALUE_STRING", $("title")).as("o_title"),
>         call("LAST_VALUE_STRING", $("description")).as("o_description")
>     );
>
> The questions are:
> - Is our approach correct to get the data stored in the Flink system?
> - Is it necessary to use the *LastValueAggFunction* in our case? We want to
> retract the stream to our custom Pojo instead of *Row*, but we're getting
> the attached error (attached: *stack_trace.log*).
>
> Abdelilah Choukdi,
> Backend dev at ManoMano.
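A retract stream, as produced by toRetractStream, is a changelog of Tuple2<Boolean, Row> messages: true adds a row, and false retracts a row that was emitted earlier. Below is a simplified plain-Java model of the changelog that a GROUP BY id query with a last-value aggregate (as in the quoted mail) produces, together with a downstream consumer that materializes it. RetractMessage, RetractSketch and the sample data are hypothetical. Only title is tracked, to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// One changelog message, mirroring Tuple2<Boolean, Row> from toRetractStream:
// add == true inserts a row, add == false retracts a previously emitted row.
final class RetractMessage {
    final boolean add;
    final String id, title;

    RetractMessage(boolean add, String id, String title) {
        this.add = add;
        this.id = id;
        this.title = title;
    }

    @Override
    public String toString() {
        return (add ? "+" : "-") + id + "=" + title;
    }
}

final class RetractSketch {
    // Simplified model of "GROUP BY id" with a last-value aggregate on title:
    // the first row for an id is emitted as an add; each later row for the
    // same id first retracts the previous result, then adds the new one.
    static List<RetractMessage> lastValueChangelog(List<String[]> events) {
        Map<String, String> latest = new LinkedHashMap<>();
        List<RetractMessage> out = new ArrayList<>();
        for (String[] event : events) {
            String id = event[0];
            String title = event[1];
            String previous = latest.put(id, title);
            if (previous != null) {
                out.add(new RetractMessage(false, id, previous));
            }
            out.add(new RetractMessage(true, id, title));
        }
        return out;
    }

    // A downstream consumer applying the messages in order ends up holding
    // exactly one (latest) row per id.
    static Map<String, String> materialize(List<RetractMessage> changelog) {
        Map<String, String> table = new LinkedHashMap<>();
        for (RetractMessage m : changelog) {
            if (m.add) {
                table.put(m.id, m.title);
            } else {
                table.remove(m.id, m.title);
            }
        }
        return table;
    }
}
```

For the events a=Drill, b=Saw, a=Cordless drill, the changelog is +a=Drill, +b=Saw, -a=Drill, +a=Cordless drill, and the materialized result holds a=Cordless drill and b=Saw. Every update to an existing id costs two downstream messages.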