Hi Clemens,

First of all, can you try using the MapView inside an accumulator POJO class? This might solve your exception. I'm not sure whether we support the views as top-level accumulators.
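
A minimal, untested sketch of what such an accumulator POJO could look like is shown here. The names CountDistinctAcc, counts and distinct are made up for illustration, the key type is fixed to String instead of the generic T for simplicity, and the retract method is an assumption about the part elided in the original function (bounded OVER windows generally need one). It has not been checked against 1.12.2.

```scala
import java.lang.{Integer => JInt}

import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator POJO: the MapView is a field of the
// accumulator instead of being the accumulator itself.
class CountDistinctAcc {
  // Number of occurrences per key that are currently inside the window.
  var counts: MapView[String, JInt] = new MapView[String, JInt]()
  // Number of distinct keys currently inside the window.
  var distinct: Int = 0
}

class CountDistinctAggFunction extends AggregateFunction[JInt, CountDistinctAcc] {

  override def createAccumulator(): CountDistinctAcc = new CountDistinctAcc

  def accumulate(acc: CountDistinctAcc, key: String): Unit = {
    if (key != null) {
      val current = acc.counts.get(key) // null if the key is not present
      if (current == null) {
        acc.counts.put(key, 1)
        acc.distinct += 1
      } else {
        acc.counts.put(key, current.intValue + 1)
      }
    }
  }

  // Assumed to be needed because rows leave the bounded OVER window.
  def retract(acc: CountDistinctAcc, key: String): Unit = {
    if (key != null) {
      val current = acc.counts.get(key)
      if (current != null) {
        if (current.intValue <= 1) {
          acc.counts.remove(key)
          acc.distinct -= 1
        } else {
          acc.counts.put(key, current.intValue - 1)
        }
      }
    }
  }

  override def getValue(acc: CountDistinctAcc): JInt = Int.box(acc.distinct)
}
```

The idea is only that the planner sees a structured accumulator type with a MapView field, rather than a bare MapView as the accumulator.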

In any case, this seems to be a bug. I will open an issue once I get your feedback. We might then simply throw an exception for top-level usage.

Regards,
Timo



On 14.07.21 06:33, Clemens Valiente wrote:
Hi,

we created a new AggregateFunction that uses a MapView as its accumulator, as follows:

    class CountDistinctAggFunction[T]
        extends AggregateFunction[lang.Integer, MapView[T, lang.Integer]] {

      override def createAccumulator(): MapView[T, lang.Integer] = {
        new MapView[T, lang.Integer]()
      }
    ...

We had NullPointerExceptions happening on

    getValue(accumulator: MapView[T, lang.Integer]): lang.Integer

and

    def accumulate(accumulator: MapView[T, lang.Integer], key: T): Unit = {

so I added null checks there.

Unfortunately, the NPEs are still happening, right after a checkpoint is triggered:

    2021-07-14 04:01:22,340 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1626235282013 for job 0cbe21cce72742ec8e5e6786aa6b44ca.
    2021-07-14 04:02:52,249 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - OverAggregate(partitionBy=[entityID], orderBy=[eventTime ASC], window=[ RANG BETWEEN 3600000 PRECEDING AND CURRENT ROW], select=[....] app$functions$table$CountDistinctAggFunction$47dfbce463746500de0b303cff5c947b AS w0$o0) (5/8) (39c23e4703862c39e513dcb5fd629fb4) switched from RUNNING to FAILED on 10.195.174.180:6122-ba5b74 (dataPort=45665).
    java.lang.NullPointerException: null
        at org.apache.flink.table.data.conversion.RawObjectConverter.toExternal(RawObjectConverter.java:49) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at BoundedOverAggregateHelper$946.setAccumulators(Unknown Source) ~[?:?]
        at org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction.onTimer(RowTimeRangeBoundedPrecedingFunction.java:224) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:197) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]

Reading the stack trace, it looks like the accumulator is once again null here.

I have no idea how the accumulator ends up null; I don't think this should be happening. I didn't find any information about this specific issue on Google. What is the cause? What can I do to prevent this from happening?
We are running Flink 1.12.2 with Scala 2.11.12 and JDK 1.8.0_282 on Kubernetes.

Cheers
Clemens


