Nicolas,

The reason we do not yet have a windowed KStream-KTable join is that, in our first design of the join semantics, we materialize only the KTable and not the KStream, just to keep the scope small. But it is not impossible to add if we feel it is a common use case that the non-windowed KStream-KTable join cannot cover. For now you can consider using a customized processor in your DSL to do this windowed join.
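To make the idea of such a customized processor concrete, here is a minimal sketch in plain Python. It is only a toy model of the logic, not Kafka Streams code: the class name, method names and the `retention_ms` parameter are hypothetical, time is taken from the record timestamps, and only stream records that found no match are buffered.

```python
from collections import defaultdict


class WindowedStreamTableJoin:
    """Toy model: a materialized table plus a time-bounded buffer of unmatched stream records."""

    def __init__(self, retention_ms):
        self.retention_ms = retention_ms     # hypothetical window retention period
        self.table = {}                      # key -> latest table value
        self.buffer = defaultdict(list)      # key -> [(timestamp, stream value)]

    def _expire(self, now_ms):
        # Drop buffered stream records that are older than the retention period.
        for key in list(self.buffer):
            kept = [(ts, v) for ts, v in self.buffer[key]
                    if now_ms - ts <= self.retention_ms]
            if kept:
                self.buffer[key] = kept
            else:
                del self.buffer[key]

    def on_stream_record(self, key, value, ts):
        """Join right away if the table has the key, otherwise buffer the record."""
        self._expire(ts)
        if key in self.table:
            return [(key, value, self.table[key])]
        self.buffer[key].append((ts, value))
        return []

    def on_table_update(self, key, value, ts):
        """Update the table and emit joins for buffered records still inside the window."""
        self._expire(ts)
        self.table[key] = value
        pending = self.buffer.pop(key, [])
        return [(key, v, value) for _, v in pending]
```

With a retention of 60 seconds, a click buffered at time 0 is joined when its campaign arrives at 30 seconds, while a campaign that arrives after the retention period finds nothing left to join.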
Note that even with a windowed KStream-KTable join in the DSL, if a record from the KTable arrives so late that it falls behind the other stream's window retention period, you may still miss that join result.

Guozhang

On Thu, Jul 21, 2016 at 1:57 AM, Nicolas PHUNG <nsphung.apa...@gmail.com> wrote:

> Hi Guozhang,
>
> "I think ideally what you want is a windowed KStream-KTable join" <- It is
> what I need indeed. But from what I gather in the Kafka/Confluent
> documentation, when using a KStream-KTable join, you don't have the window
> parameter. @Matthias said it is by design, it seems; maybe there's a good
> reason for this. But for my use case with a KStream-KTable join, I need to be
> sure about getting the changelog record (KTable) key/id first before it's
> used in the record stream (KStream), else my joined record stream
> will contain records that don't have the changelog record (null or not up
> to date on join). I'm trying to understand and find out if there's a way to do
> something like this with the current release of Kafka.
>
> Regards,
>
> On Wed, Jul 20, 2016 at 7:47 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Nicolas,
> >
> > For a KStream-KTable join, if a record coming from the KStream does not find
> > a matching record in the materialized KTable, then the join
> > result is lost since, as you noted, even if the changelog record arrives
> > at the KTable later, it will not trigger a join because the KStream is not
> > materialized. Using earlier timestamps on the KTable has the effect of
> > trying to process records from the KTable and materialize them before
> > processing records from the KStream, so that it is likely that the
> > matching key already exists in the KTable when the joining record from
> > the KStream arrives and is processed. But again, it is not
> > guaranteed since this is best-effort.
> >
> > I think ideally what you want is a windowed KStream-KTable join, where the
> > KTable's key space is bounded and hence can be completely materialized
> > (across partitions if you run multiple instances of the same piece of
> > code), whereas the KStream is unbounded and hence you need to window it in
> > order to materialize it, so that if there is a late record from the KTable,
> > it may still find the matching record in the windowed KStream. Is that
> > right?
> >
> > Guozhang
> >
> > On Wed, Jul 20, 2016 at 4:25 AM, Nicolas PHUNG <nsphung.apa...@gmail.com>
> > wrote:
> >
> > > @Guozhang OK, I've tried it and it doesn't have the expected behavior. For
> > > a KStream-KStream join, there's the issue of having to produce the same
> > > changelog record to be able to join within the window. And for
> > > KStream-KTable, an update/insert in the changelog doesn't trigger
> > > the join that was missed for the record stream (plus you can't specify a
> > > window for a KStream-KTable join).
> > >
> > > On Wed, Jul 20, 2016 at 11:14 AM, Nicolas PHUNG <nsphung.apa...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Thank you for your answer @Matthias. Indeed, I need a kind of symmetric
> > > > join. However, a KStream-KStream join doesn't match my use case: I would
> > > > need to generate the events in the changelog (e.g. a marketing campaign
> > > > with a certain Id/Key) because they live only for the join in a defined
> > > > window. Let's say I get a click in the record stream and then the campaign
> > > > entity arrives later within the window: the join works and the stream is
> > > > enriched. But after the window, a new click arrives in the record stream and
> > > > won't be able to find the campaign entity, since the window containing the
> > > > information has passed.
> > > > I haven't tried KTable-KTable yet, but I don't think my clicks,
> > > > for example, really match a changelog stream.
> > > >
> > > > @Guozhang OK, so if I fake the timestamp in the record stream (KStream)
> > > > (let's say add 1 day), I could give late arrivals in the changelog
> > > > stream (my KTable) a one-day chance for the join to be processed.
> > > > Let me try. Thanks
> > > >
> > > > Regards,
> > > >
> > > > On Tue, Jul 19, 2016 at 11:45 PM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hello Nicolas,
> > > >>
> > > >> If this missing matched record issue is mainly due to the order in which
> > > >> these two streams were processed (e.g., say your corresponding changelog
> > > >> record was a bit late compared with the record stream's record with the
> > > >> same key), you can try to "hint" the Kafka Streams library to give the
> > > >> changelog stream a bit more time ahead by using the TimestampExtractor to
> > > >> give it timestamps earlier than those of the record stream. Kafka Streams
> > > >> will do a best-effort "stream synchronization" to make sure these two
> > > >> streams are processed at roughly the same pace based on record timestamps,
> > > >> which will result in records from the changelog stream being processed
> > > >> prior to the record stream.
> > > >>
> > > >> Guozhang
> > > >>
> > > >> On Tue, Jul 19, 2016 at 6:10 AM, Matthias J. Sax <matth...@confluent.io>
> > > >> wrote:
> > > >>
> > > >> > Hi Nicolas,
> > > >> >
> > > >> > you are right, it is currently not possible to get a result from a
> > > >> > KTable update (and this is by design). The idea is that the KStream is
> > > >> > enriched with the *current state* of the KTable -- thus, for each KStream
> > > >> > record a look-up in the KTable is done.
> > > >> > (In this sense, a KStream-KTable
> > > >> > join is asymmetric.)
> > > >> >
> > > >> > If you need a symmetric join (i.e., lookup in both directions), you can
> > > >> > either use a KTable-KTable or a KStream-KStream join. Not sure if this
> > > >> > might work for your use case.
> > > >> >
> > > >> > -Matthias
> > > >> >
> > > >> > On 07/19/2016 01:36 PM, Nicolas PHUNG wrote:
> > > >> > > Hi,
> > > >> > >
> > > >> > > I'm using Kafka 0.10.0.0 with the Confluent platform 3.0.0
> > > >> > >
> > > >> > > I managed to join a record stream (KStream / clicks stream) with a
> > > >> > > changelog stream (KTable / an entity like a campaign related to a click,
> > > >> > > for example). When the entity in the KTable is inserted first (and for the
> > > >> > > first time, of course) in Kafka, the record stream is processed as
> > > >> > > expected, with the join producing a new enriched stream. This is good.
> > > >> > >
> > > >> > > My issue is when the record stream generates a record that contains a
> > > >> > > key/id that hasn't been inserted yet in the changelog stream/KTable. My
> > > >> > > process then generates a record without the information in the enriched
> > > >> > > stream. Would it be possible to re-run this enrichment once the
> > > >> > > changelog record with the missing id/key is received on my KTable? From
> > > >> > > my understanding, it's not possible right now to do this with a
> > > >> > > KStream-KTable join. Is there a way to do something like this?
> > > >> > >
> > > >> > > Thanks.
> > > >> > >
> > > >> > > Regards,
> > > >> > > Nicolas PHUNG
> > > >>
> > > >> --
> > > >> -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
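For readers of this thread, the asymmetric behavior described above and the effect of the timestamp "hint" can be illustrated with a small plain-Python model. This is a sketch under simplifying assumptions, not Kafka Streams code: the function names and the `table_shift_ms` parameter are hypothetical, and the best-effort stream synchronization is idealized as a strict sort by timestamp.

```python
def stream_table_join(events):
    """Replay (kind, key, value) events in processing order.

    Only stream records produce output; each one is enriched with the
    table's current value for its key, or None if the key is not there yet.
    """
    table = {}
    output = []
    for kind, key, value in events:
        if kind == "table":
            table[key] = value               # update state, emit nothing
        else:
            output.append((key, value, table.get(key)))
    return output


def in_timestamp_order(records, table_shift_ms=0):
    """Order (kind, key, value, ts) records by timestamp.

    Table records are treated as if they were table_shift_ms earlier,
    modeling a timestamp extractor that returns earlier values for them.
    """
    def effective_ts(record):
        kind, _key, _value, ts = record
        return ts - table_shift_ms if kind == "table" else ts

    ordered = sorted(records, key=effective_ts)
    return [(kind, key, value) for kind, key, value, _ts in ordered]


records = [
    ("stream", "c1", "click-1", 1000),       # click arrives first
    ("table", "c1", "campaign-A", 1500),     # campaign is slightly late
]
unshifted = stream_table_join(in_timestamp_order(records))
shifted = stream_table_join(in_timestamp_order(records, table_shift_ms=1000))
```

In this model `unshifted` contains the click joined with `None`, and the later table update emits nothing, whereas in `shifted` the campaign is processed first and the click is enriched. In the real library the ordering is only best-effort, so the shifted case is likely rather than guaranteed.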