Hmm, the plan you posted does not look like it would need to spill data to avoid a deadlock. Not sure what's causing the slowdown.
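(For anyone else reading along: dumping the plan for inspection looks roughly like this. This is a minimal, self-contained sketch with placeholder data and filters, not Billy's actual job.)

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class PlanDump {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Placeholder job: in the real program this would be the Parquet
            // read plus the live/dead filters.
            DataSet<String> a = env.fromElements("live:1", "dead:2", "live:3");
            DataSet<String> live = a.filter(r -> r.startsWith("live"));
            DataSet<String> dead = a.filter(r -> r.startsWith("dead"));
            live.writeAsText("/tmp/live");
            dead.writeAsText("/tmp/dead");

            // Prints the optimized plan as JSON; call this instead of execute()
            // to check whether a spilling barrier shows up between operators.
            System.out.println(env.getExecutionPlan());
        }
    }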
How do you read Parquet files? If you use the HadoopIF wrapper, this might add some overhead. A dedicated Flink InputFormat for Parquet might help here.

2017-02-07 21:32 GMT+01:00 Newport, Billy <billy.newp...@gs.com>:

> It's kind of like this:
>
> DataSet live = from previous
> DataSet newRecords = avro read
> DataSet mergedLive = live.cogroup(newRecords)
> DataSet result = mergedLive.union(deadRecords)
> Store result to parquet.
>
> BTW, on another point:
>
> Reading Parquet files seems very slow to me. Writing is very fast in
> comparison. It takes 60 slots 10 minutes to read 550 million records from
> a Parquet file. We have MR jobs finishing processing in 8.5 minutes with
> 33 cores, so it's much slower than what's possible.
>
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Tuesday, February 07, 2017 3:26 PM
> To: user@flink.apache.org
> Subject: Re: Strange filter performance with parquet
>
> Hi Billy,
>
> this might depend on what you are doing with the live and dead DataSets
> later on.
>
> For example, if you join both data sets, Flink might need to spill one of
> them to disk and read it back to avoid a deadlock.
>
> This happens for instance if the join strategy is a HashJoin, which blocks
> one input (known as the probe side) until the other (the build side) has
> been fully consumed.
>
> If both join inputs originate from the same upstream input (the Parquet
> source), that input cannot be blocked, so the probe side needs to be
> consumed by spilling it to disk.
>
> Spilling to disk and reading the result back might be more expensive than
> reading the original input twice.
>
> You can also check the DataSet execution plan by calling
> getExecutionPlan() on the ExecutionEnvironment.
>
> Best, Fabian
>
> 2017-02-07 21:10 GMT+01:00 Newport, Billy <billy.newp...@gs.com>:
>
> We're reading a Parquet file (550 million records).
>
> We want to split the Parquet data using a filter into two sets, live and
> dead.
>
> DataSet a = read parquet
> DataSet live = a.filter(liveFilter)
> DataSet dead = a.filter(deadFilter)
>
> is slower than
>
> DataSet a = read parquet
> DataSet live = a.filter(liveFilter)
> DataSet b = read parquet
> DataSet dead = b.filter(deadFilter)
>
> Does this make sense? Why would reading it twice be quicker? We're using
> 1.1.2.
>
> Billy Newport
> Data Architecture, Goldman, Sachs & Co.
> 30 Hudson | 37th Floor | Jersey City, NJ
> Tel: +1 (212) 855-7773 | Cell: +1 (507) 254-0134
> Email: billy.newp...@gs.com, KD2DKQ
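(Editor's note on the Parquet-reading question at the top of this reply: reading Parquet through Flink's Hadoop compatibility wrapper typically looks roughly like the sketch below. This is only an illustration of the wrapper approach Fabian mentions, not the poster's code; the path and record type are placeholders, and the exact parquet-avro package and method signatures may differ by version.)

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.parquet.avro.AvroParquetInputFormat;

    public class ParquetReadViaHadoopIF {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read Parquet through the Hadoop MapReduce InputFormat wrapper.
            // AvroParquetInputFormat yields (Void, GenericRecord) pairs; the
            // Avro conversion layer is one place where read overhead can hide.
            Job job = Job.getInstance();
            DataSet<Tuple2<Void, GenericRecord>> input = env.readHadoopFile(
                    new AvroParquetInputFormat<GenericRecord>(),
                    Void.class, GenericRecord.class,
                    "hdfs:///path/to/records.parquet",  // placeholder path
                    job);

            // Trigger execution with a simple sink to measure the raw read.
            System.out.println(input.count());
        }
    }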