In the future, you will be able to check and give a hard error if
checkpointing is disabled yet finalization is requested for portable
pipelines:
https://github.com/apache/beam/blob/2be7457a4c0b311c3bd784b3f00b425596adeb06/model/pipeline/src/main/proto/beam_runner_api.proto#L382

On Fri, Jun 14, 2019 at 8:17 AM Juan Carlos Garcia <jcgarc...@gmail.com>
wrote:

> In my opinion, for such crucial behavior I would expect the pipeline to
> fail with a clear message stating the reason, in the same way as when you
> implement a new Coder and forget to override the verifyDeterministic method
> (don't recall the right name of it).
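A minimal sketch of the fail-fast check being asked for here, combined with the opt-out that Max suggests further down. Every name in it (`CheckpointingValidator`, `validate`, the override flag, the exception type) is hypothetical and is not an existing Beam or Flink option. It also assumes that a checkpoint interval <= 0 means "checkpointing disabled".

```java
// Hypothetical fail-fast validation, in the spirit of Coder#verifyDeterministic():
// refuse to start when a source needs checkpoint finalization but checkpointing
// is off, unless the user explicitly opts out. Not Beam or Flink code.
class CheckpointingValidator {

  static final class FinalizationWithoutCheckpointingException extends IllegalStateException {
    FinalizationWithoutCheckpointingException(String message) {
      super(message);
    }
  }

  /**
   * @param checkpointingIntervalMillis assumed convention: <= 0 means disabled
   * @param sourceNeedsFinalization     whether the source relies on finalizeCheckpoint()
   * @param allowWithoutCheckpointing   hypothetical "I know what I'm doing" override
   */
  static void validate(long checkpointingIntervalMillis,
                       boolean sourceNeedsFinalization,
                       boolean allowWithoutCheckpointing) {
    boolean checkpointingDisabled = checkpointingIntervalMillis <= 0;
    if (!checkpointingDisabled || !sourceNeedsFinalization) {
      return; // either checkpoints will complete, or nothing needs finalizing
    }
    String reason = "The source relies on checkpoint finalization (e.g. to ack messages), "
        + "but checkpointing is disabled, so finalizeCheckpoint() will never be called.";
    if (!allowWithoutCheckpointing) {
      throw new FinalizationWithoutCheckpointingException(
          reason + " Enable checkpointing, or set the override flag if this is intended.");
    }
    System.err.println("WARN: " + reason); // override set: degrade to a warning
  }

  public static void main(String[] args) {
    validate(10_000, true, false); // checkpointing enabled: passes
    validate(-1, false, false);    // nothing to finalize: passes
    validate(-1, true, true);      // explicitly overridden: warns only
    try {
      validate(-1, true, false);   // finalization needed, no checkpoints: rejected
    } catch (FinalizationWithoutCheckpointingException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The point of throwing at validation time rather than warning is that the failure surfaces before any message is consumed, instead of showing up later as messages silently piling up in the input queue.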
>
> Just my 2 cents.
>
> On Fri, 14 Jun 2019, 16:48, Maximilian Michels <m...@apache.org> wrote:
>
>> This has come up before: https://issues.apache.org/jira/browse/BEAM-4520
>>
>> The issue is that checkpoint marks won't be finalized if checkpointing is
>> disabled in Flink. We log a warning when unbounded sources are used
>> without checkpointing. Not all unbounded sources actually need to finalize
>> checkpoint marks.
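To make the mechanism concrete, here is a toy, self-contained simulation (not Beam or Flink code) of how a runner drives finalization: the reader hands out a checkpoint mark, and `finalizeCheckpoint()` is called only after that checkpoint has completed. `CheckpointMark` merely mirrors the shape of Beam's `UnboundedSource.CheckpointMark` (simplified: no `IOException`); `FakeQueueReader` and `run` are hypothetical stand-ins for a RabbitMqIO-like reader and a runner.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Beam code: shows why a source that acks on finalize never
// acks anything when the runner never completes a checkpoint.
class FinalizationLifecycle {

  // Mirrors the shape of Beam's UnboundedSource.CheckpointMark (simplified).
  interface CheckpointMark {
    void finalizeCheckpoint();
  }

  // Hypothetical reader that, like RabbitMqIO, acks messages only on finalize.
  static class FakeQueueReader {
    final List<String> pendingAcks = new ArrayList<>();
    int ackedCount = 0;

    void read(String message) {
      pendingAcks.add(message); // delivered, but not yet acknowledged
    }

    CheckpointMark getCheckpointMark() {
      List<String> toAck = new ArrayList<>(pendingAcks);
      pendingAcks.clear();
      return () -> ackedCount += toAck.size(); // the ack happens only on finalize
    }
  }

  // Hypothetical runner: if checkpointing is enabled, it checkpoints every
  // `interval` elements and finalizes the mark once the checkpoint completes.
  static int run(List<String> input, boolean checkpointingEnabled, int interval) {
    FakeQueueReader reader = new FakeQueueReader();
    int seen = 0;
    for (String msg : input) {
      reader.read(msg);
      seen++;
      if (checkpointingEnabled && seen % interval == 0) {
        CheckpointMark mark = reader.getCheckpointMark();
        // (a real runner would durably persist the checkpoint state here)
        mark.finalizeCheckpoint();
      }
    }
    return reader.ackedCount;
  }

  public static void main(String[] args) {
    List<String> msgs = List.of("a", "b", "c", "d");
    System.out.println("acked with checkpointing:    " + run(msgs, true, 2));  // 4
    System.out.println("acked without checkpointing: " + run(msgs, false, 2)); // 0
  }
}
```

With checkpointing disabled, the loop never reaches `finalizeCheckpoint()`, which matches the symptom reported below: messages are read but stay unacknowledged in the input queue.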
>>
>> Seeing that this is still an issue, we might want to at least
>> periodically acknowledge checkpoint marks when checkpointing is disabled.
>> The alternative would be to throw an exception, perhaps with an option to
>> override this in case the user knows what they are doing.
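A rough sketch of the "periodically acknowledge checkpoint marks" fallback, assuming a timer that asks the reader for its current mark and finalizes it right away. `PeriodicFinalizer` and its `CheckpointMark` interface are hypothetical (the interface only mirrors the shape of Beam's `UnboundedSource.CheckpointMark`); only `ScheduledExecutorService` is a real JDK API here. Because no checkpoint backs these acks, this gives at-most-once delivery: anything acked but not yet emitted is lost if the job fails.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical fallback for runs without checkpointing: finalize the reader's
// current checkpoint mark on a fixed timer. Not Beam or Flink code.
class PeriodicFinalizer implements AutoCloseable {

  // Mirrors the shape of Beam's UnboundedSource.CheckpointMark.
  interface CheckpointMark {
    void finalizeCheckpoint() throws Exception;
  }

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  PeriodicFinalizer(Supplier<CheckpointMark> currentMark, long periodMillis) {
    scheduler.scheduleAtFixedRate(() -> {
      try {
        // No completed checkpoint backs this ack: at-most-once semantics.
        currentMark.get().finalizeCheckpoint();
      } catch (Exception e) {
        // Catch everything: an exception escaping the task would suppress
        // all subsequent scheduled executions.
        System.err.println("finalizeCheckpoint failed: " + e);
      }
    }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  @Override
  public void close() {
    scheduler.shutdownNow();
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger finalized = new AtomicInteger();
    // Stand-in reader: every mark it hands out just counts its finalization.
    Supplier<CheckpointMark> reader = () -> () -> finalized.incrementAndGet();
    try (PeriodicFinalizer ignored = new PeriodicFinalizer(reader, 10)) {
      Thread.sleep(100);
    }
    System.out.println("finalized marks: " + finalized.get());
  }
}
```

The trade-off versus the exception approach is that the pipeline keeps running, but with weaker delivery guarantees than a checkpointed run, which is why an explicit opt-in seems preferable to making this the silent default.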
>>
>> Thanks,
>> Max
>>
>> On 14.06.19 10:52, Ismaël Mejía wrote:
>> > Is there a JIRA for this? If this solves an issue for multiple users,
>> > maybe it is worth integrating the patch.
>> > Would you be up for doing this, Augustin?
>> >
>> > On Fri, Jun 14, 2019 at 10:35 AM Augustin Lafanechere
>> > <augustin.lafanech...@kapten.com> wrote:
>> > >
>> > > Hello Nicolas,
>> > > I also encountered the same problem.
>> > > RabbitMqIO does acknowledge messages on finalizeCheckpoint calls, but
>> > > it was not clear to me when this method is called, because no messages
>> > > were acked while the pipeline was running.
>> > > I finally decided to patch RabbitMqIO to auto-ack received messages.
>> > > This is fine for my current use case, but it is not the safest way of
>> > > consuming messages.
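Why auto-ack is "not the safest way" can be shown with a toy broker model. This is a hypothetical simulation, not the RabbitMQ client API. It only mimics two broker rules: auto-acked deliveries count as done as soon as they are sent, and unacknowledged deliveries are requeued when the consumer's connection is lost. `ackAll()` stands in for what RabbitMqIO's `finalizeCheckpoint()` does, according to the message above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy broker model, NOT the RabbitMQ client API: contrasts auto-ack with
// ack-on-finalize when a worker crashes after reading a message.
class AckModes {

  static class ToyBroker {
    final Deque<String> ready = new ArrayDeque<>();
    final List<String> unacked = new ArrayList<>();

    String deliver(boolean autoAck) {
      String msg = ready.poll();
      if (msg != null && !autoAck) {
        unacked.add(msg); // held by the broker until the consumer acks it
      }
      return msg; // with autoAck the broker forgets the message right away
    }

    void ackAll() {
      unacked.clear();
    }

    void connectionLost() {
      ready.addAll(unacked); // unacked deliveries go back to the queue
      unacked.clear();
    }
  }

  // Reads one message, optionally completes a checkpoint (finalize = ack),
  // then crashes. Returns how many messages remain in the queue afterwards.
  static int queuedAfterCrash(boolean autoAck, boolean checkpointCompleted) {
    ToyBroker broker = new ToyBroker();
    broker.ready.add("m1");
    broker.deliver(autoAck); // read, but output not yet durably committed
    if (!autoAck && checkpointCompleted) {
      broker.ackAll(); // stands in for finalizeCheckpoint()
    }
    broker.connectionLost();
    return broker.ready.size();
  }

  public static void main(String[] args) {
    System.out.println("auto-ack, crash before checkpoint:        " + queuedAfterCrash(true, false));  // 0 (lost)
    System.out.println("ack-on-finalize, crash before checkpoint: " + queuedAfterCrash(false, false)); // 1 (redelivered)
    System.out.println("ack-on-finalize, checkpoint completed:    " + queuedAfterCrash(false, true));  // 0 (done)
  }
}
```

Ack-on-finalize only acknowledges a message once the state covering it has been checkpointed, so a crash leads to redelivery rather than loss. That is also why it depends on checkpointing being enabled.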
>> > >
>> > > If someone has a cleaner solution I’ll be happy to hear it.
>> > >
>> > > Augustin
>> > >
>> > >
>> > >
>> > >
>> > >> On 13 Jun 2019, at 15:47, Nicolas Delsaux <nicolas.dels...@gmx.fr>
>> > >> wrote:
>> > >>
>> > >> I'm having serious trouble reading data from RabbitMQ.
>> > >>
>> > >> To understand the problem, I've simplified my previous code to the
>> > >> extreme:
>> > >>
>> > >>
>> > >>         Pipeline pipeline = Pipeline.create(options);
>> > >>
>> > >>         // Read from the input queue and write every message to another queue.
>> > >>         pipeline
>> > >>             .apply("read_from_rabbit",
>> > >>                 RabbitMqIO.read()
>> > >>                     .withUri(options.getRabbitMQUri())
>> > >>                     .withQueue(options.getRabbitMQQueue()))
>> > >>             .apply("why not",
>> > >>                 RabbitMqIO.write()
>> > >>                     .withQueue("written_in_rabbit")
>> > >>                     .withQueueDeclare(true)
>> > >>                     .withUri(options.getRabbitMQUri()));
>> > >>
>> > >>         pipeline.run();
>> > >>
>> > >>
>> > >> So if I put a simple message in my input queue, it should be "moved"
>> > >> (the quotes are there because the new message is not the original one,
>> > >> but has the same content) into my "written_in_rabbit" queue.
>> > >>
>> > >> Unfortunately, for reasons I don't understand, the original message
>> > >> stays in the input queue.
>> > >>
>> > >> This seems to be because the RabbitMQCheckpointMark#finalizeCheckpoint()
>> > >> method is never called. So where is the finalizeCheckpoint method
>> > >> called?
>> > >>
>> > >> And how can I find out why this method is never called in my case?
>> > >>
>> > >> Thanks
>> > >>
>> > >>
>> > >
>>
>>
