John Roesler created KAFKA-7806:
-----------------------------------

             Summary: Windowed Aggregations should wrap default key serde if none is specified
                 Key: KAFKA-7806
                 URL: https://issues.apache.org/jira/browse/KAFKA-7806
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: John Roesler


In Streams, windowing a stream by either time or session windows causes the 
stream's keys to be transformed from `K` to `Windowed<K>`.

Since this is a well-defined transition, developers should not need to 
explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which 
already knows the key serde (`Serde<K>`), automatically wraps it in case it's 
needed by downstream operators.

However, this automatic wrapping only takes place if the key serde has been 
explicitly provided in the topology. If the topology relies on the 
`default.key.serde` configuration instead, no wrapping takes place, and 
downstream operators will encounter a `ClassCastException` when the default 
serde, which handles the type of the key wrapped inside the windowed key, tries 
to cast a `Windowed` key to that type.
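
To make the failure mode concrete, here is a stdlib-only Java model rather than Kafka code. `WindowedKey`, `KeySerializer`, `StringKeySerializer`, and `DefaultSerdeMismatchDemo` are hypothetical stand-ins for `Windowed<K>`, a serializer, and a `default.key.serde` configured for `String` keys. Because the default serde is held as a raw type in this model, the mismatch compiles and only surfaces at runtime as a `ClassCastException`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Windowed<K>: a record key plus its window bounds.
final class WindowedKey<K> {
    final K key;
    final long startMs;
    final long endMs;

    WindowedKey(K key, long startMs, long endMs) {
        this.key = key;
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Hypothetical minimal serializer interface; not Kafka's Serializer.
interface KeySerializer<T> {
    byte[] serialize(T data);
}

// Plays the role of a String serde configured via `default.key.serde`.
final class StringKeySerializer implements KeySerializer<String> {
    @Override
    public byte[] serialize(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

final class DefaultSerdeMismatchDemo {
    // In this model the default serde is held as a raw type, standing in for a serde
    // created reflectively from config, so the compiler cannot catch a key-type mismatch.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static byte[] serializeWithDefault(Object key) {
        KeySerializer defaultSerializer = new StringKeySerializer();
        // The compiler-generated bridge method casts the argument to String here.
        return defaultSerializer.serialize(key);
    }

    static boolean throwsClassCastException(Object key) {
        try {
            serializeWithDefault(key);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsClassCastException("user-1"));
        System.out.println(throwsClassCastException(new WindowedKey<>("user-1", 0L, 60_000L)));
    }
}
```

Running `main` should print `false` and then `true`: a plain `String` key serializes, while a windowed key trips the cast in the bridge method.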

Specifically, the key serde forwarding logic is:

in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:

`materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null`

and in `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:

`materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`
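
The effect of that ternary can be sketched as a small model, assuming (for illustration only) that serdes are identified by descriptive names and that a downstream operator with no forwarded serde falls back to `default.key.serde`. `KeySerdeForwardingModel` and its methods are hypothetical; the session-window variant would behave the same way.

```java
import java.util.Map;

final class KeySerdeForwardingModel {
    // Mirrors the current ternary in TimeWindowedKStreamImpl: only an explicitly
    // provided key serde is wrapped; otherwise null is forwarded downstream.
    static String forwardedKeySerde(String explicitKeySerde, long windowSizeMs) {
        return explicitKeySerde != null
            ? "FullTimeWindowedSerde(" + explicitKeySerde + ", " + windowSizeMs + ")"
            : null;
    }

    // A downstream operator that needs a key serde but received none falls back
    // to the configured default, which handles K rather than Windowed<K>.
    static String downstreamKeySerde(String forwarded, Map<String, String> config) {
        return forwarded != null ? forwarded : config.get("default.key.serde");
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("default.key.serde", "StringSerde");
        // Explicit key serde: downstream sees a windowed serde.
        System.out.println(downstreamKeySerde(forwardedKeySerde("StringSerde", 60_000L), config));
        // Default key serde only: downstream sees the plain, unwrapped default.
        System.out.println(downstreamKeySerde(forwardedKeySerde(null, 60_000L), config));
    }
}
```

With an explicit key serde, downstream sees `FullTimeWindowedSerde(StringSerde, 60000)`; with only the default configured, it sees the plain `StringSerde`, which is exactly the mismatch behind the `ClassCastException`.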

 

This pattern of not "solidifying" the default key serde is common in Streams. 
Not all operators need a serde, and the default serde may not be applicable to 
all operators. So it would be a mistake for arbitrary operators to grab the 
default serde and pass it downstream as if it had been explicitly set.

 

However, in this case specifically, all windowed aggregations are stateful, so 
if we don't have an explicit key serde at this point, we know that we have used 
the default serde in the window store. If the default serde were incorrect, an 
exception would be thrown by the windowed aggregation itself. So it actually is 
safe to wrap the default serde in a windowed serde and pass it downstream, 
which would result in a better development experience.

 

Unfortunately, the default serde is set via config, but the windowed serde 
wrapping happens during DSL building, when the config is not generally 
available. Therefore, we would need a special windowed serde wrapper that 
signals that it wraps the default serde and is only fully resolved during the 
operators' `init` call.

For example, something of this nature:

`materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : FullTimeWindowedSerde.wrapDefault(windows.size())`

etc.
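
One way the deferred resolution could look, as a hedged sketch: `DeferredWindowedSerde`, `wrap`, `wrapDefault`, `init`, and `describe` are hypothetical names, and serdes are again modeled as descriptive strings rather than real Kafka serdes. The point is that `wrapDefault` can be called during DSL building with no config in hand, and the inner serde is only pinned down when `init` receives the config.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the proposed windowed serde that can wrap the default key serde.
final class DeferredWindowedSerde {
    private final String explicitInner; // null means "wrap whatever default.key.serde is"
    private final long windowSizeMs;
    private String resolvedInner;       // set by init()

    private DeferredWindowedSerde(String explicitInner, long windowSizeMs) {
        this.explicitInner = explicitInner;
        this.windowSizeMs = windowSizeMs;
    }

    // Used when the key serde was explicitly provided during DSL building.
    static DeferredWindowedSerde wrap(String innerSerde, long windowSizeMs) {
        return new DeferredWindowedSerde(Objects.requireNonNull(innerSerde), windowSizeMs);
    }

    // Hypothetical analogue of the proposed FullTimeWindowedSerde.wrapDefault(windows.size()).
    static DeferredWindowedSerde wrapDefault(long windowSizeMs) {
        return new DeferredWindowedSerde(null, windowSizeMs);
    }

    // Called from an operator's init(), once the application config is available.
    void init(Map<String, String> config) {
        resolvedInner = explicitInner != null ? explicitInner : config.get("default.key.serde");
        if (resolvedInner == null) {
            throw new IllegalStateException("default.key.serde is not configured");
        }
    }

    String describe() {
        if (resolvedInner == null) {
            throw new IllegalStateException("init() has not been called");
        }
        return "FullTimeWindowedSerde(" + resolvedInner + ", " + windowSizeMs + ")";
    }

    public static void main(String[] args) {
        // Built during DSL building, before any config is visible.
        DeferredWindowedSerde serde = wrapDefault(60_000L);
        // Resolved later, at operator init time.
        serde.init(Map.of("default.key.serde", "StringSerde"));
        System.out.println(serde.describe());
    }
}
```

Failing fast in `init` when no default is configured is a choice made for this sketch, not something the issue specifies.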

 

Complicating the situation slightly, all the windowed serializers and 
deserializers will resolve a runtime inner class using 
`default.windowed.key.serde.inner` if given a null inner serde to wrap. 
However, at this point in the topology build, we do know that the windowed 
aggregation has specifically used `default.key.serde`, not 
`default.windowed.key.serde.inner`, to persist its state to the window store. 
Therefore, it should be correct to wrap the default key serde specifically 
rather than fall back to `default.windowed.key.serde.inner`.
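
The precedence argued for here can be written as a small resolution function. This is a hypothetical model (`InnerSerdeResolution` and `InnerSource` are invented names); its point is that a serde which knows it wraps `default.key.serde` must be distinguishable from one that was simply given a null inner serde, since the latter falls back to `default.windowed.key.serde.inner`.

```java
import java.util.Map;

final class InnerSerdeResolution {
    // How the inner serde of a windowed serde was specified (hypothetical model).
    enum InnerSource { EXPLICIT, NULL_INNER, WRAPS_DEFAULT_KEY_SERDE }

    static String resolveInner(InnerSource source, String explicitInner, Map<String, String> config) {
        switch (source) {
            case EXPLICIT:
                return explicitInner;
            case NULL_INNER:
                // Existing fallback of the windowed (de)serializers when given a null inner serde.
                return config.get("default.windowed.key.serde.inner");
            case WRAPS_DEFAULT_KEY_SERDE:
                // Proposed: the aggregation persisted its state with default.key.serde, so reuse it.
                return config.get("default.key.serde");
            default:
                throw new IllegalArgumentException("unknown source: " + source);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(
            "default.key.serde", "StringSerde",
            "default.windowed.key.serde.inner", "LongSerde");
        System.out.println(resolveInner(InnerSource.NULL_INNER, null, config));              // LongSerde
        System.out.println(resolveInner(InnerSource.WRAPS_DEFAULT_KEY_SERDE, null, config)); // StringSerde
    }
}
```

When the two configs disagree, only the `WRAPS_DEFAULT_KEY_SERDE` path matches the serde the window store was actually written with.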

 

In addition to fixing this for TimeWindowed and SessionWindowed streams, we 
need to have good test coverage of the new code. There is clearly a blind spot 
in the tests, or we would have noticed this sooner.


