I'm joining a tumbling & hopping window in Flink 1.5-SNAPSHOT. The result is unexpected. Am I doing something wrong? Maybe this is just not a supported join type at all? Any way here goes:
I first register these two tables: 1. new_ids: a tumbling window of seen ids within the last 10 seconds: SELECT s_aid1, s_cid, TS_MIN(rowtime) AS first_seen, CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate, TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start, TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end FROM events WHERE s_aid1 IS NOT NULL GROUP BY s_aid1, s_cid, TUMBLE(rowtime, INTERVAL '10' SECOND) 2. seen_ids: a sliding window of seen ids 1 hour backwards, 10 second hop: SELECT s_aid1, s_cid, TS_MIN(rowtime) AS first_seen, CAST(HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS DATE) AS processdate, HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_start, HOP_END(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_end FROM events WHERE s_aid1 IS NOT NULL GROUP BY s_aid1, s_cid, HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) If I write the results of the "seen_ids" table, the difference between HOP_start and HOP_end is always 1 hour, as expected. Then I register another query that joins the 2 tables: unique_ids (mostly including fields for debugging - what I need is the unique, new combinations of s_cid x s_aid1): SELECT new_ids.s_cid, new_ids.s_aid1, new_ids.processdate AS processdate, seen_ids.processdate AS seen_ids_processdate, new_ids.first_seen AS new_ids_first_seen, seen_ids.first_seen AS seen_ids_first_seen, tumble_start, HOP_start, tumble_end, HOP_end FROM new_ids, seen_ids WHERE new_ids.s_cid = seen_ids.s_cid AND new_ids.s_aid1 = seen_ids.s_aid1 AND (new_ids.first_seen <= seen_ids.first_seen OR seen_ids.first_seen IS NULL) I print the results of this table, and surprisingly the HOP_start & HOP_end are only separated by 10 seconds. Is this a bug? { "s_cid": "appsimulator_236e5fb7", "s_aid1": "L1GENe52d723b-b563-492f-942d-3dc1a31d7e26", "seen_ids_processdate": "2018-02-14", "seen_ids_first_seen": "2018-02-14 11:37:59.0", "new_ids_first_seen": "2018-02-14 11:34:33.0", "tumble_start": "2018-02-14 11:34:30.0", "tumble_end": "2018-02-14 11:34:40.0", "HOP_start": "2018-02-14 11:37:50.0", "HOP_end": "2018-02-14 11:38:00.0" } What I'm trying to do is exclude the id from the current "new_ids" window if it was already seen before (within the 1 hour scope of "seen_ids"), but that doesn't work either. This example result row also shows that "seen_ids.first_seen" is bigger than it should be. Even if I can find a fix to this to get what I need, this strategy seems overly complicated. If anyone can suggest a better way, I'd be glad to hear. If this was a batch job, it could be defined simply as: SELECT DISTINCT s_cid, s_aid1, DATE_FORMAT(rowtime, '%Y%m%d/%H') + when streaming this query, the new distinct values should be written out every 10 seconds (ONLY the new ones - within that wrapping 1 hour window). So far I haven't been able to figure out how to do that in a simple way with Flink. *) TS_MIN is a custom function, but it's just a mapping of Flink's MinAggFunction: import java.sql.Timestamp import com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo import org.apache.flink.table.functions.aggfunctions.MaxAggFunction import org.apache.flink.table.functions.aggfunctions.MinAggFunction object TimestampAggFunctions { trait TimestampAggFunction { def getInitValue = null def getValueTypeInfo = SqlTimeTypeInfo.TIMESTAMP } class TimestampMinAggFunction extends MinAggFunction[Timestamp] with TimestampAggFunction class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] with TimestampAggFunction } // Registered with: tableEnv.registerFunction("TS_MIN", new TimestampMinAggFunction());