Hi Faye,

1) In your query, the different events are aggregated in different groups, so it seems hard to extract a single global filter into the DataStream.

2) AFAIK, you can just drop the retract messages (those whose flag is false) and then convert the retract stream into an append stream. The downstream job then needs to deduplicate the records, just like [1].
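To make point 2 concrete, here is a plain-Java sketch with no Flink dependency. It assumes each retract message is a (flag, key, count) triple standing in for the `Tuple2<Boolean, Row>` elements that `toRetractStream` produces; the class `RetractToFinalCounts` and its `RetractMessage` type are hypothetical names for illustration. Dropping the retractions corresponds to something like `.filter(t -> t.f0)` on the retract stream, and keeping only the last row per key is what the downstream deduplication [1] does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation (no Flink dependency) of turning a retract stream into
 * final per-key counts: drop retract messages (flag == false), then keep only
 * the latest inserted row per key, as a downstream deduplication would.
 */
public class RetractToFinalCounts {

    /** Hypothetical stand-in for Flink's Tuple2<Boolean, Row>: (flag, key, count). */
    static final class RetractMessage {
        final boolean isInsert; // true = insert/add message, false = retract message
        final String key;       // e.g. "advertId|variationName"
        final long count;

        RetractMessage(boolean isInsert, String key, long count) {
            this.isInsert = isInsert;
            this.key = key;
            this.count = count;
        }
    }

    /** Drops retractions and keeps the last inserted count for every key. */
    static Map<String, Long> finalCounts(List<RetractMessage> changelog) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (RetractMessage m : changelog) {
            if (!m.isInsert) {
                continue; // step 1: ignore retract messages
            }
            latest.put(m.key, m.count); // step 2: a later insert overwrites the earlier one
        }
        return latest;
    }

    public static void main(String[] args) {
        List<RetractMessage> changelog = new ArrayList<>();
        // The count for ad1|A grows from 1 to 2: insert, retract old row, insert new row.
        changelog.add(new RetractMessage(true, "ad1|A", 1));
        changelog.add(new RetractMessage(false, "ad1|A", 1));
        changelog.add(new RetractMessage(true, "ad1|A", 2));
        changelog.add(new RetractMessage(true, "ad2|B", 5));
        System.out.println(finalCounts(changelog)); // prints {ad1|A=2, ad2|B=5}
    }
}
```

This simplification assumes every key keeps at least one live row. If a group's last message were a pure retraction (the row disappears), the map above would keep a stale count, so a sink that applies deletes would be needed in that case.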
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication

Best,
Godfrey

Faye Pressly <faye.press...@outlook.com> wrote on Sat, Aug 8, 2020 at 3:30 AM:

> Sorry, I just noticed I made a typo in the last table (clickAdvertId != null
> instead of clickCount != null):
>
> Table allImpressionTable = impressionsTable
>     .leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
>     .groupBy("impAdvertId, impVariationName, impMinute")
>     .select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
>     .where("clickAdvertId != null");
>
> ------------------------------
> *From:* Faye Pressly
> *Sent:* Friday, August 7, 2020 9:28 PM
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* GroupBy with count on a joint table only let met write using toRetractStream
>
> Hello,
>
> I have a stream of events (coming from a Kinesis stream) of this form:
>
> impressionId | advertId | variationName | eventType | eventTime
>
> The end goal is to output, back onto a Kinesis stream, the count of events of
> type 'impression' and the count of events of type 'click'.
>
> However, I need to drop (or ignore) events of type 'click' that don't have a
> matching impressionId in an event of type 'impression' (so basically I need
> to discard click events that don't have an impression).
>
> This is how I tackled it:
>
> // Convert the stream into a table
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> Table eventsTable = tEnv.fromDataStream(eventsStream,
>     "impressionId, advertId, variationName, eventType, eventTime.rowtime");
> tEnv.registerTable("Events", eventsTable);
>
> // Create a table with only events of type click
> Table clicksTable = eventsTable
>     .where("eventType = 'click'")
>     .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
>     .groupBy("impressionId, advertId, variationName, minuteWindow")
>     .select("impressionId as clickImpId, advertId as clickAdvertId, "
>         + "variationName as clickVariationName, minuteWindow.rowTime as clickMinute");
>
> // Create a table with only events of type impression
> Table impressionsTable = eventsTable
>     .where("eventType = 'impression'")
>     .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
>     .groupBy("impressionId, advertId, variationName, minuteWindow")
>     .select("impressionId as impImpressionId, advertId as impAdvertId, "
>         + "variationName as impVariationName, eventTime, minuteWindow.rowTime as impMinute");
>
> // Left join the impressions with the clicks using the impressionId as well
> // as the temporal field, and then group by to generate a count of all the
> // clicks that have a matching impression (i.e. rows where clickAdvertId is not null)
> Table allImpressionTable = impressionsTable
>     .leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
>     .groupBy("impAdvertId, impVariationName, impMinute")
>     .select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
>     .where("clickCount != null");
> [.... same logic to count impressions]
>
> Now, to debug and to check whether the counts are correct, I usually use
> "tEnv.toAppendStream(allImpressionTable, Result.class).print()", and I'm
> able to use that newly created stream to send it back onto a Kinesis stream.
>
> However, I get an error saying that I cannot use toAppendStream and that I
> have to use toRetractStream instead. That indeed works, and I can see that the
> counts in the output are correct; however, I don't understand how I can use
> the result contained in this new stream, because it has multiple rows with
> "true"/"false" and the correct count is usually the last entry with the
> "true" flag.
>
> I have multiple questions:
>
> 1) I'm very new to Flink and I would like to know whether my approach to
> filtering out unmatched events (stream -> table and joins -> stream) is the
> correct one. Is there a much easier way of doing this? Is it perhaps possible
> to filter all these events directly in the DataStream?
>
> 2) How do I use the retract stream? How do I use it in order to send only the
> final counts to a sink, and not all of the "true"/"false" insert/delete rows?
>
> Thank you!