On Thu, Aug 20, 2020 at 12:54 PM Talat Uyarer <tuya...@paloaltonetworks.com>
wrote:

> Hi Lucas,
>
>> Not really. It is more about pipeline complexity, logging, debugging,
>> monitoring which become more complex.
>
> Should I use a different consumer group or should I use the same consumer
> group ?
>
I don't know what you're asking.


> And also How Autoscaling will decide worker count ?
>
There is some analysis that is done based upon certain metrics to optimize
CPU utilization and throughput.



>> What do you mean by it's not working properly?
>
> Actually I should correct my statement. Both jobs are using tuple tags, but
> when I add one more branch after MessageExtractor things are changing.
>
How are they changing?


>> What does the timing information for the transforms tell you on the
>> Dataflow Job UI?
>
> Based on the wall time shown on the DAG, KafkaIO is the slowest step in my
> pipeline. Its wall time shows 28 days. I have listed the wall time for each
> step below.
>
>
>                                              |--->Filter1 (1 day) --> WriteGCS(1day)
> KafkaIO(28 days)->MessageExtractor(7 hrs) -> |
>                                              |--->Filter2 (13 days) --> WriteGCS(2days)
>
How do these wall times compare when they are run as two separate
pipelines?


> Thanks
>
> On Thu, Aug 20, 2020 at 10:58 AM Luke Cwik <lc...@google.com> wrote:
>
>> Do you mean I can put my simple pipeline multiple times for all topics in
>> one dataflow job ?
>> Yes
>>
>> Is there any side effect having multiple independent DAG on one DF job ?
>> Not really. It is more about pipeline complexity, logging, debugging,
>> monitoring which become more complex.
>>
>> And also why the TupleTag model is not working properly?
>> What do you mean by it's not working properly?
>>
>> Why is it using more resources than what it should be?
>> What does the timing information for the transforms tell you on the
>> Dataflow Job UI? (Even if MessageExtractor seems simple it isn't free. You
>> now have to write to two GCS locations instead of one for each work item
>> that you process, so you're doing more network calls.)
>>
>>
>> On Wed, Aug 19, 2020 at 8:36 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Filter step is an independent step. We can think it is an etl step or
>>> something else. MessageExtractor step writes messages on TupleTags based on
>>> the kafka header. Yes, MessageExtractor is literally a multi-output DoFn
>>> already. MessageExtractor is processing 48kps but branches are processing
>>> their logs. Each Filter only consumes its log type. There is no any  So
>>> That's why I assume it should consume the same amount of workers. But it
>>> consumes more workers.
>>>
>>>
>>>
>>>                                    |--->Filter1(20kps)-->WriteGCS
>>> KafkaIO->MessageExtractor(48kps)-> |
>>>                                    |--->Filter2(28kps)-->WriteGCS
>>>
>>> Do you mean I can put my simple pipeline multiple times for all topics
>>> in one dataflow job ? Is there any side effect having multiple
>>> independent DAG on one DF job ? And also why the TupleTag model is not
>>> working properly? Why is it using more resources than what it should be?
>>>
>>> Thanks
>>>
>>>
>>>
>>> On Wed, Aug 19, 2020 at 5:16 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Just to clarify, previously you had.
>>>>
>>>> KafkaIO(topic1) --20kps--> Filter1 -> WriteGCS
>>>> KafkaIO(topic2) --28kps--> Filter2 -> WriteGCS
>>>>
>>>> And now you have
>>>>
>>>>
>>>>                                              ---48kps--> Filter1 -> WriteGCS
>>>>                                            /
>>>> KafkaIO(topic1, topic2) + MessageExtractor
>>>>                                            \
>>>>                                              ---48kps--> Filter2 -> WriteGCS
>>>>
>>>> Each filter is now actually consuming (and throwing away) more data
>>>> than before.
>>>>
>>>> Or is MessageExtractor literally a multi-output DoFn already (which is
>>>> why you're talking about TupleTags)? This could possibly be more
>>>> expensive if reading Kafka with headers is more expensive than reading
>>>> it without.
>>>>
>>>> If topic1 and topic2 are truly independent, I would keep their reads
>>>> separate. This will simplify your pipeline (and sounds like it'll
>>>> improve performance). Note that you don't have to have a separate
>>>> Dataflow job for each read, you can have a single Pipeline and do as
>>>> many reads as you want and they'll all get executed in the same job.
>>>>
>>>> On Wed, Aug 19, 2020 at 4:14 PM Talat Uyarer
>>>> <tuya...@paloaltonetworks.com> wrote:
>>>> >
>>>> > Hi Robert,
>>>> >
>>>> > I calculated the processing speed based on worker count. When I had
>>>> > separate jobs, the topic1 job used 5 workers and the topic2 job used 7
>>>> > workers. Based on the KafkaIO message count, they had a processing speed
>>>> > of 4kps per worker. After I combined them into one DF job, that job
>>>> > started using ~18 workers, not 12 workers.
>>>> >
>>>> > How can I tell whether they are poorly fused or not? I cannot rewrite
>>>> > Filter because it is Beam SQL. I just wanted to simplify my DAG, which
>>>> > is why I did not mention it.
>>>> >
>>>> > Thanks
>>>> >
>>>> > On Wed, Aug 19, 2020 at 3:54 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>> >>
>>>> >> Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
>>>> >> would be 4kps total), or only 2kps coming out of KafkaIO and
>>>> >> MessageExtractor?
>>>> >>
>>>> >> Though it /shouldn't/ matter, due to sibling fusion, there's a chance
>>>> >> things are getting fused poorly and you could write Filter1 and
>>>> >> Filter2 instead as a DoFn with multiple outputs (see
>>>> >> https://beam.apache.org/documentation/programming-guide/#additional-outputs
>>>> >> ).
>>>> >>
>>>> >> - Robert
>>>> >>
>>>> >> On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
>>>> >> <tuya...@paloaltonetworks.com> wrote:
>>>> >> >
>>>> >> > Hi,
>>>> >> >
>>>> >> > I have a very simple DAG on my Dataflow job
>>>> >> > (KafkaIO->Filter->WriteGCS). When I submit this Dataflow job per topic,
>>>> >> > it has a processing speed of 4kps per instance. However, I want to
>>>> >> > consume two different topics in one DF job. I used TupleTag. I created
>>>> >> > TupleTags per message type. Each topic has different message types and
>>>> >> > also needs different filters. So my pipeline turned into the DAG below.
>>>> >> > MessageExtractor is a very simple step that checks the header of Kafka
>>>> >> > messages and writes to the correct TupleTag. However, after starting to
>>>> >> > use this new DAG, Dataflow can process 2kps per instance.
>>>> >> >
>>>> >> >
>>>> >> >                             |--->Filter1-->WriteGCS
>>>> >> > KafkaIO->MessageExtractor-> |
>>>> >> >                             |--->Filter2-->WriteGCS
>>>> >> >
>>>> >> > Do you have any idea why my data processing speed decreased?
>>>> >> >
>>>> >> > Thanks
>>>>
>>>
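
As a rough check of the worker counts quoted in this thread, here is a minimal Python sketch. It only uses the figures reported above (5 and 7 workers for the separate jobs, 20kps and 28kps per topic, ~18 workers for the combined job). The helper name per_worker_kps is hypothetical and exists only for illustration; nothing here reflects how Dataflow autoscaling actually decides on a worker count.

```python
# Figures are taken from the messages in this thread; the helper is illustrative.
def per_worker_kps(total_kps: float, workers: int) -> float:
    """Average throughput per worker, in thousands of messages per second."""
    return total_kps / workers

separate_workers = 5 + 7               # topic1 job + topic2 job
combined_kps = 20 + 28                 # both topics read by one job = 48kps
expected_workers = combined_kps / 4    # at the 4kps per worker seen before
observed_workers = 18                  # "~18 workers" for the combined job

print(separate_workers)                                          # 12
print(expected_workers)                                          # 12.0
print(round(per_worker_kps(combined_kps, observed_workers), 2))  # 2.67
print(observed_workers / separate_workers)                       # 1.5
```

With these numbers the combined job uses about 1.5 times the workers of the two separate jobs, which is the gap the thread is trying to explain.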
