Hi Ivan,

I fear that only a few users on this mailing list actually have deeper Beam experience; I only used it briefly three years ago. Most users here use Flink directly, precisely to avoid these kinds of double-abstraction issues.
It might be better to switch to the Beam mailing list if you have Beam-specific questions, including how the Flink runner actually translates a Beam program to Flink.

On Tue, May 19, 2020 at 11:38 AM Ivan San Jose <isanj...@theworkshop.com> wrote:
> Actually, I'm also wondering how Beam coders relate to the runner's
> serialization... I mean, in Beam you specify a coder for each Java type
> so that Beam is able to serialize/deserialize that type. But is the
> runner used under the hood then serializing/deserializing the result
> again, i.e. doing a double serialization? Or how does it work?
>
> On Tue, 2020-05-19 at 08:54 +0000, Ivan San Jose wrote:
> > Yep, sorry if I'm bothering you, but I think I'm still not getting
> > this: how could I tell Beam to tell Flink to use that serializer
> > instead of the standard Java one? Beam abstracts us from Flink's
> > checkpointing mechanism, so I'm afraid that if we use the Flink API
> > directly we might break other things that Beam is hiding from us...
> >
> > On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> > > Hi Ivan,
> > >
> > > The easiest way is to use an implementation that's already there
> > > [1]. I already mentioned Avro and would strongly recommend giving
> > > it a go. If you make sure to provide a default value for as many
> > > fields as possible, you can always remove them later, which gives
> > > you great flexibility. I can give you more hints if you decide to
> > > go this route.
> > >
> > > If you want a custom implementation, I'd start by looking at one
> > > of the simpler implementations, like MapSerializerSnapshot [2].
> > >
> > > [1]
> > > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
> > > (see known implementing classes).
> > > [2]
> > > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
> > >
> > > On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <
> > > isanj...@theworkshop.com> wrote:
> > > > Thanks for your complete answer, Arvid. We will try all the
> > > > things you mentioned, but take into account that we are using
> > > > Beam on top of Flink, so, to be honest, I don't know how we
> > > > could implement the custom serialization there (
> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > > ). Could you please give us some hints? Thanks
> > > >
> > > > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > > > Hi Ivan,
> > > > >
> > > > > First, let's address the issue with idle partitions. The
> > > > > solution is to use a watermark assigner that also emits a
> > > > > watermark after some idle timeout [1].
> > > > >
> > > > > Now the second question: why are Kafka offsets committed for
> > > > > in-flight, checkpointed data? The basic idea is that you do
> > > > > not lose data while also avoiding duplicated output.
> > > > > If you committed offsets only after data had been fully
> > > > > processed, then upon a crash the same data points would be
> > > > > reprocessed on top of the restored in-flight data, so you
> > > > > would get duplicate messages in your system.
> > > > > To avoid duplicates without checkpointing the in-flight data,
> > > > > data would need to be more or less completely flushed out of
> > > > > the system before each checkpoint, which would cause huge
> > > > > downtime. Instead, we assume that we can always resume from
> > > > > the checkpoints.
> > > > >
> > > > > Which leads to the last question: what to do when your
> > > > > pipeline has breaking changes.
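To make the trade-off described above concrete, here is a toy simulation in plain Java (this is not Flink code; the class, numbers, and names are made up for illustration): because the source offset is snapshotted together with the in-flight state at a checkpoint, restoring both leaves every record counted exactly once, even though the committed offset had moved past data that never reached the sink.

```java
import java.util.List;

// Toy model of checkpointing: offset and in-flight state are snapshotted
// atomically, so a restore neither loses nor double-counts records.
public class CheckpointDemo {
    record Snapshot(int offset, long sum) {}

    public static void main(String[] args) {
        List<Integer> topic = List.of(1, 2, 3, 4, 5, 6);

        int offset = 0;
        long sum = 0;              // "in-flight" aggregation state
        Snapshot checkpoint = null;

        // Process the first 4 records, taking a checkpoint after record 3.
        for (; offset < 4; offset++) {
            sum += topic.get(offset);
            if (offset == 2) {
                checkpoint = new Snapshot(offset + 1, sum); // offset AND state together
            }
        }

        // Crash! Restore from the checkpoint: state and offset are consistent,
        // so record 4 (processed after the checkpoint) is reprocessed exactly once.
        offset = checkpoint.offset();
        sum = checkpoint.sum();
        for (; offset < topic.size(); offset++) {
            sum += topic.get(offset);
        }
        System.out.println(sum); // 21 = 1+2+3+4+5+6, each record counted once
    }
}
```

Committing the offset only after full processing would instead require draining the whole pipeline before every checkpoint, which is the downtime Arvid mentions.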
> > > > > The first strategy is to avoid breaking changes as much as
> > > > > possible. State could, for example, be stored as Avro to
> > > > > allow schema evolution. Minor things like renamed operators
> > > > > will not happen with a bit more expertise.
> > > > > The second strategy is to use state migration [2].
> > > > > Alternatively, you can manually convert state with the State
> > > > > Processor API [3].
> > > > > The last option is a full reprocessing of the data. This can
> > > > > be done on a non-production cluster, and then a savepoint can
> > > > > be used to bootstrap the production cluster quickly. This
> > > > > option needs to be available anyway for the case that you
> > > > > find a logic error, but of course it has the highest
> > > > > implications (you may need to purge the sink beforehand).
> > > > >
> > > > > [1]
> > > > > https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> > > > > [2]
> > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > > > [3]
> > > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> > > > >
> > > > > On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <
> > > > > isanj...@theworkshop.com> wrote:
> > > > > > Hi, we are starting to use Beam with Flink as the runner in
> > > > > > our applications, and we would now like to take advantage
> > > > > > of Flink checkpointing, but it seems we are not
> > > > > > understanding it clearly.
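The Avro route mentioned above ("state stored as Avro to allow schema evolution") can be sketched with a hypothetical record schema; the record and field names here are made up, not from the thread. The point is that every field carries a default, so a newer reader schema can resolve state written by an older one (added fields are filled from defaults, removed fields are skipped by schema resolution):

```json
{
  "type": "record",
  "name": "WalletState",
  "namespace": "com.example.state",
  "fields": [
    {"name": "walletId", "type": "string", "default": ""},
    {"name": "balance", "type": "long", "default": 0},
    {"name": "currency", "type": ["null", "string"], "default": null}
  ]
}
```

This is what makes Avro-backed state far more forgiving than Java serialization, where any change to the class shape can make old snapshots unreadable.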
> > > > > > Simplifying, our application does the following:
> > > > > > - Read messages from a couple of Kafka topics
> > > > > > - Combine them
> > > > > > - Write the combination result to a sink (Exasol DB)
> > > > > >
> > > > > > As the application processes messages using event time, and
> > > > > > one of the topics is almost idle, the first time the
> > > > > > application is started messages are stuck in the combiner,
> > > > > > because the watermark doesn't advance until messages arrive
> > > > > > on the idle topic (we know this, and it is not a problem
> > > > > > for us).
> > > > > >
> > > > > > The problem is that we've observed that, if a checkpoint is
> > > > > > triggered while messages are still stuck in the combiner,
> > > > > > surprisingly for us the checkpoint finishes successfully
> > > > > > (and offsets are committed to Kafka) even though the
> > > > > > messages haven't reached the sink yet. Is this expected?
> > > > > >
> > > > > > The thing is that if, in the future, we make
> > > > > > state-incompatible changes to the application source code,
> > > > > > the checkpoint taken couldn't be restored. So we would like
> > > > > > to start the application without using any checkpoint, but
> > > > > > without losing data.
> > > > > > The problem here is that data loss would happen, because
> > > > > > the offsets of the messages stuck in the combiner are
> > > > > > already committed to Kafka, and without a checkpoint the
> > > > > > application would start reading from the latest committed
> > > > > > offset, so those messages are not going to be read from the
> > > > > > source again.
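The stuck-combiner behaviour described above boils down to one rule: the event-time watermark of an operator is the minimum over its inputs, so a partition that has emitted nothing holds everything back. A toy illustration in plain Java (not Flink's actual watermark classes; topic names are invented). Arvid's link [1] shows the Flink 1.10-era fix, a custom assigner with an idle timeout; newer Flink versions (1.11+) also ship this as `WatermarkStrategy#withIdleness`, if that becomes an option.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: the combined watermark is the minimum per-partition watermark,
// so one idle partition (stuck at Long.MIN_VALUE) stalls event time globally.
public class IdlePartitionDemo {
    static long combinedWatermark(Map<String, Long> perPartition) {
        return perPartition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = new HashMap<>();
        watermarks.put("orders-0", 1000L);   // busy topic, advancing normally
        watermarks.put("orders-1", 1200L);
        System.out.println(combinedWatermark(watermarks)); // 1000

        // The second, almost idle topic has emitted no records yet:
        watermarks.put("refunds-0", Long.MIN_VALUE);
        System.out.println(combinedWatermark(watermarks)); // Long.MIN_VALUE: combiner output held back
    }
}
```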
> > > > > > So I guess our question is: what do you do in order to not
> > > > > > lose data when developing applications? Because sooner or
> > > > > > later you are going to introduce breaking changes...
> > > > > >
> > > > > > For example, we've seen these two errors so far:
> > > > > >
> > > > > > - After changing an operator name:
> > > > > >
> > > > > > 2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
> > > > > > org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
> > > > > > ...
> > > > > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> > > > > >     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
> > > > > >     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
> > > > > >     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
> > > > > >     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> > > > > >     ... 7 more
> > > > > > Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986.
> > > > > > Cannot map checkpoint/savepoint state for operator
> > > > > > f476451c6210bd2783f36fa331b9da5e to the new program, because
> > > > > > the operator is not available in the new program. If you want
> > > > > > to allow to skip this, you can set the --allowNonRestoredState
> > > > > > option on the CLI.
> > > > > >     at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> > > > > > ...
> > > > > >
> > > > > > - After modifying a Java model class involved in a combine:
> > > > > >
> > > > > > org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> > > > > >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> > > > > >     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
> > > > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> > > > > >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> > > > > >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> > > > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> > > > > >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> > > > > >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> > > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
> > > > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
> > > > > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > > > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > > > > >     at java.lang.Thread.run(Thread.java:748)
> > > > > > Caused by: java.io.InvalidClassException:
> > > > > > internal.holly.beatrix.wallet.walletfact.model.WalletMetadata;
> > > > > > local class incompatible: stream classdesc serialVersionUID = 8366890161513008789, local class serialVersionUID = 174312384610985998
> > > > > >     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> > > > > >
> > > > > > Apologies in advance, as we are new to Flink, so maybe we are
> > > > > > missing something obvious here.
> > > > > >
> > > > > > Thanks
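The second error above comes from plain Java serialization: without an explicit `serialVersionUID`, the JVM derives one from the class shape, so modifying the model class changes the UID and old state fails with exactly this `InvalidClassException`. Avro, as suggested earlier in the thread, is the more robust fix, but if the state stays on Java serialization, one common mitigation is pinning the UID. A minimal sketch with a stand-in class (`WalletMetadata` here is hypothetical, not the real class from the trace):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Pinning serialVersionUID keeps old serialized state readable across
// compatible class changes (e.g. an added field reads back as its default).
// It does NOT help for incompatible changes like altered field types.
public class SerialVersionDemo {
    static class WalletMetadata implements Serializable {
        private static final long serialVersionUID = 1L; // pinned explicitly
        String owner;
        long balance;
        WalletMetadata(String owner, long balance) { this.owner = owner; this.balance = balance; }
    }

    public static void main(String[] args) throws Exception {
        // The pinned UID is what the serialized stream records:
        System.out.println(ObjectStreamClass.lookup(WalletMetadata.class).getSerialVersionUID());

        // Round trip works, and keeps working after compatible class changes.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new WalletMetadata("ivan", 42));
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            WalletMetadata restored = (WalletMetadata) in.readObject();
            System.out.println(restored.owner + " " + restored.balance);
        }
    }
}
```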
> > > > > > This email and any attachment to it are confidential. Unless
> > > > > > you are the intended recipient, you may not use, copy or
> > > > > > disclose either the message or any information contained in
> > > > > > the message. If you are not the intended recipient, you
> > > > > > should delete this email and notify the sender immediately.
> > > > > > Any views or opinions expressed in this email are those of
> > > > > > the sender only, unless otherwise stated. All copyright in
> > > > > > any of the material in this email is reserved. All emails,
> > > > > > incoming and outgoing, may be recorded and monitored for
> > > > > > legitimate business purposes. We exclude all liability for
> > > > > > any loss or damage arising or resulting from the receipt,
> > > > > > use or transmission of this email to the fullest extent
> > > > > > permitted by law.

--
Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng