Hi Sachin,

The way that Java infers generic arguments makes that case particularly 
obnoxious.

By the way, the problem you're facing is specifically addressed by these 
relatively new features:
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping

Since this behavior has been under development recently, I thought you might 
benefit from the context.

To answer your question, you have to explicitly specify the type arguments
to "Materialized.as(name)" when you chain withKeySerde, withValueSerde, etc.

It will look something like this:

Materialized
  .<KeyType, ValueType, KeyValueStore<Bytes, byte[]>>as("store-name")
  .withKeySerde(new Serde<KeyType>...)
  .withValueSerde(new Serde<ValueType>...);

I can explain exactly why this is necessary if you want, but the short answer 
is that the Java type system only makes a rudimentary effort to infer types.
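
Here is a minimal sketch of the inference gap. It assumes hypothetical stand-in types (Store for Materialized, Codec for Serde) so that it compiles without Kafka on the classpath; none of these names are part of the Kafka API. The type parameters of a generic factory method that appear only in its return type are inferred from the assignment or argument context. A chained call gives the compiler no such context, so it falls back to Object.

```java
public class InferenceSketch {

    // Hypothetical stand-in for Serde<T>; not the Kafka interface.
    interface Codec<T> {
        String describe();
    }

    // Hypothetical stand-in for Materialized, reduced to two type parameters.
    static final class Store<K, V> {
        final String name;
        Codec<K> keyCodec;
        Codec<V> valueCodec;

        private Store(String name) {
            this.name = name;
        }

        // Like Materialized.as(name): K and V appear only in the return type,
        // so the compiler needs outside context to infer them.
        static <K, V> Store<K, V> as(String name) {
            return new Store<>(name);
        }

        Store<K, V> withKeyCodec(Codec<K> codec) {
            this.keyCodec = codec;
            return this;
        }

        Store<K, V> withValueCodec(Codec<V> codec) {
            this.valueCodec = codec;
            return this;
        }

        String summary() {
            return name + "[" + keyCodec.describe() + ", " + valueCodec.describe() + "]";
        }
    }

    static Codec<String> stringCodec() {
        return () -> "string";
    }

    static Codec<Long> longCodec() {
        return () -> "long";
    }

    // Option 1: an explicit type witness on the factory call.
    static Store<String, Long> withTypeWitness() {
        return Store.<String, Long>as("store-name")
            .withKeyCodec(stringCodec())
            .withValueCodec(longCodec());
    }

    // Option 2: assign first, so the declared type drives inference.
    static Store<String, Long> withTargetType() {
        Store<String, Long> store = Store.as("store-name");
        return store.withKeyCodec(stringCodec()).withValueCodec(longCodec());
    }

    // Does not compile: with no target type, as(...) is inferred as
    // Store<Object, Object>, and withKeyCodec(Codec<Object>) rejects Codec<String>.
    // Store.as("store-name").withKeyCodec(stringCodec());

    public static void main(String[] args) {
        System.out.println(withTypeWitness().summary());
        System.out.println(withTargetType().summary());
    }
}
```

The commented-out line mirrors the error you reported: the receiver of a chained call is typed before the compiler looks at what comes after it.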

FWIW, this "paper cut" makes me irrationally angry, and I'm hoping we can find 
a way to fix it, if we ever change the Materialized builder interface.

Hope this helps,
-John

On Fri, Dec 6, 2019, at 11:15, Sachin Mittal wrote:
> Hi,
> In my application I have names of internal topics like this:
> 
> ss-session-application-KSTREAM-JOINOTHER-0000000059-store-changelog-0
> ss-session-application-KSTREAM-JOINTHIS-0000000049-store-changelog-0
> ss-session-application-KSTREAM-OUTEROTHER-0000000050-store-changelog-0
> ss-session-application-KTABLE-MERGE-STATE-STORE-0000000061-changelog-0
> 
> Is it possible to set concrete names for these instead of say **
> KSTREAM-JOINOTHER-0000000059-store**
> 
> This way I can identify at what code in my DSL is responsible for data
> inside them.
> 
> So far I have set names for:
> Grouped.with
> Materialized.as
> Joined.with
> 
> This has helped me get concrete names at many places however still at some
> places I see arbitrary names.
> 
> Also note that somehow this code works
> Materialized.with(new JSONSerde<K>(), new TupleJSONSerde<Pair<L, V>>())
> 
> But not:
> Materialized.as("d-l-i-store").withKeySerde(new
> JSONSerde<K>()).withValueSerde(new TupleJSONSerde<Pair<L, V>>())
> 
> The error I get is:
> Description Resource Path Location Type
> The method withKeySerde(Serde<Object>) in the type
> Materialized<Object,Object,StateStore> is not applicable for the arguments
> (JSONSerde<K>)
> 
> I have my class
> 
> class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>,
> Deserializer<T>, Serde<T> {
> ......
> }
> 
> This is pretty much same as from kafka streams typed example.
> 
> Thanks
> Sachin
>
