Hi Guowei,

Let me elaborate on the use case with an example.

The sample input table looks like this:

time  a   b   c
-----------------
t0    a0  b0  1
t1    a0  b1  2
t2    a0  b2  3
t3    a0  b0  6
t4    a0  b1  7
t5    a0  b2  8

Basically, in every time interval there are new readings from a fixed set
of sensors (b0, b1 and b2). All these rows have a few constant fields
representing metadata about the input (a0).

The desired output for every time interval is the average reading for every
sensor (b0, b1, b2), along with the constant metadata (a0):

a0  b0  avg(c)
a0  b1  avg(c)
a0  b2  avg(c)

This is what I was trying to build using a simple Tumble window:

input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
    .group_by(col('w'), input.b) \
    .select(
        input.a,  # constant metadata field, same for every input record
        input.b,  # group_by field, to compute averages
        input.c.avg.alias('avg_value')) \
    .execute_insert('MySink') \
    .wait()

The example above is highly simplified, but I hope it explains what I'm
trying to achieve.

Thanks,
Sumeet

On Mon, Apr 19, 2021 at 3:21 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Sumeet,
>
> 1) Regarding the above exception, it's a known issue and has been fixed
> in FLINK-21922 <https://issues.apache.org/jira/browse/FLINK-21922> [1]. It
> will be available in the upcoming 1.12.3. You could also cherry-pick that
> fix to 1.12.2 and build from source following the instructions described
> in [2] if needed.
>
> 2) Regarding your requirements, could you describe what you want to do
> with a group window or an over window?
> A group window (e.g. tumble window, hop window, session window, etc.)
> outputs one row for multiple inputs belonging to the same window. You
> cannot just pass a field through from input to sink, because with multiple
> input rows it is non-deterministic which row to use. That's the reason why
> you have to declare a field in the group by clause if you want to access
> it directly in the select clause.
> An over window outputs one row for each input, so you can pass a field
> through directly.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21922
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>
>
> On Apr 19, 2021, at 5:16 PM, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>
> Thanks Guowei. I'm trying out Over Windows, as follows:
>
> input \
>     .over_window(
>         Over.partition_by(col(input.a)) \
>             .order_by(input.Timestamp) \
>             .preceding(lit(10).seconds) \
>             .alias('w')) \
>     .select(
>         input.b,
>         input.c.avg.over(col('w'))) \
>     .execute_insert('MySink') \
>     .wait()
>
> But I'm running into the following exception:
>
> py4j.protocol.Py4JError: An error occurred while calling
> z:org.apache.flink.table.api.Over.partitionBy. Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>
> Is there any extra Jar that needs to be included for Over Windows? From
> the code it doesn't appear so.
>
> Thanks,
> Sumeet
>
>
> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Sumeet
>>
>> Maybe you could try the Over Windows [1], which can keep the
>> "non-group-key" column.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>
>>> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause
>>> any issues. It's only when I want to use "input.b".
>>>
>>> My use case is basically to emit "input.b" in the final sink as is, and
>>> not really perform any aggregation on that column - more like a pass
>>> through from input to sink. What's the best way to achieve this? I was
>>> thinking that making it part of the select() clause would do it, but as
>>> you said there needs to be some aggregation performed on it.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>>
>>> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi, Sumeet
>>>> For "input.b" I think you should aggregate the non-group-key
>>>> column [1].
>>>> But I am not sure why "input.c.avg.alias('avg_value')" fails to
>>>> resolve. Would you mind giving more detailed error information?
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a use case where I'm creating a Tumbling window as follows:
>>>>>
>>>>> "input" table has columns [Timestamp, a, b, c]
>>>>>
>>>>> input \
>>>>>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>>>>     .group_by(col('w'), input.a) \
>>>>>     .select(
>>>>>         col('w').start.alias('window_start'),
>>>>>         col('w').end.alias('window_end'),
>>>>>         input.b,
>>>>>         input.c.avg.alias('avg_value')) \
>>>>>     .execute_insert('MySink') \
>>>>>     .wait()
>>>>>
>>>>> This throws an exception that it cannot resolve the fields "b" and "c"
>>>>> inside the select statement. If I mention these column names inside the
>>>>> group_by() statement as follows:
>>>>>
>>>>> .group_by(col('w'), input.a, input.b, input.c)
>>>>>
>>>>> then the column names in the subsequent select statement can be
>>>>> resolved.
>>>>>
>>>>> Basically, unless the column name is explicitly made part of the
>>>>> group_by() clause, the subsequent select() clause doesn't resolve it.
>>>>> This is very similar to the example from Flink's documentation here [1]:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples,
>>>>> where a similar procedure works.
>>>>>
>>>>> Any idea how I can access columns from the input stream, without
>>>>> having to mention them in the group_by() clause?
>>>>> I really don't want to group the results by those fields, but they
>>>>> need to be written to the sink eventually.
>>>>>
>>>>> Thanks,
>>>>> Sumeet
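
For illustration, the desired output described at the top of this thread can be sketched in plain Python (this is not PyFlink code). Because a is constant for every record, adding it to the grouping key alongside the window and b leaves the groups unchanged while making a available in the output, which matches the point that a field has to appear in the group by clause to be selected directly. The helper name tumble_avg, the numeric timestamps 0..5, and the assumption that all six sample rows fall into one 10-second window are hypothetical choices made for this sketch only.

```python
from collections import defaultdict


def tumble_avg(rows, size):
    """Average c per (window_start, a, b) over tumbling windows of `size`.

    rows: iterable of (timestamp, a, b, c) tuples with numeric timestamps.
    """
    # Each group keeps a running [sum, count] pair.
    sums = defaultdict(lambda: [0.0, 0])
    for ts, a, b, c in rows:
        # A tumbling window is identified by its aligned start time.
        window_start = (ts // size) * size
        acc = sums[(window_start, a, b)]
        acc[0] += c
        acc[1] += 1
    return {key: total / count for key, (total, count) in sorted(sums.items())}


# Sample table from the thread, with t0..t5 assumed to be seconds 0..5.
rows = [
    (0, "a0", "b0", 1), (1, "a0", "b1", 2), (2, "a0", "b2", 3),
    (3, "a0", "b0", 6), (4, "a0", "b1", 7), (5, "a0", "b2", 8),
]

result = tumble_avg(rows, size=10)
for (window_start, a, b), avg_c in result.items():
    print(window_start, a, b, avg_c)
```

In the Table API, the analogous change would presumably be to list input.a in group_by() next to col('w') and input.b, so that select() can reference it.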