Hi Ivan,

The easiest way is to use an implementation that is already there [1]. I already mentioned Avro and would strongly recommend giving it a go. If you make sure to provide a default value for as many fields as possible, you can always remove them later, which gives you great flexibility. I can give you more hints if you decide to go this route.

If you want a custom implementation, I'd start by looking at one of the simpler implementations, such as MapSerializerSnapshot [2]. Rough sketches of both approaches follow below.
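To make the default-value point concrete, here is a minimal sketch of building an Avro schema in Java where every field carries a default, so fields can be added or removed across versions without breaking state restoration. The record and field names (OrderState, orderId, etc.) are made up for illustration:

  import org.apache.avro.Schema;
  import org.apache.avro.SchemaBuilder;

  public class StateSchemaSketch {
      public static void main(String[] args) {
          // Hypothetical state record: every field declares a default, so
          // Avro's schema resolution can fill the gap when a field is added
          // in a new version or removed in a later one.
          Schema orderState = SchemaBuilder.record("OrderState")
                  .namespace("com.example.state")
                  .fields()
                  .name("orderId").type().stringType().stringDefault("")
                  .name("amount").type().doubleType().doubleDefault(0.0)
                  // Nullable field with a null default: the safest kind to
                  // remove later, since readers simply fall back to null.
                  .name("note").type().unionOf().nullType().and().stringType()
                          .endUnion().nullDefault()
                  .endRecord();

          System.out.println(orderState.toString(true));
      }
  }

With defaults everywhere, state written under an older schema version can still be read after you add fields, and fields you no longer need can be dropped later.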
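And if you go the custom route, the usual shape is to extend CompositeTypeSerializerSnapshot, mirroring what MapSerializerSnapshot does with its nested key and value serializers. A rough sketch under that assumption; MyListSerializer here is a hypothetical serializer with a single nested element serializer, not an existing Flink class:

  import java.util.List;

  import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
  import org.apache.flink.api.common.typeutils.TypeSerializer;

  public class MyListSerializerSnapshot<T>
          extends CompositeTypeSerializerSnapshot<List<T>, MyListSerializer<T>> {

      private static final int CURRENT_VERSION = 1;

      // Used by Flink on restore: an empty snapshot that is then
      // filled from the checkpointed bytes.
      public MyListSerializerSnapshot() {
          super(MyListSerializer.class);
      }

      // Used when snapshotting a live serializer.
      public MyListSerializerSnapshot(MyListSerializer<T> serializer) {
          super(serializer);
      }

      @Override
      protected int getCurrentOuterSnapshotVersion() {
          return CURRENT_VERSION;
      }

      @Override
      protected TypeSerializer<?>[] getNestedSerializers(MyListSerializer<T> outerSerializer) {
          return new TypeSerializer<?>[] {outerSerializer.getElementSerializer()};
      }

      @Override
      protected MyListSerializer<T> createOuterSerializerWithNestedSerializers(
              TypeSerializer<?>[] nestedSerializers) {
          @SuppressWarnings("unchecked")
          TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
          return new MyListSerializer<>(elementSerializer);
      }
  }

The serializer itself returns this snapshot from its snapshotConfiguration() method; on restore, Flink uses the snapshot to check compatibility of the nested serializers and to recreate the outer serializer.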
[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html (see known implementing classes)
[2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java

On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <isanj...@theworkshop.com> wrote:

> Thanks for your complete answer Arvid, we will try to approach all the
> things you mentioned, but take into account that we are using Beam on
> top of Flink, so, to be honest, I don't know how we could implement the
> custom serialization there (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> ). Could you please give us some hints? Thanks
>
> On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > Hi Ivan,
> >
> > First, let's address the issue with idle partitions. The solution is
> > to use a watermark assigner that also emits a watermark after some
> > idle timeout [1].
> >
> > Now the second question: why Kafka offsets are committed for in-flight,
> > checkpointed data. The basic idea is that you are not losing data
> > while avoiding duplicated output.
> > If you committed offsets only after data had been fully processed,
> > then upon a crash the same data points would be reprocessed together
> > with the restored in-flight data, so you would get duplicate messages
> > in your system.
> > To avoid duplicates, data would need to be more or less completely
> > flushed out of the system before a checkpoint is performed, which
> > would cause huge downtime. Instead, we assume that we can always
> > resume from the checkpoints.
> >
> > Which leads to the last question: what to do when your pipeline has
> > breaking changes.
> > The first strategy is to avoid breaking changes as much as possible.
> > State could, for example, be stored as Avro to allow schema
> > evolution. Minor mistakes like renamed operators will not happen once
> > you have a bit more expertise.
> > The second strategy is to use state migration [2]. Alternatively, you
> > can manually convert state with the State Processor API [3].
> > The last option is to do a full reprocessing of the data. This can be
> > done on a non-production cluster, and then a savepoint can be used to
> > bootstrap the production cluster quickly. This option needs to be
> > available anyway, for the case that you find a logic error, but of
> > course it has the highest implications (you may need to purge the
> > sink beforehand).
> >
> > [1] https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> >
> > On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <isanj...@theworkshop.com> wrote:
> > > Hi, we are starting to use Beam with Flink as the runner on our
> > > applications, and recently we wanted to take advantage of what Flink
> > > checkpointing provides, but it seems we are not understanding it
> > > clearly.
> > >
> > > Simplifying, our application does the following:
> > > - Read messages from a couple of Kafka topics
> > > - Combine them
> > > - Write the combination result to a sink (Exasol DB)
> > >
> > > As the application processes messages using event time, and one of
> > > the topics is almost idle, the first time the application is started
> > > messages are stuck in the combiner, because the watermark doesn't
> > > advance until messages arrive on the idle topic (we know this and it
> > > is not a problem for us, though).
> > >
> > > The problem is that we've observed that, if a checkpoint is triggered
> > > while messages are still stuck in the combiner, surprisingly for us
> > > the checkpoint finishes successfully (and offsets are committed to
> > > Kafka) even though the messages haven't reached the sink yet. Is this
> > > expected?
> > >
> > > The thing is that if, in the future, we make state-incompatible
> > > changes to the application source code, the checkpoint that was taken
> > > couldn't be restored. So we would like to start the application
> > > without using any checkpoint, but without losing data.
> > > The problem here is that data loss would happen, because the offsets
> > > of the messages stuck in the combiner are already committed to Kafka,
> > > and without a checkpoint the application would start reading from the
> > > latest committed offset, so those messages are not going to be read
> > > from the source again.
> > >
> > > So I guess our question is how you avoid losing data when developing
> > > applications, because sooner or later you are going to introduce
> > > breaking changes...
> > >
> > > For example, we've seen these two errors so far:
> > >
> > > - After changing an operator name:
> > >
> > > 2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
> > > org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
> > > ...
> > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> > >   at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
> > >   at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
> > >   at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
> > >   at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> > >   ... 7 more
> > > Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986. Cannot map checkpoint/savepoint state for operator f476451c6210bd2783f36fa331b9da5e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
> > >   at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> > >   ...
> > >
> > > - After modifying a Java model class involved in a combine:
> > >
> > > org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> > >   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> > >   at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
> > >   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> > >   at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> > >   at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> > >   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> > >   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> > >   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
> > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
> > >   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > >   at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.io.InvalidClassException: internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local class incompatible: stream classdesc serialVersionUID = 8366890161513008789, local class serialVersionUID = 174312384610985998
> > >   at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> > >
> > > Apologies in advance, as we are new to Flink, so maybe we are missing
> > > something obvious here.
> > >
> > > Thanks
--
Arvid Heise | Senior Java Developer
https://www.ververica.com/
Follow us @VervericaData

--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng