In the future, for portable pipelines, a runner will be able to check whether finalization is requested and give a hard error if checkpointing is disabled: https://github.com/apache/beam/blob/2be7457a4c0b311c3bd784b3f00b425596adeb06/model/pipeline/src/main/proto/beam_runner_api.proto#L382
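
To make the idea concrete, here is a rough sketch of what such a check could look like. It is a toy model under stated assumptions, not Beam or Flink code: ToyCheckpointMark, run and the two boolean flags are hypothetical names invented for illustration. The mark acks messages only when finalizeCheckpoint() is called, and the toy runner refuses to start when finalization is requested while checkpointing is off, instead of silently never acking.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: every type here is hypothetical, not a Beam or Flink class.
final class FinalizationCheckDemo {

    /** Stand-in for a checkpoint mark that acks messages only when finalized. */
    static final class ToyCheckpointMark {
        final List<Long> pendingTags = new ArrayList<>();
        final List<Long> ackedTags = new ArrayList<>();

        void finalizeCheckpoint() {
            // Acknowledge everything read since the last checkpoint.
            ackedTags.addAll(pendingTags);
            pendingTags.clear();
        }
    }

    /**
     * Reads messageCount messages in a toy "runner".
     * Fails fast if finalization is requested while checkpointing is disabled;
     * otherwise finalizes the mark only when checkpointing is enabled.
     */
    static ToyCheckpointMark run(
            int messageCount, boolean requestsFinalization, boolean checkpointingEnabled) {
        if (requestsFinalization && !checkpointingEnabled) {
            throw new IllegalStateException(
                "Finalization requested, but checkpointing is disabled");
        }
        ToyCheckpointMark mark = new ToyCheckpointMark();
        for (long tag = 1; tag <= messageCount; tag++) {
            mark.pendingTags.add(tag);
        }
        if (checkpointingEnabled) {
            mark.finalizeCheckpoint();
        }
        return mark;
    }

    public static void main(String[] args) {
        System.out.println("acked: " + run(3, true, true).ackedTags);
        try {
            run(3, true, false);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With checkpointing enabled, all three tags end up acked; with it disabled, the run is rejected up front. A source that does not request finalization would still run without checkpointing in this sketch, which matches the point that not every unbounded source needs its checkpoint marks finalized.
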

On Fri, Jun 14, 2019 at 8:17 AM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:

> In my opinion, for such crucial behavior I would expect the pipeline to
> fail with a clear message stating the reason, in the same way as when you
> implement a new Coder and forget to override the verifyDeterministic method
> (I don't recall the exact name of it).
>
> Just my 2 cents.
>
> Maximilian Michels <m...@apache.org> wrote on Fri, Jun 14, 2019, 16:48:
>
>> This has come up before: https://issues.apache.org/jira/browse/BEAM-4520
>>
>> The issue is that checkpoints won't be acknowledged if checkpointing is
>> disabled in Flink. We log a WARN when unbounded sources are used without
>> checkpointing. Not all unbounded sources actually need to finalize
>> checkpoint marks.
>>
>> Seeing that this is still an issue, we might want to at least
>> periodically acknowledge checkpoint marks when checkpointing is disabled.
>> The alternative would be to throw an exception, perhaps with the option to
>> override this in case the user knows what they are doing.
>>
>> Thanks,
>> Max
>>
>> On 14.06.19 10:52, Ismaël Mejía wrote:
>> > Is there a JIRA for this? If this solves an issue for multiple users,
>> > maybe it is worth integrating the patch.
>> > Would you be up to do this, Augustin?
>> >
>> > On Fri, Jun 14, 2019 at 10:35 AM Augustin Lafanechere
>> > <augustin.lafanech...@kapten.com> wrote:
>> > >
>> > > Hello Nicolas,
>> > > I also encountered the same problem.
>> > > RabbitMqIO indeed acknowledges messages on finalizeCheckpoint calls,
>> > > but it was not clear to me when this method is called, because no
>> > > messages were acked while the pipeline was running.
>> > > I finally decided to patch RabbitMqIO to auto-ack received messages.
>> > > This is fine for my current use case but is not the safest way of
>> > > consuming messages.
>> > >
>> > > If someone has a cleaner solution I'll be happy to hear it.
>> > >
>> > > Augustin
>> > >
>> > >> On Jun 13, 2019, at 15:47, Nicolas Delsaux <nicolas.dels...@gmx.fr> wrote:
>> > >>
>> > >> I'm having big trouble reading data from RabbitMQ.
>> > >>
>> > >> To understand my troubles, I've simplified my previous code to the
>> > >> extreme:
>> > >>
>> > >>     Pipeline pipeline = Pipeline.create(options);
>> > >>
>> > >>     PCollection<Object> wat = (PCollection<Object>) pipeline
>> > >>         .apply("read_from_rabbit",
>> > >>             RabbitMqIO.read()
>> > >>                 .withUri(options.getRabbitMQUri())
>> > >>                 .withQueue(options.getRabbitMQQueue()))
>> > >>         .apply("why not",
>> > >>             RabbitMqIO.<RabbitMqMessage>write()
>> > >>                 .withQueue("written_in_rabbit")
>> > >>                 .withQueueDeclare(true)
>> > >>                 .withUri(options.getRabbitMQUri()));
>> > >>
>> > >> So if I put a simple message in my input queue, it should be "moved"
>> > >> (in quotes, since the new message is not the original one but has the
>> > >> same content) into my "written_in_rabbit" queue.
>> > >>
>> > >> Unfortunately, for reasons I don't understand, the original message
>> > >> stays in the input queue.
>> > >>
>> > >> It seems to be due to the fact that the
>> > >> RabbitMQCheckpointMark#finalizeCheckpoint() method is never called. So
>> > >> where is the finalizeCheckpoint method called?
>> > >>
>> > >> And how can I understand why this method is never called in my case?
>> > >>
>> > >> Thanks