Hi Kevin,

Unfortunately I did not find a way to test the savepoint scenario with the MiniCluster. Savepoints are not supported in the embedded mode. There is a way to hack around that, but then the state of the enumerator won't be handled.
As for your original issue, is it consistently reproducible? Can you capture the debug log of the enumerator?

Thanks,
Thomas

On Wed, May 4, 2022 at 10:05 AM Martijn Visser <martijnvis...@apache.org> wrote:
>
> Hi Kevin,
>
> I'm hoping that @Thomas Weise could help with the issue regarding the recovery from the savepoint.
>
> Best regards,
>
> Martijn
>
> On Wed, 4 May 2022 at 17:05, Kevin Lam <kevin....@shopify.com> wrote:
>>
>> Following up on this, is there a good way to debug restoring from savepoints locally? We currently have a setup where we use IntelliJ to run and test our pipelines locally, but we would like an API to specify the savepoint to restore from, without needing to spin up a full cluster.
>>
>> In IntelliJ we just use the build and run functionality, and don't have access to the Flink CLI.
>>
>> On Tue, May 3, 2022 at 2:48 PM Kevin Lam <kevin....@shopify.com> wrote:
>>>
>>> Hi,
>>>
>>> We're encountering an error using a HybridSource that is composed of a FileSource + KafkaSource, but only when recovering from a savepoint [0]. This HybridSource is used to read a Kafka topic's archives hosted on GCS via a bounded FileSource, and then automatically switch over to the data stream from the associated Kafka topic.
>>>
>>> Has anyone seen this error before?
>>>
>>> [0]
>>> ```
>>> 2022-05-03 09:47:57
>>> org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: ShopAppTrackingEventUpdate_V1' (operator afb3208349a953c47059c1994f800aa2).
>>>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
>>>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
>>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
>>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:358)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>     at java.base/java.lang.Thread.run(Unknown Source)
>>> Caused by: java.lang.NullPointerException: Source for index=0 not available
>>>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>>>     at org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(SwitchedSources.java:36)
>>>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:148)
>>>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:222)
>>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
>>>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
>>>     ... 3 more
>>> ```
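For anyone trying to reproduce a setup like the one Kevin describes, here is a minimal sketch of a HybridSource that first reads archived records with a bounded FileSource and then switches to a KafkaSource. It assumes Flink 1.15 with the file, Kafka and base connectors on the classpath, plus a filesystem implementation for the `gs://` scheme (in Flink 1.14 the line format class is `TextLineFormat` rather than `TextLineInputFormat`). The bucket path, topic, brokers, group id and cutover timestamp are hypothetical placeholders, and plain text lines stand in for the real record format, which the thread does not show. It illustrates the source wiring only; it is not a reproduction of the savepoint failure.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceSketch {

    // Hypothetical placeholders, not taken from the original job.
    static final String ARCHIVE_PATH = "gs://example-bucket/example-topic-archive/";
    static final String BOOTSTRAP_SERVERS = "localhost:9092";
    static final String TOPIC = "example-topic";
    static final long CUTOVER_TIMESTAMP_MS = 1651536000000L; // assumed end of the archived data (epoch millis)

    static HybridSource<String> buildSource() {
        // First stage: without monitorContinuously(...), the FileSource is bounded
        // and finishes after reading the existing files once.
        FileSource<String> archive =
                FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(ARCHIVE_PATH))
                        .build();

        // Second stage: unbounded Kafka source starting at the assumed cutover time.
        KafkaSource<String> live =
                KafkaSource.<String>builder()
                        .setBootstrapServers(BOOTSTRAP_SERVERS)
                        .setTopics(TOPIC)
                        .setGroupId("hybrid-source-sketch")
                        .setStartingOffsets(OffsetsInitializer.timestamp(CUTOVER_TIMESTAMP_MS))
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // Sources are chained in the order they are added: the file source first,
        // then Kafka once the file source has finished.
        return HybridSource.builder(archive).addSource(live).build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(buildSource(), WatermarkStrategy.noWatermarks(), "archive-then-kafka")
                .print();
        env.execute("HybridSource sketch");
    }
}
```

Starting Kafka at a fixed timestamp is only one way to hand over between the two stages; the right cutover point depends on how the archive was written.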
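Thomas asks for the enumerator's debug log. When the job runs inside the IDE in a single JVM, as in Kevin's IntelliJ setup, one way to get it is to raise the log level programmatically before the job is built. This sketch assumes log4j2 (`log4j-core`) is the logging backend on the classpath; the two package names are taken from the stack trace, and the class name `EnumeratorDebugLogging` is hypothetical. On a cluster, the equivalent would be logger entries for the same packages in Flink's `conf/log4j.properties`.

```java
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.config.Configurator;

public class EnumeratorDebugLogging {

    // Packages of the enumerator and coordinator classes that appear in the stack trace.
    static final String HYBRID_SOURCE_PKG = "org.apache.flink.connector.base.source.hybrid";
    static final String SOURCE_COORDINATOR_PKG = "org.apache.flink.runtime.source.coordinator";

    // Sets DEBUG on both packages; loggers for classes inside them inherit the level.
    // Only affects the current JVM, i.e. local/IDE execution.
    static void enableEnumeratorDebugLogging() {
        Configurator.setLevel(HYBRID_SOURCE_PKG, Level.DEBUG);
        Configurator.setLevel(SOURCE_COORDINATOR_PKG, Level.DEBUG);
    }

    public static void main(String[] args) {
        enableEnumeratorDebugLogging();
        // Build and execute the Flink job after this point, in the same JVM.
        boolean enabled = LogManager
                .getLogger(HYBRID_SOURCE_PKG + ".HybridSourceSplitEnumerator")
                .isDebugEnabled();
        System.out.println("HybridSourceSplitEnumerator DEBUG enabled: " + enabled);
    }
}
```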