Disclaimer: I am not an expert, but I kinda worked on something similar.
A few points I'd like to bring up:

- Side inputs do not trigger the processElement function when new elements
  are added to the side input. That means that if the side input doesn't
  yet contain the item you need at the time a main-input element is
  processed, too bad. Unless you use timers to reprocess the element at a
  later time, when the side input might have more data.
- If your goal is to combine two PCollections, I would suggest you look
  into CoGroupByKey [1] and its schema-aware brother CoGroup [2]. You can
  then use a global window with a trigger that fires on (roughly) every
  element, e.g.
  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()).
- The input of a PTransform can also be a PCollectionTuple, and in your
  inner ParDo you can loop through items of either collection.

Something like this. Pseudocode:

    class FooTransform extends PTransform<PCollectionTuple, PCollection<Pojo>> {
      // The trailing {} is important as far as I know: it creates an
      // anonymous subclass, so the element type is kept.
      private final TupleTag<PojoA> aTag = new TupleTag<PojoA>() {};
      private final TupleTag<PojoB> bTag = new TupleTag<PojoB>() {};

      // getters for the above fields

      @Override
      public PCollection<Pojo> expand(PCollectionTuple input) {
        return input.apply(ParDo.of(new FooDoFn(aTag, bTag)));
      }

      // Pseudocode shorthand: a real DoFn sees one element at a time, so the
      // two collections have to be joined first (e.g. with CoGroupByKey,
      // then read each tag from the CoGbkResult).
      private static class FooDoFn extends DoFn<PCollectionTuple, Pojo> {
        FooDoFn(TupleTag<PojoA> aTag, TupleTag<PojoB> bTag) {
          // set fields
        }

        @ProcessElement
        public void processElement(ProcessContext ctx) {
          var itemFromA = ctx.element().get(this.aTag);
          if (itemFromA != null) { /* logic */ }
          var itemFromB = ctx.element().get(this.bTag);
          if (itemFromB != null) { /* logic */ }
          // adding these to a state variable would effectively be an
          // unbounded side input
        }
      }
    }

Hope it helps,
Cristian

[1] https://beam.apache.org/documentation/transforms/java/aggregation/cogroupbykey/
[2] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/schemas/transforms/CoGroup.html

On Thu, Jul 21, 2022 at 1:45 AM Sahil Modak <smo...@paloaltonetworks.com> wrote:

> Hi,
>
> We are looking to use the side input feature for one of our DoFns. The
> side input has to be a PCollection which is being constructed from a
> subscription using PubsubIO.read
>
> We want our primary DoFn which operates in a global window KV pair to
> access this side input.
> The goal is to have all the messages of this unbounded source (side input)
> to be available across all the KV pairs in our input DoFn which will use
> this side input.
>
> Is it possible to have an unbounded source (like pubsub) as a side input?
>
> Thanks,
> Sahil
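
P.S. To make the last comment in the pseudocode a bit more concrete ("adding these to a state variable would effectively be an unbounded side input"), here is a minimal plain-Java sketch of that buffering logic. It does not use Beam at all: the class and method names (BufferedJoinSketch, onMain, onLookup) are made up for illustration, and the two HashMaps only stand in for what would be per-key state in a stateful DoFn. Take it as an illustration of the idea under those assumptions, not as pipeline code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Library-free simulation of "buffer until the other side arrives"; no Beam APIs are used. */
final class BufferedJoinSketch {
    // Latest lookup value seen per key (stands in for the unbounded "side" stream).
    private final Map<String, String> lookup = new HashMap<>();
    // Main-stream values that arrived before their lookup value (stands in for per-key state).
    private final Map<String, List<String>> pending = new HashMap<>();

    /** Main-stream element: join it right away, or buffer it if no lookup value exists yet. */
    public List<String> onMain(String key, String value) {
        List<String> out = new ArrayList<>();
        String side = lookup.get(key);
        if (side == null) {
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } else {
            out.add(key + ":" + value + "+" + side);
        }
        return out;
    }

    /** Lookup-stream element: remember it and flush everything that was waiting for this key. */
    public List<String> onLookup(String key, String side) {
        lookup.put(key, side);
        List<String> out = new ArrayList<>();
        List<String> waiting = pending.remove(key);
        if (waiting != null) {
            for (String value : waiting) {
                out.add(key + ":" + value + "+" + side);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        BufferedJoinSketch join = new BufferedJoinSketch();
        System.out.println(join.onMain("k1", "a"));   // buffered, nothing is emitted yet
        System.out.println(join.onLookup("k1", "x")); // the buffered element is flushed now
        System.out.println(join.onMain("k1", "b"));   // lookup value is known, joined immediately
    }
}
```

The point is that both streams go through the same keyed step, so an element that arrives "too early" is not lost: it waits in state until the matching item shows up, which is exactly what a plain side input would not give you.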