API question regarding dead letter queues / sending messages to a null
topic in order to get rid of them: I assume we wouldn't suggest that users
actually pass `null` into some method, but would rather have a proper,
descriptive API method such as `discard()` (the name is just an example)?
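
For illustration, a rough sketch of the kind of shape I have in mind (all
names here are made up, not a proposal):

    // Hypothetical sketch only -- class and method names are placeholders.
    public abstract class SingleMessageTransform<R> {

        // Transform one record at a time.
        public abstract R apply(R record);

        // A descriptive, discoverable way to drop a record.
        protected final R discard() {
            return null; // the runtime would interpret null as "drop"
        }
    }

That would keep the "pass null" mechanics an implementation detail of the
runtime rather than part of the user-facing contract.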

On Sat, Jul 23, 2016 at 11:13 PM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> On Fri, Jul 22, 2016 at 12:58 AM, Shikhar Bhushan <shik...@confluent.io>
> wrote:
>
> > flatMap() / supporting 1->n feels nice and general, since filtering is
> > just the case of going from 1->0.
> >
> > I'm not sure why we'd need to do any more granular offset tracking (like
> > sub-offsets) for source connectors: after transformation of a given
> > record to n records, all of those n should map to the same offset of the
> > source partition. The only thing to take care of here is that we don't
> > commit a source offset while there are still records with that offset
> > that haven't been flushed to Kafka, and that is in the control of the
> > Connect runtime.
> >
> >
> I'd like to be forward-thinking with this and make sure we can get
> exactly-once delivery when the producer can support it. For that, you need
> to be able to track offsets at the granularity at which you actually
> publish messages to Kafka (or at least I can't think of a way of making it
> work without tracking them at that granularity).
>
> -Ewen
>
>
> > I see your point for sink connectors, though. Implementors can
> > currently assume a 1:1 mapping between a record and its Kafka
> > coordinates (topic, partition, offset).
> >
> > On Thu, Jul 21, 2016 at 10:57 PM, Ewen Cheslack-Postava <e...@confluent.io>
> > wrote:
> >
> > > Jun: The problem with it not being 1-1 is that Connect relies
> > > heavily on offsets, so we'd need to be able to track offsets at this
> > > finer granularity. Filtering is ok, but flatMap isn't. If you convert
> > > one message to many, what are the offsets for the new messages? One
> > > possibility would be to assume that transformations are deterministic
> > > and then "enhance" the offsets with an extra integer field that
> > > indicates a record's position within the generated set. For sources
> > > this seems attractive, since you can then reset to whatever the
> > > connector-provided offset is and filter out any of the "sub"-messages
> > > that are earlier than the recorded "sub"-offset. But this might not
> > > actually work for sources since a) the offsets will include extra
> > > fields that the connector doesn't expect (might be ok since we handle
> > > that data as schemaless anyway) and b) if we allow multiple
> > > transformations (which seems likely, given that people might want to
> > > do things like rearrange fields + filter messages), then offsets
> > > start getting quite complex as we add sub-sub-offsets and
> > > sub-sub-sub-offsets. It's doable, but seems messy.
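> > >
> > > (To make the "enhanced" offset idea concrete, a minimal sketch in
> > > Java -- the field name is invented, not proposed:)
> > >
> > >     import java.util.HashMap;
> > >     import java.util.Map;
> > >
> > >     class SubOffsets {
> > >         // Wrap the connector-provided source offset with the
> > >         // record's position within the 1->n expansion.
> > >         static Map<String, Object> enhanced(
> > >                 Map<String, Object> connectorOffset, int subOffset) {
> > >             Map<String, Object> offset = new HashMap<>(connectorOffset);
> > >             offset.put("__sub_offset", subOffset); // invented field
> > >             return offset;
> > >         }
> > >     }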
> > >
> > > Things aren't as easy on the sink side. Since we track offsets using
> > > Kafka offsets, we either need to use the extra metadata space to
> > > store the sub-offsets, or we need to ensure that we only ever need to
> > > commit offsets on Kafka message boundaries. We might be able to get
> > > away with just delivering the entire set of generated messages in a
> > > single put() call, which the connector is expected to either fully
> > > accept or fully reject (via an exception). However, this may end up
> > > interacting poorly with assumptions connectors make if we expose
> > > things like max.poll.records, where they might expect one record at a
> > > time.
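> > >
> > > (Roughly, as a sketch -- transform and commitOffset() here are
> > > hypothetical helpers, not existing API:)
> > >
> > >     // Deliver everything generated from one Kafka message in a
> > >     // single put() call, so the connector accepts or rejects the
> > >     // set atomically. SinkRecord/RetriableException are the usual
> > >     // org.apache.kafka.connect classes.
> > >     Collection<SinkRecord> expanded = transform.apply(record);
> > >     try {
> > >         task.put(expanded);   // all-or-nothing from our perspective
> > >         commitOffset(record); // safe: the whole set was accepted
> > >     } catch (RetriableException e) {
> > >         // nothing committed for this set; redeliver it in full later
> > >     }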
> > >
> > > I'm not really convinced of the benefit of supporting this -- at
> > > some point it seems better to use Streams to do transformations if
> > > you need flatMap. I can't think of many generic transformations that
> > > would use 1-to-many, and single message transforms really should be
> > > quite general -- that's the reason for providing a separate interface
> > > isolated from Connectors or Converters.
> > >
> > > Gwen, re: using null and sending to a dead letter queue, it would be
> > > useful to think about how this might interact with other uses of a
> > > dead letter queue. Similar ideas have been raised for messages that
> > > either can't be parsed or that the connector chokes on repeatedly. If
> > > we use a dead letter queue for those, do we want these messages
> > > (which are explicitly filtered by a transform set up by the user) to
> > > end up in the same location?
> > >
> > > -Ewen
> > >
> > > On Sun, Jul 17, 2016 at 9:53 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Does the transformation need to be 1-to-1? For example, some users
> > > > model each Kafka message as a schema + a batch of binary records.
> > > > When using a sink connector to push the Kafka data to a sink, it
> > > > would be useful if the transformer could convert each Kafka message
> > > > to multiple records.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Sat, Jul 16, 2016 at 1:25 PM, Nisarg Shah <snis...@gmail.com>
> > > > wrote:
> > > >
> > > > > Gwen,
> > > > >
> > > > > Yup, that sounds great! Instead of leaving it up to the
> > > > > transformers to handle null, we can have the topic be null: to
> > > > > get rid of a message, set its topic to a special value (which
> > > > > could be as simple as null).
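> > > > >
> > > > > (Something like this -- shouldDrop() and withTopic() are invented
> > > > > placeholders:)
> > > > >
> > > > >     // A per-record transform that drops by nulling the topic.
> > > > >     SourceRecord apply(SourceRecord record) {
> > > > >         if (shouldDrop(record))             // invented helper
> > > > >             return withTopic(record, null); // null topic => drop
> > > > >         return record;
> > > > >     }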
> > > > >
> > > > > Like I said before, the more interesting part would be ‘adding’
> > > > > a new message to the existing list, based on, say, the current
> > > > > message in the transformer. Does that feature warrant being
> > > > > included?
> > > > >
> > > > > > On Jul 14, 2016, at 22:25, Gwen Shapira <g...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > I used to work on Apache Flume, where we used to allow users
> > > > > > to filter messages completely in the transformation, and then
> > > > > > we got rid of it because we spent too much time trying to help
> > > > > > users who had "message loss", where the loss was actually a bug
> > > > > > in the filter...
> > > > > >
> > > > > > What we couldn't do in Flume, but perhaps can do in the simple
> > > > > > transform for Connect, is the ability to route messages to
> > > > > > different topics, with "null" as one of the possible targets.
> > > > > > This would allow you to implement dead-letter-queue
> > > > > > functionality and redirect messages that don't pass the filter
> > > > > > to an "errors" topic without getting rid of them completely,
> > > > > > while also allowing braver users to get rid of messages by
> > > > > > directing them to "null".
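> > > > > >
> > > > > > (As a sketch -- the types, topic name, and helpers are all
> > > > > > invented:)
> > > > > >
> > > > > >     // Route each message: keep it, send it to an errors
> > > > > >     // topic, or drop it entirely.
> > > > > >     String route(Record r) {
> > > > > >         if (passesFilter(r)) return r.topic(); // unchanged
> > > > > >         if (keepRejects)     return "errors";  // dead letter queue
> > > > > >         return null;                           // drop entirely
> > > > > >     }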
> > > > > >
> > > > > > Does that make sense?
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > > On Thu, Jul 14, 2016 at 8:33 PM, Nisarg Shah <snis...@gmail.com>
> > > > > > wrote:
> > > > > >> Thank you for your input, Gwen and Michael.
> > > > > >>
> > > > > >> The original reason why I suggested set-based processing is
> > > > > >> the flexibility it provides. The JIRA had a comment from a
> > > > > >> user requesting a feature that could be achieved with this.
> > > > > >>
> > > > > >> After reading Gwen's and Michael's points (I went through the
> > > > > >> documentation and the code in detail), I agree with what you
> > > > > >> have to say. Also, fewer guarantees make what I had in mind
> > > > > >> less certain, so simplifying this to a single-message-based
> > > > > >> transformation would ensure that users who require more
> > > > > >> flexibility with their transformations automatically “turn
> > > > > >> to” Kafka Streams. Transformation logic on a
> > > > > >> message-by-message basis makes more sense.
> > > > > >>
> > > > > >> One use case that Kafka Connect could consider is adding or
> > > > > >> removing a message completely. (This was trivially possible
> > > > > >> with the collection passing.) Should users be pointed towards
> > > > > >> Kafka Streams even for this use case? I think this is a very
> > > > > >> useful feature for Connect too, and I’ll try to rethink the
> > > > > >> API as well.
> > > > > >>
> > > > > >> Removing a message is as easy as returning null and having
> > > > > >> the next transformer skip it, but adding messages would
> > > > > >> involve, say, a queue between transformers and a method that
> > > > > >> says “pass message” to the next one, which can be called
> > > > > >> multiple times from one “transform” function; a variation on
> > > > > >> the chain of responsibility design pattern.
> > > > > >>
> > > > > >>> On Jul 12, 2016, at 12:54 AM, Michael Noll <mich...@confluent.io>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>> As Gwen said, my initial thought is that message
> > > > > >>> transformations that are "more than trivial" should be done
> > > > > >>> by Kafka Streams rather than by Kafka Connect (for the
> > > > > >>> reasons that Gwen mentioned).
> > > > > >>>
> > > > > >>> Transforming one message at a time would be a good fit for
> > > > > >>> Kafka Connect. An important use case is to remove sensitive
> > > > > >>> data (such as PII) from an incoming data stream before it
> > > > > >>> hits Kafka's persistent storage -- this use case can't be
> > > > > >>> implemented well with Kafka Streams because, by design, Kafka
> > > > > >>> Streams is meant to read its input data from Kafka (i.e. by
> > > > > >>> the time Kafka Streams could be used to remove sensitive data
> > > > > >>> fields, the data is already stored persistently in Kafka,
> > > > > >>> which might be a no-go depending on the use case).
> > > > > >>>
> > > > > >>> I'm of course interested to hear what other people think.
> > > > > >>>
> > > > > >>>
> > > > > >>> On Tue, Jul 12, 2016 at 6:06 AM, Gwen Shapira <g...@confluent.io>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>>> I think we need to restrict the functionality to
> > > > > >>>> one-message-at-a-time.
> > > > > >>>>
> > > > > >>>> Basically, Connect gives very few guarantees about the size
> > > > > >>>> or composition of the set (you may get the same messages
> > > > > >>>> over and over, a mix of old and new, etc.).
> > > > > >>>>
> > > > > >>>> In order to do useful things over a collection, you need
> > > > > >>>> better-defined semantics of what's included. Kafka Streams
> > > > > >>>> is putting tons of effort into having good windowing
> > > > > >>>> semantics, and I think apps that require modification of
> > > > > >>>> collections are a better fit there.
> > > > > >>>>
> > > > > >>>> I'm willing to change my mind though (it has been known to
> > > > > >>>> happen) -- what are the comments about usage that point
> > > > > >>>> toward the collections approach?
> > > > > >>>>
> > > > > >>>> On Mon, Jul 11, 2016 at 3:32 PM, Nisarg Shah <snis...@gmail.com>
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>>> Thanks Jay, added that to the KIP.
> > > > > >>>>>
> > > > > >>>>> Besides reviewing the KIP as a whole, I wanted to know
> > > > > >>>>> what everyone thinks about the level at which data should
> > > > > >>>>> be dealt with by the Transformer: transform the whole
> > > > > >>>>> collection of records (giving the flexibility of modifying
> > > > > >>>>> messages across the set), OR transform messages one at a
> > > > > >>>>> time, iteratively, which restricts modifications across
> > > > > >>>>> messages.
> > > > > >>>>>
> > > > > >>>>> I’ll get a working sample ready soon to have a look at.
> > > > > >>>>> There were some comments about Transformer usage that
> > > > > >>>>> pointed to the first approach, which I prefer too, given
> > > > > >>>>> the flexibility.
> > > > > >>>>>
> > > > > >>>>>> On Jul 11, 2016, at 2:49 PM, Jay Kreps <j...@confluent.io>
> > > > > >>>>>> wrote:
> > > > > >>>>>>
> > > > > >>>>>> One minor thing: the Transformer interface probably needs
> > > > > >>>>>> a close() method (i.e. the opposite of initialize). This
> > > > > >>>>>> would be used for any transformer that uses a resource
> > > > > >>>>>> like a file/socket/db connection/etc. that needs to be
> > > > > >>>>>> closed. You usually don't need this, but when you do need
> > > > > >>>>>> it you really need it.
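> > > > > >>>>>>
> > > > > >>>>>> Something like (sketch only):
> > > > > >>>>>>
> > > > > >>>>>>     public interface Transformer<R> {
> > > > > >>>>>>         void initialize(Map<String, ?> config);
> > > > > >>>>>>         R transform(R record);
> > > > > >>>>>>         void close(); // release files/sockets/db connections
> > > > > >>>>>>     }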
> > > > > >>>>>>
> > > > > >>>>>> -Jay
> > > > > >>>>>>
> > > > > >>>>>> On Mon, Jul 11, 2016 at 1:47 PM, Nisarg Shah <snis...@gmail.com>
> > > > > >>>>>> wrote:
> > > > > >>>>>>
> > > > > >>>>>>> Hello,
> > > > > >>>>>>>
> > > > > >>>>>>> This KIP
> > > > > >>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-66:+Add+Kafka+Connect+Transformers+to+allow+transformations+to+messages>
> > > > > >>>>>>> is for KAFKA-3209
> > > > > >>>>>>> <https://issues.apache.org/jira/browse/KAFKA-3209>.
> > > > > >>>>>>> It’s about capabilities to transform messages in Kafka
> > > > > >>>>>>> Connect.
> > > > > >>>>>>>
> > > > > >>>>>>> Some design decisions need to be made, so please advise
> > > > > >>>>>>> me on the same.
> > > > > >>>>>>> Feel free to express any thoughts or concerns as well.
> > > > > >>>>>>>
> > > > > >>>>>>> Many many thanks to Ewen Cheslack-Postava.
> > > > > >>>>>>>
> > > > > >>>>>>> -Nisarg
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> --
> > > > > >>> Best regards,
> > > > > >>> Michael Noll
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > > > > >>> Download Apache Kafka and Confluent Platform: www.confluent.io/download
> > > > > >>> <http://www.confluent.io/download>*
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>



-- 

*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download
<http://www.confluent.io/download>*
