Hi Sumeet,

Thanks for sharing.
Then I guess you could use `.group_by(col('w'), input.a, input.b)`. Since the value of input.a is always the same, it is logically equivalent to `.group_by(col('w'), input.b)`. The benefit is that you could access input.a directly in the select clause.
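For example, something along these lines should work (untested, just your snippet from the mail below with input.a added to group_by):

from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

# input.a is constant for every record, so adding it to group_by does not
# change the grouping logically, but it makes the column directly
# accessible in select()
input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
    .group_by(col('w'), input.a, input.b) \
    .select(
        input.a,
        input.b,
        input.c.avg.alias('avg_value')) \
    .execute_insert('MySink') \
    .wait()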
Regards,
Dian

> On Apr 19, 2021, at 6:29 PM, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>
> Hi Guowei,
>
> Let me elaborate the use case with an example.
>
> A sample input table looks like this:
>
> time   a    b    c
> ------------------
> t0     a0   b0   1
> t1     a0   b1   2
> t2     a0   b2   3
> t3     a0   b0   6
> t4     a0   b1   7
> t5     a0   b2   8
>
> Basically, in every time interval there are new readings from a fixed set of
> sensors (b0, b1 and b2). All these rows have a few constant fields
> representing metadata about the input (a0).
>
> The desired output for every time interval is the average reading for every
> sensor (b0, b1, b2), along with the constant metadata (a0):
>
> a0   b0   avg(c)
> a0   b1   avg(c)
> a0   b2   avg(c)
>
> This is what I was trying to build using a simple Tumble window:
>
> input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>     .group_by(col('w'), input.b) \
>     .select(
>         input.a,      # <=== constant metadata field, same for every input record
>         input.b,      # <=== group_by field, to compute averages
>         input.c.avg.alias('avg_value')) \
>     .execute_insert('MySink') \
>     .wait()
>
> The example above is highly simplified, but I hope it explains what I'm
> trying to achieve.
>
> Thanks,
> Sumeet
>
>
> On Mon, Apr 19, 2021 at 3:21 PM Dian Fu <dian0511...@gmail.com> wrote:
> Hi Sumeet,
>
> 1) Regarding the above exception, it's a known issue and has been fixed in
> FLINK-21922 [1]. It will be available in the coming 1.12.3. You could also
> cherry-pick that fix onto 1.12.2 and build from source following the
> instructions described in [2] if needed.
>
> 2) Regarding your requirements, could you describe what you want to do with
> a group window or an over window?
> A group window (e.g. tumble window, hop window, session window, etc.) outputs
> one row for the multiple inputs belonging to the same window. You cannot
> simply pass a column through from input to sink, as it is non-deterministic
> which row to take it from when there are multiple input rows. That's the
> reason why you have to declare a field in the group_by clause if you want to
> access it directly in the select clause. An over window, on the other hand,
> outputs one row for each input row, so you can pass the column through
> directly.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21922
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>
>
>> On Apr 19, 2021, at 5:16 PM, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>
>> Thanks Guowei. I'm trying out Over Windows, as follows:
>>
>> input \
>>     .over_window(
>>         Over.partition_by(col(input.a)) \
>>             .order_by(input.Timestamp) \
>>             .preceding(lit(10).seconds) \
>>             .alias('w')) \
>>     .select(
>>         input.b,
>>         input.c.avg.over(col('w'))) \
>>     .execute_insert('MySink') \
>>     .wait()
>>
>> But I'm running into the following exception:
>>
>> py4j.protocol.Py4JError: An error occurred while calling z:org.apache.flink.table.api.Over.partitionBy.
>> Trace:
>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>>
>> Is there any extra jar that needs to be included for Over Windows? From the
>> code it doesn't appear so.
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <guowei....@gmail.com> wrote:
>> Hi Sumeet,
>>
>> Maybe you could try Over Windows [1], which can keep the non-group-key
>> columns.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause any
>> issues. It's only when I want to use "input.b".
>>
>> My use case is basically to emit "input.b" in the final sink as is, and not
>> perform any aggregation on that column - more like a pass-through from input
>> to sink. What's the best way to achieve this? I was thinking that making it
>> part of the select() clause would do it, but as you said there needs to be
>> some aggregation performed on it.
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <guowei....@gmail.com> wrote:
>> Hi Sumeet,
>>
>> For "input.b" I think you should aggregate the non-group-key column [1].
>> But I am not sure why "input.c.avg.alias('avg_value')" has resolution
>> errors. Would you mind giving more detailed error information?
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>> Hi,
>>
>> I have a use case where I'm creating a Tumbling window as follows
>> (the "input" table has columns [Timestamp, a, b, c]):
>>
>> input \
>>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>     .group_by(col('w'), input.a) \
>>     .select(
>>         col('w').start.alias('window_start'),
>>         col('w').end.alias('window_end'),
>>         input.b,
>>         input.c.avg.alias('avg_value')) \
>>     .execute_insert('MySink') \
>>     .wait()
>>
>> This throws an exception that it cannot resolve the fields "b" and "c"
>> inside the select statement. If I mention these column names inside the
>> group_by() statement as follows:
>>
>> .group_by(col('w'), input.a, input.b, input.c)
>>
>> then the column names in the subsequent select statement can be resolved.
>>
>> Basically, unless the column name is explicitly made part of the group_by()
>> clause, the subsequent select() clause doesn't resolve it. This is very
>> similar to the example from Flink's documentation here [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples,
>> where a similar procedure works.
>>
>> Any idea how I can access columns from the input stream without having to
>> mention them in the group_by() clause?
>> I really don't want to group the results by those fields, but they need to
>> be written to the sink eventually.
>>
>> Thanks,
>> Sumeet
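For reference, a minimal, untested sketch of the over-window alternative discussed above. It assumes the same `input` Table with columns [Timestamp, a, b, c] and a Flink version that already includes the FLINK-21922 fix. Because an over window emits one output row per input row, a and b pass through to the sink without being part of any grouping:

from pyflink.table.expressions import col, lit
from pyflink.table.window import Over

input \
    .over_window(
        Over.partition_by(col('a'))
            .order_by(col('Timestamp'))
            .preceding(lit(10).seconds)
            .alias('w')) \
    .select(
        col('a'),    # constant metadata, passes through per input row
        col('b'),    # sensor id, passes through per input row
        col('c').avg.over(col('w')).alias('avg_value')) \
    .execute_insert('MySink') \
    .wait()

Note that, unlike the tumbling window, this emits one averaged row per incoming reading (over the preceding 10 seconds) rather than one row per fixed interval, so the group_by approach above may still be the better fit for the stated output.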