API question regarding dead letter queues / sending messages to a null topic in order to get rid of them: I assume we wouldn't suggest that users actually pass `null` into some method, but rather provide a proper, descriptive API method such as `discard()` (the name is just an example)?
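For illustration, a minimal sketch of what a more descriptive API could look like. Everything here is hypothetical -- none of these names exist in Connect today; it only shows the "explicit drop instead of null" shape:

```java
// Hypothetical sketch: instead of documenting "return null to drop a record",
// give implementors an explicit, self-documenting way to say "drop this".
public interface Transformation<R> {

    /** Transform one record; return Result.of(newRecord) or Result.discard(). */
    Result<R> apply(R record);

    final class Result<T> {
        private final T record;  // null internally means "discarded"

        private Result(T record) { this.record = record; }

        public static <T> Result<T> of(T record) { return new Result<>(record); }

        /** Explicit, discoverable replacement for "return null". */
        public static <T> Result<T> discard() { return new Result<>(null); }

        public boolean isDiscarded() { return record == null; }
        public T record() { return record; }
    }
}
```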
On Sat, Jul 23, 2016 at 11:13 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

> On Fri, Jul 22, 2016 at 12:58 AM, Shikhar Bhushan <shik...@confluent.io>
> wrote:
>
> > flatMap() / supporting 1->n feels nice and general, since filtering is
> > just the case of going from 1->0.
> >
> > I'm not sure why we'd need to do any more granular offset tracking
> > (like sub-offsets) for source connectors: after transformation of a
> > given record to n records, all of those n should map to the same
> > offset of the source partition. The only thing to take care of here is
> > that we don't commit a source offset while there are still records
> > with that offset that haven't been flushed to Kafka, but this is in
> > the control of the Connect runtime.
>
> I'd like to be forward-thinking with this and make sure we can get
> exactly-once delivery when the producer can support it. For that you
> need to be able to track offsets at the granularity at which you
> actually publish messages to Kafka (or at least I can't think of a way
> of making it work without tracking them at that granularity).
>
> -Ewen
>
> > I see your point for sink connectors, though. Implementors can
> > currently assume 1:1-ness of a record to its Kafka coordinates (topic,
> > partition, offset).
> >
> > On Thu, Jul 21, 2016 at 10:57 PM Ewen Cheslack-Postava
> > <e...@confluent.io> wrote:
> >
> > > Jun, the problem with it not being 1-1 is that Connect relies
> > > heavily on offsets, so we'd need to be able to track offsets at this
> > > finer granularity. Filtering is OK, but flatMap isn't. If you
> > > convert one message to many, what are the offsets for the new
> > > messages? One possibility would be to assume that transformations
> > > are deterministic and then "enhance" the offsets with an extra
> > > integer field that indicates a record's position in the subset. For
> > > sources this seems attractive, since you can then reset to whatever
> > > the connector-provided offset is and filter out any of the
> > > "sub"-messages that are earlier than the recorded "sub"-offset. But
> > > this might not actually work for sources, because a) the offsets
> > > will include extra fields that the connector doesn't expect (might
> > > be OK since we handle that data as schemaless anyway), and b) if we
> > > allow multiple transformations (which seems likely, given that
> > > people might want to do things like rearrange fields + filter
> > > messages), then offsets start getting quite complex as we add
> > > sub-sub-offsets and sub-sub-sub-offsets. It's doable, but seems
> > > messy.
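To make that "enhanced offsets" idea concrete, here is a rough sketch of what the runtime could record per emitted record. Purely illustrative: the field name "sub" and both helpers below are made up for this sketch, not part of any Connect API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a source connector's offset is an arbitrary Map, and the
// runtime could append an extra integer field recording a record's position
// within the fan-out of a single upstream offset.
public final class SubOffsets {

    /** Attach the record's position in the 1->n fan-out to the connector's offset. */
    public static Map<String, Object> withSubOffset(Map<String, Object> connectorOffset,
                                                    int positionInFanOut) {
        Map<String, Object> enhanced = new HashMap<>(connectorOffset);
        enhanced.put("sub", positionInFanOut);
        return enhanced;
    }

    /**
     * On restart: reset to the connector-provided offset, re-run the
     * (assumed deterministic) transformation, and skip any "sub"-records
     * at or before the last committed sub-offset.
     */
    public static boolean alreadyDelivered(Map<String, Object> committedOffset,
                                           int candidateSub) {
        Integer committedSub = (Integer) committedOffset.get("sub");
        return committedSub != null && candidateSub <= committedSub;
    }
}
```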
> > > Things aren't as easy on the sink side. Since we track offsets
> > > using Kafka offsets, we either need to use the extra metadata space
> > > to store the sub-offsets, or we need to ensure that we only ever
> > > commit offsets on Kafka message boundaries. We might be able to get
> > > away with just delivering the entire set of generated messages in a
> > > single put() call, which the connector is expected to either fully
> > > accept or fully reject (via exception). However, this may end up
> > > interacting poorly with assumptions connectors might make if we
> > > expose things like max.poll.records, where they might expect one
> > > record at a time.
> > >
> > > I'm not really convinced of the benefit of supporting this -- at
> > > some point it seems better to use Streams to do transformations if
> > > you need flatMap. I can't think of many generic transformations that
> > > would use 1-to-many, and single message transforms really should be
> > > quite general -- that's the reason for providing a separate
> > > interface isolated from Connectors or Converters.
> > >
> > > Gwen, re: using null and sending to a dead letter queue, it would be
> > > useful to think about how this might interact with other uses of a
> > > dead letter queue. Similar ideas have been raised for messages that
> > > either can't be parsed or which the connector chokes on repeatedly.
> > > If we use a dead letter queue for those, do we want these messages
> > > (which are explicitly filtered by a transform set up by the user) to
> > > end up in the same location?
> > >
> > > -Ewen
> > >
> > > On Sun, Jul 17, 2016 at 9:53 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Does the transformation need to be 1-to-1? For example, some users
> > > > model each Kafka message as a schema + a batch of binary records.
> > > > When using a sink connector to push the Kafka data to a sink, it
> > > > would be useful if the transformer could convert each Kafka
> > > > message to multiple records.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Sat, Jul 16, 2016 at 1:25 PM, Nisarg Shah <snis...@gmail.com>
> > > > wrote:
> > > >
> > > > > Gwen,
> > > > >
> > > > > Yup, that sounds great! Instead of leaving it up to the
> > > > > transformers to handle null, we can instead have the topic as
> > > > > null. Sounds good. To get rid of a message, set the topic to a
> > > > > special one (could be as simple as null).
> > > > >
> > > > > Like I said before, the more interesting part would be 'adding'
> > > > > a new message to the existing list, based on, say, the current
> > > > > message in the transformer. Does that feature warrant being
> > > > > included?
> > > > >
> > > > > > On Jul 14, 2016, at 22:25, Gwen Shapira <g...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > I used to work on Apache Flume, where we used to allow users
> > > > > > to filter messages completely in the transformation, and then
> > > > > > we got rid of that capability, because we spent too much time
> > > > > > trying to help users who had "message loss" where the loss was
> > > > > > actually a bug in the filter...
> > > > > >
> > > > > > What we couldn't do in Flume, but perhaps can do in the simple
> > > > > > transform for Connect, is the ability to route messages to
> > > > > > different topics, with "null" as one of the possible targets.
> > > > > > This will allow you to implement dead-letter-queue
> > > > > > functionality and redirect messages that don't pass the filter
> > > > > > to an "errors" topic without getting rid of them completely,
> > > > > > while also allowing braver users to get rid of messages by
> > > > > > directing them to "null".
> > > > > >
> > > > > > Does that make sense?
> > > > > >
> > > > > > Gwen
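Gwen's routing idea is easy to sketch, and it is also where a descriptive API (per my question at the top) could replace the literal null. Everything below is hypothetical; no such interface exists in Connect today.

```java
import java.util.function.Predicate;

// Hypothetical routing hook: a transform decides per record where it goes --
// the original topic, an "errors" topic (dead letter queue), or nowhere.
public interface TopicRouter<R> {

    /** Returns the destination topic, or null to drop the record entirely. */
    String route(R record, String originalTopic);

    /** Example: records failing a predicate are redirected to a DLQ topic. */
    static <R> TopicRouter<R> withDeadLetterQueue(Predicate<R> passes, String errorsTopic) {
        return (record, originalTopic) ->
                passes.test(record) ? originalTopic : errorsTopic;
    }
}
```

Passing null for errorsTopic would give the "braver users" behavior of dropping records outright, though a named constant such as DISCARD would make that intent harder to trip over by accident.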
> > > > > > On Thu, Jul 14, 2016 at 8:33 PM, Nisarg Shah
> > > > > > <snis...@gmail.com> wrote:
> > > > > > > Thank you for your inputs, Gwen and Michael.
> > > > > > >
> > > > > > > The original reason why I suggested set-based processing is
> > > > > > > the flexibility it provides. The JIRA had a comment by a
> > > > > > > user requesting a feature that could be achieved with this.
> > > > > > >
> > > > > > > After reading Gwen's and Michael's points (I went through
> > > > > > > the documentation and the code in detail), I agree with what
> > > > > > > you have to say. Also, fewer guarantees make what I had in
> > > > > > > mind less certain, and thus simplifying it to a
> > > > > > > single-message-based transformation would ensure that users
> > > > > > > who do require more flexibility with the transformations
> > > > > > > will automatically "turn to" Kafka Streams. Transformation
> > > > > > > logic on a message-by-message basis makes more sense.
> > > > > > >
> > > > > > > One use case that Kafka Connect could consider is adding or
> > > > > > > removing a message completely. (This was trivially possible
> > > > > > > with the collection passing.) Should users be pointed
> > > > > > > towards Kafka Streams even for this use case? I think this
> > > > > > > is a very useful feature for Connect too, and I'll try to
> > > > > > > rethink the API as well.
> > > > > > >
> > > > > > > Removing a message is as easy as returning a null and having
> > > > > > > the next transformer skip it, but adding messages would
> > > > > > > involve, say, a queue between transformers and a method that
> > > > > > > says "pass message" to the next, which can be called
> > > > > > > multiple times from one "transform" function; a variation on
> > > > > > > the chain-of-responsibility design pattern.
> > > > > > >
> > > > > > > > On Jul 12, 2016, at 12:54 AM, Michael Noll
> > > > > > > > <mich...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > As Gwen said, my initial thought is that message
> > > > > > > > transformations that are "more than trivial" should be
> > > > > > > > done by Kafka Streams rather than by Kafka Connect (for
> > > > > > > > the reasons that Gwen mentioned).
> > > > > > > >
> > > > > > > > Transforming one message at a time would be a good fit for
> > > > > > > > Kafka Connect. An important use case is to remove
> > > > > > > > sensitive data (such as PII) from an incoming data stream
> > > > > > > > before it hits Kafka's persistent storage -- this use case
> > > > > > > > can't be implemented well with Kafka Streams because, by
> > > > > > > > design, Kafka Streams is meant to read its input data from
> > > > > > > > Kafka (i.e., at the point when Kafka Streams could be used
> > > > > > > > to remove sensitive data fields, the data is already
> > > > > > > > stored persistently in Kafka, and this might be a no-go
> > > > > > > > depending on the use case).
> > > > > > > >
> > > > > > > > I'm of course interested to hear what other people think.
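The PII-scrubbing use case is exactly the kind of strictly 1:1 transform under discussion, and it fits in a few lines. A minimal sketch, modeling the record value as a plain Map purely for illustration (the class name and shape are made up here):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative 1:1 transform: strip sensitive fields from a record value
// before it is written to Kafka. One record in, exactly one record out,
// so offset tracking is unaffected.
public final class DropFields {

    private final Set<String> sensitiveFields;

    public DropFields(Set<String> sensitiveFields) {
        this.sensitiveFields = sensitiveFields;
    }

    public Map<String, Object> apply(Map<String, Object> value) {
        Map<String, Object> scrubbed = new HashMap<>(value);
        scrubbed.keySet().removeAll(sensitiveFields);  // e.g. "ssn", "email"
        return scrubbed;
    }
}
```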
> > > > > > > > On Tue, Jul 12, 2016 at 6:06 AM, Gwen Shapira
> > > > > > > > <g...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > I think we need to restrict the functionality to
> > > > > > > > > one-message-at-a-time.
> > > > > > > > >
> > > > > > > > > Basically, Connect gives very few guarantees about the
> > > > > > > > > size or composition of the set (you may get the same
> > > > > > > > > messages over and over, a mix of old and new, etc.).
> > > > > > > > >
> > > > > > > > > In order to do useful things over a collection, you need
> > > > > > > > > better-defined semantics of what's included. Kafka
> > > > > > > > > Streams is putting tons of effort into having good
> > > > > > > > > windowing semantics, and I think apps that require
> > > > > > > > > modification of collections are a better fit there.
> > > > > > > > >
> > > > > > > > > I'm willing to change my mind though (it has been known
> > > > > > > > > to happen) -- what are the comments about usage that
> > > > > > > > > point toward the collections approach?
> > > > > > > > >
> > > > > > > > > Gwen
> > > > > > > > >
> > > > > > > > > On Mon, Jul 11, 2016 at 3:32 PM, Nisarg Shah
> > > > > > > > > <snis...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Jay, added that to the KIP.
> > > > > > > > > >
> > > > > > > > > > Besides reviewing the KIP as a whole, I wanted to know
> > > > > > > > > > what everyone thinks about the level at which data
> > > > > > > > > > should be dealt with in the Transformer: transform the
> > > > > > > > > > whole collection of records (giving the flexibility of
> > > > > > > > > > modifying messages across the set), OR transform
> > > > > > > > > > messages one at a time, iteratively, which restricts
> > > > > > > > > > modifications across messages.
> > > > > > > > > >
> > > > > > > > > > I'll get a working sample ready soon to have a look
> > > > > > > > > > at. There were some comments about Transformer usage
> > > > > > > > > > that pointed to the first approach, which I prefer
> > > > > > > > > > too, given the flexibility.
> > > > > > > > > >
> > > > > > > > > > > On Jul 11, 2016, at 2:49 PM, Jay Kreps
> > > > > > > > > > > <j...@confluent.io> wrote:
> > > > > > > > > > >
> > > > > > > > > > > One minor thing: the Transformer interface probably
> > > > > > > > > > > needs a close() method (i.e., the opposite of
> > > > > > > > > > > initialize). This would be used by any transformer
> > > > > > > > > > > that uses a resource like a file/socket/DB
> > > > > > > > > > > connection/etc. that needs to be closed. You usually
> > > > > > > > > > > don't need this, but when you do need it, you really
> > > > > > > > > > > need it.
> > > > > > > > > > >
> > > > > > > > > > > -Jay
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Jul 11, 2016 at 1:47 PM, Nisarg Shah
> > > > > > > > > > > <snis...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hello,
> > > > > > > > > > > >
> > > > > > > > > > > > This KIP
> > > > > > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-66:+Add+Kafka+Connect+Transformers+to+allow+transformations+to+messages>
> > > > > > > > > > > > is for KAFKA-3209
> > > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-3209>.
> > > > > > > > > > > > It's about capabilities to transform messages in
> > > > > > > > > > > > Kafka Connect.
> > > > > > > > > > > >
> > > > > > > > > > > > Some design decisions need to be taken, so please
> > > > > > > > > > > > advise me on the same. Feel free to express any
> > > > > > > > > > > > thoughts or concerns as well.
> > > > > > > > > > > >
> > > > > > > > > > > > Many many thanks to Ewen Cheslack-Postava.
> > > > > > > > > > > >
> > > > > > > > > > > > -Nisarg
> > > > > > > >
> > > > > > > > --
> > > > > > > > Best regards,
> > > > > > > > Michael Noll
> > > > > > > >
> > > > > > > > Michael G. Noll | Product Manager | Confluent
> > > > > > > > +1 650.453.5860
> > > > > > > > Download Apache Kafka and Confluent Platform:
> > > > > > > > www.confluent.io/download
> > >
> > > --
> > > Thanks,
> > > Ewen
>
> --
> Thanks,
> Ewen
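One last sketch, on Jay's close() point quoted above: the lifecycle would presumably end up as a simple initialize/close pair. Names and shape below are illustrative only, not the KIP's actual interface.

```java
import java.util.Map;

// Hypothetical transformer lifecycle: initialize() receives the config,
// close() releases any file/socket/DB-connection resources.
public interface Transformer<R> extends AutoCloseable {

    /** Called once before the first record, with the transform's config. */
    void initialize(Map<String, String> config);

    /** Transform a single record 1:1 (see the discard() question at the top). */
    R transform(R record);

    /** Release held resources; narrowed to not throw checked exceptions. */
    @Override
    void close();
}
```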
--
Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download