Actually looks like I found why the "count(*) AS occurrence" + filter
"occurrence = 1" doesn't work. If there are multiple events with the same
event time, they get handled together and share the value for count(*). I
printed out some rows before the filter* and this is what I get:

4>
{"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.667+0000","occurrence":2}
4>
{"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.667+0000","occurrence":2}
4>
{"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.668+0000","occurrence":5}
4>
{"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.668+0000","occurrence":5}
4>
{"s_aid1":"AN16797819174E392E6BA4A48E3F6C53884A0C1247F643294D1E57B535A4C142C1","s_cid":"TestClient","processdate":"20180219","rowtime":"2018-02-19T08:54:05.668+0000","occurrence":5}

Note that I'm now actually using INTERVAL '1' DAY instead of '1' HOUR. To
match that, formatting is: "CAST(DATE_FORMAT(rowtime, '%Y%m%d') AS VARCHAR)
AS processdate".

So at least the count(*) trick won't work just like that.

For me it's fine to just say that early firing is not possible. Trying to
achieve that with some kind of workaround also seems a bit risky to me, like
the above attempt shows. Maybe it's best to wait for this to be supported
properly. As I said I don't seem to really need early firing right now,
because writing out all distinct values once window closes is not too slow
for us at the moment.

Thanks again,
Juho

*) The full query was:

SELECT
  s_aid1,
  s_cid,
  CAST(DATE_FORMAT(rowtime, '%Y%m%d') AS VARCHAR) AS processdate,
  rowtime,
  COUNT(*) OVER (
    PARTITION BY s_aid1, s_cid, CAST(DATE_FORMAT(rowtime, '%Y%m%d') AS
VARCHAR)
    ORDER BY rowtime
    RANGE BETWEEN INTERVAL '1' DAY
    PRECEDING AND CURRENT ROW
  ) AS occurrence
FROM events
WHERE s_aid1 IS NOT NULL AND s_aid1 <> '' AND s_cid IS NOT NULL

On Tue, Feb 27, 2018 at 7:52 PM, Juho Autio <juho.au...@rovio.com> wrote:

> > a query with an OVER aggregation should emit exactly one row for each
> input row.
> > Does your comment on "isn't catching all distinct values" mean that this
> is not the case?
>
> Not really what I meant? The problem is that some ids are not received at
> all for some time windows.
>
> I did this as you suggested, this part works (there are no duplicates):
>
>         Table result = tableEnv.sql(uniqueIdsSql);
>         // remove duplicates (this is a trick to get only distinct values
> but get them asap)
>         result = result.filter("occurrence = 1");
>
> I'm seeing all ids at least once, but missing them from some time windows
> where they occurred as well. So it seems like the uniqueness is not
> properly scoped to the time windows. I don't see why not..
>
> I can try to create a simplified class to hopefully reproduce this
> problem. Maybe also for the original issue that I encountered with hop
> start/end timestamps after a join.
>
> Thanks for looking into this so far!
>
> On Tue, Feb 27, 2018 at 4:45 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Juho,
>>
>> a query with an OVER aggregation should emit exactly one row for each
>> input row.
>> Does your comment on "isn't catching all distinct values" mean that this
>> is not the case?
>>
>> You can combine tumbling windows and over aggregates also by nesting
>> queries as shown below:
>>
>> SELECT
>>   s_aid1,
>>   s_cid,
>>   first_seen,
>>   MIN(first_seen) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE
>> BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS first_seen_1h,
>>   processdate,
>>   tumble_start,
>>   tumble_end
>> FROM (
>>   SELECT
>>     s_aid1,
>>     s_cid,
>>     TS_MIN(rowtime) AS first_seen,
>>     CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),
>> '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
>>     TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
>>     TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end,
>>     TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) AS rowtime
>>   FROM events
>>   WHERE s_aid1 IS NOT NULL
>>   GROUP BY
>>     s_aid1,
>>     s_cid,
>>     TUMBLE(rowtime, INTERVAL '10' SECOND)
>>   )
>>
>> Early triggering is not yet supported for SQL queries.
>>
>> Best, Fabian
>>
>> 2018-02-27 15:20 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>
>>> Thanks for the hint! For some reason it isn't catching all distinct
>>> values (even though it's a much simpler way than what I initially tried and
>>> seems good in that sense). First of all, isn't this like a sliding window:
>>> "rowtime RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW"?
>>>
>>> My use case needs a tumbling window. I tried adding PARTITION BY
>>> additionally with DATE_FORMAT(rowtime, '%Y%m%d%H') to achieve the same
>>> result as with a tumbling window; this resulted in slightly more distinct
>>> values, but was still missing some! Would there by some nice way to create
>>> a tumbling window right in the RANGE condition instead?
>>>
>>> As a disclaimer I have to say we seem to be fine using a simple window
>>> _without_ any early triggering. But of course it would be nice to
>>> understand how early triggering could be enabled in a simple & scalable way.
>>>
>>> Cheers,
>>> Juho
>>>
>>> On Mon, Feb 19, 2018 at 1:44 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> sorry for the late response. I found time to look into this issue.
>>>> I agree, that the start and end timestamps of the HOP window should be
>>>> 1 hour apart from each other. I tried to reproduce the issue, but was not
>>>> able to do so.
>>>> Can you maybe open a JIRA and provide a simple test case (collection
>>>> data source, no Kafka) that reproduces the issue?
>>>>
>>>> Regarding the task that you are trying to solve, have you looked into
>>>> OVER windows?
>>>>
>>>> The following query would count for each record, how often a record
>>>> with the same ID combination was observed in the last hour based on its
>>>> timestamp:
>>>>
>>>> SELECT
>>>>   s_aid1,
>>>>   s_cid,
>>>>   COUNT(*) OVER (PARTITION BY s_aid1, s_cid ORDER BY rowtime RANGE
>>>> BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS occurrence,
>>>>   rowtime
>>>> FROM events
>>>> WHERE s_aid1 IS NOT NULL
>>>>
>>>> If occurrence is 1, the current record is the only record within the
>>>> last 1 hour with the combination of aid and cid .
>>>> The query does not batch the stream by 10 seconds, but rather produces
>>>> the results in real-time. If the batching is not required, you should be
>>>> good by adding a filter on occurrence = 1.
>>>> Otherwise, you could add the filter and wrap it by 10 secs tumbling
>>>> window.
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>>
>>>> 2018-02-14 15:30 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>>>
>>>>> I'm joining a tumbling & hopping window in Flink 1.5-SNAPSHOT. The
>>>>> result is unexpected. Am I doing something wrong? Maybe this is just not a
>>>>> supported join type at all? Any way here goes:
>>>>>
>>>>> I first register these two tables:
>>>>>
>>>>> 1. new_ids: a tumbling window of seen ids within the last 10 seconds:
>>>>>
>>>>> SELECT
>>>>>   s_aid1,
>>>>>   s_cid,
>>>>>   TS_MIN(rowtime) AS first_seen,
>>>>>   CAST(DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),
>>>>> '%Y%m%d/%H/%i/%S') AS VARCHAR) AS processdate,
>>>>>   TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS tumble_start,
>>>>>   TUMBLE_END(rowtime, INTERVAL '10' SECOND) AS tumble_end
>>>>> FROM events
>>>>> WHERE s_aid1 IS NOT NULL
>>>>> GROUP BY
>>>>>   s_aid1,
>>>>>   s_cid,
>>>>>   TUMBLE(rowtime, INTERVAL '10' SECOND)
>>>>>
>>>>> 2. seen_ids: a sliding window of seen ids 1 hour backwards, 10 second
>>>>> hop:
>>>>>
>>>>> SELECT
>>>>>   s_aid1,
>>>>>   s_cid,
>>>>>   TS_MIN(rowtime) AS first_seen,
>>>>>   CAST(HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS
>>>>> DATE) AS processdate,
>>>>>   HOP_START(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS
>>>>> HOP_start,
>>>>>   HOP_END(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR) AS HOP_end
>>>>> FROM events
>>>>> WHERE s_aid1 IS NOT NULL
>>>>> GROUP BY
>>>>>   s_aid1,
>>>>>   s_cid,
>>>>>   HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
>>>>>
>>>>> If I write the results of the "seen_ids" table, the difference between
>>>>> HOP_start and HOP_end is always 1 hour, as expected.
>>>>>
>>>>> Then I register another query that joins the 2 tables:
>>>>>
>>>>> unique_ids (mostly including fields for debugging - what I need is the
>>>>> unique, new combinations of s_cid x s_aid1):
>>>>>
>>>>> SELECT
>>>>>    new_ids.s_cid,
>>>>>    new_ids.s_aid1,
>>>>>    new_ids.processdate AS processdate,
>>>>>    seen_ids.processdate AS seen_ids_processdate,
>>>>>    new_ids.first_seen AS new_ids_first_seen,
>>>>>    seen_ids.first_seen AS seen_ids_first_seen,
>>>>>    tumble_start,
>>>>>    HOP_start,
>>>>>    tumble_end,
>>>>>    HOP_end
>>>>> FROM new_ids, seen_ids
>>>>> WHERE new_ids.s_cid = seen_ids.s_cid
>>>>>   AND new_ids.s_aid1 = seen_ids.s_aid1
>>>>>   AND (new_ids.first_seen <= seen_ids.first_seen OR
>>>>> seen_ids.first_seen IS NULL)
>>>>>
>>>>> I print the results of this table, and surprisingly the HOP_start &
>>>>> HOP_end are only separated by 10 seconds. Is this a bug?
>>>>>
>>>>> {
>>>>>   "s_cid": "appsimulator_236e5fb7",
>>>>> "s_aid1": "L1GENe52d723b-b563-492f-942d-3dc1a31d7e26",
>>>>>
>>>>> "seen_ids_processdate": "2018-02-14",
>>>>>
>>>>> "seen_ids_first_seen": "2018-02-14 11:37:59.0",
>>>>> "new_ids_first_seen":  "2018-02-14 11:34:33.0",
>>>>> "tumble_start": "2018-02-14 11:34:30.0",
>>>>> "tumble_end": "2018-02-14 11:34:40.0",
>>>>>
>>>>> "HOP_start": "2018-02-14 11:37:50.0",
>>>>> "HOP_end": "2018-02-14 11:38:00.0"
>>>>> }
>>>>>
>>>>> What I'm trying to do is exclude the id from the current "new_ids"
>>>>> window if it was already seen before (within the 1 hour scope of
>>>>> "seen_ids"), but that doesn't work either. This example result row also
>>>>> shows that "seen_ids.first_seen" is bigger than it should be.
>>>>>
>>>>>
>>>>> Even if I can find a fix to this to get what I need, this strategy
>>>>> seems overly complicated. If anyone can suggest a better way, I'd be glad
>>>>> to hear. If this was a batch job, it could be defined simply as:
>>>>>
>>>>> SELECT DISTINCT s_cid, s_aid1, DATE_FORMAT(rowtime, '%Y%m%d/%H')
>>>>>
>>>>> + when streaming this query, the new distinct values should be written
>>>>> out every 10 seconds (ONLY the new ones - within that wrapping 1 hour
>>>>> window). So far I haven't been able to figure out how to do that in a
>>>>> simple way with Flink.
>>>>>
>>>>>
>>>>> *) TS_MIN is a custom function, but it's just a mapping of Flink's
>>>>> MinAggFunction:
>>>>>
>>>>> import java.sql.Timestamp
>>>>>
>>>>> import com.rovio.ds.flink.common.udaf.ImplicitOrdering.ordered
>>>>>
>>>>> import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
>>>>> import org.apache.flink.table.functions.aggfunctions.MaxAggFunction
>>>>> import org.apache.flink.table.functions.aggfunctions.MinAggFunction
>>>>>
>>>>> object TimestampAggFunctions {
>>>>>
>>>>>   trait TimestampAggFunction {
>>>>>     def getInitValue = null
>>>>>     def getValueTypeInfo = SqlTimeTypeInfo.TIMESTAMP
>>>>>   }
>>>>>
>>>>>   class TimestampMinAggFunction extends MinAggFunction[Timestamp] with
>>>>> TimestampAggFunction
>>>>>   class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] with
>>>>> TimestampAggFunction
>>>>>
>>>>> }
>>>>>
>>>>> // Registered with:
>>>>> tableEnv.registerFunction("TS_MIN", new TimestampMinAggFunction());
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to