Hi Ivan,

The easiest way is to use some implementation that's already there [1]. I
already mentioned Avro and would strongly recommend giving it a go. If you
make sure to provide a default value for as many fields as possible, you
can always remove them later giving you great flexibility. I can give you
more hints if you decide to go this route.

If you want to have a custom implementation, I'd start at looking of one of
the simpler implementations like MapSerializerSnapshot [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
(see known implementing classes).
[2]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java

On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <isanj...@theworkshop.com>
wrote:

> Thanks for your complete answer Arvid, we will try to approach all
> things you mentioned, but take into account we are using Beam on top of
> Flink, so, to be honest, I don't know how could we implement the custom
> serialization thing (
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> ) there. Could you please give us some hints? Thanks
>
> On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > Hi Ivan,
> >
> > First let's address the issue with idle partitions. The solution is
> > to use a watermark assigner that also emits a watermark with some
> > idle timeout [1].
> >
> > Now the second question, on why Kafka commits are committed for in-
> > flight, checkpointed data. The basic idea is that you are not losing
> > data while avoiding replicated output.
> > So if you commit offsets only after data has been fully processed,
> > upon crash the same data point would be reprocessed jointly with the
> > restored in-flight data, so you get duplicate messages in your
> > system.
> > To avoid duplicates data needs to be more or less completely flushed
> > out the system before a checkpoint is performed. That would produce a
> > huge downtime.
> > Instead, we assume that we can always resume from the checkpoints.
> >
> > Which leads to the last question on what to do when your pipeline has
> > breaking changes.
> > First strategy is to avoid breaking changes as much as possible.
> > State could for example also be stored as Avro to allow schema
> > evolution. Minor things like renamed operators will not happen with a
> > bit more expertise.
> > Second strategy is to use state migration [2]. Alternatively, you can
> > manually convert state with state processor API [3].
> > Last option is to do a full reprocessing of data. This can be done on
> > a non-production cluster and then a savepoint can be used to
> > bootstrap the production cluster quickly. This option needs to be
> > available anyways for the case that you find any logic error. But of
> > course, this option has the highest implications (may need to purge
> > sink beforehand).
> >
> > [1]
> >
> https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > [3]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> >
> > On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <
> > isanj...@theworkshop.com> wrote:
> > > Hi, we are starting to use Beam with Flink as runner on our
> > > applications, and recently we would like to get advantages that
> > > Flink
> > > checkpoiting provides, but it seems we are not understanding it
> > > clearly.
> > >
> > > Simplifying, our application does the following:
> > >   - Read meesages from a couple of Kafka topics
> > >   - Combine them
> > >   - Write combination result to a sink (Exasol DB)
> > >
> > > As application is processing messages using event time, and one of
> > > the
> > > topics is almost idle, the first time application is started
> > > messages
> > > are stuck in the combiner because watermark don't advance until we
> > > have
> > > messages arriving onto idled topic (we know this and is not a
> > > problem
> > > for us though).
> > >
> > > The problem is that we've observed, if a checkpoint is triggered
> > > when
> > > messages are still stuck in the combiner, surprisingly for us, the
> > > checkpoint finishes successfully (and offsets committed to Kafka)
> > > even
> > > messages haven't progressed to the sink yet. Is this expected?
> > >
> > > The thing is that, if in the future, we make not state compatible
> > > changes in application source code, checkpoint taken couldn't be
> > > restored. So we would like to start the application without using
> > > any
> > > checkpoint but without losing data.
> > > Problem here would be that data loss would happen because messages
> > > stuck in combiner are already committed to Kafka and application
> > > would
> > > start to read from latest commited offset in Kafka if we don't use
> > > any
> > > checkpoint, thus those messages are not going to be read from the
> > > source again.
> > >
> > > So, I guess our question is how are you doing in order to not lose
> > > data
> > > when developing applications, because sooner or later you are going
> > > to
> > > add breaking changes...
> > >
> > > For example, we've seen those two errors so far:
> > >   - After changing an operator name:
> > >
> > > 2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster
> > > entrypoint.
> > > org.apache.flink.runtime.dispatcher.DispatcherException: Failed to
> > > take
> > > leadership with session id 00000000-0000-0000-0000-000000000000.
> > > ...
> > > Caused by: org.apache.flink.runtime.client.JobExecutionException:
> > > Could
> > > not set up JobManager
> > >     at
> > > org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManag
> > > erRu
> > > nner.java:152)
> > >     at
> > > org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.
> > > crea
> > > teJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
> > >     at
> > > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobMana
> > > gerR
> > > unner$5(Dispatcher.java:375)
> > >     at
> > > org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(C
> > > heck
> > > edSupplier.java:34)
> > >     ... 7 more
> > > Caused by: java.lang.IllegalStateException: Failed to rollback to
> > > checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-
> > > savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986.
> > > Cannot map checkpoint/savepoint state for operator
> > > f476451c6210bd2783f36fa331b9da5e to the new program, because the
> > > operator is not available in the new program. If you want to allow
> > > to
> > > skip this, you can set the --allowNonRestoredState option on the
> > > CLI.
> > >     at
> > > org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateChec
> > > kpoi
> > > nt(Checkpoints.java:205)
> > > ...
> > >
> > >   - After modifying a Java model class involved in a combine:
> > > org.apache.flink.runtime.state.BackendBuildingException: Failed
> > > when
> > > trying to restore heap backend
> > >     at
> > > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.bu
> > > ild(
> > > HeapKeyedStateBackendBuilder.java:116)
> > >     at
> > > org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeye
> > > dSta
> > > teBackend(FsStateBackend.java:529)
> > >     at
> > > org.apache.flink.streaming.api.operators.StreamTaskStateInitializer
> > > Impl
> > > .lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:29
> > > 1)
> > >     at
> > > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.a
> > > ttem
> > > ptCreateAndRestore(BackendRestorerProcedure.java:142)
> > >     at
> > > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.c
> > > reat
> > > eAndRestore(BackendRestorerProcedure.java:121)
> > >     at
> > > org.apache.flink.streaming.api.operators.StreamTaskStateInitializer
> > > Impl
> > > .keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> > >     at
> > > org.apache.flink.streaming.api.operators.StreamTaskStateInitializer
> > > Impl
> > > .streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135
> > > )
> > >     at
> > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.ini
> > > tial
> > > izeState(AbstractStreamOperator.java:253)
> > >     at
> > > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState
> > > (Str
> > > eamTask.java:881)
> > >     at
> > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTa
> > > sk.j
> > > ava:395)
> > >     at
> > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > >     at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.io.InvalidClassException:
> > > internal.holly.beatrix.wallet.walletfact.model.WalletMetadata;
> > > local
> > > class incompatible: stream classdesc serialVersionUID =
> > > 8366890161513008789, local class serialVersionUID =
> > > 174312384610985998
> > >     at
> > > java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> > >
> > >
> > > Apologies in advance as we are new to Flink, so may be we are
> > > missing
> > > something obvious here.
> > >
> > > Thanks
> > >
> > >
> > > Este correo electrónico y sus adjuntos son de naturaleza
> > > confidencial. A no ser que usted sea el destinatario, no puede
> > > utilizar, copiar o desvelar tanto el mensaje como cualquier
> > > información contenida en el mensaje. Si no es el destinatario, debe
> > > borrar este correo y notificar al remitente inmediatamente.
> > > Cualquier punto de vista u opinión expresada en este correo
> > > electrónico son únicamente del remitente, a no ser que se indique
> > > lo contrario. Todos los derechos de autor en cualquier material de
> > > este correo son reservados. Todos los correos electrónicos,
> > > salientes o entrantes, pueden ser grabados y monitorizados para uso
> > > legítimo del negocio. Nos encontramos exentos de toda
> > > responsabilidad ante cualquier perdida o daño que surja o resulte
> > > de la recepción, uso o transmisión de este correo electrónico hasta
> > > el máximo permitido por la ley.
> > >
> > > This email and any attachment to it are confidential. Unless you
> > > are the intended recipient, you may not use, copy or disclose
> > > either the message or any information contained in the message. If
> > > you are not the intended recipient, you should delete this email
> > > and notify the sender immediately. Any views or opinions expressed
> > > in this email are those of the sender only, unless otherwise
> > > stated. All copyright in any of the material in this email is
> > > reserved. All emails, incoming and outgoing, may be recorded and
> > > monitored for legitimate business purposes. We exclude all
> > > liability for any loss or damage arising or resulting from the
> > > receipt, use or transmission of this email to the fullest extent
> > > permitted by law.
> >
> >
>
>
> Este correo electrónico y sus adjuntos son de naturaleza confidencial. A
> no ser que usted sea el destinatario, no puede utilizar, copiar o desvelar
> tanto el mensaje como cualquier información contenida en el mensaje. Si no
> es el destinatario, debe borrar este correo y notificar al remitente
> inmediatamente. Cualquier punto de vista u opinión expresada en este correo
> electrónico son únicamente del remitente, a no ser que se indique lo
> contrario. Todos los derechos de autor en cualquier material de este correo
> son reservados. Todos los correos electrónicos, salientes o entrantes,
> pueden ser grabados y monitorizados para uso legítimo del negocio. Nos
> encontramos exentos de toda responsabilidad ante cualquier perdida o daño
> que surja o resulte de la recepción, uso o transmisión de este correo
> electrónico hasta el máximo permitido por la ley.
>
> This email and any attachment to it are confidential. Unless you are the
> intended recipient, you may not use, copy or disclose either the message or
> any information contained in the message. If you are not the intended
> recipient, you should delete this email and notify the sender immediately.
> Any views or opinions expressed in this email are those of the sender only,
> unless otherwise stated. All copyright in any of the material in this email
> is reserved. All emails, incoming and outgoing, may be recorded and
> monitored for legitimate business purposes. We exclude all liability for
> any loss or damage arising or resulting from the receipt, use or
> transmission of this email to the fullest extent permitted by law.
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to