Hi,

Thanks for the details.
However, I was not able to reproduce the issue. I used parallelism levels
4, file system backend and tried different timings for
checkpointing, windowing and source.
Do you encounter this problem deterministically, is it always 1st
checkpoint?
What checkpointing interval do you use?

Regards,
Roman


On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:

> Hi, this is our production code so I have to modify it a little bit, such
> as variable name and function name. I think 3 classes I provide here is
> enough.
>
> I try to join two streams, but I don't want to use the default join
> function, because I want to send the joined log immediately and remove it
> from window state immediately. And my window gap time is very long( 20
> minutes), so it maybe evaluate it multiple times.
>
> class JoinFunction extends
>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>
>   var ueState: ValueState[RawLog] = _
>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>   val invalidCounter = new LongCounter()
>   val processCounter = new LongCounter()
>   val sendToKafkaCounter = new LongCounter()
>
>   override def open(parameters: Configuration): Unit = {
>     ueState = getRuntimeContext.getState(
>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>     )
>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>     getRuntimeContext.addAccumulator("sendToKafkaCounter", 
> this.sendToKafkaCounter)
>   }
>
>   override def process(key: String,
>                        ctx: Context,
>                        logs: Iterable[RawLog],
>                        out: Collector[OutputLog]): Unit = {
>     if (ueState.value() != null) {
>       processCounter.add(1L)
>       val bid = ueState.value()
>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, 
> classOf[MyType])
>       logs.foreach( log => {
>         if (log.eventType == SHOW) {
>           val showLog = 
> gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>           sendToKafkaCounter.add(1L)
>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), 
> Utils.getOutputTopic(showLog)))
>         }
>       })
>     } else {
>       invalidCounter.add(1L)
>     }
>   }
> }
>
> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>
>   override def onElement(log: RawLog,
>                          timestamp: Long,
>                          window: TimeWindow,
>                          ctx: Trigger.TriggerContext): TriggerResult = {
>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>     )
>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>
>     if (!firstSeen.value()) {
>       ctx.registerEventTimeTimer(window.getEnd)
>       firstSeen.update(true)
>     }
>     val eventType = log.eventType
>     if (eventType == BID) {
>       ueState.update(log)
>       TriggerResult.CONTINUE
>     } else {
>       if (ueState.value() == null) {
>         TriggerResult.CONTINUE
>       } else {
>         TriggerResult.FIRE
>       }
>     }
>   }
>
>   override def onEventTime(timestamp: Long,
>                            window: TimeWindow,
>                            ctx: Trigger.TriggerContext): TriggerResult = {
>     if (timestamp == window.getEnd) {
>       TriggerResult.PURGE
>     }
>     TriggerResult.CONTINUE
>   }
>
>   override def onProcessingTime(timestamp: Long,
>                                 window: TimeWindow,
>                                 ctx: Trigger.TriggerContext): TriggerResult = 
> {
>     TriggerResult.CONTINUE
>   }
>
>   override def clear(window: TimeWindow,
>                      ctx: Trigger.TriggerContext): Unit = {
>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>     )
>     ueState.clear()
>
>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>     firstSeen.clear()
>
>     ctx.deleteEventTimeTimer(window.getEnd)
>   }
> }
>
> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>
>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>                            size: Int,
>                            window: TimeWindow,
>                            evictorContext: Evictor.EvictorContext): Unit = {}
>
>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>                            size: Int,
>                            window: TimeWindow,
>                            evictorContext: Evictor.EvictorContext): Unit = {
>     val iter = elements.iterator()
>     while (iter.hasNext) {
>       iter.next()
>       iter.remove()
>     }
>   }
> }
>
>
> Khachatryan Roman <khachatryan.ro...@gmail.com> 于2020年7月2日周四 下午7:18写道:
>
>> Thanks for the clarification.
>>
>> Can you also share the code of other parts, particularly MyFunction?
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>
>>> Rocksdb backend has the same problem
>>>
>>> Khachatryan Roman <khachatryan.ro...@gmail.com> 于2020年7月2日周四 下午6:11写道:
>>>
>>>> Thanks for reporting this.
>>>>
>>>> Looks like the window namespace was replaced by VoidNamespace in state
>>>> entry.
>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>> further investigate it.
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>>>> evictor. The state is stored to memory.
>>>>>
>>>>> input.setParallelism(processParallelism)
>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>         .keyBy(_.key)
>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>         .trigger(new MyTrigger)
>>>>>         .evictor(new MyEvictor)
>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>         .name("kafka-record-sink")
>>>>>
>>>>> And the exception stack is here, could anyone help with this? Thanks!
>>>>>
>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator
>>>>> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
>>>>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask
>>>>> .java:1100)
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>> ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>> ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>>> ClassCastException: org.apache.flink.streaming.api.windowing.windows.
>>>>> TimeWindow cannot be cast to org.apache.flink.runtime.state.
>>>>> VoidNamespace
>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>     at org.apache.flink.streaming.api.operators.
>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>     ... 3 more
>>>>> Caused by: java.lang.ClassCastException:
>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be
>>>>> cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot
>>>>> .writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot
>>>>> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>     at org.apache.flink.runtime.state.heap.
>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>> .callInternal(HeapSnapshotStrategy.java:191)
>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>> .callInternal(HeapSnapshotStrategy.java:158)
>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>>>> AsyncSnapshotCallable.java:75)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>     ... 5 more
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>

Reply via email to