Hi, I am not an expert on JMH, but it seems that this is not an error. From the log, it looks like the job has not finished: the data source is still reading data when JMH finishes.
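If that is the cause, the benchmark method returns as soon as the INSERT job has been submitted. JMH then measures only the submission, and the job's threads keep running after the iteration ends. One way to check this is to make the benchmark block until the job finishes. Below is a minimal, stdlib-only sketch of that pattern under stated assumptions. `submitJob` and `runBenchmarkIteration` are hypothetical names. `submitJob` stands in for `tableEnv.executeSql("INSERT ...")`, which returns while the job is still running, and `join()` plays the role of waiting for the job's result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AwaitSubmittedJob {

    // Hypothetical stand-in for submitting the INSERT job: the rows are
    // "processed" on a background thread and the call returns immediately.
    static CompletableFuture<Long> submitJob(ExecutorService pool, long rows) {
        return CompletableFuture.supplyAsync(() -> {
            long processed = 0;
            for (long i = 0; i < rows; i++) {
                processed++; // pretend to read one row from the source and write it to the sink
            }
            return processed;
        }, pool);
    }

    // One benchmark iteration that blocks until the submitted job has finished,
    // so the measured time covers the whole job rather than just its submission.
    static long runBenchmarkIteration(long rows) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Long> job = submitJob(pool, rows);
            // Without this join(), the method would return while the job is still running,
            // similar to calling executeSql("INSERT ...") without waiting on its result.
            return job.join();
        } finally {
            // The job is already complete here; release the worker thread.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long processed = runBenchmarkIteration(10_000_000L);
        System.out.println("processed " + processed + " rows");
    }
}
```

Running `main` prints `processed 10000000 rows`. In the Flink benchmark itself, the analogous change would be to keep the `TableResult` returned by the INSERT statement and wait on it before the method returns, e.g. `tableEnv.executeSql("INSERT INTO ...").await();`. This assumes Flink 1.12 or later, where `TableResult.await()` is available, so please check it against the Flink version in use.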
Thread[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) (3/6),5,Flink Task Threads]
    at org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
    at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
    at org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
    at org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
    at org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
    at org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
    at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

Best,
Guowei


On Wed, Mar 24, 2021 at 9:56 PM jie mei <meijie.w...@gmail.com> wrote:

> Hi, Yik San
>
> I use a library written by myself and am trying to verify its performance.
>
>
> On Wed, Mar 24, 2021 at 9:07 PM Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
>> Hi Jie,
>>
>> I am curious what library you use to get the ClickHouseTableBuilder.
>>
>> On Wed, Mar 24, 2021 at 8:41 PM jie mei <meijie.w...@gmail.com> wrote:
>>
>>> Hi, Community
>>>
>>> I ran a JMH benchmark task and got the error below. The task uses Flink
>>> SQL to consume data from the datagen connector (10_000_000 rows) and
>>> write the data to ClickHouse. Below is part of the log; you can see the
>>> complete log in the attached file.
>>>
>>> *My JMH benchmark code is as below:*
>>>
>>> @Benchmark
>>> @Threads(1)
>>> @Fork(1)
>>> public void sinkBenchmark() throws IOException {
>>>
>>>     StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>>             .getExecutionEnvironment();
>>>     streamEnv.enableCheckpointing(60000);
>>>
>>>     EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>             .useBlinkPlanner()
>>>             .inStreamingMode().build();
>>>     TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv,
>>>             settings);
>>>
>>>     // create clickhouse table
>>>     new ClickHouseTableBuilder(tableEnv,
>>>             parseSchema("clickhouse_sink_table.sql"))
>>>             .database("benchmark")
>>>             .table("bilophus_sink_benchmark")
>>>             .address("jdbc:clickhouse://localhost:8123")
>>>             .build();
>>>
>>>     // create mock data table
>>>     tableEnv.executeSql(
>>>             parseSchema("clickhouse_source_table.sql") +
>>>                     "WITH (" +
>>>                     "'connector' = 'datagen'," +
>>>                     "'number-of-rows' = '10000000')");
>>>
>>>     tableEnv.executeSql(
>>>             "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");
>>>
>>> }
>>>
>>> *Running command:*
>>>
>>> mvn clean package -DskipTests
>>>
>>> <plugin>
>>>     <groupId>org.codehaus.mojo</groupId>
>>>     <artifactId>exec-maven-plugin</artifactId>
>>>     <version>1.6.0</version>
>>>     <executions>
>>>         <execution>
>>>             <id>test-benchmarks</id>
>>>             <phase>test</phase>
>>>             <goals>
>>>                 <goal>exec</goal>
>>>             </goals>
>>>         </execution>
>>>     </executions>
>>>     <configuration>
>>>         <skip>false</skip>
>>>         <classpathScope>test</classpathScope>
>>>         <executable>java</executable>
>>>         <arguments>
>>>             <argument>-Xmx6g</argument>
>>>             <argument>-classpath</argument>
>>>             <classpath/>
>>>             <argument>org.openjdk.jmh.Main</argument>
>>>             <!--shouldFailOnError-->
>>>             <argument>-foe</argument>
>>>             <argument>true</argument>
>>>             <!--speed up tests-->
>>>             <argument>-f</argument>
>>>             <argument>1</argument>
>>>             <argument>-i</argument>
>>>             <argument>1</argument>
>>>             <argument>-wi</argument>
>>>             <argument>0</argument>
>>>             <argument>-rf</argument>
>>>             <argument>csv</argument>
>>>             <argument>.*</argument>
>>>         </arguments>
>>>     </configuration>
>>> </plugin>
>>>
>>>
>>> Non-finished threads:
>>>
>>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) (1/6),5,Flink Task Threads]
>>>     at sun.misc.Unsafe.park(Native Method)
>>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>>>     at sun.misc.Unsafe.park(Native Method)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>>>     at sun.misc.Unsafe.park(Native Method)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> Thread[Source: TableSourceScan(table=[[default_catalog, default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) (4/6),5,Flink Task Threads]
>>>     at sun.misc.Unsafe.park(Native Method)
>>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> --
>>>
>>> *Best Regards*
>>> *Jeremy Mei*
>>>
>>
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>