Hi Sachin,

You should be able to use `coGroup`:


```

streamA.coGroup(streamB)

.where(StreamAFieldFSelector).isEqualTo(StreamBFieldFSelector)

.window(/*your windowing spec*/)

.apply(/*your cogroup function*/)

```


Your coGroup function will look something like:

```

@Override

public void coGroup(Iterable<StreamARecord> arecs, Iterable<StreamBRecord> brecs, Collector<TOut> out) {

    // if `brecs` is empty, that means nothing from Stream B matched Stream A in the window.

}

```


Best,

Kirill

On 8/6/24 7:42 PM, Sachin Mittal wrote:
Hi,
I have two streams A and B.
Which can be joined or connected using a field f.

However, for a given record in A for f = f1, there does not exist any record in B matching this field f = f1.

In such cases I want to do a left outer join where the combined record pushed downstream would only have field values from A and empty for ones supposed to be joined from B.

Please let me know if there is any way I can do this, using the CoProcess function or something.

Thanks
Sachin

Reply via email to