Hi Beam Community,

After running into a high fan-out issue, I learned that one possible solution is to introduce a combination of windowing/group-by steps between the DoFns that were being fused together, so I started working on a simple proof of concept. However, when I put everything together, I'm seeing the data held up in the Combine.perKey step instead of moving forward. Here's my code:

    pb.apply("windowing",
            Window.<FieldValueList>into(FixedWindows.of(Duration.standardSeconds(1)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.standardSeconds(3))
                .discardingFiredPanes())
        // Key each row by its store number.
        .apply("remapping",
            MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptor.of(FieldValueList.class)))
                .via(input -> KV.of(input.get("STORE_NBR").getNumericValue().intValue(), input)))
        .apply(Count.perKey())
        // Log each key's count with the element timestamp and the window's max timestamp.
        .apply("print count", ParDo.of(
            new DoFn<KV<Integer, Long>, Long>() {
                @ProcessElement
                public void processElement(ProcessContext c, BoundedWindow window) {
                    String idStr = c.element().getKey().toString() + " - " + c.element().getValue().toString();
                    Instant ts = c.timestamp();
                    LOG.info(String.format("%s | %s : %s", idStr, ts, window.maxTimestamp().toDateTime().toString()));
                }
            }
        ))

I removed some logging from this snippet, but to be clear: I do see data moving forward in the pipeline after the "remapping" step, with the proper windows assigned to each element.

Any thoughts on what could be happening here?

Thanks
--
Frank Pinto