I'm having a lot of trouble reading data from RabbitMQ.
To narrow the problem down, I've simplified my code to the extreme:
    Pipeline pipeline = Pipeline.create(options);
    PCollection<Object> wat = (PCollection<Object>)
        pipeline.apply("read_from_rabbit",
                RabbitMqIO.read()
                    .withUri(options.getRabbitMQUri())
                    .withQueue(options.getRabbitMQQueue())
            )
            .apply("why not", RabbitMqIO.<RabbitMqMessage>write()
                .withQueue("written_in_rabbit")
                .withQueueDeclare(true)
                .withUri(options.getRabbitMQUri())
            );
So if I put a simple message in my input queue, it should be "moved"
(in quotes because the new message is not the original one, only a copy
with the same content) into my "written_in_rabbit" queue.
Unfortunately, for reasons I don't understand, the original message stays
in the input queue.
This seems to be because
RabbitMQCheckpointMark#finalizeCheckpoint() is never called. So
where is finalizeCheckpoint() supposed to be called from?
And how can I find out why it is never called in my case?
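
To make the question concrete, here is how I understand the mechanism, as a minimal stand-alone sketch. It uses neither Beam nor RabbitMQ: FakeQueue and FakeCheckpointMark are hypothetical stand-ins I made up, and the sketch assumes that the reader only acknowledges deliveries when the runner calls finalizeCheckpoint(). If that assumption is right, a message that has been read but never finalized would stay in the input queue, which is what I observe.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AckOnFinalizeSketch {

    /** Hypothetical stand-in for a broker queue with manual acknowledgements. */
    static class FakeQueue {
        private final Map<Long, String> messages = new LinkedHashMap<>();
        private long nextTag = 1;

        // Stores the message and returns the delivery tag assigned to it.
        long publish(String body) {
            long tag = nextTag++;
            messages.put(tag, body);
            return tag;
        }

        // Reading a message does not remove it; only an explicit ack does.
        void ack(long deliveryTag) {
            messages.remove(deliveryTag);
        }

        int size() {
            return messages.size();
        }
    }

    /** Hypothetical checkpoint mark: remembers what was read, acks on finalize. */
    static class FakeCheckpointMark {
        private final FakeQueue queue;
        private final List<Long> deliveryTags = new ArrayList<>();

        FakeCheckpointMark(FakeQueue queue) {
            this.queue = queue;
        }

        // Called by the (imaginary) reader for every message it hands downstream.
        void recordDelivery(long deliveryTag) {
            deliveryTags.add(deliveryTag);
        }

        // Assumption: the runner calls this once the checkpoint has been committed.
        void finalizeCheckpoint() {
            for (long tag : deliveryTags) {
                queue.ack(tag);
            }
            deliveryTags.clear();
        }
    }

    public static void main(String[] args) {
        FakeQueue queue = new FakeQueue();
        long tag = queue.publish("hello");

        FakeCheckpointMark mark = new FakeCheckpointMark(queue);
        mark.recordDelivery(tag); // message was read and forwarded downstream

        System.out.println("before finalize: " + queue.size()); // prints "before finalize: 1"
        mark.finalizeCheckpoint();
        System.out.println("after finalize: " + queue.size());  // prints "after finalize: 0"
    }
}
```

In this sketch the message only leaves the queue after finalizeCheckpoint() runs, so if the runner never calls it, the queue size stays at 1.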
Thanks