Hey Guozhang, These are good points, let me try to address them.
1. Our goal is to be able to provide a best-of-breed serialization package that works out of the box that does most of the magic. This best-of-breed plugin would allow schemas, schema evolution, compatibility checks, etc. We think if this is good enough most people will use it. We spent the last few months talking with Kafka users and this is an area where there really is a lot of room for improvement (seriously many people are just sending csv data or have no standard at all). Some people may want to customize this logic, but still they will be able to easily bundle up their customized logic using this api and have every application in their organization easily plug it in. Our primary goal is to have all applications in an organization be able to share an approach to data serialization while still programming against the public Kafka api. 2. I think what you are saying is that there isn't a big functional difference between producer.send(encoder.encode(key), encoder.encode(value) and producer.send(key, value) I agree functionally these are equivalent. The only real differences are (a) the byte[] interface doesn't encourage the use of a serializer (you have to communicate the org standard via email) (b) there is no easy way to reuse the serializer on the server side for message format validation (c) there is no way to allow plug in other validators in the client that need to see the original object (without having these reserialize the object to do their validation). 3. Agree. Part of the problem in the old producer that made it error prone was that we have a default serializer that gives insane errors when used with the wrong input types...which irrespective of what we do here we should probably fix. There is value in having both a constructor which takes the serializers and not. The value of allowing instantiation from config is to make it easier to inherent the serializers from an environment config that does the right thing. 4. Agreed. I addressed this a bit in the other email. -Jay On Thu, Dec 4, 2014 at 10:19 AM, Guozhang Wang <wangg...@gmail.com> wrote: > I would prefer making the kafka producer as is and wrap the object API on > top rather than wiring the serializer configs into producers. Some > thoughts: > > 1. For code sharing, I think it may only be effective for though simple > functions such as string serialization, etc. For Avro / Shrift / PB, the > serialization logic would be quite hard to share across organizations: > imagine some people wants to use Avro 1.7 while others are still staying > with 1.4 which are not API compatible, while some people use a schema > registry server for clients to communicate while others compile the schemas > into source code, etc. So I think in the end having those simple object > serialization code into kafka.api package and letting applications write > their own complicated serialization wrapper would be as beneficial as this > approach. > > 2. For code simplicity I do not see a huge difference between a wired > serializer, which will call serializer.encode() inside the producer, with a > wrapper, which will call the same outside the producer, or a typed record, > which will call record.encode() inside the producer. > > 3. For less error-proneness, people always mess with the config settings > especially when they use hierarchical / nested wiring of configs, and such > mistakes will only be detected on runtime but not compilation time. In the > past we have seem a lot of such cases with the old producer APIs that > wire-in the serializer class. If we move this to a SerDe interface, for > example KafkaProducer<K, V>(KeySer<K>, ValueSer<V>) such errors will be > detected at compilation. > > 4. For data type flexibility, the current approach bind one producer > instance to a fixed record type. This may be OK in most cases as people > usually only use a single data type but there are some cases where we would > like to have a single producer to be able to send multiple typed messages, > like control messages, or even with a single serialization like Avro we > would sometimes want to have GenericaRecord and IndexedRecord for some > specific types. > > > Guozhang > > On Wed, Dec 3, 2014 at 2:54 PM, Jun Rao <j...@confluent.io> wrote: > > > Jan, Jason, > > > > First, within an Kafka cluster, it's unlikely that each topic has a > > different type serializer. Like Jason mentioned, Square standardizes on > > protocol. Many other places such as LinkedIn standardize on Avro. > > > > Second, dealing with bytes only has limited use cases. Other than copying > > bytes around, there isn't much else that one can do. Even for the case of > > copying data from Kafka into HDFS, often you will need to (1) extract the > > timestamp so that you can partition the data properly; (2) extract > > individual fields if you want to put the data in a column-oriented > storage > > format. So, most interesting clients likely need to deal with objects > > instead of bytes. > > > > Finally, the generic api doesn't prevent one from using just the bytes. > The > > additional overhead is just a method call, which the old clients are > > already paying. Having both a raw bytes and a generic api is probably > going > > to confuse the users more. > > > > Thanks, > > > > Jun > > > > > > > > On Tue, Dec 2, 2014 at 6:50 PM, Jan Filipiak <jan.filip...@trivago.com> > > wrote: > > > > > Hello Everyone, > > > > > > I would very much appreciate if someone could provide me a real world > > > examplewhere it is more convenient to implement the serializers instead > > of > > > just making sure to provide bytearrays. > > > > > > The code we came up with explicitly avoids the serializer api. I think > it > > > is common understanding that if you want to transport data you need to > > have > > > it as a bytearray. > > > > > > If at all I personally would like to have a serializer interface that > > > takes the same types as the producer > > > > > > public interface Serializer<K,V> extends Configurable { > > > public byte[] serializeKey(K data); > > > public byte[] serializeValue(V data); > > > public void close(); > > > } > > > > > > this would avoid long serialize implementations with branches like > > > "switch(topic)" or "if(isKey)". Further serializer per topic makes more > > > sense in my opinion. It feels natural to have a one to one relationship > > > from types to topics or at least only a few partition per type. But as > we > > > inherit the type from the producer we would have to create many > > producers. > > > This would create additional unnecessary connections to the brokers. > With > > > the serializers we create a one type to all topics relationship and the > > > only type that satisfies that is the bytearray or Object. Am I missing > > > something here? As said in the beginning I would like to that usecase > > that > > > really benefits from using the serializers. I think in theory they > sound > > > great but they cause real practical issues that may lead users to wrong > > > decisions. > > > > > > -1 for putting the serializers back in. > > > > > > Looking forward to replies that can show me the benefit of serializes > and > > > especially how the > > > Type => topic relationship can be handled nicely. > > > > > > Best > > > Jan > > > > > > > > > > > > > > > On 25.11.2014 02:58, Jun Rao wrote: > > > > > >> Hi, Everyone, > > >> > > >> I'd like to start a discussion on whether it makes sense to add the > > >> serializer api back to the new java producer. Currently, the new java > > >> producer takes a byte array for both the key and the value. While this > > api > > >> is simple, it pushes the serialization logic into the application. > This > > >> makes it hard to reason about what type of data is being sent to Kafka > > and > > >> also makes it hard to share an implementation of the serializer. For > > >> example, to support Avro, the serialization logic could be quite > > involved > > >> since it might need to register the Avro schema in some remote > registry > > >> and > > >> maintain a schema cache locally, etc. Without a serialization api, > it's > > >> impossible to share such an implementation so that people can easily > > >> reuse. > > >> We sort of overlooked this implication during the initial discussion > of > > >> the > > >> producer api. > > >> > > >> So, I'd like to propose an api change to the new producer by adding > back > > >> the serializer api similar to what we had in the old producer. > > Specially, > > >> the proposed api changes are the following. > > >> > > >> First, we change KafkaProducer to take generic types K and V for the > key > > >> and the value, respectively. > > >> > > >> public class KafkaProducer<K,V> implements Producer<K,V> { > > >> > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record, > > >> Callback > > >> callback); > > >> > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record); > > >> } > > >> > > >> Second, we add two new configs, one for the key serializer and another > > for > > >> the value serializer. Both serializers will default to the byte array > > >> implementation. > > >> > > >> public class ProducerConfig extends AbstractConfig { > > >> > > >> .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, > > >> "org.apache.kafka.clients.producer.ByteArraySerializer", > > Importance.HIGH, > > >> KEY_SERIALIZER_CLASS_DOC) > > >> .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, > > >> "org.apache.kafka.clients.producer.ByteArraySerializer", > > Importance.HIGH, > > >> VALUE_SERIALIZER_CLASS_DOC); > > >> } > > >> > > >> Both serializers will implement the following interface. > > >> > > >> public interface Serializer<T> extends Configurable { > > >> public byte[] serialize(String topic, T data, boolean isKey); > > >> > > >> public void close(); > > >> } > > >> > > >> This is more or less the same as what's in the old producer. The > slight > > >> differences are (1) the serializer now only requires a parameter-less > > >> constructor; (2) the serializer has a configure() and a close() method > > for > > >> initialization and cleanup, respectively; (3) the serialize() method > > >> additionally takes the topic and an isKey indicator, both of which are > > >> useful for things like schema registration. > > >> > > >> The detailed changes are included in KAFKA-1797. For completeness, I > > also > > >> made the corresponding changes for the new java consumer api as well. > > >> > > >> Note that the proposed api changes are incompatible with what's in the > > >> 0.8.2 branch. However, if those api changes are beneficial, it's > > probably > > >> better to include them now in the 0.8.2 release, rather than later. > > >> > > >> I'd like to discuss mainly two things in this thread. > > >> 1. Do people feel that the proposed api changes are reasonable? > > >> 2. Are there any concerns of including the api changes in the 0.8.2 > > final > > >> release? > > >> > > >> Thanks, > > >> > > >> Jun > > >> > > >> > > > > > > > > > -- > -- Guozhang >