Hi,

We created a new AggregateFunction that uses a MapView as its accumulator, as follows:

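import java.lang

import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

class CountDistinctAggFunction[T]
    extends AggregateFunction[lang.Integer, MapView[T, lang.Integer]] {

  // A fresh, empty map for every new group/window.
  override def createAccumulator(): MapView[T, lang.Integer] =
    new MapView[T, lang.Integer]()
  ...
}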
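For context, the elided methods of a count-distinct aggregate usually follow the shape below. This is a minimal, dependency-free sketch under stated assumptions, not the original code: `CountDistinctSketch` is a hypothetical name, and a `scala.collection.mutable.HashMap` stands in for Flink's `MapView`. A `retract` method is included because Flink requires it for aggregates on bounded OVER windows, such as the RANGE window in the log below.

```scala
import scala.collection.mutable

// Hypothetical stand-in: a mutable.HashMap plays the role of Flink's MapView.
object CountDistinctSketch {
  type Acc[T] = mutable.HashMap[T, Int]

  def createAccumulator[T](): Acc[T] = mutable.HashMap.empty[T, Int]

  // Count how many times each key has been seen in the current window.
  def accumulate[T](acc: Acc[T], key: T): Unit =
    acc(key) = acc.getOrElse(key, 0) + 1

  // Undo one occurrence; drop the key once its count reaches zero.
  def retract[T](acc: Acc[T], key: T): Unit =
    acc.get(key) match {
      case Some(c) if c > 1 => acc(key) = c - 1
      case Some(_)          => acc.remove(key)
      case None             => () // nothing to retract for an unseen key
    }

  // The distinct count is the number of keys still present.
  def getValue[T](acc: Acc[T]): Int = acc.size
}
```

Keeping a per-key count (instead of a set) is what makes `retract` safe when the same key appears several times inside one window.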

We were getting NullPointerExceptions in

def getValue(accumulator: MapView[T, lang.Integer]): lang.Integer

and

def accumulate(accumulator: MapView[T, lang.Integer], key: T): Unit

so I added null checks to both methods.
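Null guards of the kind described above might look like the following. This is an assumed sketch, not the original code: `GuardedCountDistinct` is a hypothetical name, and a `scala.collection.mutable.HashMap` again stands in for `MapView`. Guards like these only protect calls that actually reach these methods; they cannot intercept a null that is dereferenced earlier, inside Flink's generated code.

```scala
import scala.collection.mutable

// Hypothetical null-guarded variants; mutable.HashMap stands in for MapView.
object GuardedCountDistinct {
  type Acc[T] = mutable.HashMap[T, Int]

  // Treat a missing accumulator as empty instead of throwing an NPE.
  def getValue[T](acc: Acc[T]): Int =
    if (acc == null) 0 else acc.size

  // Silently ignore calls with a null accumulator or a null key.
  def accumulate[T](acc: Acc[T], key: T): Unit =
    if (acc != null && key != null) acc(key) = acc.getOrElse(key, 0) + 1
}
```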

Unfortunately, the NPEs still occur, right after a checkpoint is triggered:

2021-07-14 04:01:22,340 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1626235282013 for job 0cbe21cce72742ec8e5e6786aa6b44ca.
2021-07-14 04:02:52,249 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - OverAggregate(partitionBy=[entityID], orderBy=[eventTime ASC], window=[ RANG BETWEEN 3600000 PRECEDING AND CURRENT ROW], select=[....] app$functions$table$CountDistinctAggFunction$47dfbce463746500de0b303cff5c947b AS w0$o0) (5/8) (39c23e4703862c39e513dcb5fd629fb4) switched from RUNNING to FAILED on 10.195.174.180:6122-ba5b74 (dataPort=45665).
java.lang.NullPointerException: null
        at org.apache.flink.table.data.conversion.RawObjectConverter.toExternal(RawObjectConverter.java:49) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at BoundedOverAggregateHelper$946.setAccumulators(Unknown Source) ~[?:?]
        at org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction.onTimer(RowTimeRangeBoundedPrecedingFunction.java:224) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:197) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]

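From the stack trace, it looks like the accumulator is once again null, this time inside the generated BoundedOverAggregateHelper code (setAccumulators, called from RowTimeRangeBoundedPrecedingFunction.onTimer) rather than in my own methods.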

I have no idea how the accumulator ends up null; I don't think this should be possible. I couldn't find any information about this specific issue on Google. What is the cause, and what can I do to prevent it?
We are running Flink 1.12.2 with Scala 2.11.12 and JDK 1.8.0_282 on Kubernetes.

Cheers
Clemens
