Hi

First, could you please check whether this problem still occurs with Flink
1.10 or 1.11?

This looks strange. Judging from the error message, the failure happens when
the namespace of a window state (TimeWindow) is serialized with the serializer
of a non-window state (VoidNamespaceSerializer): the serializer belongs to
non-window state, but the state entry carries a window namespace.
Could you please also try replacing MyFunction with a
reduce/aggregate/fold/apply() function to see what happens? This should help
narrow down the problem.
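
To illustrate the kind of mismatch the stack trace points at, here is a toy
model in plain Scala. It is not Flink code: VoidNs, WindowNs, serializeVoidNs
and snapshot are made-up stand-ins, and the sketch only assumes that a
serializer written for one namespace type ends up being handed another one
at snapshot time.

```scala
import scala.util.Try

// Toy model only: these types are hypothetical stand-ins, not Flink classes.
object NamespaceMismatchDemo {
  final class VoidNs                                // stand-in for a non-window namespace
  final case class WindowNs(start: Long, end: Long) // stand-in for a window namespace

  // This serializer assumes its input is a VoidNs, as the one in the trace does
  // for its own namespace type.
  def serializeVoidNs(ns: AnyRef): String = {
    classOf[VoidNs].cast(ns) // throws ClassCastException for any other type
    "void"
  }

  // A "snapshot" that serializes every namespace with the single serializer.
  def snapshot(namespaces: Seq[AnyRef]): Try[Seq[String]] =
    Try(namespaces.map(serializeVoidNs))

  def main(args: Array[String]): Unit = {
    val ok  = snapshot(Seq(new VoidNs))
    val bad = snapshot(Seq(new VoidNs, WindowNs(0L, 1200000L)))
    println(ok.isSuccess)
    println(bad.failed.map(_.getClass.getName))
  }
}
```

In this model the snapshot fails as soon as one entry carries a window
namespace, which has the same shape as the failure in the trace. It says
nothing about how such an entry got there.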

Best,
Congxian


On Fri, Jul 3, 2020 at 6:44 PM Si-li Liu <unix...@gmail.com> wrote:

> Thanks for your help
>
> 1. I started the job from scratch, not a savepoint or externalized
> checkpoint
> 2. No job graph change
> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 4. My Flink version is 1.9.1
>
> On Fri, Jul 3, 2020 at 4:49 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>
>> I still wasn't able to reproduce the issue.
>>
>> Can you also clarify:
>> - Are you starting the job from a savepoint or externalized checkpoint?
>> - If yes, was the job graph changed?
>> - What StreamTimeCharacteristic is set, if any?
>> - What exact version of Flink do you use?
>>
>> Regards,
>> Roman
>>
>>
>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:
>>
>>> Hi, Thanks for your help.
>>>
>>> The checkpoint configuration is
>>>
>>> checkpoint.intervalMS=300000
>>> checkpoint.timeoutMS=300000
>>>
>>> The error call stack is from the JM's log; it happens on every checkpoint.
>>> So far I have not had a single successful checkpoint.
>>>
>>> On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the details.
>>>> However, I was not able to reproduce the issue. I used parallelism
>>>> 4, the file system backend, and tried different timings for
>>>> checkpointing, windowing and source.
>>>> Do you encounter this problem deterministically? Is it always the 1st
>>>> checkpoint?
>>>> What checkpointing interval do you use?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> Hi, this is our production code, so I had to modify it a little, e.g.
>>>>> variable and function names. I think the 3 classes I provide here are
>>>>> enough.
>>>>>
>>>>> I am trying to join two streams, but I don't want to use the default join
>>>>> function, because I want to emit each joined log immediately and remove it
>>>>> from the window state right away. My window is also very long (20
>>>>> minutes), so it may be evaluated multiple times.
>>>>>
>>>>> class JoinFunction extends
>>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>>>
>>>>>   var ueState: ValueState[RawLog] = _
>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>   val invalidCounter = new LongCounter()
>>>>>   val processCounter = new LongCounter()
>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>
>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>     ueState = getRuntimeContext.getState(
>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>     )
>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>   }
>>>>>
>>>>>   override def process(key: String,
>>>>>                        ctx: Context,
>>>>>                        logs: Iterable[RawLog],
>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>     if (ueState.value() != null) {
>>>>>       processCounter.add(1L)
>>>>>       val bid = ueState.value()
>>>>>       val bidLog =
>>>>>         gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>       logs.foreach( log => {
>>>>>         if (log.eventType == SHOW) {
>>>>>           val showLog =
>>>>>             gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>           sendToKafkaCounter.add(1L)
>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog),
>>>>>             Utils.getOutputTopic(showLog)))
>>>>>         }
>>>>>       })
>>>>>     } else {
>>>>>       invalidCounter.add(1L)
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>
>>>>>   override def onElement(log: RawLog,
>>>>>                          timestamp: Long,
>>>>>                          window: TimeWindow,
>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>     )
>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>
>>>>>     if (!firstSeen.value()) {
>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>       firstSeen.update(true)
>>>>>     }
>>>>>     val eventType = log.eventType
>>>>>     if (eventType == BID) {
>>>>>       ueState.update(log)
>>>>>       TriggerResult.CONTINUE
>>>>>     } else {
>>>>>       if (ueState.value() == null) {
>>>>>         TriggerResult.CONTINUE
>>>>>       } else {
>>>>>         TriggerResult.FIRE
>>>>>       }
>>>>>     }
>>>>>   }
>>>>>
>>>>>   override def onEventTime(timestamp: Long,
>>>>>                            window: TimeWindow,
>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
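>>>>>     // The if/else is the method's result: PURGE at the window end,
>>>>>     // CONTINUE for any other timer.
>>>>>     if (timestamp == window.getEnd) {
>>>>>       TriggerResult.PURGE
>>>>>     } else {
>>>>>       TriggerResult.CONTINUE
>>>>>     }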
>>>>>   }
>>>>>
>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>                                 window: TimeWindow,
>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>     TriggerResult.CONTINUE
>>>>>   }
>>>>>
>>>>>   override def clear(window: TimeWindow,
>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>     )
>>>>>     ueState.clear()
>>>>>
>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>     firstSeen.clear()
>>>>>
>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>   }
>>>>> }
>>>>>
>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>
>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>                            size: Int,
>>>>>                            window: TimeWindow,
>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>
>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>                            size: Int,
>>>>>                            window: TimeWindow,
>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>>>>     val iter = elements.iterator()
>>>>>     while (iter.hasNext) {
>>>>>       iter.next()
>>>>>       iter.remove()
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the clarification.
>>>>>>
>>>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>
>>>>>>> The RocksDB backend has the same problem.
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for reporting this.
>>>>>>>>
>>>>>>>> Looks like the window namespace was replaced by VoidNamespace in
>>>>>>>> state entry.
>>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>>> further investigate it.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Roman
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I'm using Flink 1.9 on Mesos and I am trying to use my own trigger
>>>>>>>>> and evictor. The state is stored in memory.
>>>>>>>>>
>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>         .keyBy(_.key)
>>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>>
>>>>>>>>> And the exception stack is here, could anyone help with this?
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>     ... 3 more
>>>>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>     ... 5 more
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards
>>>>>>>>>
>>>>>>>>> Sili Liu
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>
