Sorry, I just noticed I made a typo in the last table: it should be 
clickAdvertId != null instead of clickCount != null.

Table allImpressionTable = impressionsTable
      .leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
      // i.e. clickAdvertId != null; applied before groupBy because clickAdvertId
      // is not part of the select output
      .where("clickAdvertId.isNotNull")
      .groupBy("impAdvertId, impVariationName, impMinute")
      .select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute");

________________________________
From: Faye Pressly
Sent: Friday, August 7, 2020 9:28 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: GroupBy with count on a joint table only let met write using toRetractStream

Hello,

I have a stream of events (coming from a Kinesis stream) of this form:

impressionId | advertid | variationName | eventType | eventTime

The end goal is to output back to a Kinesis stream the count of events of type 
'impression' and the count of events of type 'click'.

However, I need to drop (or ignore) events of type 'click' that don't have a 
matching impressionId in an event of type 'impression' (so basically I need 
to discard click events that don't have an impression).
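
The matching rule itself fits in a few lines of plain Java. This is a minimal model, not Flink code: the Event class and keepMatchedClicks are hypothetical names. It assumes events arrive in order (an impression before its clicks) and remembers every impression id forever. In a Flink DataStream job the same idea would typically live in keyed state per impressionId (for example inside a KeyedProcessFunction), with extra handling for clicks that arrive before their impression and a limit on how long that state is kept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MatchedClickFilter {

    // Hypothetical event type carrying the fields listed in the email.
    public static final class Event {
        public final String impressionId;
        public final String advertId;
        public final String variationName;
        public final String eventType; // "impression" or "click"
        public final long eventTime;   // epoch millis

        public Event(String impressionId, String advertId, String variationName,
                     String eventType, long eventTime) {
            this.impressionId = impressionId;
            this.advertId = advertId;
            this.variationName = variationName;
            this.eventType = eventType;
            this.eventTime = eventTime;
        }

        @Override
        public String toString() {
            return eventType + "(" + impressionId + ", " + advertId + ", " + variationName + ")";
        }
    }

    // Keeps every impression, and keeps a click only if an impression with the
    // same impressionId appeared earlier in the list (assumes in-order arrival).
    public static List<Event> keepMatchedClicks(List<Event> events) {
        Set<String> seenImpressions = new HashSet<>();
        List<Event> kept = new ArrayList<>();
        for (Event e : events) {
            if ("impression".equals(e.eventType)) {
                seenImpressions.add(e.impressionId);
                kept.add(e);
            } else if ("click".equals(e.eventType) && seenImpressions.contains(e.impressionId)) {
                kept.add(e);
            }
            // unmatched clicks (and any other event types) are dropped
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("imp-1", "ad-1", "A", "impression", 1_000L),
                new Event("imp-1", "ad-1", "A", "click", 2_000L),
                new Event("imp-2", "ad-1", "B", "click", 3_000L)); // no impression for imp-2
        keepMatchedClicks(events).forEach(System.out::println);
    }
}
```

Running main prints the impression and the matched click for imp-1. The click for imp-2 is discarded.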

This is how I tackled it:

// Convert the stream into a table
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table eventsTable = tEnv.fromDataStream(eventsStream,
      "impressionId, advertId, variationName, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);

// Create a table with only events of type click
Table clicksTable = eventsTable
      .where("eventType = 'click'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, advertId, variationName, minuteWindow")
      .select("impressionId as clickImpId, advertId as clickAdvertId, "
            + "variationName as clickVariationName, minuteWindow.rowtime as clickMinute");

// Create a table with only events of type impression
Table impressionsTable = eventsTable
      .where("eventType = 'impression'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, advertId, variationName, minuteWindow")
      // eventTime is neither grouped nor aggregated, so it cannot be selected here
      .select("impressionId as impImpressionId, advertId as impAdvertId, "
            + "variationName as impVariationName, minuteWindow.rowtime as impMinute");
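
Slide.over("24.hour").every("1.minute") defines overlapping windows, not one-minute buckets. A new 24-hour window starts every minute, so each event is assigned to 24 * 60 = 1440 windows, and minuteWindow.rowtime corresponds to the end of a rolling 24-hour window. The sketch below reproduces the window-start arithmetic of Flink's sliding event-time window assigner in plain Java, assuming a zero offset and non-negative timestamps. windowStarts is a hypothetical helper, not a Flink API. If per-minute counts were the intent, Tumble.over("1.minute") would be the usual choice.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    // Start timestamps (ms) of all sliding windows of length sizeMs, started every
    // slideMs, that contain timestampMs. Assumes offset 0 and timestampMs >= 0.
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs); // latest start <= timestampMs
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start); // window [start, start + sizeMs) contains timestampMs
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 24L * 60 * 60 * 1000; // Slide.over("24.hour")
        long slide = 60L * 1000;          // .every("1.minute")
        long ts = 90_000L;                // an event at 00:01:30 (epoch millis)
        List<Long> starts = windowStarts(ts, size, slide);
        System.out.println(starts.size() + " windows, latest starting at " + starts.get(0));
    }
}
```

For an event at 00:01:30, main prints "1440 windows, latest starting at 60000". The event therefore contributes to the counts of 1440 different windows.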

// Left join the impressions with the clicks using the impressionId as well as
// the temporal field, and then group by to generate a count of all the clicks
// that have a matching impression (aka rows where clickAdvertId is not null)
Table allImpressionTable = impressionsTable
      .leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
      .groupBy("impAdvertId, impVariationName, impMinute")
      .select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
      .where("clickCount != null");
[.... same logic to count impressions]
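
COUNT over a column skips NULLs (standard SQL semantics, which clickAdvertId.count follows). Because of that, the left outer join plus the count may already do the filtering. A click with no matching impression never appears in the join output. An impression without clicks contributes a row with a NULL click side, which is not counted. The plain-Java model below mimics that join-then-count on small in-memory lists. Impression, Click and clickCounts are hypothetical names, and the group key is simplified to a string.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LeftJoinClickCount {

    // Hypothetical rows standing in for impressionsTable and clicksTable.
    public static final class Impression {
        public final String impressionId, advertId, variationName;
        public final long minute;

        public Impression(String impressionId, String advertId, String variationName, long minute) {
            this.impressionId = impressionId;
            this.advertId = advertId;
            this.variationName = variationName;
            this.minute = minute;
        }
    }

    public static final class Click {
        public final String impressionId;
        public final long minute;

        public Click(String impressionId, long minute) {
            this.impressionId = impressionId;
            this.minute = minute;
        }
    }

    // Left outer join on (impressionId, minute), then per (advertId, variationName, minute)
    // count only joined rows whose click side is non-null, like COUNT(clickAdvertId).
    public static Map<String, Long> clickCounts(List<Impression> imps, List<Click> clicks) {
        Map<String, Long> counts = new TreeMap<>();
        for (Impression imp : imps) {
            String group = imp.advertId + "/" + imp.variationName + "@" + imp.minute;
            counts.putIfAbsent(group, 0L); // an unmatched impression still forms a group
            for (Click c : clicks) {
                if (c.impressionId.equals(imp.impressionId) && c.minute == imp.minute) {
                    counts.merge(group, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Impression> imps = Arrays.asList(
                new Impression("imp-1", "ad-1", "A", 1L),
                new Impression("imp-2", "ad-1", "A", 1L),
                new Impression("imp-3", "ad-2", "B", 1L));
        List<Click> clicks = Arrays.asList(
                new Click("imp-1", 1L),
                new Click("imp-9", 1L)); // no matching impression: never joined
        System.out.println(clickCounts(imps, clicks));
    }
}
```

main prints {ad-1/A@1=1, ad-2/B@1=0}. A where filter on the click side is then only needed if groups with a count of 0 should be dropped.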

Now, to debug and check that the counts are correct, I usually use 
"tEnv.toAppendStream(allImpressionTable, Result.class).print()", and I'm able to 
send that newly created stream back to a Kinesis stream.

However, I get an error saying that I cannot use toAppendStream and have to use 
toRetractStream instead. That indeed works, and I can see that the counts in the 
output are correct, but I don't understand how to use the result contained in 
this new stream: it has multiple rows flagged "true"/"false", and the correct 
count is usually the last entry with the "true" flag.
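
The true/false rows come from the groupBy after the join. That aggregation is not windowed, so a group's count can change whenever a new joined row arrives. Flink represents such an update as a retraction of the previously emitted row (false) followed by the new row (true), which is also why toAppendStream is rejected. The sketch below models that behavior in plain Java for a simple count per key. Change and countWithRetractions are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractStreamModel {

    // One change message: flag=true adds a row, flag=false retracts an earlier row.
    public static final class Change {
        public final boolean flag;
        public final String key;
        public final long count;

        public Change(boolean flag, String key, long count) {
            this.flag = flag;
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + flag + "," + key + "," + count + ")";
        }
    }

    // Models a continuously updated count per key over an unbounded input: each new
    // row retracts the group's previous result (if any) and then emits the new one.
    public static List<Change> countWithRetractions(List<String> keys) {
        Map<String, Long> current = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String key : keys) {
            Long old = current.get(key);
            if (old != null) {
                out.add(new Change(false, key, old)); // retract the outdated count
            }
            long updated = (old == null ? 0L : old) + 1;
            current.put(key, updated);
            out.add(new Change(true, key, updated)); // emit the new count
        }
        return out;
    }

    public static void main(String[] args) {
        countWithRetractions(Arrays.asList("ad-1", "ad-1", "ad-2")).forEach(System.out::println);
    }
}
```

For the input ad-1, ad-1, ad-2, main prints (true,ad-1,1), (false,ad-1,1), (true,ad-1,2), (true,ad-2,1). The last true row per key is the current count, as observed in the printed retract stream.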

I have multiple questions:

1) I'm very new to Flink, and I would like to know whether my approach to 
filtering out unmatched events is the right one (stream -> table and joins 
-> stream).
Is there a much easier way of doing this? Is it perhaps possible to filter all 
these events directly in the DataStream?


2) How do I use the retract stream? How do I use it to send only the final 
counts to a sink, rather than all of the "true"/"false" insert/delete rows?
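
A consumer of a retract stream can rebuild the current result by treating true as "insert this row" and false as "delete this row". Below is a minimal plain-Java sketch of that bookkeeping, using a hypothetical Change message type. In Flink terms, the flag corresponds to field f0 of the Tuple2<Boolean, Result> elements produced by toRetractStream, and the row corresponds to f1.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RetractStreamConsumer {

    // Hypothetical change message: flag=true inserts a row, flag=false deletes it.
    public static final class Change {
        public final boolean flag;
        public final String key;
        public final long count;

        public Change(boolean flag, String key, long count) {
            this.flag = flag;
            this.key = key;
            this.count = count;
        }
    }

    // Applies retract messages to a view keyed by group; afterwards the map holds
    // exactly the current (latest) count for every group.
    public static Map<String, Long> materialize(List<Change> changes) {
        Map<String, Long> view = new TreeMap<>();
        for (Change c : changes) {
            if (c.flag) {
                view.put(c.key, c.count);
            } else {
                // delete only if the retracted row is still the one stored
                view.remove(c.key, c.count);
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Change> messages = Arrays.asList(
                new Change(true, "ad-1", 1),
                new Change(false, "ad-1", 1),
                new Change(true, "ad-1", 2),
                new Change(true, "ad-2", 1));
        System.out.println(materialize(messages)); // {ad-1=2, ad-2=1}
    }
}
```

Keeping only the messages whose flag is true gives a stream of updated counts, which suits a sink that overwrites by key. An append-only target such as a Kinesis stream would still receive every intermediate count. If only final counts should be emitted, the usual direction is to make the last aggregation a group window aggregation, whose results are never updated. Whether that works here depends on the join output still carrying a time attribute.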


Thank you!
