I'm not sure why recovery from a savepoint would be different from recovery
from a checkpoint, but if you are looking for a savepoint test case, PTAL at [1].

I rather think you hit an edge case in your recovery setup. A changed degree
of parallelism certainly sounds like the most likely cause. Or did you upgrade
Flink versions while recovering from the savepoint?

[1]
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java#L393-L406
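
For a rough idea of the mechanics in that kind of test, here is a hedged
sketch of a savepoint round trip on the MiniCluster (not the exact code from
[1]; class names come from flink-test-utils, the triggerSavepoint signature
differs between Flink versions, and the sequence-source pipeline and savepoint
directory are only placeholders):

```
// Hedged sketch of a savepoint round trip on the MiniCluster, loosely modeled on [1].
// Assumes Flink ~1.14 APIs (1.15 adds a SavepointFormatType argument to
// triggerSavepoint); the pipeline and savepoint directory are placeholders.
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.Rule;
import org.junit.Test;

public class SavepointRoundTripSketch {

    @Rule
    public final MiniClusterWithClientResource cluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());

    @Test
    public void savepointRoundTrip() throws Exception {
        ClusterClient<?> client = cluster.getClusterClient();

        // Submit the job with parallelism 2.
        JobGraph jobGraph = buildPipeline(2).getStreamGraph().getJobGraph();
        client.submitJob(jobGraph).get();

        // In a real test, wait for the job to reach RUNNING before triggering the savepoint.
        String savepointPath =
                client.triggerSavepoint(jobGraph.getJobID(), "file:///tmp/savepoints").get();
        client.cancel(jobGraph.getJobID()).get();

        // Resubmit from the savepoint with a different parallelism to reproduce the
        // rescaling scenario discussed above.
        JobGraph restored = buildPipeline(4).getStreamGraph().getJobGraph();
        restored.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
        client.submitJob(restored).get();
    }

    // Placeholder pipeline; in the scenario from this thread it would build the
    // FileSource + KafkaSource HybridSource instead of a sequence source.
    private static StreamExecutionEnvironment buildPipeline(int parallelism) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.enableCheckpointing(1000L);
        env.fromSequence(0, Long.MAX_VALUE).addSink(new DiscardingSink<>());
        return env;
    }
}
```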

On Mon, May 9, 2022 at 1:42 AM Thomas Weise <t...@apache.org> wrote:

> One more question: Are you changing the parallelism when resuming from
> savepoint?
>
> On Sun, May 8, 2022 at 4:05 PM Thomas Weise <t...@apache.org> wrote:
> >
> > Hi Kevin,
> >
> > Unfortunately I did not find a way to test the savepoint scenario with
> > the MiniCluster. Savepoints are not supported in the embedded mode.
> > There is a way to hack around that, but then the state of the
> > enumerator won't be handled.
> >
> > As for your original issue, is it reproducible consistently? Can you
> > capture the debug log of the enumerator?
> >
> > Thanks,
> > Thomas
> >
> > On Wed, May 4, 2022 at 10:05 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
> > >
> > > Hi Kevin,
> > >
> > > I'm hoping that @Thomas Weise could help with the issue regarding the
> recovery from the savepoint.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Wed, 4 May 2022 at 17:05, Kevin Lam <kevin....@shopify.com> wrote:
> > >>
> > >> Following up on this, is there a good way to debug restoring from
> savepoints locally? We currently have a set-up where we use IntelliJ to run
> and test our pipelines locally, but would like an API to be able to specify
> the savepoint to restore from, without needing to spin up a full cluster.
> > >>
> > >> In IntelliJ we just use the build and run functionality, and don't
> have access to the Flink CLI.
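
One option that may help here (a hedged sketch, assuming the
execution.savepoint.path option is honored by the local executor in the Flink
version in use; the path and class name below are only illustrative):

```
// Hedged sketch: restore from a savepoint when running inside the IDE, without the
// Flink CLI, by passing the savepoint path to the local environment via configuration.
// Assumes "execution.savepoint.path" is picked up by the local executor in the
// version in use; the path below is illustrative.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRunFromSavepoint {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("execution.savepoint.path", "/tmp/savepoints/savepoint-abc123");
        // Optional: drop state that no longer maps to the job graph.
        // conf.setString("execution.savepoint.ignore-unclaimed-state", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);

        // ... build the pipeline exactly as in the normal IntelliJ run, then:
        // env.execute("local-run-from-savepoint");
    }
}
```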
> > >>
> > >> On Tue, May 3, 2022 at 2:48 PM Kevin Lam <kevin....@shopify.com>
> wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> We're encountering an error using a HybridSource that is composed of
> a FileSource + KafkaSource, only when recovering from a savepoint [0]. This
> HybridSource is used to read from a Kafka topic's archives hosted on GCS
> via a bounded FileSource, and then automatically switch over to the data
> stream from the associated Kafka topic.
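
For context, a minimal sketch of how such a FileSource + KafkaSource
HybridSource is typically wired up (the bucket path, topic, brokers, and
String record type are placeholders; TextLineInputFormat is the Flink 1.15
name of the line-based format, older releases call it TextLineFormat):

```
// Minimal sketch of the FileSource + KafkaSource HybridSource described above.
// Bucket path, topic, brokers, and the String record type are placeholders.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceSketch {
    public static void main(String[] args) throws Exception {
        // Bounded source over the topic's archived data on GCS.
        FileSource<String> archive =
                FileSource.forRecordStreamFormat(
                                new TextLineInputFormat(),
                                new Path("gs://my-bucket/topic-archive/"))
                        .build();

        // Unbounded Kafka source that takes over once the file source finishes.
        KafkaSource<String> live =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("my-topic")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // File source first, Kafka source second; the enumerator switches automatically.
        HybridSource<String> hybrid =
                HybridSource.<String>builder(archive).addSource(live).build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "ShopAppTrackingEventUpdate_V1")
                .print();
        env.execute("hybrid-source-sketch");
    }
}
```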
> > >>>
> > >>> Has anyone seen this error before?
> > >>>
> > >>> [0]
> > >>> ```
> > >>> 2022-05-03 09:47:57
> > >>> org.apache.flink.util.FlinkException: Global failure triggered by
> OperatorCoordinator for 'Source: ShopAppTrackingEventUpdate_V1' (operator
> afb3208349a953c47059c1994f800aa2).
> > >>> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
> > >>> at
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
> > >>> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
> > >>> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:358)
> > >>> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > >>> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > >>> at java.base/java.lang.Thread.run(Unknown Source)
> > >>> Caused by: java.lang.NullPointerException: Source for index=0 not
> available
> > >>> at
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> > >>> at
> org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(SwitchedSources.java:36)
> > >>> at
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:148)
> > >>> at
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:222)
> > >>> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
> > >>> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
> > >>> ... 3 more
> > >>> ```
>
