Hi Kafka Users,

I'm looking for a bit of clarification on the documentation for
implementing a SourceTask.  I'm reading a replication stream from a
database in my SourceTask, and I'd like to use commit or commitRecord to
advance the other system's replication stream pointer so that it knows I
have successfully read & committed the records to Kafka.  This allows the
other system to discard unneeded transaction logs.

But I'm uncertain how to use either SourceTask's commit or commitRecord
correctly.

For commit, the documentation says that it should "Commit the offsets, up
to the offsets that have been returned by poll()."  When commit() is
executed, can poll() be running at the same time on another thread?  I
assume it must be, because poll should block; otherwise you could never
commit the tail end of some activity.  If commit is invoked while poll is being
invoked, I'm concerned that I can't reliably determine where to advance my
replication stream pointer to -- if I store the location at the end of
poll, commit might be invoked while poll is still returning some records,
and advance the pointer further than actually guaranteed.

commitRecord on the other hand is invoked per-record.  The documentation
says "Commit an individual SourceRecord when the callback from the producer
client is received."  But if I'm producing to N partitions on different
brokers, I believe that the producer callback is not called in any
guaranteed order, so I can't advance my replication stream pointer to any
single record since an older record being delivered to another partition
may not have been committed.

The only solution I can see so far is to maintain the replication stream
positions of all the source records that I've returned from poll, and
advance the replication pointer in commitRecord only when the lowest
outstanding record is committed.
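
To make that idea concrete, here is a minimal sketch of the bookkeeping I
have in mind.  It is plain Java and not part of the Kafka Connect API: the
class name OutstandingPositionTracker and its two methods are hypothetical,
and it assumes each record maps to a unique, strictly increasing long
position in the replication stream (for example, a log sequence number).
The methods are synchronized in case commitRecord is called from a
different thread than poll.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical helper, not a Kafka Connect class.
// Assumes one unique, increasing position per source record.
final class OutstandingPositionTracker {
    // position -> true once the record at that position has been acknowledged
    private final TreeMap<Long, Boolean> inFlight = new TreeMap<>();

    // Call for every record about to be returned from poll().
    public synchronized void recordReturned(long position) {
        inFlight.put(position, Boolean.FALSE);
    }

    // Call from commitRecord(). Returns the highest position that is now
    // safe to acknowledge upstream, or empty if an older record is still
    // outstanding (or the position was never tracked).
    public synchronized OptionalLong recordCommitted(long position) {
        if (inFlight.replace(position, Boolean.TRUE) == null) {
            return OptionalLong.empty();
        }
        OptionalLong safe = OptionalLong.empty();
        Map.Entry<Long, Boolean> head;
        // Drop acknowledged records from the front until we reach one
        // that is still outstanding.
        while ((head = inFlight.firstEntry()) != null && head.getValue()) {
            inFlight.pollFirstEntry();
            safe = OptionalLong.of(head.getKey());
        }
        return safe;
    }
}
```

In the task, poll() would call recordReturned for each record before
returning it, and commitRecord would call recordCommitted and advance the
replication pointer only when a value comes back.  How the position is
recovered from a SourceRecord (for example, from its sourceOffset map) is
left open here.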

Is there anything I've misunderstood or misinterpreted?

Thanks,

Mathieu
