Thank you for your help!

On Tue, May 15, 2018 at 20:05 Lukasz Cwik <lc...@google.com> wrote:
> Yes, that is correct.
>
> On Tue, May 15, 2018 at 4:40 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>
>> Got it.
>>
>> Since I am not applying any windowing strategy to the side input, does
>> Beam automatically pick up the windowing strategy for the side inputs from
>> the main input? By that I mean the scope of the side input would be a per
>> window one and it would be different for every window. Is that correct?
>>
>> Regards,
>> Harsh
>>
>> On Tue, May 15, 2018 at 17:54 Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Using deduplicate + side inputs will allow you to have a consistent view
>>> of the account information for the entire window, which can be nice since it
>>> gives consistent processing semantics, but using a simple in-memory cache to
>>> reduce the number of lookups will likely be much easier to debug and
>>> simpler to implement and maintain.
>>>
>>> On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>>
>>>> Thanks Raghu!
>>>>
>>>> Lukasz,
>>>>
>>>> Do you think lookups would be a better option than side inputs in my
>>>> case?
>>>>
>>>> On Tue, May 15, 2018 at 16:33 Raghu Angadi <rang...@google.com> wrote:
>>>>
>>>>> It should work. I think you need to apply Distinct before looking up
>>>>> account info:
>>>>> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts", ...).
>>>>> Note that all of the accounts are stored in a single in-memory map. It
>>>>> should be small enough for that.
>>>>>
>>>>> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>>>>
>>>>>> Well ideally, I actually made the example a little easy. In the
>>>>>> actual example I have multiple reference datasets. Say, I have a tuple of
>>>>>> Account and Product as the key.
>>>>>> The reason we don’t do the lookup in the DoFn directly is that we don’t
>>>>>> want to look up the data for the same account or same product multiple
>>>>>> times across workers in a window.
>>>>>>
>>>>>> What I was thinking was that it might be better to perform the lookup
>>>>>> only once for each account and product in a window and then supply them as
>>>>>> side inputs to the main input.
>>>>>>
>>>>>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Is there a reason you don't want to read the accounting information
>>>>>>> within the DoFn directly from the datastore? It seems like that would be
>>>>>>> your simplest approach.
>>>>>>>
>>>>>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> No, we don’t receive any such information from Kafka.
>>>>>>>>
>>>>>>>> The account information in the external store does change. Every
>>>>>>>> time we have a change in the account information we will have to recompute
>>>>>>>> all the billing info. Our source systems will make sure that they publish
>>>>>>>> messages for those accounts again.
>>>>>>>>
>>>>>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> For each BillingModel you receive over Kafka, how "fresh" should
>>>>>>>>> the account information be?
>>>>>>>>> Does the account information in the external store change?
>>>>>>>>>
>>>>>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> We have certain billing data that arrives to us from Kafka. The
>>>>>>>>>> billing data is in JSON and it contains an account ID. In order for us to
>>>>>>>>>> generate the final report we need to use some account data that is
>>>>>>>>>> associated with the account ID and stored in an external database.
>>>>>>>>>>
>>>>>>>>>> It is possible that we get multiple billing info messages for the
>>>>>>>>>> same account. We want to be able to look up the account information for the
>>>>>>>>>> messages in a window and then supply that as a side input to the next
>>>>>>>>>> PTransform.
>>>>>>>>>>
>>>>>>>>>> Is it possible to achieve that in Beam?
>>>>>>>>>>
>>>>>>>>>> Here is my attempt:
>>>>>>>>>>
>>>>>>>>>> PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>>>>>>     p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>>>>>             .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>>>>>             .withTopic(KAFKA_TOPIC)
>>>>>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>             .withValueDeserializer(StringDeserializer.class)
>>>>>>>>>>         )
>>>>>>>>>>         .apply("Window",
>>>>>>>>>>             Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>>>>>         .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>>>>>
>>>>>>>>>> PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>>>>>     billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>>>>>
>>>>>>>>>> PCollectionView<Map<Integer, Account>> accountData =
>>>>>>>>>>     billingDataPairs.apply("LookupAccounts", new AccountLookupClient())
>>>>>>>>>>         .apply(View.asMap());
>>>>>>>>>>
>>>>>>>>>> billingData.apply(ParDo.of(
>>>>>>>>>>     new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>>>>>       @ProcessElement
>>>>>>>>>>       public void processElement(ProcessContext ctx) {
>>>>>>>>>>         Integer accountId = ctx.element().getKey();
>>>>>>>>>>         Iterable<BillingModel> billingModel = ctx.element().getValue();
>>>>>>>>>>         Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>>>>>       }
>>>>>>>>>>     }).withSideInputs(accountData));
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Harsh
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> *Regards, Harshvardhan Agrawal*
>>>>>>>>>> *267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*
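
For reference, the in-memory cache suggested in the thread as the simpler alternative to Distinct plus side inputs could look roughly like the sketch below. It is plain Java with no Beam dependency. `CachedAccountLookup`, `loadCount`, and the `"account-" + id` loader are hypothetical names standing in for a real client of the external account store. In a Beam pipeline, one would presumably keep such a cache in a DoFn field created in a `@Setup` method, so that each DoFn instance queries the store at most once per key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical cache in front of an external store; not part of Beam. */
final class CachedAccountLookup<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();
  private final Function<K, V> loader;
  private final AtomicInteger loads = new AtomicInteger();

  /** The loader is assumed to wrap the real (expensive) store query. */
  CachedAccountLookup(Function<K, V> loader) {
    this.loader = loader;
  }

  /** Returns the cached value, calling the loader only on a cache miss. */
  V get(K key) {
    return cache.computeIfAbsent(key, k -> {
      loads.incrementAndGet();
      return loader.apply(k);
    });
  }

  /** Number of times the underlying store was actually queried. */
  int loadCount() {
    return loads.get();
  }
}

final class CachedAccountLookupDemo {
  public static void main(String[] args) {
    // Stand-in loader; a real one would query the account database.
    CachedAccountLookup<Integer, String> accounts =
        new CachedAccountLookup<>(id -> "account-" + id);

    // Account IDs as they might appear on successive billing messages.
    int[] billingAccountIds = {7, 7, 42, 7};
    for (int id : billingAccountIds) {
      System.out.println(id + " -> " + accounts.get(id));
    }
    System.out.println("store lookups: " + accounts.loadCount());
  }
}
```

With the four sample IDs above, the store is queried only twice, once per distinct account. The sketch never evicts entries, so it assumes the key space is small and that slightly stale account data is acceptable until the source systems republish, as described in the thread. A bounded or expiring cache would be needed otherwise.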