Hi, Peng.
Could you check whether your two jars contain any of the same code, and whether the two jobs share any configuration (since they run in the same cluster in session mode)? If the jobs are completely separate in both respects, the consumer job should not affect the producer job, in my experience.
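The first check can be scripted. Below is a minimal sketch, assuming Python 3 is available on your machine; the function names and the jar file names in the usage comment are only illustrative. It lists the class files that are bundled in both jars, which may help spot a type that both jobs package separately.

```python
import sys
import zipfile


def class_entries(jar_path):
    """Return the set of .class entry names in a jar (a jar is a zip archive)."""
    with zipfile.ZipFile(jar_path) as jar:
        return {name for name in jar.namelist() if name.endswith(".class")}


def shared_classes(jar_a, jar_b):
    """Return the sorted .class entries that appear in both jars."""
    return sorted(class_entries(jar_a) & class_entries(jar_b))


if __name__ == "__main__":
    # Hypothetical usage: python shared_classes.py producer.jar consumer.jar
    if len(sys.argv) == 3:
        for name in shared_classes(sys.argv[1], sys.argv[2]):
            print(name)
```

An empty result only shows that the jars bundle no identically named classes; it says nothing about shared configuration, which still needs to be checked separately.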
On Sun, Jun 11, 2023 at 10:01 AM Peng Peng <pengpeng8...@gmail.com> wrote:

> Hi Hangxiang,
>
> Thanks for your reply. I don't think these 2 jobs have any dependencies;
> they are packaged in different jars, although they run on the same cluster
> in session mode. The producer job does some filtering and sends the result
> to Kafka, using customized serialization logic to serialize keys and values
> into byte arrays. The consumer job then deserializes the records from the
> same Kafka topic with customized deserialization logic for keys and values,
> and then proceeds with some further processing. I only turned off Kryo for
> the consumer job, so I don't know why the producer job is affected and can
> no longer resume from its checkpoint.
>
> The error message details are as follows:
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=50, backoffTimeMS=10000)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
>     at sun.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:210)
>     at com.visa.flink.functions.DynamicFilterFunction.open(DynamicFilterFunction.scala:50)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.open(CoBroadcastWithKeyedOperator.java:91)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer (org.apache.flink.api.common.typeutils.base.ListSerializer@8b6e1a2a) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.ListSerializer@cab0f895).
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:211)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:276)
>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>     ... 14 more
>
> Thanks,
> Peng
>
> On Tue, Jun 6, 2023 at 9:53 PM Hangxiang Yu <master...@gmail.com> wrote:
>
>> Hi, Peng.
>> Do these two jobs have any dependency?
>> Or could you please share the specific logic of the two jobs, if
>> convenient? Could you also share the failure message of the producer
>> job?
>> In my opinion, if the two jobs have no other association, then, as you
>> said, the consumer job will fail due to unsupported schema evolution, but
>> it should not affect the producer job.
>>
>> On Tue, Jun 6, 2023 at 2:58 PM Peng Peng <pengpeng8...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have 2 Flink jobs, of which one produces records to Kafka using the Kryo
>>> serializer and the other consumes records from Kafka and deserializes them
>>> with Kryo. This had been working fine. Then I stopped both jobs with
>>> checkpoints and changed the consumer job to disable generic types and Kryo
>>> in order to use Avro serialization. However, when I resumed the 2 jobs from
>>> the checkpoint, both failed. It made sense that the consumer job would
>>> fail, but why is the producer job also affected?
>>>
>>> Thanks,
>>> Peng
>>
>> --
>> Best,
>> Hangxiang.

--
Best,
Hangxiang.