I checked the logs thanks to Paul's suggestion, and I see a couple of
interesting things. Restoring into 1.8 from a 1.4 savepoint, some TMs
receive only partial state (e.g. only a partition/offset pair or two per TM
-- we have 8 partitions on this topic). I'm not sure whether this is normal
(e.g. maybe TMs only receive the state for the partitions they care about).
I focused on one topic, and I noticed that for at least one partition there
is no restored state at all. Even though some state is restored, it appears
that all consumers are starting from scratch. What's also strange is that
we again start from the earliest offsets. The partition/offset state that
is "restored" looks healthy -- e.g. valid partitions and offsets. We use
the default of setStartFromGroupOffsets as well as the default Kafka option
for auto.offset.reset, which I believe should cause the consumer to read
from the latest offset in the absence of state, not the earliest. We are
using the same consumer group as the legacy 1.4 app that we are restoring
from, and we shut off the 1.4 job before starting the new cluster up.
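
For reference, here is a minimal sketch of how the consumer is wired up
(the bootstrap servers, topic name, and group id below are placeholders,
not our actual values):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "our-consumer-group");  // same group id as the 1.4 job
        // auto.offset.reset is left at its default; it should only matter
        // when there is no restored state and no committed group offset.

        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
            "our-topic", new SimpleStringSchema(), props);   // placeholder topic
        consumer.setStartFromGroupOffsets(); // the default startup mode anyway

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("kafka-source-sketch");
    }
}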

I also see some of these log messages:

2019-05-24 22:10:57,479 INFO
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No
restore state for FlinkKafkaConsumer.
2019-05-24 22:10:59,511 INFO
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No
restore state for FlinkKafkaConsumer.
2019-05-24 22:10:59,524 INFO
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No
restore state for FlinkKafkaConsumer.

Using a 1.8 savepoint and restoring to 1.8, I see that each TM receives all
of the state for the cluster, e.g. each receives every partition/offset
pair for the topic I'm digging into. I also see none of the messages above.
There is no inrush of data -- it appears to restore from the known offsets
correctly.

Is there some change in how state is managed, or in what state is stored,
between these versions that could cause this? I can post more of the logs
if that would help. Is there some intermediate version of Flink (1.5-1.7)
that we could restore into and create a savepoint from, to ensure the
continuity of our state in 1.8? Any other thoughts?

Thanks again,

Nik Davis
Senior Software Engineer
New Relic


On Fri, May 24, 2019 at 12:26 AM Paul Lam <paullin3...@gmail.com> wrote:

> Hi Nik,
>
> Could you check out the taskmanagers’ logs? When restored from a
> savepoint/checkpoint, FlinkKafkaConsumer would log the starting offset of
> Kafka partitions.
>
> WRT `auto.offset.reset` in Kafka configuration, it’s of a relatively low
> priority, and would only be used when there’s no restored state and
> FlinkKafkaConsumer is set to `startFromGroupOffsets`.
>
> Best,
> Paul Lam
>
> On May 24, 2019, at 07:50, Nikolas Davis <nda...@newrelic.com> wrote:
>
> Howdy,
>
> We're in the process of upgrading to 1.8. When restoring state to the new
> cluster (using a savepoint) we are seeing our Kafka consumers restart from
> the earliest offset. We're not receiving any other indication that our
> state was not accepted as part of the deploy, e.g. we are not allowing
> unrestored state and are not receiving any errors.
>
> We have our consumers set up with the same consumer group and using the
> same consumer (FlinkKafkaConsumer010) as our 1.4 deploy.
>
> Has anyone encountered this? Any idea what we might be doing wrong?
>
> What's also strange is that we are not setting auto.offset.reset, which
> defaults to largest (analogous to latest, correct?) -- which is not what
> we're seeing happen.
>
> Regards,
>
> Nik
>
>
>
