Hi Juho, sorry for the late response. I found time to look into this issue. I agree, that the start and end timestamps of the HOP window should be 1 hour apart from each other. I tried to reproduce the issue, but was not able to do so. Can you maybe open a JIRA and provide a simple test case (collection data source, no Kafka) that reproduces the issue?
Regarding the task that you are trying to solve, have you looked into OVER windows? The following query would count for each record, how often a record with the same ID combination was observed in the last hour based on its timestamp: SELECT s_aid1, s_cid, COUNT(*) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS occurrence, rowtime FROM events WHERE s_aid1 IS NOT NULL If occurrence is 1, the current record is the only record within the last 1 hour with the combination of aid and cid . The query does not batch the stream by 10 seconds, but rather produces the results in real-time. If the batching is not required, you should be good by adding a filter on occurrence = 1. Otherwise, you could add the filter and wrap it by 10 secs tumbling window. Hope this helps, Fabian 2018-02-14 15:30 GMT+01:00 Juho Autio <juho.au...@rovio.com>: > I'm joining a tumbling & hopping window in Flink 1.5-SNAPSHOT. The result > is unexpected. Am I doing something wrong? Maybe this is just not a > supported join type at all? Any way here goes: > > I first register these two tables: > > 1. new_ids: a tumbling window of seen ids within the last 10 seconds: > > SELECT > s_aid1, > s_cid, > TS_MIN(rowtime) AS first_seen, > CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), > '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate, > TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start, > TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end > FROM events > WHERE s_aid1 IS NOT NULL > GROUP BY > s_aid1, > s_cid, > TUMBLE(rowtime, INTERVAL '10' SECOND) > > 2. seen_ids: a sliding window of seen ids 1 hour backwards, 10 second hop: > > SELECT > s_aid1, > s_cid, > TS_MIN(rowtime) AS first_seen, > CAST(HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS > DATE) AS processdate, > HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_start, > HOP_END(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_end > FROM events > WHERE s_aid1 IS NOT NULL > GROUP BY > s_aid1, > s_cid, > HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) > > If I write the results of the "seen_ids" table, the difference between > HOP_start and HOP_end is always 1 hour, as expected. > > Then I register another query that joins the 2 tables: > > unique_ids (mostly including fields for debugging - what I need is the > unique, new combinations of s_cid x s_aid1): > > SELECT > new_ids.s_cid, > new_ids.s_aid1, > new_ids.processdate AS processdate, > seen_ids.processdate AS seen_ids_processdate, > new_ids.first_seen AS new_ids_first_seen, > seen_ids.first_seen AS seen_ids_first_seen, > tumble_start, > HOP_start, > tumble_end, > HOP_end > FROM new_ids, seen_ids > WHERE new_ids.s_cid = seen_ids.s_cid > AND new_ids.s_aid1 = seen_ids.s_aid1 > AND (new_ids.first_seen <= seen_ids.first_seen OR seen_ids.first_seen IS > NULL) > > I print the results of this table, and surprisingly the HOP_start & > HOP_end are only separated by 10 seconds. Is this a bug? > > { > "s_cid": "appsimulator_236e5fb7", > "s_aid1": "L1GENe52d723b-b563-492f-942d-3dc1a31d7e26", > > "seen_ids_processdate": "2018-02-14", > > "seen_ids_first_seen": "2018-02-14 11:37:59.0", > "new_ids_first_seen": "2018-02-14 11:34:33.0", > "tumble_start": "2018-02-14 11:34:30.0", > "tumble_end": "2018-02-14 11:34:40.0", > > "HOP_start": "2018-02-14 11:37:50.0", > "HOP_end": "2018-02-14 11:38:00.0" > } > > What I'm trying to do is exclude the id from the current "new_ids" window > if it was already seen before (within the 1 hour scope of "seen_ids"), but > that doesn't work either. This example result row also shows that > "seen_ids.first_seen" is bigger than it should be. > > > Even if I can find a fix to this to get what I need, this strategy seems > overly complicated. If anyone can suggest a better way, I'd be glad to > hear. If this was a batch job, it could be defined simply as: > > SELECT DISTINCT s_cid, s_aid1, DATE_FORMAT(rowtime, '%Y%m%d/%H') > > + when streaming this query, the new distinct values should be written out > every 10 seconds (ONLY the new ones - within that wrapping 1 hour window). > So far I haven't been able to figure out how to do that in a simple way > with Flink. > > > *) TS_MIN is a custom function, but it's just a mapping of Flink's > MinAggFunction: > > import java.sql.Timestamp > > import com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered > > import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo > import org.apache.flink.table.functions.aggfunctions.MaxAggFunction > import org.apache.flink.table.functions.aggfunctions.MinAggFunction > > object TimestampAggFunctions { > > trait TimestampAggFunction { > def getInitValue = null > def getValueTypeInfo = SqlTimeTypeInfo.TIMESTAMP > } > > class TimestampMinAggFunction extends MinAggFunction[Timestamp] with > TimestampAggFunction > class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] with > TimestampAggFunction > > } > > // Registered with: > tableEnv.registerFunction("TS_MIN", new TimestampMinAggFunction()); > >