I was losing throughput because I was reading the line from the
GZIPInputStream outside of the busy-wait loop. I changed that, and now I
get more throughput. It is also a good idea to use VisualVM to check
whether the throughput is correct and where I am losing the most cycles.


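while (reader.ready() && (line = reader.readLine()) != null) {
    // Start the timer before parsing and emitting, so busySleep() only
    // waits for whatever is left of delayInNanoSeconds for this record.
    startTime = System.nanoTime();
    taxiRide = TaxiRide.fromString(line);
    sourceContext.collectWithTimestamp(taxiRide, getEventTime(taxiRide));
    this.dataRateListener.busySleep(startTime);
}

public void busySleep(long startTime) {
    long deadLine = startTime + this.delayInNanoSeconds;
    // Compare by subtraction, as the System.nanoTime() Javadoc recommends,
    // so the check stays correct even if the counter overflows.
    while (System.nanoTime() - deadLine < 0) ;
}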
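A related variant, sketched here under assumptions: instead of computing each deadline from the moment the record was read, it keeps an absolute schedule (next deadline = previous deadline + period), so small overruns do not accumulate into drift. The class name FixedRatePacer, its methods, and the LongConsumer standing in for parsing plus collectWithTimestamp are all hypothetical, not part of Flink.

```java
import java.util.function.LongConsumer;

/** Hypothetical helper: paces a loop to a fixed period by busy-waiting on System.nanoTime(). */
public final class FixedRatePacer {
    private final long periodNanos;
    private long nextDeadline;

    public FixedRatePacer(long periodNanos) {
        this.periodNanos = periodNanos;
        this.nextDeadline = System.nanoTime() + periodNanos;
    }

    /** Spins until the current deadline, then advances it by one period (absolute schedule). */
    public void awaitNext() {
        // Overflow-safe comparison, as recommended in the System.nanoTime() Javadoc.
        while (System.nanoTime() - nextDeadline < 0) {
            // busy-wait; on Java 9+ Thread.onSpinWait() could be called here as a CPU hint
        }
        nextDeadline += periodNanos;
    }

    /** Runs `work` n times at the given period and returns the achieved rate in records/second. */
    public static double run(long periodNanos, int n, LongConsumer work) {
        FixedRatePacer pacer = new FixedRatePacer(periodNanos);
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) {
            work.accept(i);      // stand-in for parsing a line and emitting the record
            pacer.awaitNext();   // wait out the rest of this record's time slot
        }
        long elapsed = System.nanoTime() - start;
        return n * 1_000_000_000.0 / elapsed;
    }

    public static void main(String[] args) {
        // 10_000 ns per record targets 100K records/s.
        double rate = run(10_000, 50_000, i -> { });
        System.out.printf("achieved %.0f records/s%n", rate);
    }
}
```

One design trade-off to be aware of: if a record overruns its slot, the following records are emitted without waiting until the schedule catches up, which preserves the average rate but produces short bursts.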

Thanks!

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, May 29, 2020 at 12:46 PM Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi Felipe,
>
> the file is just 80 MB. It is probably cached in the Linux page cache, so
> there should not be any disk I/O involved.
> So what you are saying is that you cannot further increase the throughput
> for sleeps shorter than 2000 nanoseconds.
> Have you tried running this without any sleep / System.nanoTime() calls?
> These calls can also be expensive.
> Running the source in a simple while loop should give you the theoretical 
> maximum.
>
> If you really want to generate data at high speed, I would pre-generate
> a dataset on the heap (do not run any RNG, as it will waste CPU cycles)
> and emit that.
>
> In general: VisualVM is your friend in understanding where you are losing
> cycles.
>
> Best,
> Robert
>
>
> On Thu, May 28, 2020 at 12:06 AM Felipe Gutierrez 
> <felipe.o.gutier...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am trying to benchmark a stream application in Flink. I am using
>> a SourceFunction that reads events from the NYC Taxi Rides dataset
>> (http://training.ververica.com/trainingData/nycTaxiRides.gz), and I
>> control the emission rate with System.nanoTime(). I am not using
>> Thread.sleep because Java does not guarantee when the thread will
>> be woken up.
>>
>> public void busySleep() {
>>     final long startTime = System.nanoTime();
>>     // Spin until delayInNanoSeconds have elapsed since this call started.
>>     while (System.nanoTime() - startTime < this.delayInNanoSeconds) ;
>> }
>>
>> So, waiting 10000 nanoseconds per record should give a workload of
>> 100K rec/sec, 2000 nanoseconds should give 500K rec/sec, 1000
>> nanoseconds should give 1M rec/sec, and 500 nanoseconds should give
>> 2M rec/sec.
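These numbers follow from delay = 10^9 / rate, so at 1M rec/sec the entire per-record budget is 1000 ns. A minimal sketch of that arithmetic follows; RateMath and its methods are hypothetical names. The second method shows why the measured rate can fall short if busySleep() starts its timer after parsing and collectWithTimestamp (as the version above appears to be used): the real period then becomes work + delay instead of just the delay.

```java
public final class RateMath {
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    /** Per-record delay (ns) for a target rate, assuming the delay is the whole period. */
    static long delayNanosForRate(long recordsPerSecond) {
        return NANOS_PER_SECOND / recordsPerSecond;
    }

    /** Rate achieved when each record also spends workNanos parsing/emitting on top of the delay. */
    static double achievedRate(long delayNanos, long workNanos) {
        return (double) NANOS_PER_SECOND / (delayNanos + workNanos);
    }

    public static void main(String[] args) {
        for (long rate : new long[] {100_000, 500_000, 1_000_000, 2_000_000}) {
            System.out.println(rate + " rec/sec -> " + delayNanosForRate(rate) + " ns per record");
        }
    }
}
```

For example, with an illustrative (not measured) 500 ns of parsing and emitting per record, achievedRate(1_000, 500) is about 667K rec/sec rather than 1M.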
>>
>> The problem I am facing is that when I set the workload to 1M
>> rec/sec, the source does not seem to generate records at that rate. I
>> guess it is because it spends more time reading the TaxiRide file or
>> doing I/O operations, or maybe it is some Java limitation.
>> If I use a message broker, it adds one more middleware layer with its
>> own read/write I/O operations, and I guess it will be worse.
>> What do you recommend for a controllable stream processing benchmark?
>>
>> Thanks,
>> Felipe
