I was losing time because I was reading each line from the GZIPInputStream outside of the busy-wait loop. I changed that, and now I get higher throughput. It is also a good idea to use VisualVM to check whether the throughput is correct and where the cycles are being lost.

    while (reader.ready() && (line = reader.readLine()) != null) {
        startTime = System.nanoTime();
        taxiRide = TaxiRide.fromString(line);
        sourceContext.collectWithTimestamp(taxiRide, getEventTime(taxiRide));
        this.dataRateListener.busySleep(startTime);
    }

    public void busySleep(long startTime) {
        long deadLine = startTime + this.delayInNanoSeconds;
        // Compare by difference so the check stays correct if nanoTime() overflows.
        while (System.nanoTime() - deadLine < 0) ;
    }

Thanks!
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, May 29, 2020 at 12:46 PM Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi Felipe,
>
> the file is just 80 MB. It is probably cached in the Linux page cache, so
> there should not be any disk IO involved.
> So what you are saying is that you cannot further increase the throughput
> for sleeps shorter than 2000 nanoseconds.
> Have you tried running this w/o any sleep / System.nanoTime() calls? These
> calls can potentially also be expensive.
> Running the source in a simple while loop should give you the theoretical
> maximum.
>
> If you really want to generate data at a high speed, I would pre-generate
> some dataset on the heap (do not run any RNG, as it will waste CPU cycles)
> and emit that.
>
> In general: VisualVM is your friend in understanding where you are losing
> cycles.
>
> Best,
> Robert
>
>
> On Thu, May 28, 2020 at 12:06 AM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am trying to benchmark a stream application in Flink. So, I am using
>> the source function that reads events from the NYC Taxi Rides
>> (http://training.ververica.com/trainingData/nycTaxiRides.gz) and I
>> control the emission with System.nanoTime(). I am not using
>> Thread.sleep because Java does not guarantee the time at which the
>> thread will be awakened.
>>
>> public void busySleep() {
>>     final long startTime = System.nanoTime();
>>     while (System.nanoTime() - startTime < this.delayInNanoSeconds) ;
>> }
>>
>> So, when I wait for 10000 nanoseconds I will get a workload of 100K
>> rec/sec. When I wait for 2000 nanoseconds I will get a workload of
>> 500K rec/sec. For 1000 nanoseconds I will get a workload of 1M
>> rec/sec. And for 500 nanoseconds a workload of 2M rec/sec.
>>
>> The problem that I am facing is that when I set the workload to 1M
>> rec/sec it seems that it is not generating at this rate. I guess it is
>> because it is consuming more time reading the TaxiRide file, or doing
>> IO operations, or maybe it is some Java limitation.
>> If I use some message broker it will end up adding one more middleware
>> layer with read/write IO operations, and I guess it will be worse.
>> What do you recommend for a controllable benchmark for stream processing?
>>
>> Thanks,
>> Felipe
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
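
For reference, here is a minimal sketch of the two ideas discussed in this thread: the delay-to-rate arithmetic (delay in ns = 1,000,000,000 / records per second) and emitting a dataset that was pre-generated on the heap at a paced rate. The class PacedEmitter and its methods are hypothetical names used only for illustration; they are not part of Flink or of the code above, and the sink parameter merely stands in for a call such as sourceContext.collectWithTimestamp. Unlike the loop above, this sketch advances the deadline from a fixed start time, so any time spent producing a record counts against the per-record budget instead of being added on top of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of Flink or of this thread's code.
final class PacedEmitter {

    // Delay between records, in nanoseconds, for a target rate in records/second.
    static long delayNanosForRate(long recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        return 1_000_000_000L / recordsPerSecond;
    }

    // Emits every record to the sink, busy-waiting so that record i is not
    // emitted before start + i * delayNanos. Returns the elapsed nanoseconds.
    static <T> long emitPaced(List<T> records, long delayNanos, Consumer<? super T> sink) {
        final long start = System.nanoTime();
        long deadline = start;
        for (T record : records) {
            // Spin until the deadline; compare by difference to stay overflow-safe.
            while (System.nanoTime() - deadline < 0) {
                // busy wait
            }
            sink.accept(record);
            // Advance from the previous deadline, not from "now", to avoid drift.
            deadline += delayNanos;
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        // Pre-generate the dataset on the heap so no IO or parsing happens while emitting.
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            records.add("ride-" + i);
        }
        long delay = delayNanosForRate(1_000_000); // target: 1M rec/sec
        long[] emitted = {0};
        long elapsed = emitPaced(records, delay, r -> emitted[0]++);
        System.out.printf("emitted %d records in %d ns%n", emitted[0], elapsed);
    }
}
```

Calling emitPaced with a delay of 0 removes the pacing and gives a rough upper bound for what the loop itself can emit, which is the "simple while loop" measurement suggested above.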