Then there is likely a bug: the (ENUM, Record) stream, fed through a filter 
into a set of OutputFormats (one per filter), was slower than the 
(BITMASK, Record) stream written to a single OutputFormat that demuxes the 
data to each file internally.

Are you saying we should use a custom writer inside a map, rather than either 
of the two approaches above?


From: Chesnay Schepler [mailto:ches...@apache.org]
Sent: Monday, May 01, 2017 10:41 AM
To: user@flink.apache.org
Subject: Re: Collector.collect

Hello,

@Billy, what prevented you from duplicating/splitting the record, based on the 
bitmask, in a map function before the sink?
This shouldn't incur any serialization overhead if the sink is chained to the 
map. The emitted Tuple could also share the
GenericRecord; meaning you don't even have to copy it.
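
A minimal sketch of the splitting step described above, written in plain Java
rather than against the Flink API. The class name BitmaskSplit, the split
method and the use of Map.Entry in place of a Flink tuple are assumptions made
for illustration. In a real job the loop would presumably sit inside a
flatMap-style function and call out.collect(...) once per set bit. The point it
illustrates is that every emitted pair carries the same record reference, so
the record itself is never copied.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class BitmaskSplit {

    /**
     * Emits one (fileIndex, record) pair per set bit in the mask.
     * The record reference is shared between all emitted pairs.
     * The Consumer is a hypothetical stand-in for a Flink Collector.
     */
    public static <R> void split(short mask, R record,
                                 Consumer<Map.Entry<Integer, R>> out) {
        int bits = mask & 0xFFFF; // treat the short as 16 unsigned flag bits
        for (int i = 0; i < 16; i++) {
            if ((bits & (1 << i)) != 0) {
                out.accept(new SimpleEntry<>(i, record));
            }
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> emitted = new ArrayList<>();
        // Bits 0 and 2 are set, so the record goes to "files" 0 and 2.
        split((short) 0b101, "record-A", emitted::add);
        System.out.println(emitted);
    }
}
```

Whether this avoids serialization in practice depends on the map and the sink
being chained, as noted above.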

On 01.05.2017 14:52, Newport, Billy wrote:
We’ve done that but it’s very expensive from a serialization point of view when 
writing the same record multiple times, each in a different tuple.

For example, we started with this:

.collect(new Tuple2<Short, GenericRecord>(...));

The record would be written with short = 0 and again with short = 1. This 
results in the GenericRecord being serialized twice. You probably also need 
filters on the output dataset, which is expensive as well.

We switched to a bitmask instead. Now we write the record once and set a bit in 
the short for each file the record needs to be written to. The next step is to 
write records to files based on the short. We wrote a new outputrecordformat 
that checks the bits in the short and writes the GenericRecord to each file 
whose bit is set. This means no filter is needed to split the records for each 
file, and this is much faster.
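
As a rough illustration of the demultiplexing idea, here is a sketch in plain
Java. It is not our actual output format: the class BitmaskDemuxWriter, its
writeRecord signature and the use of java.io.Writer targets with String records
are all assumptions made to keep the example self-contained. In Flink the same
loop would presumably live in the writeRecord method of a custom OutputFormat.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Arrays;
import java.util.List;

public class BitmaskDemuxWriter {

    private final List<? extends Writer> targets; // one writer per bit position

    public BitmaskDemuxWriter(List<? extends Writer> targets) {
        if (targets.size() > 16) {
            throw new IllegalArgumentException("a short holds at most 16 flags");
        }
        this.targets = targets;
    }

    /** Writes the line to every target whose bit is set in the mask. */
    public void writeRecord(short mask, String line) {
        int bits = mask & 0xFFFF; // treat the short as unsigned flag bits
        try {
            for (int i = 0; i < targets.size(); i++) {
                if ((bits & (1 << i)) != 0) {
                    targets.get(i).write(line);
                    targets.get(i).write('\n');
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<StringWriter> files = Arrays.asList(
                new StringWriter(), new StringWriter(), new StringWriter());
        BitmaskDemuxWriter demux = new BitmaskDemuxWriter(files);
        demux.writeRecord((short) 0b011, "a"); // goes to targets 0 and 1
        demux.writeRecord((short) 0b100, "b"); // goes to target 2 only
        for (StringWriter f : files) {
            System.out.print(f);
        }
    }
}
```

Each record reaches the writer once and is fanned out there, so no filter per
file is needed upstream.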

We’re finding a need to do this kind of optimization pretty frequently with 
Flink.


From: Gaurav Khandelwal [mailto:gaurav671...@gmail.com]
Sent: Saturday, April 29, 2017 4:32 AM
To: user@flink.apache.org
Subject: Collector.collect

Hello

I am working with a RichProcessFunction and I want to emit multiple records at 
a time. To achieve this, I am currently doing:

while(condition)
{
   Collector.collect(new Tuple<>...);
}

I was wondering, is this the correct way, or is there an alternative?
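
For reference, a self-contained sketch of the pattern in question, assuming a
collector can be called any number of times for one input element. The nested
Collector interface is a local stand-in that models only collect(), and the
class EmitMany, the processElement signature and the comma-splitting logic are
hypothetical, chosen just to give the loop a concrete condition.

```java
import java.util.ArrayList;
import java.util.List;

public class EmitMany {

    /** Local stand-in for a Flink-style collector; only collect() is modeled. */
    public interface Collector<T> {
        void collect(T record);
    }

    /** Emits one record per comma-separated field by calling collect() in a loop. */
    public static void processElement(String value, Collector<String> out) {
        int start = 0;
        while (start <= value.length()) {
            int end = value.indexOf(',', start);
            if (end < 0) {
                end = value.length(); // last field runs to the end of the input
            }
            out.collect(value.substring(start, end));
            start = end + 1;
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        processElement("a,b,c", emitted::add);
        System.out.println(emitted);
    }
}
```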



