Look into triggers. You can trigger windows based on number of elements and other conditions. Here are the basics: https://beam.apache.org/documentation/programming-guide/#triggers
BR Med Vänlig Hälsning / Best Regards <https://lunar.app/> *Matyas Manninger* *Data Engineer Consultant* +46 76 050 6326 m...@lunar.app, lunar.app CVR: DK36982837 On Tue, 2 Aug 2022 at 21:33, P Singh <bigdatadevelop...@gmail.com> wrote: > Hi There, > > Actually it's not a runner problem.. If you look at the code, the combiner > doesn't emit the records until the window ends. In my use-case I can't keep > a short window to get meaningful records. Here is my question: > > How to implement a combiner to successfully emit records many times per > window? > > On Tue, 2 Aug 2022 at 23:23, P Singh <bigdatadevelop...@gmail.com> wrote: > >> Hi Mátyás, >> >> Ohh okay I can try that but something throws this error too if there's no >> data in the pub-sub topic. >> >> window.WindowFn.AssignContext(source_window.max_timestamp())))[-1] >> AttributeError: 'NoneType' object has no attribute 'max_timestamp' >> >> >> On Tue, 2 Aug 2022 at 23:03, Mátyás Manninger via user < >> user@beam.apache.org> wrote: >> >>> I had similar issues with the local runner, I deployed the code to >>> Dataflow and it worked. Try deploying it and see if that helps. >>> >>> On 2022. Aug 2., Tue at 16:31, P Singh <bigdatadevelop...@gmail.com> >>> wrote: >>> >>>> Hi There, >>>> >>>> I am implementing a beam pipeline,which is reading events from pub-sub >>>> and using sliding windowing and then combinegloblally. The thing is that I >>>> am not able to test my pipeline on local machine. below is the code sample. >>>> I am not able to feed event_out to event_fi my pipeline gets frozen on >>>> events_out means it produces event_out output on screen and from there it >>>> doesn't process anything it just stuck on the screen. >>>> >>>> event_out = event | "Windowing" >>>> >> beam.WindowInto( >>>> window.SlidingWindows(60 * 60, 10), >>>> trigger=Repeatedly(AfterAny(AfterProcessingTime(10))), >>>> allowed_lateness=0, >>>> accumulation_mode=AccumulationMode.DISCARDING, >>>> ) >>>> | "Combine-globally" >>>> >> beam.CombineGlobally( >>>> lambda e, t: get_agg( >>>> e, t >>>> ), >>>> t=t >>>> ).without_defaults() >>>> ) >>>> >>>> event_fi = ( >>>> records >>>> | "mapping" >>>> >> beam.FlatMap( >>>> fun1, rights=beam.pvalue.AsIter(event_out) >>>> ) >>>> | beam.Map(print) >>>> ) >>>> >>>> >>>> >>>> >>>> Any suggestions or help would be appreciated. >>>> >>>> -- >>> >>> >>> >>> Med Vänlig Hälsning / Best Regards >>> <https://lunar.app/> >>> *Matyas Manninger* >>> *Data Engineer Consultant* >>> +46 76 050 6326 >>> m...@lunar.app, lunar.app >>> CVR: DK36982837 >>> >>