Thanks for your thorough answer, Arvid. We will try to address all the
things you mentioned, but take into account that we are using Beam on
top of Flink, so, to be honest, I don't know how we could implement the
custom serialization part (
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
) there. Could you please give us some hints? Thanks

On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> Hi Ivan,
>
> First, let's address the issue with idle partitions. The solution is
> to use a watermark assigner that also emits a watermark after some
> idle timeout [1].
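The core idea behind [1] can be sketched in a few lines. This is a hypothetical helper in plain Python with made-up names, not the actual Flink/Beam watermark API:

```python
# Idle-aware watermark sketch: if the partition has been silent for
# longer than idle_timeout, advance the watermark from wall-clock time
# instead of the stalled last event timestamp, so downstream operators
# (e.g. a combiner) are not blocked by an idle topic.
# All names are illustrative, not Flink/Beam API.
def next_watermark(last_event_ts, last_event_wallclock, now,
                   max_out_of_orderness, idle_timeout):
    if now - last_event_wallclock > idle_timeout:
        # Source is idle: keep the watermark moving based on wall time.
        return now - max_out_of_orderness
    # Normal case: the watermark trails the latest observed event time.
    return last_event_ts - max_out_of_orderness
```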
>
> Now for the second question: why Kafka offsets are committed for in-
> flight, checkpointed data. The basic idea is that you do not lose
> data while avoiding duplicated output.
> If you committed offsets only after data had been fully processed,
> then upon a crash the same records would be reprocessed together
> with the restored in-flight data, so you would get duplicate
> messages in your system.
> To avoid duplicates, data would need to be more or less completely
> flushed out of the system before each checkpoint, which would cause
> huge downtime.
> Instead, we assume that we can always resume from the checkpoints.
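A toy simulation of that recovery contract (made-up mechanics in Python, not Flink internals): the checkpoint atomically snapshots the source offset together with the combiner's in-flight buffer, the sink is assumed to roll back to the same point, and replay then reproduces the uncrashed run with no loss and no duplicates:

```python
# Toy model: a checkpoint stores the Kafka offset AND the operator's
# in-flight buffer as one atomic snapshot. After a crash, both are
# restored together and replay continues exactly where the buffer
# left off. Purely illustrative, not real Flink mechanics.
def combine(events, buffer, output):
    """Stand-in for the combiner: emit events in pairs."""
    for e in events:
        buffer.append(e)
        if len(buffer) == 2:
            output.append(tuple(buffer))
            buffer.clear()

def run_without_crash(events):
    buffer, output = [], []
    combine(events, buffer, output)
    return output

def run_with_crash(events, checkpoint_at, crash_at):
    buffer, output = [], []
    combine(events[:checkpoint_at], buffer, output)
    checkpoint = (checkpoint_at, list(buffer), list(output))  # atomic snapshot
    combine(events[checkpoint_at:crash_at], buffer, output)   # work lost in crash
    offset, buffer, output = checkpoint                       # restore offset + state
    combine(events[offset:], buffer, output)                  # replay from offset
    return output
```

Both runs yield identical output over the same input, which is why committing the offset at checkpoint time is safe as long as the in-flight state is checkpointed with it.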
>
> Which leads to the last question: what to do when your pipeline has
> breaking changes.
> The first strategy is to avoid breaking changes as much as possible.
> State could, for example, be stored as Avro to allow schema
> evolution. Minor issues like renamed operators will not happen with
> a bit more expertise.
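The schema-evolution idea can be mimicked without Avro at all; this sketch uses plain dicts with declared defaults (field names are made up for illustration):

```python
# Avro-style schema evolution in miniature: the new schema declares a
# default for every added field, so records written under the old
# schema remain readable by the new code. Field names are hypothetical.
SCHEMA_V2_DEFAULTS = {"amount": 0, "currency": "EUR", "source": "unknown"}

def read_state(old_record, defaults=SCHEMA_V2_DEFAULTS):
    """Fill in any fields the old record lacks with the new defaults."""
    return {**defaults, **old_record}
```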
> The second strategy is to use state migration [2]. Alternatively,
> you can manually convert state with the State Processor API [3].
> The last option is a full reprocessing of the data. This can be done
> on a non-production cluster, and a savepoint can then be used to
> bootstrap the production cluster quickly. This option needs to be
> available anyway in case you ever find a logic error. But of course,
> it has the highest impact (you may need to purge the sink
> beforehand).
>
> [1]
> https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <
> isanj...@theworkshop.com> wrote:
> > Hi, we are starting to use Beam with Flink as the runner in our
> > applications, and recently we wanted to take advantage of Flink
> > checkpointing, but it seems we are not understanding it clearly.
> >
> > Simplifying, our application does the following:
> >   - Read messages from a couple of Kafka topics
> >   - Combine them
> >   - Write the combination result to a sink (Exasol DB)
> >
> > As the application processes messages using event time, and one of
> > the topics is almost idle, the first time the application is
> > started messages get stuck in the combiner, because the watermark
> > doesn't advance until messages arrive on the idle topic (we know
> > this, and it is not a problem for us, though).
> >
> > The problem is that we've observed that, if a checkpoint is
> > triggered while messages are still stuck in the combiner,
> > surprisingly for us the checkpoint finishes successfully (and
> > offsets are committed to Kafka) even though the messages haven't
> > reached the sink yet. Is this expected?
> >
> > The thing is that if, in the future, we make state-incompatible
> > changes to the application source code, the checkpoint that was
> > taken couldn't be restored. So we would like to start the
> > application without using any checkpoint, but without losing data.
> > The problem is that data loss would happen, because the offsets of
> > the messages stuck in the combiner have already been committed to
> > Kafka, and without a checkpoint the application would start
> > reading from the latest committed offset, so those messages would
> > never be read from the source again.
> >
> > So I guess our question is: how do you manage not to lose data
> > when developing applications? Because sooner or later you are
> > going to introduce breaking changes...
> >
> > For example, we've seen these two errors so far:
> >   - After changing an operator name:
> >
> > 2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
> > org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
> > ...
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> >     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
> >     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
> >     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
> >     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> >     ... 7 more
> > Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986. Cannot map checkpoint/savepoint state for operator f476451c6210bd2783f36fa331b9da5e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
> >     at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> > ...
> >
> >   - After modifying a Java model class involved in a combine:
> > org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> >     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
> >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.io.InvalidClassException: internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local class incompatible: stream classdesc serialVersionUID = 8366890161513008789, local class serialVersionUID = 174312384610985998
> >     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> >
> >
> > Apologies in advance, as we are new to Flink, so maybe we are
> > missing something obvious here.
> >
> > Thanks
> >
> >
> > This email and any attachment to it are confidential. Unless you
> > are the intended recipient, you may not use, copy or disclose
> > either the message or any information contained in the message. If
> > you are not the intended recipient, you should delete this email
> > and notify the sender immediately. Any views or opinions expressed
> > in this email are those of the sender only, unless otherwise
> > stated. All copyright in any of the material in this email is
> > reserved. All emails, incoming and outgoing, may be recorded and
> > monitored for legitimate business purposes. We exclude all
> > liability for any loss or damage arising or resulting from the
> > receipt, use or transmission of this email to the fullest extent
> > permitted by law.
>
>

