Thanks Dian, Guowei. I think it makes sense to roll with this approach.

On Tue, Apr 20, 2021 at 8:29 AM Guowei Ma <guowei....@gmail.com> wrote:
> Hi Sumeet,
>
> Thank you for the sharing. As Dian suggested, I think you could use b as
> your `group_by` key, so that b can be output directly. I think that is
> simpler.
>
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 7:31 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Sumeet,
>>
>> Thanks for the sharing.
>>
>> Then I guess you could use `.group_by(col('w'), input.a, input.b)`.
>> Since the value of input.a is always the same, this is logically equal to
>> group_by(col('w'), input.b). The benefit is that you can access input.a
>> directly in the select clause.
>>
>> Regards,
>> Dian
>>
>> On Apr 19, 2021, at 6:29 PM, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>
>> Hi Guowei,
>>
>> Let me elaborate the use case with an example.
>>
>> A sample input table looks like this:
>>
>> time  a   b   c
>> ---------------
>> t0    a0  b0  1
>> t1    a0  b1  2
>> t2    a0  b2  3
>> t3    a0  b0  6
>> t4    a0  b1  7
>> t5    a0  b2  8
>>
>> Basically, every time interval there are new readings from a fixed set of
>> sensors (b0, b1 and b2). All these rows share a few constant fields
>> representing metadata about the input (a0).
>>
>> The desired output for every time interval is the average reading for each
>> sensor (b0, b1, b2), along with the constant metadata (a0):
>>
>> a0  b0  avg(c)
>> a0  b1  avg(c)
>> a0  b2  avg(c)
>>
>> This is what I was trying to build using a simple Tumble window:
>>
>> input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>     .group_by(col('w'), input.b) \
>>     .select(
>>         input.a,   # <== constant metadata field, same for every input record
>>         input.b,   # <== group_by field, to compute averages
>>         input.c.avg.alias('avg_value')) \
>>     .execute_insert('MySink') \
>>     .wait()
>>
>> The example above is highly simplified, but I hope it explains what I'm
>> trying to achieve.
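For reference, the per-window semantics described above can be sketched in plain Python, with no Flink involved. The sample data, window size, and helper name below are assumptions for illustration only; this just shows why grouping by (window, a, b) yields one averaged row per sensor with the constant metadata carried along:

```python
from collections import defaultdict

# Sample rows (time, a, b, c) mirroring the table above. For simplicity,
# timestamps are plain integers and all rows fall into one 10-second window.
rows = [
    (0, "a0", "b0", 1), (1, "a0", "b1", 2), (2, "a0", "b2", 3),
    (3, "a0", "b0", 6), (4, "a0", "b1", 7), (5, "a0", "b2", 8),
]

def window_averages(rows, window_size=10):
    """Group rows by (tumbling window, a, b) and average c.

    Including 'a' in the key is harmless because it is constant within each
    group, and it makes 'a' available in the output, which is the point of
    Dian's suggestion.
    """
    groups = defaultdict(list)
    for t, a, b, c in rows:
        w = t // window_size  # tumbling window index
        groups[(w, a, b)].append(c)
    return {key: sum(vs) / len(vs) for key, vs in groups.items()}

result = window_averages(rows)
# One averaged row per sensor per window, with a0 riding along in the key.
print(result)  # {(0, 'a0', 'b0'): 3.5, (0, 'a0', 'b1'): 4.5, (0, 'a0', 'b2'): 5.5}
```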
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Mon, Apr 19, 2021 at 3:21 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Sumeet,
>>>
>>> 1) Regarding the above exception, it's a known issue and has been fixed
>>> in FLINK-21922 [1]. The fix will be available in the coming 1.12.3. You
>>> could also cherry-pick it onto 1.12.2 and build from source following the
>>> instructions described in [2] if needed.
>>>
>>> 2) Regarding your requirements, could you describe what you want to do
>>> with a group window or an over window?
>>> A group window (e.g. tumble window, hop window, session window, etc.)
>>> outputs one row for multiple inputs belonging to the same window. You
>>> cannot simply pass a column through from input to sink, as it is
>>> non-deterministic which of the multiple input rows the value should come
>>> from. That's the reason why you have to declare a field in the group_by
>>> clause if you want to access it directly in the select clause. An over
>>> window, on the other hand, outputs one row for each input row, so you can
>>> pass a column through directly.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-21922
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>>>
>>>
>>> On Apr 19, 2021, at 5:16 PM, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>>
>>> Thanks Guowei. I'm trying out Over Windows, as follows:
>>>
>>> input \
>>>     .over_window(
>>>         Over.partition_by(col(input.a)) \
>>>             .order_by(input.Timestamp) \
>>>             .preceding(lit(10).seconds) \
>>>             .alias('w')) \
>>>     .select(
>>>         input.b,
>>>         input.c.avg.over(col('w'))) \
>>>     .execute_insert('MySink') \
>>>     .wait()
>>>
>>> But I'm running into the following exception:
>>>
>>> py4j.protocol.Py4JError: An error occurred while calling
>>> z:org.apache.flink.table.api.Over.partitionBy.
>>> Trace:
>>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
>>> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>>>
>>> Is there any extra Jar that needs to be included for Over Windows? From
>>> the code it doesn't appear so.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>>
>>> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi Sumeet,
>>>>
>>>> Maybe you could try Over Windows [1], which can keep the non-group-key
>>>> columns.
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>>>
>>>>> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't
>>>>> cause any issues. It's only when I want to use "input.b".
>>>>>
>>>>> My use case is basically to emit "input.b" in the final sink as is,
>>>>> and not perform any aggregation on that column; more like a pass-through
>>>>> from input to sink. What's the best way to achieve this? I was thinking
>>>>> that making it part of the select() clause would do it, but as you said
>>>>> there needs to be some aggregation performed on it.
>>>>>
>>>>> Thanks,
>>>>> Sumeet
>>>>>
>>>>>
>>>>> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>>>
>>>>>> Hi Sumeet,
>>>>>>
>>>>>> For "input.b", I think you should aggregate the non-group-key
>>>>>> column [1].
>>>>>> But I am not sure why "input.c.avg.alias('avg_value')" would have
>>>>>> resolution errors. Would you mind giving more detailed error information?
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>>>>>>
>>>>>> Best,
>>>>>> Guowei
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a use case where I'm creating a tumbling window as follows
>>>>>>> (the "input" table has columns [Timestamp, a, b, c]):
>>>>>>>
>>>>>>> input \
>>>>>>>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>>>>>>     .group_by(col('w'), input.a) \
>>>>>>>     .select(
>>>>>>>         col('w').start.alias('window_start'),
>>>>>>>         col('w').end.alias('window_end'),
>>>>>>>         input.b,
>>>>>>>         input.c.avg.alias('avg_value')) \
>>>>>>>     .execute_insert('MySink') \
>>>>>>>     .wait()
>>>>>>>
>>>>>>> This throws an exception saying that it cannot resolve the fields "b"
>>>>>>> and "c" inside the select statement. If I mention these column names in
>>>>>>> the group_by() statement as follows:
>>>>>>>
>>>>>>> .group_by(col('w'), input.a, input.b, input.c)
>>>>>>>
>>>>>>> then the column names in the subsequent select statement can be
>>>>>>> resolved.
>>>>>>>
>>>>>>> Basically, unless a column name is explicitly made part of the
>>>>>>> group_by() clause, the subsequent select() clause doesn't resolve it.
>>>>>>> This is very similar to the example in Flink's documentation here [1],
>>>>>>> where a similar procedure works.
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples
>>>>>>>
>>>>>>> Any idea how I can access columns from the input stream without
>>>>>>> having to mention them in the group_by() clause? I really don't want to
>>>>>>> group the results by those fields, but they need to be written to the
>>>>>>> sink eventually.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sumeet
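The resolution the thread converges on rests on one observation: adding a column to the group key is safe when that column is constant within each group, because the groups (and their aggregates) come out identical. A plain-Python sketch of that equivalence follows; the sample rows and helper name are made up for illustration:

```python
from collections import defaultdict

# Rows of (a, b, c); 'a' is constant ("a0" everywhere), like the metadata column.
rows = [
    ("a0", "b0", 1), ("a0", "b1", 2), ("a0", "b0", 6), ("a0", "b1", 7),
]

def grouped_avg(rows, keyfn):
    """Average the c column over groups defined by keyfn."""
    groups = defaultdict(list)
    for r in rows:
        groups[keyfn(r)].append(r[2])
    return {k: sum(v) / len(v) for k, v in groups.items()}

# Grouping by b alone...
by_b = grouped_avg(rows, lambda r: (r[1],))
# ...versus grouping by (a, b), where a is constant:
by_ab = grouped_avg(rows, lambda r: (r[0], r[1]))

# Same number of groups and the same averages; the constant column just
# rides along in the key, which is what makes it selectable in the output.
assert len(by_b) == len(by_ab)
assert sorted(by_b.values()) == sorted(by_ab.values())
print(by_ab)  # {('a0', 'b0'): 3.5, ('a0', 'b1'): 4.5}
```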