flatMap() / supporting 1->n feels nice and general, since filtering is just the case of going from 1->0.
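Roughly the shape I have in mind -- a sketch only, with hypothetical names rather than whatever the KIP-66 interface ends up being:

    import java.util.List;
    import java.util.Map;

    // Sketch of a hypothetical 1->n transform interface (not the actual
    // KIP-66 API). One input record yields zero or more output records,
    // so a filter is just the 1->0 case.
    public interface Transformation<R> {

        // called once before any records are processed
        void initialize(Map<String, String> config);

        // return an empty list to drop the record, a singleton list for a
        // 1:1 mapping, or several records for a flatMap-style expansion
        List<R> apply(R record);

        // called once when the task shuts down
        void close();
    }

With that shape, a pure filter simply returns an empty list for records it wants to drop.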
I'm not sure why we'd need to do any more granular offset tracking (like sub-offsets) for source connectors: after transformation of a given record into n records, all n of them should map to the same offset of the source partition. The only thing to take care of here is that we don't commit a source offset while there are still records with that offset that haven't been flushed to Kafka, but that is under the Connect runtime's control. I see your point for sink connectors, though: implementors can currently assume a 1:1 mapping of a record to its Kafka coordinates (topic, partition, offset).

On Thu, Jul 21, 2016 at 10:57 PM Ewen Cheslack-Postava <e...@confluent.io> wrote:

> Jun, the problem with it not being 1-1 is that Connect relies heavily on offsets, so we'd need to be able to track offsets at this finer granularity. Filtering is OK, but flatMap isn't: if you convert one message to many, what are the offsets for the new messages? One possibility would be to assume that transformations are deterministic and then "enhance" the offsets with an extra integer field that indicates a message's position in the subset. For sources this seems attractive, since you can then reset to whatever the connector-provided offset is and filter out any of the "sub"-messages that are earlier than the recorded "sub"-offset. But this might not actually work for sources, since a) the offsets will include extra fields that the connector doesn't expect (might be OK since we handle that data as schemaless anyway), and b) if we allow multiple transformations (which seems likely, given that people might want to do things like rearrange fields + filter messages), then offsets start getting quite complex as we add sub-sub-offsets and sub-sub-sub-offsets. It's doable, but seems messy.
>
> Things aren't as easy on the sink side. Since we track offsets using Kafka offsets, we either need to use the extra metadata space to store the sub-offsets, or we need to ensure that we only ever commit offsets on Kafka message boundaries. We might be able to get away with just delivering the entire set of generated messages in a single put() call, which the connector is expected to either fully accept or fully reject (via exception). However, this may end up interacting poorly with assumptions connectors might make if we expose things like max.poll.records, where they might expect one record at a time.
>
> I'm not really convinced of the benefit of supporting this -- at some point it seems better to use Streams to do transformations if you need flatMap. I can't think of many generic transformations that would use 1-to-many, and single message transforms really should be quite general -- that's the reason for providing a separate interface isolated from Connectors or Converters.
>
> Gwen, re: using null and sending to a dead letter queue, it would be useful to think about how this might interact with other uses of a dead letter queue. Similar ideas have been raised for messages that either can't be parsed or which the connector chokes on repeatedly. If we use a dead letter queue for those, do we want these messages (which are explicitly filtered out by a transform set up by the user) to end up in the same location?
>
> -Ewen
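For what it's worth, the "enhanced" offset Ewen describes might look roughly like this -- illustrative only, and the field names are made up:

    import java.util.HashMap;
    import java.util.Map;

    // Sketching Ewen's "enhanced offset" idea: the connector-provided
    // source offset, plus an extra integer recording the record's position
    // within the n records generated from one input record.
    public class SubOffsetSketch {
        public static void main(String[] args) {
            Map<String, Object> connectorOffset = new HashMap<>();
            connectorOffset.put("position", 1234L); // what the connector itself tracks

            Map<String, Object> enhancedOffset = new HashMap<>(connectorOffset);
            enhancedOffset.put("sub", 2); // third of the n transformed records

            // On restart: seek to connectorOffset, re-run the (assumed
            // deterministic) transformation, and skip sub-records whose
            // index is <= the recorded "sub" value.
            System.out.println(enhancedOffset);
        }
    }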
> On Sun, Jul 17, 2016 at 9:53 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Does the transformation need to be 1-to-1? For example, some users model each Kafka message as a schema + a batch of binary records. When using a sink connector to push the Kafka data to a sink, it would be useful if the transformer could convert each Kafka message to multiple records.
>>
>> Thanks,
>>
>> Jun
>>
>> On Sat, Jul 16, 2016 at 1:25 PM, Nisarg Shah <snis...@gmail.com> wrote:
>>
>>> Gwen,
>>>
>>> Yup, that sounds great! Instead of leaving it up to the transformers to handle null, we can instead have the topic as null. To get rid of a message, set its topic to a special one (could be as simple as null).
>>>
>>> Like I said before, the more interesting part would be 'adding' a new message to the existing list, based on, say, the current message in the transformer. Does that feature warrant inclusion?
>>>
>>>> On Jul 14, 2016, at 22:25, Gwen Shapira <g...@confluent.io> wrote:
>>>>
>>>> I used to work on Apache Flume, where we used to allow users to filter messages completely in the transformation, and then we got rid of that ability, because we spent too much time trying to help users who had "message loss" where the loss was actually a bug in the filter...
>>>>
>>>> What we couldn't do in Flume, but perhaps can do in the simple transform for Connect, is the ability to route messages to different topics, with "null" as one of the possible targets. This would let you implement dead-letter-queue functionality and redirect messages that don't pass the filter to an "errors" topic without getting rid of them completely, while also allowing braver users to get rid of messages by directing them to "null".
>>>>
>>>> Does that make sense?
>>>>
>>>> Gwen
>>>>
>>>> On Thu, Jul 14, 2016 at 8:33 PM, Nisarg Shah <snis...@gmail.com> wrote:
>>>>
>>>>> Thank you for your inputs, Gwen and Michael.
>>>>>
>>>>> The original reason I suggested set-based processing is the flexibility it provides. The JIRA had a comment from a user requesting a feature that could be achieved with this.
>>>>>
>>>>> After reading Gwen's and Michael's points (I went through the documentation and the code in detail), I agree with what you have to say. Also, fewer guarantees make what I had in mind less certain, and simplifying to a single-message-based transformation would ensure that users who do require more flexibility in their transformations will automatically "turn to" Kafka Streams. Transformation logic on a message-by-message basis makes more sense.
>>>>>
>>>>> One use case Kafka Connect could consider is adding or removing a message completely. (This was trivially possible with the collection passing.) Should users be pointed towards Kafka Streams even for this use case? I think this is a very useful feature for Connect too, and I'll try to rethink the API with it in mind.
>>>>>
>>>>> Removing a message is as easy as returning a null and having the next transformer skip it, but adding messages would involve, say, a queue between transformers and a method that says "pass message" to the next one, which can be called multiple times from one "transform" function; a variation on the chain of responsibility design pattern.
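To make the null/dead-letter routing discussed above concrete, a rough sketch -- hypothetical record type and class, not anything Connect actually exposes:

    import java.util.Map;

    // Sketch of the routing idea: records that fail a predicate are
    // redirected to a dead-letter topic; configuring that topic as null
    // means "drop the record entirely".
    public class RoutingFilter {

        public static class Record {
            public String topic;
            public Map<String, Object> value;
        }

        private final String deadLetterTopic; // e.g. "errors", or null to drop

        public RoutingFilter(String deadLetterTopic) {
            this.deadLetterTopic = deadLetterTopic;
        }

        // returns the (possibly re-routed) record, or null to drop it entirely
        public Record apply(Record record) {
            if (passes(record)) {
                return record;                // forward unchanged
            }
            if (deadLetterTopic == null) {
                return null;                  // "braver users": drop outright
            }
            record.topic = deadLetterTopic;   // redirect instead of losing it
            return record;
        }

        // stand-in predicate; a real filter would be configurable
        private boolean passes(Record record) {
            return record.value != null && !record.value.isEmpty();
        }
    }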
>>>>> On Jul 12, 2016, at 12:54 AM, Michael Noll <mich...@confluent.io> wrote:
>>>>>
>>>>>> As Gwen said, my initial thought is that message transformations that are "more than trivial" should be done by Kafka Streams rather than by Kafka Connect (for the reasons Gwen mentioned).
>>>>>>
>>>>>> Transforming one message at a time would be a good fit for Kafka Connect. An important use case is removing sensitive data (such as PII) from an incoming data stream before it hits Kafka's persistent storage -- this use case can't be implemented well with Kafka Streams because, by design, Kafka Streams is meant to read its input data from Kafka (i.e. at the point where Kafka Streams could be used to remove sensitive data fields, the data is already stored persistently in Kafka, and this might be a no-go depending on the use case).
>>>>>>
>>>>>> I'm of course interested to hear what other people think.
>>>>>>
>>>>>> On Tue, Jul 12, 2016 at 6:06 AM, Gwen Shapira <g...@confluent.io> wrote:
>>>>>>
>>>>>>> I think we need to restrict the functionality to one-message-at-a-time.
>>>>>>>
>>>>>>> Basically, Connect gives very few guarantees about the composition of the set (you may get the same messages over and over, a mix of old and new, etc.).
>>>>>>>
>>>>>>> In order to do useful things over a collection, you need better-defined semantics of what's included. Kafka Streams is putting tons of effort into having good windowing semantics, and I think apps that require modification of collections are a better fit there.
>>>>>>>
>>>>>>> I'm willing to change my mind, though (it has been known to happen) - what are the comments about usage that point toward the collections approach?
>>>>>>>
>>>>>>> Gwen
>>>>>>>
>>>>>>> On Mon, Jul 11, 2016 at 3:32 PM, Nisarg Shah <snis...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Jay, added that to the KIP.
>>>>>>>>
>>>>>>>> Besides reviewing the KIP as a whole, I wanted to know what everyone thinks about the granularity of data a Transformer should deal with: transform the whole Collection of Records (giving the flexibility of modifying messages across the set), or transform messages one at a time, iteratively, which restricts modifications across messages.
>>>>>>>>
>>>>>>>> I'll get a working sample ready soon to have a look at. There were some comments about Transformer usage that pointed to the first approach, which I prefer too, given the flexibility.
>>>>>>>>
>>>>>>>>> On Jul 11, 2016, at 2:49 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>> One minor thing: the Transformer interface probably needs a close() method (i.e. the opposite of initialize). This would be used by any transformer that uses a resource like a file/socket/db connection/etc. that needs to be closed. You usually don't need this, but when you do need it you really need it.
>>>>>>>>>
>>>>>>>>> -Jay
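Jay's point about close(), made concrete -- a sketch with a hypothetical transformer holding a file handle, not the actual interface:

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Map;

    // A transformer that writes an audit trail to a file must release the
    // handle when the task stops, hence a close() symmetric to initialize().
    public class AuditingTransformer {
        private BufferedWriter audit;

        public void initialize(Map<String, String> config) throws IOException {
            audit = Files.newBufferedWriter(Paths.get(config.get("audit.path")));
        }

        public String transform(String record) throws IOException {
            audit.write(record); // record every message we pass through
            audit.newLine();
            return record;       // pass the record along unchanged
        }

        public void close() throws IOException {
            audit.close(); // without this hook, the file handle leaks
        }
    }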
>>>>>>>>> On Mon, Jul 11, 2016 at 1:47 PM, Nisarg Shah <snis...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> This KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-66:+Add+Kafka+Connect+Transformers+to+allow+transformations+to+messages> is for KAFKA-3209 <https://issues.apache.org/jira/browse/KAFKA-3209>. It's about capabilities to transform messages in Kafka Connect.
>>>>>>>>>>
>>>>>>>>>> Some design decisions need to be taken, so please advise me on the same. Feel free to express any thoughts or concerns as well.
>>>>>>>>>>
>>>>>>>>>> Many many thanks to Ewen Cheslack-Postava.
>>>>>>>>>>
>>>>>>>>>> -Nisarg
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Michael Noll
>>>>>>
>>>>>> Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
>>>>>> Download Apache Kafka and Confluent Platform: www.confluent.io/download <http://www.confluent.io/download>
>
> --
> Thanks,
> Ewen