Hi, First, could you please check whether this problem still occurs with Flink 1.10 or 1.11?
It seems strange. From the error message, there is a mismatch between window state and non-window state: a window namespace (TimeWindow) is being serialized with the serializer for non-window state (VoidNamespaceSerializer). Could you please try replacing MyFunction with a reduce/aggregate/fold/apply() function to see what happens? This should help narrow down the problem. Best, Congxian Si-li Liu <unix...@gmail.com> 于2020年7月3日周五 下午6:44写道: > Thanks for your help > > 1. I started the job from scratch, not a savepoint or externalized > checkpoint > 2. No job graph change > 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > 4. My Flink version is 1.9.1 > > Khachatryan Roman <khachatryan.ro...@gmail.com> 于2020年7月3日周五 下午4:49写道: > >> I still wasn't able to reproduce the issue. >> >> Can you also clarify: >> - Are you starting the job from a savepoint or externalized checkpoint? >> - If yes, was the job graph changed? >> - What StreamTimeCharacteristic is set, if any? >> - What exact version of Flink do you use? >> >> Regards, >> Roman >> >> >> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote: >> >>> Hi, Thanks for your help. >>> >>> The checkpoint configuration is >>> >>> checkpoint.intervalMS=300000 >>> checkpoint.timeoutMS=300000 >>> >>> The error callstack is from JM's log, which happened in every cp. >>> Currently I don't have a success cp yet. >>> >>> Khachatryan Roman <khachatryan.ro...@gmail.com> 于2020年7月3日周五 上午3:50写道: >>> >>>> Hi, >>>> >>>> Thanks for the details. >>>> However, I was not able to reproduce the issue. I used parallelism >>>> levels 4, file system backend and tried different timings for >>>> checkpointing, windowing and source. >>>> Do you encounter this problem deterministically, is it always 1st >>>> checkpoint? >>>> What checkpointing interval do you use? 
>>>> >>>> Regards, >>>> Roman >>>> >>>> >>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote: >>>> >>>>> Hi, this is our production code so I have to modify it a little bit, >>>>> such as variable name and function name. I think 3 classes I provide here >>>>> is enough. >>>>> >>>>> I try to join two streams, but I don't want to use the default join >>>>> function, because I want to send the joined log immediately and remove it >>>>> from window state immediately. And my window gap time is very long( 20 >>>>> minutes), so it maybe evaluate it multiple times. >>>>> >>>>> class JoinFunction extends >>>>> ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{ >>>>> >>>>> var ueState: ValueState[RawLog] = _ >>>>> @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _ >>>>> val invalidCounter = new LongCounter() >>>>> val processCounter = new LongCounter() >>>>> val sendToKafkaCounter = new LongCounter() >>>>> >>>>> override def open(parameters: Configuration): Unit = { >>>>> ueState = getRuntimeContext.getState( >>>>> new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]) >>>>> ) >>>>> gZipThriftSerializer = new GZipThriftSerializer[MyType]() >>>>> getRuntimeContext.addAccumulator("processCounter", >>>>> this.processCounter) >>>>> getRuntimeContext.addAccumulator("invalidCounter", >>>>> this.invalidCounter) >>>>> getRuntimeContext.addAccumulator("sendToKafkaCounter", >>>>> this.sendToKafkaCounter) >>>>> } >>>>> >>>>> override def process(key: String, >>>>> ctx: Context, >>>>> logs: Iterable[RawLog], >>>>> out: Collector[OutputLog]): Unit = { >>>>> if (ueState.value() != null) { >>>>> processCounter.add(1L) >>>>> val bid = ueState.value() >>>>> val bidLog = >>>>> gZipThriftSerializer.decompressAndDeserialize(bid.payload, >>>>> classOf[MyType]) >>>>> logs.foreach( log => { >>>>> if (log.eventType == SHOW) { >>>>> val showLog = >>>>> gZipThriftSerializer.decompressAndDeserialize(log.payload, >>>>> classOf[MyType]) >>>>> 
sendToKafkaCounter.add(1L) >>>>> out.collect(new OutputLog(ThriftUtils.serialize(showLog), >>>>> Utils.getOutputTopic(showLog))) >>>>> } >>>>> }) >>>>> } else { >>>>> invalidCounter.add(1L) >>>>> } >>>>> } >>>>> } >>>>> >>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] { >>>>> >>>>> override def onElement(log: RawLog, >>>>> timestamp: Long, >>>>> window: TimeWindow, >>>>> ctx: Trigger.TriggerContext): TriggerResult = { >>>>> val ueState: ValueState[RawLog] = ctx.getPartitionedState( >>>>> new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]) >>>>> ) >>>>> val firstSeen: ValueState[Boolean] = ctx.getPartitionedState( >>>>> new ValueStateDescriptor[Boolean](initState, classOf[Boolean])) >>>>> >>>>> if (!firstSeen.value()) { >>>>> ctx.registerEventTimeTimer(window.getEnd) >>>>> firstSeen.update(true) >>>>> } >>>>> val eventType = log.eventType >>>>> if (eventType == BID) { >>>>> ueState.update(log) >>>>> TriggerResult.CONTINUE >>>>> } else { >>>>> if (ueState.value() == null) { >>>>> TriggerResult.CONTINUE >>>>> } else { >>>>> TriggerResult.FIRE >>>>> } >>>>> } >>>>> } >>>>> >>>>> override def onEventTime(timestamp: Long, >>>>> window: TimeWindow, >>>>> ctx: Trigger.TriggerContext): TriggerResult = { >>>>> if (timestamp == window.getEnd) { >>>>> TriggerResult.PURGE >>>>> } >>>>> TriggerResult.CONTINUE >>>>> } >>>>> >>>>> override def onProcessingTime(timestamp: Long, >>>>> window: TimeWindow, >>>>> ctx: Trigger.TriggerContext): >>>>> TriggerResult = { >>>>> TriggerResult.CONTINUE >>>>> } >>>>> >>>>> override def clear(window: TimeWindow, >>>>> ctx: Trigger.TriggerContext): Unit = { >>>>> val ueState: ValueState[RawLog] = ctx.getPartitionedState( >>>>> new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]) >>>>> ) >>>>> ueState.clear() >>>>> >>>>> val firstSeen: ValueState[Boolean] = ctx.getPartitionedState( >>>>> new ValueStateDescriptor[Boolean](initState, classOf[Boolean])) >>>>> firstSeen.clear() >>>>> >>>>> 
ctx.deleteEventTimeTimer(window.getEnd) >>>>> } >>>>> } >>>>> >>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] { >>>>> >>>>> override def evictBefore(elements: JIterable[TimestampedValue[RawLog]], >>>>> size: Int, >>>>> window: TimeWindow, >>>>> evictorContext: Evictor.EvictorContext): Unit >>>>> = {} >>>>> >>>>> override def evictAfter(elements: JIterable[TimestampedValue[RawLog]], >>>>> size: Int, >>>>> window: TimeWindow, >>>>> evictorContext: Evictor.EvictorContext): Unit >>>>> = { >>>>> val iter = elements.iterator() >>>>> while (iter.hasNext) { >>>>> iter.next() >>>>> iter.remove() >>>>> } >>>>> } >>>>> } >>>>> >>>>> >>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> 于2020年7月2日周四 下午7:18写道: >>>>> >>>>>> Thanks for the clarification. >>>>>> >>>>>> Can you also share the code of other parts, particularly MyFunction? >>>>>> >>>>>> Regards, >>>>>> Roman >>>>>> >>>>>> >>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote: >>>>>> >>>>>>> Rocksdb backend has the same problem >>>>>>> >>>>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> 于2020年7月2日周四 >>>>>>> 下午6:11写道: >>>>>>> >>>>>>>> Thanks for reporting this. >>>>>>>> >>>>>>>> Looks like the window namespace was replaced by VoidNamespace in >>>>>>>> state entry. >>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to >>>>>>>> further investigate it. >>>>>>>> >>>>>>>> Regards, >>>>>>>> Roman >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and >>>>>>>>> evictor. The state is stored to memory. 
>>>>>>>>> >>>>>>>>> input.setParallelism(processParallelism) >>>>>>>>> .assignTimestampsAndWatermarks(new UETimeAssigner) >>>>>>>>> .keyBy(_.key) >>>>>>>>> .window(TumblingEventTimeWindows.of(Time.minutes(20))) >>>>>>>>> .trigger(new MyTrigger) >>>>>>>>> .evictor(new MyEvictor) >>>>>>>>> .process(new >>>>>>>>> MyFunction).setParallelism(aggregateParallelism) >>>>>>>>> .addSink(kafkaSink).setParallelism(sinkParallelism) >>>>>>>>> .name("kafka-record-sink") >>>>>>>>> >>>>>>>>> And the exception stack is here, could anyone help with this? >>>>>>>>> Thanks! >>>>>>>>> >>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for >>>>>>>>> operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, >>>>>>>>> JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: >>>>>>>>> kafka-record-sink (2/5). >>>>>>>>> at org.apache.flink.streaming.runtime.tasks. >>>>>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException( >>>>>>>>> StreamTask.java:1100) >>>>>>>>> at org.apache.flink.streaming.runtime.tasks. >>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042) >>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker( >>>>>>>>> ThreadPoolExecutor.java:1149) >>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run( >>>>>>>>> ThreadPoolExecutor.java:624) >>>>>>>>> at java.lang.Thread.run(Thread.java:748) >>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang. >>>>>>>>> ClassCastException: >>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow >>>>>>>>> cannot be cast to org.apache.flink.runtime.state.VoidNamespace >>>>>>>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122) >>>>>>>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) >>>>>>>>> at org.apache.flink.runtime.concurrent.FutureUtils >>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:450) >>>>>>>>> at org.apache.flink.streaming.api.operators. 
>>>>>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47 >>>>>>>>> ) >>>>>>>>> at org.apache.flink.streaming.runtime.tasks. >>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011) >>>>>>>>> ... 3 more >>>>>>>>> Caused by: java.lang.ClassCastException: >>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow >>>>>>>>> cannot be cast to org.apache.flink.runtime.state.VoidNamespace >>>>>>>>> at org.apache.flink.runtime.state.VoidNamespaceSerializer >>>>>>>>> .serialize(VoidNamespaceSerializer.java:32) >>>>>>>>> at org.apache.flink.runtime.state.heap. >>>>>>>>> CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot >>>>>>>>> .java:114) >>>>>>>>> at org.apache.flink.runtime.state.heap. >>>>>>>>> AbstractStateTableSnapshot.writeStateInKeyGroup( >>>>>>>>> AbstractStateTableSnapshot.java:121) >>>>>>>>> at org.apache.flink.runtime.state.heap. >>>>>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup( >>>>>>>>> CopyOnWriteStateTableSnapshot.java:37) >>>>>>>>> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1 >>>>>>>>> .callInternal(HeapSnapshotStrategy.java:191) >>>>>>>>> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1 >>>>>>>>> .callInternal(HeapSnapshotStrategy.java:158) >>>>>>>>> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call( >>>>>>>>> AsyncSnapshotCallable.java:75) >>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>>>>>>> at org.apache.flink.runtime.concurrent.FutureUtils >>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:447) >>>>>>>>> ... 5 more >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Best regards >>>>>>>>> >>>>>>>>> Sili Liu >>>>>>>>> >>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Best regards >>>>>>> >>>>>>> Sili Liu >>>>>>> >>>>>> >>>>> >>>>> -- >>>>> Best regards >>>>> >>>>> Sili Liu >>>>> >>>> >>> >>> -- >>> Best regards >>> >>> Sili Liu >>> >> > > -- > Best regards > > Sili Liu >