Hi Robert,

Based on
https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
I think you'll need to change the UID for your KafkaSource and restart your
job with allowNonRestoredState enabled.

Best regards,

Martijn

On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen <cinquate...@gmail.com> wrote:

> We've changed the KafkaSource to ingest from a new topic but the old name
> is still being referenced:
>
> 2022-10-04 07:03:41org.apache.flink.util.FlinkException: Global failure 
> triggered by OperatorCoordinator for 'Source: Grokfailures' (operator 
> feca28aff5a3958840bee985ee7de4d3).      at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
>       at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
>    at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
>  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>        at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)Caused by: 
> org.apache.flink.util.FlinkRuntimeException: Failed to handle partition 
> splits change due to         at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
>  at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
>  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>        ... 3 moreCaused by: java.lang.RuntimeException: Failed to get topic 
> metadata.  at 
> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
>    at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
>  at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
>       at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
>      at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)      
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>        ... 3 moreCaused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>       at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>         at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>         at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>       at 
> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
>    ... 10 moreCaused by: 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
>
>
> --
> Robert Cullen
> 240-475-4490
>

Reply via email to