I think you can be much more precise if you use System.nanoTime() instead of System.currentTimeMillis() combined with Thread.sleep(delay). First, because Thread.sleep() is less precise [1], and second, because an empty loop that only polls System.nanoTime() does very few operations per iteration, so it can stop very close to the deadline. Like this:
// Replay the input line by line; readLine() returning null signals the end of
// the stream. (reader.ready() is left out of the condition: a false result does
// not mean the stream is exhausted, so it could stop the source early.)
while ((line = reader.readLine()) != null) {
    startTime = System.nanoTime();
    taxiRide = TaxiRide.fromString(line);
    sourceContext.collectWithTimestamp(taxiRide, getEventTime(taxiRide));
    // busy-wait in nanoseconds to get a reproducible data rate for the source
    this.dataRateListener.busySleep(startTime);
}

public void busySleep(long startTime) {
    long deadline = startTime + this.delayInNanoSeconds;
    // Spin until the deadline. Compare by subtraction instead of "<" so the check
    // stays correct even if System.nanoTime() overflows (see its Javadoc).
    while (System.nanoTime() - deadline < 0) {
        // intentionally empty: busy-wait
    }
}

I liked that you are passing a byte[] payload instead of an object or a String. That is definitely something to consider!

[1] https://stackoverflow.com/q/62061643/2096986

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Mon, Jun 22, 2020 at 4:13 PM Arvid Heise <ar...@ververica.com> wrote:
> If you are interested in measuring performance, you should also take a
> look at our benchmark repo [1], in particular the Throughput job [2].
>
> [1] https://github.com/dataArtisans/performance
> [2] https://github.com/dataArtisans/performance/blob/master/flink-jobs/src/main/java/com/github/projectflink/streaming/Throughput.java
>
> On Mon, Jun 22, 2020 at 3:36 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>> Hi Arvid,
>>
>> Thanks for the references. I hadn't found those tests before, and I will
>> definitely consider them for testing my application.
>>
>> The thing is that I am testing a pre-aggregation stream operator that I
>> have implemented. In particular, I need a high workload to create
>> backpressure in the shuffle phase after the keyBy transformation, and I am
>> monitoring the throughput of this operator only. So I will stick with my
>> own source function, but I will take the other references into account.
>>
>> I know that the Table API already has a pre-aggregation [2]. However, mine
>> works a little bit differently.
>>
>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
>>
>> Thanks,
>> Felipe
>>
>> On Mon, Jun 22, 2020 at 2:54 PM Arvid Heise <ar...@ververica.com> wrote:
>>> Hi Felipe,
>>>
>>> The examples are pretty old (6 years), which is why they still use DataSet.
>>>
>>> You should be fine by mostly replacing the sources with file sources (there
>>> is no need to write your own source unless you want to use generators) and
>>> using global windows for joining.
>>>
>>> However, why not use SQL for TPC-H? We have an e2e test [1] in which some
>>> TPC-H queries are used (in slightly modified form) [2].
>>> We also have TPC-DS queries as e2e tests [3].
>>>
>>> [1] https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpch-test
>>> [2] https://github.com/apache/flink/tree/master/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query
>>> [3] https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpcds-test
>>>
>>> On Mon, Jun 22, 2020 at 12:35 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>> Hi all,
>>>>
>>>> I would like to create some data stream query tests using the TPC-H
>>>> benchmark. I saw that there are examples of TPC-H Q3 [1] and Q10 [2];
>>>> however, they use DataSet. If I create these queries with DataStream
>>>> instead, what caveats do I have to watch for when implementing the source
>>>> function? The frequency of emitting items is certainly the first one, and
>>>> I suppose I would change the workload frequency globally for all data
>>>> sources. Is that the only one, or are there other things to consider?
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
>>>> [2] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
>>>>
>>>> Thanks,
>>>> Felipe
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
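For anyone who wants to try the busy-wait pacing idea from the top of this reply outside of a Flink job, here is a minimal, self-contained sketch. BusyWaitPacer, awaitNext() and intervalNanos are hypothetical names, not Flink API. It assumes a single emitting thread and that dedicating one CPU core to spinning is acceptable. It is a slight variant of the per-record scheme above: each deadline is advanced from the previous deadline instead of from the time the current record started, so small overshoots do not add up to a lower overall rate.

```java
/**
 * Hypothetical helper (not part of Flink): paces emission to a fixed interval
 * by spinning on System.nanoTime() instead of calling Thread.sleep().
 */
final class BusyWaitPacer {
    private final long intervalNanos;
    private long nextDeadline;

    BusyWaitPacer(long intervalNanos) {
        if (intervalNanos <= 0) {
            throw new IllegalArgumentException("intervalNanos must be positive");
        }
        this.intervalNanos = intervalNanos;
        // The first record may be emitted one interval after construction.
        this.nextDeadline = System.nanoTime() + intervalNanos;
    }

    /** Spins until the current deadline, then schedules the next one. */
    void awaitNext() {
        // Compare by subtraction, as the System.nanoTime() Javadoc recommends,
        // so the check stays correct if the counter overflows.
        while (System.nanoTime() - nextDeadline < 0) {
            // intentionally empty: busy-wait
        }
        // Advance from the previous deadline (not from "now") so overshoots
        // do not accumulate into a lower emission rate.
        nextDeadline += intervalNanos;
    }

    public static void main(String[] args) {
        long intervalNanos = 100_000L; // 100 microseconds, i.e. about 10,000 records/s
        int records = 1_000;
        BusyWaitPacer pacer = new BusyWaitPacer(intervalNanos);
        long start = System.nanoTime();
        for (int i = 0; i < records; i++) {
            pacer.awaitNext();
            // emit record i here, e.g. via the SourceContext in a Flink source
        }
        long elapsed = System.nanoTime() - start;
        System.out.printf("emitted %d records in %.2f ms%n", records, elapsed / 1e6);
    }
}
```

Because every deadline is derived from the previous one, a stall (for example a GC pause) is followed by a short burst while the pacer catches up. If that is undesirable for a backpressure experiment, one option is to reset nextDeadline to System.nanoTime() + intervalNanos after a large overshoot.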