Hard to give any specifics without knowing how your data is being
streamed in. If both sources are in the same datastore, you can always
join at the data source and then emit the joined (left or otherwise)
records to your stream processor. On the other hand, if only one of A or
B is actually a stream and the other can be looked up, you can use a
RichAsyncFunction to enrich the streaming records one at a time. On the
third hand, if they're both streaming in from different sources, you can
probably correlate them according to processing time, assuming that you
don't expect an enrichment record from Stream B to show up more than N
minutes after its corresponding record in Stream A. In that case you can
use `TumblingProcessingTimeWindows.of(Duration.ofMinutes(N))`.
Best,
Kirill
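A caveat worth checking before relying on the processing-time approach: tumbling windows are aligned to fixed boundaries (multiples of the window size, with the default zero offset), not to the arrival of each A record. An A record and its B record that arrive only seconds apart can still land in different windows if a boundary falls between them. The Flink-free sketch below simulates that window assignment and a windowed left outer join. The class `TumblingWindowLeftJoin`, the `Rec` type and the string output format are hypothetical, and processing time is modeled as a plain millisecond timestamp on each record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free simulation of a tumbling-window left outer join.
final class TumblingWindowLeftJoin {
    // A record with its join key, (processing-)time in ms, and a payload.
    record Rec(String key, long ts, String value) {}

    // Start of the tumbling window containing ts, assuming a zero offset
    // and non-negative timestamps.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Left outer join: an A record matches B records only if they share
    // both the key and the window.
    static List<String> leftJoin(List<Rec> as, List<Rec> bs, long size) {
        Map<String, List<Rec>> bByWindowAndKey = new HashMap<>();
        for (Rec b : bs) {
            String k = windowStart(b.ts(), size) + "|" + b.key();
            bByWindowAndKey.computeIfAbsent(k, x -> new ArrayList<>()).add(b);
        }
        List<String> out = new ArrayList<>();
        for (Rec a : as) {
            String k = windowStart(a.ts(), size) + "|" + a.key();
            List<Rec> matches = bByWindowAndKey.getOrDefault(k, List.of());
            if (matches.isEmpty()) {
                out.add(a.key() + ":" + a.value() + ":null"); // no B in this window
            } else {
                for (Rec b : matches) {
                    out.add(a.key() + ":" + a.value() + ":" + b.value());
                }
            }
        }
        return out;
    }
}
```

With a 5-minute window, an A record at 299 s and its B record at 301 s fall into the windows starting at 0 s and 300 s, so the A record is emitted without a match. A longer window makes this less frequent but does not rule it out.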
On 8/6/24 10:24 PM, Sachin Mittal wrote:
So, I have a question related to windowing:
None of the records from A or B are timestamped.
The collections are backed by two master tables (each containing, say, a
few million records) in a database, and I want to use Flink to
basically join them and create an enriched stream C.
Hence I won't be able to use the window operator. Now, loading the entire
collections into Flink memory may not be feasible, so how can I best do
this using coGroup?
Thanks
Sachin
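One option not discussed in this thread, offered only as an illustration under stated assumptions, is a sort-merge join. If both tables can be read ordered by the join key f (for example with an `ORDER BY f` on each query), a sort-merge left outer join only needs to buffer the B records for the current key rather than a whole collection. The sketch below is plain Java, not a Flink API. The record types `A` and `B` and the class `SortMergeLeftJoin` are hypothetical. It assumes both inputs are sorted in the same order as `String.compareTo` (a database collation may order strings differently) and that keys are non-null.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sort-merge left outer join over two key-sorted inputs.
final class SortMergeLeftJoin {
    record A(String f, String payload) {}
    record B(String f, String payload) {}

    static List<String> leftJoin(Iterator<A> as, Iterator<B> bs) {
        List<String> out = new ArrayList<>();
        B nextB = bs.hasNext() ? bs.next() : null;
        String groupKey = null;            // key of the buffered B group
        List<B> group = new ArrayList<>(); // B records whose key == groupKey
        while (as.hasNext()) {
            A a = as.next();
            if (!a.f().equals(groupKey)) {
                // Skip B records with a smaller key: they have no A partner.
                while (nextB != null && nextB.f().compareTo(a.f()) < 0) {
                    nextB = bs.hasNext() ? bs.next() : null;
                }
                // Buffer only the B records that share this A record's key.
                group.clear();
                groupKey = a.f();
                while (nextB != null && nextB.f().equals(a.f())) {
                    group.add(nextB);
                    nextB = bs.hasNext() ? bs.next() : null;
                }
            }
            if (group.isEmpty()) {
                out.add(a.f() + ":" + a.payload() + ":null"); // left-outer row
            } else {
                for (B b : group) {
                    out.add(a.f() + ":" + a.payload() + ":" + b.payload());
                }
            }
        }
        return out;
    }
}
```

For A keys f1, f2, f2, f4 and B keys f0, f2, f3, f4, f4, the result keeps f1 with a null B side, pairs both f2 records with the single f2 B record, emits the f4 record once per matching B record, and drops f0 and f3 because they have no A partner.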
On Wed, Aug 7, 2024 at 10:41 AM Kirill Ternovsky
<kirill.ternov...@interchecks.com> wrote:
Hi Sachin,
You should be able to use `coGroup`:
```
streamA.coGroup(streamB)
    .where(StreamAFieldFSelector)     // KeySelector extracting f from Stream A
    .equalTo(StreamBFieldFSelector)   // KeySelector extracting f from Stream B
    .window(/* your windowing spec */)
    .apply(/* your CoGroupFunction */);
```
Your coGroup function will look something like:
```
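import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.util.Collector;

public class MyCoGroup implements CoGroupFunction<StreamARecord, StreamBRecord, TOut> {
    @Override
    public void coGroup(Iterable<StreamARecord> arecs,
                        Iterable<StreamBRecord> brecs,
                        Collector<TOut> out) {
        // If `brecs` is empty, nothing from Stream B matched
        // Stream A in the window.
    }
}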
```
Best,
Kirill
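To make the left outer join explicit, the body of `coGroup` has to emit each A record with empty B fields when `brecs` is empty, and once per B match otherwise. B records with no A partner are simply dropped. The following is a Flink-free sketch of that logic. A `java.util.function.Consumer` stands in for Flink's `Collector`, and the record types and the `Joined` output type (where a `null` B value stands for "empty") are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Flink-free sketch of a left-outer-join coGroup body.
final class LeftOuterCoGroup {
    record StreamARecord(String f, String aValue) {}
    record StreamBRecord(String f, String bValue) {}
    // Joined output; bValue is null when Stream B had no match in the window.
    record Joined(String f, String aValue, String bValue) {}

    static void coGroup(Iterable<StreamARecord> arecs,
                        Iterable<StreamBRecord> brecs,
                        Consumer<Joined> out) {
        // Copy the B side into a list so it can be re-scanned for every A record.
        List<StreamBRecord> bList = new ArrayList<>();
        brecs.forEach(bList::add);
        for (StreamARecord a : arecs) {
            if (bList.isEmpty()) {
                out.accept(new Joined(a.f(), a.aValue(), null)); // left-outer row
            } else {
                for (StreamBRecord b : bList) {
                    out.accept(new Joined(a.f(), a.aValue(), b.bValue()));
                }
            }
        }
    }
}
```

In an actual job the same loop would go inside `CoGroupFunction#coGroup`, with `out.collect(...)` in place of `out.accept(...)`.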
On 8/6/24 7:42 PM, Sachin Mittal wrote:
> Hi,
> I have two streams, A and B,
> which can be joined or connected using a field f.
>
> However, for a given record in A with f = f1, there may be no
> record in B matching f = f1.
>
> In such cases I want to do a left outer join, where the combined record
> pushed downstream has only the field values from A, with the fields
> that would have been joined from B left empty.
>
> Please let me know if there is any way I can do this, using a
> CoProcessFunction or something similar.
>
> Thanks
> Sachin
>