Hi Mathieu,

I think you are right: there is currently no mutual exclusion between `task.commit()` and `task.poll()`. The solution you are thinking of, maintaining the committed offset state yourself, seems reasonable, though inconvenient.
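For what it's worth, here is a minimal sketch of that bookkeeping. It assumes every record returned from `poll()` carries a unique replication stream position that can be represented as a `long`. `OutstandingPositionTracker` and its method names are hypothetical and not part of Kafka Connect. The methods are synchronized because `poll()` and the commit callbacks may run on different threads.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.OptionalLong;
import java.util.Set;

/**
 * Hypothetical helper (not a Kafka Connect class): tracks the positions of
 * records returned from poll() and reports how far the upstream replication
 * pointer can safely be advanced as records are acknowledged.
 * Assumes each record has a unique position.
 */
class OutstandingPositionTracker {
    // Positions handed out by poll(), oldest first.
    private final ArrayDeque<Long> outstanding = new ArrayDeque<>();
    // Positions acknowledged out of order, still waiting on an older record.
    private final Set<Long> acked = new HashSet<>();

    /** Call once for each record, in the order poll() returns them. */
    public synchronized void recordReturned(long position) {
        outstanding.addLast(position);
    }

    /**
     * Call when a record is acknowledged (for example from commitRecord).
     * Returns the highest position such that it and every older outstanding
     * record have been acknowledged, or empty if the oldest outstanding
     * record is still pending.
     */
    public synchronized OptionalLong recordCommitted(long position) {
        acked.add(position);
        OptionalLong safe = OptionalLong.empty();
        // Drain acknowledged records from the head of the queue only.
        while (!outstanding.isEmpty() && acked.remove(outstanding.peekFirst())) {
            safe = OptionalLong.of(outstanding.removeFirst());
        }
        return safe;
    }
}
```

In a task, you would call `recordReturned` for each record you build in `poll()`, and in `commitRecord(SourceRecord)` recover the position (for instance from `record.sourceOffset()`) and advance the other system's pointer only when `recordCommitted` returns a value.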
It probably makes sense to add a new parameterized `commit()` method carrying the offset map (and possibly deprecate the existing one).

Best,

Shikhar

On Sat, Dec 10, 2016 at 7:57 AM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:

> Hi Kafka Users,
>
> I'm looking for a bit of clarification on the documentation for
> implementing a SourceTask. I'm reading a replication stream from a
> database in my SourceTask, and I'd like to use commit or commitRecord to
> advance the other system's replication stream pointer so that it knows I
> have successfully read & committed the records to Kafka. This allows the
> other system to discard unneeded transaction logs.
>
> But I'm uncertain how to use either of SourceTask's commit or commitRecord
> correctly.
>
> For commit, the documentation says that it should "Commit the offsets, up
> to the offsets that have been returned by poll().". When commit() is
> executed, will poll() currently be running on another thread? I assume it
> must be, because poll should block, and that would imply you can't commit
> the tailing end of some activity. If commit is invoked while poll is being
> invoked, I'm concerned that I can't reliably determine where to advance my
> replication stream pointer to -- if I store the location at the end of
> poll, commit might be invoked while poll is still returning some records,
> and advance the pointer further than actually guaranteed.
>
> commitRecord on the other hand is invoked per-record. The documentation
> says "Commit an individual SourceRecord when the callback from the producer
> client is received." But if I'm producing to N partitions on different
> brokers, I believe that the producer callback is not called in any
> guaranteed order, so I can't advance my replication stream pointer to any
> single record since an older record being delivered to another partition
> may not have been committed.
>
> The only solution I can see so far is to maintain the replication stream
> positions of all the source records that I've returned from poll, and
> advance the replication pointer in commitRecord only when the lowest
> outstanding record is committed.
>
> Is there anything I've misunderstood or misinterpreted?
>
> Thanks,
>
> Mathieu