Generally there should be no difference.
Can you check whether the maps are running as a chain (as a single task)?
If they are running in a chain, then I would suspect that /something/ else is skewing your results.
If not, then the added network/serialization pressure would explain it.
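To make the chaining point concrete: when two maps are chained, the record produced by the first is handed to the second inside the same task and never goes through the network stack. When they are not chained, every record is serialized into network buffers and deserialized again by the downstream task. Chaining requires, among other things, that both operators have the same parallelism and are connected by forward partitioning. The job graph in the Flink web UI shows chained operators as a single vertex. Below is a minimal plain-Java sketch of the two hand-offs with no Flink dependency. `ChainingSketch`, `chained` and `unchained` are hypothetical names, and Java serialization only stands in for whatever serializer Flink would pick for the intermediate `JsonObject`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.function.Function;

public class ChainingSketch {

    // Chained case (sketch): the intermediate value is passed to the next
    // step by reference, in the same thread, with no serialization.
    static <A, B, C> C chained(A input, Function<A, B> first, Function<B, C> second) {
        return second.apply(first.apply(input));
    }

    // Unchained case (sketch): the intermediate value crosses a task boundary,
    // so it is serialized to bytes and deserialized before the second step.
    // Java serialization is a stand-in for Flink's own serializers.
    @SuppressWarnings("unchecked")
    static <A, B extends Serializable, C> C unchained(A input, Function<A, B> first, Function<B, C> second) {
        B intermediate = first.apply(input);
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(intermediate);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return second.apply((B) in.readObject());
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical first step, standing in for String -> JsonObject:
        // builds a 300-attribute map from a raw value.
        Function<String, HashMap<String, String>> parse = raw -> {
            HashMap<String, String> fields = new HashMap<>();
            for (int i = 0; i < 300; i++) {
                fields.put("attr" + i, raw + i);
            }
            return fields;
        };
        // Hypothetical second step, standing in for JsonObject -> Avro object.
        Function<HashMap<String, String>, Integer> convert = HashMap::size;

        // Both paths compute the same result; only the hand-off cost differs.
        System.out.println(chained("v", parse, convert));
        System.out.println(unchained("v", parse, convert));
    }
}
```

Both calls print 300; the only difference is the byte round trip inside `unchained`, which is the per-record cost a broken chain adds.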

I will assume that the mismatch in variable names in your second example (JsonData vs rawDataAsJson) is just a typo.

On 15/05/2020 04:29, Ivan Yang wrote:
Hi,

We have a Flink job that reads data from an input stream, converts each event 
from a JSON string to an Avro object, and finally writes it to Parquet files using 
StreamingFileSink with an OnCheckpointRollingPolicy of 5 mins. Basically a stateless 
job. Initially, we used one map operator to convert the JSON string to an Avro 
object; inside the map function, it goes from String -> JsonObject -> Avro object.

        DataStream<AvroSchema> avroData = data.map(new JsonToAVRO());

We then tried breaking the map operator into two: one for String to JsonObject, 
another for JsonObject to Avro.

         DataStream<JsonObject> JsonData = data.map(new StringToJson());
         DataStream<AvroSchema> avroData = rawDataAsJson.map(new JsonToAvroSchema());

The benchmark shows a significant performance hit when the conversion is broken 
down into two operators. We are trying to understand which Flink internals cause 
such a big difference. The setup uses the filesystem state backend, with 
checkpoints stored in an S3 bucket. Our event object has 300+ attributes.
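A rough way to see why the 300+ attributes matter if the two maps end up in separate tasks: the serialization work at the operator boundary grows with record width. A minimal sketch, assuming a flat event modeled as a `LinkedHashMap` of string fields; the class `RecordWidthSketch` and its helpers are hypothetical. Flink uses its own serializers rather than Java serialization, so absolute byte counts will differ, but the trend is the point.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;

public class RecordWidthSketch {

    // Builds a hypothetical flat event with the given number of string fields.
    static LinkedHashMap<String, String> event(int attributes) {
        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < attributes; i++) {
            fields.put("attribute_" + i, "value_" + i);
        }
        return fields;
    }

    // Bytes written by Java serialization for one record: a stand-in for the
    // per-record work done when a record crosses an unchained boundary.
    static int serializedSize(Object record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(record);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        // Compare per-record size for narrow and wide events.
        for (int n : new int[] {10, 100, 300}) {
            System.out.println(n + " attributes -> " + serializedSize(event(n)) + " bytes per record");
        }
    }
}
```

Multiplying the per-record size by the event rate gives a rough idea of the extra bytes that would be serialized and shipped per second if the chain is broken.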


Thanks
Ivan