Matthias: if the data you're joining against is relatively small and unchanging, would a GlobalKTable be a reasonable choice, since you get the guarantee that it will be loaded first on startup? Not sure if that's the case for Bartlomiej, but I just want to make sure that this kind of scenario is a good fit for a GlobalKTable. (Thanks for the info on the changes to timestamp synchronization; I missed that.)
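For the scenario Alex describes (a small, mostly static lookup topic), a GlobalKTable-based variant of the join from this thread might look roughly like this. It is a minimal sketch with several assumptions:

- The function name `buildGlobalLookupTopology` and the default topic names are hypothetical.
- String serdes stand in for the Avro `genericAvroSerde` used in the original code.
- The stream key is assumed to already equal the lookup key.

Keep in mind Matthias's caveat in his reply: using a GlobalKTable has its own implications for determinism.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.GlobalKTable
import org.apache.kafka.streams.kstream.KeyValueMapper
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.ValueJoiner

// Sketch only: hypothetical topic names, and String values in place of
// the GenericRecord/Avro serde used in the thread's JoinStreamBuilder.
fun buildGlobalLookupTopology(
    mainTopic: String = "main-topic",
    lookupTopic: String = "lookup-topic",
    targetTopic: String = "joined-topic"
): Topology {
    val builder = StreamsBuilder()

    // A GlobalKTable is replicated to every instance and is bootstrapped
    // (read up to the end of its topic) before stream processing starts.
    val lookup: GlobalKTable<String, String> =
        builder.globalTable(lookupTopic, Consumed.with(Serdes.String(), Serdes.String()))

    builder.stream(mainTopic, Consumed.with(Serdes.String(), Serdes.String()))
        .leftJoin(
            lookup,
            // Maps each stream record to the lookup key; assumed here to be the stream key itself.
            KeyValueMapper<String, String, String> { key, _ -> key },
            // lookupValue is null when no matching lookup record exists at processing time.
            ValueJoiner<String, String?, String> { main, lookupValue ->
                "$main|${lookupValue ?: "MISSING"}"
            }
        )
        .to(targetTopic, Produced.with(Serdes.String(), Serdes.String()))

    return builder.build()
}
```

Because a GlobalKTable is fully replicated, this join needs neither co-partitioning nor a repartition topic. That is why the key mapper can derive the lookup key from any part of the stream record.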
Alex

On Mon, Sep 9, 2019 at 7:29 PM Matthias J. Sax <matth...@confluent.io> wrote:
> Using a GlobalKTable has many implications and I would not necessarily
> recommend it. It makes processing non-deterministic because there is no
> synchronization between the main processing thread and the global thread.
>
> Note that data is processed based on their timestamps. Hence, if you
> pre-populate both topics, you need to ensure that the data in the KTable
> topic has smaller timestamps than the data in the KStream topic; that way,
> the KTable will be "loaded" before any KStream record is processed.
>
> Note that before the 2.1 release, timestamp synchronization was best-effort
> only; hence, you should use 2.1 or newer for Kafka Streams (the broker
> version does not matter).
>
> Also consider the `max.task.idle.ms` configuration parameter, which can
> "block" one side from processing for some time in case you write the
> data after the application was started (i.e., a tradeoff between latency
> and better ordering guarantees).
>
> -Matthias
>
> On 9/9/19 1:54 PM, Alex Brekken wrote:
>> Just to be clear, the timing issue I was referring to was with consuming
>> the data, not publishing. In order for your join to work correctly (and
>> consistently), all the data in the lookupTable needs to be there BEFORE the
>> streamToJoin data starts processing, right? Your topology won't wait for
>> the lookupTable to get fully populated before processing data, which means
>> there might be cases where streamToJoin is trying to find a match in
>> lookupTable but it isn't there yet because it hasn't fully consumed its
>> topic. This is why I think using a GlobalKTable might solve your problem,
>> since it bootstraps the global table first. You might have already
>> understood what I meant; just making sure. Good luck!
>>
>> Alex
>>
>> On Mon, Sep 9, 2019 at 5:27 AM Bartłomiej Kępa <bartlomiej.k...@gmail.com>
>> wrote:
>>
>>> Hi Alex,
>>> Thank you for your quick response.
>>> Unfortunately, it seems that it is not a timing issue, at least not in
>>> that sense. I modified the test so that it ensures the messages were
>>> committed to both topics before I actually start the topology. Still no
>>> improvement with regard to the expected result. It seems I need to
>>> investigate the second option with GlobalKTable.
>>> Best regards,
>>> BK
>>>
>>>> Message written by Alex Brekken <brek...@gmail.com> on 08.09.2019, at 14:49:
>>>>
>>>> The non-deterministic behavior you're seeing might be the result of a
>>>> timing issue. In other words, in some cases your KTable is fully populated
>>>> by the time data in "streamToJoin" is trying to find a match in
>>>> "lookupTable", and in other cases it isn't. If you haven't already, you
>>>> might want to take a look at using a GlobalKTable to see if that will work
>>>> for your use case. On startup, I believe Kafka Streams will wait until the
>>>> GlobalKTable has fully consumed the topic before data starts flowing.
>>>> There are downsides to GlobalKTables (check the documentation), but if
>>>> this is just a lookup table where the data is fairly static, then it might
>>>> make sense.
>>>>
>>>> Alex
>>>>
>>>> On Sun, Sep 8, 2019 at 7:28 AM Bartłomiej Kępa <bartlomiej.k...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>> For some time I've been involved in the development of an application
>>>>> that incorporates the Kafka Streams API, and I'm facing a problem with
>>>>> joining two Kafka topics. The problem is illustrated in a simple test
>>>>> that was prepared based on our production code.
>>>>> It is available here:
>>>>> https://bitbucket.org/b_a_r_t_k/streams-join-problem/
>>>>> As seen in the class JoinStreamBuilder:
>>>>>
>>>>>     val builder = StreamsBuilder()
>>>>>
>>>>>     val reducedLookupStoreName = "reduced_$lookupTableTopicName$streamId"
>>>>>
>>>>>     val streamToJoin = builder.stream(mainTopicName,
>>>>>             Consumed.with(Serdes.String(), genericAvroSerde))
>>>>>         .selectKey(MainKeySelector())
>>>>>
>>>>>     val lookupTable = builder.stream(lookupTableTopicName,
>>>>>             Consumed.with(Serdes.String(), genericAvroSerde))
>>>>>         .selectKey(LookupKeySelector())
>>>>>         .groupByKey(Serialized.with(Serdes.String(), genericAvroSerde))
>>>>>         .reduce({ _, new -> new },
>>>>>             Materialized.`as`<String, GenericRecord, KeyValueStore<Bytes, ByteArray>>(reducedLookupStoreName)
>>>>>                 .withKeySerde(Serdes.String())
>>>>>                 .withValueSerde(genericAvroSerde))
>>>>>
>>>>>     streamToJoin
>>>>>         .leftJoin(lookupTable, Joiner(streamId),
>>>>>             Joined.with(Serdes.String(), genericAvroSerde, genericAvroSerde))
>>>>>         .to(targetTopicName, Produced.with(Serdes.String(), genericAvroSerde))
>>>>>     val topology = builder.build()
>>>>>
>>>>> It is a simple lookup-table-to-stream join.
>>>>> The Joiner implementation looks as follows:
>>>>>
>>>>>     class Joiner(private val streamId: Int) : ValueJoiner<GenericRecord, GenericRecord, GenericRecord> {
>>>>>         override fun apply(main: GenericRecord?, lookup: GenericRecord?): GenericRecord {
>>>>>             if (main == null) LOG.warn("for streamId: $streamId record from main is null")
>>>>>             if (lookup == null) LOG.warn("for streamId: $streamId record from lookup is null")
>>>>>
>>>>>             return GenericData.Record(MySampleData.schema)
>>>>>                 .apply {
>>>>>                     put(MySampleData::stringField.name, main?.get(MySampleData::stringField.name))
>>>>>                     put(MySampleData::booleanField.name, main?.get(MySampleData::booleanField.name))
>>>>>                     put(MySampleData::intField.name, lookup?.get(MySampleData::intField.name))
>>>>>                 }
>>>>>         }
>>>>>     }
>>>>>
>>>>> The problem is that sometimes, in a non-deterministic way, the Joiner's
>>>>> apply() method gets null for the lookup parameter, while in other cases
>>>>> the parameter is not null, as expected.
>>>>> The repo I referred to above contains a test that is supposed to use that
>>>>> topology. It iterates 10 times, building a new instance of the topology
>>>>> each time, and then feeds the two topics with sample data (10 records for
>>>>> each topic), expecting a 1-to-1 join to be performed for each pair of
>>>>> records.
>>>>> As seen in the log output:
>>>>> 2019-09-08 13:49:09,634 [main] INFO com.example.demo.JoinStreamTest [tenantId=] - Number of not properly joined per iteration (iteration number -> number of errors): {0=1, 1=1, 2=1, 3=1, 4=0, 5=1, 6=1, 7=1, 8=1, 9=0}. Total errors: 8
>>>>>
>>>>> Some of the iterations produce no errors, while most of them do.
>>>>>
>>>>> Any help is welcome. At this point I have no clue what may cause such
>>>>> behaviour.
>>>>> Best regards,
>>>>> BK
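Matthias's point that records are processed based on their timestamps can be illustrated with a toy model. It shows how the same stream record can join to a value or to null depending only on the lookup record's timestamp. This is plain Kotlin, not Kafka Streams code, and it rests on several assumptions:

- `Rec` and `timestampOrderedLeftJoin` are hypothetical names.
- Both inputs are assumed to be fully available up front, so `max.task.idle.ms` and partitions with no buffered data are ignored.
- Breaking timestamp ties in favour of table updates is a choice made by this model, not a Kafka guarantee.

```kotlin
// Toy model only (hypothetical names); this is not how Kafka Streams is implemented.
data class Rec(val key: String, val value: String, val ts: Long)

// Processes table updates and stream records together in timestamp order and
// performs a left join of each stream record against the table state seen so far.
fun timestampOrderedLeftJoin(stream: List<Rec>, table: List<Rec>): List<String> {
    val state = mutableMapOf<String, String>()   // current contents of the "KTable"
    val out = mutableListOf<String>()
    // Tag each record with its source: false = stream record, true = table update.
    val merged: List<Pair<Rec, Boolean>> = stream.map { it to false } + table.map { it to true }
    // Order by timestamp; on equal timestamps, table updates sort first (model assumption).
    val ordered = merged.sortedWith(compareBy<Pair<Rec, Boolean>>({ it.first.ts }, { !it.second }))
    for ((rec, isTable) in ordered) {
        if (isTable) {
            state[rec.key] = rec.value
        } else {
            // A missing key interpolates as "null", mirroring a null lookup in the joiner.
            out += "${rec.value}+${state[rec.key]}"
        }
    }
    return out
}
```

Consider a stream record `Rec("k1", "a", 10)`. With a lookup record `Rec("k1", "x", 5)`, the model returns `[a+x]`. With the same lookup record at timestamp 20, it returns `[a+null]`, which is the kind of null `lookup` parameter reported in the Joiner above.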