On Thu, Aug 20, 2020 at 12:54 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
> Hi Lucas,
>
>> Not really. It is more about pipeline complexity, logging, debugging,
>> monitoring which become more complex.
>
> Should I use a different consumer group or should I use the same consumer
> group?

I don't know what you're asking.

> And also, how will Autoscaling decide the worker count?

There is some analysis that is done based upon certain metrics to optimize CPU
utilization and throughput.

>> What do you mean by it's not working properly?
>
> Actually, I should correct my statement. Both jobs are using tuple tags, but
> when I add one more branch after MessageExtractor things change.

How are they changing?

>> What does the timing information for the transforms tell you on the
>> Dataflow Job UI?
>
> Based on the wall time in the DAG, KafkaIO is the slowest step in my pipeline.
> Its wall time shows 28 days. I put the wall time for each step below.
>
>                                                  |---> Filter1 (1 day)  --> WriteGCS (1 day)
> KafkaIO (28 days) -> MessageExtractor (7 hrs) -> |
>                                                  |---> Filter2 (13 days) --> WriteGCS (2 days)

How do these wall times compare when they are run as two separate pipelines?

> Thanks
>
> On Thu, Aug 20, 2020 at 10:58 AM Luke Cwik <lc...@google.com> wrote:
>
>> Do you mean I can put my simple pipeline multiple times for all topics in
>> one dataflow job?
>> Yes
>>
>> Is there any side effect having multiple independent DAGs in one DF job?
>> Not really. It is more about pipeline complexity, logging, debugging,
>> monitoring which become more complex.
>>
>> And also, why is the TupleTag model not working properly?
>> What do you mean by it's not working properly?
>>
>> Why is it using more resources than it should?
>> What does the timing information for the transforms tell you on the
>> Dataflow Job UI? (Even if MessageExtractor seems simple, it isn't free: you
>> now have to write to two GCS locations instead of one for each work item
>> that you process, so you're doing more network calls.)
>>
>>
>> On Wed, Aug 19, 2020 at 8:36 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>
>>> The Filter step is an independent step; we can think of it as an ETL step or
>>> something else. The MessageExtractor step writes messages to TupleTags based on
>>> the Kafka header. Yes, MessageExtractor is literally a multi-output DoFn
>>> already. MessageExtractor is processing 48kps, but the branches only process
>>> their own logs. Each Filter only consumes its log type. That's why I assumed it
>>> should consume the same number of workers. But it consumes more workers.
>>>
>>>                                        |---> Filter1 (20kps) --> WriteGCS
>>> KafkaIO -> MessageExtractor (48kps) -> |
>>>                                        |---> Filter2 (28kps) --> WriteGCS
>>>
>>> Do you mean I can put my simple pipeline multiple times for all topics
>>> in one dataflow job? Is there any side effect having multiple
>>> independent DAGs in one DF job? And also, why is the TupleTag model not
>>> working properly? Why is it using more resources than it should?
>>>
>>> Thanks
>>>
>>>
>>> On Wed, Aug 19, 2020 at 5:16 PM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>>> Just to clarify, previously you had:
>>>>
>>>> KafkaIO(topic1) --20kps--> Filter1 -> WriteGCS
>>>> KafkaIO(topic2) --28kps--> Filter2 -> WriteGCS
>>>>
>>>> And now you have:
>>>>
>>>>                                              ---48kps--> Filter1 -> WriteGCS
>>>>                                             /
>>>> KafkaIO(topic1, topic2) + MessageExtractor
>>>>                                             \
>>>>                                              ---48kps--> Filter2 -> WriteGCS
>>>>
>>>> Each filter is now actually consuming (and throwing away) more data
>>>> than before.
>>>>
>>>> Or is MessageExtractor literally a multi-output DoFn already (which is
>>>> why you're talking about TupleTags)? This could possibly be more
>>>> expensive if reading Kafka with headers is more expensive than reading
>>>> it without.
>>>>
>>>> If topic1 and topic2 are truly independent, I would keep their reads
>>>> separate. This will simplify your pipeline (and it sounds like it'll
>>>> improve performance). Note that you don't have to have a separate
>>>> Dataflow job for each read; you can have a single Pipeline and do as
>>>> many reads as you want, and they'll all get executed in the same job.
>>>>
>>>> On Wed, Aug 19, 2020 at 4:14 PM Talat Uyarer
>>>> <tuya...@paloaltonetworks.com> wrote:
>>>> >
>>>> > Hi Robert,
>>>> >
>>>> > I calculated processing speed based on worker count. When I had
>>>> > separate jobs, the topic1 job used 5 workers and the topic2 job used
>>>> > 7 workers. Based on the KafkaIO message count, they had 4kps
>>>> > processing speed per worker. After I combined them in one DF job,
>>>> > that DF job started using ~18 workers, not 12 workers.
>>>> >
>>>> > How can I tell whether they are poorly fused or not? I cannot write
>>>> > the Filter that way because it is BeamSQL. I just wanted to show a
>>>> > simplified DAG; that's why I did not mention it.
>>>> >
>>>> > Thanks
>>>> >
>>>> > On Wed, Aug 19, 2020 at 3:54 PM Robert Bradshaw <rober...@google.com> wrote:
>>>> >>
>>>> >> Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
>>>> >> would be 4kps total), or only 2kps coming out of KafkaIO and
>>>> >> MessageExtractor?
>>>> >>
>>>> >> Though it /shouldn't/ matter, due to sibling fusion, there's a chance
>>>> >> things are getting fused poorly, and you could write Filter1 and
>>>> >> Filter2 instead as a DoFn with multiple outputs (see
>>>> >> https://beam.apache.org/documentation/programming-guide/#additional-outputs).
>>>> >>
>>>> >> - Robert
>>>> >>
>>>> >> On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
>>>> >> <tuya...@paloaltonetworks.com> wrote:
>>>> >> >
>>>> >> > Hi,
>>>> >> >
>>>> >> > I have a very simple DAG in my Dataflow job
>>>> >> > (KafkaIO->Filter->WriteGCS). When I submit this Dataflow job per
>>>> >> > topic, it has 4kps per-instance processing speed. However, I want
>>>> >> > to consume two different topics in one DF job. I used TupleTag. I
>>>> >> > created TupleTags per message type. Each topic has different
>>>> >> > message types and also needs different filters, so my pipeline
>>>> >> > turned into the DAG below. MessageExtractor is a very simple step
>>>> >> > that checks the header of each Kafka message and writes it to the
>>>> >> > correct TupleTag. However, after starting to use this new DAG,
>>>> >> > Dataflow can only process 2kps per instance.
>>>> >> >
>>>> >> >                                |---> Filter1 --> WriteGCS
>>>> >> > KafkaIO -> MessageExtractor -> |
>>>> >> >                                |---> Filter2 --> WriteGCS
>>>> >> >
>>>> >> > Do you have any idea why my data processing speed decreased?
>>>> >> >
>>>> >> > Thanks
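For reference, a minimal sketch of a multi-output DoFn of the kind discussed above
(roughly what a MessageExtractor-style transform could look like). The header name
"log_type", the type values, and the class name are illustrative assumptions, not the
actual pipeline's code:

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.kafka.common.header.Header;

public class MessageExtractorSketch extends DoFn<KafkaRecord<String, String>, String> {
  // One TupleTag per message type; each downstream Filter consumes only its tag.
  public static final TupleTag<String> TYPE1 = new TupleTag<String>() {};
  public static final TupleTag<String> TYPE2 = new TupleTag<String>() {};

  @ProcessElement
  public void processElement(ProcessContext c) {
    KafkaRecord<String, String> record = c.element();
    // Route on a (hypothetical) "log_type" Kafka header; each record goes to exactly one output.
    Header header = record.getHeaders().lastHeader("log_type");
    String logType = header == null ? "" : new String(header.value(), StandardCharsets.UTF_8);
    if ("type1".equals(logType)) {
      c.output(TYPE1, record.getKV().getValue());   // main output
    } else {
      c.output(TYPE2, record.getKV().getValue());   // additional output
    }
  }
}

// Applying it yields a PCollectionTuple with one PCollection per tag:
// PCollectionTuple extracted = kafkaRecords.apply("MessageExtractor",
//     ParDo.of(new MessageExtractorSketch())
//          .withOutputTags(MessageExtractorSketch.TYPE1,
//                          TupleTagList.of(MessageExtractorSketch.TYPE2)));
// PCollection<String> type1Logs = extracted.get(MessageExtractorSketch.TYPE1);
// PCollection<String> type2Logs = extracted.get(MessageExtractorSketch.TYPE2);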
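And a minimal sketch of the alternative Robert and Luke suggest: a single Pipeline
(one Dataflow job) containing two fully independent KafkaIO reads, one per topic. The
broker address, topic names, GCS paths, windowing, and the simple Filter.by predicates
(standing in for the real BeamSQL filters) are all placeholder assumptions:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class TwoTopicsOneJobSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Branch 1: topic1 -> Filter1 -> GCS. Completely independent of branch 2,
    // but both branches run in the same Dataflow job.
    p.apply("ReadTopic1", KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")              // placeholder
            .withTopic("topic1")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply("Values1", Values.<String>create())
        .apply("Filter1", Filter.by((String m) -> m.contains("TYPE1")))   // stand-in for the BeamSQL filter
        .apply("Window1", Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("WriteGCS1", TextIO.write()
            .to("gs://my-bucket/topic1/part")                 // placeholder path
            .withWindowedWrites()
            .withNumShards(10));

    // Branch 2: same shape, different topic, filter, and output path.
    p.apply("ReadTopic2", KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("topic2")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply("Values2", Values.<String>create())
        .apply("Filter2", Filter.by((String m) -> m.contains("TYPE2")))
        .apply("Window2", Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("WriteGCS2", TextIO.write()
            .to("gs://my-bucket/topic2/part")
            .withWindowedWrites()
            .withNumShards(10));

    p.run();
  }
}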