On Thu, Sep 19, 2019 at 11:05 AM Anjana Pydi
<anjan...@bahwancybertek.com> wrote:
>
> Hi,
>
> I have a Beam streaming pipeline which reads data from a Pub/Sub topic, uses
> it to call an API and get responses, applies some transformations to the
> responses, and writes the results to output sinks.
>
> Now I need to add logic to write a 'process completed' message to another
> Pub/Sub topic once the process has finished. Can someone please share some
> thoughts on how I can add it?
>
> I actually want to achieve this:
>
> topic1 (data) -> triggers pipeline and writes complete message at end ->
> topic2 (complete msg)

Your code below looks fine so far. I'm assuming your send_to_output
function produces the message that you want to send to topic2, right?
(BTW, you can write beam.Map(send_to_output) rather than having to
write beam.Map(lambda output: send_to_output(output)).)

In that case, you just need to add

    data | beam.io.WriteStringsToPubSub(topic2)
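
A slightly fuller sketch of that step, in case it helps. It assumes that
`data` is the PCollection from your code and that each element should become
one 'process completed' message. The helper names and the JSON layout are made
up for illustration. It uses beam.io.WriteToPubSub, which expects bytes rather
than strings, and it needs the GCP extras of Beam installed.

```python
import json


def to_completion_message(output):
    """Build the bytes payload announcing that one output was processed.

    The JSON layout is a hypothetical example, not taken from the thread.
    """
    payload = {"status": "process completed", "output": str(output)}
    return json.dumps(payload).encode("utf-8")


def add_completion_sink(data, topic2):
    """Attach a Pub/Sub write for topic2 to the existing `data` PCollection."""
    # Imported here so the helper above stays usable without Beam installed.
    import apache_beam as beam

    return (
        data
        | 'To complete msg' >> beam.Map(to_completion_message)
        | 'Write complete msg' >> beam.io.WriteToPubSub(topic=topic2))
```

You would call it as add_completion_sink(data, topic2) after building `data`.
Note that this emits one message per element of `data`: a pipeline reading
from Pub/Sub is unbounded, so there is no single "end" to hook into.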

> When topic1 sees message on topic2, it again posts new data to topic1
>
> Below is the code sample:
>
>   pubsub_message = (
>       p
>       | 'Read From Pubsub' >> beam.io.ReadStringsFromPubSub(
>           topic=known_args.input_topic)
>       | 'split and add' >> beam.ParDo(split_item))
>
>   data = (
>       pubsub_message
>       | 'API call' >> beam.FlatMap(lambda x: get_responses(
>           x[0],
>           datetime.strptime(x[1], "%Y-%m-%d %H:%M:%S"),
>           datetime.strptime(x[2], "%Y-%m-%d %H:%M:%S")))
>       | 'WriteOutput' >> beam.Map(lambda output: send_to_output(output)))
>
> Thanks,
> Anjana
>
>
