Hi Paul,

In theory, a broadcast operator cannot be chained, because broadcasting is an 
all-to-all exchange; chaining is only feasible for one-to-one exchanges such 
as forward. 
With chaining, the next operator can process the raw record emitted by the head 
operator directly. Without it, the emitted record must first be serialized into 
a buffer, which the downstream operator then consumes, over the network or not. 
So in theory chaining gives the best performance compared to non-chaining.

In your case, if you cannot avoid the broadcast requirement, then you have to 
accept the non-chained setup and test whether the real performance is 
acceptable to you. If it does not meet your requirements, we can consider 
further improvements.

Best,
Zhijiang
------------------------------------------------------------------
From:Piotr Nowojski <pi...@ververica.com>
Send Time:Tuesday, Aug 6, 2019 14:55
To:黄兆鹏 <paulhu...@easyops.cn>
Cc:user <user@flink.apache.org>
Subject:Re: Will broadcast stream affect performance because of the absence of 
operator chaining?

Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek

On 6 Aug 2019, at 13:11, 黄兆鹏 <paulhu...@easyops.cn> wrote:
Hi, Piotrek,
I previously considered your first suggestion (use a union record type), but I 
found that the schema would be sent to only one subtask of the operator (for 
example, operatorA), and the other subtasks of the operator would not be aware 
of it. 
Is there anything I have missed in this case? 

Thank you!





------------------ Original ------------------
From:  "Piotr Nowojski"<pi...@ververica.com>;
Date:  Tue, Aug 6, 2019 06:57 PM
To:  "黄兆鹏"<paulhu...@easyops.cn>; 
Cc:  "user"<user@flink.apache.org>; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?
Hi,

Have you measured the performance impact of breaking the operator chain?

This is a current limitation of Flink chaining: if an operator has two 
inputs, it cannot be chained to anything else (only single-input operators are 
chained together). There are plans to address this issue in the future.

As a workaround, besides what you have mentioned:
- maybe your record type can be a union: type of Record or Schema (not Record 
AND Schema), and upstream operators (operatorA) could just ignore/forward the 
Schema. You wouldn’t need to send schema with every record.
- another (ugly) solution is to implement the BroadcastStream input outside of 
Flink, but then you might have issues with checkpointing/watermarking, and it 
just makes many things more complicated.
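A minimal sketch of the union idea from the first bullet, in plain Java. All 
names here (`UnionSketch`, `Element`, `SchemaAwareOperator`) are hypothetical 
and no Flink API is used; the point is only that each element is a Record or a 
Schema, and that a single-input operator remembers the latest schema and 
forwards it. As the follow-up reply in this thread points out, a schema sent 
this way only reaches one parallel subtask, so this does not replace a real 
broadcast.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: hypothetical names, plain Java, not Flink API.
final class UnionSketch {

    // An element is either a data record or a schema update, never both.
    static final class Element {
        final String record; // non-null for data elements
        final String schema; // non-null for schema updates

        private Element(String record, String schema) {
            this.record = record;
            this.schema = schema;
        }

        static Element ofRecord(String record) { return new Element(record, null); }
        static Element ofSchema(String schema) { return new Element(null, schema); }
        boolean isSchema() { return schema != null; }
    }

    // A single-input operator: it remembers the latest schema, forwards schema
    // elements unchanged, and tags each record with the schema in effect.
    static final class SchemaAwareOperator {
        private String currentSchema = "unknown";

        List<Element> process(List<Element> input) {
            List<Element> output = new ArrayList<>();
            for (Element e : input) {
                if (e.isSchema()) {
                    currentSchema = e.schema;
                    output.add(e); // forward so the next operator sees it too
                } else {
                    output.add(Element.ofRecord(e.record + "@" + currentSchema));
                }
            }
            return output;
        }
    }

    public static void main(String[] args) {
        List<Element> input = new ArrayList<>();
        input.add(Element.ofSchema("v1"));
        input.add(Element.ofRecord("a"));
        for (Element e : new SchemaAwareOperator().process(input)) {
            System.out.println(e.isSchema() ? "schema " + e.schema : "record " + e.record);
        }
    }
}
```

Because the schema travels as its own element, records do not have to carry 
schema information with them.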

Piotrek

On 6 Aug 2019, at 10:50, 黄兆鹏 <paulhu...@easyops.cn> wrote:
Hi Piotrek,
Thanks for your reply. My broadcast stream just listens for changes to the 
schema; these are very infrequent and very lightweight.

In fact, there are two ways to solve my problem.

The first is a broadcast stream that listens for schema changes and 
broadcasts them to every operator that handles the data, just as I posted 
originally.
DataStream: OperatorA  ->  OperatorB  ->  OperatorC
                ^              ^              ^
                |              |              |
                        BroadcastStream

The second approach is to have an operator that joins my data and schema 
together and sends the result to the downstream operators:
DataStream: MergeSchemaOperator  ->  OperatorA  ->  OperatorB  ->  OperatorC
                     ^
                     |
              BroadcastStream


The benefit of the first approach is that the Flink job does not have to 
transfer the schema along with the real data records between operators, 
because the schema is broadcast to each operator.
The disadvantage of the first approach is that it breaks the operator chain, 
so the operators may not be executed in the same slot and performance gets 
worse.

The second approach does not have that problem, but each message carries its 
schema info between operators, which costs about 2x for serialization and 
deserialization between operators.

Is there a better workaround that lets all the operators notice a schema 
change without breaking the operator chaining? 

Thanks!
------------------ Original ------------------
From:  "Piotr Nowojski"<pi...@ververica.com>;
Date:  Tue, Aug 6, 2019 04:23 PM
To:  "黄兆鹏"<paulhu...@easyops.cn>; 
Cc:  "user"<user@flink.apache.org>; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?
Hi,

Broadcasting will break an operator chain. However, my best guess is that the 
Kafka source will still be the performance bottleneck in your job. Also, 
network exchanges add measurable overhead only if your records are very 
lightweight and easy to process (for example, if you are using RocksDB then you 
can just ignore network costs).

Either way, you can just try this out. Pre-populate your Kafka topic with a 
significant number of messages, run both jobs, compare the throughput, and 
decide based on those results whether this is OK for you or not.

Piotrek 

> On 6 Aug 2019, at 09:56, 黄兆鹏 <paulhu...@easyops.cn> wrote:
> 
> Hi all, 
> My Flink job has a dynamic data schema, so I want to consume a schema Kafka 
> topic and broadcast it to every operator so that each operator knows what 
> kind of data it is handling.
> 
> For example, the two streams look like this:
> OperatorA  ->  OperatorB  ->  OperatorC
>     ^              ^              ^
>     |              |              |
>             BroadcastStream
> 
> Without the broadcast stream, OperatorA, OperatorB and OperatorC are chained 
> together in one slot because they have the same parallelism, which gives 
> maximum performance.
> 
> I was wondering: if the broadcast stream exists, will it affect the 
> performance? Or will Flink still chain them together to get maximum 
> performance? 
> 
> Thanks!


