[ https://issues.apache.org/jira/browse/FLINK-20628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser reassigned FLINK-20628: -------------------------------------- Assignee: RocMarshal > Port RabbitMQ Sources to FLIP-27 API > ------------------------------------ > > Key: FLINK-20628 > URL: https://issues.apache.org/jira/browse/FLINK-20628 > Project: Flink > Issue Type: Improvement > Components: Connectors/ RabbitMQ > Reporter: Jan Westphal > Assignee: RocMarshal > Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > *Structure* > The new RabbitMQ Source will have three components: > * RabbitMQ enumerator that receives one RabbitMQ Channel Config. > * RabbitMQ splits contain the RabbitMQ Channel Config > * RabbitMQ Readers which subscribe to the same RabbitMQ channel and receive > the messages (automatically load balanced by RabbitMQ). > *Checkpointing Enumerators* > The enumerator only needs to checkpoint the RabbitMQ channel config since the > continuous discovery of new unread/unhandled messages is taken care of by the > subscribed RabbitMQ readers and RabbitMQ itself. > *Checkpointing Readers* > The new RabbitMQ Source needs to ensure that every reader can be checkpointed. > Since RabbitMQ is non-persistent and cannot be read by offset, a combined > usage of checkpoints and message acknowledgments is necessary. Until a > received message is checkpointed by a reader, it will stay in an > un-acknowledge state. As soon as the checkpoint is created, the messages from > the last checkpoint can be acknowledged as handled against RabbitMQ and thus > will be deleted only then. Messages need to be acknowledged one by one as > messages are handled by each SourceReader individually. > When deserializing the messages we will make use of the implementation in the > existing RabbitMQ Source. > *Message Delivery Guarantees* > Unacknowledged messages of a reader will be redelivered by RabbitMQ > automatically to other consumers of the same channel if the reader goes down. > > This Source is going to only support at-least-once as this is the default > RabbitMQ behavior and thus everything else would require changes to RabbitMQ > itself or would impair the idea of parallelizing SourceReaders. -- This message was sent by Atlassian Jira (v8.20.10#820010)