How to use a self-defined JSON format when creating a table from a Kafka stream?
I want to register a table from a MySQL binlog like this: tEnv.sqlUpdate("CREATE TABLE order(\n" + "order_id BIGINT,\n" + "order_no VARCHAR\n" + ") WITH (\n" + "'connector.type' = 'kafka',\n" ... + "'update-mode' = 'append',\n" + "'format.type' = 'json',\n" + "'format.derive-schema' = 'true'\n" + ")"); using the following log format: { "type" : "update", "timestamp" : 1583373066000, "binlog_filename" : "mysql-bin.000453", "binlog_position" : 923020943, "database" : "wms", "table_name" : "t_pick_order", "table_id" : 131936, "columns" : [ { "id" : 1, "name" : "order_id", "column_type" : -5, "last_value" : 4606458, "value" : 4606458 }, { "id" : 2, "name" : "order_no", "column_type" : 12, "last_value" : "EDBMFSJ1S2003050006628", "value" : "EDBMFSJ1S2003050006628" }] } Surely 'format.type' = 'json' will not parse the result the way I expect. Is there any way I can implement this, for example with a self-defined format class? Thanks, Lei wangl...@geekplus.com.cn
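One possible direction, sketched below under assumptions, is to parse the binlog envelope yourself with a custom DeserializationSchema used by a DataStream FlinkKafkaConsumer, flattening the "columns" array into a plain record. The BinlogOrder POJO and the chosen fields are illustrative only, and this is a DataStream-level schema rather than a SQL 'format.type' plugin.

// A minimal sketch, assuming a simple POJO such as:
//   public class BinlogOrder { public long orderId; public String orderNo; }
import java.io.IOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class BinlogOrderSchema implements DeserializationSchema<BinlogOrder> {

    private transient ObjectMapper mapper;

    @Override
    public BinlogOrder deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        JsonNode root = mapper.readTree(message);
        BinlogOrder order = new BinlogOrder();
        // Flatten the "columns" array of the binlog envelope into plain fields.
        for (JsonNode col : root.get("columns")) {
            String name = col.get("name").asText();
            if ("order_id".equals(name)) {
                order.orderId = col.get("value").asLong();
            } else if ("order_no".equals(name)) {
                order.orderNo = col.get("value").asText();
            }
        }
        return order;
    }

    @Override
    public boolean isEndOfStream(BinlogOrder nextElement) {
        return false;
    }

    @Override
    public TypeInformation<BinlogOrder> getProducedType() {
        return TypeInformation.of(BinlogOrder.class);
    }
}

A FlinkKafkaConsumer built with this schema can then be registered as a table on the StreamTableEnvironment if SQL access is still needed.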
Does the Kafka sink only support append mode?
I wrote a simple program that reads from Kafka using SQL and sinks to Kafka. But only 'update-mode' = 'append' is supported for the sink table, and the query SQL must have no GROUP BY statement. Is only append mode supported for the Kafka sink? Thanks, Lei
Does flink-1.10 sql-client support kafka sink?
I have configured source table successfully using the following configuration: - name: out_order type: source update-mode: append schema: - name: out_order_code type: STRING - name: input_date type: BIGINT - name: owner_code type: STRING connector: property-version: 1 type: kafka version: universal topic: out_order startup-mode: latest-offset properties: - key: zookeeper.connect value: 172.19.78.32:2181 - key: bootstrap.servers value: 172.19.78.32:9092 - key: group.id property-version: 1 type: json schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)" How can i configure a sink table? I haven't found any useful docs for this. Thanks, Lei
Re: Re: Does the Kafka sink only support append mode?
Hi Jark, Thanks for the explanation. The GROUP BY statement results in a non-append stream. I have just tried a join statement and want to send the result to Kafka; it also fails with the error: AppendStreamTableSink requires that Table has only insert changes. Why is the join result not appendable? It confuses me. Thanks, Lei From: Jark Wu Date: 2020-03-09 19:25 To: wangl...@geekplus.com.cn CC: user Subject: Re: Does the Kafka sink only support append mode? Hi Lei, Yes. Currently, the Kafka sink only supports append mode. Other update modes (e.g. upsert mode / retract mode) are on the agenda. For now, you can customize a KafkaTableSink by implementing the UpsertStreamTableSink interface, where you will get Tuple2 records, and the Boolean represents an insert or delete operation. Then you can encode the insert/delete operation into Kafka storage or just ignore those operations. Best, Jark On Mon, 9 Mar 2020 at 19:14, wangl...@geekplus.com.cn wrote: I wrote a simple program that reads from Kafka using SQL and sinks to Kafka. But only 'update-mode' = 'append' is supported for the sink table, and the query SQL must have no GROUP BY statement. Is only append mode supported for the Kafka sink? Thanks, Lei
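For anyone who wants to see the changelog that makes such queries non-appendable, here is a minimal sketch (table and field names are made up for illustration) that bridges a grouped query to a retract stream. Each element carries a Boolean add/retract flag, and it is exactly the retract (false) records that an append-only Kafka sink cannot represent.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractInspect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Assumes a table named 'orders' has already been registered, e.g. via a Kafka DDL.
        Table counts = tEnv.sqlQuery(
                "SELECT status, COUNT(*) AS cnt FROM orders GROUP BY status");

        // f0 == true is an add/update message, f0 == false is a retraction.
        DataStream<Tuple2<Boolean, Row>> changelog = tEnv.toRetractStream(counts, Row.class);
        changelog.print();

        env.execute("inspect-retract-stream");
    }
}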
Re: Re: Does flink-1.10 sql-client support kafka sink?
Thanks, works now. Seems it is because i added the schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, status INT)" under format label. From: Arvid Heise Date: 2020-03-10 20:51 To: wangl...@geekplus.com.cn CC: user Subject: Re: Dose flink-1.10 sql-client support kafka sink? Hi Lei, yes Kafka as a sink is supported albeit only for appends (no deletions/updates yet) [1]. An example is a bit hidden in the documentation [2]: tables: - name: MyTableSink type: sink-table update-mode: append connector: property-version: 1 type: kafka version: "0.11" topic: OutputTopic properties: zookeeper.connect: localhost:2181 bootstrap.servers: localhost:9092 group.id: testGroup format: property-version: 1 type: json derive-schema: true schema: - name: rideId data-type: BIGINT - name: lon data-type: FLOAT - name: lat data-type: FLOAT - name: rideTime data-type: TIMESTAMP(3) [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn wrote: I have configured source table successfully using the following configuration: - name: out_order type: source update-mode: append schema: - name: out_order_code type: STRING - name: input_date type: BIGINT - name: owner_code type: STRING connector: property-version: 1 type: kafka version: universal topic: out_order startup-mode: latest-offset properties: - key: zookeeper.connect value: 172.19.78.32:2181 - key: bootstrap.servers value: 172.19.78.32:9092 - key: group.id property-version: 1 type: json schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)" How can i configure a sink table? I haven't found any useful docs for this. Thanks, Lei
Re: Re: Does flink-1.10 sql-client support kafka sink?
Hi Jark, I have tried to use the CREATE TABLE DDL. First ./bin/sql-client.sh embedded. Then I created a table from the kafka topic and it told me the table had been created. But when I query with select * from tableName, there's an error: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Perhaps I need to add some jar to the lib directory. But if I write the table configuration in the sql-client-defaults.yaml file, I can select the result correctly. Thanks, Lei From: Jark Wu Date: 2020-03-11 11:13 To: wangl...@geekplus.com.cn CC: Arvid Heise; user Subject: Re: Re: Does flink-1.10 sql-client support kafka sink? Hi Lei, CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. And the yaml way might be deprecated in the future. By using DDL, a registered table can both be used as source and sink. Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn wrote: Thanks, works now. Seems it is because i added the schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, status INT)" under format label. From: Arvid Heise Date: 2020-03-10 20:51 To: wangl...@geekplus.com.cn CC: user Subject: Re: Does flink-1.10 sql-client support kafka sink? Hi Lei, yes Kafka as a sink is supported albeit only for appends (no deletions/updates yet) [1]. An example is a bit hidden in the documentation [2]: tables: - name: MyTableSink type: sink-table update-mode: append connector: property-version: 1 type: kafka version: "0.11" topic: OutputTopic properties: zookeeper.connect: localhost:2181 bootstrap.servers: localhost:9092 group.id: testGroup format: property-version: 1 type: json derive-schema: true schema: - name: rideId data-type: BIGINT - name: lon data-type: FLOAT - name: lat data-type: FLOAT - name: rideTime data-type: TIMESTAMP(3) [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn wrote: I have configured source table successfully using the following configuration: - name: out_order type: source update-mode: append schema: - name: out_order_code type: STRING - name: input_date type: BIGINT - name: owner_code type: STRING connector: property-version: 1 type: kafka version: universal topic: out_order startup-mode: latest-offset properties: - key: zookeeper.connect value: 172.19.78.32:2181 - key: bootstrap.servers value: 172.19.78.32:9092 - key: group.id property-version: 1 type: json schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)" How can I configure a sink table? I haven't found any useful docs for this. Thanks, Lei
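For reference, a minimal sketch of the DDL route in a Flink 1.10 program; the topic, addresses and columns are placeholders rather than a verified configuration, and the same CREATE TABLE statement can be pasted into the SQL client once matching flink-sql-connector-kafka and flink-json jars are in lib/.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DdlExample {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Register a Kafka-backed table with DDL instead of the YAML environment file.
        tEnv.sqlUpdate(
            "CREATE TABLE out_order (\n"
          + "  out_order_code STRING,\n"
          + "  input_date BIGINT,\n"
          + "  owner_code STRING\n"
          + ") WITH (\n"
          + "  'connector.type' = 'kafka',\n"
          + "  'connector.version' = 'universal',\n"
          + "  'connector.topic' = 'out_order',\n"
          + "  'connector.startup-mode' = 'latest-offset',\n"
          + "  'connector.properties.zookeeper.connect' = '172.19.78.32:2181',\n"
          + "  'connector.properties.bootstrap.servers' = '172.19.78.32:9092',\n"
          + "  'format.type' = 'json',\n"
          + "  'format.derive-schema' = 'true',\n"
          + "  'update-mode' = 'append'\n"
          + ")");
    }
}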
Re: Re: Does flink-1.10 sql-client support kafka sink?
I am using flink-1.10. But I add flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar to lib directory. After change to flink-json-1.10.0.jar, flink-sql-connector-kafka_2.12-1.10.0.jar, it works. But I have no idea why the yaml way works when i use flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar in flink-1.10 environment. Thanks, Lei wangl...@geekplus.com.cn From: wangl...@geekplus.com.cn Date: 2020-03-11 14:51 To: Jark Wu CC: Arvid Heise; user Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink? Hi Jark, I have tried to use CREATE table DDL First ./bin/sql-client.sh embedded. Then create a table from kafka topic and it tell me table has been created. But when I query with select * from tableName. There's error: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Perhaps i need some jar to the lib directory. But If i write the table configuration in the sql-client-defaults.yaml file,i can select the result correctly Thanks, Lei From: Jark Wu Date: 2020-03-11 11:13 To: wangl...@geekplus.com.cn CC: Arvid Heise; user Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink? Hi Lei, CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. And the yaml way might be deprecated in the future. By using DDL, a registered table can both be used as source and sink. Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn wrote: Thanks, works now. Seems it is because i added the schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, status INT)" under format label. From: Arvid Heise Date: 2020-03-10 20:51 To: wangl...@geekplus.com.cn CC: user Subject: Re: Dose flink-1.10 sql-client support kafka sink? Hi Lei, yes Kafka as a sink is supported albeit only for appends (no deletions/updates yet) [1]. 
How should a timestamp value be written in JSON so that it can be recognized by Flink SQL?
I created a Kafka table with sql-client: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) Then I sent a message to that Kafka topic: {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} But input_date is always NULL on the sql-client side. I also changed the input_date I send to 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000", and none of them work. How should this TIMESTAMP(3) be written in the JSON? Thanks, Wang Lei wangl...@geekplus.com.cn
Re: How should a timestamp value be written in JSON so that it can be recognized by Flink SQL?
Sorry, I sent the email written in Chinese to user@. Let me translate it into English. I created a table using sql-client from a Kafka topic: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) Then I send a message to the topic: {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} But the input_date is not recognized on the sql-client side and is null, even though I tried 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000". What should the TIMESTAMP(3) look like in the JSON message? Thanks, Lei wangl...@geekplus.com.cn From: wangl...@geekplus.com.cn Date: 2020-03-11 17:41 To: user Subject: How should a timestamp value be written in JSON so that it can be recognized by Flink SQL? I created a Kafka table with sql-client: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) Then I sent a message to that Kafka topic: {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} But input_date is always NULL on the sql-client side. I also changed the input_date I send to 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000", and none of them work. How should this TIMESTAMP(3) be written in the JSON? Thanks, Wang Lei wangl...@geekplus.com.cn
Re: Re: How should a timestamp value be written in JSON so that it can be recognized by Flink SQL?
Thanks Jark, no words can express my embarrassment ('囧'). wangl...@geekplus.com.cn Sender: Jark Wu Send Time: 2020-03-11 18:32 Receiver: wangl...@geekplus.com.cn cc: user; user-zh Subject: Re: How should a timestamp value be written in JSON so that it can be recognized by Flink SQL? Hi Lei, The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong field name in the DDL. It should be "input_date", not "intput_date". Best, Jark On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn wrote: Sorry, I sent the email written in Chinese to user@. Let me translate it into English. I created a table using sql-client from a Kafka topic: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) Then I send a message to the topic: {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} But the input_date is not recognized on the sql-client side and is null, even though I tried 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000". What should the TIMESTAMP(3) look like in the JSON message? Thanks, Lei wangl...@geekplus.com.cn From: wangl...@geekplus.com.cn Date: 2020-03-11 17:41 To: user Subject: How should a timestamp value be written in JSON so that it can be recognized by Flink SQL? I created a Kafka table with sql-client: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) Then I sent a message to that Kafka topic: {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} But input_date is always NULL on the sql-client side. I also changed the input_date I send to 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000", and none of them work. How should this TIMESTAMP(3) be written in the JSON? Thanks, Wang Lei wangl...@geekplus.com.cn
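As a side note on the value format itself: the 1.10 JSON format expects TIMESTAMP(3) values in the RFC 3339 style that Jark confirms above, and the DDL column name has to match the JSON key exactly. A small sketch of producing such a string (the epoch value is an arbitrary example):

import java.time.Instant;
import java.time.format.DateTimeFormatter;

public class TimestampJsonValue {
    public static void main(String[] args) {
        long epochMillis = 1583928000123L; // arbitrary example value
        // Produces an ISO-8601 / RFC 3339 UTC string such as "2020-03-11T12:00:00.123Z".
        String jsonValue = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(epochMillis));
        System.out.println("{\"input_date\":\"" + jsonValue + "\"}");
    }
}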
How to set the state backend in a Flink SQL program?
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv.sqlUpdate()... Is there a way I can set the state backend like in a normal streaming program, as follows: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new RocksDBStateBackend(args[0], true)); wangl...@geekplus.com.cn
Re: Re: How to set the state backend in a Flink SQL program?
Hi Jingsong, So I can write the code as follows? EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv.getConfig().getConfiguration().setString("state.backend","rocksdb"); tEnv.sqlUpdate(..) Thanks, Lei wangl...@geekplus.com.cn From: Jingsong Li Date: 2020-03-12 11:32 To: wangl...@geekplus.com.cn CC: user Subject: Re: How to set the state backend in a Flink SQL program? Hi wanglei, If you are using Flink 1.10, you can set "state.backend=rocksdb" on "TableConfig.getConfiguration". And you can find the related config options here [1]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html Jingsong Lee On Thu, Mar 12, 2020 at 11:15 AM wangl...@geekplus.com.cn wrote: EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv.sqlUpdate()... Is there a way I can set the state backend like in a normal streaming program, as follows: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new RocksDBStateBackend(args[0], true)); wangl...@geekplus.com.cn -- Best, Jingsong Lee
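Putting the suggestion together, a minimal sketch for a pure Table/SQL program on Flink 1.10; the checkpoint path is a placeholder.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlStateBackend {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Same keys as in flink-conf.yaml; see the 1.10 configuration page for the full list.
        tEnv.getConfig().getConfiguration().setString("state.backend", "rocksdb");
        tEnv.getConfig().getConfiguration().setString(
                "state.checkpoints.dir", "hdfs:///user/test/checkpoints");
        tEnv.getConfig().getConfiguration().setString("state.backend.incremental", "true");

        // tEnv.sqlUpdate("CREATE TABLE ..."); tEnv.sqlUpdate("INSERT INTO ...");
    }
}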
Dimension table join not working under sql-client flink-1.10.0
Kafka source table: CREATE TABLE out_order ( out_order_code VARCHAR, intput_date BIGINT, owner_code VARCHAR ) WITH ( 'connector.type' = 'kafka', ... ) MySQL dimension table: CREATE TABLE dim_owner ( owner_code VARCHAR, owner_name VARCHAR ) WITH ( 'connector.type' = 'jdbc', ... ) When I submit the SQL: SELECT o.out_order_code, o.input_date, o.owner_code, d.owner_name FROM out_order as o JOIN dim_owner FOR SYSTEM_TIME AS OF PROCTIME() as d ON o.owner_code = d.owner_code; there's an error: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' Thanks, Lei wangl...@geekplus.com.cn
Re: Re: Dimension table join not working under sql-client flink-1.10.0
Thanks, works now. Seems the open source version is different from alibaba cloud: https://www.alibabacloud.com/help/doc-detail/62506.htm wangl...@geekplus.com.cn From: Zhenghua Gao Date: 2020-03-13 12:12 To: wangl...@geekplus.com.cn CC: user Subject: Re: dimention table join not work under sql-client fink-1.10.0 We don't support 'PROCTIME()' in a temporal table join. Please use a left table's proctime field. [1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1 Best Regards, Zhenghua Gao On Fri, Mar 13, 2020 at 11:57 AM wangl...@geekplus.com.cn wrote: Kafka source table: CREATE TABLE out_order ( out_order_code VARCHAR, intput_date BIGINT, owner_code VARCHAR ) WITH ( 'connector.type' = 'kafka',MySQL dimention table: CREATE TABLE dim_owner ( owner_code VARCHAR, owner_name VARCHAR ) WITH ( 'connector.type' = 'jdbc',When i submit the sql: SELECT o.out_order_code, o.input_date, o.owner_code, d.owner_name FROM out_order as o JOIN dim_owner FOR SYSTEM_TIME AS OF PROCTIME() as d ON o.owner_code = d.owner_code; There's error: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' Thanks, Lei wangl...@geekplus.com.cn
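For reference, a sketch of the shape that works with the 1.10 planner: give the left (Kafka) table its own processing-time attribute with a computed column and reference that attribute in the join. The connector options below are placeholders, and the dimension table is assumed to be the JDBC table from the question.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class DimensionJoinSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Kafka fact table with a processing-time attribute declared as a computed column.
        tEnv.sqlUpdate(
            "CREATE TABLE out_order (\n"
          + "  out_order_code VARCHAR,\n"
          + "  input_date BIGINT,\n"
          + "  owner_code VARCHAR,\n"
          + "  proctime AS PROCTIME()\n"
          + ") WITH (\n"
          + "  'connector.type' = 'kafka',\n"
          + "  'connector.version' = 'universal',\n"
          + "  'connector.topic' = 'out_order',\n"
          + "  'connector.startup-mode' = 'latest-offset',\n"
          + "  'connector.properties.bootstrap.servers' = '172.19.78.32:9092',\n"
          + "  'format.type' = 'json',\n"
          + "  'format.derive-schema' = 'true',\n"
          + "  'update-mode' = 'append'\n"
          + ")");

        // JDBC dimension table, as in the question (options shortened; URL is a placeholder).
        tEnv.sqlUpdate(
            "CREATE TABLE dim_owner (\n"
          + "  owner_code VARCHAR,\n"
          + "  owner_name VARCHAR\n"
          + ") WITH (\n"
          + "  'connector.type' = 'jdbc',\n"
          + "  'connector.url' = 'jdbc:mysql://host:3306/db',\n"
          + "  'connector.table' = 'dim_owner'\n"
          + ")");

        // The temporal join references the left table's proctime attribute, not PROCTIME().
        Table joined = tEnv.sqlQuery(
            "SELECT o.out_order_code, o.input_date, o.owner_code, d.owner_name\n"
          + "FROM out_order AS o\n"
          + "JOIN dim_owner FOR SYSTEM_TIME AS OF o.proctime AS d\n"
          + "ON o.owner_code = d.owner_code");
    }
}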
flink sql-client read hive orc table no result
Hive table stored as ORC format: CREATE TABLE `robot_tr`(`robot_id` int, `robot_time` bigint, `linear_velocity` double, `track_side_error` int) partitioned by (warehouseid STRING) clustered by (robot_id) into 3 buckets stored as orc tblproperties("transactional"='true'); Under the hive client, I insert one record and then select, and the result shows up in the console. But under the flink sql-client, when I select * from robot_tr, there's no result. Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: flink sql-client read hive orc table no result
Tried again. Even if I remove the "clustered by (robot_id) into 3 buckets" clause, there is no result from the flink sql-client. Thanks, Lei wangl...@geekplus.com.cn From: Kurt Young Date: 2020-03-18 17:41 To: wangl...@geekplus.com.cn; lirui CC: user Subject: Re: flink sql-client read hive orc table no result My guess is we haven't supported hive bucket tables yet. cc Rui Li for confirmation. Best, Kurt On Wed, Mar 18, 2020 at 5:19 PM wangl...@geekplus.com.cn wrote: Hive table stored as ORC format: CREATE TABLE `robot_tr`(`robot_id` int, `robot_time` bigint, `linear_velocity` double, `track_side_error` int) partitioned by (warehouseid STRING) clustered by (robot_id) into 3 buckets stored as orc tblproperties("transactional"='true'); Under the hive client, I insert one record and then select, and the result shows up in the console. But under the flink sql-client, when I select * from robot_tr, there's no result. Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: flink sql-client read hive orc table no result
It seems the root cause is "transactional"='true'. After removing this, the table can be queried from the flink sql-client, even if I add "clustered by (robot_id) into 3 buckets" again. Thanks, Lei wangl...@geekplus.com.cn From: Kurt Young Date: 2020-03-18 18:04 To: wangl...@geekplus.com.cn CC: lirui; user Subject: Re: Re: flink sql-client read hive orc table no result also try to remove "transactional"='true'? Best, Kurt On Wed, Mar 18, 2020 at 5:54 PM wangl...@geekplus.com.cn wrote: Tried again. Even if I remove the "clustered by (robot_id) into 3 buckets" clause, there is no result from the flink sql-client. Thanks, Lei wangl...@geekplus.com.cn From: Kurt Young Date: 2020-03-18 17:41 To: wangl...@geekplus.com.cn; lirui CC: user Subject: Re: flink sql-client read hive orc table no result My guess is we haven't supported hive bucket tables yet. cc Rui Li for confirmation. Best, Kurt On Wed, Mar 18, 2020 at 5:19 PM wangl...@geekplus.com.cn wrote: Hive table stored as ORC format: CREATE TABLE `robot_tr`(`robot_id` int, `robot_time` bigint, `linear_velocity` double, `track_side_error` int) partitioned by (warehouseid STRING) clustered by (robot_id) into 3 buckets stored as orc tblproperties("transactional"='true'); Under the hive client, I insert one record and then select, and the result shows up in the console. But under the flink sql-client, when I select * from robot_tr, there's no result. Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
Streaming kafka data sink to hive
We have many app logs on our app server and want to parse the logs into a structured table format and then sink them to Hive. It seems batch mode would be a good fit: the app logs are compressed hourly and it is convenient to do partitioning. But we want to use streaming mode: tail the app logs to Kafka, then use Flink to read the Kafka topic and sink to Hive. I have several questions. 1. Is there any flink-hive-connector that I can use to write to Hive in a streaming way? 2. Since HDFS is not friendly to frequent appends and Hive's data is stored on HDFS, is it OK if the throughput is high? Thanks, Lei wangl...@geekplus.com.cn
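For what it's worth, until a streaming Hive sink is available, one workaround is sketched below under assumptions (the AppLog POJO, the Kafka reading helper and the HDFS path are all made up): write bulk Parquet files with StreamingFileSink into hourly bucket directories and register those directories as Hive partitions out of band. Files are only rolled on checkpoints, which keeps the HDFS write pattern coarse-grained.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class LogsToParquet {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // bulk formats roll files on checkpoints

        DataStream<AppLog> logs = readFromKafkaAndParse(env); // assumed helper, see below

        StreamingFileSink<AppLog> sink = StreamingFileSink
                .forBulkFormat(
                        new Path("hdfs:///warehouse/app_log/"),
                        ParquetAvroWriters.forReflectRecord(AppLog.class))
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyyMMddHH"))
                .build();

        logs.addSink(sink);
        env.execute("app-logs-to-parquet");
    }

    // Placeholder: wire up a FlinkKafkaConsumer plus log parsing here.
    private static DataStream<AppLog> readFromKafkaAndParse(StreamExecutionEnvironment env) {
        throw new UnsupportedOperationException("replace with a real Kafka source");
    }

    // Placeholder POJO for the parsed log line.
    public static class AppLog { public long ts; public String line; }
}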
Where can I find the MySQL retract stream table sink Java source code?
I create one table with Kafka and another table with MySQL using Flink SQL, then write a SQL statement that reads from Kafka and writes to MySQL: INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no) GROUP BY status I think this is a retract stream. But where can I find the Java source code of the MySQL retract table sink? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: Where can I find the MySQL retract stream table sink Java source code?
Thanks Jingsong. When executing this SQL, records in the MySQL table can be deleted, so I guess it is a retract stream. I want to know exactly what Java code is generated and have a look at it. Thanks, Lei wangl...@geekplus.com.cn Sender: Jingsong Li Send Time: 2020-03-25 11:14 Receiver: wangl...@geekplus.com.cn cc: user Subject: Re: Where can I find the MySQL retract stream table sink Java source code? Hi, This can be an upsert stream [1] Best, Jingsong Lee On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn wrote: I create one table with Kafka and another table with MySQL using Flink SQL, then write a SQL statement that reads from Kafka and writes to MySQL: INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no) GROUP BY status I think this is a retract stream. But where can I find the Java source code of the MySQL retract table sink? Thanks, Lei wangl...@geekplus.com.cn -- Best, Jingsong Lee
Re: Re: Where can I find the MySQL retract stream table sink Java source code?
Seems it is here: https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc There's no JDBCRetractTableSink, only append and upsert. I am confused why the MySQL record can be deleted. Thanks, Lei wangl...@geekplus.com.cn Sender: wangl...@geekplus.com.cn Send Time: 2020-03-25 11:25 Receiver: Jingsong Li cc: user Subject: Re: Re: Where can i find MySQL retract stream table sink java soure code? Thanks Jingsong. When executing this sql, the mysql table record can be deleted. So i guess it is a retract stream. I want to know the exactly java code it is generated and have a look at it. Thanks, Lei wangl...@geekplus.com.cn Sender: Jingsong Li Send Time: 2020-03-25 11:14 Receiver: wangl...@geekplus.com.cn cc: user Subject: Re: Where can i find MySQL retract stream table sink java soure code? Hi, This can be a upsert stream [1] Best, Jingsong Lee On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn wrote: Create one table with kafka, another table with MySQL using flinksql. Write a sql to read from kafka and write to MySQL. INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no) GROUP BY statusI think this is a retract stream. But where can i find the java source code about MySQL retract table sink? Thanks,Lei wangl...@geekplus.com.cn -- Best, Jingsong Lee
Re: Re: Where can I find the MySQL retract stream table sink Java source code?
Thanks Jingsong. So JDBCTableSink now supports append and upsert mode, and retract mode is not available yet. Is that right? Thanks, Lei wangl...@geekplus.com.cn Sender: Jingsong Li Send Time: 2020-03-25 11:39 Receiver: wangl...@geekplus.com.cn cc: user Subject: Re: Where can I find the MySQL retract stream table sink Java source code? Hi, Maybe you have some misunderstanding about the upsert sink. You can take a look at [1]; it can deal with "delete" records. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion Best, Jingsong Lee On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li wrote: Hi, This can be an upsert stream [1], and JDBC has an upsert sink now [2]. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector Best, Jingsong Lee On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li wrote: Hi, This can be an upsert stream [1] Best, Jingsong Lee On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn wrote: I create one table with Kafka and another table with MySQL using Flink SQL, then write a SQL statement that reads from Kafka and writes to MySQL: INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no) GROUP BY status I think this is a retract stream. But where can I find the Java source code of the MySQL retract table sink? Thanks, Lei wangl...@geekplus.com.cn -- Best, Jingsong Lee -- Best, Jingsong Lee -- Best, Jingsong Lee
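To make the upsert semantics concrete, here is a toy sketch, explicitly not the actual flink-jdbc code, of how an upsert sink consumes the (Boolean, Row) changelog: the flag decides between an upsert-style write and a delete, which is why records can disappear from MySQL even though no "retract" sink class exists.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

public class ToyUpsertSink extends RichSinkFunction<Tuple2<Boolean, Row>> {

    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context) {
        Row row = value.f1;
        if (value.f0) {
            // add/update message: insert the row or overwrite the row with the same key,
            // conceptually "INSERT ... ON DUPLICATE KEY UPDATE" against MySQL
            System.out.println("UPSERT " + row);
        } else {
            // delete message: remove the row identified by its key fields
            System.out.println("DELETE " + row);
        }
    }
}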
flink sql client not able to read parquet format table
Hive table stored as parquet. Under hive client: hive> select robotid from robotparquet limit 2; OK 1291097 1291044 But under flink sql-client the result is 0 Flink SQL> select robotid from robotparquet limit 2; robotid 0 0 Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: flink sql client not able to read parquet format table
I am using the newest 1.10 blink planner. Perhaps it is because of the method I used to write the parquet file: receive a Kafka message, transform each message to a Java class object, write the object to HDFS using StreamingFileSink, and add the HDFS path as a partition of the hive table. No matter what the order of the field descriptions in the hive DDL statement, the hive client will work, as long as the field names are the same as the Java object field names. But the flink sql client will not work. DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x)); final StreamingFileSink<RobotUploadData0101> sink = StreamingFileSink .forBulkFormat(new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"), ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class)) .build(); For example, RobotUploadData0101 has two fields: robotId int, robotTime long. CREATE TABLE `robotparquet`( `robotid` int, `robottime` bigint ) and CREATE TABLE `robotparquet`( `robottime` bigint, `robotid` int) are the same for the hive client, but are different for the flink-sql client. Is this expected behavior? Thanks, Lei wangl...@geekplus.com.cn From: Jark Wu Date: 2020-04-09 14:48 To: wangl...@geekplus.com.cn; Jingsong Li; lirui CC: user Subject: Re: flink sql client not able to read parquet format table Hi Lei, Are you using the newest 1.10 blink planner? I'm not familiar with Hive and parquet, but I know @Jingsong Li and @li...@apache.org are experts on this. Maybe they can help on this question. Best, Jark On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn wrote: Hive table stored as parquet. Under hive client: hive> select robotid from robotparquet limit 2; OK 1291097 1291044 But under flink sql-client the result is 0 Flink SQL> select robotid from robotparquet limit 2; robotid 0 0 Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: flink sql client not able to read parquet format table
I am using Hive 3.1.1 The table has many fields, each field is corresponded to a feild in the RobotUploadData0101 class. CREATE TABLE `robotparquet`(`robotid` int, `framecount` int, `robottime` bigint, `robotpathmode` int, `movingmode` int, `submovingmode` int, `xlocation` int, `ylocation` int, `robotradangle` int, `velocity` int, `acceleration` int, `angularvelocity` int, `angularacceleration` int, `literangle` int, `shelfangle` int, `onloadshelfid` int, `rcvdinstr` int, `sensordist` int, `pathstate` int, `powerpresent` int, `neednewpath` int, `pathelenum` int, `taskstate` int, `receivedtaskid` int, `receivedcommcount` int, `receiveddispatchinstr` int, `receiveddispatchcount` int, `subtaskmode` int, `versiontype` int, `version` int, `liftheight` int, `codecheckstatus` int, `cameraworkmode` int, `backrimstate` int, `frontrimstate` int, `pathselectstate` int, `codemisscount` int, `groundcameraresult` int, `shelfcameraresult` int, `softwarerespondframe` int, `paramstate` int, `pilotlamp` int, `codecount` int, `dist2waitpoint` int, `targetdistance` int, `obstaclecount` int, `obstacleframe` int, `cellcodex` int, `cellcodey` int, `cellangle` int, `shelfqrcode` int, `shelfqrangle` int, `shelfqrx` int, `shelfqry` int, `trackthetaerror` int, `tracksideerror` int, `trackfuseerror` int, `lifterangleerror` int, `lifterheighterror` int, `linearcmdspeed` int, `angluarcmdspeed` int, `liftercmdspeed` int, `rotatorcmdspeed` int) PARTITIONED BY (`hour` string) STORED AS parquet; Thanks, Lei wangl...@geekplus.com.cn From: Jingsong Li Date: 2020-04-09 21:45 To: wangl...@geekplus.com.cn CC: Jark Wu; lirui; user Subject: Re: Re: fink sql client not able to read parquet format table Hi lei, Which hive version did you use? Can you share the complete hive DDL? Best, Jingsong Lee On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn wrote: I am using the newest 1.10 blink planner. Perhaps it is because of the method i used to write the parquet file. Receive kafka message, transform each message to a Java class Object, write the Object to HDFS using StreamingFileSink, add the HDFS path as a partition of the hive table No matter what the order of the field description in hive ddl statement, the hive client will work, as long as the field name is the same with Java Object field name. But flink sql client will not work. DataStream sourceRobot = source.map( x->transform(x)); final StreamingFileSink sink; sink = StreamingFileSink .forBulkFormat(new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"), ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class)) For example RobotUploadData0101 has two fields: robotId int, robotTime long CREATE TABLE `robotparquet`( `robotid` int, `robottime` bigint ) and CREATE TABLE `robotparquet`( `robottime` bigint, `robotid` int) is the same for hive client, but is different for flink-sql client It is an expected behavior? Thanks, Lei wangl...@geekplus.com.cn From: Jark Wu Date: 2020-04-09 14:48 To: wangl...@geekplus.com.cn; Jingsong Li; lirui CC: user Subject: Re: fink sql client not able to read parquet format table Hi Lei, Are you using the newest 1.10 blink planner? I'm not familiar with Hive and parquet, but I know @Jingsong Li and @li...@apache.org are experts on this. Maybe they can help on this question. Best, Jark On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn wrote: Hive table stored as parquet. 
Re: Re: flink sql client not able to read parquet format table
https://issues.apache.org/jira/browse/FLINK-17086 It is my first time to create a flink jira issue. Just point it out and correct it if I write something wrong. Thanks, Lei wangl...@geekplus.com.cn From: Jingsong Li Date: 2020-04-10 11:03 To: wangl...@geekplus.com.cn CC: Jark Wu; lirui; user Subject: Re: Re: fink sql client not able to read parquet format table Hi lei, I think the reason is that our `HiveMapredSplitReader` not supports name mapping reading for parquet format. Can you create a JIRA for tracking this? Best, Jingsong Lee On Fri, Apr 10, 2020 at 9:42 AM wangl...@geekplus.com.cn wrote: I am using Hive 3.1.1 The table has many fields, each field is corresponded to a feild in the RobotUploadData0101 class. CREATE TABLE `robotparquet`(`robotid` int, `framecount` int, `robottime` bigint, `robotpathmode` int, `movingmode` int, `submovingmode` int, `xlocation` int, `ylocation` int, `robotradangle` int, `velocity` int, `acceleration` int, `angularvelocity` int, `angularacceleration` int, `literangle` int, `shelfangle` int, `onloadshelfid` int, `rcvdinstr` int, `sensordist` int, `pathstate` int, `powerpresent` int, `neednewpath` int, `pathelenum` int, `taskstate` int, `receivedtaskid` int, `receivedcommcount` int, `receiveddispatchinstr` int, `receiveddispatchcount` int, `subtaskmode` int, `versiontype` int, `version` int, `liftheight` int, `codecheckstatus` int, `cameraworkmode` int, `backrimstate` int, `frontrimstate` int, `pathselectstate` int, `codemisscount` int, `groundcameraresult` int, `shelfcameraresult` int, `softwarerespondframe` int, `paramstate` int, `pilotlamp` int, `codecount` int, `dist2waitpoint` int, `targetdistance` int, `obstaclecount` int, `obstacleframe` int, `cellcodex` int, `cellcodey` int, `cellangle` int, `shelfqrcode` int, `shelfqrangle` int, `shelfqrx` int, `shelfqry` int, `trackthetaerror` int, `tracksideerror` int, `trackfuseerror` int, `lifterangleerror` int, `lifterheighterror` int, `linearcmdspeed` int, `angluarcmdspeed` int, `liftercmdspeed` int, `rotatorcmdspeed` int) PARTITIONED BY (`hour` string) STORED AS parquet; Thanks, Lei wangl...@geekplus.com.cn From: Jingsong Li Date: 2020-04-09 21:45 To: wangl...@geekplus.com.cn CC: Jark Wu; lirui; user Subject: Re: Re: fink sql client not able to read parquet format table Hi lei, Which hive version did you use? Can you share the complete hive DDL? Best, Jingsong Lee On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn wrote: I am using the newest 1.10 blink planner. Perhaps it is because of the method i used to write the parquet file. Receive kafka message, transform each message to a Java class Object, write the Object to HDFS using StreamingFileSink, add the HDFS path as a partition of the hive table No matter what the order of the field description in hive ddl statement, the hive client will work, as long as the field name is the same with Java Object field name. But flink sql client will not work. DataStream sourceRobot = source.map( x->transform(x)); final StreamingFileSink sink; sink = StreamingFileSink .forBulkFormat(new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"), ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class)) For example RobotUploadData0101 has two fields: robotId int, robotTime long CREATE TABLE `robotparquet`( `robotid` int, `robottime` bigint ) and CREATE TABLE `robotparquet`( `robottime` bigint, `robotid` int) is the same for hive client, but is different for flink-sql client It is an expected behavior? 
FlinkSQL query error when specifying json-schema.
CREATE TABLE user_log( `id` INT, `timestamp` BIGINT ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'wanglei_jsontest', 'connector.startup-mode' = 'latest-offset', 'connector.properties.0.key' = 'zookeeper.connect', 'connector.properties.0.value' = '172.19.78.32:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = '172.19.78.32:9092', 'update-mode' = 'append', 'format.type' = 'json', 'format.json-schema' = '{ "type": "object", "properties": { "id": {"type": "integer"}, "timestamp": {"type": "number"} } }' ); Then select * from user_log; gives: org.apache.flink.table.api.ValidationException: Type INT of table field 'id' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' field of the TableSource return type. It seems the specified types "integer" and "number" cannot be mapped to INT and BIGINT. How can I solve this problem? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: FlinkSQL query error when specifying json-schema.
Thanks, I have tried it. 'format.derive-schema' = 'true' works. But if I insist on using format.json-schema, the CREATE TABLE must be written as: `id` DECIMAL(38,18), `timestamp` DECIMAL(38,18) wangl...@geekplus.com.cn From: Benchao Li Date: 2020-04-16 16:56 To: wangl...@geekplus.com.cn CC: user Subject: Re: FlinkSQL query error when specifying json-schema. Hi wanglei, You don't need to specify 'format.json-schema'; the format can derive the schema from the DDL. Your exception above means the schema in 'format.json-schema' and the DDL do not match. On Thu, 16 Apr 2020 at 16:21, wangl...@geekplus.com.cn wrote: CREATE TABLE user_log( `id` INT, `timestamp` BIGINT ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'wanglei_jsontest', 'connector.startup-mode' = 'latest-offset', 'connector.properties.0.key' = 'zookeeper.connect', 'connector.properties.0.value' = '172.19.78.32:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = '172.19.78.32:9092', 'update-mode' = 'append', 'format.type' = 'json', 'format.json-schema' = '{ "type": "object", "properties": { "id": {"type": "integer"}, "timestamp": {"type": "number"} } }' ); Then select * from user_log; gives: org.apache.flink.table.api.ValidationException: Type INT of table field 'id' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' field of the TableSource return type. It seems the specified types "integer" and "number" cannot be mapped to INT and BIGINT. How can I solve this problem? Thanks, Lei wangl...@geekplus.com.cn -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
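For reference, a sketch of the derive-schema variant mentioned above; the connector properties are copied from the question, so treat them as placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DeriveSchemaExample {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.sqlUpdate(
            "CREATE TABLE user_log (\n"
          + "  `id` INT,\n"
          + "  `timestamp` BIGINT\n"
          + ") WITH (\n"
          + "  'connector.type' = 'kafka',\n"
          + "  'connector.version' = 'universal',\n"
          + "  'connector.topic' = 'wanglei_jsontest',\n"
          + "  'connector.startup-mode' = 'latest-offset',\n"
          + "  'connector.properties.zookeeper.connect' = '172.19.78.32:2181',\n"
          + "  'connector.properties.bootstrap.servers' = '172.19.78.32:9092',\n"
          + "  'update-mode' = 'append',\n"
          + "  'format.type' = 'json',\n"
          + "  'format.derive-schema' = 'true'\n" // INT/BIGINT are taken from the DDL directly
          + ")");
    }
}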
Problems writing to MySQL with FlinkSQL upsert/retraction
INSERT INTO mysql_sink SELECT f1, count(*) FROM kafka_src GROUP BY f1 For every new record that arrives from Kafka, two Tuple2 records are generated: the old one is deleted and the new one is added. But does this sink actually use an UpsertStream or a RetractStream, and how can I tell which one it is? Looking at https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc there is no retract variant. Is the code in JDBCUpsertTableSink.java actually used to write to MySQL? If I don't use GROUP BY and directly run: INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src then a primary key conflict causes an error when writing to MySQL. How can I use upsert mode to overwrite directly? wangl...@geekplus.com.cn
1.11 snapshot: Name or service not knownname localhost and taskMgr not started
1. Clone the 1.11 snapshot repository 2. Build it on Windows 3. Scp the flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT dir to a CentOS server 4. ./bin/start-cluster.sh on the CentOS server There's the message: Starting cluster. Starting standalonesession daemon on host test_3.6. : Name or service not knownname localhost Only the jobMgr is started. The taskMgr does not start. Any insight on this? Thanks Lei wangl...@geekplus.com.cn
Re: Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr not started
localhost is available on my CentOS machine. I can start the cluster (including the taskMgr) with start-cluster.sh when using the flink-1.10 release tgz package. Changing localhost to 127.0.0.1 in the slaves file resolves the problem, but then it asks for the password of the host. Without changing the slaves file, I can first run start-cluster.sh (only the job manager starts) and then "taskmanager.sh start" to start the taskMgr; no password is needed. Thanks, Lei wangl...@geekplus.com.cn Sender: Xintong Song Send Time: 2020-04-30 12:13 Receiver: wangl...@geekplus.com.cn cc: user Subject: Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr not started Hi Lei, Could you check whether the hostname 'localhost' is available on your CentOS machine? This is usually defined in "/etc/hosts". You can also try to modify the slaves file, replacing 'localhost' with '127.0.0.1'. The path is: /conf/slaves Thank you~ Xintong Song On Thu, Apr 30, 2020 at 11:38 AM wangl...@geekplus.com.cn wrote: 1. Clone the 1.11 snapshot repository 2. Build it on Windows 3. Scp the flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT dir to a CentOS server 4. ./bin/start-cluster.sh on the CentOS server There's the message: Starting cluster. Starting standalonesession daemon on host test_3.6. : Name or service not knownname localhost Only the jobMgr is started. The taskMgr does not start. Any insight on this? Thanks Lei wangl...@geekplus.com.cn
flink how to access remote hdfs using namenode nameservice
According to https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html I am deploying a standalone cluster with jobmanager HA and need the HDFS address: high-availability.storageDir: hdfs:///flink/recovery My Hadoop is a remote cluster. I can write it as hdfs://active-namenode-ip:8020, but this way loses namenode HA. Is there any method to configure it as hdfs://name-service:8020? Thanks, Lei wangl...@geekplus.com.cn
java.lang.AbstractMethodError when implementing KafkaSerializationSchema
public class MyKafkaSerializationSchema implements KafkaSerializationSchema<Tuple2<String, String>> { @Override public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o, @Nullable Long aLong) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0, o.f1.getBytes(StandardCharsets.UTF_8)); return record; } } FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<>( "default", new MyKafkaSerializationSchema(), prop2, Semantic.EXACTLY_ONCE); But there's an error when running: java.lang.AbstractMethodError: com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord; Any suggestion on this? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema
It was because of a jar conflict and I have fixed it. I had put flink-connector-kafka_2.11-1.10.0.jar in the Flink lib directory, and my project pom file also had the flink-connector-kafka dependency and was built as a fat jar. Thanks, Lei wangl...@geekplus.com.cn From: Leonard Xu Date: 2020-05-26 15:47 To: Aljoscha Krettek CC: user Subject: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema Hi wanglei, I think Aljoscha is right. Could you post your dependency list? The dependency flink-connector-kafka is used in DataStream applications, which is what you should use; the dependency flink-sql-connector-kafka is used in Table API & SQL applications. We should only add one of them because the two dependencies will conflict. Best, Leonard Xu On 26 May 2020, at 15:02, Aljoscha Krettek wrote: I think what might be happening is that you're mixing dependencies from the flink-sql-connector-kafka and the proper flink-connector-kafka that should be used with the DataStream API. Could that be the case? Best, Aljoscha On 25.05.20 19:18, Piotr Nowojski wrote: Hi, It would be helpful if you could provide the full stack trace, which Flink version and which Kafka connector version you are using. It sounds like either a dependency convergence error (mixing Kafka dependencies/various versions of flink-connector-kafka inside a single job/jar) or some shading issue. Can you check your project for such issues (`mvn dependency:tree` command [1])? Also, what's a bit suspicious for me is the return type: Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord; I'm not sure, but I was not aware that we are shading the Kafka dependency in our connectors? Are you manually shading something? Piotrek [1] https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote: public class MyKafkaSerializationSchema implements KafkaSerializationSchema<Tuple2<String, String>> { @Override public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o, @Nullable Long aLong) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0, o.f1.getBytes(StandardCharsets.UTF_8)); return record; } } FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<>( "default", new MyKafkaSerializationSchema(), prop2, Semantic.EXACTLY_ONCE); But there's an error when running: java.lang.AbstractMethodError: com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord; Any suggestion on this? Thanks, Lei wangl...@geekplus.com.cn
FLIP-105: can the debezium/canal SQL sink to a database directly?
CREATE TABLE my_table ( id BIGINT, first_name STRING, last_name STRING, email STRING ) WITH ( 'connector'='kafka', 'topic'='user_topic', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset', 'format'='debezium-json' ); INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table; What will happen after I execute the INSERT SQL statement? For update/delete messages from Kafka, will the corresponding records be updated or deleted in mysql_sink_table? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: FLIP-105: can the debezium/canal SQL sink to a database directly?
Thanks Jingsong, Is there any document or example to this? I will build the flink-1.11 package and have a try. Thanks, Lei wangl...@geekplus.com.cn From: Jingsong Li Date: 2020-06-30 10:08 To: wangl...@geekplus.com.cn CC: user Subject: Re: Flip-105 can the debezium/canal SQL sink to database directly? Hi Lei, INSERT INTO jdbc_table SELECT * FROM changelog_table; For Flink 1.11 new connectors, you need to define the primary key for jdbc_table (and also your mysql table needs to have the corresponding primary key) because changelog_table has the "update", "delete" records. And then, jdbc sink will: - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal with "insert" and "update" messages. - delete to deal with "delete" messages. So generally speaking, with the primary key, this mysql table will be the same to your source database table. (table for generating changelog) Best, Jingsong On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn wrote: CREATE TABLE my_table ( id BIGINT, first_name STRING, last_name STRING, email STRING ) WITH ( 'connector'='kafka', 'topic'='user_topic', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset', 'format'='debezium-json' ); INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table; What will happen after i execute the insert sql statement? For the update/delete message from kafka, the corresponding record will be updated or deleted in the mysql_sink_table? INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table; Thanks, Lei wangl...@geekplus.com.cn -- Best, Jingsong Lee
Re: Re: FLIP-105: can the debezium/canal SQL sink to a database directly?
CREATE TABLE t_pick_order ( order_no VARCHAR, status INT ) WITH ( 'connector' = 'kafka', 'topic' = 'example', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '172.19.78.32:9092', 'format' = 'canal-json' ) CREATE TABLE order_status ( order_no VARCHAR, status INT, PRIMARY KEY (order_no) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://xxx:3306/flink_test', 'table-name' = 'order_status', 'username' = 'dev', 'password' = '' ) But when i execute insert INTO order_status SELECT order_no, status FROM t_pick_order There's error: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status]) wangl...@geekplus.com.cn From: Danny Chan Date: 2020-06-30 20:25 To: wangl...@geekplus.com.cn Subject: Re: Re: Flip-105 can the debezium/canal SQL sink to database directly? Hi, wanglei2 ~ For primary key syntax you can reference [1] for the “PRIMARY KEY” part, notice that currently we only support the NOT ENFORCED mode. Here is the reason: >SQL standard specifies that a constraint can either be ENFORCED or NOT >ENFORCED. This controls if the constraint checks are performed on the >incoming/outgoing data. Flink does not own the data therefore the only mode we >want to support is the NOT ENFORCED mode. It is up to the user to ensure that >the query enforces key integrity. For DDL to create JDBC table, you can reference [2] [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table Best, Danny Chan 在 2020年6月30日 +0800 AM10:25,wangl...@geekplus.com.cn ,写道: Thanks Jingsong, Is there any document or example to this? I will build the flink-1.11 package and have a try. Thanks, Lei wangl...@geekplus.com.cn From: Jingsong Li Date: 2020-06-30 10:08 To: wangl...@geekplus.com.cn CC: user Subject: Re: Flip-105 can the debezium/canal SQL sink to database directly? Hi Lei, INSERT INTO jdbc_table SELECT * FROM changelog_table; For Flink 1.11 new connectors, you need to define the primary key for jdbc_table (and also your mysql table needs to have the corresponding primary key) because changelog_table has the "update", "delete" records. And then, jdbc sink will: - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal with "insert" and "update" messages. - delete to deal with "delete" messages. So generally speaking, with the primary key, this mysql table will be the same to your source database table. (table for generating changelog) Best, Jingsong On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn wrote: CREATE TABLE my_table ( id BIGINT, first_name STRING, last_name STRING, email STRING ) WITH ( 'connector'='kafka', 'topic'='user_topic', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset', 'format'='debezium-json' ); INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table; What will happen after i execute the insert sql statement? For the update/delete message from kafka, the corresponding record will be updated or deleted in the mysql_sink_table? INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table; Thanks, Lei wangl...@geekplus.com.cn -- Best, Jingsong Lee
How to dynamically initialize flink metrics in invoke method and then reuse it?
In one Flink operator, I want to initialize multiple Flink metrics according to the message content, as in the code below. public void invoke(ObjectNode node, Context context) throws Exception { String tableName = node.get("metadata").get("topic").asText(); Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10)); meter.markEvent(); log.info("### counter: " + meter.toString() + "\t" + meter.getCount()); } But this way every invoke call initializes a new metric and the count starts from zero again. How can I reuse the metric initialized before? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?
Hi Xintong, Yes, initializing the metric in the `open` method works, but it doesn't solve my problem. I want to initialize the metric with a name that is extracted from the record content. Only in the `invoke` method i can do it. Actually my scenario is as follows. The record is MySQL binlog info. I want to monitor the qps by tableName. The tableName is different for every record. Thanks, Lei wangl...@geekplus.com.cn Sender: Xintong Song Send Time: 2020-07-03 13:14 Receiver: wangl...@geekplus.com.cn cc: user Subject: Re: How to dynamically initialize flink metrics in invoke method and then reuse it? Hi Lei, I think you should initialize the metric in the `open` method. Then you can save the initialized metric as a class field, and update it in the `invoke` method for each record. Thank you~ Xintong Song On Fri, Jul 3, 2020 at 11:50 AM wangl...@geekplus.com.cn wrote: In one flink operator, i want to initialize multiple flink metrics according to message content. As the code below. public void invoke(ObjectNode node, Context context) throws Exception { String tableName = node.get("metadata").get("topic").asText(); Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10)); meter.markEvent(); log.info("### counter: " + meter.toString() + "\t" + meter.getCount()); But in this way every invoke call will initialize a new metrics and the count will be from zero again. How can i reuse the metric initialized before? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?
It seems there's no direct solution. Perhaps I can implement this by initializing a HashMap with all the possible values of tableName in the `open` method and getting the corresponding Meter according to tableName in the `invoke` method. Thanks, Lei wangl...@geekplus.com.cn Sender: wangl...@geekplus.com.cn Send Time: 2020-07-03 14:27 Receiver: Xintong Song cc: user Subject: Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it? Hi Xintong, Yes, initializing the metric in the `open` method works, but it doesn't solve my problem. I want to initialize the metric with a name that is extracted from the record content, and only in the `invoke` method can I do that. Actually my scenario is as follows. The records are MySQL binlog info, and I want to monitor the qps by tableName. The tableName is different for every record. Thanks, Lei wangl...@geekplus.com.cn Sender: Xintong Song Send Time: 2020-07-03 13:14 Receiver: wangl...@geekplus.com.cn cc: user Subject: Re: How to dynamically initialize flink metrics in invoke method and then reuse it? Hi Lei, I think you should initialize the metric in the `open` method. Then you can save the initialized metric as a class field, and update it in the `invoke` method for each record. Thank you~ Xintong Song On Fri, Jul 3, 2020 at 11:50 AM wangl...@geekplus.com.cn wrote: In one Flink operator, I want to initialize multiple Flink metrics according to the message content, as in the code below. public void invoke(ObjectNode node, Context context) throws Exception { String tableName = node.get("metadata").get("topic").asText(); Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10)); meter.markEvent(); log.info("### counter: " + meter.toString() + "\t" + meter.getCount()); } But this way every invoke call initializes a new metric and the count starts from zero again. How can I reuse the metric initialized before? Thanks, Lei wangl...@geekplus.com.cn
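A minimal sketch of that idea, creating the per-table Meter lazily instead of pre-populating every possible table name. The ObjectNode import should be adjusted to whatever type the upstream deserializer actually emits.

import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BinlogQpsSink extends RichSinkFunction<ObjectNode> {

    // One Meter per table name, created on first use and reused afterwards.
    private transient Map<String, Meter> meters;

    @Override
    public void open(Configuration parameters) {
        meters = new HashMap<>();
    }

    @Override
    public void invoke(ObjectNode node, Context context) {
        String tableName = node.get("metadata").get("topic").asText();
        Meter meter = meters.computeIfAbsent(
                tableName,
                name -> getRuntimeContext().getMetricGroup().meter(name, new MeterView(10)));
        meter.markEvent();
    }
}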
How to start a flink standalone session on Windows?
There's no start-cluster.bat or flink.bat in the bin directory. So how can I start Flink on Windows? Thanks, Lei wangl...@geekplus.com.cn
Does RocksDBStateBackend need a separate RocksDB service?
In my code, I just set the state backend with an HDFS directory: env.setStateBackend(new RocksDBStateBackend("hdfs://user/test/job")); Is there an embedded RocksDB service inside the Flink task? wangl...@geekplus.com.cn
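For reference, a sketch of the same setup with the pieces spelled out (paths are placeholders and the local storage path is optional): RocksDB runs embedded inside each TaskManager process and keeps its working files on local disk, while the URI passed to the constructor is only the checkpoint directory.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // No external RocksDB server is involved: the constructor argument is the
        // checkpoint directory, not the address of a RocksDB service.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs://user/test/job", true); // true = incremental checkpoints
        // Optional: where the embedded RocksDB instance keeps its local working files on the TaskManager.
        backend.setDbStoragePath("/tmp/flink-rocksdb");
        env.setStateBackend(backend);

        // Job definition and env.execute(...) would follow here.
    }
}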
Does changing the number of YARN containers affect restoring data from state?
The job consumes data from RocketMQ and processes it. The maximum parallelism in the code is 8. When the job is submitted with -ys 4, 2 containers are automatically allocated as TaskManagers. After running for a while it is stopped with a savepoint. When restoring from the savepoint with -ys 2, 4 containers are allocated as TaskManagers, but after the job is submitted it no longer consumes data from RocketMQ and the consumption TPS stays at 0. What could be the reason? Thanks, Wang Lei wangl...@geekplus.com.cn
IOException when using Prometheus Monitor
Standalone session Flink, version 1.8.2. Prometheus and the pushgateway are installed on the same host. After start-cluster.sh, the following error is logged every 10 seconds: 2019-11-20 17:23:12,849 WARN org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter - Failed to push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a. java.io.IOException: Response code from http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 200 at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297) at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105) at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76) at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: IOException when using Prometheus Monitor
Hi Chesnay, Although there's an exception, the metrics have actually been pushed to the pushgateway successfully. The Prometheus versions I used: prometheus-2.8.1.linux-amd64.tar.gz pushgateway-1.0.0.linux-amd64.tar.gz flink-metrics-prometheus_2.12-1.8.2.jar I just downloaded the tar.gz files to a CentOS node, ran tar -xzvf, and then started the processes. Thanks, Lei wangl...@geekplus.com.cn From: Chesnay Schepler Date: 2019-11-20 17:46 To: wangl...@geekplus.com.cn; user Subject: Re: IOException when using Prometheus Monitor From what I found so far this appears to be an incompatibility between the pushgateway and client version. So you can either a) use an older version of the pushgateway b) bump the version of the prometheus reporter. Unfortunately I cannot tell you which version you would need. On 20/11/2019 10:24, wangl...@geekplus.com.cn wrote: Standalone session Flink, version 1.8.2. Prometheus and the pushgateway are installed on the same host. After start-cluster.sh, the following error is logged every 10 seconds: 2019-11-20 17:23:12,849 WARN org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter - Failed to push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a. java.io.IOException: Response code from http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 200 at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297) at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105) at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76) at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
Re: Re: IOException when using Prometheus Monitor
After downgrading the pushgateway to pushgateway-0.8.0.linux-amd64.tar.gz, this exception no longer appears. Thanks very much. wangl...@geekplus.com.cn From: wangl...@geekplus.com.cn Date: 2019-11-20 18:19 To: Chesnay Schepler; user Subject: Re: Re: IOException when using Prometheus Monitor Hi Chesnay, Although there's an exception, the metrics have actually been pushed to the pushgateway successfully. The Prometheus versions I used: prometheus-2.8.1.linux-amd64.tar.gz pushgateway-1.0.0.linux-amd64.tar.gz flink-metrics-prometheus_2.12-1.8.2.jar I just downloaded the tar.gz files to a CentOS node, ran tar -xzvf, and then started the processes. Thanks, Lei wangl...@geekplus.com.cn From: Chesnay Schepler Date: 2019-11-20 17:46 To: wangl...@geekplus.com.cn; user Subject: Re: IOException when using Prometheus Monitor From what I found so far this appears to be an incompatibility between the pushgateway and client version. So you can either a) use an older version of the pushgateway b) bump the version of the prometheus reporter. Unfortunately I cannot tell you which version you would need. On 20/11/2019 10:24, wangl...@geekplus.com.cn wrote: Standalone session Flink, version 1.8.2. Prometheus and the pushgateway are installed on the same host. After start-cluster.sh, the following error is logged every 10 seconds: 2019-11-20 17:23:12,849 WARN org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter - Failed to push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a. java.io.IOException: Response code from http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 200 at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297) at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105) at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76) at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Any insight on this? Thanks, Lei wangl...@geekplus.com.cn
Submit a jar compiled against a higher version to a lower-version flink cluster?
The Flink cluster version is 1.8.2. The application source code needs some features only supported in 1.9.1, so it is compiled with the flink-1.9.1 dependency and built into a fat jar with all the Flink dependencies. What will happen if I submit the jar built against the higher version to the lower-version Flink cluster? Thanks, Lei
How can I just implement a crontab function using Flink?
I want to do something every minute. Using a TumblingWindow, the function will not be triggered if no message is received during that minute, but I still need to execute the function. How can I implement this? wangl...@geekplus.com.cn
Re: Re: How can I just implement a crontab function using Flink?
Thanks, it's an alternative solution. wangl...@geekplus.com.cn From: Jörn Franke Date: 2019-05-24 16:31 To: wangl...@geekplus.com.cn CC: user Subject: Re: How can I just implement a crontab function using Flink? Just send a dummy event from the source system every minute On 24.05.2019 at 10:20, "wangl...@geekplus.com.cn" wrote: I want to do something every minute. Using a TumblingWindow, the function will not be triggered if no message is received during that minute, but I still need to execute the function. How can I implement this? wangl...@geekplus.com.cn
Re: Re: How can I just implement a crontab function using Flink?
Thanks, got it. wangl...@geekplus.com.cn From: Puneet Kinra Date: 2019-05-24 17:02 To: wangl...@geekplus.com.cn CC: user Subject: Re: How can I just implement a crontab function using Flink? There is a concept of a periodic watermark; you can use that if you are working on event time. On Fri, May 24, 2019 at 1:51 PM wangl...@geekplus.com.cn wrote: I want to do something every minute. Using a TumblingWindow, the function will not be triggered if no message is received during that minute, but I still need to execute the function. How can I implement this? wangl...@geekplus.com.cn -- Cheers Puneet Kinra Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com e-mail :puneet.ki...@customercentria.com
How can I add config file as classpath in taskmgr node when submitting a flink job?
When starting a single-node Java application, I can add some config files to it. How can I implement this when submitting a Flink job? The config file needs to be read on the TaskManager node and used to initialize some classes. wangl...@geekplus.com.cn
Re: Re: How can I add config file as classpath in taskmgr node when submitting a flink job?
Thanks. Let me have a try. wangl...@geekplus.com.cn From: Yang Wang Date: 2019-05-28 09:47 To: wangl...@geekplus.com.cn CC: user Subject: Re: How can I add config file as classpath in taskmgr node when submitting a flink job? Hi wanglei, You could use the Flink distributed cache to register some config files and then access them in your task. 1. Register a cached file: StreamExecutionEnvironment.registerCachedFile(inputFile.toString(), "test_data", false); 2. Access the file in your task: final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath(); wangl...@geekplus.com.cn wrote on Sunday, May 26, 2019, 12:06 AM: When starting a single-node Java application, I can add some config files to it. How can I implement this when submitting a Flink job? The config file needs to be read on the TaskManager node and used to initialize some classes. wangl...@geekplus.com.cn
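A self-contained sketch of the two steps above (the file path, cache name, and property key are placeholders): register the file on the StreamExecutionEnvironment, then read it once per task in `open` of a rich function running on the TaskManager.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;
import java.nio.file.Files;
import java.util.Properties;

public class DistributedCacheExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register the config file; it will be shipped to every TaskManager.
        env.registerCachedFile("hdfs:///path/to/app.properties", "app_config", false);

        env.fromElements("a", "b", "c")
           .map(new ConfigAwareMapper())
           .print();

        env.execute("distributed-cache-example");
    }

    public static class ConfigAwareMapper extends RichMapFunction<String, String> {
        private transient Properties config;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Retrieve the cached file on the TaskManager and load it once per task.
            File file = getRuntimeContext().getDistributedCache().getFile("app_config");
            config = new Properties();
            config.load(Files.newInputStream(file.toPath()));
        }

        @Override
        public String map(String value) {
            return value + "-" + config.getProperty("suffix", "default");
        }
    }
}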
Re: Re: How can I just implement a crontab function using Flink?
I tried, but the MyProcessWindowFunction is still not triggered when there's no event in the window. Any insight on this? source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Map>() { @Override public Watermark getCurrentWatermark() { return new Watermark(System.currentTimeMillis() - 1); } @Override public long extractTimestamp(Map map, long l) { return System.currentTimeMillis(); } }).windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction()); wangl...@geekplus.com.cn From: Puneet Kinra Date: 2019-05-24 17:02 To: wangl...@geekplus.com.cn CC: user Subject: Re: How can I just implement a crontab function using Flink? There is a concept of a periodic watermark; you can use that if you are working on event time. On Fri, May 24, 2019 at 1:51 PM wangl...@geekplus.com.cn wrote: I want to do something every minute. Using a TumblingWindow, the function will not be triggered if no message is received during that minute, but I still need to execute the function. How can I implement this? wangl...@geekplus.com.cn -- Cheers Puneet Kinra Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com e-mail :puneet.ki...@customercentria.com
How to trigger the window function even if there's no message input in this window?
windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction()); How can I trigger MyProcessWindowFunction even if there's no input during the window time? wangl...@geekplus.com.cn
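One common workaround, sketched here under assumptions rather than taken from this thread, is to skip windows entirely and drive the periodic output from processing-time timers in a KeyedProcessFunction. The class name, interval, and key/record types below are illustrative; the stream has to be keyed (a constant key works for a global view), and the timer loop only starts once the first record for a key has arrived.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Fires every 60 seconds per key, whether or not records arrived in that minute.
public class PeriodicTimerFunction extends KeyedProcessFunction<String, String, String> {

    private static final long INTERVAL_MS = 60_000L;

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long current = count.value();
        if (current == null) {
            current = 0L;
            // First record for this key: start the periodic timer.
            long firstFire = ctx.timerService().currentProcessingTime() + INTERVAL_MS;
            ctx.timerService().registerProcessingTimeTimer(firstFire);
        }
        count.update(current + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long current = count.value();
        out.collect(ctx.getCurrentKey() + ": " + (current == null ? 0 : current));
        count.update(0L);
        // Re-register so the function keeps firing even when no records arrive.
        ctx.timerService().registerProcessingTimeTimer(timestamp + INTERVAL_MS);
    }
}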
Unable to restore state value after job failed using RocksDBStateBackend
public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> { private transient ValueState<Tuple2<Long, Long>> state; public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception { Tuple2<Long, Long> stateValue = state.value(); if (stateValue == null) { log.info("## initialize"); stateValue = new Tuple2<>(34L, 56L); } state.update(stateValue); } @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("avg", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); state = getRuntimeContext().getState(descriptor); } } Every time I restart the job, the stateValue is still null. wangl...@geekplus.com.cn
Re: Re: Unable to restore state value after job failed using RocksDBStateBackend
I start and cancel it just in my IntelliJ IDEA development environment: first click the run button, then click the red stop button, and then click the run button again. Let me google savepoints. Thanks, Lei Wang wangl...@geekplus.com.cn From: Stephan Ewen Date: 2019-06-25 20:36 To: user Subject: Re: Unable to restore state value after job failed using RocksDBStateBackend If you manually cancel and restart the job, state is only carried forward if you use a savepoint. Can you check if that is what you are doing? On Tue, Jun 25, 2019 at 2:21 PM Simon Su wrote: Hi wanglei, Can you post how you restart the job? Thanks, Simon On 06/25/2019 20:11, wangl...@geekplus.com.cn wrote: public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> { private transient ValueState<Tuple2<Long, Long>> state; public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception { Tuple2<Long, Long> stateValue = state.value(); if (stateValue == null) { log.info("## initialize"); stateValue = new Tuple2<>(34L, 56L); } state.update(stateValue); } @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("avg", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); state = getRuntimeContext().getState(descriptor); } } Every time I restart the job, the stateValue is still null. wangl...@geekplus.com.cn
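To illustrate the distinction Stephan draws, a minimal sketch (the interval, checkpoint path, and restart values are placeholders): with checkpointing enabled, keyed state such as the ValueState above is restored automatically after a task failure, whereas stopping the job from the IDE and running it again starts with empty state unless the job is resumed from a savepoint.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshots of the RocksDB state are written here on every checkpoint.
        env.setStateBackend(new RocksDBStateBackend("hdfs://user/test/checkpoints"));
        env.enableCheckpointing(10_000); // checkpoint every 10 seconds

        // After a failure, the job restarts from the latest checkpoint and keyed state is restored.
        // A manual cancel followed by a fresh run does not do this unless the job is resumed from a savepoint.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5_000));

        // Job definition (source -> StateProcessTest -> sink) and env.execute(...) would follow here.
    }
}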