I still wasn't able to reproduce the issue. Can you also clarify:

- Are you starting the job from a savepoint or an externalized checkpoint?
- If yes, was the job graph changed?
- Which StreamTimeCharacteristic is set, if any?
- Which exact version of Flink do you use?
Regards,
Roman

On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:

> Hi, thanks for your help.
>
> The checkpoint configuration is:
>
> checkpoint.intervalMS=300000
> checkpoint.timeoutMS=300000
>
> The error call stack is from the JM's log, and it happens on every
> checkpoint. I don't have a successful checkpoint yet.
>
> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Fri, Jul 3, 2020 at 3:50 AM:
>
>> Hi,
>>
>> Thanks for the details. However, I was not able to reproduce the issue.
>> I used parallelism level 4, the file system backend, and tried different
>> timings for checkpointing, windowing, and the source.
>> Do you encounter this problem deterministically? Is it always the 1st
>> checkpoint? What checkpointing interval do you use?
>>
>> Regards,
>> Roman
>>
>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>
>>> Hi, this is our production code, so I had to modify it a little bit
>>> (variable and function names), but I think the 3 classes I provide here
>>> are enough.
>>>
>>> I try to join two streams, but I don't want to use the default join
>>> function, because I want to emit the joined log immediately and remove it
>>> from the window state immediately. And my window gap is very long (20
>>> minutes), so the window may be evaluated multiple times.
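[Editor's note: for reference, the two checkpoint properties quoted above map onto Flink's programmatic checkpoint configuration roughly as follows. This is a sketch, not the poster's actual setup; how the properties are loaded is left out, and the values are written inline.]

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// checkpoint.intervalMS=300000: trigger a checkpoint every 5 minutes
env.enableCheckpointing(300000L)

// checkpoint.timeoutMS=300000: abort any checkpoint still running after 5 minutes
env.getCheckpointConfig.setCheckpointTimeout(300000L)
```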
>>>
>>> class JoinFunction
>>>     extends ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow] {
>>>
>>>   var ueState: ValueState[RawLog] = _
>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>   val invalidCounter = new LongCounter()
>>>   val processCounter = new LongCounter()
>>>   val sendToKafkaCounter = new LongCounter()
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>>     ueState = getRuntimeContext.getState(
>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>     )
>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>   }
>>>
>>>   override def process(key: String,
>>>                        ctx: Context,
>>>                        logs: Iterable[RawLog],
>>>                        out: Collector[OutputLog]): Unit = {
>>>     if (ueState.value() != null) {
>>>       processCounter.add(1L)
>>>       val bid = ueState.value()
>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>       logs.foreach { log =>
>>>         if (log.eventType == SHOW) {
>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>           sendToKafkaCounter.add(1L)
>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>         }
>>>       }
>>>     } else {
>>>       invalidCounter.add(1L)
>>>     }
>>>   }
>>> }
>>>
>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>
>>>   override def onElement(log: RawLog,
>>>                          timestamp: Long,
>>>                          window: TimeWindow,
>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>     )
>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>
>>>     if (!firstSeen.value()) {
>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>       firstSeen.update(true)
>>>     }
>>>     val eventType = log.eventType
>>>     if (eventType == BID) {
>>>       ueState.update(log)
>>>       TriggerResult.CONTINUE
>>>     } else if (ueState.value() == null) {
>>>       TriggerResult.CONTINUE
>>>     } else {
>>>       TriggerResult.FIRE
>>>     }
>>>   }
>>>
>>>   override def onEventTime(timestamp: Long,
>>>                            window: TimeWindow,
>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>     // Purge the window state only when the end-of-window timer fires;
>>>     // the if/else must be the method's last expression for PURGE to be returned.
>>>     if (timestamp == window.getEnd) TriggerResult.PURGE
>>>     else TriggerResult.CONTINUE
>>>   }
>>>
>>>   override def onProcessingTime(timestamp: Long,
>>>                                 window: TimeWindow,
>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>     TriggerResult.CONTINUE
>>>   }
>>>
>>>   override def clear(window: TimeWindow,
>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>     )
>>>     ueState.clear()
>>>
>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>     firstSeen.clear()
>>>
>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>   }
>>> }
>>>
>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>
>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>                            size: Int,
>>>                            window: TimeWindow,
>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>
>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>                           size: Int,
>>>                           window: TimeWindow,
>>>                           evictorContext: Evictor.EvictorContext): Unit = {
>>>     // Remove every element after the window function has run.
>>>     val iter = elements.iterator()
>>>     while (iter.hasNext) {
>>>       iter.next()
>>>       iter.remove()
>>>     }
>>>   }
>>> }
>>>
>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Thu, Jul 2, 2020 at 7:18 PM:
>>>
>>>> Thanks for the clarification.
>>>>
>>>> Can you also share the code of the other parts, particularly MyFunction?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> The RocksDB backend has the same problem.
>>>>>
>>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Thu, Jul 2, 2020 at 6:11 PM:
>>>>>
>>>>>> Thanks for reporting this.
>>>>>>
>>>>>> It looks like the window namespace was replaced by VoidNamespace in the
>>>>>> state entry. I've created
>>>>>> https://issues.apache.org/jira/browse/FLINK-18464 to investigate it
>>>>>> further.
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm using Flink 1.9 on Mesos, and I try to use my own trigger and
>>>>>>> evictor. The state is stored in memory.
>>>>>>>
>>>>>>> input.setParallelism(processParallelism)
>>>>>>>   .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>   .keyBy(_.key)
>>>>>>>   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>   .trigger(new MyTrigger)
>>>>>>>   .evictor(new MyEvictor)
>>>>>>>   .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>   .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>   .name("kafka-record-sink")
>>>>>>>
>>>>>>> And the exception stack is here. Could anyone help with this? Thanks!
>>>>>>>
>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator
>>>>>>> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
>>>>>>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException:
>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to
>>>>>>> org.apache.flink.runtime.state.VoidNamespace
>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>     ... 3 more
>>>>>>> Caused by: java.lang.ClassCastException:
>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to
>>>>>>> org.apache.flink.runtime.state.VoidNamespace
>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>     ... 5 more
>>>>>>>
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
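[Editor's note: one Scala subtlety that matters when writing `TriggerResult` logic such as `onEventTime` in the JoinTrigger above: a method returns its last expression, so an `if` without an `else` that is not in tail position is evaluated and its result silently discarded. A minimal plain-Scala sketch, with stand-in types (no Flink dependency; `lossy` and `strict` are illustrative names):]

```scala
// Stand-in for Flink's TriggerResult, just to make the point runnable.
sealed trait TriggerResult
case object CONTINUE extends TriggerResult
case object PURGE extends TriggerResult

// Non-tail `if` without `else`: PURGE is computed, then thrown away.
def lossy(timestamp: Long, windowEnd: Long): TriggerResult = {
  if (timestamp == windowEnd) { PURGE } // value discarded
  CONTINUE                              // always the method's result
}

// Tail-position `if`/`else`: PURGE actually reaches the caller.
def strict(timestamp: Long, windowEnd: Long): TriggerResult =
  if (timestamp == windowEnd) PURGE else CONTINUE
```

Here `lossy(10L, 10L)` still yields `CONTINUE`, while `strict(10L, 10L)` yields `PURGE`, so with the first form the end-of-window timer never purges the window state.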