On Thu, Oct 12, 2017 at 1:59 AM, Stephane Maarek <
steph...@simplemachines.com.au> wrote:

> Hi,
>
>
>
> I had a look at the Connect Source Worker code and have two questions:
> When a Source Task commits offsets, does it perform compaction /
> optimisation before sending off? E.g.  I read from 1 source partition, and
> I read 1000 messages. Will the offset flush send 1000 messages to the
> offset storage, or just 1 (the last one)?
>

Just the latest one. As each message is processed, its offset is put into a
map keyed by the source partition, so multiple records with the same (or
equivalent) source partition overwrite one another and only the latest
offset is retained. When the offsets are committed, the contents of the map
are written to Kafka.
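To illustrate, here's a minimal sketch in plain Java (not the actual Connect
code; the class and map are hypothetical stand-ins) showing how such a map
collapses 1000 offsets for one partition into a single entry:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetMapSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for Connect's per-partition offset map:
        // key = source partition, value = latest source offset seen.
        Map<String, Long> offsets = new HashMap<>();

        // Process 1000 records from a single source partition; each put()
        // overwrites the previous entry for that partition.
        for (long offset = 1; offset <= 1000; offset++) {
            offsets.put("partition-0", offset);
        }

        // Only one entry remains, holding the latest offset.
        System.out.println(offsets.size());             // 1
        System.out.println(offsets.get("partition-0")); // 1000
    }
}
```

So the offset flush writes one entry per source partition, not one per record.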


> I don’t really understand why WorkerSourceTask is trying to flush
> outstanding messages before committing the offsets? (cf
> https://github.com/apache/kafka/blob/trunk/connect/
> runtime/src/main/java/org/apache/kafka/connect/runtime/
> WorkerSourceTask.java#L328 ).
> I would believe that committing the offsets would just commit the offsets
> for the messages we know for sure have been flushed at the moment the
> commit is requested. That would remove one massive timeout from happening
> if the source task pulls a lot of message and the producer is overwhelmed /
> can’t complete the message flush in the 5 seconds of timeout.
>

The offsets are written to a different topic than the records, which means
they may be on different brokers. If the task did not wait for the records
to be flushed/written to the destination topics, the offsets could be
written successfully *before* all of the records have been written and
acknowledged. If that happened and the producer then failed or timed out,
the task would have written offsets corresponding to records that haven't
actually been written yet. If the task failed, upon restart it would begin
where the offsets specify, and that means you'd have skipped over some data.
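Conceptually the ordering looks like the sketch below (a simplification, not
the actual WorkerSourceTask logic; the list of futures is a hypothetical
stand-in for the task's outstanding producer sends):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FlushThenCommitSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for outstanding producer sends: each future
        // completes when the corresponding record is acknowledged.
        List<CompletableFuture<Void>> outstanding = List.of(
                CompletableFuture.runAsync(() -> {}),
                CompletableFuture.runAsync(() -> {}));

        // Step 1: block until every outstanding record is acknowledged
        // (analogous to waiting for the producer flush to complete).
        CompletableFuture.allOf(
                outstanding.toArray(new CompletableFuture[0])).join();

        // Step 2: only now is it safe to persist the offsets, because no
        // committed offset can point past an unwritten record.
        System.out.println("offsets committed");
    }
}
```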

The worker's "offset.flush.timeout.ms" does default to 5 seconds, but if
that's not sufficient then you can increase it.
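For example, in the worker configuration file you could raise it to 10
seconds (the value below is just an illustration, not a recommendation):

```properties
# Allow up to 10 seconds for outstanding records to be flushed
# before an offset commit attempt is abandoned (default is 5000).
offset.flush.timeout.ms=10000
```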

I think one reason the offset writer doesn't just flush the offsets for the
records that have been written so far is that the record acknowledgements
may arrive out of order relative to the order in which the source task
produced the records. So even though a source task might produce two records
with offsets A and B, respectively, the producer may actually write the
record with offset B first (e.g., the record with offset A may be retried).
If this were the case and the offsets were flushed as the acknowledgements
arrived, offset storage would record offset A as the most recently written.
Upon restart, the connector would re-produce the record for offset B. The
current approach eliminates this by ensuring that the most recently
generated offset is what actually gets written to offset storage.
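The same scenario as a toy sketch (hypothetical offsets and a made-up
"naive" strategy, purely to contrast the two outcomes):

```java
public class OutOfOrderAckSketch {
    public static void main(String[] args) {
        // The source task produces records with offsets A=1 then B=2, but
        // the broker acknowledges them out of order: B first, then A
        // (A was retried).
        long[] ackOrder = {2, 1};

        // Naive strategy (hypothetical): commit whatever offset was
        // acknowledged last. Here that is offset A = 1, so on restart the
        // task resumes after 1 and re-produces the record with offset 2.
        long naiveCommit = ackOrder[ackOrder.length - 1];

        // Connect's approach: wait until ALL outstanding records are
        // acknowledged, then commit the latest offset the task generated.
        long safeCommit = 2;

        System.out.println(naiveCommit + " vs " + safeCommit);
    }
}
```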

Hope this helps.


>
> Thanks a lot for the responses. I may open JIRAs based on the answers of
> the questions, if that would help bring some performance improvements.
>
>
>
> Stephane
>
>
