Thanks Jun.

I think we all understand the motivation for adding the serialization API back,
and are just proposing different ways of doing so. I personally prefer not to
bind the producer instance to a fixed serialization, but that said I
am fine with the current proposal too, as this can still be done via other
workarounds.
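One such workaround, as a minimal sketch in plain Java: keep the producer bytes-only and choose a serializer per topic in application code. The names BytesSender and PerTopicProducer are hypothetical and exist only for this illustration; BytesSender stands in for a producer configured with the byte array serializer and is not a Kafka API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical demo, not Kafka code: per-topic serialization layered on a bytes-only send.
class PerTopicSerializationDemo {
    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // Stand-in for a producer that only accepts byte[] values; it records topic and size.
        BytesSender sender = (topic, value) -> sent.add(topic + ":" + value.length);

        PerTopicProducer producer = new PerTopicProducer(sender);
        producer.register("logs", String.class, s -> s.getBytes(StandardCharsets.UTF_8));
        producer.register("raw", byte[].class, b -> b);

        producer.send("logs", "hello");
        producer.send("raw", new byte[] {1, 2, 3});
        System.out.println(sent); // prints [logs:5, raw:3]
    }
}

// Assumed shape of a bytes-in sender (illustrative only).
interface BytesSender {
    void send(String topic, byte[] value);
}

class PerTopicProducer {
    private final BytesSender sender;
    private final Map<String, Function<Object, byte[]>> serializers = new HashMap<>();

    PerTopicProducer(BytesSender sender) {
        this.sender = sender;
    }

    // Registers the serializer for one topic; values are type-checked at send time.
    <T> void register(String topic, Class<T> type, Function<T, byte[]> serializer) {
        serializers.put(topic, value -> serializer.apply(type.cast(value)));
    }

    // Serializes with the topic's serializer, then hands the bytes to the sender.
    void send(String topic, Object value) {
        Function<Object, byte[]> serializer = serializers.get(topic);
        if (serializer == null) {
            throw new IllegalArgumentException("No serializer registered for topic " + topic);
        }
        sender.send(topic, serializer.apply(value));
    }
}
```

The trade-off is the one discussed in this thread: the producer stays flexible across topics, but type mismatches surface at runtime (as a ClassCastException here) rather than at compile time.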

Guozhang

On Tue, Dec 9, 2014 at 3:46 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
wrote:

> Hi All,
>
> This is very likely when you have a large site such as LinkedIn with
> thousands of servers producing data. You will have a mixed bag of producers
> and serialization or deserialization formats because of incremental code
> deployment. So it is best to keep the API as generic as possible, and each
> org / company can wrap the generic API however it sees fit with its
> serialization / deserialization framework (Java, Protocol Buffers, Avro, or
> Base64).
>
> Keep the API as generic as possible.
>
> Thanks,
>
> Bhavesh
>
> On Tue, Dec 9, 2014 at 3:29 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
> > > In practice the cases that actually mix serialization types in a single
> > > stream are pretty rare I think just because the consumer then has the
> > > problem of guessing how to deserialize, so most of these will end up with
> > > at least some marker or schema id or whatever that tells you how to read
> > > the data. Arguably this mixed serialization with marker is itself a
> > > serializer type and should have a serializer of its own...
> >
> > Agreed that it is unlikely to have mixed serialization formats for one
> > topic/type. But we sometimes/often create one Producer object per
> > cluster, and there can be many topics on that cluster. Different topics may
> > have different serialization formats. So I agree with Guozhang's point
> > regarding the "data type flexibility" of using simple byte[] (instead of
> > generic <K, V>).
> >
> > On Fri, Dec 5, 2014 at 5:00 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Hey Sriram,
> > >
> > > Thanks! I think this is a very helpful summary.
> > >
> > > Let me try to address your point about passing in the serde at send time.
> > > I think the first objection is really to the paired key/value serializer
> > > interfaces. This leads to kind of a weird combinatorial thing where you
> > > would have an avro/avro serializer, a string/avro serializer, a pb/pb
> > > serializer, and a string/pb serializer, and so on. But your proposal would
> > > work as well with separate serializers for key and value.
> > >
> > > I think the downside is just the one you call out: that this is a corner
> > > case and you end up with two versions of all the apis to support it. This
> > > also makes the serializer api more annoying to implement. I think the
> > > alternative solution to this case and any other we can give people is just
> > > configuring ByteArraySerializer, which gives you basically the api that you
> > > have now with byte arrays. If this is incredibly common then this would be
> > > a silly solution, but I guess the belief is that these cases are rare and a
> > > really well implemented avro or json serializer should be 100% of what most
> > > people need.
> > >
> > > In practice the cases that actually mix serialization types in a single
> > > stream are pretty rare I think just because the consumer then has the
> > > problem of guessing how to deserialize, so most of these will end up with
> > > at least some marker or schema id or whatever that tells you how to read
> > > the data. Arguably this mixed serialization with marker is itself a
> > > serializer type and should have a serializer of its own...
> > >
> > > -Jay
> > >
> > > On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian <
> > > srsubraman...@linkedin.com.invalid> wrote:
> > >
> > > > This thread has diverged multiple times now and it would be worth
> > > > summarizing them.
> > > >
> > > > There seem to be the following points of discussion:
> > > >
> > > > 1. Can we keep the serialization semantics outside the Producer interface
> > > > and have simple bytes in / bytes out for the interface? (This is what we
> > > > have today.)
> > > >
> > > > The point for this is that it keeps the interface simple and its usage
> > > > easy to understand. The point against this is that it gets hard to share
> > > > common usage patterns around serialization/message validation in the
> > > > future.
> > > >
> > > > 2. Can we create a wrapper producer that does the serialization and have
> > > > different variants of it for different data formats?
> > > >
> > > > The point for this is again to keep the main API clean. The point
> > > > against this is that it duplicates the API, increases the surface area,
> > > > and creates redundancy for a minor addition.
> > > >
> > > > 3. Do we need to support different data types per record? The current
> > > > interface (bytes in/bytes out) lets you instantiate one producer and use
> > > > it to send multiple data formats. There seem to be some valid use cases
> > > > for this.
> > > >
> > > > I have still not seen a strong argument against having this
> > > > functionality. Can someone provide their views on why we don't need this
> > > > support, which is possible with the current API?
> > > >
> > > > One possible approach for the per-record serialization would be to define
> > > >
> > > > public interface SerDe<K,V> {
> > > >   public byte[] serializeKey(K key);
> > > >
> > > >   public K deserializeKey(byte[] key);
> > > >
> > > >   public byte[] serializeValue(V value);
> > > >
> > > >   public V deserializeValue(byte[] value);
> > > > }
> > > >
> > > > This would be used by both the Producer and the Consumer.
> > > >
> > > > The send APIs can then be
> > > >
> > > > public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> > > > public Future<RecordMetadata> send(ProducerRecord<K,V> record,
> > > >     Callback callback);
> > > >
> > > > public Future<RecordMetadata> send(ProducerRecord<K,V> record,
> > > >     SerDe<K,V> serde);
> > > > public Future<RecordMetadata> send(ProducerRecord<K,V> record,
> > > >     SerDe<K,V> serde, Callback callback);
> > > >
> > > >
> > > > A default SerDe can be set in the config. The producer would use the
> > > > default from the config if the non-serde send APIs are used. The downside
> > > > to this approach is that we would need to have four variants of the send
> > > > API for the Producer.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On 12/5/14 3:16 PM, "Jun Rao" <j...@confluent.io> wrote:
> > > >
> > > > >Jiangjie,
> > > > >
> > > > >The issue with adding the serializer in ProducerRecord is that you need to
> > > > >implement all combinations of serializers for key and value. So, instead of
> > > > >just implementing int and string serializers, you will have to implement
> > > > >all 4 combinations.
> > > > >
> > > > >Adding a new producer constructor like Producer<K, V>(KeySerializer<K>,
> > > > >ValueSerializer<V>, Properties properties) can be useful.
> > > > >
> > > > >Thanks,
> > > > >
> > > > >Jun
> > > > >
> > > > >On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> > > > >wrote:
> > > > >
> > > > >>
> > > > >> I'm just thinking that instead of binding serialization to the producer,
> > > > >> another option is to bind the serializer/deserializer to
> > > > >> ProducerRecord/ConsumerRecord (please see the detailed proposal below).
> > > > >>         The arguments for this option are:
> > > > >>         A. A single producer could send different message types. There are
> > > > >> several use cases at LinkedIn for a per-record serializer:
> > > > >>         - In Samza, there are some in-stream order-sensitive control
> > > > >> messages that have a different deserializer from other messages.
> > > > >>         - There are use cases which need support for sending both Avro
> > > > >> messages and raw bytes.
> > > > >>         - Some use cases need to deserialize some Avro messages into
> > > > >> generic records and some other messages into specific records.
> > > > >>         B. In the current proposal, the serializer/deserializer is
> > > > >> instantiated according to config. Compared with that, binding the
> > > > >> serializer to ProducerRecord and ConsumerRecord is less error prone.
> > > > >>
> > > > >>
> > > > >>         This option includes the following changes:
> > > > >>         A. Add serializer and deserializer interfaces to replace the
> > > > >> serializer instance from config.
> > > > >>                 public interface Serializer<K, V> {
> > > > >>                         public byte[] serializeKey(K key);
> > > > >>                         public byte[] serializeValue(V value);
> > > > >>                 }
> > > > >>                 public interface Deserializer<K, V> {
> > > > >>                         public K deserializeKey(byte[] key);
> > > > >>                         public V deserializeValue(byte[] value);
> > > > >>                 }
> > > > >>
> > > > >>         B. Make ProducerRecord and ConsumerRecord abstract classes
> > > > >> implementing Serializer<K, V> and Deserializer<K, V> respectively.
> > > > >>                 public abstract class ProducerRecord<K, V>
> > > > >>                         implements Serializer<K, V> {...}
> > > > >>                 public abstract class ConsumerRecord<K, V>
> > > > >>                         implements Deserializer<K, V> {...}
> > > > >>
> > > > >>         C. Instead of instantiating the serializer/deserializer from
> > > > >> config, let concrete ProducerRecord/ConsumerRecord classes extend the
> > > > >> abstract class and override the serialize/deserialize methods.
> > > > >>
> > > > >>                 public class AvroProducerRecord
> > > > >>                         extends ProducerRecord<String, GenericRecord> {
> > > > >>                         ...
> > > > >>                         @Override
> > > > >>                         public byte[] serializeKey(String key) {...}
> > > > >>                         @Override
> > > > >>                         public byte[] serializeValue(GenericRecord value) {...}
> > > > >>                 }
> > > > >>
> > > > >>                 public class AvroConsumerRecord
> > > > >>                         extends ConsumerRecord<String, GenericRecord> {
> > > > >>                         ...
> > > > >>                         @Override
> > > > >>                         public String deserializeKey(byte[] key) {...}
> > > > >>                         @Override
> > > > >>                         public GenericRecord deserializeValue(byte[] value) {...}
> > > > >>                 }
> > > > >>
> > > > >>         D. The producer API changes to
> > > > >>                 public class KafkaProducer {
> > > > >>                         ...
> > > > >>
> > > > >>                         <K, V> Future<RecordMetadata> send(ProducerRecord<K, V> record) {
> > > > >>                                 ...
> > > > >>                                 // the record serializes its own key and value to bytes
> > > > >>                                 byte[] key = record.serializeKey(record.key);
> > > > >>                                 byte[] value = record.serializeValue(record.value);
> > > > >>                                 BytesProducerRecord bytesProducerRecord =
> > > > >>                                         new BytesProducerRecord(topic, partition, key, value);
> > > > >>                                 ...
> > > > >>                         }
> > > > >>                         ...
> > > > >>                 }
> > > > >>
> > > > >>
> > > > >>
> > > > >> We also had some brainstorming at LinkedIn, and here is the feedback:
> > > > >>
> > > > >> If the community decides to add serialization back to the new producer,
> > > > >> besides the current proposal, which changes the new producer API to be
> > > > >> templated, there are some other options raised during our discussion:
> > > > >>         1) Rather than change the current new producer API, we can provide
> > > > >> a wrapper around the current new producer (e.g. KafkaSerializedProducer)
> > > > >> and make it available to users, as there is value in the simplicity of the
> > > > >> current API.
> > > > >>
> > > > >>         2) If we decide to go with a templated new producer API, according
> > > > >> to experience at LinkedIn, it might be worth considering instantiating the
> > > > >> serializer in code instead of from config, so we can avoid runtime errors
> > > > >> due to dynamic instantiation from config, which is more error prone. If
> > > > >> that is the case, the producer API could be changed to something like:
> > > > >>                 producer = new Producer<K, V>(KeySerializer<K>,
> > > > >>                         ValueSerializer<V>)
> > > > >>
> > > > >> --Jiangjie (Becket) Qin
> > > > >>
> > > > >>
> > > > >> On 11/24/14, 5:58 PM, "Jun Rao" <jun...@gmail.com> wrote:
> > > > >>
> > > > >> >Hi, Everyone,
> > > > >> >
> > > > >> >I'd like to start a discussion on whether it makes sense to add the
> > > > >> >serializer api back to the new java producer. Currently, the new java
> > > > >> >producer takes a byte array for both the key and the value. While this api
> > > > >> >is simple, it pushes the serialization logic into the application. This
> > > > >> >makes it hard to reason about what type of data is being sent to Kafka and
> > > > >> >also makes it hard to share an implementation of the serializer. For
> > > > >> >example, to support Avro, the serialization logic could be quite involved
> > > > >> >since it might need to register the Avro schema in some remote registry and
> > > > >> >maintain a schema cache locally, etc. Without a serialization api, it's
> > > > >> >impossible to share such an implementation so that people can easily reuse
> > > > >> >it. We sort of overlooked this implication during the initial discussion of
> > > > >> >the producer api.
> > > > >> >
> > > > >> >So, I'd like to propose an api change to the new producer by adding back
> > > > >> >the serializer api similar to what we had in the old producer.
> > > > >> >Specifically, the proposed api changes are the following.
> > > > >> >
> > > > >> >First, we change KafkaProducer to take generic types K and V for the key
> > > > >> >and the value, respectively.
> > > > >> >
> > > > >> >public class KafkaProducer<K,V> implements Producer<K,V> {
> > > > >> >
> > > > >> >    public Future<RecordMetadata> send(ProducerRecord<K,V> record,
> > > > >> >                                       Callback callback);
> > > > >> >
> > > > >> >    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> > > > >> >}
> > > > >> >
> > > > >> >Second, we add two new configs, one for the key serializer and another for
> > > > >> >the value serializer. Both serializers will default to the byte array
> > > > >> >implementation.
> > > > >> >
> > > > >> >public class ProducerConfig extends AbstractConfig {
> > > > >> >
> > > > >> >    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > > > >> >            "org.apache.kafka.clients.producer.ByteArraySerializer",
> > > > >> >            Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
> > > > >> >    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > > > >> >            "org.apache.kafka.clients.producer.ByteArraySerializer",
> > > > >> >            Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
> > > > >> >}
> > > > >> >
> > > > >> >Both serializers will implement the following interface.
> > > > >> >
> > > > >> >public interface Serializer<T> extends Configurable {
> > > > >> >    public byte[] serialize(String topic, T data, boolean isKey);
> > > > >> >
> > > > >> >    public void close();
> > > > >> >}
> > > > >> >
> > > > >> >This is more or less the same as what's in the old producer. The slight
> > > > >> >differences are (1) the serializer now only requires a parameter-less
> > > > >> >constructor; (2) the serializer has a configure() and a close() method for
> > > > >> >initialization and cleanup, respectively; (3) the serialize() method
> > > > >> >additionally takes the topic and an isKey indicator, both of which are
> > > > >> >useful for things like schema registration.
> > > > >> >
> > > > >> >The detailed changes are included in KAFKA-1797. For completeness, I also
> > > > >> >made the corresponding changes for the new java consumer api.
> > > > >> >
> > > > >> >Note that the proposed api changes are incompatible with what's in the
> > > > >> >0.8.2 branch. However, if those api changes are beneficial, it's probably
> > > > >> >better to include them now in the 0.8.2 release, rather than later.
> > > > >> >
> > > > >> >I'd like to discuss mainly two things in this thread.
> > > > >> >1. Do people feel that the proposed api changes are reasonable?
> > > > >> >2. Are there any concerns about including the api changes in the 0.8.2
> > > > >> >final release?
> > > > >> >
> > > > >> >Thanks,
> > > > >> >
> > > > >> >Jun
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> >
>
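Regarding Jay's point in the quoted thread that mixed serialization with a marker is itself a serializer type: a rough sketch of what that could look like, assuming a one-byte tag in front of each payload. TaggedCodec and its tag values are hypothetical and not part of any Kafka API; a real deployment would more likely use a schema id than a hand-picked byte.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo of a "marker" codec: the first byte says how to read the rest.
class TaggedSerializationDemo {
    public static void main(String[] args) {
        byte[] text = TaggedCodec.serialize("hello");
        byte[] raw = TaggedCodec.serialize(new byte[] {1, 2, 3});
        System.out.println(TaggedCodec.deserialize(text));
        System.out.println(Arrays.toString((byte[]) TaggedCodec.deserialize(raw)));
    }
}

final class TaggedCodec {
    // Illustrative marker values (assumptions, not a standard).
    static final byte TAG_STRING = 0;
    static final byte TAG_BYTES = 1;

    private TaggedCodec() {
    }

    // Prepends a tag describing the payload format.
    static byte[] serialize(Object value) {
        if (value instanceof String) {
            return prefix(TAG_STRING, ((String) value).getBytes(StandardCharsets.UTF_8));
        } else if (value instanceof byte[]) {
            return prefix(TAG_BYTES, (byte[]) value);
        }
        throw new IllegalArgumentException("Unsupported value: " + value);
    }

    // Reads the tag and decodes the remaining bytes accordingly.
    static Object deserialize(byte[] data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("Empty payload");
        }
        byte[] payload = Arrays.copyOfRange(data, 1, data.length);
        switch (data[0]) {
            case TAG_STRING:
                return new String(payload, StandardCharsets.UTF_8);
            case TAG_BYTES:
                return payload;
            default:
                throw new IllegalArgumentException("Unknown tag: " + data[0]);
        }
    }

    private static byte[] prefix(byte tag, byte[] payload) {
        byte[] out = new byte[payload.length + 1];
        out[0] = tag;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }
}
```

With something like this, the consumer does not have to guess the format, and the tagged codec can be plugged in as the single serializer for a mixed stream.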


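For reference, here is roughly what an implementation of the Serializer<T> interface from Jun's original proposal might look like. This is only a sketch: Configurable and Serializer are redeclared locally so the example compiles on its own, the configure(Map) signature is assumed, and the "serializer.encoding" config key is a made-up name for illustration.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

class StringSerializerDemo {
    public static void main(String[] args) {
        StringSerializer serializer = new StringSerializer(); // parameter-less constructor
        serializer.configure(Collections.singletonMap("serializer.encoding", "UTF-16BE"));
        byte[] bytes = serializer.serialize("my-topic", "hi", false);
        System.out.println(bytes.length); // prints 4 (two UTF-16 code units)
        serializer.close();
    }
}

// Local stand-in so the example is self-contained (assumed shape).
interface Configurable {
    void configure(Map<String, ?> configs);
}

// The interface as written in the proposal.
interface Serializer<T> extends Configurable {
    byte[] serialize(String topic, T data, boolean isKey);

    void close();
}

class StringSerializer implements Serializer<String> {
    private Charset charset = StandardCharsets.UTF_8;

    @Override
    public void configure(Map<String, ?> configs) {
        // Hypothetical config key; falls back to UTF-8 when it is absent.
        Object encoding = configs.get("serializer.encoding");
        if (encoding != null) {
            charset = Charset.forName(encoding.toString());
        }
    }

    @Override
    public byte[] serialize(String topic, String data, boolean isKey) {
        // topic and isKey are unused here; a schema-aware serializer could use them.
        return data == null ? null : data.getBytes(charset);
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```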

-- 
-- Guozhang
