Hmm, the plan you posted does not look like it would need to spill data to
avoid a deadlock, so I'm not sure what's causing the slowdown.

How do you read the Parquet files?
If you go through the Hadoop InputFormat compatibility wrapper, that might add
some per-record overhead. A dedicated Flink InputFormat for Parquet might help here.
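
For reference, a read through the Hadoop wrapper looks roughly like this (a
sketch only; the exact InputFormat class and record type depend on your
Parquet/Avro versions, and the path is a placeholder):

  import org.apache.avro.generic.GenericRecord;
  import org.apache.flink.api.java.DataSet;
  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.parquet.avro.AvroParquetInputFormat;

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // Wraps the Hadoop mapreduce InputFormat. Each record arrives as a
  // Tuple2<Void, GenericRecord>; the wrapper layer and Avro decoding add
  // per-record cost compared to a native Flink InputFormat.
  DataSet<Tuple2<Void, GenericRecord>> input =
      env.readHadoopFile(new AvroParquetInputFormat<GenericRecord>(),
          Void.class, GenericRecord.class, "hdfs:///path/to/input.parquet");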

2017-02-07 21:32 GMT+01:00 Newport, Billy <billy.newp...@gs.com>:

> It’s kind of like this:
>
> DataSet live = from previous
>
> DataSet newRecords = avro read
>
> DataSet mergedLive = live.coGroup(newRecords)
>
> DataSet result = mergedLive.union(deadRecords)
>
> Store result to Parquet.
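>
> Spelled out, the merge step might look like this (a rough sketch; the
> Tuple2<String, String> record type, the key position, and the merge rule
> are stand-ins for what we actually do):
>
> import org.apache.flink.api.common.functions.CoGroupFunction;
> import org.apache.flink.util.Collector;
>
> DataSet<Tuple2<String, String>> mergedLive = live
>     .coGroup(newRecords)
>     .where(0)    // join key: field 0 of the stand-in record type
>     .equalTo(0)
>     .with(new CoGroupFunction<Tuple2<String, String>,
>             Tuple2<String, String>, Tuple2<String, String>>() {
>         @Override
>         public void coGroup(Iterable<Tuple2<String, String>> liveIn,
>                             Iterable<Tuple2<String, String>> newIn,
>                             Collector<Tuple2<String, String>> out) {
>             // keep the new version of a record when one exists,
>             // otherwise carry the live record forward unchanged
>             boolean hasNew = false;
>             for (Tuple2<String, String> n : newIn) {
>                 out.collect(n);
>                 hasNew = true;
>             }
>             if (!hasNew) {
>                 for (Tuple2<String, String> l : liveIn) {
>                     out.collect(l);
>                 }
>             }
>         }
>     });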
>
>
>
> BTW on another point,
>
> Reading Parquet files seems very slow to me; writing is very fast in
> comparison. It takes 60 slots 10 minutes to read 550 million records from a
> Parquet file. We have MR jobs that finish processing in 8.5 minutes with 33
> cores, so it’s very much slower than what’s possible.
>
>
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* Tuesday, February 07, 2017 3:26 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Strange filter performance with parquet
>
>
>
> Hi Billy,
>
> This might depend on what you are doing with the live and dead DataSets
> later on.
>
> For example, if you join both data sets, Flink might need to spill one of
> them to disk and read it back to avoid a deadlock.
>
> This happens, for instance, if the join strategy is a HashJoin, which blocks
> one input (the probe side) until the other (the build side) is fully consumed.
>
> If both join inputs originate from the same upstream source (the Parquet
> read), that source cannot be blocked, so the probe side has to be consumed
> anyway by spilling it to disk.
>
> Spilling to disk and reading the result back might be more expensive than
> reading the original input twice.
>
> You can also check the DataSet execution plan by calling
> getExecutionPlan() on the ExecutionEnvironment.
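>
> For example, a minimal way to dump the plan (it comes back as JSON; in the
> DataSet API, call it in place of execute()):
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> // ... define sources, filters, joins, and sinks as usual ...
> System.out.println(env.getExecutionPlan()); // prints the optimizer plan as JSON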
>
> Best, Fabian
>
>
>
> 2017-02-07 21:10 GMT+01:00 Newport, Billy <billy.newp...@gs.com>:
>
> We’re reading a Parquet file (550M records).
>
>
>
> We want to split the Parquet data using a filter into 2 sets, live and dead.
>
>
>
> DataSet a = read parquet
>
> DataSet live = a.filter(liveFilter)
>
> DataSet dead = a.filter(deadFilter)
>
>
>
> Is slower than
>
>
>
> DataSet a = read parquet
>
> DataSet live = a.filter(liveFilter)
>
> DataSet b = read parquet
>
> DataSet dead = b.filter(deadFilter)
>
>
>
> Does this make sense? Why would reading it twice be quicker? We’re using
> Flink 1.1.2.
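>
> For completeness, the filters themselves are plain FilterFunctions, along
> these lines (the record type and the isLive() predicate are placeholders
> for our real ones; assume isLive is a static, serialization-safe helper):
>
> import org.apache.flink.api.common.functions.FilterFunction;
>
> DataSet<GenericRecord> live = a.filter(new FilterFunction<GenericRecord>() {
>     @Override
>     public boolean filter(GenericRecord r) {
>         return isLive(r);   // placeholder predicate
>     }
> });
>
> DataSet<GenericRecord> dead = a.filter(new FilterFunction<GenericRecord>() {
>     @Override
>     public boolean filter(GenericRecord r) {
>         return !isLive(r);  // complement of the live predicate
>     }
> });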
>
>
>
>
>
> *Billy Newport*
>
> Data Architecture, Goldman, Sachs & Co.
> 30 Hudson | 37th Floor | Jersey City, NJ
>
> Tel: +1 (212) 855-7773 | Cell: +1 (507) 254-0134
> Email: billy.newp...@gs.com, KD2DKQ
>
>
>
>
>
