Hi, we are starting to use Beam with Flink as the runner for our applications, and recently we wanted to take advantage of the checkpointing that Flink provides, but it seems we are not understanding it clearly.
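For context, this is roughly how we enable checkpointing through the Flink runner options inside our main() (a simplified sketch; the interval value is illustrative):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Ask the Flink runner to trigger a checkpoint every 60 seconds (illustrative value).
    options.setCheckpointingInterval(60_000L);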
Simplifying, our application does the following:

- Read messages from a couple of Kafka topics
- Combine them
- Write the combination result to a sink (an Exasol DB)

(A simplified sketch of this pipeline is included further below, after the first stack trace.)

As the application processes messages in event time, and one of the topics is almost idle, the first time the application is started messages get stuck in the combiner, because the watermark does not advance until messages arrive on the idle topic (we know this, and it is not a problem for us).

The problem is that we have observed that if a checkpoint is triggered while messages are still stuck in the combiner, the checkpoint, surprisingly for us, completes successfully (and the offsets are committed to Kafka) even though those messages have not reached the sink yet. Is this expected?

The thing is that if, in the future, we make state-incompatible changes to the application source code, the checkpoint that was taken cannot be restored. So we would like to be able to start the application without using any checkpoint, but also without losing data. The problem here is that data loss would happen: the offsets of the messages stuck in the combiner have already been committed to Kafka, and if we start without a checkpoint the application reads from the latest committed offset, so those messages would not be read from the source again.

So I guess our question is: how do you manage not to lose data while developing applications? Sooner or later you are going to introduce breaking changes... For example, we have seen these two errors so far:

- After changing an operator name:

2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
...
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986. Cannot map checkpoint/savepoint state for operator f476451c6210bd2783f36fa331b9da5e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
...
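For reference, here is a simplified sketch of what the pipeline described above looks like (topic names, the Count.perKey combiner, the window size and the Exasol connection details are illustrative stand-ins for our real code):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.joda.time.Duration;

    public class ReconciliationPipelineSketch {

      private static PCollection<KV<String, String>> readTopic(Pipeline p, String name, String topic) {
        return p.apply(name,
            KafkaIO.<String, String>read()
                .withBootstrapServers("kafka:9092")               // illustrative
                .withTopic(topic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withCreateTime(Duration.standardMinutes(1))      // event time from the Kafka record timestamp
                .commitOffsetsInFinalize()                        // offsets committed when a checkpoint is finalized
                .withoutMetadata());
      }

      public static void main(String[] args) {
        // In our real job we pass the FlinkPipelineOptions shown in the earlier sketch.
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        PCollection<KV<String, String>> busy = readTopic(p, "ReadBusyTopic", "busy-topic");
        PCollection<KV<String, String>> idle = readTopic(p, "ReadAlmostIdleTopic", "almost-idle-topic");

        PCollectionList.of(busy).and(idle)
            .apply("FlattenInputs", Flatten.pCollections())
            // Elements wait here until the watermark, held back by the idle topic,
            // passes the end of the window.
            .apply("Window", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(5))))
            .apply("CombineEvents", Count.perKey()) // stand-in for our real combiner
            .apply("WriteToExasol",
                JdbcIO.<KV<String, Long>>write()
                    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "com.exasol.jdbc.EXADriver", "jdbc:exa:exasol-host:8563"))
                    .withStatement("INSERT INTO reconciliation_fact (item_key, item_count) VALUES (?, ?)")
                    .withPreparedStatementSetter((element, statement) -> {
                      statement.setString(1, element.getKey());
                      statement.setLong(2, element.getValue());
                    }));

        p.run();
      }
    }

As far as we understand, the commitOffsetsInFinalize() call is what makes KafkaIO commit offsets back to Kafka when a checkpoint completes, independently of whether the combined results have reached the sink yet.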
- After modifying a Java model class involved in a combine:

org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException: internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local class incompatible: stream classdesc serialVersionUID = 8366890161513008789, local class serialVersionUID = 174312384610985998
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)

Apologies in advance, as we are new to Flink, so maybe we are missing something obvious here.

Thanks
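P.S. In case it is useful, the model class change behind the second error was essentially like the following (the class content and field names are simplified and illustrative). We had not declared a serialVersionUID, so Java derived a new one when the class changed; our understanding is that pinning it would at least avoid the UID mismatch, although it would not make every change state-compatible:

    import java.io.Serializable;

    // Simplified stand-in for internal.holly.beatrix.wallet.walletfact.model.WalletMetadata.
    public class WalletMetadata implements Serializable {

      // Without an explicit serialVersionUID, Java computes one from the class
      // structure, so adding or removing a field changes it and previously
      // serialized state fails to restore with the InvalidClassException above.
      private static final long serialVersionUID = 1L;

      private String walletId;
      private long updatedAtMillis; // the kind of newly added field that broke the restore

      public String getWalletId() { return walletId; }
      public void setWalletId(String walletId) { this.walletId = walletId; }
      public long getUpdatedAtMillis() { return updatedAtMillis; }
      public void setUpdatedAtMillis(long updatedAtMillis) { this.updatedAtMillis = updatedAtMillis; }
    }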