Someone told me that this issue may be Mesos-specific. I'm fairly new to
Flink; I dug into the code but could not reach a conclusion. All I want is a
better JoinWindow that emits the result and deletes it from the window state
immediately once the join succeeds. Is there any other way to do that? Thanks!
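
One possible alternative, given only as an untested sketch: do the join in a
KeyedProcessFunction instead of a window with a custom trigger and evictor.
The function below keeps the latest BID per key in keyed state, emits every
SHOW as soon as a BID is known, buffers SHOWs that arrive early, and clears
everything with an event-time timer at the end of a 20-minute-style interval.
RawLog and OutputLog here are simplified stand-ins, and the state names, the
outputTopic parameter and the string event types are assumptions made for
illustration. Because all state lives in one function, no state name is
registered from both a trigger and a window function, which may be worth
ruling out given the VoidNamespace error.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical, simplified stand-ins for the production types in this thread.
case class RawLog(key: String, eventType: String, payload: Array[Byte])
case class OutputLog(payload: Array[Byte], topic: String)

object ImmediateJoinFunction {
  // End of the tumbling interval of length sizeMs that contains ts (assumes ts >= 0).
  def windowEnd(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs) + sizeMs
}

class ImmediateJoinFunction(sizeMs: Long, outputTopic: String)
  extends KeyedProcessFunction[String, RawLog, OutputLog] {

  @transient private var bid: ValueState[RawLog] = _
  @transient private var pendingShows: ListState[RawLog] = _

  override def open(parameters: Configuration): Unit = {
    bid = getRuntimeContext.getState(
      new ValueStateDescriptor[RawLog]("join-bid", classOf[RawLog]))
    pendingShows = getRuntimeContext.getListState(
      new ListStateDescriptor[RawLog]("join-pending-shows", classOf[RawLog]))
  }

  override def processElement(
      log: RawLog,
      ctx: KeyedProcessFunction[String, RawLog, OutputLog]#Context,
      out: Collector[OutputLog]): Unit = {
    // ctx.timestamp() is null unless records carry event-time timestamps.
    // Registering the same timer again for the same key is a no-op.
    val end = ImmediateJoinFunction.windowEnd(ctx.timestamp(), sizeMs)
    ctx.timerService().registerEventTimeTimer(end)

    log.eventType match {
      case "BID" =>
        bid.update(log)
        // Flush SHOWs that arrived before the BID, then drop them from state.
        Option(pendingShows.get()).foreach(_.asScala.foreach(s => out.collect(toOutput(s))))
        pendingShows.clear()
      case "SHOW" =>
        // Emit immediately when a BID is known; otherwise buffer the SHOW.
        if (bid.value() != null) out.collect(toOutput(log)) else pendingShows.add(log)
      case _ => // other event types are ignored in this sketch
    }
  }

  // Simplification: a timer for an old interval also clears state written by
  // records of the next interval that arrived before the watermark passed.
  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, RawLog, OutputLog]#OnTimerContext,
      out: Collector[OutputLog]): Unit = {
    bid.clear()
    pendingShows.clear()
  }

  private def toOutput(show: RawLog): OutputLog = OutputLog(show.payload, outputTopic)
}
```

It would be wired in roughly as
input.keyBy(_.key).process(new ImmediateJoinFunction(20 * 60 * 1000L, "some-topic")),
where "some-topic" is a placeholder. Unlike the trigger version, buffered
SHOWs are flushed as soon as the BID arrives rather than on the next SHOW.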

On Sat, Jul 11, 2020 at 3:20 PM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Si-li
>
> Thanks for the notice.
> I just want to double-check: has the original problem been solved? I
> found that the issue created for it, FLINK-18464, has been closed with the
> reason "can not reproduce". Am I missing something here?
>
> Best,
> Congxian
>
>
> On Fri, Jul 10, 2020 at 6:06 PM Si-li Liu <unix...@gmail.com> wrote:
>
>> Sorry, I can't reproduce it with reduce/aggregate/fold/apply, and due to
>> some limitations in my working environment I can't use Flink 1.10 or 1.11.
>>
>> On Sun, Jul 5, 2020 at 6:21 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> First, could you please check whether the problem still occurs with
>>> Flink 1.10 or 1.11?
>>>
>>> It seems strange. Judging from the error message, the error occurs when
>>> trying to convert a non-window state (VoidNamespace) to a window state
>>> (the serializer is the window state's serializer, but the state is a
>>> non-window state).
>>> Could you please try replacing MyFunction with a
>>> reduce/aggregate/fold/apply() function to see what happens? That should
>>> help narrow down the problem.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> On Fri, Jul 3, 2020 at 6:44 PM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Thanks for your help
>>>>
>>>> 1. I started the job from scratch, not a savepoint or externalized
>>>> checkpoint
>>>> 2. No job graph change
>>>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>> 4. My Flink version is 1.9.1
>>>>
>>>> On Fri, Jul 3, 2020 at 4:49 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> I still wasn't able to reproduce the issue.
>>>>>
>>>>> Can you also clarify:
>>>>> - Are you starting the job from a savepoint or externalized
>>>>> checkpoint?
>>>>> - If yes, was the job graph changed?
>>>>> - What StreamTimeCharacteristic is set, if any?
>>>>> - What exact version of Flink do you use?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Hi, Thanks for your help.
>>>>>>
>>>>>> The checkpoint configuration is
>>>>>>
>>>>>> checkpoint.intervalMS=300000
>>>>>> checkpoint.timeoutMS=300000
>>>>>>
>>>>>> The error call stack is from the JM's log, and it occurs on every
>>>>>> checkpoint. So far I have not had a single successful checkpoint.
>>>>>>
>>>>>> On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for the details.
>>>>>>> However, I was not able to reproduce the issue. I used a parallelism
>>>>>>> of 4 and the file system backend, and tried different timings for
>>>>>>> checkpointing, windowing and the source.
>>>>>>> Do you encounter this problem deterministically? Is it always the
>>>>>>> first checkpoint?
>>>>>>> What checkpointing interval do you use?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi, this is our production code, so I had to modify it a little,
>>>>>>>> for example the variable and function names. I think the 3 classes
>>>>>>>> I provide here are enough.
>>>>>>>>
>>>>>>>> I am trying to join two streams, but I don't want to use the default
>>>>>>>> join function, because I want to emit the joined log immediately and
>>>>>>>> remove it from the window state right away. My window is very long
>>>>>>>> (20 minutes), so it may be evaluated multiple times.
>>>>>>>>
>>>>>>>> class JoinFunction extends
>>>>>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>>>>>>
>>>>>>>>   var ueState: ValueState[RawLog] = _
>>>>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>>>>   val invalidCounter = new LongCounter()
>>>>>>>>   val processCounter = new LongCounter()
>>>>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>>>>
>>>>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>>>>     ueState = getRuntimeContext.getState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def process(key: String,
>>>>>>>>                        ctx: Context,
>>>>>>>>                        logs: Iterable[RawLog],
>>>>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>>>>     if (ueState.value() != null) {
>>>>>>>>       processCounter.add(1L)
>>>>>>>>       val bid = ueState.value()
>>>>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>>>>       logs.foreach( log => {
>>>>>>>>         if (log.eventType == SHOW) {
>>>>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>>>>           sendToKafkaCounter.add(1L)
>>>>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>>>>         }
>>>>>>>>       })
>>>>>>>>     } else {
>>>>>>>>       invalidCounter.add(1L)
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>>>>
>>>>>>>>   override def onElement(log: RawLog,
>>>>>>>>                          timestamp: Long,
>>>>>>>>                          window: TimeWindow,
>>>>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>>
>>>>>>>>     if (!firstSeen.value()) {
>>>>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>>>>       firstSeen.update(true)
>>>>>>>>     }
>>>>>>>>     val eventType = log.eventType
>>>>>>>>     if (eventType == BID) {
>>>>>>>>       ueState.update(log)
>>>>>>>>       TriggerResult.CONTINUE
>>>>>>>>     } else {
>>>>>>>>       if (ueState.value() == null) {
>>>>>>>>         TriggerResult.CONTINUE
>>>>>>>>       } else {
>>>>>>>>         TriggerResult.FIRE
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def onEventTime(timestamp: Long,
>>>>>>>>                            window: TimeWindow,
>>>>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     // Return PURGE from the if/else; a bare PURGE inside the if would be discarded.
>>>>>>>>     if (timestamp == window.getEnd) {
>>>>>>>>       TriggerResult.PURGE
>>>>>>>>     } else {
>>>>>>>>       TriggerResult.CONTINUE
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>>>>                                 window: TimeWindow,
>>>>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     TriggerResult.CONTINUE
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def clear(window: TimeWindow,
>>>>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     ueState.clear()
>>>>>>>>
>>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>>     firstSeen.clear()
>>>>>>>>
>>>>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>>>>
>>>>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>>                            size: Int,
>>>>>>>>                            window: TimeWindow,
>>>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>>>>
>>>>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>>                           size: Int,
>>>>>>>>                           window: TimeWindow,
>>>>>>>>                           evictorContext: Evictor.EvictorContext): Unit = {
>>>>>>>>     val iter = elements.iterator()
>>>>>>>>     while (iter.hasNext) {
>>>>>>>>       iter.next()
>>>>>>>>       iter.remove()
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the clarification.
>>>>>>>>>
>>>>>>>>> Can you also share the code of other parts, particularly
>>>>>>>>> MyFunction?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Roman
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The RocksDB backend has the same problem.
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for reporting this.
>>>>>>>>>>>
>>>>>>>>>>> It looks like the window namespace was replaced by VoidNamespace
>>>>>>>>>>> in the state entry.
>>>>>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>>>>>> investigate it further.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Roman
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm using Flink 1.9 on Mesos and I am trying to use my own
>>>>>>>>>>>> trigger and evictor. The state is stored in memory.
>>>>>>>>>>>>
>>>>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>>>>         .keyBy(_.key)
>>>>>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>>>>>
>>>>>>>>>>>> The exception stack trace is below. Could anyone help with this?
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>>>>     ... 3 more
>>>>>>>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>>>>     ... 5 more
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best regards
>>>>>>>>>>>>
>>>>>>>>>>>> Sili Liu

-- 
Best regards

Sili Liu