Hi Clemens,
First of all, can you try to use the MapView within an accumulator POJO
class? This might solve your exception. I'm not sure if we support the
views as top-level accumulators.
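For illustration, here is a minimal sketch of what wrapping the MapView in an accumulator class could look like. The names CountDistinctAcc, seen, distinct and CountDistinctStringAggFunction are made up for this example, and the key type is fixed to String, which is an assumption: a generic key type T would probably need explicit type hints. It has not been run against the job in question.

```scala
import java.lang.{Integer => JInt}

import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator class: the MapView is a field of the
// accumulator instead of being the accumulator itself.
case class CountDistinctAcc(
  var seen: MapView[String, JInt] = new MapView[String, JInt](),
  var distinct: JInt = 0
)

// Hypothetical non-generic variant of the function (String keys only).
class CountDistinctStringAggFunction
    extends AggregateFunction[JInt, CountDistinctAcc] {

  override def createAccumulator(): CountDistinctAcc = CountDistinctAcc()

  // Tracks how often each key occurs; a key seen for the first time
  // increases the distinct count.
  def accumulate(acc: CountDistinctAcc, key: String): Unit = {
    if (key != null) {
      val n = acc.seen.get(key)
      if (n == null) {
        acc.seen.put(key, 1)
        acc.distinct = acc.distinct.intValue + 1
      } else {
        acc.seen.put(key, n.intValue + 1)
      }
    }
  }

  // Reverses one accumulate call; a key whose last occurrence is
  // retracted decreases the distinct count.
  def retract(acc: CountDistinctAcc, key: String): Unit = {
    if (key != null) {
      val n = acc.seen.get(key)
      if (n != null) {
        if (n.intValue <= 1) {
          acc.seen.remove(key)
          acc.distinct = acc.distinct.intValue - 1
        } else {
          acc.seen.put(key, n.intValue - 1)
        }
      }
    }
  }

  override def getValue(acc: CountDistinctAcc): JInt = acc.distinct
}
```

The retract method is included because, as far as I know, aggregations on bounded OVER windows need it. The separate distinct counter is there because MapView does not expose its size.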
In any case, this seems to be a bug. I will open an issue once I get your
feedback. We might simply throw an exception for top-level usage then.
Regards,
Timo
On 14.07.21 06:33, Clemens Valiente wrote:
Hi,
we created a new AggregateFunction with a MapView as the accumulator, as follows:
class CountDistinctAggFunction[T] extends
    AggregateFunction[lang.Integer, MapView[T, lang.Integer]] {
  override def createAccumulator(): MapView[T, lang.Integer] = {
    new MapView[T, lang.Integer]()
  }
  ...
We were getting NullPointerExceptions in
  getValue(accumulator: MapView[T, lang.Integer]): lang.Integer
and
  def accumulate(accumulator: MapView[T, lang.Integer], key: T): Unit = {
so I added null checks there.
Unfortunately, the NPEs are still happening, right after a checkpoint is
triggered:
2021-07-14 04:01:22,340 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1626235282013 for job 0cbe21cce72742ec8e5e6786aa6b44ca.
2021-07-14 04:02:52,249 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - OverAggregate(partitionBy=[entityID], orderBy=[eventTime ASC], window=[ RANG BETWEEN 3600000 PRECEDING AND CURRENT ROW], select=[....] app$functions$table$CountDistinctAggFunction$47dfbce463746500de0b303cff5c947b AS w0$o0) (5/8) (39c23e4703862c39e513dcb5fd629fb4) switched from RUNNING to FAILED on 10.195.174.180:6122-ba5b74 (dataPort=45665).
java.lang.NullPointerException: null
    at org.apache.flink.table.data.conversion.RawObjectConverter.toExternal(RawObjectConverter.java:49) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at BoundedOverAggregateHelper$946.setAccumulators(Unknown Source) ~[?:?]
    at org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction.onTimer(RowTimeRangeBoundedPrecedingFunction.java:224) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:197) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]
Reading the stack trace, it looks like the accumulator is once again null here.
I have no idea how the accumulator ends up null; I don't think this
should be happening. I didn't find any information regarding this
specific issue on Google. What is the cause? What can I do to prevent
this from happening?
We are running Flink 1.12.2 with Scala 2.11.12 and JDK 1.8.0_282 on Kubernetes.
Cheers
Clemens