Hi Mathieu,

I think you are right: there is currently no mutual exclusion between
`task.commit()` and `task.poll()`. The solution you are considering,
maintaining the committed offset state yourself, seems reasonable, though
inconvenient.
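
For illustration, here is a minimal sketch of that bookkeeping. It assumes
each source record carries a unique, strictly increasing `long` replication
position. The class `OutstandingPositionTracker` and its method names are
hypothetical and not part of the Connect API.

```java
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Connect. Tracks positions handed out
// by poll() and reports the highest position below which everything is acked.
class OutstandingPositionTracker {
    // position -> true once the producer has acknowledged that record
    private final TreeMap<Long, Boolean> inFlight = new TreeMap<>();
    // -1 means nothing is safe to release yet (assumes positions are >= 0)
    private long safePosition = -1L;

    // Call for each record just before poll() returns it.
    public synchronized void dispatched(long position) {
        inFlight.put(position, Boolean.FALSE);
    }

    // Call when a record is acknowledged, e.g. from commitRecord(SourceRecord).
    // Returns the position the upstream pointer may safely advance to.
    public synchronized long acknowledged(long position) {
        if (inFlight.containsKey(position)) {
            inFlight.put(position, Boolean.TRUE);
        }
        // Pop acknowledged entries from the low end. Stop at the first
        // record that is still outstanding, since it blocks everything after it.
        while (!inFlight.isEmpty() && inFlight.firstEntry().getValue()) {
            safePosition = inFlight.pollFirstEntry().getKey();
        }
        return safePosition;
    }

    public synchronized long safePosition() {
        return safePosition;
    }
}
```

The methods are synchronized because `poll()` and the commit callbacks may
run on different threads. The replication pointer would only be advanced
when the value returned by `acknowledged()` changes.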

It probably makes sense to add a new parameterized `commit()` method
carrying the offset map (and possibly deprecate the existing one).

Best,

Shikhar

On Sat, Dec 10, 2016 at 7:57 AM Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hi Kafka Users,
>
> I'm looking for a bit of clarification on the documentation for
> implementing a SourceTask.  I'm reading a replication stream from a
> database in my SourceTask, and I'd like to use commit or commitRecord to
> advance the other system's replication stream pointer so that it knows I
> have successfully read & committed the records to Kafka.  This allows the
> other system to discard unneeded transaction logs.
>
> But I'm uncertain how to use either of SourceTask's commit or commitRecord
> methods correctly.
>
> For commit, the documentation says that it should "Commit the offsets, up
> to the offsets that have been returned by poll()."  When commit() is
> executed, will poll() currently be running on another thread?  I assume it
> must be, because poll should block, and that would imply you can't commit
> the tail end of some activity.  If commit is invoked while poll is being
> invoked, I'm concerned that I can't reliably determine where to advance my
> replication stream pointer to -- if I store the location at the end of
> poll, commit might be invoked while poll is still returning some records,
> and advance the pointer further than actually guaranteed.
>
> commitRecord on the other hand is invoked per-record.  The documentation
> says "Commit an individual SourceRecord when the callback from the producer
> client is received."  But if I'm producing to N partitions on different
> brokers, I believe that the producer callback is not called in any
> guaranteed order, so I can't advance my replication stream pointer to any
> single record since an older record being delivered to another partition
> may not have been committed.
>
> The only solution I can see so far is to maintain the replication stream
> positions of all the source records that I've returned from poll, and
> advance the replication pointer in commitRecord only when the lowest
> outstanding record is committed.
>
> Is there anything I've misunderstood or misinterpreted?
>
> Thanks,
>
> Mathieu
>
