Hello, I have a stream of events (coming from a Kinesis stream) of this form:

    impressionId | advertId | variationName | eventType | eventTime

The end goal is to output back to a Kinesis stream the count of events of type 'impression' and the count of events of type 'click'. However, I need to drop (or ignore) click events whose impressionId does not match any event of type 'impression' (so basically I need to discard click events that don't have an impression).

This is how I tackled it:

    // Convert the stream into a table
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Table eventsTable = tEnv.fromDataStream(eventsStream,
        "impressionId, advertId, variationName, eventType, eventTime.rowtime");
    tEnv.registerTable("Events", eventsTable);

    // Create a table with only events of type click
    Table clicksTable = eventsTable
        .where("eventType = 'click'")
        .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
        .groupBy("impressionId, advertId, variationName, minuteWindow")
        .select("impressionId as clickImpId, advertId as clickAdvertId, variationName as clickVariationName, minuteWindow.rowTime as clickMinute");

    // Create a table with only events of type impression
    Table impressionsTable = eventsTable
        .where("eventType = 'impression'")
        .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
        .groupBy("impressionId, advertId, variationName, minuteWindow")
        .select("impressionId as impImpressionId, advertId as impAdvertId, variationName as impVariationName, eventTime, minuteWindow.rowTime as impMinute");

    // Left join the impressions with the clicks using the impressionId as well as the temporal field,
    // then group by to generate a count of all the clicks that have a matching impression
    // (i.e. rows where clickAdvertId is not null)
    Table allImpressionTable = impressionsTable
        .leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
        .groupBy("impAdvertId, impVariationName, impMinute")
        .select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
        .where("clickCount != null");

    [.... same logic to count impressions]

Now, to debug and check whether the counts are correct, I usually use "tEnv.toAppendStream(allImpressionTable, Result.class).print()", and I can then send that newly created stream back to a Kinesis stream.

However, I get an error saying that I cannot use toAppendStream and that I have to use toRetractStream instead. That does work, and I can see that the counts in the output are correct. What I don't understand is how to use the result contained in this new stream, because it has multiple rows with "true"/"false" and the correct count is usually the last entry with the "true" key.

I have multiple questions:

1) I'm very new to Flink and I would like to know whether my approach to filtering out unmatched events is the correct one (stream -> table and joins -> stream). Is there a much easier way of doing this? Is it perhaps possible to filter all these events directly in the DataStream?

2) How do I use the retract stream? How do I use it to send only the final counts to a sink, and not the entirety of the "true"/"false" insert/delete rows?

Thank you!
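
To make question 2 concrete: each element of a retract stream is a pair of a Boolean flag and a row, where true marks an added (or updated) result and false retracts a result that was emitted earlier. The following is only a minimal plain-Java sketch of that bookkeeping, with no Flink dependency. The names RetractFold, Msg and latestCounts are hypothetical and exist only for illustration, and the string key stands in for whatever grouping fields (advert, variation, minute) the real rows carry.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetractFold {

    /** One retract-stream element (hypothetical type): add == true adds a result, false retracts it. */
    static final class Msg {
        final boolean add;
        final String key;      // stand-in for the grouping fields of the row
        final long clickCount;

        Msg(boolean add, String key, long clickCount) {
            this.add = add;
            this.key = key;
            this.clickCount = clickCount;
        }
    }

    /** Applies the messages in arrival order and returns the current count per key. */
    static Map<String, Long> latestCounts(List<Msg> messages) {
        Map<String, Long> current = new LinkedHashMap<>();
        for (Msg m : messages) {
            if (m.add) {
                // An add message carries the new (or first) result for this key.
                current.put(m.key, m.clickCount);
            } else {
                // A retract message withdraws a previously emitted result;
                // remove it only if it is still the stored value.
                current.remove(m.key, m.clickCount);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Msg> stream = new ArrayList<>();
        stream.add(new Msg(true, "ad1|A", 1));   // first result for ad1|A
        stream.add(new Msg(false, "ad1|A", 1));  // that result is withdrawn
        stream.add(new Msg(true, "ad1|A", 2));   // and replaced by the updated count
        stream.add(new Msg(true, "ad2|B", 1));
        System.out.println(latestCounts(stream));
    }
}
```

Under these assumptions, the "correct" count is simply the value left for each key once all messages seen so far have been applied. Since a Kinesis stream is append-only, one possible option (an assumption, not something confirmed for this job) would be to forward only the records flagged true and let the consumer keep the latest record per key.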