I think you can be much more precise if you use System.nanoTime()
instead of System.currentTimeMillis() together with Thread.sleep(delay).
First, because Thread.sleep is less precise [1], and second, because an empty
loop that polls System.nanoTime() performs fewer operations. Like this:

while (reader.ready() && (line = reader.readLine()) != null) {
   startTime = System.nanoTime();
   taxiRide = TaxiRide.fromString(line);
   sourceContext.collectWithTimestamp(taxiRide, getEventTime(taxiRide));

   // sleep in nanoseconds to have a reproducible data rate for the data source
   this.dataRateListener.busySleep(startTime);
}

public void busySleep(long startTime) {
   long deadLine = startTime + this.delayInNanoSeconds;
   // busy-wait; compare the difference, because nanoTime() values can overflow
   while (System.nanoTime() - deadLine < 0) ;
}
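
In case it helps, here is a minimal, self-contained sketch of the same idea
that can be run outside Flink to compare the two approaches. The class name
BusySleepDemo, the 200 microsecond delay, and the number of rounds are only
assumptions for illustration; they are not taken from my source function.
It reports how far each approach overshoots the deadline on average, and the
numbers will depend on the OS and the hardware.

```java
public class BusySleepDemo {

    /** Spins until delayNanos have passed since startTime; returns the overshoot in ns. */
    public static long busySleep(long startTime, long delayNanos) {
        long deadline = startTime + delayNanos;
        long now;
        // Compare differences, not absolute values: nanoTime() may overflow.
        while ((now = System.nanoTime()) - deadline < 0) {
            // empty on purpose: busy-wait
        }
        return now - deadline;
    }

    /** Same delay using Thread.sleep; returns the overshoot in ns. */
    public static long threadSleep(long startTime, long delayNanos)
            throws InterruptedException {
        Thread.sleep(delayNanos / 1_000_000L, (int) (delayNanos % 1_000_000L));
        return System.nanoTime() - (startTime + delayNanos);
    }

    public static void main(String[] args) throws InterruptedException {
        long delayNanos = 200_000L; // hypothetical delay: 200 microseconds per record
        int rounds = 1_000;         // hypothetical number of records

        long busyTotal = 0;
        long sleepTotal = 0;
        for (int i = 0; i < rounds; i++) {
            busyTotal += busySleep(System.nanoTime(), delayNanos);
            sleepTotal += threadSleep(System.nanoTime(), delayNanos);
        }

        System.out.printf("busy-wait    avg overshoot: %d ns%n", busyTotal / rounds);
        System.out.printf("Thread.sleep avg overshoot: %d ns%n", sleepTotal / rounds);
    }
}
```

Keep in mind that the busy-wait occupies one CPU core completely while it
spins, so it trades CPU time for precision.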

I like that you are passing a byte[] payload instead of an object
or string. That is definitely something to consider!

[1] https://stackoverflow.com/q/62061643/2096986

Thanks,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Mon, Jun 22, 2020 at 4:13 PM Arvid Heise <ar...@ververica.com> wrote:

> If you are interested in measuring performance, you should also take a
> look at our benchmark repo [1] and in particular the Throughput job [2].
>
> [1] https://github.com/dataArtisans/performance
> [2]
> https://github.com/dataArtisans/performance/blob/master/flink-jobs/src/main/java/com/github/projectflink/streaming/Throughput.java
>
> On Mon, Jun 22, 2020 at 3:36 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> thanks for the references. I didn't find those tests before. I will
>> definitely consider them to test my application.
>>
>> The thing is that I am testing a pre-aggregation stream operator that I
>> have implemented. Particularly I need a high workload to create
>> backpressure on the shuffle phase, after the keyBy transformation is done.
>> And I am monitoring the throughput of this operator only. So I will stick
>> with the source function, but I will also consider what is in the other references.
>>
>> I know that the Table API already has a pre-agg [2]. However, mine works
>> a little bit differently.
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
>>
>> Thanks,
>> Felipe
>>
>>
>> On Mon, Jun 22, 2020 at 2:54 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> The examples are pretty old (6 years), hence they still use DataSet.
>>>
>>> You should be fine by mostly replacing sources with file sources (no
>>> need to write your own source, unless you want to use generators) and using
>>> global windows for joining.
>>>
>>> However, why not use SQL for TPC-H? We have an e2e test [1], where some
>>> TPC-H queries are used (in slightly modified form) [2].
>>> We also have TPC-DS queries as e2e tests [3].
>>>
>>> [1]
>>> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpch-test
>>> [2]
>>> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query
>>> [3]
>>> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpcds-test
>>>
>>> On Mon, Jun 22, 2020 at 12:35 PM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I would like to create some data stream query tests using the TPC-H
>>>> benchmark. I saw that there are some examples of TPC-H Q3 [1] and Q10 [2];
>>>> however, they use DataSet. If I create these queries using DataStream
>>>> instead, what caveats do I have to keep in mind when
>>>> implementing the source function? The frequency of emitting
>>>> items is certainly the first. I suppose that I would change the
>>>> frequency of the workload globally for all data sources. Is that all, or
>>>> are there other things to consider?
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
>>>> [2]
>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
>>>>
>>>> Thanks,
>>>> Felipe
>>>>
>>>
>>>
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>
