Chris Riccomini created KAFKA-5084:
--------------------------------------
Summary: Allow Kafka connect source tasks to commit offsets
without messages being sent
Key: KAFKA-5084
URL: https://issues.apache.org/jira/browse/KAFKA-5084
Project: Kafka
Issue Type: New Feature
Components: KafkaConnect
Affects Versions: 0.10.2.0
Reporter: Chris Riccomini
We are currently running [Debezium|http://debezium.io/] connectors in Kafka
Connect. These connectors consume from MySQL's binlog and produce into Kafka.
One thing we've observed is that some of our Debezium connectors are not
honoring the {{offset.flush.interval.ms}} setting (which is set to 60 seconds).
Some of our connectors seem to commit only sporadically. For low-volume
connectors, commits seem to happen once every hour or two, and sometimes the
gap is even longer.
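For context, the relevant worker setting is just this (a minimal excerpt of our Connect worker config, with the 60-second value mentioned above):

{code}
# Kafka Connect worker configuration (excerpt)
# How often the framework tries to commit offsets for source tasks, in milliseconds.
offset.flush.interval.ms=60000
{code}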
It sounds like the issue is that Kafka Connect only commits source task
offsets when the source task produces new source records, because the framework
takes the offsets to commit from the incoming source records. The problem with
this approach is that there are (in my opinion) valid reasons to want to commit
consumed offsets WITHOUT sending any new messages. Taking Debezium as an
example, there are cases where Debezium consumes messages but filters them out
based on a regex or filter rule (e.g. table blacklists). In that case, Debezium
is consuming messages from MySQL's binlog and dropping them before they ever
reach the Kafka Connect framework. Kafka Connect never sees these messages, so
it never commits any progress (a sketch after the list below illustrates this).
That results in several problems:
# In the event of a failure, the connector could fall WAY back, since the last
committed offset might be hours old, even though the connector *has* processed
all recent messages; it just hasn't sent anything to Kafka.
# For connectors like Debezium that consume from a source with a *limited*
window to fetch messages (MySQL's binlog has time- and size-based retention),
you can fall off the edge of the binlog entirely, because the last committed
offset can sit farther back than the binlog now reaches, even though Debezium
has fetched every single message in the binlog; it just hasn't produced
anything due to filtering.
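To make the mechanism concrete, here is a minimal sketch of how a filtering source task hands offsets to the framework today. {{SourceTask}}, {{SourceRecord}}, and {{poll()}} are the real Kafka Connect source API; {{BinlogClient}}, {{BinlogEvent}}, the {{server}}/{{pos}} offset keys, and the blacklist regex are hypothetical stand-ins for connector-specific code. The only place an offset can be reported is a {{SourceRecord}} returned from {{poll()}}, so a filtered-out event leaves nothing for the committer to flush:

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class FilteringBinlogSourceTask extends SourceTask {

    // Hypothetical stand-ins for the connector-specific binlog reader.
    interface BinlogEvent { String table(); long position(); byte[] payload(); }
    interface BinlogClient { List<BinlogEvent> read(); void close(); }

    private BinlogClient client;        // hypothetical binlog reader
    private Pattern tableBlacklist;     // hypothetical filter rule

    @Override
    public void start(Map<String, String> props) {
        // "$^" matches nothing, i.e. no tables are blacklisted by default.
        tableBlacklist = Pattern.compile(props.getOrDefault("table.blacklist", "$^"));
        // client = ... connect to MySQL's binlog (connector-specific, omitted)
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        for (BinlogEvent event : client.read()) {
            if (tableBlacklist.matcher(event.table()).matches()) {
                // Event is consumed from the binlog but dropped here, so the framework
                // never sees it and its position is never handed over for committing.
                continue;
            }
            records.add(new SourceRecord(
                    Collections.singletonMap("server", "db1"),           // source partition
                    Collections.singletonMap("pos", event.position()),   // source offset
                    "binlog-topic",                                      // target topic
                    Schema.BYTES_SCHEMA,
                    event.payload()));
        }
        // The offsets flushed every offset.flush.interval.ms come only from the
        // SourceRecords returned here; an empty list means no progress is recorded.
        return records;
    }

    @Override
    public void stop() {
        if (client != null) client.close();
    }

    @Override
    public String version() {
        return "0.1";
    }
}
{code}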
Again, I don't see this as a Debezium-specific issue. I could imagine a similar
scenario with an [SST-based Cassandra
source|https://github.com/datamountaineer/stream-reactor/issues/162].
It would be nice if Kafka Connect gave us a way to commit offsets for source
tasks even when no messages have been sent recently. This would let source
tasks record their progress even when they're opting not to send messages to
Kafka, whether due to filtering or for some other reason.
(See https://issues.jboss.org/browse/DBZ-220 for more context.)
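One possible shape for this, purely as a hypothetical sketch and not an existing Kafka Connect API, would be a context-level hook that lets a task report the position it has fully processed independently of the records it returns from {{poll()}}:

{code:java}
import java.util.Map;

/**
 * Hypothetical interface only: nothing like updateOffsets() exists in SourceTaskContext
 * today. It is sketched here just to illustrate the kind of hook being requested.
 */
public interface OffsetReportingSourceTaskContext {

    /**
     * Record the position the task has fully processed (e.g. the latest binlog position
     * it has read, even if every event was filtered out). The framework would fold these
     * offsets into its regular flush every offset.flush.interval.ms.
     */
    void updateOffsets(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset);
}
{code}

A filtering task could then call something like {{context.updateOffsets(partition, Collections.singletonMap("pos", lastProcessedPosition))}} after dropping a batch of events, and recovery would restart from that position rather than from the last record that happened to pass the filter.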