On Thu, Sep 19, 2019 at 11:05 AM Anjana Pydi <anjan...@bahwancybertek.com> wrote:
>
> Hi,
>
> I have a beam streaming pipeline which reads data from pubsub topic, use it
> to call an API and get responses, apply some transformations on the obtained
> responses and writes to output sinks.
>
> Now, I need to add logic to write a 'process completed' message to another
> pubsub topic once after the process gets finished. Can some one please
> provide your thoughts on how can I add it.
>
> I actually want to achieve this:
>
> topic1 (data) -> triggers pipeline and writes complete message at end -> topic2 (complete msg)

Your code below looks fine so far. I'm assuming your send_to_output function
produces the message that you want to send to topic2, right? (BTW, you can
write beam.Map(send_to_output) rather than having to write
beam.Map(lambda output: send_to_output(output)).) In that case, you just need
to add

    data | beam.io.WriteStringsToPubSub(topic2)

> When topic1 sees message on topic2, it again posts new data to topic1
>
> Below is the code sample:
>
> pubsub_message = (
>     p
>     | 'Read From Pubsub' >> beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
>     | 'split and add' >> beam.ParDo(split_item))
>
> data = (
>     pubsub_message
>     | 'API call' >> beam.FlatMap(lambda x: get_responses(
>         x[0],
>         datetime.strptime(x[1], "%Y-%m-%d %H:%M:%S"),
>         datetime.strptime(x[2], "%Y-%m-%d %H:%M:%S")))
>     | 'WriteOutput' >> beam.Map(lambda output: send_to_output(output)))
>
> Thanks,
> Anjana
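
Putting the suggestion together with the quoted pipeline, a rough sketch could look like the one below. It rests on several assumptions: split_item, get_responses and send_to_output are your own functions and are simply passed in; send_to_output is assumed to return the string that should be published to topic2; parse_window, build_pipeline and TIME_FORMAT are hypothetical names introduced only for this illustration. It also keeps the ReadStringsFromPubSub / WriteStringsToPubSub transforms used in this thread; newer Beam releases provide ReadFromPubSub / WriteToPubSub instead, which work with bytes.

```python
from datetime import datetime

# Timestamp format used in the quoted pipeline.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_window(item):
    """Hypothetical helper: turn (key, start_str, end_str) into
    (key, start_datetime, end_datetime)."""
    key, start, end = item
    return (
        key,
        datetime.strptime(start, TIME_FORMAT),
        datetime.strptime(end, TIME_FORMAT),
    )


def build_pipeline(p, input_topic, output_topic,
                   split_item, get_responses, send_to_output):
    """Hypothetical wiring of the pipeline from this thread.

    split_item, get_responses and send_to_output are the poster's own
    callables; send_to_output is ASSUMED to return the string to publish.
    """
    # Imported here so parse_window can be used without Beam installed.
    import apache_beam as beam

    pubsub_message = (
        p
        | 'Read From Pubsub' >> beam.io.ReadStringsFromPubSub(topic=input_topic)
        | 'split and add' >> beam.ParDo(split_item)
    )

    data = (
        pubsub_message
        | 'Parse window' >> beam.Map(parse_window)
        | 'API call' >> beam.FlatMap(lambda x: get_responses(*x))
        # A named function can be passed directly; no lambda wrapper needed.
        | 'WriteOutput' >> beam.Map(send_to_output)
    )

    # Publish whatever send_to_output returned to the second topic.
    data | 'Notify topic2' >> beam.io.WriteStringsToPubSub(output_topic)
    return data
```

Note that, wired this way, one message is published to topic2 for each element that comes out of 'WriteOutput', not one message for the whole run, since a streaming pipeline has no single end point.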