I still wasn't able to reproduce the issue.

Can you also clarify:
- Are you starting the job from a savepoint or externalized checkpoint?
- If yes, was the job graph changed?
- What StreamTimeCharacteristic is set, if any? (see the sketch below)
- What exact Flink version are you using?
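
For reference, the time characteristic is set on the execution environment. A minimal sketch, assuming "env" is your StreamExecutionEnvironment:

import org.apache.flink.streaming.api.TimeCharacteristic
// declares how time is interpreted by windows and timers
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)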

Regards,
Roman


On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:

> Hi, thanks for your help.
>
> The checkpoint configuration is
>
> checkpoint.intervalMS=300000
> checkpoint.timeoutMS=300000
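>
> For reference, a minimal sketch of how these values typically map onto the job, assuming "env" is the StreamExecutionEnvironment:
>
> env.enableCheckpointing(300000L)                      // checkpoint.intervalMS
> env.getCheckpointConfig.setCheckpointTimeout(300000L) // checkpoint.timeoutMS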
>
> The error call stack is from the JM's log; it happens on every checkpoint.
> I don't have a successful checkpoint yet.
>
> On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for the details.
>> However, I was not able to reproduce the issue. I used parallelism level 4,
>> the filesystem backend, and tried different timings for checkpointing,
>> windowing and the source.
>> Do you encounter this problem deterministically? Is it always the 1st
>> checkpoint?
>> What checkpointing interval do you use?
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>
>>> Hi, this is our production code, so I had to modify it a little bit
>>> (variable and function names). I think the 3 classes I provide here
>>> are enough.
>>>
>>> I'm trying to join two streams, but I don't want to use the default join
>>> function, because I want to emit each joined log and remove it from the
>>> window state immediately. And my window is very long (20 minutes), so it
>>> may be evaluated multiple times.
>>>
>>> // Flink 1.9 Scala API imports; user-defined types (RawLog, OutputLog, MyType,
>>> // GZipThriftSerializer, ThriftUtils, Utils, SHOW, BID) are assumed in scope.
>>> import java.lang.{Iterable => JIterable}
>>> import org.apache.flink.api.common.accumulators.LongCounter
>>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>>> import org.apache.flink.configuration.Configuration
>>> import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
>>> import org.apache.flink.streaming.api.windowing.evictors.Evictor
>>> import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>> import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
>>> import org.apache.flink.util.Collector
>>>
>>> class JoinFunction extends
>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow] {
>>>
>>>   var ueState: ValueState[RawLog] = _
>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>   val invalidCounter = new LongCounter()
>>>   val processCounter = new LongCounter()
>>>   val sendToKafkaCounter = new LongCounter()
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>>     // Keyed state holding the buffered BID-side log for the current key
>>>     ueState = getRuntimeContext.getState(
>>>       new ValueStateDescriptor[RawLog]("bidState", classOf[RawLog])
>>>     )
>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>   }
>>>
>>>   override def process(key: String,
>>>                        ctx: Context,
>>>                        logs: Iterable[RawLog],
>>>                        out: Collector[OutputLog]): Unit = {
>>>     // Only join if a BID-side log has already been buffered for this key
>>>     if (ueState.value() != null) {
>>>       processCounter.add(1L)
>>>       val bid = ueState.value()
>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>       logs.foreach( log => {
>>>         if (log.eventType == SHOW) {
>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>           sendToKafkaCounter.add(1L)
>>>           // Emit the joined log immediately instead of waiting for window end
>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>         }
>>>       })
>>>     } else {
>>>       invalidCounter.add(1L)
>>>     }
>>>   }
>>> }
>>>
>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>
>>>   override def onElement(log: RawLog,
>>>                          timestamp: Long,
>>>                          window: TimeWindow,
>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>     // Window-scoped state: the buffered BID-side log and a first-element flag
>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[RawLog]("bidState", classOf[RawLog])
>>>     )
>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[Boolean]("initState", classOf[Boolean]))
>>>
>>>     // Register a cleanup timer for the window end on the first element
>>>     if (!firstSeen.value()) {
>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>       firstSeen.update(true)
>>>     }
>>>     val eventType = log.eventType
>>>     if (eventType == BID) {
>>>       // Buffer the BID side and keep waiting
>>>       ueState.update(log)
>>>       TriggerResult.CONTINUE
>>>     } else {
>>>       // Fire immediately once the BID side is present
>>>       if (ueState.value() == null) {
>>>         TriggerResult.CONTINUE
>>>       } else {
>>>         TriggerResult.FIRE
>>>       }
>>>     }
>>>   }
>>>
>>>   override def onEventTime(timestamp: Long,
>>>                            window: TimeWindow,
>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>     // Purge the window state once the end-of-window timer fires
>>>     if (timestamp == window.getEnd) TriggerResult.PURGE else TriggerResult.CONTINUE
>>>   }
>>>
>>>   override def onProcessingTime(timestamp: Long,
>>>                                 window: TimeWindow,
>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>     TriggerResult.CONTINUE
>>>   }
>>>
>>>   override def clear(window: TimeWindow,
>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[RawLog]("bidState", classOf[RawLog])
>>>     )
>>>     ueState.clear()
>>>
>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[Boolean]("initState", classOf[Boolean]))
>>>     firstSeen.clear()
>>>
>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>   }
>>> }
>>>
>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>
>>>   // No eviction before the window function runs
>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>                            size: Int,
>>>                            window: TimeWindow,
>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>
>>>   // Remove every element from the window state after each fire
>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>                           size: Int,
>>>                           window: TimeWindow,
>>>                           evictorContext: Evictor.EvictorContext): Unit = {
>>>     val iter = elements.iterator()
>>>     while (iter.hasNext) {
>>>       iter.next()
>>>       iter.remove()
>>>     }
>>>   }
>>> }
>>>
>>>
>>> On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Thanks for the clarification.
>>>>
>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> The RocksDB backend has the same problem.
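>>>>>
>>>>> A sketch of how the backend was switched for that test, assuming the
>>>>> flink-statebackend-rocksdb dependency and a hypothetical checkpoint path:
>>>>>
>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
>>>>> env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))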
>>>>>
>>>>> On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for reporting this.
>>>>>>
>>>>>> Looks like the window namespace was replaced by VoidNamespace in the
>>>>>> state entry.
>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>> further investigate it.
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm using Flink 1.9 on Mesos, and I'm trying to use my own trigger and
>>>>>>> evictor. The state is stored in memory.
>>>>>>>
>>>>>>> input.setParallelism(processParallelism)
>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>         .keyBy(_.key)
>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>         .trigger(new MyTrigger)
>>>>>>>         .evictor(new MyEvictor)
>>>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>         .name("kafka-record-sink")
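>>>>>>>
>>>>>>> The state backend is left at the default; a sketch of the equivalent
>>>>>>> explicit in-memory setup, assuming "env" is the execution environment:
>>>>>>>
>>>>>>> import org.apache.flink.runtime.state.memory.MemoryStateBackend
>>>>>>> env.setStateBackend(new MemoryStateBackend())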
>>>>>>>
>>>>>>> The exception stack is below. Could anyone help with this? Thanks!
>>>>>>>
>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>     ... 3 more
>>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>     ... 5 more
>>>>>>>
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>
