Thank you for your help!

On Tue, May 15, 2018 at 20:05 Lukasz Cwik <lc...@google.com> wrote:

> Yes, that is correct.
>
> On Tue, May 15, 2018 at 4:40 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
>> Got it.
>>
>> Since I am not applying any windowing strategy to the side input, does
>> Beam automatically pick up the windowing strategy for the side input from
>> the main input? By that I mean the side input would be scoped per window
>> and would be different for every window. Is that correct?
>>
>> Regards,
>> Harsh
>>
>> On Tue, May 15, 2018 at 17:54 Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Using deduplicate + side inputs gives you a consistent view of the
>>> account information for the entire window, which is nice because it
>>> gives consistent processing semantics. A simple in-memory cache that
>>> reduces the number of lookups will likely be much easier to debug and
>>> simpler to implement and maintain.
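
The in-memory cache option mentioned above can be sketched in plain Java. This is a minimal illustration, not Beam API: `CachingLookup` and the `loader` function are hypothetical names, and the loader stands in for whatever client reads the external account store.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper: remembers results so each key is loaded at most once. */
final class CachingLookup<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // stands in for the external store client

    CachingLookup(Function<K, V> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, calling the loader only on the first request for a key. */
    V get(K key) {
        return cache.computeIfAbsent(key, loader);
    }

    public static void main(String[] args) {
        AtomicInteger storeCalls = new AtomicInteger();
        CachingLookup<Integer, String> accounts = new CachingLookup<>(id -> {
            storeCalls.incrementAndGet(); // count trips to the (fake) store
            return "account-" + id;
        });

        int[] accountIds = {7, 7, 42, 7, 42};
        for (int id : accountIds) {
            accounts.get(id);
        }
        System.out.println(storeCalls.get()); // prints 2: one load per distinct ID
    }
}
```

Entries in this sketch never expire. Since the account information in the external store can change, a real cache would probably need some form of expiry or size bound, and each worker would still perform its own lookups.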
>>>
>>> On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal <
>>> harshvardhan.ag...@gmail.com> wrote:
>>>
>>>> Thanks Raghu!
>>>>
>>>> Lukasz,
>>>>
>>>> Do you think lookups would be a better option than side inputs in my
>>>> case?
>>>>
>>>>
>>>> On Tue, May 15, 2018 at 16:33 Raghu Angadi <rang...@google.com> wrote:
>>>>
>>>>> It should work. I think you need to apply Distinct before looking up
>>>>> account info:
>>>>> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts", ...).
>>>>> Note that all of the accounts are stored in a single in-memory map, so
>>>>> the data needs to be small enough for that.
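
The effect of deduplicating keys before the lookup can be modelled in plain Java. This is only a sketch of the idea, not Beam code: `PerWindowLookup`, `Event`, and the `lookup` function are hypothetical, and the 30-second window size is taken from the original attempt. Each window ends up with its own map containing exactly one entry per account ID, which matches what `View.asMap()` expects (a single value per key, per window).

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;

/** Hypothetical model of "distinct keys per window, then one lookup per key". */
final class PerWindowLookup {
    static final long WINDOW_MILLIS = 30_000L;

    /** A billing message reduced to its timestamp and account ID. */
    static final class Event {
        final long timestampMillis;
        final int accountId;

        Event(long timestampMillis, int accountId) {
            this.timestampMillis = timestampMillis;
            this.accountId = accountId;
        }
    }

    /** Start of the fixed window containing the timestamp (non-negative timestamps assumed). */
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    /** Builds one accountId -> account map per window, calling lookup once per distinct ID. */
    static Map<Long, Map<Integer, String>> lookupPerWindow(
            List<Event> events, Function<Integer, String> lookup) {
        // Step 1: collect the distinct account IDs seen in each window.
        Map<Long, Set<Integer>> idsByWindow = new TreeMap<>();
        for (Event e : events) {
            idsByWindow
                .computeIfAbsent(windowStart(e.timestampMillis), w -> new LinkedHashSet<>())
                .add(e.accountId);
        }
        // Step 2: perform one lookup per distinct ID per window.
        Map<Long, Map<Integer, String>> views = new TreeMap<>();
        for (Map.Entry<Long, Set<Integer>> entry : idsByWindow.entrySet()) {
            Map<Integer, String> accounts = new LinkedHashMap<>();
            for (Integer id : entry.getValue()) {
                accounts.put(id, lookup.apply(id));
            }
            views.put(entry.getKey(), accounts);
        }
        return views;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(1_000L, 7), new Event(5_000L, 7),
            new Event(29_999L, 42), new Event(30_000L, 7));
        int[] lookups = {0};
        Map<Long, Map<Integer, String>> views = lookupPerWindow(events, id -> {
            lookups[0]++;
            return "account-" + id;
        });
        System.out.println(views);      // {0={7=account-7, 42=account-42}, 30000={7=account-7}}
        System.out.println(lookups[0]); // 3
    }
}
```

Account 7 is looked up twice in total because it appears in two different windows; within a single window it is looked up only once.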
>>>>>
>>>>> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <
>>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>>
>>>>>> Well, I actually simplified the example a little. In the actual
>>>>>> pipeline I have multiple reference datasets. Say I have a tuple of
>>>>>> Account and Product as the key. The reason we don’t do the lookup in the
>>>>>> DoFn directly is that we don’t want to look up the data for the same
>>>>>> account or the same product multiple times across workers in a window.
>>>>>>
>>>>>> What I was thinking was that it might be better to perform the lookup
>>>>>> only once for each account and product in a window and then supply the
>>>>>> results as side inputs to the main input.
>>>>>>
>>>>>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Is there a reason you don't want to read the accounting information
>>>>>>> within the DoFn directly from the datastore? It seems like that would be
>>>>>>> your simplest approach.
>>>>>>>
>>>>>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <
>>>>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> No, we don’t receive any such information from Kafka.
>>>>>>>>
>>>>>>>> The account information in the external store does change. Every
>>>>>>>> time the account information changes we have to recompute all the
>>>>>>>> billing info. Our source systems will make sure that they publish
>>>>>>>> messages for those accounts again.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> For each BillingModel you receive over Kafka, how "fresh" should
>>>>>>>>> the account information be?
>>>>>>>>> Does the account information in the external store change?
>>>>>>>>>
>>>>>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <
>>>>>>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> We have certain billing data that arrives from Kafka. The billing
>>>>>>>>>> data is in JSON and contains an account ID. To generate the final
>>>>>>>>>> report we need some account data that is associated with the
>>>>>>>>>> account ID and stored in an external database.
>>>>>>>>>>
>>>>>>>>>> It is possible that we get multiple billing info messages for the
>>>>>>>>>> same account. We want to be able to look up the account information
>>>>>>>>>> for the messages in a window and then supply that as a side input to
>>>>>>>>>> the next PTransform.
>>>>>>>>>>
>>>>>>>>>> Is it possible to achieve that in Beam?
>>>>>>>>>>
>>>>>>>>>> Here is my attempt:
>>>>>>>>>>
>>>>>>>>>>     // Read billing messages from Kafka, assign them to 30-second fixed
>>>>>>>>>>     // windows, and key each BillingModel by account ID.
>>>>>>>>>>     PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>>>>>>         p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>>>>>                 .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>>>>>                 .withTopic(KAFKA_TOPIC)
>>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>>>>             .apply("Window",
>>>>>>>>>>                 Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>>>>>             .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>>>>>
>>>>>>>>>>     // Group the billing messages by account ID within each window.
>>>>>>>>>>     PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>>>>>         billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>>>>>
>>>>>>>>>>     // Look up the account data and expose it as a map-valued side input.
>>>>>>>>>>     PCollectionView<Map<Integer, Account>> accountData =
>>>>>>>>>>         billingDataPairs
>>>>>>>>>>             .apply("LookupAccounts", new AccountLookupClient())
>>>>>>>>>>             .apply(View.asMap());
>>>>>>>>>>
>>>>>>>>>>     // Join each account's billing messages with its account data.
>>>>>>>>>>     // Void is a placeholder output type.
>>>>>>>>>>     billingData.apply(ParDo.of(
>>>>>>>>>>         new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>>>>>           @ProcessElement
>>>>>>>>>>           public void processElement(ProcessContext ctx) {
>>>>>>>>>>             Integer accountId = ctx.element().getKey();
>>>>>>>>>>             Iterable<BillingModel> billingModel = ctx.element().getValue();
>>>>>>>>>>             Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>>>>>           }
>>>>>>>>>>         }).withSideInputs(accountData));
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Harsh
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> *Regards,Harshvardhan Agrawal*
>>>>>>>>>> *267.991.6618 | LinkedIn
>>>>>>>>>> <https://www.linkedin.com/in/harshvardhanagr/>*
>>>>>>>>>>
