Hello, I am not sure of the context behind your join, but I just wanted to point out that Beam SQL [1] or the Join-library extension [2] may be helpful in your scenario, to avoid changing semantics or needing to orchestrate your jobs outside Beam.
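
For illustration, a minimal sketch of what the Join-library route could look like, assuming both tables can be keyed by a shared "id" column (the column name, key type, and table names are hypothetical; Join.innerJoin is the entry point in org.apache.beam.sdk.extensions.joinlibrary):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

// Key each side by the (hypothetical) shared "id" column.
PCollection<KV<String, TableRow>> aByKey = p
    .apply("ReadA", BigQueryIO.readTableRows().from("project:dataset.table_A"))
    .apply("KeyA", WithKeys.of(
            (SerializableFunction<TableRow, String>) row -> (String) row.get("id"))
        .withKeyType(TypeDescriptors.strings()));

PCollection<KV<String, TableRow>> bByKey = p
    .apply("ReadB", BigQueryIO.readTableRows().from("project:dataset.table_B"))
    .apply("KeyB", WithKeys.of(
            (SerializableFunction<TableRow, String>) row -> (String) row.get("id"))
        .withKeyType(TypeDescriptors.strings()));

// Inner join on the shared key; each output pairs a matching A row with its B row.
PCollection<KV<String, KV<TableRow, TableRow>>> joined = Join.innerJoin(aByKey, bByKey);

Because the join happens inside the Beam model, the whole flow stays in one connected job graph instead of hopping through an intermediate table.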
[1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/
[2] https://beam.apache.org/documentation/sdks/java-extensions/

Best,
Bruno

On Wed, Jun 15, 2022 at 3:35 PM Jack McCluskey <jrmcclus...@google.com> wrote:

> Hey Ravi,
>
> The problem you're running into is that the act of writing data to a
> table and reading from it are not joined actions in the Beam model. There
> is no connecting PCollection tying them together, so they are split and
> run in parallel. If you need the data written to C, you should re-use the
> PCollection written to C in your filtering step instead of reading the
> data from C again. That should produce the graph you're looking for in a
> batch context.
>
> Thanks,
>
> Jack McCluskey
>
> On Wed, Jun 15, 2022 at 3:30 PM Ravi Kapoor <kapoorrav...@gmail.com> wrote:
>
>> FYI
>>
>> On Thu, Jun 16, 2022 at 12:56 AM Ravi Kapoor <kapoorrav...@gmail.com> wrote:
>>
>>> Hi Daniel,
>>>
>>> I have a use case where I join two tables, say A and B, and write the
>>> joined collection to C.
>>> Then I would like to filter some records from C and put them into
>>> another table, say D.
>>> So the pipeline on the Dataflow UI should look like this, right?
>>>
>>> A
>>>  \
>>>   C -> D
>>>  /
>>> B
>>>
>>> However, the pipeline writes C -> D in parallel.
>>> How can that part of the pipeline run in parallel when the data has
>>> not yet been pushed to C by the previous stage?
>>>
>>> Indeed, when I ran this pipeline, table D did not get any records
>>> inserted either.
>>> Can you help me with this use case?
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On Tue, Jun 14, 2022 at 9:01 PM Daniel Collins <dpcoll...@google.com> wrote:
>>>
>>>> Can you speak to what specifically you want to be different? The job
>>>> graph you see, with A -> B and B -> C being separate, is an accurate
>>>> reflection of your pipeline. table_B is outside of the Beam model; by
>>>> pushing your data there, Dataflow has no way to identify that no
>>>> manipulation of the data happens at table_B.
>>>>
>>>> If you just want to process data from A to destinations D and E while
>>>> writing an intermediate output to table_B, you should remove the read
>>>> from table_B and transform table_A_records again directly. If that is
>>>> not what you want, you would need to explain more specifically what
>>>> you want that is different. Is it a pure UI change? Is it a functional
>>>> change?
>>>>
>>>> -Daniel
>>>>
>>>> On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor <kapoorrav...@gmail.com> wrote:
>>>>
>>>>> Team,
>>>>> Any update on this?
>>>>>
>>>>> On Mon, Jun 13, 2022 at 8:39 PM Ravi Kapoor <kapoorrav...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> I am currently using Beam in my project with the Dataflow runner.
>>>>>> I am trying to create a pipeline where the data flows from source
>>>>>> to staging and then to target, such as:
>>>>>>
>>>>>> A (Source) -> B (Staging) -> C (Target)
>>>>>>
>>>>>> When I create a pipeline as below:
>>>>>>
>>>>>> PCollection<TableRow> table_A_records =
>>>>>>     p.apply(BigQueryIO.readTableRows()
>>>>>>         .from("project:dataset.table_A"));
>>>>>>
>>>>>> table_A_records.apply(BigQueryIO.writeTableRows()
>>>>>>     .to("project:dataset.table_B")
>>>>>>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>>>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>>>
>>>>>> PCollection<TableRow> table_B_records =
>>>>>>     p.apply(BigQueryIO.readTableRows()
>>>>>>         .from("project:dataset.table_B"));
>>>>>>
>>>>>> table_B_records.apply(BigQueryIO.writeTableRows()
>>>>>>     .to("project:dataset.table_C")
>>>>>>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>>>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>>>
>>>>>> p.run().waitUntilFinish();
>>>>>>
>>>>>> It basically creates two parallel job graphs in Dataflow instead of
>>>>>> one chained transformation as expected:
>>>>>> A -> B
>>>>>> B -> C
>>>>>> I need to create a data pipeline which flows the data in a chain,
>>>>>> like:
>>>>>>             D
>>>>>>            /
>>>>>> A -> B -> C
>>>>>>            \
>>>>>>             E
>>>>>> Is there a way to achieve this transformation between the source
>>>>>> and target tables?
>>>>>>
>>>>>> Thanks,
>>>>>> Ravi
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Ravi Kapoor
>>>>> +91-9818764564
>>>>> kapoorrav...@gmail.com
>>>
>>> --
>>> Thanks,
>>> Ravi Kapoor
>>> +91-9818764564
>>> kapoorrav...@gmail.com
>>
>> --
>> Thanks,
>> Ravi Kapoor
>> +91-9818764564
>> kapoorrav...@gmail.com