Thanks Dian, Guowei. I think it makes sense to roll with this approach.
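
For reference, here is a minimal pure-Python sketch (no Flink involved) of why the two groupings agree. When `a` is constant, grouping by (window, a, b) produces the same groups as grouping by (window, b), while keeping `a` available in the output. The rows mirror the sample table quoted below. The numeric timestamps 0 to 5 and the helper name `tumble_avg` are assumptions made for illustration and are not part of PyFlink.

```python
from collections import defaultdict

# Sample rows (time, a, b, c) mirroring the table quoted below.
# Assumption: t0..t5 are taken to be seconds 0..5.
ROWS = [
    (0, "a0", "b0", 1),
    (1, "a0", "b1", 2),
    (2, "a0", "b2", 3),
    (3, "a0", "b0", 6),
    (4, "a0", "b1", 7),
    (5, "a0", "b2", 8),
]


def tumble_avg(rows, size, key):
    """Average c per (tumbling window start, key(row)).

    Hypothetical helper for illustration only; it mimics what a tumbling
    group window computes, not how Flink implements it.
    """
    groups = defaultdict(list)
    for t, a, b, c in rows:
        window_start = (t // size) * size
        groups[(window_start, *key(a, b))].append(c)
    return {k: sum(v) / len(v) for k, v in groups.items()}


# Group by (w, a, b): a is part of the key, so it can be emitted as-is.
by_a_b = tumble_avg(ROWS, 10, key=lambda a, b: (a, b))
# Group by (w, b) only: a is not in the key and cannot be emitted directly.
by_b = tumble_avg(ROWS, 10, key=lambda a, b: (b,))

for (window_start, a, b), avg in sorted(by_a_b.items()):
    print(window_start, a, b, avg)
```

Both groupings yield three groups with the same averages; only the first carries `a` in its key, which is what makes it selectable downstream.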

On Tue, Apr 20, 2021 at 8:29 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Sumeet
> Thank you for sharing. As Dian suggested, I think you could use b as
> your `group_by` key, so that b can be output directly.
> I think that is simpler.
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 7:31 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Sumeet,
>>
>> Thanks for sharing.
>>
>> Then I guess you could use `.group_by(col('w'), input.a, input.b)`.
>> Since the value of input.a is always the same, this is logically
>> equivalent to `group_by(col('w'), input.b)`. The benefit is that you can
>> access input.a directly in the select clause.
>>
>> Regards,
>> Dian
>>
>> On Apr 19, 2021, at 6:29 PM, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>
>> Hi Guowei,
>>
>> Let me elaborate on the use case with an example.
>>
>> Sample input table looks like this:
>>
>> time    a   b   c
>> -----------------
>> t0      a0  b0  1
>> t1      a0  b1  2
>> t2      a0  b2  3
>> t3      a0  b0  6
>> t4      a0  b1  7
>> t5      a0  b2  8
>>
>> Basically, in every time interval there are new readings from a fixed set
>> of sensors (b0, b1 and b2). All these rows have a few constant fields
>> representing metadata about the input (a0).
>>
>> Desired output for every time interval is the average reading for every
>> sensor (b0, b1, b2), along with the constant metadata (a0):
>>
>> a0    b0    avg(c)
>> a0    b1    avg(c)
>> a0    b2    avg(c)
>>
>> This is what I was trying to build using a simple Tumble window:
>>
>> input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>     .group_by(col('w'), input.b) \
>>     .select(
>>         input.a,  # constant metadata field, same for every input record
>>         input.b,  # group_by field, to compute averages
>>         input.c.avg.alias('avg_value')) \
>>     .execute_insert('MySink') \
>>     .wait()
>>
>> The example above is highly simplified, but I hope it explains what I'm
>> trying to achieve.
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Mon, Apr 19, 2021 at 3:21 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Sumeet,
>>>
>>> 1) Regarding the above exception, it’s a known issue and has been fixed
>>> in FLINK-21922 [1]. The fix will be available in the upcoming 1.12.3
>>> release. You could also cherry-pick that fix to 1.12.2 and build from
>>> source following the instructions described in [2] if needed.
>>>
>>> 2) Regarding your requirements, could you describe what you want to do
>>> with a group window or an over window?
>>> A group window (e.g. tumble window, hop window, session window, etc.)
>>> outputs one row for multiple inputs belonging to the same window. You
>>> cannot simply pass a field through from input to sink, because with
>>> multiple input rows it is non-deterministic which row's value to use.
>>> That’s the reason why you have to declare a field in the group by clause
>>> if you want to access it directly in the select clause. An over window
>>> outputs one row for each input, so you can pass the field through directly.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-21922
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>>>
>>>
>>> On Apr 19, 2021, at 5:16 PM, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>>
>>> Thanks Guowei. I'm trying out Over Windows, as follows:
>>>
>>> input \
>>>     .over_window(
>>>         Over.partition_by(col(input.a)) \
>>>         .order_by(input.Timestamp) \
>>>         .preceding(lit(10).seconds) \
>>>         .alias('w')) \
>>>     .select(
>>>         input.b,
>>>         input.c.avg.over(col('w'))) \
>>>     .execute_insert('MySink') \
>>>     .wait()
>>>
>>> But I'm running into the following exception:
>>>
>>> py4j.protocol.Py4JError: An error occurred while calling
>>> z:org.apache.flink.table.api.Over.partitionBy. Trace:
>>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
>>> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>>>
>>> Is there any extra JAR that needs to be included for Over Windows? From
>>> the code it doesn't appear so.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>>
>>> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi, Sumeet
>>>>
>>>> Maybe you could try the Over Windows[1], which could keep the
>>>> "non-group-key" column.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <
>>>> sumeet.malho...@gmail.com> wrote:
>>>>
>>>>> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't
>>>>> cause any issues. It's only when I want to use "input.b".
>>>>>
>>>>> My use case is to basically emit "input.b" in the final sink as is,
>>>>> and not really perform any aggregation on that column - more like pass
>>>>> through from input to sink. What's the best way to achieve this? I was
>>>>> thinking that making it part of the select() clause would do it, but as
>>>>> you said there needs to be some aggregation performed on it.
>>>>>
>>>>> Thanks,
>>>>> Sumeet
>>>>>
>>>>>
>>>>> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <guowei....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, Sumeet
>>>>>> For "input.b", I think you should aggregate the non-group-key
>>>>>> column [1].
>>>>>> But I am not sure why "input.c.avg.alias('avg_value')" fails to
>>>>>> resolve. Would you mind giving more detailed error information?
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>>>>>>
>>>>>> Best,
>>>>>> Guowei
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <
>>>>>> sumeet.malho...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a use case where I'm creating a Tumbling window as follows:
>>>>>>>
>>>>>>> "input" table has columns [Timestamp, a, b, c]
>>>>>>>
>>>>>>> input \
>>>>>>>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>>>>>>>     .group_by(col('w'), input.a) \
>>>>>>>     .select(
>>>>>>>         col('w').start.alias('window_start'),
>>>>>>>         col('w').end.alias('window_end'),
>>>>>>>         input.b,
>>>>>>>         input.c.avg.alias('avg_value')) \
>>>>>>>     .execute_insert('MySink') \
>>>>>>>     .wait()
>>>>>>>
>>>>>>> This throws an exception that it cannot resolve the fields "b" and
>>>>>>> "c" inside the select statement. If I mention these column names inside
>>>>>>> the group_by() statement as follows:
>>>>>>>
>>>>>>> .group_by(col('w'), input.a, input.b, input.c)
>>>>>>>
>>>>>>> then the column names in the subsequent select statement can be
>>>>>>> resolved.
>>>>>>>
>>>>>>> Basically, unless the column name is explicitly made part of the
>>>>>>> group_by() clause, the subsequent select() clause doesn't resolve it.
>>>>>>> This is very similar to the example from Flink's documentation here [1]:
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples,
>>>>>>> where a similar procedure works.
>>>>>>>
>>>>>>> Any idea how I can access columns from the input stream, without
>>>>>>> having to mention them in the group_by() clause? I really don't want to
>>>>>>> group the results by those fields, but they need to be written to the
>>>>>>> sink eventually.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sumeet
>>>>>>>
>>>>>>
>>>
>>
