Thanks Guowei. I'm trying out Over Windows, as follows:

input \
    .over_window(
        Over.partition_by(col(input.a)) \
        .order_by(input.Timestamp) \
        .preceding(lit(10).seconds) \
        .alias('w')) \
    .select(
        input.b,
        input.c.avg.over(col('w'))) \
    .execute_insert('MySink') \
    .wait()

But I'm running into the following exception:

py4j.protocol.Py4JError: An error occurred while calling
z:org.apache.flink.table.api.Over.partitionBy. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method
partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist

Is there an extra JAR that needs to be included for Over Windows? From the
code, it doesn't appear so.
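
To make the intended semantics concrete, here is a simplified, pure-Python sketch of what an Over window with `preceding(lit(10).seconds)` computes. It is not PyFlink code: the `Row` class, the `over_window_avg` function and the sample data are hypothetical. Every input row produces exactly one output row, so `b` passes through unaggregated, and the average of `c` is taken over rows with the same `a` whose timestamps fall within the 10 seconds up to and including the current row's timestamp. It evaluates eagerly over a finite list, so streaming details such as watermarks and late data are ignored.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Tuple


@dataclass
class Row:
    # Hypothetical row type mirroring the "input" table: [Timestamp, a, b, c]
    timestamp: datetime
    a: str
    b: str
    c: float


def over_window_avg(
    rows: List[Row], preceding: timedelta = timedelta(seconds=10)
) -> List[Tuple[str, float]]:
    """Emit one (b, avg_c) pair per input row.

    avg_c averages c over rows in the same partition (same `a`) whose
    timestamp lies in [row.timestamp - preceding, row.timestamp],
    roughly a RANGE window ordered by Timestamp.
    """
    out = []
    for row in rows:
        in_range = [
            r.c
            for r in rows
            if r.a == row.a
            and row.timestamp - preceding <= r.timestamp <= row.timestamp
        ]
        # The current row always falls in its own range, so in_range is never empty.
        out.append((row.b, sum(in_range) / len(in_range)))
    return out


t0 = datetime(2021, 4, 19, 12, 0, 0)
rows = [
    Row(t0, "x", "b1", 1.0),
    Row(t0 + timedelta(seconds=5), "x", "b2", 3.0),
    Row(t0 + timedelta(seconds=20), "x", "b3", 5.0),
    Row(t0 + timedelta(seconds=6), "y", "b4", 10.0),
]
print(over_window_avg(rows))
# → [('b1', 1.0), ('b2', 2.0), ('b3', 5.0), ('b4', 10.0)]
```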

Thanks,
Sumeet


On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Sumeet
>
> Maybe you could try Over Windows [1], which can keep the
> "non-group-key" column.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <sumeet.malho...@gmail.com>
> wrote:
>
>> Thanks Guowei! Regarding "input.c.avg", you're right: that doesn't cause
>> any issues. The problem only occurs when I use "input.b".
>>
>> My use case is basically to emit "input.b" to the final sink as is,
>> without performing any aggregation on that column; it's more of a
>> pass-through from input to sink. What's the best way to achieve this? I
>> thought that making it part of the select() clause would do it, but as you
>> said, some aggregation needs to be performed on it.
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi, Sumeet
>>> For "input.b", I think you should aggregate the non-group-key
>>> column [1].
>>> But I am not sure why "input.c.avg.alias('avg_value')" has resolution
>>> errors. Would you mind giving more detailed error information?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <
>>> sumeet.malho...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a use case where I'm creating a Tumbling window as follows:
>>>>
>>>> "input" table has columns [Timestamp, a, b, c]
>>>>
>>>> input \
>>>>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>>>     .group_by(col('w'), input.a) \
>>>>     .select(
>>>>         col('w').start.alias('window_start'),
>>>>         col('w').end.alias('window_end'),
>>>>         input.b,
>>>>         input.c.avg.alias('avg_value')) \
>>>>     .execute_insert('MySink') \
>>>>     .wait()
>>>>
>>>> This throws an exception saying that it cannot resolve the fields "b" and
>>>> "c" inside the select statement. If I add these column names to the
>>>> group_by() statement as follows:
>>>>
>>>> .group_by(col('w'), input.a, input.b, input.c)
>>>>
>>>> then the column names in the subsequent select statement can be
>>>> resolved.
>>>>
>>>> Basically, unless a column name is explicitly made part of the
>>>> group_by() clause, the subsequent select() clause doesn't resolve it. This
>>>> is very similar to the example in Flink's documentation [1], where a
>>>> similar procedure works.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples
>>>>
>>>> Any idea how I can access columns from the input stream without having
>>>> to mention them in the group_by() clause? I really don't want to group the
>>>> results by those fields, but they do need to be written to the sink
>>>> eventually.
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>
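
For the tumbling-window query in the original message, the sketch below shows why `b` cannot be selected on its own after `group_by(col('w'), input.a)`. It is plain Python, not PyFlink; the function name and sample rows are hypothetical, and windows are assumed to be aligned to multiples of the window size since the epoch. A group window emits one row per (window, a) group, and a single group can contain several different `b` values. Here `b` is reduced to a sorted list of its distinct values purely as a stand-in for whatever aggregate fits the use case; alternatively, adding `b` to the grouping keys yields one row per (window, a, b) instead.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble_group_avg(rows, size=timedelta(seconds=10)):
    """Group (timestamp, a, b, c) tuples into tumbling windows keyed by (window, a).

    Returns one (window_start, window_end, a, distinct_bs, avg_c) tuple per group.
    """
    groups = defaultdict(list)
    for ts, a, b, c in rows:
        # Assumption: windows are aligned to multiples of `size` since the epoch.
        start = EPOCH + ((ts - EPOCH) // size) * size
        groups[(start, a)].append((b, c))

    out = []
    for (start, a), members in sorted(groups.items()):
        cs = [c for _, c in members]
        # A group may hold several distinct b values, so b has to be aggregated
        # (here: collected into a sorted list) or made part of the grouping key.
        distinct_bs = sorted({b for b, _ in members})
        out.append((start, start + size, a, distinct_bs, sum(cs) / len(cs)))
    return out


t0 = datetime(2021, 4, 19, 12, 0, 0)
rows = [
    (t0, "x", "b1", 1.0),
    (t0 + timedelta(seconds=3), "x", "b2", 3.0),
    (t0 + timedelta(seconds=12), "x", "b1", 5.0),
]
for result_row in tumble_group_avg(rows):
    print(result_row)
```

The first window collapses two rows with different `b` values into a single output row, which is exactly the ambiguity the planner refuses to resolve for a bare `input.b`.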
