I'm having big troubles reading data from RabbitMQ.

To understand my troubles, i've simplified my previous code to the extreme :


        Pipeline pipeline = Pipeline.create(options);

        PCollection<Object> wat = (PCollection<Object>) pipeline.apply("read_from_rabbit",
                RabbitMqIO.read()
                    .withUri(options.getRabbitMQUri())
                    .withQueue(options.getRabbitMQQueue())
                    )
                .apply("why not", RabbitMqIO.<RabbitMqMessage>write()
                        .withQueue("written_in_rabbit")
                        .withQueueDeclare(true)
                        .withUri(options.getRabbitMQUri())
                        )


So if I put a simple message in my input queue, it should be "moved" (quotes are here since new message is not the original one, but has same content) into my "written_in_rabbit" message.

Unfortunatly, for reasons I don't understand, the original message stays in input queue.

It seems to be due to the fact that RabbitMQCheckpointMark#finalizeCheckpoint() method is never called. So where is the finalizeCheckpoint method called ?

And how can I understand why this method is never called in my case ?

Thanks


Reply via email to