Clicked send too early. The serializer we wrote also caches DatumReaders/DatumWriters 
for each known schema and serializes the GenericRecords using the Avro 
encoder/decoder, so it's not slow, but not as fast as custom ones.
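
For context, a rough sketch of what a caching serializer along those lines might look like as a Kryo Serializer. This is a hypothetical reconstruction, not the actual implementation; it still writes the schema JSON with every record, which a real version would replace with a registered schema id.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    // Hypothetical sketch of a Kryo serializer for GenericRecord that caches
    // Avro DatumWriters/DatumReaders per schema and uses Avro binary encoding.
    public class CachingGenericRecordSerializer extends Serializer<GenericRecord> {

        private final Map<String, GenericDatumWriter<GenericRecord>> writers = new HashMap<>();
        private final Map<String, GenericDatumReader<GenericRecord>> readers = new HashMap<>();

        @Override
        public void write(Kryo kryo, Output output, GenericRecord record) {
            try {
                Schema schema = record.getSchema();
                String schemaJson = schema.toString();
                // Sketch: ships the schema JSON with every record; a real implementation
                // would register schemas up front and write a short schema id instead.
                output.writeString(schemaJson);

                GenericDatumWriter<GenericRecord> writer = writers.get(schemaJson);
                if (writer == null) {
                    writer = new GenericDatumWriter<GenericRecord>(schema);
                    writers.put(schemaJson, writer);
                }
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
                writer.write(record, encoder);
                encoder.flush();

                byte[] payload = bytes.toByteArray();
                output.writeInt(payload.length, true);
                output.writeBytes(payload);
            } catch (IOException e) {
                throw new RuntimeException("Avro encoding failed", e);
            }
        }

        @Override
        public GenericRecord read(Kryo kryo, Input input, Class<GenericRecord> type) {
            try {
                String schemaJson = input.readString();
                GenericDatumReader<GenericRecord> reader = readers.get(schemaJson);
                if (reader == null) {
                    reader = new GenericDatumReader<GenericRecord>(new Schema.Parser().parse(schemaJson));
                    readers.put(schemaJson, reader);
                }
                byte[] payload = input.readBytes(input.readInt(true));
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
                return reader.read(null, decoder);
            } catch (IOException e) {
                throw new RuntimeException("Avro decoding failed", e);
            }
        }
    }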


From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, February 07, 2017 4:17 PM
To: user@flink.apache.org
Subject: Re: Strange filter performance with parquet

I'm not familiar with the details of Parquet and Avro, but I know that the 
handling of GenericRecord is very inefficient in Flink.
The reason is that GenericRecords are serialized using Kryo and always carry the full 
Avro schema. If you can provide a specific record (a generated Avro class) to the InputFormat, Flink 
will serialize it much more efficiently as a POJO type.
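
For reference, registering a custom Kryo serializer for generic records is a one-liner on the ExecutionConfig; the serializer class used here is just the hypothetical sketch from the message above.

    import org.apache.avro.generic.GenericData;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class RegisterAvroSerializer {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Tell Kryo to use the custom serializer for Avro generic records.
            // GenericData.Record is the concrete class GenericRecord instances have at runtime.
            env.getConfig().registerTypeWithKryoSerializer(
                    GenericData.Record.class, CachingGenericRecordSerializer.class);
            // ... build and execute the job as usual ...
        }
    }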

2017-02-07 22:08 GMT+01:00 Newport, Billy <billy.newp...@gs.com>:
We read them like this:

    Job job = Job.getInstance();

    AvroParquetInputFormat<GenericRecord> inputFormat = new AvroParquetInputFormat<GenericRecord>();
    AvroParquetInputFormat.setAvroReadSchema(job, getOutputSchema(datasetName));

    String storeName = getCurrentRefinerInfo().getDatastoreName();
    Schema schema = getMergeSchema(storeName, datasetName);
    SerializableAvroRecordBuilder mergeRecordBuilder = new SerializableAvroRecordBuilder(storeName, datasetName, schema);

    Path path = new Path(datasetMergeDir);
    DataSet<Tuple2<Void, GenericRecord>> d = getExecutionEnvironment()
            .readHadoopFile(inputFormat, Void.class, GenericRecord.class, path.toString(), job)
            .filter(new FilterFunction<Tuple2<Void, GenericRecord>>() {
                // this does the live/dead filtering…
            });




From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, February 07, 2017 3:56 PM

To: user@flink.apache.org
Subject: Re: Strange filter performance with parquet

Hmm, the plan you posted does not look like it would need to spill data to 
avoid a deadlock.
Not sure what's causing the slowdown.

How do you read Parquet files?
If you use the HadoopIF wrapper, this might add some overhead.
A dedicated Flink InputFormat for Parquet might help here.
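
For what it's worth, a rough sketch of what such a dedicated InputFormat could look like, wrapping parquet-avro's AvroParquetReader in a Flink FileInputFormat. This is a hypothetical class, not something that ships with Flink; it assumes parquet-avro on the classpath and reads each file unsplit.

    import java.io.IOException;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;

    // Hypothetical sketch: reads GenericRecords directly via parquet-avro,
    // bypassing the Hadoop InputFormat wrapper.
    public class ParquetGenericRecordInputFormat extends FileInputFormat<GenericRecord> {

        private transient ParquetReader<GenericRecord> reader;
        private transient GenericRecord next;

        public ParquetGenericRecordInputFormat() {
            // Parquet footers make arbitrary byte-range splits awkward; read whole files here.
            this.unsplittable = true;
        }

        @Override
        public void open(FileInputSplit split) throws IOException {
            org.apache.hadoop.fs.Path hadoopPath =
                    new org.apache.hadoop.fs.Path(split.getPath().toUri());
            reader = AvroParquetReader.<GenericRecord>builder(hadoopPath).build();
            next = reader.read();  // read ahead so reachedEnd() is accurate
        }

        @Override
        public boolean reachedEnd() {
            return next == null;
        }

        @Override
        public GenericRecord nextRecord(GenericRecord reuse) throws IOException {
            GenericRecord current = next;
            next = reader.read();
            return current;
        }

        @Override
        public void close() throws IOException {
            if (reader != null) {
                reader.close();
            }
            super.close();
        }
    }

It could then be used via env.readFile(new ParquetGenericRecordInputFormat(), path), though the resulting GenericRecords would still hit the Kryo serialization overhead mentioned above unless a custom serializer is registered.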

2017-02-07 21:32 GMT+01:00 Newport, Billy <billy.newp...@gs.com>:
It’s kind of like this:

DataSet live = from previous
DataSet newRecords = avro read
DataSet mergedLive = live.coGroup(newRecords)
DataSet result = mergedLive.union(deadRecords)
Store result to parquet.
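
In DataSet API terms, a self-contained sketch of that shape is below; simple tuples stand in for the Avro/Parquet records, and the key position and merge logic are placeholder assumptions.

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Sketch of the merge pipeline shape; tuples stand in for GenericRecords.
    public class MergePipelineSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Placeholder inputs standing in for "from previous" and the Avro/Parquet reads.
            DataSet<Tuple2<String, String>> live = env.fromElements(new Tuple2<String, String>("k1", "old"));
            DataSet<Tuple2<String, String>> newRecords = env.fromElements(new Tuple2<String, String>("k1", "new"));
            DataSet<Tuple2<String, String>> deadRecords = env.fromElements(new Tuple2<String, String>("k2", "dead"));

            // CoGroup on the key (field 0 here); prefer the new record when one exists.
            DataSet<Tuple2<String, String>> mergedLive = live.coGroup(newRecords)
                    .where(0).equalTo(0)
                    .with(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>>() {
                        @Override
                        public void coGroup(Iterable<Tuple2<String, String>> oldSide,
                                            Iterable<Tuple2<String, String>> newSide,
                                            Collector<Tuple2<String, String>> out) {
                            boolean emittedNew = false;
                            for (Tuple2<String, String> r : newSide) {
                                out.collect(r);
                                emittedNew = true;
                            }
                            if (!emittedNew) {
                                for (Tuple2<String, String> r : oldSide) {
                                    out.collect(r);
                                }
                            }
                        }
                    });

            // Union with the dead records; print() stands in for the Parquet sink.
            DataSet<Tuple2<String, String>> result = mergedLive.union(deadRecords);
            result.print();
        }
    }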

BTW, on another point:
Reading Parquet files seems very slow to me; writing is very fast in 
comparison. It takes 60 slots 10 minutes to read 550 million records from a 
Parquet file. We have MR jobs finishing processing in 8.5 minutes with 33 cores, 
so it's much slower than what's possible.


From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, February 07, 2017 3:26 PM
To: user@flink.apache.org
Subject: Re: Strange filter performance with parquet

Hi Billy,
this might depend on what you are doing with the live and dead DataSets later 
on.
For example, if you join both data sets, Flink might need to spill one of them 
to disk and read it back to avoid a deadlock.
This happens, for instance, if the join strategy is a HashJoin, which blocks one 
input (known as the probe side) until the other (the build side) is consumed.
If both join inputs originate from the same upstream input (the Parquet source), 
that input cannot be blocked, so the probe side needs to be consumed by 
spilling it to disk.
Spilling to disk and reading the result back might be more expensive than 
reading the original input twice.
You can also check the DataSet execution plan by calling getExecutionPlan() on 
the ExecutionEnvironment.
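
A minimal usage sketch, assuming the job has already been defined on env; printing to stdout is just for illustration.

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // ... define sources, filters, cogroup/union and sinks here ...
    // Before calling env.execute(), dump the optimizer's chosen plan as JSON:
    System.out.println(env.getExecutionPlan());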
Best, Fabian

2017-02-07 21:10 GMT+01:00 Newport, Billy <billy.newp...@gs.com>:
We’re reading a parquet file (550m records).

We want to split the Parquet data using a filter into 2 sets, live and dead.

DataSet a = read parquet
DataSet live = a.filter(liveFilter)
DataSet dead = a.filter(deadFilter)

Is slower than

DataSet a = read parquet
DataSet live = a.filter(liveFilter)
DataSet b = read parquet
DataSet dead = b.filter(deadFilter)

Does this make sense? Why would reading it twice be quicker? We're using Flink 1.1.2.


Billy Newport
Data Architecture, Goldman, Sachs & Co.
30 Hudson | 37th Floor | Jersey City, NJ
Tel: +1 (212) 855-7773 | Cell: +1 (507) 254-0134
Email: billy.newp...@gs.com, KD2DKQ



