One problem though. Since the windowed serdes (Windowed(De)Serializer) are so similar, I’m trying to model the implementation of my ListSerde on them.
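For reference, a minimal sketch of the lookup pattern in question: choose a key-side or value-side property name based on `isKey`, then load the configured inner class. This is not the actual Kafka code. The class name `InnerSerdeLookup`, the helper `resolveInnerClass`, and the two key strings are hypothetical (the strings follow the names suggested later in this thread), and a plain `Map` stands in for `StreamsConfig`.

```java
import java.util.HashMap;
import java.util.Map;

final class InnerSerdeLookup {
    // Hypothetical config keys; not constants that exist in Kafka.
    static final String LIST_KEY_SERDE_INNER = "default.list.key.serde.inner";
    static final String LIST_VALUE_SERDE_INNER = "default.list.value.serde.inner";

    // Pick the key- or value-side property, then resolve the configured class.
    static Class<?> resolveInnerClass(Map<String, ?> configs, boolean isKey)
            throws ClassNotFoundException {
        final String propertyName = isKey ? LIST_KEY_SERDE_INNER : LIST_VALUE_SERDE_INNER;
        final Object value = configs.get(propertyName);
        if (value == null) {
            throw new IllegalArgumentException("Missing config: " + propertyName);
        }
        // Accept either a Class instance or a fully qualified class name.
        return value instanceof Class ? (Class<?>) value : Class.forName(value.toString());
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        // java.lang.Integer is only a stand-in for a real serde class name.
        configs.put(LIST_KEY_SERDE_INNER, "java.lang.Integer");
        System.out.println(resolveInnerClass(configs, true).getName()); // prints java.lang.Integer
    }
}
```

Because the sketch defines its own key strings, it has no dependency on the streams module; whether the real constants should live there is the open question.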
I created a couple of constants under StreamsConfig, and I’m trying to use a similar construct:

    final String propertyName = isKey ? StreamsConfig.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;

But then I found out that StreamsConfig is not accessible from the org.apache.kafka.common.serialization package, while the windowed (de)serializers are located under the org.apache.kafka.streams.kstream package.

What should I do? Should I move my classes under the org.apache.kafka.streams.kstream package instead?

> On Jul 15, 2019, at 10:45 AM, Development <d...@yeralin.net> wrote:
>
> Hi Matthias,
>
> Thank you for your input.
>
> I updated the KIP, made it a little more readable.
>
> I think the configuration parameters strategy is finalized then.
>
> Do you have any other questions/concerns regarding this KIP?
>
> Meanwhile I’ll start doing appropriate code changes, and commit them under my
> PR.
>
> Best,
> Daniyar Yeralin
>
>> On Jul 11, 2019, at 2:44 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> Daniyar,
>>
>> thanks for the update to the KIP. It's in really good shape and well
>> written.
>>
>> About the default constructor question:
>>
>> All Serdes/Serializer/Deserializer classes need a default constructor to
>> create them easily via reflection when specified in a config. I
>> understand that it is not super user friendly, but all existing code
>> works this way. Hence, it seems best to stick with the established pattern.
>>
>> We have a similar issue with `TimeWindowedSerde` and
>> `SessionWindowedSerde`, and I just recently did a PR to improve user
>> experience that addresses the exact issue John raised. (cf
>> https://github.com/apache/kafka/pull/7067)
>>
>> Note that if a user instantiates the Serde manually, the user
>> would also need to call `configure()` to set up the inner serdes. Kafka
>> Streams would not set those up automatically and one would most likely
>> end up with an NPE.
>>
>> Coming back to the KIP, and the parameter names. `WindowedSerdes` are
>> similar to `ListSerde` as they wrap another Serde. For `WindowedSerdes`,
>> we use the following parameter names:
>>
>> - default.windowed.key.serde.inner
>> - default.windowed.value.serde.inner
>>
>> It might be good to align the naming pattern. I would also suggest
>> using `type` instead of `impl`:
>>
>> default.key.list.serde.impl -> default.list.key.serde.type
>> default.value.list.serde.impl -> default.list.value.serde.type
>> default.key.list.serde.element -> default.list.key.serde.inner
>> default.value.list.serde.element -> default.list.value.serde.inner
>>
>> -Matthias
>>
>> On 7/10/19 8:52 AM, Development wrote:
>>> Hi John,
>>>
>>> Yes, I do agree. That totally makes sense. The only thing is that it goes
>>> against what Matthias suggested earlier:
>>> "I think that ... `ListSerde` should have a default constructor and it
>>> should be possible to pass in the `Class listClass` information via a
>>> configuration. Otherwise, KafkaStreams cannot use it as default serde."
>>>
>>> What do you think about that? I hope I’m not confusing anything.
>>>
>>> Best,
>>> Daniyar Yeralin
>>>
>>>> On Jul 9, 2019, at 5:56 PM, John Roesler <j...@confluent.io> wrote:
>>>>
>>>> Ah, my apologies, I must have just overlooked it. Thanks for the update,
>>>> too.
>>>>
>>>> Just one more super-small question, do we need this variant:
>>>>
>>>>> New method public static <T> Serde<List<T>> ListSerde() in
>>>>> org.apache.kafka.common.serialization.Serdes class (infers list
>>>>> implementation and inner serde from config file)
>>>>
>>>> It seems like this situation implies my config file is already set up for
>>>> the list serde, so passing this serde (e.g., in Produced) would have the
>>>> same effect as not specifying it.
>>>>
>>>> I guess that it could be the case that you have the
>>>> `default.key/value.serde` set to something else, like StringSerde, but you
>>>> still have the `default.key/value.list.serde.impl/element` set. This seems
>>>> like it would result in more confusion than convenience, so my gut
>>>> instinct is maybe we shouldn't introduce the `ListSerde()` variant until
>>>> people actually request it later on.
>>>>
>>>> Thus, we'd just stick with fully config-driven or fully
>>>> source-code-driven, not half/half.
>>>>
>>>> What do you think?
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Tue, Jul 9, 2019 at 9:58 AM Development <d...@yeralin.net> wrote:
>>>>>
>>>>> Hi John,
>>>>>
>>>>> I hope everyone had a great long weekend.
>>>>>
>>>>> Regarding Java interfaces, I may not understand you correctly, but I
>>>>> think I already listed them:
>>>>>
>>>>> So for Produced, you would use it in the following fashion, for example:
>>>>> Produced.keySerde(Serdes.ListSerde(ArrayList.class, Serdes.Integer()))
>>>>>
>>>>> I also updated the KIP, and added a section “Serialization Strategy”
>>>>> where I describe our logic of conditional serialization based on the type
>>>>> of an inner serde.
>>>>>
>>>>> Thank you!
>>>>>
>>>>> Best,
>>>>> Daniyar Yeralin
>>>>>
>>>>> On Jun 26, 2019, at 11:44 AM, John Roesler <j...@confluent.io> wrote:
>>>>>
>>>>> Thanks for the update, Daniyar!
>>>>>
>>>>> In addition to specifying the config interface, can you also specify
>>>>> the Java interface? Namely, if I need to pass an instance of this
>>>>> serde in to the DSL directly, as in Produced, Materialized, etc., what
>>>>> constructor(s) would I have available? Likewise with the Serializer
>>>>> and Deserializer. I don't think you need to specify the implementation
>>>>> logic, since we've already discussed it here.
>>>>>
>>>>> If you also want to specify the serialized format of the data records
>>>>> in the KIP, it could be useful documentation, as well as letting us
>>>>> verify the schema for forward/backward compatibility concerns, etc.
>>>>>
>>>>> Thanks,
>>>>> John
>>>>>
>>>>> On Wed, Jun 26, 2019 at 10:33 AM Development <d...@yeralin.net> wrote:
>>>>>
>>>>> Hey,
>>>>>
>>>>> Finally made updates to the KIP:
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization
>>>>> Sorry for the delay :)
>>>>>
>>>>> Thank You!
>>>>>
>>>>> Best,
>>>>> Daniyar Yeralin
>>>>>
>>>>> On Jun 22, 2019, at 12:49 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>> Yes, something like this. I did not think about good configuration
>>>>> parameter names yet. I am also not sure if I understand all proposed
>>>>> configs atm. But all configs should be listed and explained in the KIP
>>>>> anyway, and we can discuss further after you have updated the KIP (I can
>>>>> ask more detailed questions if I have any).
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 6/21/19 2:05 PM, Development wrote:
>>>>>
>>>>> Yes, you are right. ByteSerializer is not what I need to have in a list
>>>>> of primitives.
>>>>>
>>>>> As for the default constructor and configurability, just want to make
>>>>> sure. Is this what you have in mind?
>>>>>
>>>>> Best,
>>>>> Daniyar Yeralin
>>>>>
>>>>> On Jun 21, 2019, at 2:51 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>> Thanks for the update!
>>>>>
>>>>> I think that `ListDeserializer`, `ListSerializer`, and `ListSerde`
>>>>> should have a default constructor and it should be possible to pass in
>>>>> the `Class listClass` information via a configuration. Otherwise,
>>>>> KafkaStreams cannot use it as default serde.
>>>>>
>>>>> For the primitive serializers: `BytesSerializer` is not primitive IMHO,
>>>>> as it is for `byte[]` with variable length -- it's for arrays, not for
>>>>> single `byte` (note that `Bytes` is a Kafka class wrapping `byte[]`).
>>>>>
>>>>> For tests, we can comment on the PR. No need to do this in the KIP
>>>>> discussion.
>>>>>
>>>>> Can you also update the KIP?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 6/21/19 11:29 AM, Development wrote:
>>>>>
>>>>> I made and pushed the necessary commits, so we could review the final
>>>>> version under PR https://github.com/apache/kafka/pull/6592
>>>>>
>>>>> I also need some advice on writing tests for this new serde. So far I
>>>>> only have two test cases (roundtrip and empty payload), I’m not sure
>>>>> if it is enough.
>>>>>
>>>>> Thank y’all for your help in this KIP :)
>>>>>
>>>>> Best,
>>>>> Daniyar Yeralin
>>>>>
>>>>> On Jun 21, 2019, at 1:44 PM, John Roesler <j...@confluent.io> wrote:
>>>>>
>>>>> Hey Daniyar,
>>>>>
>>>>> Looks good to me! Thanks for considering it.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> On Fri, Jun 21, 2019 at 9:04 AM Development <d...@yeralin.net> wrote:
>>>>> Hey John and Matthias,
>>>>>
>>>>> Yes, now I see it all. I’m storing lots of redundant information.
>>>>> Here is my final idea. Yes, now a user should pass a list type. I
>>>>> realized that the type is not really needed in ListSerializer, but
>>>>> only in ListDeserializer:
>>>>>
>>>>> In ListSerializer we will start storing sizes only if the serializer is
>>>>> not a primitive serializer:
>>>>>
>>>>> Then, in the deserializer, we persist the passed list type, so that during
>>>>> deserialization we could create an instance of it with predefined
>>>>> listSize for better performance.
>>>>> We also try to locate a primitiveSize based on the passed deserializer.
>>>>> If it is not there, then primitiveSize will be null, which means
>>>>> that each entry’s size was encoded individually.
>>>>>
>>>>> This looks much cleaner and more concise.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Best,
>>>>> Daniyar Yeralin
>>>>>
>>>>> On Jun 20, 2019, at 5:45 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>> For encoding the list-type: I see John's point about re-encoding the
>>>>> list-type redundantly. However, I also don't like the idea that the
>>>>> Deserializer returns a fixed type...
>>>>>
>>>>> Maybe it's best to allow users to specify the target list type on
>>>>> deserialization via config?
>>>>>
>>>>> Similar for the primitive types: I don't think we need to encode the
>>>>> type size, but users could specify the type on the deserializer (via a
>>>>> config again)?
>>>>>
>>>>> About generics: nesting could be arbitrarily deep. Hence, I doubt we can
>>>>> support this and a cast will be necessary at some point in the user
>>>>> code.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 6/20/19 1:21 PM, John Roesler wrote:
>>>>>
>>>>> Hey Daniyar,
>>>>>
>>>>> Thanks for looking at it!
>>>>>
>>>>> Something like your screenshot is more along the lines of what I was
>>>>> thinking. Sorry, but I didn't follow what you mean, how would that not
>>>>> be "vanilla java"?
>>>>>
>>>>> Unfortunately the deserializer needs more information, though. For
>>>>> example, what if the inner type is a Map<String,String>? The serde could
>>>>> only be used to produce a LinkedList<Map>, thus, we'd still need an
>>>>> inner serde, like you have in the KIP (Serde<T> innerSerde).
>>>>>
>>>>> Something more like Serde<LinkedList<MyRecord>> = Serdes.listSerde(
>>>>> /**list type**/ LinkedList.class,
>>>>> /**inner serde**/ new MyRecordSerde()
>>>>> )
>>>>>
>>>>> And in configuration, it's something like:
>>>>> default.key.serde: org...ListSerde
>>>>> default.key.list.serde.type: java.util.LinkedList
>>>>> default.key.list.serde.inner: com.mycompany.MyRecordSerde
>>>>>
>>>>> What do you think?
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> On Thu, Jun 20, 2019 at 2:46 PM Development <d...@yeralin.net> wrote:
>>>>>
>>>>> Hey John,
>>>>>
>>>>> I read about TypeReference. It could work for the list serde.
>>>>> However, it is not directly supported:
>>>>> https://github.com/FasterXML/jackson-databind/issues/1490
>>>>> The only way is to pass an actual class object into the constructor,
>>>>> something like:
>>>>>
>>>>> It could be an option, but not a pretty one. What do you think of my
>>>>> approach to use vanilla java and canonical class name? (As described
>>>>> previously)
>>>>>
>>>>> Best,
>>>>> Daniyar Yeralin
>>>>>
>>>>> On Jun 20, 2019, at 2:45 PM, Development <d...@yeralin.net> wrote:
>>>>>
>>>>> Hi John,
>>>>>
>>>>> Thank you for your input! Yes, my idea looks a little bit
>>>>> over-engineered :)
>>>>>
>>>>> I also wanted to see feedback from Matthias as well, since he gave
>>>>> me an idea about storing fixed/variable size entries.
>>>>>
>>>>> Best,
>>>>> Daniyar Yeralin
>>>>>
>>>>> On Jun 18, 2019, at 6:06 PM, John Roesler <j...@confluent.io> wrote:
>>>>>
>>>>> Hi Daniyar,
>>>>>
>>>>> That's a very clever solution!
>>>>>
>>>>> One observation is that, now, this is what we might call a polymorphic
>>>>> serde. That is, you're detecting the actual concrete type and then
>>>>> promising to produce the exact same concrete type on read.
>>>>> There are some inherent problems with this approach, which in general
>>>>> require some kind of schema registry (not necessarily Schema Registry,
>>>>> just any registry for schemas) to solve.
>>>>>
>>>>> Notice that every serialized record has quite a bit of duplicated
>>>>> information: the concrete type as well as a byte to indicate whether
>>>>> the value type is a fixed size, and, if so, an integer to indicate the
>>>>> actual size. These constitute a schema, of sorts, because they tell us
>>>>> later how exactly to deserialize the data. Unfortunately, this
>>>>> information is completely redundant. In all likelihood, the
>>>>> information will be exactly the same for every record in the topic.
>>>>> This problem is essentially the core motivation for serializations
>>>>> like Avro: to move the schema outside of the serialization itself, so
>>>>> that the records won't contain so much redundant information.
>>>>>
>>>>> In this light, I'm wondering if it makes sense to go back to something
>>>>> like what you had earlier in which you don't support perfectly
>>>>> preserving the concrete type for _this_ serde, but instead just
>>>>> support deserializing to _some_ List. Then, you could defer full,
>>>>> perfect, type preservation to serdes that have an external system in
>>>>> which to register their type information.
>>>>>
>>>>> There does exist an alternative, if we really do want to preserve the
>>>>> concrete type (which does seem kind of nice). You can add a
>>>>> configuration option specifically for the serde to configure what the
>>>>> list type will be, and maybe what the element type is, as well.
>>>>>
>>>>> As far as "related work" goes, you might be interested to take a look
>>>>> at how Jackson can be configured to deserialize into a specific,
>>>>> arbitrarily nested, generically parameterized class structure.
>>>>> Specifically, you might find
>>>>> https://fasterxml.github.io/jackson-core/javadoc/2.0.0/com/fasterxml/jackson/core/type/TypeReference.html
>>>>> interesting.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> On Mon, Jun 17, 2019 at 12:38 PM Development <d...@yeralin.net> wrote:
>>>>>
>>>>> bump