Hi Sachin, I’m glad it helped!
What you have in mind is a good thing to do. One thing to watch out for is _not_ to add names using Materialized for KTable operations that otherwise would not create a store. For example, if you filter or mapValues a KTable, those operations usually do not actually require storing any state. But if you add a name with Materialized, you’re telling Streams to actually create a state store and materialize the table.

What I would do is write the topology without names first, then use topology.describe() to figure out which actual stores are needed, and then name them. This is an area I have some plans to improve.

To answer your question, if you didn’t need serdes before, you don’t need them when you just use Materialized.as(name). Streams should continue to pass down the serdes through the program where it can.

Hope this answers your question.
-John

On Sat, Dec 7, 2019, at 03:20, Sachin Mittal wrote:
> Hi John,
> This was very helpful. However I am still confused about when to set the
> names for Materialized and Grouped.
> I am basically setting the names to have definite names of state
> stores and internal topics identifiable for debugging purpose.
>
> So when we set a name, do we also need to set serde for key/value type?
> If not then what defaults are used by them?
>
> I'll just explain by quick example:
> My original code was:
> table = stream.map((k, v) -> ...).groupByKey().reduce((av, nv) -> nv)
>
> In order to set some names to the intermediate stores/topics I changed the
> code as:
> table = stream.map((k, v) -> ...).groupByKey(Grouped.with("group",
> Serde<K>, Serde<V>)).reduce((av, nv) -> nv, Materialized.as("store"))
>
> So I wanted to know once I create a named Materialized do I need to set its
> key/value serde too?
> so is this the better code
> table = stream
> .map((k, v) -> ...)
> .groupByKey(Grouped.with("group", Serde<K>, Serde<V>))
> .reduce((av, nv) -> nv, Materialized.<K, V, KeyValueStore<Bytes,
> byte[]>>as("store-name").withKeySerde(Serde<K>).withValueSerde(Serde<V>))
>
> Note that I have custom class for Key and Value.
>
> Thanks
> Sachin
>
> On Fri, Dec 6, 2019 at 11:02 PM John Roesler <vvcep...@apache.org> wrote:
>
> > Hi Sachin,
> >
> > The way that Java infers generic arguments makes that case particularly
> > obnoxious.
> >
> > By the way, the problem you're facing is specifically addressed by these
> > relatively new features:
> > *
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > *
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping
> >
> > Since this behavior has been under development recently, I thought you
> > might benefit from the context.
> >
> > To answer your question, what you have to do is explicitly mention the
> > type arguments to "Materialized.as(name)" when you're using the
> > withKeySerde, etc.
> >
> > It will look something like this:
> >
> > Materialized
> > .<KeyType, ValueType, KeyValueStore<Bytes, byte[]>>as("store-name")
> > .withKeySerde(new Serde<KeyType>...)
> > .withValueSerde(new Serde<ValueType>...));
> >
> > I can explain exactly why this is necessary if you want, but the short
> > answer is that the Java type system only makes a rudimentary effort to
> > infer types.
> >
> > FWIW, this "paper cut" makes me irrationally angry, and I'm hoping we can
> > find a way to fix it, if we ever change the Materialized builder interface.
> >
> > Hope this helps,
> > -John
> >
> > On Fri, Dec 6, 2019, at 11:15, Sachin Mittal wrote:
> > > Hi,
> > > In my application I have names of internal topics like this:
> > >
> > > ss-session-application-KSTREAM-JOINOTHER-0000000059-store-changelog-0
> > > ss-session-application-KSTREAM-JOINTHIS-0000000049-store-changelog-0
> > > ss-session-application-KSTREAM-OUTEROTHER-0000000050-store-changelog-0
> > > ss-session-application-KTABLE-MERGE-STATE-STORE-0000000061-changelog-0
> > >
> > > Is it possible to set concrete names for these instead of say
> > > **KSTREAM-JOINOTHER-0000000059-store**
> > >
> > > This way I can identify at what code in my DSL is responsible for data
> > > inside them.
> > >
> > > So far I have set names for:
> > > Grouped.with
> > > Materialized.as
> > > Joined.with
> > >
> > > This has helped me get concrete names at many places however still at
> > > some places I see arbitrary names.
> > >
> > > Also note that somehow this code works
> > > Materialized.with(new JSONSerde<K>(), new TupleJSONSerde<Pair<L, V>>())
> > >
> > > But not:
> > > Materialized.as("d-l-i-store").withKeySerde(new
> > > JSONSerde<K>()).withValueSerde(new TupleJSONSerde<Pair<L, V>>())
> > >
> > > The error I get is:
> > > Description Resource Path Location Type
> > > The method withKeySerde(Serde<Object>) in the type
> > > Materialized<Object,Object,StateStore> is not applicable for the
> > > arguments (JSONSerde<K>)
> > >
> > > I have my class
> > >
> > > class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>,
> > > Deserializer<T>, Serde<T> {
> > > ......
> > > }
> > >
> > > This is pretty much same as from kafka streams typed example.
> > >
> > > Thanks
> > > Sachin
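
For reference, the inference problem discussed in this thread can be reproduced without Kafka on the classpath. The following is only a minimal sketch: MiniSerde, MiniMaterialized, and TypeWitnessDemo are hypothetical stand-ins invented for illustration, shaped like a generic static factory followed by chained builder calls, and are not the Kafka Streams API. It shows why a factory like with(keySerde, valueSerde) lets the compiler infer the type arguments, while a bare as(name) followed by a chained call falls back to Object unless the type arguments are written explicitly.

```java
// Hypothetical stand-in for a serde: turns a value into a String.
interface MiniSerde<T> {
    String write(T value);
}

// Hypothetical stand-in for a builder shaped like Materialized:
// a generic static factory followed by chained "with..." methods.
final class MiniMaterialized<K, V> {
    private final String name;
    private MiniSerde<K> keySerde;
    private MiniSerde<V> valueSerde;

    private MiniMaterialized(String name) {
        this.name = name;
    }

    // K and V appear only in the return type, so a call that is
    // immediately chained gives the compiler nothing to infer them from.
    static <K, V> MiniMaterialized<K, V> as(String name) {
        return new MiniMaterialized<>(name);
    }

    // K and V appear in the parameter types, so they are inferred
    // from the arguments.
    static <K, V> MiniMaterialized<K, V> with(MiniSerde<K> k, MiniSerde<V> v) {
        return new MiniMaterialized<K, V>("unnamed").withKeySerde(k).withValueSerde(v);
    }

    MiniMaterialized<K, V> withKeySerde(MiniSerde<K> s) {
        keySerde = s;
        return this;
    }

    MiniMaterialized<K, V> withValueSerde(MiniSerde<V> s) {
        valueSerde = s;
        return this;
    }

    String describe(K key, V value) {
        return name + ":" + keySerde.write(key) + "=" + valueSerde.write(value);
    }
}

final class TypeWitnessDemo {
    static String run() {
        MiniSerde<String> keys = s -> s.toUpperCase();
        MiniSerde<Integer> values = i -> Integer.toString(i * 2);

        // Does not compile: as("store") in receiver position is inferred as
        // MiniMaterialized<Object, Object>, so withKeySerde expects a
        // MiniSerde<Object> and rejects MiniSerde<String>.
        // MiniMaterialized.as("store").withKeySerde(keys);

        // Compiles: the explicit type arguments fix K and V up front.
        MiniMaterialized<String, Integer> m = MiniMaterialized
            .<String, Integer>as("store")
            .withKeySerde(keys)
            .withValueSerde(values);
        return m.describe("a", 21);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Uncommenting the chained call without type arguments should give a compile error of the same shape as the one quoted above, with Object in place of the intended key type.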