Hi Ivan,

Just to add to the point about chaining: when the map is split into two parts, objects need to be copied from one operator to the next chained operator. Since your objects are very heavy, that can take quite a long time, especially if you don't have a specific serializer configured and rely on Kryo instead.
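To make the copy step concrete, here is a toy model in plain Java. It is only a sketch and not Flink's internals: `Event`, `handOff`, and the `objectReuse` flag are hypothetical names invented for illustration. It shows the two ways a record can be handed from one chained function to the next: as a deep copy (cost grows with the size of the object) or as the same instance (no copy, but a downstream mutation becomes visible upstream).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ChainedHandoffSketch {

    /** Hypothetical stand-in for a heavy event with many attributes. */
    static final class Event {
        final List<String> attributes;

        Event(List<String> attributes) {
            this.attributes = attributes;
        }

        /** Deep copy: the cost grows with the number of attributes. */
        Event copy() {
            return new Event(new ArrayList<>(attributes));
        }
    }

    /**
     * Hands a record from one chained function to the next.
     * With objectReuse == false the downstream function gets its own copy;
     * with objectReuse == true it gets the very same instance.
     */
    static <R> R handOff(Event upstreamResult, Function<Event, R> downstream, boolean objectReuse) {
        Event input = objectReuse ? upstreamResult : upstreamResult.copy();
        return downstream.apply(input);
    }

    /** Returns true if the downstream function received the instance the upstream produced. */
    static boolean sharesInstance(boolean objectReuse) {
        Event produced = new Event(new ArrayList<>(List.of("a", "b", "c")));
        return handOff(produced, received -> received == produced, objectReuse);
    }

    /** Returns the upstream object's attribute count after the downstream function mutated its input. */
    static int upstreamSizeAfterDownstreamMutation(boolean objectReuse) {
        Event produced = new Event(new ArrayList<>(List.of("a", "b", "c")));
        handOff(produced, received -> received.attributes.add("mutated"), objectReuse);
        return produced.attributes.size();
    }

    public static void main(String[] args) {
        System.out.println("copy:  shared=" + sharesInstance(false)
                + ", upstream size=" + upstreamSizeAfterDownstreamMutation(false));
        System.out.println("reuse: shared=" + sharesInstance(true)
                + ", upstream size=" + upstreamSizeAfterDownstreamMutation(true));
    }
}
```

In the copying mode the upstream object stays untouched at three attributes; in the reuse mode the downstream mutation shows up on the upstream object as well, which is the trade-off between safety and per-record copy cost.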
You can avoid the copying in your case by calling `enableObjectReuse()` on the `ExecutionConfig` (https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html). With object reuse enabled, results are passed directly from an upstream operator to its chained downstream operators. The feature is disabled by default because some users modify objects in place (non-functional style) or hold local, unmanaged state, so always copying records is the conservative default.

On Fri, May 15, 2020 at 10:17 AM Chesnay Schepler <ches...@apache.org> wrote:

> Generally there should be no difference.
> Can you check whether the maps are running as a chain (as a single task)?
> If they are running in a chain, then I would suspect that *something*
> else is skewing your results.
> If not, then the added network/serialization pressure would explain it.
>
> I will assume that the mismatch in variable names in your second example
> (JsonData vs rawDataAsJson) is just a typo.
>
> On 15/05/2020 04:29, Ivan Yang wrote:
>
> Hi,
>
> We have a Flink job that reads data from an input stream, then converts each
> event from JSON string to Avro object, finally writes to parquet files using
> StreamingFileSink with OnCheckPointRollingPolicy of 5 mins. Basically a
> stateless job. Initially, we use one map operator to convert Json string to
> Avro object. Inside the map function, it goes from String -> JsonObject ->
> Avro object.
>
> DataStream<AvroSchema> avroData = data.map(new JsonToAVRO());
>
> When we try to break the map operator to two, one for String to JsonObject,
> another for JsonObject to Avro.
>
> DataStream<JsonObject> JsonData = data.map(new StringToJson());
> DataStream<AvroSchema> avroData = rawDataAsJson.map(new JsonToAvroSchema())
>
> The benchmark shows significant performance hit when breaking down to two
> operators. We try to understand the Flink internal on why such a big
> difference. The setup is using state backend = filesystem. Checkpoint = s3
> bucket. Our event object has 300+ attributes.
>
> Thanks
> Ivan

--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>