The UpdateFn won't be invoked until the side input is ready. That requires either the watermark to pass the end of the global window plus allowed lateness (which shows that the side input is empty) or at least one trigger firing to populate it with data. See the general section on side inputs[1] and some useful patterns[2], which include examples of how to get globally windowed side inputs to work.
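To make the readiness rule concrete, here is a toy model in plain Java. It is only a sketch of the reasoning, not Beam API: the class SideInputReadiness, the method isReady and the constant GLOBAL_WINDOW_END_MILLIS are made up for illustration. It may also explain the stall in the quoted pipeline: as far as I understand, DefaultTrigger in the global window only fires once the watermark reaches the end of the window, which an unbounded source typically never does, so no pane ever populates the view.

```java
/** Toy model of the side-input readiness rule; not part of the Beam API. */
final class SideInputReadiness {

    // Hypothetical stand-in for the end of the global window, chosen so that
    // adding the allowed lateness cannot overflow a long.
    static final long GLOBAL_WINDOW_END_MILLIS = Long.MAX_VALUE / 2;

    /**
     * A side input window counts as ready when it has received data from at
     * least one firing, or when the watermark has passed the end of the
     * window plus allowed lateness (so the window is known to be empty).
     */
    static boolean isReady(int panesFired,
                           long watermarkMillis,
                           long windowEndMillis,
                           long allowedLatenessMillis) {
        boolean hasData = panesFired > 0;
        boolean knownEmpty = watermarkMillis > windowEndMillis + allowedLatenessMillis;
        return hasData || knownEmpty;
    }

    public static void main(String[] args) {
        long watermark = 1_593_000_000_000L; // an arbitrary instant in June 2020

        // Unbounded source, no firing yet: the main input keeps waiting.
        System.out.println(isReady(0, watermark, GLOBAL_WINDOW_END_MILLIS, 0L));

        // One pane has fired: the view has data, so the DoFn can run.
        System.out.println(isReady(1, watermark, GLOBAL_WINDOW_END_MILLIS, 0L));

        // Watermark past the end of the window: the view is known to be empty.
        System.out.println(isReady(0, GLOBAL_WINDOW_END_MILLIS + 1, GLOBAL_WINDOW_END_MILLIS, 0L));
    }
}
```

In an actual pipeline, one way to get that first firing might be a processing-time trigger such as Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()) on the side-input windowing, along the lines of the slowly updating side input pattern. Whether that fits depends on how fresh the view needs to be.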
1: https://beam.apache.org/documentation/programming-guide/#side-inputs
2: https://beam.apache.org/documentation/patterns/side-inputs/

On Sun, Jun 28, 2020 at 6:24 PM Praveen K Viswanathan <harish.prav...@gmail.com> wrote:
>
> Hi All - I am facing an issue while using *side-input*.
>
> *What am I doing:*
> From my main program, I am calling a custom PTransform with a
> PCollectionView as parameter. Inside custom PTransform, I am passing the
> PCollectionView as a side-input to a DoFn.
>
> *Issue:*
> When I run the pipeline, I am expecting the log statement inside my DoFn's
> processElement to get executed but it is not getting logged. If I remove
> the side-input to my DoFn then the log is getting printed. I am suspecting
> whether it could be related to windowing/execution order or my side-input
> somehow being empty. Appreciate if you can clarify on what is going wrong
> here.
>
> *Code Structure:*
>
> *Main Program:*
> PCollectionTuple tuple = input.apply(new FirstTx());
>
> // Get two tuple tags from first transformation
> PCollection1 = tuple.get(tag1).setCoder(...);
> PCollection2 = tuple.get(tag2).setCoder(...);
>
> // Converting PCollection1 to PCollectionView to use as a side-input
> // Note: I need to introduce a global window here as my source is
> // unbounded and when we use View.asList() it does GroupByKey internally
> // which inturn demands a window
> PView = PCollection1.apply(Window.<KV<String, CustomObject>>into(new GlobalWindows()) // Everything into global window.
>         .triggering(Repeatedly.forever(DefaultTrigger.of()))
>         .discardingFiredPanes()).apply(Values.create()).apply(View.asList());
>
> // Pass PCollectionView to SecondTx as a param
> PCollection3 = PCollection2.apply(new SecondTx(PView));
>
> *SecondTx:*
> Inside my SecondTx, I am getting the PView from constructor (this.PView =
> PView) and calling a DoFn
>
> public PCollection<CustomObject> expand(PCollection <KV <String, KV <String, CustomObject>>> input) {
>     input.apply(ParDo.of(new UpdateFn()).withSideInput("SideInput", PView));
>     ...
> }
>
> // DoFn
> class UpdateFn extends DoFn<Map<String, Map<String, Map<String, String>>>, CustomObject> {
>     @ProcessElement
>     public void processElement(@Element Map<String, Map<String, Map<String, String>>> input, OutputReceiver<CustomObject> out) {
>         * Log.of("UpdateFn " + input);*
>         out.output(new CustomObject());
>     }
> }
>
> --
> Thanks,
> Praveen K Viswanathan
>