Do you mean I can put my simple pipeline multiple times for all topics in one dataflow job? Yes.
Is there any side effect to having multiple independent DAGs in one DF job? Not really. It is more about pipeline complexity, logging, debugging, and monitoring, which become more complex.

And also why is the TupleTag model not working properly? What do you mean by it's not working properly? Why is it using more resources than it should? What does the timing information for the transforms tell you on the Dataflow Job UI? (Even if MessageExtractor seems simple, it isn't free. You now have to write to two GCS locations instead of one for each work item that you process, so you're doing more network calls.)

On Wed, Aug 19, 2020 at 8:36 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
> The Filter step is an independent step. We can think of it as an ETL step or
> something else. The MessageExtractor step writes messages to TupleTags based on
> the kafka header. Yes, MessageExtractor is literally a multi-output DoFn
> already. MessageExtractor is processing 48kps, but the branches are each
> processing only their own logs. Each Filter only consumes its log type.
> That's why I assume it should consume the same amount of workers. But it
> consumes more workers.
>
>                                     |--->Filter1(20kps)-->WriteGCS
> KafkaIO->MessageExtractor(48kps)--> |
>                                     |--->Filter2(28kps)-->WriteGCS
>
> Do you mean I can put my simple pipeline multiple times for all topics in
> one dataflow job? Is there any side effect to having multiple independent
> DAGs in one DF job? And also why is the TupleTag model not working properly?
> Why is it using more resources than it should?
>
> Thanks
>
> On Wed, Aug 19, 2020 at 5:16 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> Just to clarify, previously you had
>>
>> KafkaIO(topic1) --20kps--> Filter1 -> WriteGCS
>> KafkaIO(topic2) --28kps--> Filter2 -> WriteGCS
>>
>> And now you have
>>
>>                                             ---48kps--> Filter1 -> WriteGCS
>>                                            /
>> KafkaIO(topic1, topic2) + MessageExtractor
>>                                            \
>>                                             ---48kps--> Filter2 -> WriteGCS
>>
>> Each filter is now actually consuming (and throwing away) more data than
>> before.
>>
>> Or is MessageExtractor literally a multi-output DoFn already (which is
>> why you're talking about TupleTags)? This could possibly be more
>> expensive if reading Kafka with headers is more expensive than reading
>> it without.
>>
>> If topic1 and topic2 are truly independent, I would keep their reads
>> separate. This will simplify your pipeline (and it sounds like it'll
>> improve performance). Note that you don't have to have a separate
>> Dataflow job for each read; you can have a single Pipeline and do as
>> many reads as you want, and they'll all get executed in the same job.
>>
>> On Wed, Aug 19, 2020 at 4:14 PM Talat Uyarer
>> <tuya...@paloaltonetworks.com> wrote:
>> >
>> > Hi Robert,
>> >
>> > I calculated processing speed based on worker count. When I had
>> > separate jobs, the topic1 job used 5 workers and the topic2 job used
>> > 7 workers. Based on the KafkaIO message count, they had 4kps processing
>> > speed per worker. After I combined them into one DF job, that job
>> > started using ~18 workers, not 12 workers.
>> >
>> > How can I tell whether they are poorly fused or not? I cannot rewrite
>> > the Filter because it is BeamSQL. I just wanted to simplify my DAG;
>> > that's why I did not mention it.
>> >
>> > Thanks
>> >
>> > On Wed, Aug 19, 2020 at 3:54 PM Robert Bradshaw <rober...@google.com>
>> > wrote:
>> >>
>> >> Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
>> >> would be 4kps total), or only 2kps coming out of KafkaIO and
>> >> MessageExtractor?
>> >>
>> >> Though it /shouldn't/ matter, due to sibling fusion, there's a chance
>> >> things are getting fused poorly and you could write Filter1 and
>> >> Filter2 instead as a single DoFn with multiple outputs (see
>> >> https://beam.apache.org/documentation/programming-guide/#additional-outputs).
>> >>
>> >> - Robert
>> >>
>> >> On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
>> >> <tuya...@paloaltonetworks.com> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I have a very simple DAG on my dataflow job
>> >> > (KafkaIO->Filter->WriteGCS). When I submit this Dataflow job per
>> >> > topic, it has 4kps per-instance processing speed. However, I want to
>> >> > consume two different topics in one DF job, so I used TupleTags. I
>> >> > created TupleTags per message type. Each topic has different message
>> >> > types and also needs different filters, so my pipeline turned into
>> >> > the DAG below. MessageExtractor is a very simple step that checks the
>> >> > header of each kafka message and writes to the correct TupleTag.
>> >> > However, after starting to use this new DAG, dataflow can only
>> >> > process 2kps per instance.
>> >> >
>> >> >                             |--->Filter1-->WriteGCS
>> >> > KafkaIO->MessageExtractor-> |
>> >> >                             |--->Filter2-->WriteGCS
>> >> >
>> >> > Do you have any idea why my data processing speed decreased?
>> >> >
>> >> > Thanks
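The multi-output pattern Robert links to (and that the thread's MessageExtractor already uses) can be sketched roughly like this in the Beam Java SDK. This is a non-compiling fragment, not the thread's actual code: the element types, the tag names, and the "log-type" header key are all assumptions for illustration.

```java
// Sketch of a multi-output DoFn routing Kafka records by header,
// per https://beam.apache.org/documentation/programming-guide/#additional-outputs
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final TupleTag<String> log1Tag = new TupleTag<String>() {};
final TupleTag<String> log2Tag = new TupleTag<String>() {};

PCollectionTuple routed = records.apply("MessageExtractor",
    ParDo.of(new DoFn<KafkaRecord<String, String>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        KafkaRecord<String, String> rec = c.element();
        // "log-type" is a hypothetical header key, not from the thread.
        String logType =
            new String(rec.getHeaders().lastHeader("log-type").value());
        if ("log1".equals(logType)) {
          c.output(rec.getKV().getValue());            // main output -> log1Tag
        } else {
          c.output(log2Tag, rec.getKV().getValue());   // additional output
        }
      }
    }).withOutputTags(log1Tag, TupleTagList.of(log2Tag)));

// Each branch only ever sees its own log type:
PCollection<String> log1 = routed.get(log1Tag);  // -> Filter1 -> WriteGCS
PCollection<String> log2 = routed.get(log2Tag);  // -> Filter2 -> WriteGCS
```

Note that even with this routing, every element still pays for the header read and for one of two GCS sinks, which is the per-element overhead discussed at the top of the thread.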
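Robert's alternative, keeping the reads separate but running them in a single job, might look like the fragment below. Again a sketch under assumptions: the broker address, transform names, and the `filter1`/`filter2`/`writeGcs1`/`writeGcs2` transforms are placeholders, not code from the thread.

```java
// Sketch: two fully independent read->filter->write chains attached to one
// Pipeline object, so a single Dataflow job executes both DAGs.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

Pipeline p = Pipeline.create(options);

p.apply("ReadTopic1", KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")        // placeholder
        .withTopic("topic1")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())
    .apply("Filter1", filter1)                      // e.g. the BeamSQL filter
    .apply("WriteGCS1", writeGcs1);

p.apply("ReadTopic2", KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")        // placeholder
        .withTopic("topic2")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())
    .apply("Filter2", filter2)
    .apply("WriteGCS2", writeGcs2);

p.run();  // one Dataflow job, two independent DAGs
```

Because the two chains never share a PCollection, neither branch ever reads or discards the other topic's data, which is the performance argument made above for keeping the reads separate.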