Hi Experts,
        In my user-defined aggregate function (UDAF), I found that returning a 
null value causes checkpoints to fail. The error log is included below.
        I think returning null from a UDAF is quite a common case, because 
sometimes no value can be determined. Why does Flink have such a limitation 
on the UDAF return value?
        Thanks a lot!


org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 4 for operator groupBy: (DRAFT_ORDER_ID), select: (DRAFT_ORDER_ID, latest_value_long_test($f1, LAST_UPDATE_TIME) AS tt) -> select: (CAST(DRAFT_ORDER_ID) AS EXPR$0, _UTF-16LE'order' AS EXPR$1, tt, _UTF-16LE'111' AS EXPR$3) -> to: Tuple2 -> Sink: Unnamed (2/4).
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator groupBy: (DRAFT_ORDER_ID), select: (DRAFT_ORDER_ID, latest_value_long_test($f1, LAST_UPDATE_TIME) AS tt) -> select: (CAST(DRAFT_ORDER_ID) AS EXPR$0, _UTF-16LE'order' AS EXPR$1, tt, _UTF-16LE'111' AS EXPR$3) -> to: Tuple2 -> Sink: Unnamed (2/4).
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
        ... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.types.NullFieldException: Field 0 is null, but expected to hold a value.
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
        ... 5 common frames omitted
Caused by: org.apache.flink.types.NullFieldException: Field 0 is null, but expected to hold a value.
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:116)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:47)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.lambda$getKeyGroupWriter$0(CopyOnWriteStateTableSnapshot.java:148)
        at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResult.writeStateInKeyGroup(KeyGroupPartitioner.java:261)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:757)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:724)
        at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
        ... 7 common frames omitted
Caused by: java.lang.NullPointerException: null
        at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:69)
        at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
        ... 17 common frames omitted
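
Reading the trace again, the NullPointerException is thrown in LongSerializer, called from CaseClassSerializer inside RowSerializer, while the state snapshot is being written. So my guess is that the null sits in field 0 of the accumulator that is kept in state, and not necessarily in the value returned to the query. Below is a minimal sketch of what I could try instead, assuming that guess is right: keep only primitive fields plus a flag in the accumulator, and return null only from getValue. All names here (LatestValueSketch, Acc, hasValue) are hypothetical and not from my actual job. The sketch is plain Java with no Flink dependency so it can be run on its own; in the real job these methods would live in a class extending org.apache.flink.table.functions.AggregateFunction.

```java
// Hypothetical sketch: names are illustrative, not taken from the real job.
class LatestValueSketch {

    /** Accumulator with primitive fields only, so no field can ever be null. */
    static class Acc {
        boolean hasValue = false;            // replaces "value == null" as the empty marker
        long value = 0L;                     // latest value seen so far
        long lastUpdateTime = Long.MIN_VALUE; // update time of that value
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    /** Keeps the value with the greatest update time; null inputs are skipped. */
    static void accumulate(Acc acc, Long value, Long updateTime) {
        if (value == null || updateTime == null) {
            return;
        }
        if (!acc.hasValue || updateTime >= acc.lastUpdateTime) {
            acc.hasValue = true;
            acc.value = value;
            acc.lastUpdateTime = updateTime;
        }
    }

    /** The result may be null; only the accumulator itself stays null-free. */
    static Long getValue(Acc acc) {
        return acc.hasValue ? Long.valueOf(acc.value) : null;
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        System.out.println(getValue(acc)); // prints null (nothing accumulated yet)
        accumulate(acc, 42L, 1000L);
        accumulate(acc, 7L, 500L);         // older update time, ignored
        System.out.println(getValue(acc)); // prints 42
    }
}
```

With this shape, "no value yet" is expressed by hasValue = false, so the serializer for the accumulator never sees a null Long. Whether a null result from getValue is accepted downstream is something I have not verified.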

        


Best
Henry
