Hi Ivan,

Just to add to the chaining discussion: when you split the map into two
operators, records have to be copied from one operator to the chained one.
Since your objects are very heavy, that copy can take quite a long time,
especially if you don't have a specific serializer configured and instead rely
on Kryo.
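
For example, you could register the event type (or a dedicated serializer for
it) up front; a minimal sketch, where `EventAvro` and `EventAvroKryoSerializer`
are placeholders for your generated class and a purpose-built Kryo serializer:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Pre-registering the class lets Kryo write a short ID instead of the
    // fully-qualified class name with every record.
    env.getConfig().registerKryoType(EventAvro.class);

    // Or plug in a dedicated serializer (extending com.esotericsoftware.kryo.Serializer)
    // so the 300+ fields are not handled by Kryo's generic reflection path.
    env.getConfig().registerTypeWithKryoSerializer(EventAvro.class, EventAvroKryoSerializer.class);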

You can avoid the copying in your case altogether by enabling object reuse
(`enableObjectReuse`, see
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html).
This passes results directly from upstream operators to the chained downstream
operators.
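
In code, that is just (minimal sketch):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Chained operators now hand the same object instance downstream instead
    // of a deep copy, so the per-record copy/serialization step is skipped.
    env.getConfig().enableObjectReuse();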

This feature is disabled by default because some users modify objects in place
(non-functional style) or hold references to them in local, unmanaged state;
always copying records is the conservative workaround for that.
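
To illustrate the kind of code that is unsafe with object reuse (hypothetical
`Event` type):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.functions.MapFunction;

    // Unsafe with enableObjectReuse(): the function remembers a reference to
    // the incoming record, but upstream may reuse/overwrite that very instance.
    public class CachingMap implements MapFunction<Event, Event> {
        private final List<Event> lastSeen = new ArrayList<>(); // local, unmanaged state

        @Override
        public Event map(Event value) {
            lastSeen.add(value); // would need a deep copy of "value" to be safe
            return value;
        }
    }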

On Fri, May 15, 2020 at 10:17 AM Chesnay Schepler <ches...@apache.org>
wrote:

> Generally there should be no difference.
> Can you check whether the maps are running as a chain (as a single task)?
> If they are running in a chain, then I would suspect that *something*
> else is skewing your results.
> If not, then the added network/serialization pressure would explain it.
>
> I will assume that the mismatch in variable names in your second example
> (JsonData vs rawDataAsJson) is just a typo.
>
> On 15/05/2020 04:29, Ivan Yang wrote:
>
> Hi,
>
> We have a Flink job that reads data from an input stream, then converts each
> event from a JSON string to an Avro object, and finally writes to parquet files
> using StreamingFileSink with OnCheckpointRollingPolicy of 5 mins. Basically a
> stateless job. Initially, we use one map operator to convert the JSON string to
> an Avro object. Inside the map function, it goes from String -> JsonObject ->
> Avro object.
>
>       DataStream<AvroSchema> avroData = data.map(new JsonToAVRO());
>
> When we try to break the map operator into two, one for String to JsonObject,
> another for JsonObject to Avro:
>
>         DataStream<JsonObject> JsonData = data.map(new StringToJson());
>         DataStream<AvroSchema> avroData = rawDataAsJson.map(new 
> JsonToAvroSchema())
>
> The benchmark shows a significant performance hit when breaking it down into two
> operators. We are trying to understand the Flink internals and why there is such
> a big difference. The setup uses state backend = filesystem and checkpoints to an
> s3 bucket. Our event object has 300+ attributes.
>
>
> Thanks
> Ivan
>
>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
