Is there a reason you don't want to read the account information directly from the datastore within the DoFn? That seems like it would be your simplest approach.
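Here is a rough sketch of the lookup inside the DoFn. It is written in plain Java with no Beam dependency. `AccountStore`, `AccountEnrichingFn`, and the use of strings in place of your `Account`/`BillingModel` types are hypothetical stand-ins for your own database client and model classes. In a real Beam DoFn, `startBundle()` would carry `@StartBundle` and `enrich()` would be called from the `@ProcessElement` method. The cache is scoped to a bundle, so repeated account IDs within one bundle hit the datastore only once, and each new bundle reads fresh data. That matters because you mentioned the account information can change.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the external account datastore client.
interface AccountStore {
    String lookup(int accountId);
}

// Plain-Java sketch of a DoFn that reads account data directly from the store.
// In Beam: startBundle() -> @StartBundle, enrich() -> body of @ProcessElement.
class AccountEnrichingFn {
    private final AccountStore store;
    private Map<Integer, String> bundleCache = new HashMap<>();
    private int storeCalls = 0;

    AccountEnrichingFn(AccountStore store) {
        this.store = store;
    }

    // Start each bundle with an empty cache so changed accounts are re-read.
    void startBundle() {
        bundleCache = new HashMap<>();
    }

    // Look up the account once per ID per bundle, then join it with the billing record.
    String enrich(int accountId, String billing) {
        String account = bundleCache.computeIfAbsent(accountId, id -> {
            storeCalls++;
            return store.lookup(id);
        });
        return billing + " -> " + account;
    }

    // Number of datastore reads so far (useful for checking the cache behaviour).
    int storeCalls() {
        return storeCalls;
    }
}
```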
On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
> Hi,
>
> No, we don't receive any such information from Kafka.
>
> The account information in the external store does change. Every time the
> account information changes, we will have to recompute all the billing
> info. Our source systems will make sure that they publish messages for
> those accounts again.
>
> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <lc...@google.com> wrote:
>
>> For each BillingModel you receive over Kafka, how "fresh" should the
>> account information be?
>> Does the account information in the external store change?
>>
>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <
>> harshvardhan.ag...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We have certain billing data that arrives from Kafka. The billing data
>>> is in JSON and contains an account ID. To generate the final report, we
>>> need some account data that is associated with the account ID and
>>> stored in an external database.
>>>
>>> We may get multiple billing messages for the same account. We want to
>>> look up the account information for the messages in a window and then
>>> supply it as a side input to the next PTransform.
>>>
>>> Is it possible to achieve that in Beam?
>>>
>>> Here is my attempt:
>>>
>>> PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>     p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>             .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>             .withTopic(KAFKA_TOPIC)
>>>             .withKeyDeserializer(StringDeserializer.class)
>>>             .withValueDeserializer(StringDeserializer.class))
>>>      .apply("Window",
>>>             Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>      .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>
>>> PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>     billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>
>>> PCollectionView<Map<Integer, Account>> accountData =
>>>     billingDataPairs.apply("LookupAccounts", new AccountLookupClient())
>>>         .apply(View.asMap());
>>>
>>> // Consume the grouped billing data; the output type (Void) is a placeholder.
>>> billingData.apply(ParDo.of(
>>>     new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>       @ProcessElement
>>>       public void processElement(ProcessContext ctx) {
>>>         Integer accountId = ctx.element().getKey();
>>>         Iterable<BillingModel> billingModels = ctx.element().getValue();
>>>         Account account = ctx.sideInput(accountData).get(accountId);
>>>       }
>>>     }).withSideInputs(accountData));
>>>
>>> Regards,
>>> Harsh
>>> --
>>>
>>> *Regards, Harshvardhan Agrawal*
>>> *267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*