Hi jan,

Thanks, I will check the options you suggested.

Best Regards,
Ifat

From: Jan Lukavský <je...@seznam.cz>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, 4 January 2023 at 18:52
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Join streams with different frequencies


Hi,

the general pattern here would be to map both the PCollections to a common 
type, e.g. PCollection<KV<TupleTag, T>> and then flatten them into one 
PCollection, onto which you apply a stateful DoFn, see [1]. You would hold the 
DataY value of your ID in the state and match it against events coming from 
DataX stream. Under the assumption you do not need to make ensure that each 
DataX stream is matched on the *exactly preceding* DataY event (in event time), 
this works fine.

If you need to be sure that each DataX event is matched against the latest 
DataY (and most of the time it is likely you don't have this requirement), then 
you can:

 a) buffer DataX in a BagState and use timers to flush the state after some 
timeout, or

 b) use @DoFn.RequiresTimeSortedInput [2] (if your runners supports it), which 
will do the buffering for you and pass the elements into @ProcessElement method 
sorted by event timestamp

In both cases it is worth to realize how you want to handle late data (i.e. 
data that arrived after watermark, or after an element was already matched, but 
on a wrong element). The solution (b) simply drops the late element (which 
might not be what you want), or introduces latency defined by allowedLateness. 
Another option would be to implement retractions and process them downstream. I 
implemented something like that in [3].

Hope that helps,

 Jan

[1] https://beam.apache.org/blog/stateful-processing/

[2] 
https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html

[3] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter4/src/main/java/com/packtpub/beam/chapter4/StreamingInnerJoin.java
On 1/4/23 16:28, Ifat Afek (Nokia) wrote:
Thanks Sören,

I already saw your stack overflow question while trying to find a solution 😊
I prefer a solution that does not involve an external cache like Redis, if 
possible.

Best Regards,
Ifat

From: Sören Henning 
<soeren.henn...@email.uni-kiel.de><mailto:soeren.henn...@email.uni-kiel.de>
Reply-To: "user@beam.apache.org"<mailto:user@beam.apache.org> 
<user@beam.apache.org><mailto:user@beam.apache.org>
Date: Tuesday, 3 January 2023 at 14:56
To: "user@beam.apache.org"<mailto:user@beam.apache.org> 
<user@beam.apache.org><mailto:user@beam.apache.org>
Subject: Re: Join streams with different frequencies


Hi,

while I cannot provide you with a definite answer to your question, maybe my 
Stack Overflow question is interesting for you: 
https://stackoverflow.com/questions/66067263/how-to-join-a-frequently-updating-stream-with-an-irregularly-updating-stream-i

Best regards,
Sören
Am 03.01.23 um 09:15 schrieb Ifat Afek (Nokia):
Hi,

We are trying to implement the following use case:
We have a stream of DataX events that arrive every 5 minutes and require some 
processing. Each event holds data for a specific non-unique ID (we keep getting 
updated data for each ID). There might be up to 1,000,000 IDs.
In addition, there is a stream of DataY events for some of these IDs, that 
arrive in a variable frequency. Could be after a minute and then again after 5 
hours.
We would like to join the current DataX and latest DataY events by ID (and 
process only IDs that have both DataX and DataY events).

We thought of holding a state of DataY events per ID in a global window, and 
then use it as a side input for filtering the DataX events stream. The state 
should hold the latest (by timestamp) DataY event that arrived.
The problem is: if we are using discardingFiredPanes(), then each DataY event 
is fired only once and cannot be reused later on for filtering. On the other 
hand, if we are using accumulatingFiredPanes(), then a list of all DataY events 
that arrived is fired.

Are we missing something? what is the best practice for combining two streams, 
one with a variable frequency?

Thanks,
Ifat

Reply via email to