I'm not sure why recovery from a savepoint would be different from recovery from a checkpoint, but if you are looking for a savepoint test case, PTAL at [1].

I rather think you found some edge case in your recovery setup. A changed degree of parallelism certainly sounds like the most likely cause. Or did you upgrade Flink versions while recovering from the savepoint?

[1] https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java#L393-L406

On Mon, May 9, 2022 at 1:42 AM Thomas Weise <t...@apache.org> wrote:
> One more question: Are you changing the parallelism when resuming from
> savepoint?
>
> On Sun, May 8, 2022 at 4:05 PM Thomas Weise <t...@apache.org> wrote:
> >
> > Hi Kevin,
> >
> > Unfortunately I did not find a way to test the savepoint scenario with
> > the MiniCluster. Savepoints are not supported in the embedded mode.
> > There is a way to hack around that, but then the state of the
> > enumerator won't be handled.
> >
> > As for your original issue, is it reproducible consistently? Can you
> > capture the debug log of the enumerator?
> >
> > Thanks,
> > Thomas
> >
> > On Wed, May 4, 2022 at 10:05 AM Martijn Visser <martijnvis...@apache.org> wrote:
> > >
> > > Hi Kevin,
> > >
> > > I'm hoping that @Thomas Weise could help with the issue regarding the
> > > recovery from the savepoint.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Wed, 4 May 2022 at 17:05, Kevin Lam <kevin....@shopify.com> wrote:
> > >>
> > >> Following up on this, is there a good way to debug restoring from
> > >> savepoints locally? We currently have a set-up where we use IntelliJ
> > >> to run and test our pipelines locally, but we would like an API to
> > >> specify the savepoint to restore from, without needing to spin up a
> > >> full cluster.
> > >>
> > >> In IntelliJ we just use the build-and-run functionality, and don't
> > >> have access to the Flink CLI.
> > >>
> > >> On Tue, May 3, 2022 at 2:48 PM Kevin Lam <kevin....@shopify.com> wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> We're encountering an error using a HybridSource that is composed of
> > >>> a FileSource + KafkaSource, but only when recovering from a savepoint
> > >>> [0]. This HybridSource is used to read a Kafka topic's archives
> > >>> hosted on GCS via a bounded FileSource, and then automatically switch
> > >>> over to the data stream from the associated Kafka topic.
> > >>>
> > >>> Has anyone seen this error before?
> > >>>
> > >>> [0]
> > >>> ```
> > >>> 2022-05-03 09:47:57
> > >>> org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: ShopAppTrackingEventUpdate_V1' (operator afb3208349a953c47059c1994f800aa2).
> > >>>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
> > >>>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
> > >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
> > >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:358)
> > >>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > >>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > >>>     at java.base/java.lang.Thread.run(Unknown Source)
> > >>> Caused by: java.lang.NullPointerException: Source for index=0 not available
> > >>>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> > >>>     at org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(SwitchedSources.java:36)
> > >>>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:148)
> > >>>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:222)
> > >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
> > >>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
> > >>>     ... 3 more
> > >>> ```
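[Editor's note] For Kevin's question about restoring from a savepoint when running from IntelliJ without the Flink CLI: one hedged option (not mentioned in the thread, and subject to Thomas's caveat that embedded-mode savepoint support is limited and enumerator state may not be handled) is to pass the savepoint path to the local environment via configuration. The savepoint path below is a placeholder:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSavepointRestoreSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Ask the local (MiniCluster-backed) environment to restore from
        // this savepoint on startup. Path is a placeholder.
        conf.setString("execution.savepoint.path", "/tmp/savepoints/savepoint-abc123");
        // Optionally tolerate state that can no longer be mapped to an
        // operator, e.g. after a topology change.
        conf.setBoolean("execution.savepoint.ignore-unclaimed-state", true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the pipeline as usual, then env.execute("job-name");
    }
}
```

This only configures the restore; whether the HybridSource enumerator state is restored correctly in a local run is exactly the open question in this thread.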
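[Editor's note] The thread does not include Kevin's actual source definition, so for readers unfamiliar with the setup being debugged, here is a minimal sketch of a FileSource + KafkaSource HybridSource of the kind described (archive on GCS, then switch to the live topic). The bucket, topic, and broker names are hypothetical, and the exact format/connector classes depend on the Flink version in use:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

public class HybridSourceSketch {
    public static HybridSource<String> build() {
        // Bounded source over the topic's archive on GCS (hypothetical bucket).
        FileSource<String> archive =
                FileSource.forRecordStreamFormat(
                                new TextLineInputFormat(),
                                new Path("gs://my-bucket/topic-archive/"))
                        .build();

        // Unbounded source for the live topic (hypothetical broker/topic).
        KafkaSource<String> live =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("my-topic")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // Read the archive first, then switch to Kafka automatically.
        return HybridSource.builder(archive)
                .addSource(live)
                .build();
    }
}
```

The NPE in the stack trace ("Source for index=0 not available") is thrown inside the HybridSourceSplitEnumerator during the switch-source handshake, i.e. when the restored enumerator cannot look up the underlying source for a given index.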