@Jan Lukavský yes, that’s exactly what I figured out a few days back. I used
the WithKeys transform to create a PCollection<KV<K, V>>.
Thanks for your help!

From: Jan Lukavský <je...@seznam.cz>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Monday, December 14, 2020 at 2:05 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Question regarding GoupByKey operator on unbounded data


Hi,

I think what you might be looking for is "stateful processing"; please have a 
look at [1]. Note that the input to a stateful DoFn must be of type KV<K, V>, 
which then ensures behavior similar to Flink's keyBy.
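
As a rough illustration of why keyed state removes the need for a window, here is a minimal plain-Java model of what a per-key "bag" of values does. It is not Beam API code: the class name KeyedHistoryModel and its process method are hypothetical, and a real pipeline would use a stateful DoFn as described in [1].

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of per-key state. Hypothetical names; not Beam API. */
class KeyedHistoryModel<K, V> {
  // One growing list of values per key, standing in for per-key state.
  private final Map<K, List<V>> stateByKey = new HashMap<>();

  /**
   * Handles one keyed element as it arrives and returns a snapshot of
   * every value seen so far for that key. No window or trigger is involved.
   */
  List<V> process(K key, V value) {
    List<V> history = stateByKey.computeIfAbsent(key, k -> new ArrayList<>());
    history.add(value);
    // Copy so callers cannot mutate the stored state.
    return new ArrayList<>(history);
  }
}
```

Each call sees only the history of its own key, which is the property that requiring KV<K, V> input provides. Note that the state in this model grows without bound, so a long-running job would need some way to expire old values.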

Best,

 Jan

[1] https://beam.apache.org/blog/stateful-processing/

On 12/13/20 6:27 AM, Tao Li wrote:

Sorry, I think I misunderstood the keyBy API from Flink. It’s not 
exactly equivalent to GroupByKey from Beam. So please ignore my question and 
this email thread. Thanks for the help though 😊

From: Tao Li <t...@zillow.com>
Date: Friday, December 11, 2020 at 7:29 PM
To: "user@beam.apache.org" <user@beam.apache.org>, Reuven Lax <re...@google.com>
Cc: Mehmet Emre Sahin <mehm...@zillow.com>, Ying-Chang Cheng <yingcha...@zillowgroup.com>
Subject: Re: Question regarding GoupByKey operator on unbounded data

Would Combine.PerKey work for my case? It seems that it does not require a 
window function.

At the same time it seems that this operator is typically used to generate some 
aggregated output (e.g. count) instead of the value list. So I am not sure if 
it’s suitable for my use case.

Please advise. Thanks!

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, December 11, 2020 at 10:29 AM
To: "user@beam.apache.org" <user@beam.apache.org>, Reuven Lax <re...@google.com>
Cc: Mehmet Emre Sahin <mehm...@zillow.com>, Ying-Chang Cheng <yingcha...@zillowgroup.com>
Subject: Re: Question regarding GoupByKey operator on unbounded data

Hi @Reuven Lax, basically we have a Flink app that does stream processing. 
It uses a keyBy operation to generate a keyed stream. 
Since we need to query all historical data of the input, we are not specifying 
a window function or a trigger in this Flink app, which is fine.

Now we would like to convert this Flink app to a Beam app. The problem is that 
for an unbounded PCollection, Beam requires either non-global windowing or an 
aggregation trigger to perform a GroupByKey operation.
I was thinking about applying a sliding window with a huge size (say 1 year) to 
accommodate this Beam requirement, but I am not sure if this is feasible or good 
practice.
So what’s your recommendation to solve this problem? Thanks!
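
For context on the large sliding window idea: a sliding window assigns each element to every window that overlaps its timestamp, so the number of copies per element is roughly the window size divided by the slide period. The sketch below models that assignment in plain Java. It is not Beam's own windowing code, and the names SlidingWindowModel and windowStartsFor are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of sliding-window assignment. Hypothetical names; not Beam API. */
class SlidingWindowModel {
  /**
   * Returns the start timestamps (in ms) of all windows [start, start + sizeMs)
   * that contain timestampMs, assuming window starts are multiples of periodMs.
   */
  static List<Long> windowStartsFor(long timestampMs, long sizeMs, long periodMs) {
    List<Long> starts = new ArrayList<>();
    // Latest window start that is not after the timestamp.
    long lastStart = timestampMs - Math.floorMod(timestampMs, periodMs);
    // Walk backwards while the window still covers the timestamp.
    for (long start = lastStart; start > timestampMs - sizeMs; start -= periodMs) {
      starts.add(start);
    }
    return starts;
  }
}
```

With a size of 365 days and a period of 1 day, this model places every element in 365 windows, which suggests why a very large sliding window can be expensive compared with keeping state per key.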


From: Reuven Lax <re...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, December 10, 2020 at 3:07 PM
To: user <user@beam.apache.org>
Cc: Mehmet Emre Sahin <mehm...@zillow.com>, Ying-Chang Cheng <yingcha...@zillowgroup.com>
Subject: Re: Question regarding GoupByKey operator on unbounded data

Can you explain more about what exactly you are trying to do?

On Thu, Dec 10, 2020 at 2:51 PM Tao Li <t...@zillow.com> wrote:
Hi Beam community,

I have a quick question about the GroupByKey operator. According to this 
doc <https://beam.apache.org/documentation/programming-guide/#groupbykey>, 
if we are using an unbounded PCollection, it’s required to specify either 
non-global windowing 
<https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function> 
or an aggregation trigger 
<https://beam.apache.org/documentation/programming-guide/#triggers> 
in order to perform a GroupByKey operation.

In comparison, the keyBy operator 
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/> 
from Flink does not have such a hard requirement for streamed data.

In our use case, we do need to query all historical streamed data and group by 
keys. keyBy from Flink satisfies our need, but Beam's GroupByKey does not. 
I thought about applying a sliding window with a very large size 
(say 1 year), so that we can query the past year’s data, but I am not sure if 
this is feasible or good practice.

So what would the Beam solution be to implement this business logic? Does Beam 
support processing a relatively long history of an unbounded PCollection?

Thanks so much!
