Ewen Cheslack-Postava created KAFKA-5568:
--------------------------------------------
Summary: Transformations that mutate topic-partitions break sink
connectors that manage their own configuration
Key: KAFKA-5568
URL: https://issues.apache.org/jira/browse/KAFKA-5568
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 0.11.0.0, 0.10.2.1, 0.10.2.0
Reporter: Ewen Cheslack-Postava
KAFKA-5567 describes how offset commits for sink connectors are broken if a
record's topic-partition is mutated by an SMT, e.g. RegexRouter or
TimestampRouter.
This is also a problem for sink connectors that manage their own offsets, i.e.
those that store offsets elsewhere and call SinkTaskContext.offset() to rewind
the consumer. In this case, the transformation has already been applied by the
time the SinkTask sees a record, so the task never learns the record's original
topic-partition and has no way to track offsets correctly or rewind to valid
positions. For example, this breaks the offset tracking in Confluent's HDFS
connector, which derives offsets from the filenames it writes. Even if the
offsets were stored separately in a file rather than encoded in filenames, the
connector would never have had the correct offsets to write to that file.
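To make the failure concrete, here is a minimal, self-contained Java sketch. It deliberately does not use Kafka's classes: Rec, route() and trackOffsets() are hypothetical stand-ins for a SinkRecord, a routing SMT such as TimestampRouter, and a task that tracks its own offsets. Because the task only ever sees the routed topic name, the offsets it records are keyed by a topic-partition the consumer is not assigned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the problem; Rec is NOT Kafka's SinkRecord.
public class RoutedOffsetDemo {

    // Minimal stand-in for a sink record: only the fields that matter here.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Mimics a "routing" SMT such as TimestampRouter: rewrites the topic, keeps the offset.
    static Rec route(Rec r, String suffix) {
        return new Rec(r.topic + "-" + suffix, r.partition, r.offset);
    }

    // A task that tracks its own offsets, keyed by the topic-partition it is handed.
    // The stored value is the next offset to consume, i.e. what a rewind would target.
    static Map<String, Long> trackOffsets(List<Rec> seenByTask) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : seenByTask) {
            next.merge(r.topic + "-" + r.partition, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> consumed = new ArrayList<>();
        consumed.add(new Rec("logs", 0, 41));
        consumed.add(new Rec("logs", 0, 42));

        // The framework applies the SMT before the task ever sees the records.
        List<Rec> transformed = new ArrayList<>();
        for (Rec r : consumed) {
            transformed.add(route(r, "20170710"));
        }

        // Prints {logs-20170710-0=43}: an offset for a topic-partition the
        // consumer is not assigned, so it cannot be used to rewind "logs-0".
        System.out.println(trackOffsets(transformed));
    }
}
```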
There are several options:
1. Decide that this is an acceptable consequence of combining SMTs with sink
connectors and a limitation we accept. You can either transform the data
with Kafka Streams instead or accept that you can't do these "routing"-type
operations in the sink connector unless it supports them natively. This *might*
not be the wrong choice, since we think very few connectors track their own
offsets. In the case of HDFS, we might rarely hit this issue because the
connector supports its own file/directory partitioning schemes anyway, so doing
this via SMTs is less necessary there.
2. Try to expose the original record information to the sink connector via the
records. I can think of two ways this could be done. The first is to attach the
original record to each SinkRecord. The cost here is relatively high in terms
of memory, especially for sink connectors that need to buffer data. The second
is to add originalTopic() and originalPartition() fields to SinkRecord.
This feels a bit ugly to me but might be the least intrusive change API-wise,
and we can guarantee those fields aren't overwritten by not allowing public
constructors to set them.
3. Try to expose the original record information to the sink connector via a
new pre-processing callback. The idea is similar to preCommit, but instead
would happen before any processing occurs. Taken to its logical conclusion, this
turns into a sort of interceptor interface (preConversion, preTransformation,
put, and preCommit).
4. Add something to the Context that allows the connector to get back at the
original information. Maybe some sort of IdentityMap<Record, Record>
originalPutRecords() that would let you get a mapping back to the original
records. One nice aspect of this is that the connector can hold onto the
original only if it needs it.
5. A very intrusive change/extension to the SinkTask API that passes in pairs
of <original, transformed> records. This accomplishes the same as option 2 but
requires what I think are more complicated changes. Mentioned for completeness.
6. Something else I haven't thought of?
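A rough sketch of the second variant of option 2, assuming a hypothetical record class (not Kafka's SinkRecord) in which only a private constructor can set the original topic-partition. The public constructor always sets the originals equal to the values passed in, and the public newRecord() copy method, loosely modeled on ConnectRecord.newRecord(), carries them forward unchanged, so a routing transformation cannot overwrite them. The task can then key its offsets by the original topic-partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of option 2; OriginalFieldsSketch.Rec is NOT Kafka's SinkRecord.
public class OriginalFieldsSketch {

    static final class Rec {
        final String topic;
        final int partition;
        final long offset;
        private final String originalTopic;
        private final int originalPartition;

        // Public constructor: the originals always equal the topic-partition passed in.
        public Rec(String topic, int partition, long offset) {
            this(topic, partition, offset, topic, partition);
        }

        // Private constructor: not callable from outside OriginalFieldsSketch.
        private Rec(String topic, int partition, long offset,
                    String originalTopic, int originalPartition) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.originalTopic = originalTopic;
            this.originalPartition = originalPartition;
        }

        // Copy method for transformations: may change topic/partition,
        // but always carries the originals forward.
        public Rec newRecord(String topic, int partition) {
            return new Rec(topic, partition, offset, originalTopic, originalPartition);
        }

        public String originalTopic() { return originalTopic; }
        public int originalPartition() { return originalPartition; }
    }

    // A routing SMT that renames the topic via newRecord().
    static Rec route(Rec r, String suffix) {
        return r.newRecord(r.topic + "-" + suffix, r.partition);
    }

    // Offset tracking keyed by the ORIGINAL topic-partition, which the consumer knows about.
    static Map<String, Long> trackOffsets(List<Rec> seenByTask) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : seenByTask) {
            String key = r.originalTopic() + "-" + r.originalPartition();
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> transformed = new ArrayList<>();
        transformed.add(route(new Rec("logs", 0, 41), "20170710"));
        transformed.add(route(new Rec("logs", 0, 42), "20170710"));

        System.out.println(transformed.get(0).topic);  // logs-20170710
        System.out.println(trackOffsets(transformed)); // {logs-0=43}
    }
}
```

Only the two extra fields are retained per record, which is what makes this variant cheaper than attaching the whole original record.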
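Option 3 could look roughly like the following sketch. InterceptingSinkTask, deliver() and OffsetTrackingTask are hypothetical names, and the hook signatures are guesses rather than a proposed API. The point is only the ordering: the worker calls preTransformation() with the untransformed record before the SMT runs, so the task can track offsets by the original topic-partition while put() still receives the transformed data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch of option 3; none of these types exist in Kafka Connect.
public class InterceptorSketch {

    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Hook points named after the list in the issue; all but put() default to no-ops.
    interface InterceptingSinkTask {
        default void preConversion(String topic, int partition, long offset, byte[] rawValue) {}
        default void preTransformation(Rec original) {}
        void put(List<Rec> transformed);
        default Map<String, Long> preCommit(Map<String, Long> offsets) { return offsets; }
    }

    // Simplified worker loop: the hook sees each record BEFORE the SMT runs.
    static void deliver(List<Rec> consumed, UnaryOperator<Rec> transform, InterceptingSinkTask task) {
        List<Rec> batch = new ArrayList<>();
        for (Rec r : consumed) {
            task.preTransformation(r);
            batch.add(transform.apply(r));
        }
        task.put(batch);
    }

    // A task that records offsets from the untransformed records.
    static final class OffsetTrackingTask implements InterceptingSinkTask {
        final Map<String, Long> nextOffsets = new HashMap<>();
        final List<String> writtenTopics = new ArrayList<>();

        @Override
        public void preTransformation(Rec original) {
            nextOffsets.merge(original.topic + "-" + original.partition, original.offset + 1, Long::max);
        }

        @Override
        public void put(List<Rec> transformed) {
            for (Rec r : transformed) {
                writtenTopics.add(r.topic); // stand-in for actually writing the data
            }
        }
    }

    public static void main(String[] args) {
        List<Rec> consumed = new ArrayList<>();
        consumed.add(new Rec("logs", 0, 41));
        consumed.add(new Rec("logs", 0, 42));

        OffsetTrackingTask task = new OffsetTrackingTask();
        deliver(consumed, r -> new Rec(r.topic + "-20170710", r.partition, r.offset), task);

        System.out.println(task.writtenTopics); // [logs-20170710, logs-20170710]
        System.out.println(task.nextOffsets);   // {logs-0=43}
    }
}
```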
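A sketch of option 4, assuming a hypothetical Context class with the proposed originalPutRecords() accessor; java.util.IdentityHashMap stands in for the "IdentityMap" mentioned in that option. Identity semantics matter because real Connect records override equals()/hashCode(), so two distinct records can compare equal. In this sketch the map is only meaningful for the batch currently passed to put(), and the task looks up originals only when it needs them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of option 4; Context and originalPutRecords() are NOT Kafka APIs.
public class OriginalMappingSketch {

    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Stand-in for SinkTaskContext with the proposed accessor.
    static final class Context {
        private final Map<Rec, Rec> originals = new IdentityHashMap<>();

        // Transformed record -> original record, valid for the current put() batch only.
        Map<Rec, Rec> originalPutRecords() {
            return originals;
        }
    }

    // Worker side: remember which original produced each transformed record, by identity.
    static List<Rec> transformBatch(List<Rec> consumed, Context ctx, String suffix) {
        ctx.originals.clear();
        List<Rec> batch = new ArrayList<>();
        for (Rec r : consumed) {
            Rec t = new Rec(r.topic + "-" + suffix, r.partition, r.offset);
            ctx.originals.put(t, r);
            batch.add(t);
        }
        return batch;
    }

    // Task side: look up each original only to track offsets.
    static Map<String, Long> put(List<Rec> batch, Context ctx) {
        Map<String, Long> next = new HashMap<>();
        for (Rec t : batch) {
            Rec orig = ctx.originalPutRecords().get(t);
            next.merge(orig.topic + "-" + orig.partition, orig.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> consumed = new ArrayList<>();
        consumed.add(new Rec("logs", 0, 41));
        consumed.add(new Rec("logs", 0, 42));

        Context ctx = new Context();
        List<Rec> batch = transformBatch(consumed, ctx, "20170710");

        System.out.println(batch.get(0).topic); // logs-20170710
        System.out.println(put(batch, ctx));    // {logs-0=43}
    }
}
```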