Hi! The sink is merely a union of the result of the co-group and the one data source. Can't you just make two distinct pipelines out of that? One co-group -> data sink pipeline and one source -> sink pipeline? They could even be part of the same job...
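For illustration, a minimal, self-contained sketch of that split might look like the following. It uses toy Tuple2 data and writeAsText sinks in place of the real GenericRecord/parquet I/O, and the class name, merge logic and paths are made up; the only point is the plan shape, two independent pipelines submitted as one job with no union in front of the sink.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Toy sketch of the "two distinct pipelines in one job" idea.
public class TwoPipelinesOneJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> staging =
                env.fromElements(Tuple2.of("k1", 1), Tuple2.of("k2", 2));
        DataSet<Tuple2<String, Integer>> existing =
                env.fromElements(Tuple2.of("k1", 10), Tuple2.of("k3", 30));

        // Pipeline 1: co-group the two inputs and write to its own sink.
        staging.coGroup(existing)
                .where(0).equalTo(0)
                .with(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Integer>> newRecords,
                                        Iterable<Tuple2<String, Integer>> oldRecords,
                                        Collector<Tuple2<String, Integer>> out) {
                        // Trivial merge: emit everything from both sides.
                        for (Tuple2<String, Integer> t : newRecords) out.collect(t);
                        for (Tuple2<String, Integer> t : oldRecords) out.collect(t);
                    }
                })
                .writeAsText("/tmp/merged");

        // Pipeline 2: source -> filter -> sink, chained directly, with no union
        // in front of the sink and therefore no extra serialization boundary.
        existing.filter(t -> t.f1 > 5)
                .writeAsText("/tmp/passthrough");

        // Both sinks belong to the same job and are submitted together.
        env.execute("two pipelines, one job");
    }
}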
Best,
Stephan

On Wed, Aug 23, 2017 at 5:51 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> The reason is that there are two (or more) different Threads doing the
> reading. As an illustration, consider first this case:
>
> DataSet input = ...
> input.map(new MapA()).map(new MapB())
>
> Here, MapB is technically "wrapped" by MapA and when MapA emits data it
> goes directly to the map() method of MapB. The two functions are
> chained.
>
> Now, in this other case the methods cannot be chained:
>
> DataSet input1 = ...
> DataSet input2 = ...
> DataSet mappedA = input1.map(new MapA())
> DataSet mappedB = input2.map(new MapB())
>
> mappedA.union(mappedB).map(new MapC())
>
> Here, there is (at least) one thread per map because neither MapA nor MapB
> can wrap MapC such that the other one (either MapA or MapB) can still
> send data into MapC. Data is sent across a channel between the Threads, and
> whenever that happens the data is serialised.
>
> Technically, we could avoid serialization if we knew that two Threads are
> running in the same JVM, but this is not something that Flink currently does.
>
> Best,
> Aljoscha
>
> On 23. Aug 2017, at 17:12, Newport, Billy <billy.newp...@gs.com> wrote:
>
> Thanks, Aljoscha, for the prompt response.
>
> Can you explain the technical reason for the single-predecessor rule? This
> makes what we are trying to do much more expensive. Really, what we're doing
> is reading a parquet file, doing several maps/filters on the records and
> writing to parquet. There is no serialization besides the parquet
> operations needed at all. The current Flink implementation adds an
> expensive serialize/deserialize for no apparent purpose.
>
> Billy
>
> From: Aljoscha Krettek [mailto:aljos...@apache.org]
> Sent: Saturday, August 19, 2017 1:45 AM
> To: Chan, Regina [Tech]
> Cc: Newport, Billy [Tech]; user@flink.apache.org
> Subject: Re: Flink parquet read.write performance
>
> Hi,
>
> The Sink cannot be chained to the previous two operations because there
> are two operations. Chaining only works if there is one predecessor
> operation. Data transfer should still be pipelined, but you will see
> serialisation overhead. What kind of TypeSerializer is used at that
> boundary?
>
> Best,
> Aljoscha
>
> On 18. Aug 2017, at 21:15, Chan, Regina <regina.c...@gs.com> wrote:
>
> We profiled it, and it looks like it's sending the output of the
> datasource->filter->map->map to an intermediate result partition
> instead of writing directly to the data sink. Because of this we think it's
> slow: it's spending its time serializing for no reason. Why does
> it do the forward rather than chain to the data sink?
>
> <image001.png><image002.png>
>
> Thanks,
> Regina
>
> From: Aljoscha Krettek [mailto:aljos...@apache.org]
> Sent: Friday, August 18, 2017 12:14 PM
> To: Newport, Billy [Tech]
> Cc: user@flink.apache.org
> Subject: Re: Flink parquet read.write performance
>
> Hi Billy,
>
> Do you also have the data (picture) from the "Timeline" tab of the
> completed job? This would give some hints about how long that other
> DataSource (with chain) was active. It might be that the sink is waiting
> for the other input to come online.
>
> Best,
> Aljoscha
>
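As a side note, the two chaining cases described above can be reproduced with a small self-contained DataSet program; MapA/MapB/MapC below are just identity placeholders, so only the operator wiring matters:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Minimal illustration of the chained vs. non-chainable (union) cases.
public class ChainingExample {

    // Identity maps standing in for MapA/MapB/MapC.
    public static class MapA implements MapFunction<String, String> {
        @Override public String map(String value) { return value; }
    }
    public static class MapB implements MapFunction<String, String> {
        @Override public String map(String value) { return value; }
    }
    public static class MapC implements MapFunction<String, String> {
        @Override public String map(String value) { return value; }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Case 1: a single predecessor at every step, so MapA and MapB are
        // chained and records are handed over by direct method calls.
        DataSet<String> input = env.fromElements("a", "b", "c");
        input.map(new MapA()).map(new MapB()).print();

        // Case 2: MapC has two predecessors via union. Neither MapA nor MapB can
        // "wrap" MapC, so records cross a channel (and are serialised) on the way in.
        DataSet<String> input1 = env.fromElements("d", "e");
        DataSet<String> input2 = env.fromElements("f", "g");
        DataSet<String> mappedA = input1.map(new MapA());
        DataSet<String> mappedB = input2.map(new MapB());
        mappedA.union(mappedB).map(new MapC()).print();
    }
}

In the execution plan, the first pipeline should show up as one chained operator, while MapC in the second pipeline sits behind a serialization boundary, as described above.
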
> On 18. Aug 2017, at 14:45, Newport, Billy <billy.newp...@gs.com> wrote:
>
> Hi,
>
> I'm trying to figure out why reading and writing ~5GB worth of parquet
> files seems to take 3-4 minutes with 10 TaskManagers, 2 slots, 20GB memory,
> and 20 parallelism. I've copied in the execution plan and the TaskManager
> times below. Other details include that we're reading 20 snappy-compressed
> parquet files, each ~240MB (see below).
>
> I'm trying to use this for milestoning logic where we take new avro
> files from staging and join them with the existing milestoned parquet data. I
> have a small staging file with only about 1500 records inside, so I reduce
> the number of records sent to the cogroup in order to make this faster. To
> do this, I'm basically reading in GenericRecords from parquet files twice,
> once to filter for "live" records, which we then further filter for
> records with keys matching what we found in a separate avro file.
> This reduction of records brings that part of the plan down to a total of
> 1 minute 58 secs.
>
> The concern is the other records, with non-live/non-matching keys. In
> theory, I expect this to be fast since it's just chaining the operations
> all the way through to the sink. However, this part takes about 4
> minutes. We're not doing anything different from the other DataSource aside
> from mapping a DataSet<GenericRecord> to a Tuple2<Short,GenericRecord>,
> where the short is a bitmap value indicating where the record needs to be
> written.
>
> Other notes:
> I checked the backpressure on the datasource->filter->map->map and it was
> OK. I'm not sure what else could be holding it up.
> I also profiled it when I ran it on a single task manager with a single slot,
> and it seems to spend most of the time waiting.
>
> Any ideas? Instead of truly chaining, is it writing to disk and serializing
> multiple times inside each operation?
>
> Data Source:
> hdfs dfs -du -h <folder_name>
> 240.2 M <folder_name>/0_partMapper-m-00013.snappy.parquet
> 237.2 M <folder_name>/10_partMapper-m-00019.snappy.parquet
> 241.9 M <folder_name>/11_partMapper-m-00002.snappy.parquet
> 243.3 M <folder_name>/12_partMapper-m-00000.snappy.parquet
> 238.2 M <folder_name>/13_partMapper-m-00016.snappy.parquet
> 241.7 M <folder_name>/14_partMapper-m-00003.snappy.parquet
> 241.0 M <folder_name>/15_partMapper-m-00006.snappy.parquet
> 240.3 M <folder_name>/16_partMapper-m-00012.snappy.parquet
> 240.3 M <folder_name>/17_partMapper-m-00011.snappy.parquet
> 239.5 M <folder_name>/18_partMapper-m-00014.snappy.parquet
> 237.6 M <folder_name>/19_partMapper-m-00018.snappy.parquet
> 240.7 M <folder_name>/1_partMapper-m-00009.snappy.parquet
> 240.7 M <folder_name>/20_partMapper-m-00008.snappy.parquet
> 236.5 M <folder_name>/2_partMapper-m-00020.snappy.parquet
> 242.1 M <folder_name>/3_partMapper-m-00001.snappy.parquet
> 241.7 M <folder_name>/4_partMapper-m-00004.snappy.parquet
> 240.5 M <folder_name>/5_partMapper-m-00010.snappy.parquet
> 241.7 M <folder_name>/6_partMapper-m-00005.snappy.parquet
> 239.1 M <folder_name>/7_partMapper-m-00015.snappy.parquet
> 237.9 M <folder_name>/8_partMapper-m-00017.snappy.parquet
> 240.8 M <folder_name>/9_partMapper-m-00007.snappy.parquet
>
> yarn-session.sh -nm "delp_uat-IMD_Trading_v1_PROD_PerfTest-REFINER_INGEST"
> -jm 4096 -tm 20480 -s 2 -n 10 -d
>
> <image001.png>
>
> <image002.png>
>
> Thanks,
>
> Regina
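For reference, the Tuple2<Short,GenericRecord> tagging step described above is roughly of the following shape. The field name ("liveFlag") and the destination bits are invented for illustration, not the actual job code:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch of tagging each GenericRecord with a short bitmap that says which
// sink(s) it should be routed to. Field name and bit values are hypothetical.
public class DestinationTagger implements MapFunction<GenericRecord, Tuple2<Short, GenericRecord>> {

    private static final short LIVE_OUTPUT = 0x01;        // hypothetical bit: merged/milestoned sink
    private static final short PASSTHROUGH_OUTPUT = 0x02; // hypothetical bit: pass-through sink

    @Override
    public Tuple2<Short, GenericRecord> map(GenericRecord record) {
        Object liveFlag = record.get("liveFlag");  // hypothetical field name
        short destinations = Boolean.TRUE.equals(liveFlag) ? LIVE_OUTPUT : PASSTHROUGH_OUTPUT;
        return Tuple2.of(destinations, record);
    }
}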