Hi Xingcan,

On Fri, Nov 3, 2017 at 3:38 AM, Xingcan Cui <xingc...@gmail.com> wrote:
> Hi Maxim,
>
> if I understand correctly, you actually need to JOIN the fast stream with
> the slow stream. Could you please share more details about your problem?
>

Sure, I can explain more with some pseudo-code.

I have an external DB with a price list with the following structure:

    case class PriceList(productId, price)

My events are purchase events with the following structure:

    case class Purchase(productId, amount)

I would like to get a final stream with totalAmount = amount * price, in a
structure like this:

    case class PurchaseTotal(productId, totalAmount)

I have 2 corresponding input streams:

    val prices = env.addSource(new PriceListSource).keyBy(_.productId)
    val purchases = env.addSource(new PurchaseSource).keyBy(_.productId)

PriceListSource delivers all CHANGES to the external DB table.

The calculate function looks similar to:

    class CalculateFunction extends CoProcessFunction[Purchase, PriceList, PurchaseTotal] {

      private var price: ValueState[Int] = _

      override def processElement1....... {
        out.collect(PurchaseTotal(purchase.productId, purchase.amount * price.value))
      }

      override def processElement2....... {
        price.update(priceList.price)
      }
    }

And finally the pipeline:

    purchases.connect(prices).process(new CalculateFunction).print

The issue is that when I start the program, my price ValueState is empty, and
it will never be populated for products whose price is not updated in the DB
afterwards, because the source only delivers changes.

BTW, I cannot use AsyncIO to query the DB because of several technical
restrictions.

> 1. When you mentioned "they have the same key", did you mean all the data
> get the same key or the logic should be applied with fast.key = slow.key?
>

I meant that the productId in a purchase event definitely exists in the
external price list DB (so it is a kind of inner join).

> 2. What should be done to initialize the state?
>

I need to read the external DB table and populate the price ValueState before
processing the first purchase event.

I hope this minimal example helps to explain the problem.

Maxim.
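P.S. To make the issue concrete without a running job, here is a minimal plain-Scala model of what the keyed state does. It is only a sketch and does not use Flink: `EnrichmentModel` and its `run` method are hypothetical names, a mutable map stands in for the per-key ValueState, and the field types (String ids, Int prices and amounts) are assumptions, since the case classes above do not specify them.

```scala
// Plain-Scala model of the keyed price state; no Flink dependency.
// Field types are assumptions: the pseudo-code above leaves them out.
case class PriceList(productId: String, price: Int)
case class Purchase(productId: String, amount: Int)
case class PurchaseTotal(productId: String, totalAmount: Int)

object EnrichmentModel {
  // Replays an interleaved event sequence: Left = price change, Right = purchase.
  // Returns the enriched totals and the purchases that found no price in state.
  def run(events: Seq[Either[PriceList, Purchase]]): (Vector[PurchaseTotal], Vector[Purchase]) = {
    // Stand-in for the per-key ValueState: productId -> last seen price.
    val prices = scala.collection.mutable.Map.empty[String, Int]
    var totals = Vector.empty[PurchaseTotal]
    var missed = Vector.empty[Purchase]

    events.foreach {
      // Corresponds to processElement2: remember the latest price for the key.
      case Left(priceList) =>
        prices(priceList.productId) = priceList.price
      // Corresponds to processElement1: enrich only if a price is already known.
      case Right(purchase) =>
        prices.get(purchase.productId) match {
          case Some(price) =>
            totals :+= PurchaseTotal(purchase.productId, purchase.amount * price)
          case None =>
            missed :+= purchase
        }
    }
    (totals, missed)
  }
}
```

Feeding it a purchase for product "a" before any price change for "a" has arrived puts that purchase into the second (missed) vector, while a purchase after the price change is enriched normally. That first case is exactly what happens to me on startup for every product whose price has not changed yet.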
>
> Best,
> Xingcan
>
>
> On Fri, Nov 3, 2017 at 5:54 AM, Maxim Parkachov <lazy.gop...@gmail.com>
> wrote:
>
>> Hi Flink users,
>>
>> I'm struggling with some basic concept and would appreciate some help. I
>> have 2 input streams: one is a fast event stream and one is a slowly
>> changing dimension. They have the same key, and I use a CoProcessFunction
>> to store the slow data in state and enrich the fast data from this state.
>> Everything works as expected.
>>
>> Before I start processing the fast stream on the first run, I would like
>> to completely initialise the state. I thought it could be done in open(),
>> but I don't understand how it would be re-distributed across parallel
>> operators.
>>
>> Another alternative would be to create a custom source and push all slow
>> dimension data downstream, but I could not find how to hold processing of
>> the fast data until the state is initialised.
>>
>> I realise that FLIP-17 (Side Inputs) is what I need, but is there some
>> other way to implement it now?
>>
>> Thanks,
>> Maxim.
>>
>>
>