Hi Piyush,

I think your second SQL is correct. The problem you have encountered is
that the outer aggregation (GROUP BY userId with
COLLECT(client_custom_aggregated)) is not windowed, so it emits a result
immediately each time it receives a result from the inner aggregation, and
then has to update that result as more inner results arrive.
Hence Flink needs the sink to either
1. be able to retract previously emitted results, i.e. the sink should be a
`RetractStreamTableSink`, or
2. have something like a primary key so that it can update results by key.
In your case, userId would be the key.

I think you are trying to emit the result to an `AppendStreamTableSink`,
which only accepts inserts, so that is why you see this error.
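
To illustrate, here is a minimal sketch in plain Python (not Flink code) of
what a non-windowed GROUP BY userId with COLLECT has to emit. The function
names and the sample rows are made up for illustration; the point is only
that every update after the first one for a key needs a retraction, so the
output is not insert-only.

```python
from collections import defaultdict


def collect_by_user(inner_rows):
    """Simulate a non-windowed GROUP BY userId + COLLECT over a stream.

    Yields (is_insert, user_id, collected) changelog messages. Any update
    after the first one for a key first retracts the row emitted earlier.
    """
    state = defaultdict(list)  # userId -> values collected so far
    for user_id, value in inner_rows:
        if state[user_id]:
            # Withdraw the result previously emitted for this key.
            yield (False, user_id, tuple(state[user_id]))
        state[user_id].append(value)
        # Emit the updated result for this key.
        yield (True, user_id, tuple(state[user_id]))


def is_append_only(changelog):
    """An append-only sink can only accept a changelog made of inserts."""
    return all(is_insert for is_insert, _, _ in changelog)


# Hypothetical results arriving from the inner (windowed) aggregation.
inner = [("u1", "c1-agg"), ("u2", "c1-agg"), ("u1", "c2-agg")]
changelog = list(collect_by_user(inner))

for message in changelog:
    print(message)
print("append-only:", is_append_only(changelog))
```

The second result for u1 forces a retraction of the first one, which is the
kind of change an append-only sink cannot accept.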

Best,
Kurt


On Tue, Mar 12, 2019 at 9:46 PM Piyush Narang <p.nar...@criteo.com> wrote:

> Thanks for getting back Kurt. Yeah this might be an option to try out. I
> was hoping there would be a way to express this directly in the SQL though
> ☹.
>
>
>
> -- Piyush
>
>
>
>
>
> *From: *Kurt Young <ykt...@gmail.com>
> *Date: *Tuesday, March 12, 2019 at 2:25 AM
> *To: *Piyush Narang <p.nar...@criteo.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Expressing Flink array aggregation using Table / SQL API
>
>
>
> Hi Piyush,
>
>
>
> Could you try adding clientId to your aggregate function, tracking a map
> of <clientId, your_original_aggregation> inside the new aggregate
> function, and assembling whatever result you need when it emits?
>
> The SQL would look like:
>
>
>
> SELECT userId, some_aggregation(clientId, eventType, `timestamp`,
> dataField)
>
> FROM my_kafka_stream_table
>
> GROUP BY userId,  HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
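
As a rough sketch of the accumulator idea above, in plain Python rather than
Flink code: the class below keeps a map from clientId to a per-client
accumulator and assembles the combined result on emit. The method names
loosely mirror the createAccumulator / accumulate / getValue shape of a Flink
aggregate function, and the per-client logic (event count plus latest
timestamp) is a made-up stand-in for the original aggregation.

```python
class SomeAggregation:
    """Plain-Python stand-in for a UDAF grouped by userId and window."""

    def create_accumulator(self):
        # clientId -> per-client accumulator
        return {}

    def accumulate(self, acc, client_id, event_type, timestamp, data_field):
        # Hypothetical per-client aggregation: count the events and keep
        # the latest timestamp. The real logic would replace this part.
        per_client = acc.setdefault(client_id, {"count": 0, "last_ts": None})
        per_client["count"] += 1
        if per_client["last_ts"] is None or timestamp > per_client["last_ts"]:
            per_client["last_ts"] = timestamp

    def get_value(self, acc):
        # Assemble one result per group: a map of clientId -> aggregate.
        return {client_id: dict(value) for client_id, value in acc.items()}


# Rows for a single userId within one window (made-up sample data).
rows = [
    ("c1", "click", 10, "a"),
    ("c2", "view", 11, "b"),
    ("c1", "click", 12, "c"),
]

agg = SomeAggregation()
acc = agg.create_accumulator()
for row in rows:
    agg.accumulate(acc, *row)
result = agg.get_value(acc)
print(result)
```

Since the grouping is by userId and the window only, this should produce one
row per userId per window, with the per-client values carried inside that
row.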
>
>
>
> Kurt
>
>
>
>
>
> On Tue, Mar 12, 2019 at 11:03 AM Piyush Narang <p.nar...@criteo.com>
> wrote:
>
> Hi folks,
>
>
>
> I’m getting started with Flink and trying to figure out how to express
> aggregating some rows into an array to finally sink data into an
> AppendStreamTableSink.
>
> My data looks something like this:
>
> userId, clientId, eventType, timestamp, dataField
>
>
>
> I need to compute some custom aggregations using a UDAF while grouping by
> userId, clientId over a sliding window (10 mins, triggered every 1 min). My
> first attempt is:
>
> SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField)
> as custom_aggregated
>
> FROM my_kafka_stream_table
>
> GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL
> '1' HOUR)
>
>
>
> This query works as I expect it to. In every time window I end up with
> inserts for unique userId + clientId combinations. What I want to do
> though, is generate a single row per userId in each time window and this is
> what I’m struggling with expressing along with the restriction that I want
> to sink this to an AppendStreamTableSink. I was hoping to do something like
> this:
>
>
>
> SELECT userId, COLLECT(client_custom_aggregated)
>
> FROM
>
> (
>
>   SELECT userId, MAP[clientId, my_aggregation(eventType, `timestamp`,
> dataField) as custom_aggregated] as client_custom_aggregated
>
>   FROM my_kafka_stream_table
>
>   GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE,
> INTERVAL '1' HOUR)
>
> ) GROUP BY userId
>
>
>
> Unfortunately when I try this (and a few other variants), I run into the
> error, “AppendStreamTableSink requires that Table has only insert changes”.
> Does anyone know if there’s a way for me to compute my collect aggregation
> to produce one row per userId for a given time window?
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>
>
