[
https://issues.apache.org/jira/browse/KAFKA-5084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973498#comment-15973498
]
Randall Hauch commented on KAFKA-5084:
--------------------------------------
A bit more background may be in order. The Debezium MySQL connector can be
configured to ignore binlog events that don't meet certain criteria, so the
connector can be making steady progress through the binlog while producing
{{SourceRecord}} objects only for the events that satisfy those criteria.
In discussions with [~criccomini], it seems that some MySQL connectors are
configured such that very few binlog events satisfy the criteria, so those
connectors very rarely output a {{SourceRecord}} even though large numbers
of binlog events have been successfully processed.
The challenge is that the only way for a connector to supply a source offset to
Kafka Connect is by including it in a {{SourceRecord}}. However, if the
connector has no need to produce a {{SourceRecord}}, the connector can't tell
Kafka Connect that it has made progress and it has a new offset that should be
committed (at the next appropriate time).
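To make this mechanism concrete, here is a small, self-contained Java model. It is a sketch under simplifying assumptions, not Kafka Connect code: {{OffsetStallDemo}}, {{EmittedRecord}}, {{FilteringTask}} and {{runAndGetCommittableOffset}} are hypothetical names, and the model ignores producer acknowledgements and {{offset.flush.interval.ms}}. It only mimics the rule that the framework learns a task's position from the offset carried by the records the task emits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

/**
 * Toy model of the offset-stall problem. All names are hypothetical; this is
 * NOT the Kafka Connect API. The "framework" here only sees offsets that are
 * attached to records the task actually emits.
 */
public class OffsetStallDemo {

    /** Stand-in for a SourceRecord: carries the binlog position it was read at. */
    static final class EmittedRecord {
        final long sourceOffset;

        EmittedRecord(long sourceOffset) {
            this.sourceOffset = sourceOffset;
        }
    }

    /** Stand-in for a filtering source task reading numbered binlog events. */
    static final class FilteringTask {
        private final LongPredicate include; // e.g. a table whitelist/blacklist rule
        long position = 0;                   // last binlog event the task has processed

        FilteringTask(LongPredicate include) {
            this.include = include;
        }

        /** Processes the next n events; returns only those that pass the filter. */
        List<EmittedRecord> poll(int n) {
            List<EmittedRecord> out = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                position++;
                if (include.test(position)) {
                    out.add(new EmittedRecord(position));
                }
            }
            return out;
        }
    }

    /** Stand-in for the framework: the committable offset is the last one it saw on a record. */
    static long runAndGetCommittableOffset(FilteringTask task, int batches, int batchSize) {
        long committable = 0; // 0 means "nothing to commit yet"
        for (int b = 0; b < batches; b++) {
            for (EmittedRecord r : task.poll(batchSize)) {
                committable = r.sourceOffset;
            }
        }
        return committable;
    }

    public static void main(String[] args) {
        // Only event #3 passes the filter; the task processes 1,000 events in total.
        FilteringTask task = new FilteringTask(pos -> pos == 3);
        long committable = runAndGetCommittableOffset(task, 10, 100);
        System.out.println("task position      = " + task.position); // 1000
        System.out.println("committable offset = " + committable);   // 3
    }
}
```

With a filter that matches only event 3, the task has processed the binlog up to position 1000, yet the only offset the framework could commit is 3.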
> Allow Kafka connect source tasks to commit offsets without messages being sent
> ------------------------------------------------------------------------------
>
> Key: KAFKA-5084
> URL: https://issues.apache.org/jira/browse/KAFKA-5084
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Affects Versions: 0.10.2.0
> Reporter: Chris Riccomini
>
> We are currently running [Debezium|http://debezium.io/] connectors in Kafka
> Connect. These connectors consume from MySQL's binlog and produce into Kafka.
> One of the things we've observed is that some of our Debezium connectors are
> not honoring the {{offset.flush.interval.ms}} setting (which is set to 60
> seconds). Some of our connectors seem to be committing only sporadically. For
> low-volume connectors, the commits seem to happen once every hour or two, and
> sometimes even longer.
> It sounds like the issue is that Kafka Connect will only commit source task
> offsets when the source task produces new source records. This is because
> Kafka Connect gets the offset to commit from an incoming source record. The
> problem with this approach is that there are (in my opinion) valid reasons to
> want to commit consumed offsets WITHOUT sending any new messages. Taking
> Debezium as an example, there are cases where Debezium consumes messages but
> filters them out based on a regex or filter rule (e.g. table blacklists).
> In such a case, Debezium is consuming messages from MySQL's binlog
> and dropping them before they get to the Kafka Connect framework. As such,
> Kafka Connect never sees these messages and doesn't commit any progress.
> This results in two problems:
> # In the event of a failure, the connector could fall WAY back, since the
> last committed offset might be from hours ago, even though it *has*
> processed all recent messages; it just hasn't sent anything to Kafka.
> # For connectors like Debezium that consume from a source with a
> *limited* window to fetch messages (MySQL's binlog has time/size-based
> retention), you can actually fall off the edge of the binlog: the last
> committed offset can be older than the oldest event the binlog still
> retains, even though Debezium has fetched every single message in the
> binlog; it just hasn't produced anything due to filtering.
> Again, I don't see this as a Debezium-specific issue. I could imagine a
> similar scenario with an [SST-based Cassandra
> source|https://github.com/datamountaineer/stream-reactor/issues/162].
> It would be nice if Kafka Connect gave us a way to commit offsets for
> source tasks even when messages haven't been sent recently. This would allow
> source tasks to record their progress even if they're opting not to send
> messages to Kafka due to filtering or for some other reason.
> (See https://issues.jboss.org/browse/DBZ-220 for more context.)
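The sketch below models the second problem described in the issue, together with one possible workaround. The workaround is an assumption for illustration, not a Kafka Connect feature: alongside its filtered data records, the task emits a tiny "heartbeat" record to a separate topic every {{heartbeatEvery}} events, so the framework still receives the current binlog position. All names, including the {{data-topic}} and {{heartbeat-topic}} topic names, are hypothetical, and binlog retention is simplified to "events before {{oldestRetainedPosition}} have been purged".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

/**
 * Hypothetical model (not the Kafka Connect API) of a heartbeat workaround:
 * when no data record is emitted, the task periodically emits an offset-only
 * record so the framework still learns the task's binlog position.
 */
public class HeartbeatOffsetDemo {

    static final class EmittedRecord {
        final String topic;       // hypothetical topic names, see poll()
        final long sourceOffset;  // binlog position carried by the record

        EmittedRecord(String topic, long sourceOffset) {
            this.topic = topic;
            this.sourceOffset = sourceOffset;
        }
    }

    /** Reads events startPosition+1 .. startPosition+n; returns what would be sent. */
    static List<EmittedRecord> poll(long startPosition, int n, LongPredicate include,
                                    int heartbeatEvery) {
        List<EmittedRecord> out = new ArrayList<>();
        for (long pos = startPosition + 1; pos <= startPosition + n; pos++) {
            if (include.test(pos)) {
                out.add(new EmittedRecord("data-topic", pos));
            } else if (heartbeatEvery > 0 && pos % heartbeatEvery == 0) {
                // Offset-only heartbeat to a hypothetical separate topic.
                out.add(new EmittedRecord("heartbeat-topic", pos));
            }
        }
        return out;
    }

    /** Offset the framework could commit: the offset of the last emitted record. */
    static long lastCommittable(List<EmittedRecord> records) {
        long committed = 0;
        for (EmittedRecord r : records) {
            committed = r.sourceOffset;
        }
        return committed;
    }

    /** A restart resumes at committedOffset + 1, which must still be retained. */
    static boolean canResume(long committedOffset, long oldestRetainedPosition) {
        return committedOffset + 1 >= oldestRetainedPosition;
    }

    public static void main(String[] args) {
        LongPredicate onlyEvent3 = pos -> pos == 3;
        long oldestRetained = 500; // assume events 1..499 have been purged

        long without = lastCommittable(poll(0, 1000, onlyEvent3, 0));
        long with = lastCommittable(poll(0, 1000, onlyEvent3, 100));

        // without heartbeat: 3, resumable=false
        System.out.println("without heartbeat: " + without
                + ", resumable=" + canResume(without, oldestRetained));
        // with heartbeat:    1000, resumable=true
        System.out.println("with heartbeat:    " + with
                + ", resumable=" + canResume(with, oldestRetained));
    }
}
```

The trade-off is that heartbeats are still messages written to Kafka; the feature requested in this issue would let a task commit its offset without them.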
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)