Thanks Jun. I think we all understand the motivation for adding the
serialization API back; we are just proposing different ways of doing so. I
personally prefer not to bind the producer instance to a fixed
serialization, but that said, I am fine with the current proposal too, as
the same effect can still be achieved via other workarounds.

Guozhang
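(For illustration, a minimal sketch of one such workaround under the
config-based proposal quoted below: configure the producer with the
pass-through ByteArraySerializer — class name as it appears later in this
thread — and serialize at each call site. Topic and record contents are
made up.)

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CallSiteSerializationExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Pass-through serializers keep this producer instance unbound
            // from any concrete data format.
            props.put("key.serializer",
                    "org.apache.kafka.clients.producer.ByteArraySerializer");
            props.put("value.serializer",
                    "org.apache.kafka.clients.producer.ByteArraySerializer");

            try (KafkaProducer<byte[], byte[]> producer =
                    new KafkaProducer<>(props)) {
                // Serialize at the call site; each send() may use a
                // different encoding (Avro, protobuf, JSON, ...) on the
                // same producer.
                byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
                byte[] value = "{\"event\":\"login\"}"
                        .getBytes(StandardCharsets.UTF_8);
                producer.send(new ProducerRecord<>("user-events", key, value));
            }
        }
    }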
On Tue, Dec 9, 2014 at 3:46 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
wrote:

> Hi All,
>
> This is very likely when you have a large site such as LinkedIn and you
> have thousands of servers producing data. You will have a mixed bag of
> producers and serialization/deserialization formats because of
> incremental code deployment. So, it is best to keep the API as generic as
> possible and let each org/company wrap the generic API with whatever
> serialization/deserialization framework fits them (Java, protobuf, Avro,
> or base64).
>
> Keep the API as generic as possible.
>
> Thanks,
>
> Bhavesh
>
> On Tue, Dec 9, 2014 at 3:29 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
> > In practice the cases that actually mix serialization types in a single
> > stream are pretty rare I think just because the consumer then has the
> > problem of guessing how to deserialize, so most of these will end up
> > with at least some marker or schema id or whatever that tells you how
> > to read the data. Arguably this mixed serialization with a marker is
> > itself a serializer type and should have a serializer of its own...
> >
> > Agreed that it is unlikely to have mixed serialization formats for one
> > topic/type. But we sometimes/often create one Producer object for one
> > cluster, and there can be many topics on this cluster. Different topics
> > may have different serialization formats. So I agree with Guozhang's
> > point regarding "data type flexibility" of using simple byte[] (instead
> > of generic <K, V>).
> >
> > On Fri, Dec 5, 2014 at 5:00 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Hey Sriram,
> > >
> > > Thanks! I think this is a very helpful summary.
> > >
> > > Let me try to address your point about passing in the serde at send
> > > time.
> > >
> > > I think the first objection is really to the paired key/value
> > > serializer interfaces. This leads to kind of a weird combinatorial
> > > thing where you would have an avro/avro serializer, a string/avro
> > > serializer, a pb/pb serializer, a string/pb serializer, and so on.
> > > But your proposal would work as well with separate serializers for
> > > key and value.
> > >
> > > I think the downside is just the one you call out--that this is a
> > > corner case and you end up with two versions of all the apis to
> > > support it. This also makes the serializer api more annoying to
> > > implement. I think the alternative solution for this case, and any
> > > other we can give people, is just configuring ByteArraySerializer,
> > > which gives you basically the api that you have now with byte arrays.
> > > If this were incredibly common then this would be a silly solution,
> > > but I guess the belief is that these cases are rare and a really well
> > > implemented avro or json serializer should be 100% of what most
> > > people need.
> > >
> > > In practice the cases that actually mix serialization types in a
> > > single stream are pretty rare I think just because the consumer then
> > > has the problem of guessing how to deserialize, so most of these will
> > > end up with at least some marker or schema id or whatever that tells
> > > you how to read the data. Arguably this mixed serialization with a
> > > marker is itself a serializer type and should have a serializer of
> > > its own...
> > >
> > > -Jay
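(To make Jay's last point concrete: a marker-bearing serializer might look
roughly like the sketch below, written against the Serializer<T> shape
proposed at the bottom of this thread and restated inline so the sketch is
self-contained. The marker values and the dispatch rule are illustrative
assumptions, not from the thread.)

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // The interface shape proposed later in this thread (not the final
    // shipped API), restated here so this sketch compiles on its own.
    interface Serializer<T> {
        byte[] serialize(String topic, T data, boolean isKey);
        void close();
    }

    // "Mixed serialization with a marker" packaged as a serializer type:
    // a one-byte format marker is prepended so consumers know how to
    // deserialize each payload.
    public class MarkedSerializer implements Serializer<Object> {
        private static final byte JSON = 0x00;
        private static final byte RAW = 0x01;

        @Override
        public byte[] serialize(String topic, Object data, boolean isKey) {
            // Purely illustrative dispatch rule: strings get the JSON
            // marker, everything else the raw marker.
            byte marker = (data instanceof CharSequence) ? JSON : RAW;
            byte[] payload = data.toString().getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(1 + payload.length)
                    .put(marker)
                    .put(payload)
                    .array();
        }

        @Override
        public void close() {}
    }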
> > > On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian <
> > > srsubraman...@linkedin.com.invalid> wrote:
> > >
> > > > This thread has diverged multiple times now, so it is worth
> > > > summarizing the points of discussion.
> > > >
> > > > There seem to be the following points of discussion -
> > > >
> > > > 1. Can we keep the serialization semantics outside the Producer
> > > > interface and have simple bytes in / bytes out for the interface
> > > > (this is what we have today)?
> > > >
> > > > The argument for this is to keep the interface simple and its usage
> > > > easy to understand. The argument against it is that it becomes hard
> > > > to share common usage patterns around serialization/message
> > > > validation in the future.
> > > >
> > > > 2. Can we create a wrapper producer that does the serialization and
> > > > have different variants of it for different data formats?
> > > >
> > > > The argument for this is again to keep the main API clean. The
> > > > arguments against it are that it duplicates the API, increases the
> > > > surface area, and creates redundancy for a minor addition.
> > > >
> > > > 3. Do we need to support different data types per record? The
> > > > current interface (bytes in/bytes out) lets you instantiate one
> > > > producer and use it to send multiple data formats. There seem to be
> > > > some valid use cases for this.
> > > >
> > > > I have still not seen a strong argument for giving up this
> > > > functionality. Can someone provide their views on why we don't need
> > > > this support, which is possible with the current API?
> > > >
> > > > One possible approach for per-record serialization would be to
> > > > define
> > > >
> > > > public interface SerDe<K, V> {
> > > >     public byte[] serializeKey(K key);
> > > >
> > > >     public K deserializeKey(byte[] key);
> > > >
> > > >     public byte[] serializeValue(V value);
> > > >
> > > >     public V deserializeValue(byte[] value);
> > > > }
> > > >
> > > > This would be used by both the Producer and the Consumer.
> > > >
> > > > The send APIs can then be
> > > >
> > > > public Future<RecordMetadata> send(ProducerRecord<K, V> record);
> > > >
> > > > public Future<RecordMetadata> send(ProducerRecord<K, V> record,
> > > >     Callback callback);
> > > >
> > > > public Future<RecordMetadata> send(ProducerRecord<K, V> record,
> > > >     SerDe<K, V> serde);
> > > >
> > > > public Future<RecordMetadata> send(ProducerRecord<K, V> record,
> > > >     SerDe<K, V> serde, Callback callback);
> > > >
> > > > A default SerDe can be set in the config. The producer would use the
> > > > default from the config if the non-serde send APIs are used. The
> > > > downside to this approach is that we would need to have four
> > > > variants of the send API for the Producer.
> > > >
> > > > On 12/5/14 3:16 PM, "Jun Rao" <j...@confluent.io> wrote:
> > > >
> > > > > Jiangjie,
> > > > >
> > > > > The issue with adding the serializer in ProducerRecord is that you
> > > > > need to implement all combinations of serializers for key and
> > > > > value. So, instead of just implementing int and string
> > > > > serializers, you will have to implement all 4 combinations.
> > > > >
> > > > > Adding a new producer constructor like Producer<K, V>(
> > > > > KeySerializer<K>, ValueSerializer<V>, Properties properties) can
> > > > > be useful.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
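(A sketch of the constructor idea Jun mentions above, with stand-in types
defined inline since neither this constructor nor these interfaces existed
at the time; it shows how in-code serializer instantiation turns a wrong
config-time class name into a compile-time error.)

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    // Stand-ins for the suggested constructor-based API, defined inline so
    // the sketch compiles on its own; none of this is the shipped Kafka
    // API.
    interface KeySerializer<K> {
        byte[] serialize(K key);
    }

    interface ValueSerializer<V> {
        byte[] serialize(V value);
    }

    class SketchProducer<K, V> {
        SketchProducer(KeySerializer<K> keySerializer,
                       ValueSerializer<V> valueSerializer,
                       Properties properties) {
            // A real producer would wire these serializers into send().
        }
    }

    public class ConstructorSketch {
        public static void main(String[] args) {
            // Serializers are instantiated in code rather than named in
            // config, so a type mismatch fails to compile instead of
            // failing at runtime.
            SketchProducer<String, String> producer = new SketchProducer<>(
                    key -> key.getBytes(StandardCharsets.UTF_8),
                    value -> value.getBytes(StandardCharsets.UTF_8),
                    new Properties());
        }
    }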
> > > > On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin
> > > > <j...@linkedin.com.invalid> wrote:
> > > >
> > > > >> I'm just thinking that instead of binding serialization to the
> > > > >> producer, another option is to bind the serializer/deserializer
> > > > >> to the ProducerRecord/ConsumerRecord (please see the detailed
> > > > >> proposal below).
> > > > >>
> > > > >> The arguments for this option are:
> > > > >> A. A single producer could send different message types. There
> > > > >> are several use cases in LinkedIn for a per-record serializer:
> > > > >>     - In Samza, there are some in-stream, order-sensitive control
> > > > >>       messages having a different deserializer from other
> > > > >>       messages.
> > > > >>     - There are use cases which need support for sending both
> > > > >>       Avro messages and raw bytes.
> > > > >>     - Some use cases need to deserialize some Avro messages into
> > > > >>       generic records and some other messages into specific
> > > > >>       records.
> > > > >> B. In the current proposal, the serializer/deserializer is
> > > > >> instantiated according to config. Compared with that, binding the
> > > > >> serializer to the ProducerRecord and ConsumerRecord is less error
> > > > >> prone.
> > > > >>
> > > > >> This option includes the following changes:
> > > > >>
> > > > >> A. Add serializer and deserializer interfaces to replace the
> > > > >> serializer instance from config:
> > > > >>
> > > > >> public interface Serializer<K, V> {
> > > > >>     public byte[] serializeKey(K key);
> > > > >>     public byte[] serializeValue(V value);
> > > > >> }
> > > > >>
> > > > >> public interface Deserializer<K, V> {
> > > > >>     public K deserializeKey(byte[] key);
> > > > >>     public V deserializeValue(byte[] value);
> > > > >> }
> > > > >>
> > > > >> B. Make ProducerRecord and ConsumerRecord abstract classes
> > > > >> implementing Serializer<K, V> and Deserializer<K, V>,
> > > > >> respectively:
> > > > >>
> > > > >> public abstract class ProducerRecord<K, V> implements
> > > > >>     Serializer<K, V> { ... }
> > > > >>
> > > > >> public abstract class ConsumerRecord<K, V> implements
> > > > >>     Deserializer<K, V> { ... }
> > > > >>
> > > > >> C. Instead of instantiating the serializer/deserializer from
> > > > >> config, let concrete ProducerRecord/ConsumerRecord classes extend
> > > > >> the abstract class and override the serialize/deserialize
> > > > >> methods:
> > > > >>
> > > > >> public class AvroProducerRecord extends
> > > > >>     ProducerRecord<String, GenericRecord> {
> > > > >>     ...
> > > > >>     @Override
> > > > >>     public byte[] serializeKey(String key) { ... }
> > > > >>     @Override
> > > > >>     public byte[] serializeValue(GenericRecord value) { ... }
> > > > >> }
> > > > >>
> > > > >> public class AvroConsumerRecord extends
> > > > >>     ConsumerRecord<String, GenericRecord> {
> > > > >>     ...
> > > > >>     @Override
> > > > >>     public String deserializeKey(byte[] key) { ... }
> > > > >>     @Override
> > > > >>     public GenericRecord deserializeValue(byte[] value) { ... }
> > > > >> }
> > > > >> D. The producer API changes to:
> > > > >>
> > > > >> public class KafkaProducer {
> > > > >>     ...
> > > > >>     Future<RecordMetadata> send(ProducerRecord<K, V> record) {
> > > > >>         ...
> > > > >>         byte[] key = record.serializeKey(record.key);
> > > > >>         byte[] value = record.serializeValue(record.value);
> > > > >>         BytesProducerRecord bytesProducerRecord =
> > > > >>             new BytesProducerRecord(topic, partition, key, value);
> > > > >>         ...
> > > > >>     }
> > > > >>     ...
> > > > >> }
> > > > >>
> > > > >> We also had some brainstorming in LinkedIn and here is the
> > > > >> feedback:
> > > > >>
> > > > >> If the community decides to add serialization back to the new
> > > > >> producer, besides the current proposal, which changes the new
> > > > >> producer API to be templated, there are some other options raised
> > > > >> during our discussion:
> > > > >>
> > > > >> 1) Rather than changing the current new producer API, we can
> > > > >> provide a wrapper around the current new producer (e.g.
> > > > >> KafkaSerializedProducer) and make it available to users, as there
> > > > >> is value in the simplicity of the current API.
> > > > >>
> > > > >> 2) If we decide to go with the templated new producer API, then
> > > > >> according to our experience in LinkedIn, it might be worth
> > > > >> considering instantiating the serializer in code instead of from
> > > > >> config, so we can avoid runtime errors due to dynamic
> > > > >> instantiation from config, which is more error prone. If that is
> > > > >> the case, the producer API could be changed to something like:
> > > > >>
> > > > >>     producer = new Producer<K, V>(KeySerializer<K>,
> > > > >>         ValueSerializer<V>)
> > > > >>
> > > > >> --Jiangjie (Becket) Qin
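(A self-contained sketch of how Jiangjie's record-bound option above might
be used; the stand-in classes are illustrative assumptions, not the shipped
API.)

    import java.nio.charset.StandardCharsets;

    // Minimal stand-ins for the record-bound proposal, defined inline so
    // the sketch compiles on its own.
    abstract class SketchProducerRecord<K, V> {
        final String topic;
        final K key;
        final V value;

        SketchProducerRecord(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }

        // Each concrete record type carries its own serialization logic.
        abstract byte[] serializeKey(K key);
        abstract byte[] serializeValue(V value);
    }

    // A record type that serializes itself as UTF-8 strings; an
    // Avro-backed variant would override these methods with schema-aware
    // encoding instead.
    class StringProducerRecord extends SketchProducerRecord<String, String> {
        StringProducerRecord(String topic, String key, String value) {
            super(topic, key, value);
        }

        @Override
        byte[] serializeKey(String key) {
            return key.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        byte[] serializeValue(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    public class RecordBoundExample {
        public static void main(String[] args) {
            // The producer would call serializeKey/serializeValue on each
            // record at send() time, so one producer instance can carry
            // differently encoded records.
            SketchProducerRecord<String, String> record =
                    new StringProducerRecord("user-events", "user-42", "login");
            byte[] keyBytes = record.serializeKey(record.key);
            byte[] valueBytes = record.serializeValue(record.value);
            System.out.println(keyBytes.length + " / " + valueBytes.length);
        }
    }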
> > > > >> On 11/24/14, 5:58 PM, "Jun Rao" <jun...@gmail.com> wrote:
> > > > >>
> > > > >> > Hi, Everyone,
> > > > >> >
> > > > >> > I'd like to start a discussion on whether it makes sense to add
> > > > >> > the serializer api back to the new java producer. Currently,
> > > > >> > the new java producer takes a byte array for both the key and
> > > > >> > the value. While this api is simple, it pushes the
> > > > >> > serialization logic into the application. This makes it hard to
> > > > >> > reason about what type of data is being sent to Kafka and also
> > > > >> > makes it hard to share an implementation of the serializer. For
> > > > >> > example, to support Avro, the serialization logic could be
> > > > >> > quite involved since it might need to register the Avro schema
> > > > >> > in some remote registry and maintain a schema cache locally,
> > > > >> > etc. Without a serialization api, it's impossible to share such
> > > > >> > an implementation so that people can easily reuse it. We sort
> > > > >> > of overlooked this implication during the initial discussion of
> > > > >> > the producer api.
> > > > >> >
> > > > >> > So, I'd like to propose an api change to the new producer by
> > > > >> > adding back the serializer api similar to what we had in the
> > > > >> > old producer. Specifically, the proposed api changes are the
> > > > >> > following.
> > > > >> >
> > > > >> > First, we change KafkaProducer to take generic types K and V
> > > > >> > for the key and the value, respectively.
> > > > >> >
> > > > >> > public class KafkaProducer<K, V> implements Producer<K, V> {
> > > > >> >
> > > > >> >     public Future<RecordMetadata> send(ProducerRecord<K, V>
> > > > >> >         record, Callback callback);
> > > > >> >
> > > > >> >     public Future<RecordMetadata> send(ProducerRecord<K, V>
> > > > >> >         record);
> > > > >> > }
> > > > >> >
> > > > >> > Second, we add two new configs, one for the key serializer and
> > > > >> > another for the value serializer. Both serializers will default
> > > > >> > to the byte array implementation.
> > > > >> >
> > > > >> > public class ProducerConfig extends AbstractConfig {
> > > > >> >
> > > > >> >     .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > > > >> >         "org.apache.kafka.clients.producer.ByteArraySerializer",
> > > > >> >         Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
> > > > >> >     .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > > > >> >         "org.apache.kafka.clients.producer.ByteArraySerializer",
> > > > >> >         Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
> > > > >> > }
> > > > >> >
> > > > >> > Both serializers will implement the following interface.
> > > > >> >
> > > > >> > public interface Serializer<T> extends Configurable {
> > > > >> >     public byte[] serialize(String topic, T data,
> > > > >> >         boolean isKey);
> > > > >> >
> > > > >> >     public void close();
> > > > >> > }
> > > > >> >
> > > > >> > This is more or less the same as what's in the old producer.
> > > > >> > The slight differences are (1) the serializer now only requires
> > > > >> > a parameter-less constructor; (2) the serializer has a
> > > > >> > configure() and a close() method for initialization and
> > > > >> > cleanup, respectively; (3) the serialize() method additionally
> > > > >> > takes the topic and an isKey indicator, both of which are
> > > > >> > useful for things like schema registration.
> > > > >> >
> > > > >> > The detailed changes are included in KAFKA-1797. For
> > > > >> > completeness, I also made the corresponding changes for the new
> > > > >> > java consumer api as well.
> > > > >> >
> > > > >> > Note that the proposed api changes are incompatible with what's
> > > > >> > in the 0.8.2 branch. However, if those api changes are
> > > > >> > beneficial, it's probably better to include them now in the
> > > > >> > 0.8.2 release, rather than later.
> > > > >> >
> > > > >> > I'd like to discuss mainly two things in this thread.
> > > > >> > 1. Do people feel that the proposed api changes are reasonable?
> > > > >> > 2. Are there any concerns about including the api changes in
> > > > >> > the 0.8.2 final release?
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Jun

--
Guozhang
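(For a sense of what implementing the proposed interface involves, a
minimal string serializer sketch. The interface is restated inline so the
sketch stands alone — this thread's proposal, not the final shipped API —
and the "serializer.encoding" config key is an illustrative assumption.)

    import java.io.UnsupportedEncodingException;
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.Map;

    // The interface as proposed in Jun's message above, restated here so
    // this sketch is self-contained.
    interface Configurable {
        void configure(Map<String, ?> configs);
    }

    interface Serializer<T> extends Configurable {
        byte[] serialize(String topic, T data, boolean isKey);
        void close();
    }

    public class StringSerializer implements Serializer<String> {
        private String encoding = StandardCharsets.UTF_8.name();

        @Override
        public void configure(Map<String, ?> configs) {
            // Hypothetical config key; real serializers would define their
            // own, resolved here thanks to the Configurable hook.
            Object enc = configs.get("serializer.encoding");
            if (enc instanceof String) {
                encoding = (String) enc;
            }
        }

        @Override
        public byte[] serialize(String topic, String data, boolean isKey) {
            // topic and isKey matter for schema-registry style
            // serializers; a plain string encoder can ignore them.
            try {
                return data == null ? null : data.getBytes(encoding);
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException("Unknown encoding " + encoding, e);
            }
        }

        @Override
        public void close() {}

        public static void main(String[] args) {
            StringSerializer serializer = new StringSerializer();
            serializer.configure(Collections.<String, Object>emptyMap());
            byte[] bytes = serializer.serialize("user-events", "hello", false);
            System.out.println(bytes.length); // 5
            serializer.close();
        }
    }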