Hi Sumeet,

Thanks for the sharing.

Then I guess you could use `.group_by(col('w'), input.a, input.b)`. Since the 
value for input.a is always the same, it’s equal to group_by(col(‘w'), input.b) 
logically. The benefit is that you could access input.a directly in the select 
clause.

Regards,
Dian

> 2021年4月19日 下午6:29,Sumeet Malhotra <sumeet.malho...@gmail.com> 写道:
> 
> Hi Guowei,
> 
> Let me elaborate the use case with an example.
> 
> Sample input table looks like this:
> 
> time    a   b   c
> -----------------
> t0      a0  b0  1
> t1      a0  b1  2
> t2      a0  b2  3
> t3      a0  b0  6
> t4      a0  b1  7
> t5      a0  b2  8
> 
> Basically, every time interval there are new readings from a fixed set of 
> sensors (b0, b1 and b2). All these rows have a few constant fields 
> representing metadata about the input (a0).
> 
> Desired output for every time interval is the average reading for every 
> sensor (b0, b1, b2), along with the constant metadata (a0):
> 
> a0    b0    avg(c)
> a0    b1    avg(c)
> a0    b2    avg(c)
> 
> This is what I was trying to build using a simple Tumble window:
> 
> input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>     .group_by(col('w'), input.b) \
>     .select( 
>         input.a,                            <=== constant metadata field, 
> same for every input record
>         input.b,                            <=== group_by field, to compute 
> averages
>         input.c.avg.alias('avg_value')) \
>     .execute_insert('MySink') \
>     .wait()
> 
> The example above is highly simplified, but I hope it explains what I'm 
> trying to achieve.
> 
> Thanks,
> Sumeet
> 
> 
> On Mon, Apr 19, 2021 at 3:21 PM Dian Fu <dian0511...@gmail.com 
> <mailto:dian0511...@gmail.com>> wrote:
> Hi Sumeet,
> 
> 1) Regarding to the above exception, it’s a known issue and has been fixed in 
> FLINK-21922 <https://issues.apache.org/jira/browse/FLINK-21922> [1]. It will 
> be available in the coming 1.12.3. You could also cherry-pick that fix to 
> 1.12.2 and build from source following the instruction described in [2] if 
> needed.
> 
> 2) Regarding to your requirements, could you describe what you want to do 
> with group window or over window? 
> For group window(e.g. tumble window, hop window, session window, etc), it 
> will output one row for multiple inputs belonging to the same window. You 
> could not just passing through it from input to sink as it is 
> non-determinitic which row to use as there are multiple input rows. That’s 
> the reason why you have to declare a field in the group by clause if you want 
> to access it directly in the select clause. For over window, it will output 
> one row for each input and so you could pass through it directly.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-21922 
> <https://issues.apache.org/jira/browse/FLINK-21922>.
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink>
> 
> 
>> 2021年4月19日 下午5:16,Sumeet Malhotra <sumeet.malho...@gmail.com 
>> <mailto:sumeet.malho...@gmail.com>> 写道:
>> 
>> Thanks Guowei. I'm trying out Over Windows, as follows:
>> 
>> input \
>>     .over_window(
>>         Over.partition_by(col(input.a)) \
>>         .order_by(input.Timestamp) \
>>         .preceding(lit(10).seconds) \
>>         .alias('w')) \
>>     .select(
>>         input.b,
>>         input.c.avg.over(col('w'))) \
>>     .execute_insert('MySink') \
>>     .wait()
>> 
>> But running into following exception:
>> 
>> py4j.protocol.Py4JError: An error occurred while calling 
>> z:org.apache.flink.table.api.Over.partitionBy. Trace:
>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method 
>> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>> 
>> Is there any extra Jar that needs to be included for Over Windows. From the 
>> code it doesn't appear so.
>> 
>> Thanks,
>> Sumeet
>> 
>> 
>> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <guowei....@gmail.com 
>> <mailto:guowei....@gmail.com>> wrote:
>> Hi, Sumeet
>> 
>> Maybe you could try the Over Windows[1], which could keep the 
>> "non-group-key" column.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows>
>> 
>> Best,
>> Guowei
>> 
>> 
>> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <sumeet.malho...@gmail.com 
>> <mailto:sumeet.malho...@gmail.com>> wrote:
>> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause any 
>> issues. It's only when I want to use "input.b".
>> 
>> My use case is to basically emit "input.b" in the final sink as is, and not 
>> really perform any aggregation on that column - more like pass through from 
>> input to sink. What's the best way to achieve this? I was thinking that 
>> making it part of the select() clause would do it, but as you said there 
>> needs to be some aggregation performed on it.
>> 
>> Thanks,
>> Sumeet
>> 
>> 
>> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <guowei....@gmail.com 
>> <mailto:guowei....@gmail.com>> wrote:
>> Hi, Sumeet
>>       For "input.b" I think you should aggregate the non-group-key 
>> column[1]. 
>> But I am not sure why the "input.c.avg.alias('avg_value')"  has resolved 
>> errors. Would you mind giving more detailed error information?
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows>
>> 
>> Best,
>> Guowei
>> 
>> 
>> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <sumeet.malho...@gmail.com 
>> <mailto:sumeet.malho...@gmail.com>> wrote:
>> Hi,
>> 
>> I have a use case where I'm creating a Tumbling window as follows:
>> 
>> "input" table has columns [Timestamp, a, b, c]
>> 
>> input \
>>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>     .group_by(col('w'), input.a) \
>>     .select( 
>>         col('w').start.alias('window_start'),
>>         col('w').end.alias('window_end'),
>>         input.b,
>>         input.c.avg.alias('avg_value')) \
>>     .execute_insert('MySink') \
>>     .wait()
>> 
>> This throws an exception that it cannot resolve the fields "b" and "c" 
>> inside the select statement. If I mention these column names inside the 
>> group_by() statement as follows:
>> 
>> .group_by(col('w'), input.a, input.b, input.c)
>> 
>> then the column names in the subsequent select statement can be resolved.
>> 
>> Basically, unless the column name is explicitly made part of the group_by() 
>> clause, the subsequent select() clause doesn't resolve it. This is very 
>> similar to the example from Flink's documentation here [1]: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples>,
>>  where a similar procedure works.
>> 
>> Any idea how I can access columns from the input stream, without having to 
>> mention them in the group_by() clause? I really don't want to group the 
>> results by those fields, but they need to be written to the sink eventually.
>> 
>> Thanks,
>> Sumeet
> 

Reply via email to