flatMap() / supporting 1->n feels nice and general, since filtering is just
the case of going from 1->0.
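
To make that concrete, here is a rough sketch of what I mean. None of these
names come from the KIP; FlatMapSketch, applyAll, dropEmpty and splitOnComma
are made up for illustration, and plain strings stand in for Connect records.
The only point is that a transform returning a list covers both filtering
(empty list) and 1->n (list of n).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not the KIP-66 API: a 1->n transform returns a list.
final class FlatMapSketch {

    // Applies a 1->n transform to each input record and concatenates the results.
    static <T> List<T> applyAll(List<T> records, Function<T, List<T>> transform) {
        List<T> out = new ArrayList<>();
        for (T record : records) {
            out.addAll(transform.apply(record));
        }
        return out;
    }

    // Filtering is the 1->0 case: return an empty list to drop the record.
    static List<String> dropEmpty(String value) {
        return value.isEmpty()
                ? Collections.<String>emptyList()
                : Collections.singletonList(value);
    }

    // Splitting is the 1->n case: one comma-separated value becomes n records.
    static List<String> splitOnComma(String value) {
        return Arrays.asList(value.split(","));
    }
}
```

Chaining works the same way as with single-message transforms: the output of
one applyAll call is simply the input to the next.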

I'm not sure why we'd need to do any more granular offset tracking (like
sub-offsets) for source connectors: after transformation of a given record
to n records, all of those n should map to the same offset of the source
partition. The only thing to take care of here would be that we don't
commit a source offset while there are still records with that offset that
haven't been flushed to Kafka, but this is under the control of the Connect
runtime.
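
Roughly, the bookkeeping could look like the sketch below. This is only an
illustration under simplifying assumptions, not how the Connect runtime is
implemented: SourceOffsetTracker and its methods are hypothetical names, a
source offset is modeled as a single long for one source partition (real
source offsets are connector-defined maps), and records are assumed to be
dispatched in source order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: all n records derived from one source record share its
// source offset, and that offset only becomes committable once all n are acked.
final class SourceOffsetTracker {

    private static final class Entry {
        final long offset;
        int pending; // derived records not yet acknowledged by Kafka

        Entry(long offset, int pending) {
            this.offset = offset;
            this.pending = pending;
        }
    }

    private final Deque<Entry> inFlight = new ArrayDeque<>();
    private long committable = -1L; // -1 means nothing is safe to commit yet

    // Called after the transform turned one source record into fanOut records.
    void dispatched(long sourceOffset, int fanOut) {
        inFlight.addLast(new Entry(sourceOffset, fanOut));
        advance(); // fanOut == 0 (a filtered record) needs no acks
    }

    // Called when the producer acknowledges one derived record.
    void acked(long sourceOffset) {
        for (Entry e : inFlight) {
            if (e.offset == sourceOffset) {
                if (e.pending > 0) {
                    e.pending--;
                }
                break;
            }
        }
        advance();
    }

    // Moves the committable offset forward past fully acknowledged entries,
    // stopping at the first source offset that still has records in flight.
    private void advance() {
        while (!inFlight.isEmpty() && inFlight.peekFirst().pending == 0) {
            committable = inFlight.pollFirst().offset;
        }
    }

    long committableOffset() {
        return committable;
    }
}
```

Note that nothing here needs a sub-offset: on restart the connector resumes
from the last committed source offset and may re-emit all n records for the
following one, which is the same at-least-once behavior sources have today.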

I see your point for sink connectors, though. Implementors can currently
assume 1:1ness of a record to its Kafka coordinates (topic, partition,
offset).

On Thu, Jul 21, 2016 at 10:57 PM Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> Jun, The problem with it not being 1-1 is that Connect relies heavily on
> offsets, so we'd need to be able to track offsets at this finer
> granularity. Filtering is ok, but flatMap isn't. If you convert one message
> to many, what are the offsets for the new messages? One possibility would
> be to assume that transformations are deterministic and then "enhance" the
> offsets with an extra integer field that indicates its position in the
> subset. For sources this seems attractive since you can then reset to
> whatever the connector-provided offset is and then filter out any of the
> "sub"-messages that are earlier than the recorded "sub"-offset. But this
> might not actually work for sources since a) the offsets will include extra
> fields that the connector doesn't expect (might be ok since we handle that
> data as schemaless anyway) and b) if we allow multiple transformations
> (which seems likely given that people might want to do things like
> rearrange fields + filter messages) then offsets start getting quite
> complex as we add sub-sub-offsets and sub-sub-sub-offsets. It's doable, but
> seems messy.
>
> Things aren't as easy on the sink side. Since we track offsets using Kafka
> offsets we either need to use the extra metadata space to store the
> sub-offsets or we need to ensure that we only ever need to commit offsets
> on Kafka message boundaries. We might be able to get away with just
> delivering the entire set of generated messages in a single put() call,
> which the connector is expected to either fully accept or fully reject (via
> exception). However, this may end up interacting poorly with assumptions
> connectors might make if we expose things like max.poll.records, where they
> might expect one record at a time.
>
> I'm not really convinced of the benefit of supporting this -- at some point it
> seems better to use Streams to do transformations if you need flatMap. I
> can't think of many generic transformations that would use 1-to-many, and
> single message transforms really should be quite general -- that's the
> reason for providing a separate interface isolated from Connectors or
> Converters.
>
> Gwen, re: using null and sending to dead letter queue, it would be useful
> to think about how this might interact with other uses of a dead letter
> queue. Similar ideas have been raised for messages that either can't be
> parsed or which the connector chokes on repeatedly. If we use a dead letter
> queue for those, do we want these messages (which are explicitly filtered
> by a transform set up by the user) to end up in the same location?
>
> -Ewen
>
> On Sun, Jul 17, 2016 at 9:53 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Does the transformation need to be 1-to-1? For example, some users model
> > each Kafka message as schema + a batch of binary records. When using a sink
> > connector to push the Kafka data to a sink, it would be useful if the
> > transformer can convert each Kafka message to multiple records.
> >
> > Thanks,
> >
> > Jun
> >
> > On Sat, Jul 16, 2016 at 1:25 PM, Nisarg Shah <snis...@gmail.com> wrote:
> >
> > > Gwen,
> > >
> > > Yup, that sounds great! Instead of leaving it up to the transformers to
> > > handle null, we can instead have the topic as null. Sounds good. To get
> > > rid of a message, set the topic to a special one (could be as simple as
> > > null).
> > >
> > > Like I said before, the more interesting part would be ‘adding’ a new
> > > message to the existing list, based on, say, the current message in the
> > > transformer. Does that feature warrant inclusion?
> > >
> > > > On Jul 14, 2016, at 22:25, Gwen Shapira <g...@confluent.io> wrote:
> > > >
> > > > I used to work on Apache Flume, where we used to allow users to filter
> > > > messages completely in the transformation and then we got rid of it,
> > > > because we spent too much time trying to help users who had "message
> > > > loss", where the loss was actually a bug in the filter...
> > > >
> > > > What we couldn't do in Flume, but perhaps can do in the simple
> > > > transform for Connect is the ability to route messages to different
> > > > topics, with "null" as one of the possible targets. This will allow
> > > > you to implement a dead-letter-queue functionality and redirect
> > > > messages that don't pass the filter to an "errors" topic without getting
> > > > rid of them completely, while also allowing braver users to get rid of
> > > > messages by directing them to "null".
> > > >
> > > > Does that make sense?
> > > >
> > > > Gwen
> > > >
> > > > On Thu, Jul 14, 2016 at 8:33 PM, Nisarg Shah <snis...@gmail.com> wrote:
> > > >> Thank you for your inputs Gwen and Michael.
> > > >>
> > > >> The original reason why I suggested set-based processing is because
> > > >> of the flexibility it provides. The JIRA had a comment by a user
> > > >> requesting a feature that could be achieved with this.
> > > >>
> > > >> After reading Gwen and Michael's points, I went through the
> > > >> documentation and the code in detail, and I agree with what you have
> > > >> to say. Also, fewer guarantees make what I had in mind less certain,
> > > >> and thus simplifying it to a single-message-based transformation
> > > >> would ensure that users who do require more flexibility with the
> > > >> transformations will automatically “turn to” Kafka Streams. The
> > > >> transformation logic on a message-by-message basis makes more sense.
> > > >>
> > > >> One use case that Kafka Connect could consider is adding or removing
> > > >> a message completely. (This was trivially possible with the
> > > >> collection passing.) Should users be pointed towards Kafka Streams
> > > >> even for this use case? I think this is a very useful feature for
> > > >> Connect too, and I’ll try to rethink the API too.
> > > >>
> > > >> Removing a message is as easy as returning a null and having the next
> > > >> transformer skip it, but adding messages would involve, say, a queue
> > > >> between transformers and a method which says “pass message” to the
> > > >> next, which can be called multiple times from one “transform”
> > > >> function; a variation on the chain of responsibility design pattern.
> > > >>
> > > >>> On Jul 12, 2016, at 12:54 AM, Michael Noll <mich...@confluent.io> wrote:
> > > >>>
> > > >>> As Gwen said, my initial thought is that message transformations that
> > > >>> are "more than trivial" should be done by Kafka Streams rather than
> > > >>> by Kafka Connect (for the reasons that Gwen mentioned).
> > > >>>
> > > >>> Transforming one message at a time would be a good fit for Kafka
> > > >>> Connect. An important use case is to remove sensitive data (such as
> > > >>> PII) from an incoming data stream before it hits Kafka's persistent
> > > >>> storage -- this use case can't be implemented well with Kafka Streams
> > > >>> because, by design, Kafka Streams is meant to read its input data
> > > >>> from Kafka (i.e. at the point when Kafka Streams could be used to
> > > >>> remove sensitive data fields the data is already stored persistently
> > > >>> in Kafka, and this might be a no-go depending on the use case).
> > > >>>
> > > >>> I'm of course interested to hear what other people think.
> > > >>>
> > > >>>
> > > >>> On Tue, Jul 12, 2016 at 6:06 AM, Gwen Shapira <g...@confluent.io> wrote:
> > > >>>
> > > >>>> I think we need to restrict the functionality to
> > > >>>> one-message-at-a-time.
> > > >>>>
> > > >>>> Basically, Connect gives very few guarantees about the size or the
> > > >>>> composition of the set (you may get the same messages over and over,
> > > >>>> a mix of old and new, etc.)
> > > >>>>
> > > >>>> In order to do useful things over a collection, you need better
> > > >>>> defined semantics of what's included. Kafka Streams is putting tons
> > > >>>> of effort into having good windowing semantics, and I think apps
> > > >>>> that require modification of collections are a better fit there.
> > > >>>>
> > > >>>> I'm willing to change my mind though (it has been known to happen) -
> > > >>>> what are the comments about usage that point toward the collections
> > > >>>> approach?
> > > >>>>
> > > >>>> Gwen
> > > >>>>
> > > >>>> On Mon, Jul 11, 2016 at 3:32 PM, Nisarg Shah <snis...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Thanks Jay, added that to the KIP.
> > > >>>>>
> > > >>>>> Besides reviewing the KIP as a whole, I wanted to know what everyone
> > > >>>>> thinks about what data should be dealt with at the Transformer
> > > >>>>> level:
> > > >>>>> Transform the whole Collection of Records (giving the flexibility of
> > > >>>>> modifying messages across the set) OR
> > > >>>>> Transform messages one at a time, iteratively. This will restrict
> > > >>>>> modifications across messages.
> > > >>>>>
> > > >>>>> I’ll get a working sample ready soon, to have a look. There were
> > > >>>>> some comments about Transformer usage that pointed to the first
> > > >>>>> approach, which I prefer too given the flexibility.
> > > >>>>>
> > > >>>>>> On Jul 11, 2016, at 2:49 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >>>>>>
> > > >>>>>> One minor thing, the Transformer interface probably needs a close()
> > > >>>>>> method (i.e. the opposite of initialize). This would be used for
> > > >>>>>> any transformer that uses a resource like a file/socket/db
> > > >>>>>> connection/etc that needs to be closed. You usually don't need this
> > > >>>>>> but when you do need it you really need it.
> > > >>>>>>
> > > >>>>>> -Jay
> > > >>>>>>
> > > >>>>>> On Mon, Jul 11, 2016 at 1:47 PM, Nisarg Shah <snis...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hello,
> > > >>>>>>>
> > > >>>>>>> This KIP <
> > > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-66:+Add+Kafka+Connect+Transformers+to+allow+transformations+to+messages
> > > >>>>>>> > is for KAFKA-3209 <https://issues.apache.org/jira/browse/KAFKA-3209>.
> > > >>>>>>> It’s about capabilities to transform messages in Kafka Connect.
> > > >>>>>>>
> > > >>>>>>> Some design decisions need to be made, so please advise me on the
> > > >>>>>>> same. Feel free to express any thoughts or concerns as well.
> > > >>>>>>>
> > > >>>>>>> Many many thanks to Ewen Cheslack-Postava.
> > > >>>>>>>
> > > >>>>>>> -Nisarg
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> Best regards,
> > > >>> Michael Noll
> > > >>>
> > > >>>
> > > >>>
> > > >>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > > >>> Download Apache Kafka and Confluent Platform: www.confluent.io/download
> > > >>> <http://www.confluent.io/download>*
> > > >>
> > >
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>
