Hi All,
For some time I have been involved in developing an application that uses the 
Kafka Streams API, and I am facing a problem with joining two Kafka topics. The 
problem is illustrated in a simple test that was prepared based on our production 
code. It is available here: 
https://bitbucket.org/b_a_r_t_k/streams-join-problem/ 
The topology is defined in the class JoinStreamBuilder:

val builder = StreamsBuilder()

val reducedLookupStoreName = "reduced_$lookupTableTopicName$streamId"

val streamToJoin = builder.stream(mainTopicName, Consumed.with(Serdes.String(), genericAvroSerde))
        .selectKey(MainKeySelector())

val lookupTable = builder.stream(lookupTableTopicName, Consumed.with(Serdes.String(), genericAvroSerde))
        .selectKey(LookupKeySelector())
        .groupByKey(Serialized.with(Serdes.String(), genericAvroSerde))
        .reduce({ _, new -> new },
                Materialized.`as`<String, GenericRecord, KeyValueStore<Bytes, ByteArray>>(reducedLookupStoreName)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(genericAvroSerde))

streamToJoin
        .leftJoin(lookupTable, Joiner(streamId), Joined.with(Serdes.String(), genericAvroSerde, genericAvroSerde))
        .to(targetTopicName, Produced.with(Serdes.String(), genericAvroSerde))
val topology = builder.build()

It is a simple stream-to-lookup-table (KStream-KTable) left join. The Joiner 
implementation looks as follows:

class Joiner(private val streamId: Int) : ValueJoiner<GenericRecord, GenericRecord, GenericRecord> {
    override fun apply(main: GenericRecord?, lookup: GenericRecord?): GenericRecord {
        if (main == null) LOG.warn("for streamId: $streamId record from main is null")
        if (lookup == null) LOG.warn("for streamId: $streamId record from lookup is null")

        return GenericData.Record(MySampleData.schema)
                .apply {
                    put(MySampleData::stringField.name, main?.get(MySampleData::stringField.name))
                    put(MySampleData::booleanField.name, main?.get(MySampleData::booleanField.name))
                    put(MySampleData::intField.name, lookup?.get(MySampleData::intField.name))
                }
    }
}

The problem is that Joiner’s apply() method non-deterministically receives null 
for the lookup parameter in some cases, while in other cases the parameter is 
not null, as expected.
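For context on the join semantics involved: in a KStream-KTable left join, only records on the stream side trigger the join. Each one is matched against whatever the table holds for its key at the moment that record is processed, and the joiner is called with null for the table side if no value is present yet. The sketch below is a simplified, in-memory model of that behaviour in plain Kotlin. It is not the Kafka Streams API, and the names `Event`, `JoinResult` and `leftJoinInArrivalOrder` are hypothetical. It only illustrates that identical 1:1 input data can produce either a proper join or a null lookup depending on processing order; it is not a confirmed diagnosis of the test in the repository.

```kotlin
// Simplified in-memory model of KStream-KTable left-join semantics.
// Plain Kotlin only; this is NOT the Kafka Streams API (hypothetical names).

sealed class Event {
    abstract val key: String

    // Update to the lookup table (the KTable side): changes state, emits nothing.
    data class TableUpdate(override val key: String, val value: Int) : Event()

    // Record on the main stream (the KStream side): the only thing that triggers a join.
    data class StreamRecord(override val key: String, val payload: String) : Event()
}

// Joined output; lookup is null when the table had no value for the key yet.
data class JoinResult(val key: String, val payload: String, val lookup: Int?)

fun leftJoinInArrivalOrder(events: List<Event>): List<JoinResult> {
    val table = mutableMapOf<String, Int>()      // current table state per key
    val output = mutableListOf<JoinResult>()
    for (event in events) {
        when (event) {
            is Event.TableUpdate -> table[event.key] = event.value
            // Joined against the table as it is right now; a missing key gives null.
            is Event.StreamRecord -> output += JoinResult(event.key, event.payload, table[event.key])
        }
    }
    return output
}
```

With the same two inputs, processing `TableUpdate("k1", 7)` before `StreamRecord("k1", "main")` yields `lookup = 7`, while processing them in the opposite order yields `lookup = null`. In the real topology, both `selectKey()` calls cause the data to be repartitioned before `groupByKey()` and `leftJoin()`, so the two sides may reach the join through separate internal topics, in an order that is not necessarily the order in which the test produced them.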
The repository referred to above contains a test that uses this topology. It 
runs 10 iterations, building a new instance of the topology in each one, and 
then feeds the two input topics with sample data (10 records per topic), 
expecting a 1:1 join to be performed for each pair of records. 
As seen in the log output:
2019-09-08 13:49:09,634 [main] INFO  com.example.demo.JoinStreamTest [tenantId=]  - Number of not properly joined per iteration (iteration number -> number of errors): {0=1, 1=1, 2=1, 3=1, 4=0, 5=1, 6=1, 7=1, 8=1, 9=0}. Total errors: 8
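The per-iteration map and total in the log line above can be read as a simple tally. The following is a hypothetical sketch, not the repository's actual test code: it assumes each iteration's output is represented as a list of nullable lookup-derived values (standing in for `intField`), and that a record counts as "not properly joined" when that value is null. The function name `countJoinErrors` is illustrative only.

```kotlin
// Hypothetical helper: tally "not properly joined" records per iteration.
// Each inner list holds the lookup-derived field (e.g. intField) of every joined
// output record of one iteration; null means the lookup side was missing.
fun countJoinErrors(iterations: List<List<Int?>>): Pair<Map<Int, Int>, Int> {
    // iteration index -> number of records with a null lookup value
    val perIteration = iterations.withIndex()
        .associate { (index, lookups) -> index to lookups.count { it == null } }
    // total errors across all iterations
    return perIteration to perIteration.values.sum()
}
```

Under these assumptions, an iteration with one unjoined record contributes `1` to its map entry and to the total, which matches the shape of the logged result (eight iterations with one error each, two with none, total 8).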

Some of the iterations produce no errors, while most of them do.

Any help is welcome. At this point I have no clue what may cause such behaviour.
Best regards
BK
