Hi Xingcan,

On Fri, Nov 3, 2017 at 3:38 AM, Xingcan Cui <xingc...@gmail.com> wrote:

> Hi Maxim,
>
> if I understand correctly, you actually need to JOIN the fast stream with
> the slow stream. Could you please share more details about your problem?
>

Sure, I can explain more with some example pseudo-code. I have an external
DB with a price list with the following structure:

case class PriceList(productId, price)

My events are purchase events with the following structure:

case class Purchase(productId, amount)

I would like to get a final stream with totalAmount = amount * price, in a
structure like this:

case class PurchaseTotal(productId, totalAmount)
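
To make the intended calculation concrete, here is a minimal plain-Scala
sketch with the field types filled in. The types (String for productId, Int
for price and amount) are assumptions, since the pseudo-code leaves them out;
Int for price only mirrors the ValueState[Int] in the calculate function. The
object name PurchaseTotalSketch and the method total are hypothetical.

```scala
// Field types are assumptions; the pseudo-code does not specify them.
case class PriceList(productId: String, price: Int)
case class Purchase(productId: String, amount: Int)
case class PurchaseTotal(productId: String, totalAmount: Int)

object PurchaseTotalSketch {
  // Enrich one purchase with the matching price (inner join on productId).
  def total(purchase: Purchase, priceList: PriceList): PurchaseTotal = {
    require(purchase.productId == priceList.productId, "productId mismatch")
    PurchaseTotal(purchase.productId, purchase.amount * priceList.price)
  }
}
```

For example, a purchase of 3 units of a product priced at 10 gives a
totalAmount of 30.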

I have two corresponding input streams:

val prices = env.addSource(new PriceListSource).keyBy(_.productId)
val purchases = env.addSource(new PurchaseSource).keyBy(_.productId)

PriceListSource delivers all CHANGES to the external DB table.

The calculate function looks similar to:

class CalculateFunction extends CoProcessFunction[Purchase, PriceList, PurchaseTotal] {

  // Latest known price for the current key (productId)
  private var price: ValueState[Int] = _

  override def processElement1....... {
    // Enrich the purchase with the price held in state
    out.collect(PurchaseTotal(purchase.productId, purchase.amount * price.value))
  }

  override def processElement2....... {
    // Remember the latest price from the price list change
    price.update(priceList.price)
  }
}

And finally the pipeline:

purchases.connect(prices).process(new CalculateFunction).print

The issue is that when I start the program, my price ValueState is empty, and
it will never be populated for prices that are not updated in the DB.
BTW, I cannot use AsyncIO to query the DB because of several technical
restrictions.
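
The problem can be reproduced without Flink. The following is only a
plain-Scala sketch: a mutable map stands in for the keyed ValueState, and all
names (EmptyStateSketch, PriceChange, run) are hypothetical. It shows that a
purchase arriving before any change event for its product finds no price.

```scala
import scala.collection.mutable

// Plain-Scala stand-in for keyed state; this is not Flink code.
object EmptyStateSketch {
  sealed trait Event
  case class PriceChange(productId: String, price: Int) extends Event
  case class Purchase(productId: String, amount: Int) extends Event

  // One result per purchase: Some(total) if a price is known,
  // None if the state for that productId is still empty.
  def run(events: Seq[Event]): Seq[Option[Int]] = {
    val priceState = mutable.Map.empty[String, Int] // productId -> latest price
    val out = mutable.ArrayBuffer.empty[Option[Int]]
    events.foreach {
      case PriceChange(id, p)   => priceState(id) = p
      case Purchase(id, amount) => out += priceState.get(id).map(_ * amount)
    }
    out.toSeq
  }
}
```

Feeding it a purchase, then a price change, then the same purchase again
gives no total for the first purchase and a total only for the second.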

> 1. When you mentioned "they have the same key", did you mean all the data
> get the same key or the logic should be applied with fast.key = slow.key?
>

I meant that the productId in a purchase event definitely exists in the
external price list DB (so it is a kind of inner join).


> 2. What should be done to initialize the state?
>

I need to read the external DB table and populate the price ValueState before
processing the first purchase event.

I hope this minimal example helps clarify the problem.
Maxim.


>
> Best,
> Xingcan
>
>
> On Fri, Nov 3, 2017 at 5:54 AM, Maxim Parkachov <lazy.gop...@gmail.com>
> wrote:
>
>> Hi Flink users,
>>
>> I'm struggling with a basic concept and would appreciate some help. I
>> have 2 input streams: one is a fast event stream and one is a slowly
>> changing dimension. They have the same key and I use CoProcessFunction to
>> store slow data in state and enrich fast data from this state. Everything
>> works as expected.
>>
>> Before I start processing the fast stream on the first run, I would like
>> to completely initialise the state. I thought it could be done in open(),
>> but I don't understand how it will be re-distributed across parallel
>> operators.
>>
>> Another alternative would be to create a custom source and push all slow
>> dimension data downstream, but I could not find how to hold processing of
>> fast data until the state is initialised.
>>
>> I realise that FLIP-17 (Side Inputs) is what I need, but is there some
>> other way to implement it now?
>>
>> Thanks,
>> Maxim.
>>
>>
>
