Just to be clear, the timing issue I was referring to was with consuming
the data, not publishing.  For your join to work correctly (and
consistently), all the data in the lookupTable needs to be there BEFORE the
streamToJoin data starts processing, right?  Your topology won't wait for
the lookupTable to be fully populated before processing data, which means
there may be cases where streamToJoin is trying to find a match in
lookupTable but the record isn't there yet, because the table hasn't fully
consumed its topic.  This is why I think using a GlobalKTable might solve
your problem, since it bootstraps the global table first.  You might have
already understood what I meant, just making sure.  Good luck!
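In case it helps, here is a minimal, hypothetical sketch of the GlobalKTable variant. It uses plain String serdes and made-up topic names instead of your Avro setup, and note that a GlobalKTable is read as-is from its topic, so the re-keying your LookupKeySelector does would have to happen before records reach the lookup topic:

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.GlobalKTable
import org.apache.kafka.streams.kstream.Produced

// Sketch only: topic names and the String/String serdes are placeholders.
fun buildGlobalJoinTopology(): Topology {
    val builder = StreamsBuilder()

    // The GlobalKTable is bootstrapped (fully read from "lookup-topic")
    // before the stream below starts processing records.
    val lookupTable: GlobalKTable<String, String> = builder.globalTable(
            "lookup-topic",
            Consumed.with(Serdes.String(), Serdes.String()))

    builder.stream("main-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .leftJoin(lookupTable,
                    { key, _ -> key },  // KeyValueMapper: join key taken from the stream key
                    { main, lookup -> "$main|${lookup ?: "MISSING"}" })  // ValueJoiner; lookup is null on no match
            .to("target-topic", Produced.with(Serdes.String(), Serdes.String()))

    return builder.build()
}
```

Your existing Joiner(streamId) would slot in as the ValueJoiner argument; the only structural change from your JoinStreamBuilder is swapping the reduced KTable for the globalTable and supplying the key mapper.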

Alex

On Mon, Sep 9, 2019 at 5:27 AM Bartłomiej Kępa <bartlomiej.k...@gmail.com>
wrote:

> Hi Alex
> Thank you for your quick response. Unfortunately it seems that it is not a
> timing issue, at least not in the following sense: I modified the test in a
> way that ensures that the messages were committed to both topics before I
> actually start the topology. Still no improvement with regard to the
> expected result. It seems I need to investigate the second option, with a
> GlobalKTable.
> Best regards,
> BK
>
> > Wiadomość napisana przez Alex Brekken <brek...@gmail.com> w dniu
> 08.09.2019, o godz. 14:49:
> >
> > The non-deterministic behavior you're seeing might be the result of a
> > timing issue.  In other words, in some cases your KTable is fully
> > populated by the time data in "streamToJoin" is trying to find a match in
> > "lookupTable", and in other cases it isn't. If you haven't already, you
> > might want to take a look at using a GlobalKTable to see if that will
> > work for your use-case.  On startup, I believe Kafka Streams will wait
> > until the GlobalKTable has fully consumed the topic before data starts
> > flowing.  There are downsides to GlobalKTables (check the documentation),
> > but if this is just a lookup table where the data is fairly static then
> > it might make sense.
> >
> > Alex
> >
> > On Sun, Sep 8, 2019 at 7:28 AM Bartłomiej Kępa <
> bartlomiej.k...@gmail.com>
> > wrote:
> >
> >> Hi All,
> >> For some time I have been involved in the development of an application
> >> that incorporates the Kafka Streams API, and I am facing a problem with
> >> joining two Kafka topics. The problem is illustrated in a simple test
> >> that was prepared based on our production code. It is available here:
> >> https://bitbucket.org/b_a_r_t_k/streams-join-problem/
> >> As seen in the class JoinStreamBuilder:
> >>
> >> val builder = StreamsBuilder()
> >>
> >> val reducedLookupStoreName = "reduced_$lookupTableTopicName$streamId"
> >>
> >> val streamToJoin = builder.stream(mainTopicName,
> >>                 Consumed.with(Serdes.String(), genericAvroSerde))
> >>         .selectKey(MainKeySelector())
> >>
> >> val lookupTable = builder.stream(lookupTableTopicName,
> >>                 Consumed.with(Serdes.String(), genericAvroSerde))
> >>         .selectKey(LookupKeySelector())
> >>         .groupByKey(Serialized.with(Serdes.String(), genericAvroSerde))
> >>         .reduce({ _, new -> new },
> >>                 Materialized.`as`<String, GenericRecord,
> >>                         KeyValueStore<Bytes, ByteArray>>(reducedLookupStoreName)
> >>                     .withKeySerde(Serdes.String())
> >>                     .withValueSerde(genericAvroSerde))
> >>
> >> streamToJoin
> >>         .leftJoin(lookupTable, Joiner(streamId),
> >>                 Joined.with(Serdes.String(), genericAvroSerde, genericAvroSerde))
> >>         .to(targetTopicName, Produced.with(Serdes.String(), genericAvroSerde))
> >>
> >> val topology = builder.build()
> >>
> >> It is simple kind of lookup table to stream join. The Joiner
> >> implementation looks as follows
> >>
> >> class Joiner(private val streamId: Int) : ValueJoiner<GenericRecord,
> >>         GenericRecord, GenericRecord> {
> >>     override fun apply(main: GenericRecord?, lookup: GenericRecord?): GenericRecord {
> >>         if (main == null) LOG.warn("for streamId: $streamId record from main is null")
> >>         if (lookup == null) LOG.warn("for streamId: $streamId record from lookup is null")
> >>
> >>         return GenericData.Record(MySampleData.schema)
> >>                 .apply {
> >>                     put(MySampleData::stringField.name,
> >>                             main?.get(MySampleData::stringField.name))
> >>                     put(MySampleData::booleanField.name,
> >>                             main?.get(MySampleData::booleanField.name))
> >>                     put(MySampleData::intField.name,
> >>                             lookup?.get(MySampleData::intField.name))
> >>                 }
> >>     }
> >> }
> >>
> >> The problem is that sometimes, in a non-deterministic way, the Joiner's
> >> apply() method gets null for the lookup parameter, while in other cases
> >> the parameter is not null, as expected.
> >> The repo I referred to above contains a test that is supposed to use
> >> that topology. It iterates 10 times, building a new instance of the
> >> topology each time, and then it feeds the two topics with sample data
> >> (10 records for each topic), expecting a 1-to-1 join to be performed for
> >> each pair of records.
> >> As seen in the log output:
> >> 2019-09-08 13:49:09,634 [main] INFO  com.example.demo.JoinStreamTest
> >> [tenantId=]  - Number of not properly joined per iteration (iteration
> >> number -> number of errors): {0=1, 1=1, 2=1, 3=1, 4=0, 5=1, 6=1, 7=1,
> >> 8=1, 9=0}. Total errors: 8
> >>
> >> Some of the iterations produce no errors, while most of them do.
> >>
> >> Any help is welcome. At this point I have no clue what may cause such
> >> behaviour.
> >> Best regards
> >> BK
>
>
