Hi, Yik San

I use a library I wrote myself, and I am trying to verify its performance.


Yik San Chan <evan.chanyik...@gmail.com> wrote on Wed, Mar 24, 2021 at 9:07 PM:

> Hi Jie,
>
> I am curious which library you use to get the ClickHouseTableBuilder.
>
> On Wed, Mar 24, 2021 at 8:41 PM jie mei <meijie.w...@gmail.com> wrote:
>
>> Hi, Community
>>
>> I ran a JMH benchmark task and got the error below. The task uses Flink SQL
>> to consume data from the datagen connector (10,000,000 rows) and write it to
>> ClickHouse. Below is part of the log; the complete log is in the attached file.
>>
>> *My JMH benchmark code is below:*
>>
>> @Benchmark
>> @Threads(1)
>> @Fork(1)
>> public void sinkBenchmark() throws IOException {
>>
>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>       .getExecutionEnvironment();
>>   streamEnv.enableCheckpointing(60000);
>>
>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>       .useBlinkPlanner()
>>       .inStreamingMode().build();
>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>>
>>   // create clickhouse table
>>   new ClickHouseTableBuilder(tableEnv,
>>       parseSchema("clickhouse_sink_table.sql"))
>>       .database("benchmark")
>>       .table("bilophus_sink_benchmark")
>>       .address("jdbc:clickhouse://localhost:8123")
>>       .build();
>>
>>   // create mock data table
>>   tableEnv.executeSql(
>>       parseSchema("clickhouse_source_table.sql") +
>>           "WITH (" +
>>           "'connector' = 'datagen'," +
>>           "'number-of-rows' = '10000000')");
>>
>>   tableEnv.executeSql(
>>       "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM " +
>>           "CLICKHOUSE_SOURCE_BENCHMARK");
>>
>> }
>>
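One plausible reading of the log: executeSql submits the INSERT job asynchronously and returns a TableResult right away, so sinkBenchmark() may return while the job is still running. If the Flink version is 1.12 or later, calling await() on the TableResult returned by the INSERT should block until the job finishes. The stdlib-only sketch below models that difference without Flink. AsyncSubmitSketch, submit and submitAndAwait are hypothetical names, and the latch is an assumed stand-in for the bounded datagen input running out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSubmitSketch {

    // Hypothetical stand-in for submitting an INSERT job. Like executeSql(...)
    // used on its own, it returns as soon as the job is submitted; the future
    // completes only after `inputDone` is released, reporting the row count.
    static CompletableFuture<Long> submit(ExecutorService cluster, CountDownLatch inputDone, long rows) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                inputDone.await(); // the "job" keeps running until its input is exhausted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
            return rows;
        }, cluster);
    }

    // Blocks the caller until the submitted job has finished, so the job is no
    // longer running when this returns (the executor itself still needs shutdown()).
    static long submitAndAwait(ExecutorService cluster, CountDownLatch inputDone, long rows)
            throws InterruptedException, ExecutionException {
        return submit(cluster, inputDone, rows).get();
    }
}
```

For example, with cluster = Executors.newSingleThreadExecutor(), the future returned by submit(...) is not yet done when the call returns, whereas submitAndAwait(...) only returns once the row count is available.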
>> *Running command:*
>>
>> mvn clean package -DskipTests
>>
>> *with this exec-maven-plugin configuration, bound to the test phase:*
>>
>> <plugin>
>>   <groupId>org.codehaus.mojo</groupId>
>>   <artifactId>exec-maven-plugin</artifactId>
>>   <version>1.6.0</version>
>>   <executions>
>>     <execution>
>>       <id>test-benchmarks</id>
>>       <phase>test</phase>
>>       <goals>
>>         <goal>exec</goal>
>>       </goals>
>>     </execution>
>>   </executions>
>>   <configuration>
>>     <skip>false</skip>
>>     <classpathScope>test</classpathScope>
>>     <executable>java</executable>
>>     <arguments>
>>       <argument>-Xmx6g</argument>
>>       <argument>-classpath</argument>
>>       <classpath/>
>>       <argument>org.openjdk.jmh.Main</argument>
>>       <!--shouldFailOnError-->
>>       <argument>-foe</argument>
>>       <argument>true</argument>
>>       <!--speed up tests-->
>>       <argument>-f</argument>
>>       <argument>1</argument>
>>       <argument>-i</argument>
>>       <argument>1</argument>
>>       <argument>-wi</argument>
>>       <argument>0</argument>
>>       <argument>-rf</argument>
>>       <argument>csv</argument>
>>       <argument>.*</argument>
>>     </arguments>
>>   </configuration>
>> </plugin>
>>
>>
>> Non-finished threads:
>>
>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
>> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
>> first_int, second_int, first_float, second_float, first_double,
>> second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>> first_bigint, second_bigint, first_int, second_int, first_float,
>> second_float, first_double, second_double, first_string, second_string]) ->
>> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
>> second_bigint, first_int, second_int, first_float, second_float,
>> first_double, second_double, first_string, second_string]) (1/6),5,Flink Task Threads]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>  at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
>> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
>> first_int, second_int, first_float, second_float, first_double,
>> second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>> first_bigint, second_bigint, first_int, second_int, first_float,
>> second_float, first_double, second_double, first_string, second_string]) ->
>> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
>> second_bigint, first_int, second_int, first_float, second_float,
>> first_double, second_double, first_string, second_string]) (4/6),5,Flink Task Threads]
>>  at sun.misc.Unsafe.park(Native Method)
>>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>  at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>  at java.lang.Thread.run(Thread.java:748)
>>
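The "Non-finished threads" section appears to list threads that were still alive when the JMH run ended. Both Flink task threads are parked in TaskMailboxImpl.take(), waiting for mail, and the Akka dispatcher threads are idle in ForkJoinPool.scan(). The stdlib-only sketch below shows, under simplified assumptions, how a non-daemon thread parked in take() outlives the method that started it and how such threads can be listed. StrayThreadSketch, nonDaemonThreads and startMailboxThread are hypothetical names; this is not JMH's own detection code.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

public class StrayThreadSketch {

    // Names of live non-daemon threads other than the caller. Threads like
    // these keep the JVM from exiting after the benchmark method has returned.
    static List<String> nonDaemonThreads() {
        Thread self = Thread.currentThread();
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t != self && t.isAlive() && !t.isDaemon())
                .map(Thread::getName)
                .sorted()
                .collect(Collectors.toList());
    }

    // Starts a non-daemon thread that parks in take() waiting for mail, loosely
    // modeled on the task threads above; it exits only when it receives "STOP".
    static Thread startMailboxThread(String name, BlockingQueue<String> mailbox) {
        Thread t = new Thread(() -> {
            try {
                while (!"STOP".equals(mailbox.take())) {
                    // a real task would process the mail here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        t.setDaemon(false);
        t.start();
        return t;
    }
}
```

Until "STOP" is put into the LinkedBlockingQueue passed as the mailbox, the thread stays parked and shows up in nonDaemonThreads(); once it exits and is joined, it no longer does.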
>>
>> --
>>
>> *Best Regards*
>> *Jeremy Mei*
>>
>

-- 

*Best Regards*
*Jeremy Mei*
