Hi There,

Actually it's not a runner problem.. If you look at the code, the combiner
doesn't emit the records until the window ends. In my use-case I can't keep
a short window to get meaningful records. Here is my question:
How to implement a combiner to successfully emit records many times per
window?

On Tue, 2 Aug 2022 at 23:23, P Singh <bigdatadevelop...@gmail.com> wrote:

> Hi Mátyás,
>
> Ohh okay I can try that but something throws this error too if there's no
> data in the pub-sub topic.
>
> window.WindowFn.AssignContext(source_window.max_timestamp())))[-1]
> AttributeError: 'NoneType' object has no attribute 'max_timestamp'
>
>
> On Tue, 2 Aug 2022 at 23:03, Mátyás Manninger via user <
> user@beam.apache.org> wrote:
>
>> I had similar issues with the local runner, I deployed the code to
>> Dataflow and it worked. Try deploying it and see if that helps.
>>
>> On 2022. Aug 2., Tue at 16:31, P Singh <bigdatadevelop...@gmail.com>
>> wrote:
>>
>>> Hi There,
>>>
>>> I am implementing a beam pipeline,which is reading events from pub-sub
>>> and using sliding windowing and then combinegloblally. The thing is that I
>>> am not able to test my pipeline on local machine. below is the code sample.
>>> I am not able to feed event_out to event_fi my pipeline gets frozen on
>>> events_out means it produces event_out output on screen and from there it
>>> doesn't process anything it just stuck on the screen.
>>>
>>> event_out = event | "Windowing"
>>> >> beam.WindowInto(
>>> window.SlidingWindows(60 * 60, 10),
>>> trigger=Repeatedly(AfterAny(AfterProcessingTime(10))),
>>> allowed_lateness=0,
>>> accumulation_mode=AccumulationMode.DISCARDING,
>>> )
>>> | "Combine-globally"
>>> >> beam.CombineGlobally(
>>> lambda e, t: get_agg(
>>> e, t
>>> ),
>>> t=t
>>> ).without_defaults()
>>> )
>>>
>>> event_fi = (
>>> records
>>> | "mapping"
>>> >> beam.FlatMap(
>>> fun1, rights=beam.pvalue.AsIter(event_out)
>>> )
>>> | beam.Map(print)
>>> )
>>>
>>>
>>>
>>>
>>> Any suggestions or help would be appreciated.
>>>
>>> --
>>
>>
>>
>> Med Vänlig Hälsning / Best Regards
>> <https://lunar.app/>
>> *Matyas Manninger*
>> *Data Engineer Consultant*
>> +46 76 050 6326
>> m...@lunar.app, lunar.app
>> CVR: DK36982837
>>
>

Reply via email to