Hi Guowei, yeah, I think so too. There is currently no way to trigger a checkpoint and wait for it to finish, so I will do the benchmark with a lower-level API.
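(For the archive: a minimal sketch of the lower-level approach mentioned above, assuming the DataStream API. The source and sink here are illustrative stand-ins, not the actual benchmark pipeline; the point is that StreamExecutionEnvironment#execute() blocks until the bounded job terminates, so a JMH iteration measures the whole run.)

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class BlockingBenchmarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Illustrative bounded pipeline; the real benchmark would build
        // its datagen-to-ClickHouse job here instead.
        env.fromSequence(0, 10_000_000L)
           .addSink(new DiscardingSink<>());

        // Unlike TableEnvironment#executeSql(), execute() does not return
        // until the job has finished, so no task threads outlive the call.
        env.execute("sink-benchmark");
    }
}
```
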
Guowei Ma <guowei....@gmail.com> wrote on Thu, Mar 25, 2021 at 4:59 PM:

> Hi,
> I am not an expert on JMH, but it seems that this is not an error. From the
> log it looks like the job has not finished: the data source is still
> reading data when JMH exits.
>
> Thread[Legacy Source Thread - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, second_string]) ->
> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, second_string]) -> Sink:
> Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
> fields=[dt, first_bigint, second_bigint, first_int, second_int,
> first_float, second_float, first_double, second_double, first_string,
> second_string]) (3/6),5,Flink Task Threads]
>   at org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
>   at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
>   at org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
>   at org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
>   at org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
>   at org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
>   at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
> Best,
> Guowei
>
>
> On Wed, Mar 24, 2021 at 9:56 PM jie mei <meijie.w...@gmail.com> wrote:
>
>> Hi, Yik San
>>
>> I use a library written by myself and am trying to verify its performance.
>>
>>
>> Yik San Chan <evan.chanyik...@gmail.com> wrote on Wed, Mar 24, 2021 at 9:07 PM:
>>
>>> Hi Jie,
>>>
>>> I am curious which library you use to get the ClickHouseTableBuilder.
>>>
>>> On Wed, Mar 24, 2021 at 8:41 PM jie mei <meijie.w...@gmail.com> wrote:
>>>
>>>> Hi, Community
>>>>
>>>> I ran a JMH benchmark task and got the error below. The task uses Flink
>>>> SQL to consume data from the datagen connector (10,000,000 rows) and
>>>> write it to ClickHouse. Part of the log is below; you can see the
>>>> complete log in the attached file.
>>>>
>>>> *My JMH benchmark code is as below:*
>>>>
>>>> @Benchmark
>>>> @Threads(1)
>>>> @Fork(1)
>>>> public void sinkBenchmark() throws IOException {
>>>>
>>>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>>>       .getExecutionEnvironment();
>>>>   streamEnv.enableCheckpointing(60000);
>>>>
>>>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>>       .useBlinkPlanner()
>>>>       .inStreamingMode()
>>>>       .build();
>>>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>>>>
>>>>   // create clickhouse table
>>>>   new ClickHouseTableBuilder(tableEnv, parseSchema("clickhouse_sink_table.sql"))
>>>>       .database("benchmark")
>>>>       .table("bilophus_sink_benchmark")
>>>>       .address("jdbc:clickhouse://localhost:8123")
>>>>       .build();
>>>>
>>>>   // create mock data table
>>>>   tableEnv.executeSql(
>>>>       parseSchema("clickhouse_source_table.sql") +
>>>>           "WITH (" +
>>>>           "'connector' = 'datagen'," +
>>>>           "'number-of-rows' = '10000000')");
>>>>
>>>>   tableEnv.executeSql(
>>>>       "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");
>>>> }
>>>>
>>>> *running command:*
>>>>
>>>> mvn clean package -DskipTests
>>>>
>>>> <plugin>
>>>>   <groupId>org.codehaus.mojo</groupId>
>>>>   <artifactId>exec-maven-plugin</artifactId>
>>>>   <version>1.6.0</version>
>>>>   <executions>
>>>>     <execution>
>>>>       <id>test-benchmarks</id>
>>>>       <phase>test</phase>
>>>>       <goals>
>>>>         <goal>exec</goal>
>>>>       </goals>
>>>>     </execution>
>>>>   </executions>
>>>>   <configuration>
>>>>     <skip>false</skip>
>>>>     <classpathScope>test</classpathScope>
>>>>     <executable>java</executable>
>>>>     <arguments>
>>>>       <argument>-Xmx6g</argument>
>>>>       <argument>-classpath</argument>
>>>>       <classpath/>
>>>>       <argument>org.openjdk.jmh.Main</argument>
>>>>       <!--shouldFailOnError-->
>>>>       <argument>-foe</argument>
>>>>       <argument>true</argument>
>>>>       <!--speed up tests-->
>>>>       <argument>-f</argument>
>>>>       <argument>1</argument>
>>>>       <argument>-i</argument>
>>>>       <argument>1</argument>
>>>>       <argument>-wi</argument>
>>>>       <argument>0</argument>
>>>>       <argument>-rf</argument>
>>>>       <argument>csv</argument>
>>>>       <argument>.*</argument>
>>>>     </arguments>
>>>>   </configuration>
>>>> </plugin>
>>>>
>>>> Non-finished threads:
>>>>
>>>> Thread[Source: TableSourceScan(table=[[default_catalog,
>>>> default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
>>>> second_bigint, first_int, second_int, first_float, second_float,
>>>> first_double, second_double, first_string, second_string]) ->
>>>> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint,
>>>> second_bigint, first_int, second_int, first_float, second_float,
>>>> first_double, second_double, first_string, second_string]) -> Sink:
>>>> Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
>>>> fields=[dt, first_bigint, second_bigint, first_int, second_int,
>>>> first_float, second_float, first_double, second_double, first_string,
>>>> second_string]) (1/6),5,Flink Task Threads]
>>>>   at sun.misc.Unsafe.park(Native Method)
>>>>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>>>>   at sun.misc.Unsafe.park(Native Method)
>>>>   at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>   at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>>>>   at sun.misc.Unsafe.park(Native Method)
>>>>   at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>   at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> Thread[Source: TableSourceScan(table=[[default_catalog,
>>>> default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
>>>> second_bigint, first_int, second_int, first_float, second_float,
>>>> first_double, second_double, first_string, second_string]) ->
>>>> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint,
>>>> second_bigint, first_int, second_int, first_float, second_float,
>>>> first_double, second_double, first_string, second_string]) -> Sink:
>>>> Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
>>>> fields=[dt, first_bigint, second_bigint, first_int, second_int,
>>>> first_float, second_float, first_double, second_double, first_string,
>>>> second_string]) (4/6),5,Flink Task Threads]
>>>>   at sun.misc.Unsafe.park(Native Method)
>>>>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> --
>>>>
>>>> *Best Regards*
>>>> *Jeremy Mei*
>>>>
>>>
>>
>> --
>>
>> *Best Regards*
>> *Jeremy Mei*
>>

--

*Best Regards*
*Jeremy Mei*
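Footnote for anyone hitting the same "Non-finished threads" warning: TableEnvironment#executeSql() submits the INSERT asynchronously and returns as soon as the job is submitted, so the benchmark method exits while Flink task threads are still running. A hedged sketch of one possible fix, assuming Flink 1.12+ where TableResult#await() is available (the DDL setup is elided; table names match the benchmark in the thread):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class AwaitInsertSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // ... source and sink DDL as in the benchmark above ...

        TableResult result = tableEnv.executeSql(
                "INSERT INTO CLICKHOUSE_SINK_BENCHMARK "
                + "SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");

        // Block until the bounded job completes, so the JMH iteration
        // covers the full run and no Flink task threads are left behind.
        result.await();
    }
}
```
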