Hi all
i need some advice regarding windows usage. i am sure this is a very basic
question, any guidance will be very appreciated

I am using:

- unbounded pcollectionA with FixedWindow of 1 minute from which eventually
i create state and use it as side input.

PCollection<KV<String, InferenceOutput>> pcollectionA = x.
        apply(Window.<KV<String,
Output>>into(FixedWindows.of(Duration.standardSeconds(60)))

.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());

PCollectionView<Map<String, Iterable<ResList>>> *handleResults* =
pcollectionA.apply("handleResults", ParDo.of(new
HandleResults())).apply(View.asMultimap());

-  unbounded Pcollection B which reads messages from kafka. the message
arrives every 30 sec. (depends on some external scheduler)
the logic is to use the key of event arrived to PcollectionB and retrive
the values from the side input i have created from PcollectionA

 PCollection<KV<String, String>> pcollectionB = pipeline.apply(
"readFromKafka",

                KafkaTransform.readStrFromKafka(
                        pipelineUtil.getBootstrapServers(),
INFERENCE_TRIGGER, PIPELINE_NAME)).
        apply(Window.<KV<String,
String>>into(FixedWindows.of(Duration.standardSeconds(30)))
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());

PCollection<KV<String, Iterable<String>>> pcollectionBByKey =
pcollectionB.apply(GroupByKey.create());

PCollection<KV<String, AnalyticsOutput>> analyticsOutput =
pcollectionBByKey.apply(ParDo.of(new
RetrieveData(*handleResults*)).withSideInput("results",
*handleResults*));

Per my understanding,  if the "*handleResults" *side input is not
"ready", even if i have a new event in PcollectionB it will wait.

Is there any trigger that will handle each event that arrives in
pcollectionB immediately? and if there is no data in state just return
an empty result?

I tried to use different triggers but could not meet this requirement.


I hope my problem is clear.

thanks

Sigalit

Reply via email to