Actually, it looks like I found why the "count(*) AS occurrence" + filter "occurrence = 1" approach doesn't work: if there are multiple events with the same event time, they get handled together and share the same value for count(*). I printed out some rows before the filter* and this is what I get:
4> {"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.667+0000","occurrence":2} 4> {"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.667+0000","occurrence":2} 4> {"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.668+0000","occurrence":5} 4> {"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.668+0000","occurrence":5} 4> {"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.668+0000","occurrence":5} Note that I'm now actually using INTERVAL '1' DAY instead of '1' HOUR. To match that, formatting is: "CAST(DATE_FORMAT(rowtime, '%Y%m%d') AS VARCHAR) AS processdate". So at least the count(*) trick won't work just like that. For me it's fine to just say that early firing is not possible. Trying to achieve that with some kind of workaround also seems a bit risky to me, like the above attempt shows. Maybe it's best to wait for this to be supported properly. As I said I don't seem to really need early firing right now, because writing out all distinct values once window closes is not too slow for us at the moment. Thanks again, Juho *) The full query was: SELECT s_aid1, s_cid, CAST(DATE_FORMAT(rowtime, '%Y%m%d') AS VARCHAR) AS processdate, rowtime, COUNT(*) OVER ( PARTITION BY s_aid1, s_cid, CAST(DATE_FORMAT(rowtime, '%Y%m%d') AS VARCHAR) ORDER BY rowtime RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW ) AS occurrence FROM events WHERE s_aid1 IS NOT NULL AND s_aid1 <> '' AND s_cid IS NOT NULL On Tue, Feb 27, 2018 at 7:52 PM, Juho Autio <juho.au...@rovio.com> wrote: > > a query with an OVER aggregation should emit exactly one row for each > input row. > > Does your comment on "isn't catching all distinct values" mean that this > is not the case? > > Not really what I meant? The problem is that some ids are not received at > all for some time windows. > > I did this as you suggested, this part works (there are no duplicates): > > Table result = tableEnv.sql(uniqueIdsSql); > // remove duplicates (this is a trick to get only distinct values > but get them asap) > result = result.filter("occurrence = 1"); > > I'm seeing all ids at least once, but missing them from some time windows > where they occurred as well. So it seems like the uniqueness is not > properly scoped to the time windows. I don't see why not.. > > I can try to create a simplified class to hopefully reproduce this > problem. Maybe also for the original issue that I encountered with hop > start/end timestamps after a join. > > Thanks for looking into this so far! > > On Tue, Feb 27, 2018 at 4:45 PM, Fabian Hueske <fhue...@gmail.com> wrote: > >> Hi Juho, >> >> a query with an OVER aggregation should emit exactly one row for each >> input row. >> Does your comment on "isn't catching all distinct values" mean that this >> is not the case? 
On Tue, Feb 27, 2018 at 7:52 PM, Juho Autio <juho.au...@rovio.com> wrote:

> > a query with an OVER aggregation should emit exactly one row for each
> > input row.
> > Does your comment on "isn't catching all distinct values" mean that
> > this is not the case?
>
> That's not really what I meant. The problem is that some ids are not
> received at all for some time windows.
>
> I did this as you suggested, and this part works (there are no
> duplicates):
>
>     Table result = tableEnv.sql(uniqueIdsSql);
>     // remove duplicates (this is a trick to get only distinct values
>     // but get them asap)
>     result = result.filter("occurrence = 1");
>
> I'm seeing all ids at least once, but missing them from some time
> windows where they occurred as well. So it seems like the uniqueness is
> not properly scoped to the time windows. I don't see why not.
>
> I can try to create a simplified class to hopefully reproduce this
> problem. Maybe also for the original issue that I encountered with hop
> start/end timestamps after a join.
>
> Thanks for looking into this so far!
>
> On Tue, Feb 27, 2018 at 4:45 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Juho,
>>
>> a query with an OVER aggregation should emit exactly one row for each
>> input row.
>> Does your comment on "isn't catching all distinct values" mean that
>> this is not the case?
>>
>> You can combine tumbling windows and over aggregates also by nesting
>> queries as shown below:
>>
>> SELECT
>>   s_aid1,
>>   s_cid,
>>   first_seen,
>>   MIN(first_seen) OVER (
>>     PARTITION BY s_aid1, s_cid
>>     ORDER BY rowtime
>>     RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
>>   ) AS first_seen_1h,
>>   processdate,
>>   tumble_start,
>>   tumble_end
>> FROM (
>>   SELECT
>>     s_aid1,
>>     s_cid,
>>     TS_MIN(rowtime) AS first_seen,
>>     CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),
>>       '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
>>     TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
>>     TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end,
>>     TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) AS rowtime
>>   FROM events
>>   WHERE s_aid1 IS NOT NULL
>>   GROUP BY
>>     s_aid1,
>>     s_cid,
>>     TUMBLE(rowtime, INTERVAL '10' SECOND)
>> )
>>
>> Early triggering is not yet supported for SQL queries.
>>
>> Best, Fabian
>>
>> 2018-02-27 15:20 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>
>>> Thanks for the hint! For some reason it isn't catching all distinct
>>> values (even though it's a much simpler way than what I initially
>>> tried, and seems good in that sense). First of all, isn't this like a
>>> sliding window: "rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND
>>> CURRENT ROW"?
>>>
>>> My use case needs a tumbling window. I tried additionally adding
>>> DATE_FORMAT(rowtime, '%Y%m%d%H') to the PARTITION BY to achieve the
>>> same result as with a tumbling window; this resulted in slightly more
>>> distinct values, but was still missing some! Would there be some nice
>>> way to create a tumbling window right in the RANGE condition instead?
>>>
>>> As a disclaimer I have to say we seem to be fine using a simple window
>>> _without_ any early triggering. But of course it would be nice to
>>> understand how early triggering could be enabled in a simple & scalable
>>> way.
>>>
>>> Cheers,
>>> Juho
>>>
>>> On Mon, Feb 19, 2018 at 1:44 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> sorry for the late response. I found time to look into this issue.
>>>> I agree that the start and end timestamps of the HOP window should be
>>>> 1 hour apart from each other. I tried to reproduce the issue, but was
>>>> not able to do so.
>>>> Can you maybe open a JIRA and provide a simple test case (collection
>>>> data source, no Kafka) that reproduces the issue?
>>>>
>>>> Regarding the task that you are trying to solve, have you looked into
>>>> OVER windows?
>>>>
>>>> The following query would count, for each record, how often a record
>>>> with the same ID combination was observed in the last hour based on
>>>> its timestamp:
>>>>
>>>> SELECT
>>>>   s_aid1,
>>>>   s_cid,
>>>>   COUNT(*) OVER (
>>>>     PARTITION BY s_aid1, s_cid
>>>>     ORDER BY rowtime
>>>>     RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
>>>>   ) AS occurrence,
>>>>   rowtime
>>>> FROM events
>>>> WHERE s_aid1 IS NOT NULL
>>>>
>>>> If occurrence is 1, the current record is the only record within the
>>>> last 1 hour with that combination of aid and cid.
>>>> The query does not batch the stream by 10 seconds, but rather produces
>>>> the results in real time. If the batching is not required, you should
>>>> be good by adding a filter on occurrence = 1.
>>>> Otherwise, you could add the filter and wrap it in a 10-second
>>>> tumbling window.
>>>>
>>>> Hope this helps,
>>>> Fabian
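Spelling out those last two suggestions combined (the occurrence = 1 filter wrapped in a 10-second tumbling window), the query might look roughly like the sketch below. This is untested and assumes that rowtime is still usable as a time attribute after the OVER aggregation and the filter:

SELECT
  s_aid1,
  s_cid,
  TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS batch_start
FROM (
  SELECT
    s_aid1,
    s_cid,
    rowtime,
    COUNT(*) OVER (
      PARTITION BY s_aid1, s_cid
      ORDER BY rowtime
      RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS occurrence
  FROM events
  WHERE s_aid1 IS NOT NULL
)
WHERE occurrence = 1
GROUP BY
  s_aid1,
  s_cid,
  TUMBLE(rowtime, INTERVAL '10' SECOND)

The outer GROUP BY batches the first occurrences into 10-second groups, which is what the original 10-second batching was after.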
>>>> 2018-02-14 15:30 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>>>
>>>>> I'm joining a tumbling & hopping window in Flink 1.5-SNAPSHOT. The
>>>>> result is unexpected. Am I doing something wrong? Maybe this is just
>>>>> not a supported join type at all? Anyway, here goes:
>>>>>
>>>>> I first register these two tables:
>>>>>
>>>>> 1. new_ids: a tumbling window of seen ids within the last 10 seconds:
>>>>>
>>>>> SELECT
>>>>>   s_aid1,
>>>>>   s_cid,
>>>>>   TS_MIN(rowtime) AS first_seen,
>>>>>   CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),
>>>>>     '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
>>>>>   TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
>>>>>   TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end
>>>>> FROM events
>>>>> WHERE s_aid1 IS NOT NULL
>>>>> GROUP BY
>>>>>   s_aid1,
>>>>>   s_cid,
>>>>>   TUMBLE(rowtime, INTERVAL '10' SECOND)
>>>>>
>>>>> 2. seen_ids: a sliding window of seen ids 1 hour backwards, with a
>>>>> 10-second hop:
>>>>>
>>>>> SELECT
>>>>>   s_aid1,
>>>>>   s_cid,
>>>>>   TS_MIN(rowtime) AS first_seen,
>>>>>   CAST(HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
>>>>>     AS DATE) AS processdate,
>>>>>   HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_start,
>>>>>   HOP_END(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_end
>>>>> FROM events
>>>>> WHERE s_aid1 IS NOT NULL
>>>>> GROUP BY
>>>>>   s_aid1,
>>>>>   s_cid,
>>>>>   HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
>>>>>
>>>>> If I write out the results of the "seen_ids" table, the difference
>>>>> between HOP_start and HOP_end is always 1 hour, as expected.
>>>>>
>>>>> Then I register another query that joins the 2 tables:
>>>>>
>>>>> unique_ids (mostly including fields for debugging - what I need is
>>>>> the unique, new combinations of s_cid x s_aid1):
>>>>>
>>>>> SELECT
>>>>>   new_ids.s_cid,
>>>>>   new_ids.s_aid1,
>>>>>   new_ids.processdate AS processdate,
>>>>>   seen_ids.processdate AS seen_ids_processdate,
>>>>>   new_ids.first_seen AS new_ids_first_seen,
>>>>>   seen_ids.first_seen AS seen_ids_first_seen,
>>>>>   tumble_start,
>>>>>   HOP_start,
>>>>>   tumble_end,
>>>>>   HOP_end
>>>>> FROM new_ids, seen_ids
>>>>> WHERE new_ids.s_cid = seen_ids.s_cid
>>>>>   AND new_ids.s_aid1 = seen_ids.s_aid1
>>>>>   AND (new_ids.first_seen <= seen_ids.first_seen OR
>>>>>     seen_ids.first_seen IS NULL)
>>>>>
>>>>> I print the results of this table, and surprisingly HOP_start &
>>>>> HOP_end are only separated by 10 seconds. Is this a bug?
>>>>>
>>>>> {
>>>>>   "s_cid": "appsimulator_236e5fb7",
>>>>>   "s_aid1": "L1GENe52d723b-b563-492f-942d-3dc1a31d7e26",
>>>>>   "seen_ids_processdate": "2018-02-14",
>>>>>   "seen_ids_first_seen": "2018-02-14 11:37:59.0",
>>>>>   "new_ids_first_seen": "2018-02-14 11:34:33.0",
>>>>>   "tumble_start": "2018-02-14 11:34:30.0",
>>>>>   "tumble_end": "2018-02-14 11:34:40.0",
>>>>>   "HOP_start": "2018-02-14 11:37:50.0",
>>>>>   "HOP_end": "2018-02-14 11:38:00.0"
>>>>> }
>>>>>
>>>>> What I'm trying to do is exclude the id from the current "new_ids"
>>>>> window if it was already seen before (within the 1-hour scope of
>>>>> "seen_ids"), but that doesn't work either. This example result row
>>>>> also shows that "seen_ids.first_seen" is bigger than it should be.
>>>>>
>>>>> Even if I can find a fix for this to get what I need, this strategy
>>>>> seems overly complicated. If anyone can suggest a better way, I'd be
>>>>> glad to hear it. If this was a batch job, it could be defined simply
>>>>> as:
>>>>>
>>>>> SELECT DISTINCT s_cid, s_aid1, DATE_FORMAT(rowtime, '%Y%m%d/%H')
>>>>>
>>>>> + when streaming this query, the new distinct values should be
>>>>> written out every 10 seconds (ONLY the new ones - within that
>>>>> wrapping 1-hour window). So far I haven't been able to figure out how
>>>>> to do that in a simple way with Flink.
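As for that goal of writing out only the new distinct values as soon as they arrive: outside of SQL this can be expressed with keyed state. Below is a rough, untested Scala sketch; the Event class and its fields are placeholders, and the deduplication uses calendar-hour buckets in event time rather than a sliding one-hour window:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Placeholder event type for illustration.
case class Event(s_aid1: String, s_cid: String, timestamp: Long)

// Emits an event only the first time its key is seen within each event-time
// hour. Must be applied to a stream keyed by (s_aid1, s_cid). Note that the
// state is kept forever per key; a real job would need some cleanup.
class FirstSeenPerHour extends ProcessFunction[Event, Event] {

  lazy val lastEmittedHour: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("lastEmittedHour", classOf[java.lang.Long]))

  override def processElement(
      event: Event,
      ctx: ProcessFunction[Event, Event]#Context,
      out: Collector[Event]): Unit = {
    val hour = event.timestamp / (60 * 60 * 1000)
    val last = lastEmittedHour.value()
    if (last == null || last < hour) {
      lastEmittedHour.update(hour)
      out.collect(event) // first occurrence in this hour: emit immediately
    }
  }
}

// Usage: events.keyBy(e => (e.s_aid1, e.s_cid)).process(new FirstSeenPerHour)

Unlike the windowed SQL variants, this emits each new id immediately instead of waiting for a window to close, which is essentially the early firing behavior discussed above.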
>>>>>
>>>>> *) TS_MIN is a custom function, but it's just a mapping of Flink's
>>>>> MinAggFunction:
>>>>>
>>>>> import java.sql.Timestamp
>>>>>
>>>>> import com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered
>>>>>
>>>>> import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
>>>>> import org.apache.flink.table.functions.aggfunctions.MaxAggFunction
>>>>> import org.apache.flink.table.functions.aggfunctions.MinAggFunction
>>>>>
>>>>> object TimestampAggFunctions {
>>>>>
>>>>>   trait TimestampAggFunction {
>>>>>     def getInitValue = null
>>>>>     def getValueTypeInfo = SqlTimeTypeInfo.TIMESTAMP
>>>>>   }
>>>>>
>>>>>   class TimestampMinAggFunction extends MinAggFunction[Timestamp] with TimestampAggFunction
>>>>>   class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] with TimestampAggFunction
>>>>> }
>>>>>
>>>>> // Registered with:
>>>>> tableEnv.registerFunction("TS_MIN", new TimestampMinAggFunction());
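A side note on the imports in that UDF: Flink's MinAggFunction and MaxAggFunction take an implicit Ordering[T], which is presumably what the com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered import provides. That helper isn't shown in the thread; a hypothetical stand-in could look like:

object ImplicitOrdering {
  // Hypothetical reconstruction: supplies the implicit Ordering[T] required
  // by Min/MaxAggFunction for any Comparable type such as java.sql.Timestamp.
  implicit def ordered[T <: Comparable[T]]: Ordering[T] =
    Ordering.fromLessThan((a: T, b: T) => a.compareTo(b) < 0)
}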