Hi Beam Community,

After running into a high fan-out issue, I found that a possible solution
was to introduce a combination of windowing and group-by steps between the
DoFns that were being fused together, so I started working on a simple
proof of concept. However, now that I've put everything together, I'm
seeing the data held up in the Combine.perKey step (Count.perKey below)
instead of moving forward. Here's my code:

pb.apply("windowing",
    Window.<FieldValueList>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.standardSeconds(3))
        .discardingFiredPanes())
.apply("remapping", MapElements
    .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
        TypeDescriptor.of(FieldValueList.class)))
    .via(input ->
        KV.of(input.get("STORE_NBR").getNumericValue().intValue(), input)))
.apply(Count.perKey())
.apply("print count", ParDo.of(
    new DoFn<KV<Integer, Long>, Long>() {
      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow window) {
        String idStr = c.element().getKey().toString() + " - " +
            c.element().getValue().toString();

        Instant ts = c.timestamp();

        LOG.info(String.format("%s | %s : %s", idStr, ts,
            window.maxTimestamp().toDateTime().toString()));
      }
    }
))

I've removed some logging from this snippet, but it's important to clarify
that I do see data moving forward in the pipeline after the "remapping"
step, with the proper windows assigned to each element.
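
For context, here is a plain-Java sketch (no Beam dependency) of the model
the windowing step above is assumed to follow: each element lands in the
1-second fixed window containing its timestamp, and the on-time pane for
that window is only expected once the watermark reaches the end of the
window. The class and method names are hypothetical, a zero window offset
is assumed, and this is a simplification for illustration rather than
Beam's actual implementation.

```java
class FixedWindowSketch {

    /** Inclusive start of the fixed window containing the timestamp (zero offset assumed). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Exclusive end of the fixed window containing the timestamp. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    /**
     * Simplified model of a watermark trigger: the on-time pane is treated as
     * ready once the watermark has reached the end of the element's window.
     */
    static boolean onTimePaneCanFire(long watermarkMillis, long timestampMillis, long sizeMillis) {
        return watermarkMillis >= windowEnd(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long sizeMillis = 1000L;      // matches Duration.standardSeconds(1)
        long timestampMillis = 1500L; // hypothetical element timestamp

        System.out.println("window start: " + windowStart(timestampMillis, sizeMillis));
        System.out.println("window end:   " + windowEnd(timestampMillis, sizeMillis));
        System.out.println("fires at watermark 1999? "
            + onTimePaneCanFire(1999L, timestampMillis, sizeMillis));
        System.out.println("fires at watermark 2000? "
            + onTimePaneCanFire(2000L, timestampMillis, sizeMillis));
    }
}
```

Under this model, an element can have a correct window assigned right
after "remapping" while the grouped output stays pending, as long as the
watermark has not yet reached that window's end.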

Any thoughts on what could be happening here?

Thanks


-- 
Frank Pinto
