Someone told me that this issue may be Mesos-specific. I'm fairly new to Flink; I dug into the code but couldn't reach a conclusion. All I want is a better join window that emits the result and deletes it from the window state immediately after a successful join. Is there any other way to do this? Thanks!
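The following is a minimal plain-Scala model of the behavior described above, for illustration only. It is not Flink API. The names `EventKind`, `LogEvent`, `JoinedLog` and `EagerJoin` are hypothetical stand-ins for the thread's `RawLog`, `BID`/`SHOW` and `OutputLog`.

The model works like this:
- Each key keeps at most one bid.
- A show that finds a bid is emitted at once and never stored.
- A show that arrives first waits until its bid appears, and is dropped as soon as it has been emitted.
- `expire` stands in for an event-time cleanup timer.

In Flink itself, the same shape could likely be written with keyed state and event-time timers in a `KeyedCoProcessFunction` instead of a window. That mapping is an assumption to check against the Flink 1.9 documentation.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the thread's RawLog / BID / SHOW / OutputLog.
sealed trait EventKind
case object BidEvent extends EventKind
case object ShowEvent extends EventKind

final case class LogEvent(key: String, kind: EventKind, timestamp: Long, payload: String)
final case class JoinedLog(key: String, bidPayload: String, showPayload: String)

/** Emit-on-match join model: a result leaves as soon as both sides are present,
  * and a show is never kept after it has been emitted. */
final class EagerJoin(ttlMs: Long) {
  private val bids = mutable.Map.empty[String, LogEvent]               // latest bid per key
  private val pendingShows = mutable.Map.empty[String, List[LogEvent]] // shows waiting for a bid, newest first

  def process(e: LogEvent): List[JoinedLog] = e.kind match {
    case BidEvent =>
      bids.update(e.key, e)
      // Flush shows that arrived before this bid and forget them immediately.
      val waiting = pendingShows.remove(e.key).getOrElse(Nil)
      waiting.reverse.map(s => JoinedLog(e.key, e.payload, s.payload))
    case ShowEvent =>
      bids.get(e.key) match {
        case Some(bid) => List(JoinedLog(e.key, bid.payload, e.payload)) // emit now, store nothing
        case None =>
          pendingShows.update(e.key, e :: pendingShows.getOrElse(e.key, Nil))
          Nil
      }
  }

  /** Drops every buffered event whose timestamp + ttlMs is at or before the
    * watermark, standing in for an event-time cleanup timer. */
  def expire(watermark: Long): Unit = {
    val staleBidKeys = bids.filter { case (_, b) => b.timestamp + ttlMs <= watermark }.keys.toList
    staleBidKeys.foreach(k => bids.remove(k))
    pendingShows.keys.toList.foreach { k =>
      val alive = pendingShows(k).filter(_.timestamp + ttlMs > watermark)
      if (alive.isEmpty) pendingShows.remove(k) else pendingShows.update(k, alive)
    }
  }

  def bufferedBids: Int = bids.size
  def bufferedShows: Int = pendingShows.values.map(_.size).sum
}
```

Keeping a single bid per key mirrors `ueState.update(log)` in the quoted trigger. If several bids per key must be joinable, the bid side would need a list instead.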
On Sat, Jul 11, 2020 at 3:20 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
> Hi Si-li
>
> Thanks for the notice.
> I just want to double-check whether the original problem has been solved, as I found that the issue you created, FLINK-18464, has been closed with the reason "can not reproduce". Am I missing something here?
>
> Best,
> Congxian
>
>
> On Fri, Jul 10, 2020 at 6:06 PM Si-li Liu <unix...@gmail.com> wrote:
>
>> Sorry,
>>
>> I can't reproduce it with reduce/aggregate/fold/apply, and due to some limitations in my working environment, I can't use Flink 1.10 or 1.11.
>>
>> On Sun, Jul 5, 2020 at 6:21 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> First, could you please check whether the problem still occurs with Flink 1.10 or 1.11?
>>>
>>> It seems strange. From the error message, there is an error when trying to convert a non-window state (VoidNamespace) to a window state (the serializer is the serializer of the window state, but the state is a non-window state).
>>> Could you please try replacing MyFunction with a reduce/aggregate/fold/apply() function to see what happens? This should help narrow down the problem.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> On Fri, Jul 3, 2020 at 6:44 PM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Thanks for your help.
>>>>
>>>> 1. I started the job from scratch, not from a savepoint or externalized checkpoint.
>>>> 2. No job graph change.
>>>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>> 4. My Flink version is 1.9.1.
>>>>
>>>> On Fri, Jul 3, 2020 at 4:49 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> I still wasn't able to reproduce the issue.
>>>>>
>>>>> Can you also clarify:
>>>>> - Are you starting the job from a savepoint or externalized checkpoint?
>>>>> - If yes, was the job graph changed?
>>>>> - What StreamTimeCharacteristic is set, if any?
>>>>> - What exact version of Flink do you use?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Hi, thanks for your help.
>>>>>>
>>>>>> The checkpoint configuration is:
>>>>>>
>>>>>> checkpoint.intervalMS=300000
>>>>>> checkpoint.timeoutMS=300000
>>>>>>
>>>>>> The error call stack is from the JobManager's log, and it happens on every checkpoint. Currently I don't have a successful checkpoint yet.
>>>>>>
>>>>>> On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for the details.
>>>>>>> However, I was not able to reproduce the issue. I used parallelism level 4 and the file system backend, and tried different timings for checkpointing, windowing and the source.
>>>>>>> Do you encounter this problem deterministically? Is it always the 1st checkpoint?
>>>>>>> What checkpointing interval do you use?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi, this is our production code, so I had to modify it a little, e.g. variable and function names. I think the 3 classes I provide here are enough.
>>>>>>>>
>>>>>>>> I'm trying to join two streams, but I don't want to use the default join function, because I want to send the joined log immediately and remove it from the window state immediately. My window is very long (20 minutes), so it may be evaluated multiple times.
>>>>>>>>
>>>>>>>> class JoinFunction extends ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow] {
>>>>>>>>
>>>>>>>>   var ueState: ValueState[RawLog] = _
>>>>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>>>>   val invalidCounter = new LongCounter()
>>>>>>>>   val processCounter = new LongCounter()
>>>>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>>>>
>>>>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>>>>     ueState = getRuntimeContext.getState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def process(key: String,
>>>>>>>>                        ctx: Context,
>>>>>>>>                        logs: Iterable[RawLog],
>>>>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>>>>     if (ueState.value() != null) {
>>>>>>>>       processCounter.add(1L)
>>>>>>>>       val bid = ueState.value()
>>>>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>>>>       logs.foreach( log => {
>>>>>>>>         if (log.eventType == SHOW) {
>>>>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>>>>           sendToKafkaCounter.add(1L)
>>>>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>>>>         }
>>>>>>>>       })
>>>>>>>>     } else {
>>>>>>>>       invalidCounter.add(1L)
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>>>>
>>>>>>>>   override def onElement(log: RawLog,
>>>>>>>>                          timestamp: Long,
>>>>>>>>                          window: TimeWindow,
>>>>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>>
>>>>>>>>     if (!firstSeen.value()) {
>>>>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>>>>       firstSeen.update(true)
>>>>>>>>     }
>>>>>>>>     val eventType = log.eventType
>>>>>>>>     if (eventType == BID) {
>>>>>>>>       ueState.update(log)
>>>>>>>>       TriggerResult.CONTINUE
>>>>>>>>     } else {
>>>>>>>>       if (ueState.value() == null) {
>>>>>>>>         TriggerResult.CONTINUE
>>>>>>>>       } else {
>>>>>>>>         TriggerResult.FIRE
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def onEventTime(timestamp: Long,
>>>>>>>>                            window: TimeWindow,
>>>>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     if (timestamp == window.getEnd) {
>>>>>>>>       TriggerResult.PURGE
>>>>>>>>     }
>>>>>>>>     TriggerResult.CONTINUE
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>>>>                                 window: TimeWindow,
>>>>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     TriggerResult.CONTINUE
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def clear(window: TimeWindow,
>>>>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     ueState.clear()
>>>>>>>>
>>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>>     firstSeen.clear()
>>>>>>>>
>>>>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>>>>
>>>>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>>                            size: Int,
>>>>>>>>                            window: TimeWindow,
>>>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>>>>
>>>>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>>                           size: Int,
>>>>>>>>                           window: TimeWindow,
>>>>>>>>                           evictorContext: Evictor.EvictorContext): Unit = {
>>>>>>>>     val iter = elements.iterator()
>>>>>>>>     while (iter.hasNext) {
>>>>>>>>       iter.next()
>>>>>>>>       iter.remove()
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the clarification.
>>>>>>>>>
>>>>>>>>> Can you also share the code of the other parts, particularly MyFunction?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Roman
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> The RocksDB backend has the same problem.
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for reporting this.
>>>>>>>>>>>
>>>>>>>>>>> It looks like the window namespace was replaced by VoidNamespace in the state entry.
>>>>>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to investigate it further.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Roman
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm using Flink 1.9 on Mesos, and I'm trying to use my own trigger and evictor. The state is stored in memory.
>>>>>>>>>>>>
>>>>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>>>>   .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>>>>   .keyBy(_.key)
>>>>>>>>>>>>   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>>>>   .trigger(new MyTrigger)
>>>>>>>>>>>>   .evictor(new MyEvictor)
>>>>>>>>>>>>   .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>>>>   .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>>>>   .name("kafka-record-sink")
>>>>>>>>>>>>
>>>>>>>>>>>> And here is the exception stack. Could anyone help with this?
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>>>>     ... 3 more
>>>>>>>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>>>>     ... 5 more
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best regards
>>>>>>>>>>>>
>>>>>>>>>>>> Sili Liu
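Separately from the checkpoint failure, the quoted `JoinTrigger.onEventTime` never returns `PURGE`. In Scala, an `if` without an `else` that is not the last expression of a block has its value discarded. The method therefore always falls through to `TriggerResult.CONTINUE`.

The sketch below reproduces that control flow in plain Scala. `Result`, `Continue` and `Purge` are hypothetical stand-ins for Flink's `TriggerResult` values, not the real enum.

```scala
object TriggerReturnDemo {
  // Hypothetical stand-ins for TriggerResult.CONTINUE / TriggerResult.PURGE.
  sealed trait Result
  case object Continue extends Result
  case object Purge extends Result

  // Same shape as the quoted onEventTime: Purge is evaluated and then discarded,
  // because the `if` has no `else` and is not the block's last expression.
  def asQuoted(timestamp: Long, windowEnd: Long): Result = {
    if (timestamp == windowEnd) {
      Purge
    }
    Continue
  }

  // Making the conditional itself the result expression returns Purge at the window end.
  def asIntended(timestamp: Long, windowEnd: Long): Result =
    if (timestamp == windowEnd) Purge else Continue
}
```

In the trigger itself, the equivalent change would be to make `if (timestamp == window.getEnd) TriggerResult.PURGE else TriggerResult.CONTINUE` the method body. Whether purging there is actually wanted is a design decision for the job, since `JoinEvictor` already empties the window after every firing.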