Look into triggers. A trigger lets a window emit results before it closes, for
example after a given number of elements or when other conditions are met.
Here are the basics:
https://beam.apache.org/documentation/programming-guide/#triggers

BR



Best Regards
<https://lunar.app/>
*Matyas Manninger*
*Data Engineer Consultant*
+46 76 050 6326
m...@lunar.app, lunar.app
CVR: DK36982837


On Tue, 2 Aug 2022 at 21:33, P Singh <bigdatadevelop...@gmail.com> wrote:

> Hi There,
>
> Actually, it's not a runner problem. If you look at the code, the combiner
> doesn't emit any records until the window ends. In my use case I can't use
> a short window and still get meaningful records. So here is my question:
>
> How can I implement a combiner that emits records multiple times per
> window?
>
> On Tue, 2 Aug 2022 at 23:23, P Singh <bigdatadevelop...@gmail.com> wrote:
>
>> Hi Mátyás,
>>
>> Oh, okay, I can try that, but the following error is also thrown when
>> there is no data in the Pub/Sub topic:
>>
>> window.WindowFn.AssignContext(source_window.max_timestamp())))[-1]
>> AttributeError: 'NoneType' object has no attribute 'max_timestamp'
>>
>>
>> On Tue, 2 Aug 2022 at 23:03, Mátyás Manninger via user <
>> user@beam.apache.org> wrote:
>>
>>> I had similar issues with the local runner. When I deployed the code to
>>> Dataflow, it worked. Try deploying it and see if that helps.
>>>
>>> On Tue, 2 Aug 2022 at 16:31, P Singh <bigdatadevelop...@gmail.com>
>>> wrote:
>>>
>>>> Hi There,
>>>>
>>>> I am implementing a Beam pipeline that reads events from Pub/Sub,
>>>> applies sliding windows, and then uses CombineGlobally. The problem is
>>>> that I am not able to test my pipeline on my local machine. The code
>>>> sample is below. I am not able to feed event_out into event_fi: the
>>>> pipeline freezes at event_out. It prints the event_out output on screen,
>>>> and from there it doesn't process anything else; it just hangs.
>>>>
>>>> event_out = (
>>>>     event
>>>>     | "Windowing"
>>>>     >> beam.WindowInto(
>>>>         window.SlidingWindows(60 * 60, 10),
>>>>         trigger=Repeatedly(AfterAny(AfterProcessingTime(10))),
>>>>         allowed_lateness=0,
>>>>         accumulation_mode=AccumulationMode.DISCARDING,
>>>>     )
>>>>     | "Combine-globally"
>>>>     >> beam.CombineGlobally(
>>>>         lambda e, t: get_agg(e, t),
>>>>         t=t,
>>>>     ).without_defaults()
>>>> )
>>>>
>>>> event_fi = (
>>>>     records
>>>>     | "mapping"
>>>>     >> beam.FlatMap(fun1, rights=beam.pvalue.AsIter(event_out))
>>>>     | beam.Map(print)
>>>> )
>>>>
>>>> Any suggestions or help would be appreciated.
>>>>
