Thanks Guozhang & Matthias! For 1), it is going to be a common ask. So a DSL API will be good.
For 2), source for the KTable currently is a file. Think about it as a dump from a DB table. We are thinking of ways to stream updates from this table. But for now its a new file every day or so. I plan to automate file uploader to write to kafka when it gets a new file. File is too big to be "broadcasted". The delta changes between files should be really small though. Now, "current content" can be modeled based on timestamp. If I add a timestamp field when pushing to kafka. The records themselves have no notion of time. Its just metadata that will be useful in join. Another way is similar to what Matthias suggested. I can make it sleep if a key is not found in KTable. I can treat it as a condition to indicated KTable is still being initialized. Of course, I need a way to break this sleep cycle if key never comes. Or this can be implemented with a custom watermark assigner that knows when to emit a "special watermark" to indicate current content is read. Or for such a slow stream, any poll to kafka broker that returns zero records can be treated as reaching end of current content. Matthias, I haven't spent enough time on the approach you outlined. Will let you know. Srikanth On Mon, May 23, 2016 at 1:40 PM, Matthias J. Sax <matth...@confluent.io> wrote: > Hi Srikanth, > > as Guozhang mentioned, the problem is the definition of the time, when > your table is read for joining with the stream. > > Using transform() you would basically read a changlog-stream within your > custom Transformer class and apply it via KStream.transform() to your > regular stream. (ie, your Transformer class has a member KTable). > > If Transformer.transform() is called you need to decide somehow, if you > table is read for joining or not (and "sleep" if it is not ready yet, > effectively stalling your KStream). > > As some point in time (not sure how you wanna decide when this point in > time actually is -- see beginning of this mail) you can start to process > the data from the regular stream. > > Have a look into KStreamKTableLeftJoin to get an idea how this would work. > > > -Matthias > > > On 05/23/2016 06:25 PM, Guozhang Wang wrote: > > Hi Srikanth, > > > > How do you define if the "KTable is read completely" to get to the > "current > > content"? Since as you said that table is not purely static, but still > with > > maybe-low-traffic update streams, I guess "catch up to current content" > is > > still depending on some timestamp? > > > > BTW about 1), we are consider adding "read-only global state" as well > into > > the DSL in the future, but like Matthias said it won't be available in > > 0.10.0, so you need to do it through the transform() call where you can > > provide any customized processor. > > > > > > Guozhang > > > > > > On Mon, May 23, 2016 at 8:59 AM, Srikanth <srikanth...@gmail.com> wrote: > > > >> Matthias, > >> > >> For (2), how do you achieve this using transform()? > >> > >> Thanks, > >> Srikanth > >> > >> On Sat, May 21, 2016 at 9:10 AM, Matthias J. Sax <matth...@confluent.io > > > >> wrote: > >> > >>> Hi Srikanth, > >>> > >>> 1) there is no support on DSL level, but if you use Processor API you > >>> can do "anything" you like. So yes, a map-like transform() that gets > >>> initialized with the "broadcast-side" of the join should work. > >>> > >>> 2) Right now, there is no way to stall a stream -- a custom > >>> TimestampExtractor will not do the tricker either, because Kafka > Streams > >>> allows for out-of-order/late arriving data -- thus, it processes what > is > >>> available without "waiting" for late data... > >>> > >>> Of course, you could build a custom solution via transfrom() again. > >>> > >>> > >>> -Matthias > >>> > >>> On 05/21/2016 05:24 AM, Srikanth wrote: > >>>> Hello, > >>>> > >>>> I'm writing a workflow using kafka streams where an incoming stream > >> needs > >>>> to be denormalized and joined with a few dimension table. > >>>> It will be written back to another kafka topic. Fairly typical I > >> believe. > >>>> > >>>> 1) Can I do broadcast join if my dimension table is small enough to be > >>> held > >>>> in each processor's local state? > >>>> Not doing this will force KStream to be shuffled with an internal > >> topic. > >>>> If not should I write a transformer that reads the table in init() and > >>>> cache's it. I can then do a map like transformation in transform() > with > >>>> cache lookup instead of a join. > >>>> > >>>> > >>>> 2) When joining a KStream withe KTable for bigger tables, how do I > >> stall > >>>> reading KStream until KTable is completely materialized? > >>>> I know completely read is a very loose term in stream processing :-) > >> The > >>>> KTable is going to be fairly static and needs the "current content" to > >> be > >>>> read completely before it can be used for joins. > >>>> The only option for synchronizing streams I see is TimestampExtractor. > >> I > >>>> can't figure out how to use that because > >>>> > >>>> i) The join between KStream and KTable is time agnostic and doesn't > >>>> really fit into a window operation. > >>>> ii) TimestampExtractor is set as a stream config at global level. > >>>> Timestamp extraction logic on the other hand will be specific to each > >>>> stream. > >>>> How does one write a generic extractor? > >>>> > >>>> Thanks, > >>>> Srikanth > >>>> > >>> > >>> > >> > > > > > > > >