Someone told me that this issue may be Mesos-specific. I'm kind of a
newbie in Flink; I dug into the code but could not reach a conclusion.
Here I just want a better JoinWindow that emits the result and deletes
it from the window state immediately when the join succeeds. Is there any
other way? Thanks!
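The behavior asked for above (emit as soon as the join succeeds, then forget the joined records) can be sketched in plain Scala, independently of any Flink API. This is a simplified model under stated assumptions: events are reduced to hypothetical `Bid` and `Show` case classes, mutable maps stand in for keyed state, a SHOW that arrives before its BID is buffered until the BID shows up, and whatever is left for a key and window is dropped once the watermark reaches the window end. `JoinModel`, `process` and `onWatermark` are illustrative names only.

```scala
import scala.collection.mutable

// Hypothetical event model: only what the join needs (not the job's RawLog).
sealed trait Event { def key: String; def timestamp: Long }
final case class Bid(key: String, timestamp: Long) extends Event
final case class Show(key: String, timestamp: Long, payload: String) extends Event

// Plain-Scala model of an "emit on match, then forget" join per key and
// tumbling window. The maps stand in for keyed state; this is not Flink API.
final class JoinModel(windowSizeMs: Long) {
  // (key, windowStart) -> true once a BID has arrived in that window
  private val bidSeen = mutable.Map.empty[(String, Long), Boolean]
  // (key, windowStart) -> SHOWs that arrived before any BID in that window
  private val pendingShows = mutable.Map.empty[(String, Long), Vector[Show]]

  private def windowStart(ts: Long): Long = ts - Math.floorMod(ts, windowSizeMs)

  /** Handles one event and returns the SHOWs emitted because of it. */
  def process(e: Event): Vector[Show] = {
    val slot = (e.key, windowStart(e.timestamp))
    e match {
      case _: Bid =>
        bidSeen(slot) = true
        // Joined now: emit the waiting SHOWs and drop them from state.
        pendingShows.remove(slot).getOrElse(Vector.empty)
      case s: Show if bidSeen.getOrElse(slot, false) =>
        Vector(s) // joined: emit immediately, keep nothing
      case s: Show =>
        // No BID yet: buffer the SHOW until one arrives (or the window ends).
        pendingShows(slot) = pendingShows.getOrElse(slot, Vector.empty) :+ s
        Vector.empty
    }
  }

  /** Models the end-of-window timer: drops all state of windows ending at or before `watermark`. */
  def onWatermark(watermark: Long): Unit = {
    val expired = (bidSeen.keySet ++ pendingShows.keySet)
      .filter { case (_, start) => start + windowSizeMs <= watermark }
      .toList
    expired.foreach { slot =>
      bidSeen.remove(slot)
      pendingShows.remove(slot)
    }
  }

  /** Number of (key, window) slots that still hold state. */
  def stateSize: Int = (bidSeen.keySet ++ pendingShows.keySet).size
}
```

Buffering early SHOWs is a choice of this sketch. The JoinTrigger quoted further down fires only when a non-BID log arrives after a BID has been stored, so SHOWs that arrived before the BID are emitted with that later firing rather than on the BID itself.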

On Sat, Jul 11, 2020 at 3:20 PM Congxian Qiu <> wrote:

> Hi Si-li
> Thanks for the notice.
> I just want to double-check whether the original problem has been solved, as I
> found that the created issue FLINK-18464 has been closed with the reason
> "cannot reproduce". Am I missing something here?
> Best,
> Congxian
> On Fri, Jul 10, 2020 at 6:06 PM Si-li Liu <> wrote:
>> Sorry,
>> I can't reproduce it with reduce/aggregate/fold/apply, and due to some
>> limitations in my working environment, I can't use Flink 1.10 or 1.11.
>> On Sun, Jul 5, 2020 at 6:21 PM Congxian Qiu <> wrote:
>>> Hi
>>> First, could you please check whether the problem still occurs with
>>> Flink 1.10 or 1.11?
>>> It seems strange. From the error message, the error occurs when trying
>>> to convert a non-window state (VoidNamespace) to a window state (the
>>> serializer is the serializer of the window state, but the state is a
>>> non-window state).
>>> Could you please try to replace MyFunction with a
>>> reduce/aggregate/fold/apply() function to see what happens? This should
>>> help narrow down the problem.
>>> Best,
>>> Congxian
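Congxian's reading of the error (a state entry whose namespace does not match the namespace serializer of its state) can be illustrated with a deliberately simplified plain-Scala model. `VoidNamespace`, `TimeWindow`, `NamespaceSerializer`, `StateTable` and `Backend` here are hypothetical stand-ins, not Flink's classes. The modeled assumption is that states are looked up by name only, so a table first created with a void-namespace serializer is handed back unchanged when the same name is later requested with a window serializer; the mismatch then surfaces only when a snapshot serializes the entries, as a `ClassCastException`. In the JoinFunction and JoinTrigger code quoted further down, a descriptor named `bidState` is obtained through both `getRuntimeContext.getState` and `ctx.getPartitionedState`, which resembles the situation modeled here.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the two namespace kinds (not Flink's classes).
final class VoidNamespace private ()
object VoidNamespace { val INSTANCE: VoidNamespace = new VoidNamespace }
final case class TimeWindow(start: Long, end: Long)

// A serializer that only understands one namespace type.
trait NamespaceSerializer[N] { def serialize(ns: N): String }
object VoidNamespaceSerializer extends NamespaceSerializer[VoidNamespace] {
  def serialize(ns: VoidNamespace): String = "void"
}
object TimeWindowSerializer extends NamespaceSerializer[TimeWindow] {
  def serialize(ns: TimeWindow): String = s"[${ns.start}, ${ns.end})"
}

// One table per state name; it keeps the serializer it was created with.
final class StateTable[N](nsSerializer: NamespaceSerializer[N]) {
  private val entries = mutable.Map.empty[(String, N), String]
  def put(key: String, ns: N, value: String): Unit = entries.update((key, ns), value)
  // A snapshot serializes every entry's namespace with the registered serializer.
  def snapshot(): List[String] =
    entries.toList.map { case ((k, ns), v) => s"$k/${nsSerializer.serialize(ns)}=$v" }
}

// Modeled assumption: lookup by name only. A second request for the same name
// with another serializer gets the existing table back through an unchecked cast.
final class Backend {
  private val tables = mutable.Map.empty[String, StateTable[_]]
  def getState[N](name: String, ser: NamespaceSerializer[N]): StateTable[N] =
    tables.getOrElseUpdate(name, new StateTable[N](ser)).asInstanceOf[StateTable[N]]
}

val backend = new Backend
// First access registers "bidState" with the void-namespace serializer.
backend.getState("bidState", VoidNamespaceSerializer).put("k", VoidNamespace.INSTANCE, "bid-1")
// A later access under a window namespace reuses that same table.
backend.getState("bidState", TimeWindowSerializer).put("k", TimeWindow(0L, 1200000L), "bid-2")
// Serializing the TimeWindow entry with the void serializer throws ClassCastException.
val snapshotFailed =
  try { backend.getState("bidState", VoidNamespaceSerializer).snapshot(); false }
  catch { case _: ClassCastException => true }
```

In this model, keeping each state name bound to a single namespace kind (separate names for the void-namespace state and the per-window state) lets every snapshot succeed.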
>>> On Fri, Jul 3, 2020 at 6:44 PM Si-li Liu <> wrote:
>>>> Thanks for your help
>>>> 1. I started the job from scratch, not a savepoint or externalized
>>>> checkpoint
>>>> 2. No job graph change
>>>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>> 4. My Flink version is 1.9.1
>>>> On Fri, Jul 3, 2020 at 4:49 PM Khachatryan Roman <> wrote:
>>>>> I still wasn't able to reproduce the issue.
>>>>> Can you also clarify:
>>>>> - Are you starting the job from a savepoint or externalized
>>>>> checkpoint?
>>>>> - If yes, was the job graph changed?
>>>>> - What StreamTimeCharacteristic is set, if any?
>>>>> - What exact version of Flink do you use?
>>>>> Regards,
>>>>> Roman
>>>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <> wrote:
>>>>>> Hi, thanks for your help.
>>>>>> The checkpoint configuration is:
>>>>>> checkpoint.intervalMS=300000
>>>>>> checkpoint.timeoutMS=300000
>>>>>> The error call stack is from the JobManager's log, and it happens on every
>>>>>> checkpoint. So far I haven't had a single successful checkpoint.
>>>>>> On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <> wrote:
>>>>>>> Hi,
>>>>>>> Thanks for the details.
>>>>>>> However, I was not able to reproduce the issue. I used parallelism
>>>>>>> level 4 and the file system backend, and tried different timings for
>>>>>>> checkpointing, windowing and the source.
>>>>>>> Do you encounter this problem deterministically? Is it always the first
>>>>>>> checkpoint?
>>>>>>> What checkpointing interval do you use?
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <> wrote:
>>>>>>>> Hi, this is our production code, so I had to modify it a little,
>>>>>>>> e.g. variable and function names. I think the 3 classes I provide
>>>>>>>> here are enough.
>>>>>>>> I'm trying to join two streams, but I don't want to use the default join
>>>>>>>> function, because I want to emit the joined log immediately and remove it
>>>>>>>> from the window state immediately. My window size is very long (20
>>>>>>>> minutes), so the window may be evaluated multiple times.
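>>>>>>>> import java.lang.{Iterable => JIterable}
>>>>>>>> import org.apache.flink.api.common.accumulators.LongCounter
>>>>>>>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>>>>>>>> import org.apache.flink.configuration.Configuration
>>>>>>>> import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
>>>>>>>> import org.apache.flink.streaming.api.windowing.evictors.Evictor
>>>>>>>> import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
>>>>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>>>>>> import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
>>>>>>>> import org.apache.flink.util.Collector
>>>>>>>> // RawLog, OutputLog, MyType, GZipThriftSerializer, ThriftUtils, Utils and the
>>>>>>>> // names bidState, initState, SHOW and BID are defined elsewhere in the job.
>>>>>>>> // Emits every SHOW log of a fired window once a BID has been stored for the key.
>>>>>>>> class JoinFunction extends
>>>>>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{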
>>>>>>>>   var ueState: ValueState[RawLog] = _
>>>>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>>>>   val invalidCounter = new LongCounter()
>>>>>>>>   val processCounter = new LongCounter()
>>>>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>>>>     ueState = getRuntimeContext.getState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>>>>   }
>>>>>>>>   override def process(key: String,
>>>>>>>>                        ctx: Context,
>>>>>>>>                        logs: Iterable[RawLog],
>>>>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>>>>     if (ueState.value() != null) {
>>>>>>>>       processCounter.add(1L)
>>>>>>>>       val bid = ueState.value()
>>>>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>>>>       logs.foreach( log => {
>>>>>>>>         if (log.eventType == SHOW) {
>>>>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>>>>           sendToKafkaCounter.add(1L)
>>>>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>>>>         }
>>>>>>>>       })
>>>>>>>>     } else {
>>>>>>>>       invalidCounter.add(1L)
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
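>>>>>>>>   // Registers the end-of-window timer once, stores the latest BID, and
>>>>>>>>   // FIREs when a non-BID log arrives after a BID has been stored.
>>>>>>>>   override def onElement(log: RawLog,
>>>>>>>>                          timestamp: Long,
>>>>>>>>                          window: TimeWindow,
>>>>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {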
>>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>>     if (!firstSeen.value()) {
>>>>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>>>>       firstSeen.update(true)
>>>>>>>>     }
>>>>>>>>     val eventType = log.eventType
>>>>>>>>     if (eventType == BID) {
>>>>>>>>       ueState.update(log)
>>>>>>>>       TriggerResult.CONTINUE
>>>>>>>>     } else {
>>>>>>>>       if (ueState.value() == null) {
>>>>>>>>         TriggerResult.CONTINUE
>>>>>>>>       } else {
>>>>>>>>         TriggerResult.FIRE
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
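>>>>>>>>   override def onEventTime(timestamp: Long,
>>>>>>>>                            window: TimeWindow,
>>>>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     // if/else is used as an expression: PURGE when the end-of-window
>>>>>>>>     // timer fires, CONTINUE for any other event-time timer.
>>>>>>>>     if (timestamp == window.getEnd) {
>>>>>>>>       TriggerResult.PURGE
>>>>>>>>     } else {
>>>>>>>>       TriggerResult.CONTINUE
>>>>>>>>     }
>>>>>>>>   }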
>>>>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>>>>                                 window: TimeWindow,
>>>>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     TriggerResult.CONTINUE
>>>>>>>>   }
>>>>>>>>   override def clear(window: TimeWindow,
>>>>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     ueState.clear()
>>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>>     firstSeen.clear()
>>>>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>>>>   }
>>>>>>>> }
>>>>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>>                            size: Int,
>>>>>>>>                            window: TimeWindow,
>>>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>>                            size: Int,
>>>>>>>>                            window: TimeWindow,
>>>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>>>>>>>     // Runs after the window function: remove every buffered element so the
>>>>>>>>     // same logs are not emitted again on the next firing.
>>>>>>>>     val iter = elements.iterator()
>>>>>>>>     while (iter.hasNext) {
>>>>>>>>       iter.remove()
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>> On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <> wrote:
>>>>>>>>> Thanks for the clarification.
>>>>>>>>> Can you also share the code of other parts, particularly
>>>>>>>>> MyFunction?
>>>>>>>>> Regards,
>>>>>>>>> Roman
>>>>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <> wrote:
>>>>>>>>>> The RocksDB backend has the same problem.
>>>>>>>>>> On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <> wrote:
>>>>>>>>>>> Thanks for reporting this.
>>>>>>>>>>> It looks like the window namespace was replaced by VoidNamespace in
>>>>>>>>>>> the state entry.
>>>>>>>>>>> I've created an issue to investigate it further.
>>>>>>>>>>> Regards,
>>>>>>>>>>> Roman
>>>>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <> wrote:
>>>>>>>>>>>> I'm using Flink 1.9 on Mesos with my own trigger and evictor. The
>>>>>>>>>>>> state is stored in memory.
>>>>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>>>>         .keyBy(_.key)
>>>>>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>>>>> Here is the exception stack trace. Could anyone help with this?
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$
>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$
>>>>>>>>>>>>     at
>>>>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>>     at 122)
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.get(
>>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(
>>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$
>>>>>>>>>>>>     ... 3 more
>>>>>>>>>>>> Caused by: java.lang.ClassCastException: cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(191)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(158)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>>>>>>>>>>>     at
>>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(
>>>>>>>>>>>>     ... 5 more
>>>>>>>>>>>> --
>>>>>>>>>>>> Best regards
>>>>>>>>>>>> Sili Liu

Best regards

Sili Liu
