Hi there,

Actually, it's not a runner problem. If you look at the code, the combiner doesn't emit any records until the window ends. In my use case I can't shorten the window, because a short window doesn't yield meaningful records. Here is my question:
How can I implement a combiner that emits records multiple times per window?

On Tue, 2 Aug 2022 at 23:23, P Singh <bigdatadevelop...@gmail.com> wrote:

> Hi Mátyás,
>
> Oh, okay, I can try that, but something also throws this error if there's
> no data in the Pub/Sub topic:
>
> window.WindowFn.AssignContext(source_window.max_timestamp())))[-1]
> AttributeError: 'NoneType' object has no attribute 'max_timestamp'
>
>
> On Tue, 2 Aug 2022 at 23:03, Mátyás Manninger via user <
> user@beam.apache.org> wrote:
>
>> I had similar issues with the local runner; I deployed the code to
>> Dataflow and it worked. Try deploying it and see if that helps.
>>
>> On 2022. Aug 2., Tue at 16:31, P Singh <bigdatadevelop...@gmail.com>
>> wrote:
>>
>>> Hi There,
>>>
>>> I am implementing a Beam pipeline that reads events from Pub/Sub,
>>> applies sliding windows, and then uses CombineGlobally. The problem is
>>> that I cannot test the pipeline on my local machine. Below is a code
>>> sample. I am not able to feed event_out to event_fi: the pipeline
>>> freezes at event_out. It prints the event_out output on screen and then
>>> processes nothing further; it just hangs.
>>>
>>> import apache_beam as beam
>>> from apache_beam.transforms import window
>>> from apache_beam.transforms.trigger import (
>>>     AccumulationMode,
>>>     AfterAny,
>>>     AfterProcessingTime,
>>>     Repeatedly,
>>> )
>>>
>>> event_out = (
>>>     event
>>>     | "Windowing"
>>>     >> beam.WindowInto(
>>>         # 1-hour sliding windows, a new one starting every 10 seconds
>>>         window.SlidingWindows(60 * 60, 10),
>>>         # fire repeatedly, 10 s of processing time after the first
>>>         # element of each pane
>>>         trigger=Repeatedly(AfterAny(AfterProcessingTime(10))),
>>>         allowed_lateness=0,
>>>         accumulation_mode=AccumulationMode.DISCARDING,
>>>     )
>>>     | "Combine-globally"
>>>     >> beam.CombineGlobally(
>>>         lambda e, t: get_agg(e, t),
>>>         t=t,
>>>     ).without_defaults()
>>> )
>>>
>>> event_fi = (
>>>     records
>>>     | "mapping"
>>>     >> beam.FlatMap(
>>>         # event_out is passed to fun1 as a side input
>>>         fun1, rights=beam.pvalue.AsIter(event_out)
>>>     )
>>>     | beam.Map(print)
>>> )
>>>
>>> Any suggestions or help would be appreciated.
>>>
>>
>> --
>>
>> Med Vänlig Hälsning / Best Regards
>> <https://lunar.app/>
>> *Matyas Manninger*
>> *Data Engineer Consultant*
>> +46 76 050 6326
>> m...@lunar.app, lunar.app
>> CVR: DK36982837
>>
>