Hi Yik San,

I use a library that I wrote myself, and I am trying to verify its performance.

Yik San Chan <evan.chanyik...@gmail.com> wrote on Wed, Mar 24, 2021 at 9:07 PM:

> Hi Jie,
>
> I am curious which library you use to get the ClickHouseTableBuilder.
>
> On Wed, Mar 24, 2021 at 8:41 PM jie mei <meijie.w...@gmail.com> wrote:
>
>> Hi, Community
>>
>> I ran a JMH benchmark and got the error below. The benchmark uses Flink SQL
>> to consume data from the datagen connector (10_000_000 rows) and write it to
>> ClickHouse. Below is part of the log; the complete log is in the attached
>> file.
>>
>> *My JMH benchmark code is below:*
>>
>> @Benchmark
>> @Threads(1)
>> @Fork(1)
>> public void sinkBenchmark() throws IOException {
>>
>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>       .getExecutionEnvironment();
>>   streamEnv.enableCheckpointing(60000);
>>
>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>       .useBlinkPlanner()
>>       .inStreamingMode().build();
>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv,
>>       settings);
>>
>>   // create clickhouse table
>>   new ClickHouseTableBuilder(tableEnv,
>>       parseSchema("clickhouse_sink_table.sql"))
>>       .database("benchmark")
>>       .table("bilophus_sink_benchmark")
>>       .address("jdbc:clickhouse://localhost:8123")
>>       .build();
>>
>>   // create mock data table
>>   tableEnv.executeSql(
>>       parseSchema("clickhouse_source_table.sql") +
>>       "WITH (" +
>>       "'connector' = 'datagen'," +
>>       "'number-of-rows' = '10000000')");
>>
>>   tableEnv.executeSql(
>>       "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM " +
>>       "CLICKHOUSE_SOURCE_BENCHMARK");
>>
>> }
>>
>> *Running command:*
>>
>> mvn clean package -DskipTests
>>
>> <plugin>
>>   <groupId>org.codehaus.mojo</groupId>
>>   <artifactId>exec-maven-plugin</artifactId>
>>   <version>1.6.0</version>
>>   <executions>
>>     <execution>
>>       <id>test-benchmarks</id>
>>       <phase>test</phase>
>>       <goals>
>>         <goal>exec</goal>
>>       </goals>
>>     </execution>
>>   </executions>
>>   <configuration>
>>     <skip>false</skip>
>>     <classpathScope>test</classpathScope>
>>     <executable>java</executable>
>>     <arguments>
>>       <argument>-Xmx6g</argument>
>>       <argument>-classpath</argument>
>>       <classpath/>
>>       <argument>org.openjdk.jmh.Main</argument>
>>       <!--shouldFailOnError-->
>>       <argument>-foe</argument>
>>       <argument>true</argument>
>>       <!--speed up tests-->
>>       <argument>-f</argument>
>>       <argument>1</argument>
>>       <argument>-i</argument>
>>       <argument>1</argument>
>>       <argument>-wi</argument>
>>       <argument>0</argument>
>>       <argument>-rf</argument>
>>       <argument>csv</argument>
>>       <argument>.*</argument>
>>     </arguments>
>>   </configuration>
>> </plugin>
>>
>>
>> Non-finished threads:
>>
>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
>> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
>> first_int, second_int, first_float, second_float, first_double,
>> second_double, first_string, second_string])
>> -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint,
>> second_bigint, first_int, second_int, first_float, second_float,
>> first_double, second_double, first_string, second_string])
>> -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
>> fields=[dt, first_bigint, second_bigint, first_int, second_int,
>> first_float, second_float, first_double, second_double, first_string,
>> second_string]) (1/6),5,Flink Task Threads]
>>   at sun.misc.Unsafe.park(Native Method)
>>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>   at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>>   at sun.misc.Unsafe.park(Native Method)
>>   at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>   at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>>   at sun.misc.Unsafe.park(Native Method)
>>   at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>   at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
>> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
>> first_int, second_int, first_float, second_float, first_double,
>> second_double, first_string, second_string])
>> -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint,
>> second_bigint, first_int, second_int, first_float, second_float,
>> first_double, second_double, first_string, second_string])
>> -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
>> fields=[dt, first_bigint, second_bigint, first_int, second_int,
>> first_float, second_float, first_double, second_double, first_string,
>> second_string]) (4/6),5,Flink Task Threads]
>>   at sun.misc.Unsafe.park(Native Method)
>>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>   at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>>
>> --
>>
>> *Best Regards*
>> *Jeremy Mei*
>>
>

--

*Best Regards*
*Jeremy Mei*