Chris Riccomini created KAFKA-5084:
--------------------------------------

             Summary: Allow Kafka connect source tasks to commit offsets 
without messages being sent
                 Key: KAFKA-5084
                 URL: https://issues.apache.org/jira/browse/KAFKA-5084
             Project: Kafka
          Issue Type: New Feature
          Components: KafkaConnect
    Affects Versions: 0.10.2.0
            Reporter: Chris Riccomini


We are currently running [Debezium|http://debezium.io/] connectors in Kafka 
Connect. These connectors consume from MySQL's binlog and produce into Kafka.

One of the things we've observed is that some of our Debezium connectors are 
not honoring the {{offset.flush.interval.ms}} setting (which is set to 60 
seconds). Some of our connectors seem to be committing only sporadically. For 
low-volume connectors, the commits seem to happen once every hour or two, and 
sometimes even longer.
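For reference, the flush interval in question is a worker-level setting (set in the Connect worker properties, e.g. {{connect-distributed.properties}}):

```properties
# How often the Connect worker tries to commit source task offsets.
offset.flush.interval.ms=60000
```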

It sounds like the issue is that Kafka Connect only commits source task 
offsets when the source task produces new source records, because the framework 
takes the offset to commit from each incoming source record. The problem 
with this approach is that there are (in my opinion) valid reasons to want to 
commit consumed offsets WITHOUT sending any new messages. Taking Debezium as an 
example, there are cases where Debezium consumes messages but filters them out 
based on a regex or filter rule (e.g. table blacklists). In such a 
case, Debezium is consuming messages from MySQL's binlog and dropping them 
before they ever reach the Kafka Connect framework. As a result, Kafka Connect 
never sees these messages and doesn't commit any progress. This leads to 
several problems:

# In the event of a failure, the connector could fall WAY back, since the last 
committed offset might be hours old, even though it *has* processed all 
recent messages--it just hasn't sent anything to Kafka.
# For connectors like Debezium that consume from a source with a *limited* 
window to fetch messages (MySQL's binlog has time/size-based retention), you 
can actually fall off the edge of the binlog: the last committed offset can 
end up older than the binlog's retention window, even though Debezium has 
fetched every single message in the binlog--it just hasn't produced anything 
due to filtering.
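To make the failure mode concrete, here is a self-contained, heavily simplified model of the commit behavior described above (plain Java, no Connect dependency; {{FilteredOffsetDemo}}, {{Rec}}, and {{commitCycle}} are illustrative stand-ins, not the real framework classes). Because offsets to commit are taken only from the records the task actually returns, a task that consumes and filters everything never advances its committed offset:

```java
import java.util.*;

public class FilteredOffsetDemo {
    // Simplified stand-in for a source record: carries its source offset
    // and whether it survives the connector's filter rules.
    record Rec(long sourceOffset, boolean passesFilter) {}

    // Simplified stand-in for the worker's view of committed progress.
    static long lastCommitted = -1;

    static void commitCycle(List<Rec> polled) {
        // Filtering happens inside the task, before records reach the worker.
        List<Rec> produced = polled.stream().filter(Rec::passesFilter).toList();
        // The worker learns the offset to commit only from produced records...
        for (Rec r : produced) {
            lastCommitted = r.sourceOffset();
        }
        // ...so if 'produced' is empty, lastCommitted does not move, even
        // though the task has consumed (and discarded) everything in 'polled'.
    }

    public static void main(String[] args) {
        commitCycle(List.of(new Rec(1, true), new Rec(2, true)));
        System.out.println(lastCommitted); // 2
        // An hour of binlog events that all match the table blacklist:
        commitCycle(List.of(new Rec(3, false), new Rec(4, false)));
        System.out.println(lastCommitted); // still 2, not 4
    }
}
```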

Again, I don't see this as a Debezium-specific issue. I could imagine a similar 
scenario with an [SST-based Cassandra 
source|https://github.com/datamountaineer/stream-reactor/issues/162].

It would be nice if Kafka Connect gave source tasks a way to commit offsets 
even when no messages have been sent recently. This would allow source 
tasks to record their progress even when they're opting not to send messages 
to Kafka due to filtering or for some other reason.
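Absent such a hook, one workaround (similar in spirit to the heartbeat idea discussed in the DBZ-220 thread linked below) is for the task itself to emit a synthetic record carrying the current source offset whenever a whole batch gets filtered out, so the framework still has something to commit. The sketch below is purely illustrative; {{HeartbeatingTask}}, the topic names, and the {{poll}} signature are hypothetical simplifications, not Connect or Debezium APIs:

```java
import java.util.*;

public class HeartbeatingTask {
    static final long HEARTBEAT_INTERVAL_MS = 60_000;
    long lastHeartbeatMs = 0;
    long consumedOffset = -1;

    // Simplified stand-in for a source record: (destination topic, offset).
    record Rec(String topic, long sourceOffset) {}

    // binlogOffsets: offsets read this batch; filteredOut: those dropped by
    // filter rules; nowMs: current time (passed in to keep the sketch testable).
    List<Rec> poll(List<Long> binlogOffsets, List<Long> filteredOut, long nowMs) {
        List<Rec> out = new ArrayList<>();
        for (long off : binlogOffsets) {
            consumedOffset = off; // progress is made whether or not we produce
            if (!filteredOut.contains(off)) {
                out.add(new Rec("data-topic", off));
            }
        }
        if (out.isEmpty() && nowMs - lastHeartbeatMs >= HEARTBEAT_INTERVAL_MS) {
            lastHeartbeatMs = nowMs;
            // The heartbeat record exists only to carry consumedOffset,
            // giving the framework an offset to commit.
            out.add(new Rec("heartbeat-topic", consumedOffset));
        }
        return out;
    }
}
```

The cost of this workaround is an extra topic of throwaway messages, which is why a first-class framework hook would be preferable.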

(See https://issues.jboss.org/browse/DBZ-220 for more context.)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
