Yes, I am just thinking about how to modify/rewrite this piece of code if I
want to run my pipeline on the Dataflow runner.

Thanks & Regards
Rajnil Guha

On Wed, Jul 21, 2021 at 1:12 AM Robert Bradshaw <rober...@google.com> wrote:

>  On Tue, Jul 20, 2021 at 12:33 PM Rajnil Guha <rajnil94.g...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > Thank you so much for your help; the collect() function works. I tried
> > the code below and it prints correctly.
> >
> > is_empty_check = (dupe_records
> >                   | "CountGloballyDupes" >> Count.Globally()
> >                   # | "IsEmptyCheck" >> beam.Map(lambda x: x == 0)
> >                   )
> >
> > is_empty_df = ib.collect(is_empty_check)
> > if is_empty_df.iloc[0, 0] == 0:
> >     print("Empty")
> > else:
> >     ib.show(is_empty_check)
>
> Yes, this should work fine.
>
> > I tried using Beam DataFrames as below, but it says iloc is not
> > supported for Beam DataFrames.
>
> Correct. iloc depends on ordering, and PCollections aren't in general
> ordered. However, see below.
>
> > is_empty_beam_df = beam.dataframe.convert.to_dataframe(is_empty_check)
> >
> > if is_empty_beam_df.iloc[0, 0] == 0:
> >     print("Empty")
> > else:
> >     print(is_empty_beam_df)
> >
> > Is there any other way to implement similar checks using Beam DataFrames?
>
> It sounds like you're trying to get around using collect. The issue is
> that until collect (or something similar) is called, dupe_records doesn't
> have any contents to look into or compare against. It is only a pointer
> to an expression describing how to compute them. When you call collect
> (or pipeline.run()), the pipeline is actually executed and the result
> (0 or whatever it is) is computed.
>
> > Thanks & Regards
> > Rajnil Guha
> >
> > On Tue, Jul 20, 2021 at 3:52 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >>
> >> On Mon, Jul 19, 2021 at 11:12 AM Reuven Lax <re...@google.com> wrote:
> >> >
> >> > I know the Java API can handle default values for a combiner when the
> >> > PCollection is empty (though this only works in the global window). I'm
> >> > not sure offhand about Python.
> >> >
> >> > On Mon, Jul 19, 2021 at 10:32 AM Rajnil Guha <rajnil94.g...@gmail.com>
> wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> I tried running some experiments on the Interactive Runner before
> >> >> actually running the pipeline on Dataflow. Below is what I did,
> >> >> though I'm not sure if this is the correct way to do it:
> >> >>
> >> >> Let's say I want to check whether the PCollection named
> >> >> is_empty_check is empty or not, and take a decision based on that.
> >> >>
> >> >> is_empty_check = (dupe_records | "CountGlobally" >> Count.Globally()
> >> >> )
> >>
> >> is_empty_check now refers to a PCollection with a single integer value
> >> in it (possibly 0).
> >>
> >> >> if is_empty_check != 0:
> >>
> >> is_empty_check is a PCollection, never equal to the int 0.
> >>
> >> >>     ib.show(is_empty_check)
> >> >> else:
> >> >>     print("Empty")
> >> >>
> >> >> The above code works fine when the PCollection is not empty, in
> >> >> which case it executes the statement under the if branch.
> >> >> But when the PCollection is empty, it does not execute the else
> >> >> branch and instead executes the if branch, i.e. it prints 0.
> >>
> >> It sounds like what you want to do is ib.collect(is_empty_check) and
> >> then see if its only element is 0.
> >>
> >> >> On Mon, Jul 19, 2021 at 12:32 AM Reuven Lax <re...@google.com>
> wrote:
> >> >>>
> >> >>> You could count the collection (with a default value of zero).
> >> >>>
> >> >>> On Sun, Jul 18, 2021, 11:42 AM Rajnil Guha <rajnil94.g...@gmail.com>
> wrote:
> >> >>>>
> >> >>>> Hi Reuven,
> >> >>>>
> >> >>>> Yes, for now this is a bounded PCollection.
> >> >>>>
> >> >>>> Thanks & Regards
> >> >>>> Rajnil Guha
> >> >>>>
> >> >>>> On Mon, Jul 19, 2021 at 12:02 AM Reuven Lax <re...@google.com>
> wrote:
> >> >>>>>
> >> >>>>> Is this a bounded collection?
> >> >>>>>
> >> >>>>> On Sun, Jul 18, 2021, 11:17 AM Rajnil Guha <
> rajnil94.g...@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>> Hi Beam Users,
> >> >>>>>>
> >> >>>>>> I have a use case where I need to check whether a PCollection is
> >> >>>>>> empty or not. If it's not empty, I need to write a message to a
> >> >>>>>> Pub/Sub topic. I am using the Python SDK and Dataflow to write and
> >> >>>>>> run my pipelines, respectively. I searched but could not find any
> >> >>>>>> concrete way to check whether a PCollection is empty using Python,
> >> >>>>>> or how to take action based on the check. Is there any way to
> >> >>>>>> implement this using Beam?
> >> >>>>>>
> >> >>>>>> Thanks & Regards
> >> >>>>>> Rajnil Guha
>
