Hi Robert,

I calculated processing speed based on worker count. When I ran
separate jobs, the topic1 job used 5 workers and the topic2 job used 7
workers. Based on the KafkaIO message count, they had 4kps processing speed
per worker. After I combined them into one DF job, that job started using
~18 workers, not 12 workers.
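
To make that arithmetic explicit, here is a rough sketch in Python. It only uses the figures quoted above and assumes the combined job receives the same total message rate as the two separate jobs did together; the variable and function names are just for illustration.

```python
# Back-of-the-envelope check using the figures from this thread.
# Assumption: the combined job ingests the same total message rate
# as the two per-topic jobs did together.

KPS_PER_WORKER_SEPARATE = 4.0  # observed per worker in the per-topic jobs
WORKERS_TOPIC1 = 5
WORKERS_TOPIC2 = 7
WORKERS_COMBINED = 18          # approximate ("~18")


def total_kps(workers, kps_per_worker):
    """Total throughput of a job given its worker count and per-worker rate."""
    return workers * kps_per_worker


def per_worker_kps(total, workers):
    """Per-worker throughput implied by a total rate and a worker count."""
    return total / workers


expected_workers = WORKERS_TOPIC1 + WORKERS_TOPIC2
separate_total = total_kps(expected_workers, KPS_PER_WORKER_SEPARATE)
combined_per_worker = per_worker_kps(separate_total, WORKERS_COMBINED)
extra_workers = WORKERS_COMBINED - expected_workers

print(f"total load: {separate_total:.0f} kps")
print(f"combined job: {combined_per_worker:.2f} kps per worker")
print(f"extra workers: {extra_workers}")
```

Under that assumption, the combined job handles roughly 2.7kps per worker instead of 4kps, which is where the 6 additional workers come from.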

How can I tell whether they are poorly fused or not? I cannot write the
Filter myself because it is Beam SQL. I just wanted to simplify my DAG;
that's why I did not mention it.

Thanks

On Wed, Aug 19, 2020 at 3:54 PM Robert Bradshaw <rober...@google.com> wrote:

> Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
> would be 4kps total), or only 2kps coming out of KafkaIO and
> MessageExtractor?
>
> Though it /shouldn't/ matter, due to sibling fusion, there's a chance
> things are getting fused poorly and you could write Filter1 and
> Filter2 instead as a DoFn with multiple outputs (see
>
> https://beam.apache.org/documentation/programming-guide/#additional-outputs
> ).
>
> - Robert
>
> On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
> <tuya...@paloaltonetworks.com> wrote:
> >
> > Hi,
> >
> > I have a very simple DAG in my Dataflow job
> > (KafkaIO->Filter->WriteGCS). When I submit this Dataflow job per topic, it
> > processes 4kps per instance. However, I want to consume two different
> > topics in one DF job. I used TupleTag and created one TupleTag per
> > message type. Each topic has different message types and also needs
> > different filters, so my pipeline turned into the DAG below. Message
> > Extractor is a very simple step that checks the header of each Kafka
> > message and writes to the correct TupleTag. However, after switching to
> > this new DAG, Dataflow can process only 2kps per instance.
> >
> >                             |--->Filter1-->WriteGCS
> > KafkaIO->MessageExtractor-> |
> >                             |--->Filter2-->WriteGCS
> >
> > Do you have any idea why my data processing speed decreased?
> >
> > Thanks
>
