Hello,
I have a stream of events (coming from a Kinesis stream) with this shape:
impressionId | advertId | variationName | eventType | eventTime
The end goal is to write back to a Kinesis stream the count of events of type
'impression' and the count of events of type 'click'.
However, I need to drop (or ignore) click events that don't have a matching
'impression' event with the same impressionId. In other words, clicks without
an impression must be discarded.
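To pin down the intended semantics, here is a minimal plain-Java sketch (no Flink involved) that computes both counts over a finite list of events. The Event and Counts records and the MatchedClickCounter class are hypothetical names for illustration only. The sketch ignores the per-advert, per-variation and per-minute grouping used in the code that follows, and it assumes all events are available up front; in a stream a click may arrive before its impression, which is what makes the real problem harder.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical event shape mirroring the fields listed above.
record Event(String impressionId, String advertId, String variationName,
             String eventType, long eventTime) {}

// Hypothetical result: all impressions, and only the clicks that have an impression.
record Counts(long impressions, long matchedClicks) {}

final class MatchedClickCounter {
    private MatchedClickCounter() {}

    static Counts count(List<Event> events) {
        // First pass: count impressions and remember every impressionId seen.
        Set<String> impressionIds = new HashSet<>();
        long impressions = 0;
        for (Event e : events) {
            if ("impression".equals(e.eventType())) {
                impressions++;
                impressionIds.add(e.impressionId());
            }
        }
        // Second pass: count only clicks whose impressionId had an impression;
        // unmatched clicks are dropped.
        long matchedClicks = 0;
        for (Event e : events) {
            if ("click".equals(e.eventType()) && impressionIds.contains(e.impressionId())) {
                matchedClicks++;
            }
        }
        return new Counts(impressions, matchedClicks);
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("imp-1", "ad-1", "A", "impression", 1_000L),
            new Event("imp-1", "ad-1", "A", "click", 2_000L),
            new Event("imp-2", "ad-1", "B", "click", 3_000L)); // no impression: ignored
        System.out.println(count(events)); // Counts[impressions=1, matchedClicks=1]
    }
}
```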
This is how I tackled it:
// Convert the stream into a table
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// eventTime.rowtime requires timestamps and watermarks to be assigned on eventsStream
Table eventsTable = tEnv.fromDataStream(eventsStream,
    "impressionId, advertId, variationName, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
// Create a table with only click events
Table clicksTable = eventsTable
    .where("eventType = 'click'")
    .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
    .groupBy("impressionId, advertId, variationName, minuteWindow")
    .select("impressionId as clickImpId, advertId as clickAdvertId, " +
            "variationName as clickVariationName, minuteWindow.rowtime as clickMinute");
// Create a table with only impression events
// (eventTime is not selected: in a window aggregation only group keys,
// aggregates and window properties can be selected)
Table impressionsTable = eventsTable
    .where("eventType = 'impression'")
    .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
    .groupBy("impressionId, advertId, variationName, minuteWindow")
    .select("impressionId as impImpressionId, advertId as impAdvertId, " +
            "variationName as impVariationName, minuteWindow.rowtime as impMinute");
// Left join the impressions with the clicks on impressionId and the window time,
// then group to count the clicks that have a matching impression
// (count on clickAdvertId skips the rows where the join found no click,
// so it is never null; keep only the groups with at least one click)
Table allImpressionTable = impressionsTable
    .leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
    .groupBy("impAdvertId, impVariationName, impMinute")
    .select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
    .where("clickCount > 0");
[.... same logic to count impressions]
To debug and check that the counts are correct, I usually call
"tEnv.toAppendStream(allImpressionTable, Result.class).print()", and I can then
use that newly created stream to send the results back to a Kinesis stream.
However, I get an error saying that I cannot use toAppendStream and must use
toRetractStream instead. That does work, and the counts in the output are
correct, but I don't understand how to use the result contained in this new
stream: it has multiple rows flagged "true" or "false", and the correct count is
usually the last entry flagged "true".
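As far as I understand the retract stream, each element is a (flag, row) pair: "true" adds a row to the result and "false" retracts a previously emitted row, so an updated count shows up as a "false" for the old value followed by a "true" for the new one. The plain-Java sketch below (no Flink APIs; CountRow, RetractMessage and RetractMaterializer are hypothetical names) replays such a sequence into a map keyed by advertId, which is why the last "true" entry per key holds the current count. Keying the map by advertId assumes each key has at most one live row at a time, which holds for a group-by result keyed on that column.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one result row: (advertId, clickCount).
record CountRow(String advertId, long clickCount) {}

// Hypothetical stand-in for one retract-stream element:
// isAdd == true adds the row, isAdd == false retracts a previously added row.
record RetractMessage(boolean isAdd, CountRow row) {}

final class RetractMaterializer {
    private RetractMaterializer() {}

    // Replays the change log and returns the current count per advertId.
    static Map<String, Long> materialize(List<RetractMessage> messages) {
        Map<String, Long> current = new LinkedHashMap<>();
        for (RetractMessage m : messages) {
            String key = m.row().advertId();
            Long count = m.row().clickCount();
            if (m.isAdd()) {
                current.put(key, count);
            } else {
                // Remove the entry only if it still holds the retracted value.
                current.remove(key, count);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // An update of ad-1 from 1 to 2 arrives as a retraction followed by an add.
        List<RetractMessage> log = List.of(
            new RetractMessage(true, new CountRow("ad-1", 1)),
            new RetractMessage(false, new CountRow("ad-1", 1)),
            new RetractMessage(true, new CountRow("ad-1", 2)),
            new RetractMessage(true, new CountRow("ad-2", 1)));
        System.out.println(materialize(log)); // {ad-1=2, ad-2=1}
    }
}
```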
I have several questions:
1) I'm very new to Flink, and I would like to know whether my approach to
filtering out unmatched events is the right one (stream -> table and joins
-> stream).
Is there a much simpler way of doing this? Is it perhaps possible to filter
these events directly in the DataStream API?
2) How do I use the retract stream? How can I send only the final counts to a
sink, rather than every "true"/"false" (insert/delete) row?
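On question 1, one way the filtering might be done without tables is with per-impressionId state, which in Flink would presumably mean keying the DataStream by impressionId and keeping keyed state in a process function. The plain-Java sketch below only simulates that logic for a single operator (no Flink APIs; AdEvent, ImpressionState and ClickFilter are hypothetical names): impressions are forwarded and mark their impressionId as seen, clicks are forwarded only once a matching impression has been seen, and earlier clicks are buffered until it arrives. The sketch never drops buffered clicks; a real job would need something like a timer to clear that state after, e.g., 24 hours.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal event: only the fields the filter needs.
record AdEvent(String impressionId, String eventType) {}

// Hypothetical per-impressionId state (what keyed state would hold in Flink).
final class ImpressionState {
    boolean impressionSeen;
    final List<AdEvent> pendingClicks = new ArrayList<>();
}

final class ClickFilter {
    private final Map<String, ImpressionState> stateByImpressionId = new HashMap<>();

    // Processes one event and returns the events that may be forwarded downstream.
    List<AdEvent> process(AdEvent e) {
        ImpressionState s = stateByImpressionId.computeIfAbsent(
            e.impressionId(), id -> new ImpressionState());
        List<AdEvent> out = new ArrayList<>();
        if ("impression".equals(e.eventType())) {
            s.impressionSeen = true;
            out.add(e);
            // Release clicks that arrived before their impression.
            out.addAll(s.pendingClicks);
            s.pendingClicks.clear();
        } else if ("click".equals(e.eventType())) {
            if (s.impressionSeen) {
                out.add(e);
            } else {
                // No impression yet: hold the click (this sketch never expires it).
                s.pendingClicks.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ClickFilter filter = new ClickFilter();
        System.out.println(filter.process(new AdEvent("imp-1", "click")));      // []
        System.out.println(filter.process(new AdEvent("imp-2", "click")));      // []
        System.out.println(filter.process(new AdEvent("imp-1", "impression")));
    }
}
```

With these three events, the impression for imp-1 releases the buffered imp-1 click, while the imp-2 click stays buffered because no impression for it ever arrives.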
Thank you!