Hi, Guowei

Yeah, I think so too. There is currently no way to trigger a checkpoint and
wait for it to finish, so I will do the benchmark with a lower-level API.
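For reference, before dropping to a lower-level API it may be enough to block on the result that `executeSql` returns for the INSERT (recent Flink versions expose this as `TableResult.await()`; please verify against your version). The underlying pattern is simply waiting on the job's completion future, sketched here in plain Java with a hypothetical `submitJob` stand-in:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AwaitJobSketch {

    // Hypothetical stand-in for submitting a Flink job asynchronously,
    // the way tableEnv.executeSql("INSERT ...") returns before the job ends.
    static CompletableFuture<String> submitJob() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // simulated job work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "FINISHED";
        });
    }

    public static void main(String[] args) {
        // Blocking here is the crucial step: without it the benchmark method
        // returns while source/sink threads are still running, which is what
        // JMH reports as "Non-finished threads".
        String status = submitJob().join();
        System.out.println(status);
    }
}
```

In the benchmark below this might correspond to ending `sinkBenchmark()` with something like `tableEnv.executeSql("INSERT INTO ...").await();` (assuming `TableResult.await()` is available in your Flink version), so the method only returns once the bounded datagen source is exhausted.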


Guowei Ma <guowei....@gmail.com> wrote on Thu, Mar 25, 2021 at 4:59 PM:

> Hi,
> I am not an expert on JMH, but this does not look like an error. From the
> log it appears that the job has not finished: the data source is still
> reading data when JMH finishes.
>
> Thread[Legacy Source Thread - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, second_string]) ->
> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, second_string]) -> Sink:
> Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
> fields=[dt, first_bigint, second_bigint, first_int, second_int,
> first_float, second_float, first_double, second_double, first_string,
> second_string]) (3/6),5,Flink Task Threads]
>   at
> org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
>   at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
>   at
> org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
>   at
> org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
> Best,
> Guowei
>
>
> On Wed, Mar 24, 2021 at 9:56 PM jie mei <meijie.w...@gmail.com> wrote:
>
>> Hi, Yik San
>>
>> I use a library written by myself, and I am trying to verify its performance.
>>
>>
>> Yik San Chan <evan.chanyik...@gmail.com> wrote on Wed, Mar 24, 2021 at 9:07 PM:
>>
>>> Hi Jie,
>>>
>>> I am curious which library you use to get the ClickHouseTableBuilder.
>>>
>>> On Wed, Mar 24, 2021 at 8:41 PM jie mei <meijie.w...@gmail.com> wrote:
>>>
>>>> Hi, Community
>>>>
>>>> I ran a JMH benchmark task and got the error below. It uses Flink SQL to
>>>> consume data from the data-gen connector (10,000,000 rows) and write the
>>>> data to ClickHouse. Below is part of the log; you can see the complete
>>>> log in the attached file.
>>>>
>>>> *My JMH benchmark code is as below:*
>>>>
>>>> @Benchmark
>>>> @Threads(1)
>>>> @Fork(1)
>>>> public void sinkBenchmark() throws IOException {
>>>>
>>>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>>>       .getExecutionEnvironment();
>>>>   streamEnv.enableCheckpointing(60000);
>>>>
>>>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>       .useBlinkPlanner()
>>>>       .inStreamingMode().build();
>>>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>>>>
>>>>   // create clickhouse table
>>>>   new ClickHouseTableBuilder(tableEnv,
>>>>       parseSchema("clickhouse_sink_table.sql"))
>>>>       .database("benchmark")
>>>>       .table("bilophus_sink_benchmark")
>>>>       .address("jdbc:clickhouse://localhost:8123")
>>>>       .build();
>>>>
>>>>   // create mock data table
>>>>   tableEnv.executeSql(
>>>>       parseSchema("clickhouse_source_table.sql") +
>>>>           "WITH (" +
>>>>           "'connector' = 'datagen'," +
>>>>           "'number-of-rows' = '10000000')");
>>>>
>>>>   tableEnv.executeSql(
>>>>       "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");
>>>>
>>>> }
>>>>
>>>> *Running command and plugin configuration:*
>>>>
>>>> mvn clean package -DskipTests
>>>>
>>>> <plugin>
>>>>   <groupId>org.codehaus.mojo</groupId>
>>>>   <artifactId>exec-maven-plugin</artifactId>
>>>>   <version>1.6.0</version>
>>>>   <executions>
>>>>     <execution>
>>>>       <id>test-benchmarks</id>
>>>>       <phase>test</phase>
>>>>       <goals>
>>>>         <goal>exec</goal>
>>>>       </goals>
>>>>     </execution>
>>>>   </executions>
>>>>   <configuration>
>>>>     <skip>false</skip>
>>>>     <classpathScope>test</classpathScope>
>>>>     <executable>java</executable>
>>>>     <arguments>
>>>>       <argument>-Xmx6g</argument>
>>>>       <argument>-classpath</argument>
>>>>       <classpath/>
>>>>       <argument>org.openjdk.jmh.Main</argument>
>>>>       <!--shouldFailOnError-->
>>>>       <argument>-foe</argument>
>>>>       <argument>true</argument>
>>>>       <!--speed up tests-->
>>>>       <argument>-f</argument>
>>>>       <argument>1</argument>
>>>>       <argument>-i</argument>
>>>>       <argument>1</argument>
>>>>       <argument>-wi</argument>
>>>>       <argument>0</argument>
>>>>       <argument>-rf</argument>
>>>>       <argument>csv</argument>
>>>>       <argument>.*</argument>
>>>>     </arguments>
>>>>   </configuration>
>>>> </plugin>
>>>>
>>>>
>>>> Non-finished threads:
>>>>
>>>> Thread[Source: TableSourceScan(table=[[default_catalog,
>>>> default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
>>>> second_bigint, first_int, second_int, first_float, second_float,
>>>> first_double, second_double, first_string, second_string]) ->
>>>> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint,
>>>> second_bigint, first_int, second_int, first_float, second_float,
>>>> first_double, second_double, first_string, second_string]) -> Sink:
>>>> Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
>>>> fields=[dt, first_bigint, second_bigint, first_int, second_int,
>>>> first_float, second_float, first_double, second_double, first_string,
>>>> second_string]) (1/6),5,Flink Task Threads]
>>>>  at sun.misc.Unsafe.park(Native Method)
>>>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>  at
>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>>>
>>>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>>>>  at sun.misc.Unsafe.park(Native Method)
>>>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>>  at
>>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>  at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>>
>>>> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>>>>  at sun.misc.Unsafe.park(Native Method)
>>>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>>  at
>>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>  at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>>
>>>> Thread[Source: TableSourceScan(table=[[default_catalog,
>>>> default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
>>>> second_bigint, first_int, second_int, first_float, second_float,
>>>> first_double, second_double, first_string, second_string]) ->
>>>> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint,
>>>> second_bigint, first_int, second_int, first_float, second_float,
>>>> first_double, second_double, first_string, second_string]) -> Sink:
>>>> Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
>>>> fields=[dt, first_bigint, second_bigint, first_int, second_int,
>>>> first_float, second_float, first_double, second_double, first_string,
>>>> second_string]) (4/6),5,Flink Task Threads]
>>>>  at sun.misc.Unsafe.park(Native Method)
>>>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>  at
>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>>>
>>>>  at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>>>
>>>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>> --
>>>>
>>>> *Best Regards*
>>>> *Jeremy Mei*
>>>>
>>>
>>
>> --
>>
>> *Best Regards*
>> *Jeremy Mei*
>>
>

-- 

*Best Regards*
*Jeremy Mei*
