How to use a self-defined JSON format when creating a table from a Kafka stream?

2020-03-04 Thread wangl...@geekplus.com.cn

I want to register a table from MySQL binlog like this:

tEnv.sqlUpdate("CREATE TABLE order (\n"
    + "order_id BIGINT,\n"
    + "order_no VARCHAR\n"
    + ") WITH (\n"
    + "'connector.type' = 'kafka',\n"
    ...
    + "'update-mode' = 'append',\n"
    + "'format.type' = 'json',\n"
    + "'format.derive-schema' = 'true'\n"
    + ")");

using the following log format:
{
  "type" : "update",
  "timestamp" : 1583373066000,
  "binlog_filename" : "mysql-bin.000453",
  "binlog_position" : 923020943,
  "database" : "wms",
  "table_name" : "t_pick_order",
  "table_id" : 131936,
  "columns" : [ {
    "id" : 1,
    "name" : "order_id",
    "column_type" : -5,
    "last_value" : 4606458,
    "value" : 4606458
  }, {
    "id" : 2,
    "name" : "order_no",
    "column_type" : 12,
    "last_value" : "EDBMFSJ1S2003050006628",
    "value" : "EDBMFSJ1S2003050006628"
  } ]
}

Surely 'format.type' = 'json' will not parse this message as I expect.
Is there any method I can use to implement this, for example a self-defined
format class?
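If the built-in JSON format cannot express this, one workaround is to parse the binlog message yourself at the DataStream level and register the result as a table afterwards. Below is a minimal sketch, assuming Jackson is on the classpath and that only order_id and order_no are needed; the class name is my own, not part of any Flink API.

import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;

public class BinlogOrderSchema extends AbstractDeserializationSchema<Tuple2<Long, String>> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Tuple2<Long, String> deserialize(byte[] message) throws IOException {
        JsonNode root = MAPPER.readTree(message);
        long orderId = 0L;
        String orderNo = null;
        // walk the "columns" array of the binlog message and pick the fields we need
        for (JsonNode col : root.get("columns")) {
            String name = col.get("name").asText();
            if ("order_id".equals(name)) {
                orderId = col.get("value").asLong();
            } else if ("order_no".equals(name)) {
                orderNo = col.get("value").asText();
            }
        }
        return Tuple2.of(orderId, orderNo);
    }
}

The stream produced by a FlinkKafkaConsumer built with this schema can then be registered as a table (e.g. with StreamTableEnvironment#createTemporaryView) and queried with SQL.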

Thanks,
Lei



wangl...@geekplus.com.cn



Kafka sink only support append mode?

2020-03-09 Thread wangl...@geekplus.com.cn
I wrote a simple program that reads from Kafka using SQL and sinks to Kafka.
But only 'update-mode' = 'append' is supported for the sink table, and the query
SQL must have no GROUP BY statement.
Is only append mode supported for the Kafka sink?

Thanks,
Lei




Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn

I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei


Re: Re: Kafka sink only support append mode?

2020-03-10 Thread wangl...@geekplus.com.cn
Hi Jark,

Thanks for the explanation.
The GROUP BY statement will result in a non-append stream.
I have just tried a join statement and want to send the result to Kafka; it
also fails with the error:

  AppendStreamTableSink requires that Table has only insert changes

Why is the join result not appendable? It confuses me.

Thanks,
Lei
 
From: Jark Wu
Date: 2020-03-09 19:25
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Kafka sink only support append mode?
Hi Lei,

Yes. Currently, the Kafka sink only supports append mode. Other update modes (e.g.
upsert mode / retract mode) are on the agenda.
For now, you can customize a KafkaTableSink by implementing the
UpsertStreamTableSink interface, where you will get Tuple2<Boolean, Row> records,
and the Boolean represents an insert or delete operation. Then you can encode the
insert/delete operation into Kafka storage or just ignore the operations.
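A lighter-weight alternative to wiring up the full UpsertStreamTableSink interface is to convert the query result into a retract stream yourself and hand it to a plain Kafka producer. A minimal sketch, assuming tEnv is a StreamTableEnvironment, kafkaTable is the registered source table and kafkaProps holds the broker settings; the topic name is made up:

Table agg = tEnv.sqlQuery(
        "SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no");

// Tuple2<Boolean, Row>: true = add/update this row, false = retract (delete) it
DataStream<Tuple2<Boolean, Row>> changelog = tEnv.toRetractStream(agg, Row.class);

changelog
        // encode the change flag into the payload however your downstream expects it
        .map(t -> (t.f0 ? "+" : "-") + t.f1.toString())
        .returns(Types.STRING)
        .addSink(new FlinkKafkaProducer<>("output_topic", new SimpleStringSchema(), kafkaProps));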

Best,
Jark

On Mon, 9 Mar 2020 at 19:14, wangl...@geekplus.com.cn 
 wrote:
I wrote a simple program that reads from Kafka using SQL and sinks to Kafka.
But only 'update-mode' = 'append' is supported for the sink table, and the query
SQL must have no GROUP BY statement.
Is only append mode supported for the Kafka sink?

Thanks,
Lei




Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
Thanks, it works now.

It seems the problem was that I had added

   schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)"

under the format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,

yes Kafka as a sink is supported albeit only for appends (no deletions/updates 
yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn 
 wrote:

I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei


Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
Hi Jark, 

I have tried to use the CREATE TABLE DDL.
First I ran ./bin/sql-client.sh embedded, then created a table from a Kafka topic, and
it told me the table had been created.
But when I query with select * from tableName, there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a
suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Perhaps I need to add some jar to the lib directory.
But if I write the table configuration in the sql-client-defaults.yaml file, I
can select the result correctly.

Thanks,
Lei
 



 
From: Jark Wu
Date: 2020-03-11 11:13
To: wangl...@geekplus.com.cn
CC: Arvid Heise; user
Subject: Re: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,

CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. 
And the yaml way might be deprecated in the future. 
By using DDL, a registered table can both be used as source and sink. 

Best,
Jark

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
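For reference, a minimal DDL sketch for a Kafka sink table that can be typed directly into the SQL CLI; the topic, addresses and columns below are placeholders taken from the earlier YAML example:

CREATE TABLE out_order_sink (
  out_order_code STRING,
  owner_code STRING,
  input_date BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'out_order_sink',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = '172.19.78.32:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = '172.19.78.32:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

-- then simply: INSERT INTO out_order_sink SELECT ... FROM out_order;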

On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn 
 wrote:
Thanks, it works now.

It seems the problem was that I had added

   schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)"

under the format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,

yes Kafka as a sink is supported albeit only for appends (no deletions/updates 
yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn 
 wrote:

I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei


Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-11 Thread wangl...@geekplus.com.cn

I am using flink-1.10, but I had added flink-json-1.9.1.jar and
flink-sql-connector-kafka_2.11-1.9.1.jar to the lib directory.
After changing to flink-json-1.10.0.jar and
flink-sql-connector-kafka_2.12-1.10.0.jar, it works.

But I have no idea why the YAML way works when I use flink-json-1.9.1.jar and
flink-sql-connector-kafka_2.11-1.9.1.jar in the flink-1.10 environment.

Thanks,
Lei



wangl...@geekplus.com.cn

 
From: wangl...@geekplus.com.cn
Date: 2020-03-11 14:51
To: Jark Wu
CC: Arvid Heise; user
Subject: Re: Re: Does flink-1.10 sql-client support kafka sink?
Hi Jark, 

I have tried to use the CREATE TABLE DDL.
First I ran ./bin/sql-client.sh embedded, then created a table from a Kafka topic, and
it told me the table had been created.
But when I query with select * from tableName, there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a
suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Perhaps I need to add some jar to the lib directory.
But if I write the table configuration in the sql-client-defaults.yaml file, I
can select the result correctly.

Thanks,
Lei
 



 
From: Jark Wu
Date: 2020-03-11 11:13
To: wangl...@geekplus.com.cn
CC: Arvid Heise; user
Subject: Re: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,

CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. 
And the yaml way might be deprecated in the future. 
By using DDL, a registered table can both be used as source and sink. 

Best,
Jark

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn 
 wrote:
Thanks, it works now.

It seems the problem was that I had added

   schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)"

under the format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,

yes Kafka as a sink is supported albeit only for appends (no deletions/updates 
yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn 
 wrote:

I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can I configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei


How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?

2020-03-11 Thread wangl...@geekplus.com.cn

I created a Kafka table with sql-client:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

Then I sent a message to this Kafka topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}

input_date is always NULL on the sql-client side.
I also changed the input_date I sent to 1583828700240, "2020-03-11 13:00:00" and
"2020-03-11 13:00:00.000", and none of them worked.
How should this TIMESTAMP(3) be written in the JSON?

Thanks,
Wang Lei



wangl...@geekplus.com.cn


Re: How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?

2020-03-11 Thread wangl...@geekplus.com.cn

Sorry, I sent the email written in Chinese to user@.
Let me translate it to English.

I created a table using sql-client from a Kafka topic:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

Then I send a message to the topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}

But input_date is not recognized on the sql-client side and is NULL, even when I
tried 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000".

How should the TIMESTAMP(3) be written in the JSON message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
From: wangl...@geekplus.com.cn
Date: 2020-03-11 17:41
To: user
Subject: How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?

I created a Kafka table with sql-client:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

Then I sent a message to this Kafka topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}

input_date is always NULL on the sql-client side.
I also changed the input_date I sent to 1583828700240, "2020-03-11 13:00:00" and
"2020-03-11 13:00:00.000", and none of them worked.
How should this TIMESTAMP(3) be written in the JSON?

Thanks,
Wang Lei



wangl...@geekplus.com.cn


Re: Re: How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?

2020-03-11 Thread wangl...@geekplus.com.cn

Thanks Jark,

There are no words to express my '囧' (embarrassment).


wangl...@geekplus.com.cn 

Sender: Jark Wu
Send Time: 2020-03-11 18:32
Receiver: wangl...@geekplus.com.cn
cc: user; user-zh
Subject: Re: How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?
Hi Lei, 

The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong 
field name in the DDL. 
It should be "input_date", not "intput_date".
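To spell it out, the matching DDL and JSON message look like this; only the column name changes relative to the original, and the elided connector options stay as before:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  input_date TIMESTAMP(3),   -- was mistyped as intput_date
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

-- matching JSON message:
-- {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}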

Best,
Jark

On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn 
 wrote:

Sorry, I sent the email written in Chinese to user@.
Let me translate it to English.

I created a table using sql-client from a Kafka topic:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

Then I send a message to the topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}

But input_date is not recognized on the sql-client side and is NULL, even when I
tried 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000".

How should the TIMESTAMP(3) be written in the JSON message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
From: wangl...@geekplus.com.cn
Date: 2020-03-11 17:41
To: user
Subject: How should a TIMESTAMP be written in JSON so that Flink SQL recognizes it?

I created a Kafka table with sql-client:

CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
) WITH (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

Then I sent a message to this Kafka topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}

input_date is always NULL on the sql-client side.
I also changed the input_date I sent to 1583828700240, "2020-03-11 13:00:00" and
"2020-03-11 13:00:00.000", and none of them worked.
How should this TIMESTAMP(3) be written in the JSON?

Thanks,
Wang Lei



wangl...@geekplus.com.cn


How to set stateBackEnd in flink sql program?

2020-03-11 Thread wangl...@geekplus.com.cn

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.sqlUpdate(...);

Is there a way I can set the state backend like in a normal streaming program, as follows?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(args[0], true));



wangl...@geekplus.com.cn


Re: Re: How to set stateBackEnd in flink sql program?

2020-03-11 Thread wangl...@geekplus.com.cn
Hi Jingsong,

So I can write the code as follows?

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.getConfig().getConfiguration().setString("state.backend", "rocksdb");
tEnv.sqlUpdate(...);

Thanks,
Lei


wangl...@geekplus.com.cn
 
From: Jingsong Li
Date: 2020-03-12 11:32
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How to set stateBackEnd in flink sql program?
Hi wanglei,

If you are using Flink 1.10, you can set "state.backend=rocksdb" to 
"TableConfig.getConfiguration".
And you can find related config options here[1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html

Jingsong Lee

On Thu, Mar 12, 2020 at 11:15 AM wangl...@geekplus.com.cn 
 wrote:

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.sqlUpdate(...);

Is there a way I can set the state backend like in a normal streaming program, as follows?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(args[0], true));



wangl...@geekplus.com.cn


-- 
Best, Jingsong Lee


dimension table join does not work under sql-client flink-1.10.0

2020-03-12 Thread wangl...@geekplus.com.cn

Kafka source table:

CREATE TABLE out_order (
  out_order_code VARCHAR,
  intput_date BIGINT,
  owner_code VARCHAR
) WITH (
  'connector.type' = 'kafka',
  ...
)

MySQL dimension table:

CREATE TABLE dim_owner (
  owner_code VARCHAR,
  owner_name VARCHAR
) WITH (
  'connector.type' = 'jdbc',
  ...
)

When I submit the SQL:

SELECT o.out_order_code, o.input_date, o.owner_code, d.owner_name
FROM out_order AS o
JOIN dim_owner FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON o.owner_code = d.owner_code;

there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Temporal table join currently only
supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support
'PROCTIME()'

Thanks,
Lei



wangl...@geekplus.com.cn 



Re: Re: dimension table join does not work under sql-client flink-1.10.0

2020-03-12 Thread wangl...@geekplus.com.cn
Thanks, it works now.

It seems the open-source version is different from Alibaba Cloud:
https://www.alibabacloud.com/help/doc-detail/62506.htm




wangl...@geekplus.com.cn 

From: Zhenghua Gao
Date: 2020-03-13 12:12
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: dimension table join does not work under sql-client flink-1.10.0
We don't support 'PROCTIME()' in a temporal table join. Please use a left 
table's proctime field. [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
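In other words, declare a processing-time attribute on the Kafka source table and reference that column in the join. A minimal sketch; the column name proctime is my own choice, and the connector options are elided as in the original:

CREATE TABLE out_order (
  out_order_code VARCHAR,
  intput_date BIGINT,
  owner_code VARCHAR,
  proctime AS PROCTIME()   -- processing-time attribute of the left table
) WITH (
  'connector.type' = 'kafka',
  ...
);

SELECT o.out_order_code, o.owner_code, d.owner_name
FROM out_order AS o
JOIN dim_owner FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.owner_code = d.owner_code;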

Best Regards,
Zhenghua Gao


On Fri, Mar 13, 2020 at 11:57 AM wangl...@geekplus.com.cn 
 wrote:

Kafka source table:

CREATE TABLE out_order (
  out_order_code VARCHAR,
  intput_date BIGINT,
  owner_code VARCHAR
) WITH (
  'connector.type' = 'kafka',
  ...
)

MySQL dimension table:

CREATE TABLE dim_owner (
  owner_code VARCHAR,
  owner_name VARCHAR
) WITH (
  'connector.type' = 'jdbc',
  ...
)

When I submit the SQL:

SELECT o.out_order_code, o.input_date, o.owner_code, d.owner_name
FROM out_order AS o
JOIN dim_owner FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON o.owner_code = d.owner_code;

there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Temporal table join currently only
supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support
'PROCTIME()'

Thanks,
Lei



wangl...@geekplus.com.cn 



flink sql-client read hive orc table no result

2020-03-18 Thread wangl...@geekplus.com.cn
I have a Hive table stored in ORC format:

 CREATE TABLE `robot_tr`(`robot_id` int, `robot_time` bigint,
 `linear_velocity` double, `track_side_error` int)
 partitioned by (warehouseid STRING) clustered by (robot_id) into 3 buckets
 stored as orc tblproperties("transactional"='true');

Under the Hive client, after I insert one record, a select prints the result to the console.

But under the Flink sql-client, select * from robot_tr returns no result.

Any insight on this?

Thanks,
Lei 



wangl...@geekplus.com.cn 



Re: Re: flink sql-client read hive orc table no result

2020-03-18 Thread wangl...@geekplus.com.cn

I tried again. Even if I remove the "clustered by (robot_id) into 3 buckets"
clause, there is no result from the Flink sql-client.

Thanks,
Lei 



wangl...@geekplus.com.cn
From: Kurt Young
Date: 2020-03-18 17:41
To: wangl...@geekplus.com.cn; lirui
CC: user
Subject: Re: flink sql-client read hive orc table no result
My guess is that we don't support Hive bucketed tables yet.
cc Rui Li for confirmation.

Best,
Kurt


On Wed, Mar 18, 2020 at 5:19 PM wangl...@geekplus.com.cn 
 wrote:
I have a Hive table stored in ORC format:

 CREATE TABLE `robot_tr`(`robot_id` int, `robot_time` bigint,
 `linear_velocity` double, `track_side_error` int)
 partitioned by (warehouseid STRING) clustered by (robot_id) into 3 buckets
 stored as orc tblproperties("transactional"='true');

Under the Hive client, after I insert one record, a select prints the result to the console.

But under the Flink sql-client, select * from robot_tr returns no result.

Any insight on this?

Thanks,
Lei 



wangl...@geekplus.com.cn 



Re: Re: flink sql-client read hive orc table no result

2020-03-18 Thread wangl...@geekplus.com.cn

It seems the root cause is "transactional"='true'.
After removing this, the table can be queried from the Flink sql-client, even if I add
"clustered by (robot_id) into 3 buckets" back again.

Thanks,
Lei


wangl...@geekplus.com.cn
 
From: Kurt Young
Date: 2020-03-18 18:04
To: wangl...@geekplus.com.cn
CC: lirui; user
Subject: Re: Re: flink sql-client read hive orc table no result
also try to remove "transactional"='true'?

Best,
Kurt


On Wed, Mar 18, 2020 at 5:54 PM wangl...@geekplus.com.cn 
 wrote:

I tried again. Even if I remove the "clustered by (robot_id) into 3 buckets"
clause, there is no result from the Flink sql-client.

Thanks,
Lei 



wangl...@geekplus.com.cn
From: Kurt Young
Date: 2020-03-18 17:41
To: wangl...@geekplus.com.cn; lirui
CC: user
Subject: Re: flink sql-client read hive orc table no result
My guess is that we don't support Hive bucketed tables yet.
cc Rui Li for confirmation.

Best,
Kurt


On Wed, Mar 18, 2020 at 5:19 PM wangl...@geekplus.com.cn 
 wrote:
I have a Hive table stored in ORC format:

 CREATE TABLE `robot_tr`(`robot_id` int, `robot_time` bigint,
 `linear_velocity` double, `track_side_error` int)
 partitioned by (warehouseid STRING) clustered by (robot_id) into 3 buckets
 stored as orc tblproperties("transactional"='true');

Under the Hive client, after I insert one record, a select prints the result to the console.

But under the Flink sql-client, select * from robot_tr returns no result.

Any insight on this?

Thanks,
Lei 



wangl...@geekplus.com.cn 



Streaming kafka data sink to hive

2020-03-19 Thread wangl...@geekplus.com.cn

We have many app logs on our app servers and want to parse the logs into a structured
table format and then sink them to Hive.
Batch mode seems a good fit: the app logs are compressed hourly, so it is
convenient to do partitioning.

However, we want to use streaming mode: tail the app logs into Kafka, then use Flink to
read the Kafka topic and sink to Hive.
I have several questions.

1  Is there any flink-hive connector that I can use to write to Hive in a streaming
fashion?
2  Since HDFS is not friendly to frequent appends and Hive's data is stored on
HDFS, is it OK if the throughput is high?

Thanks,
Lei



wangl...@geekplus.com.cn 


Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread wangl...@geekplus.com.cn

I created one table with Kafka and another table with MySQL using Flink SQL,
and wrote a SQL statement to read from Kafka and write to MySQL:

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num FROM
  (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for the
MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



Re: Re: Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread wangl...@geekplus.com.cn

Thanks Jingsong.

When executing this SQL, the MySQL table records can be deleted, so I guess it
is a retract stream.
I want to know exactly which Java sink code is used and have a look at it.

Thanks,
Lei




wangl...@geekplus.com.cn
 
Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can I find the MySQL retract stream table sink Java source code?
Hi,

This can be an upsert stream [1].

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn 
 wrote:

I created one table with Kafka and another table with MySQL using Flink SQL,
and wrote a SQL statement to read from Kafka and write to MySQL:

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num FROM
  (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for the
MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Re: Re: Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread wangl...@geekplus.com.cn

It seems it is here:
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
There is no JDBCRetractTableSink, only append and upsert sinks.
I am confused about why the MySQL record can be deleted.

Thanks,
Lei 


wangl...@geekplus.com.cn 

 
Sender: wangl...@geekplus.com.cn
Send Time: 2020-03-25 11:25
Receiver: Jingsong Li
cc: user
Subject: Re: Re: Where can I find the MySQL retract stream table sink Java source code?

Thanks Jingsong.

When executing this SQL, the MySQL table records can be deleted, so I guess it
is a retract stream.
I want to know exactly which Java sink code is used and have a look at it.

Thanks,
Lei




wangl...@geekplus.com.cn
 
Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can I find the MySQL retract stream table sink Java source code?
Hi,

This can be an upsert stream [1].

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn 
 wrote:

I created one table with Kafka and another table with MySQL using Flink SQL,
and wrote a SQL statement to read from Kafka and write to MySQL:

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num FROM
  (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for the
MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Re: Re: Where can I find the MySQL retract stream table sink Java source code?

2020-03-24 Thread wangl...@geekplus.com.cn
Thanks Jingsong.

So the JDBC table sink now supports append and upsert modes, and retract mode is
not available yet. Is that right?

Thanks,
Lei 




wangl...@geekplus.com.cn 

 
Sender: Jingsong Li
Send Time: 2020-03-25 11:39
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can I find the MySQL retract stream table sink Java source code?
Hi,

Maybe you have some misunderstanding of the upsert sink. You can take a look at
[1]; it can deal with "delete" records.

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
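To make that contract concrete, here is a conceptual sketch of what an upsert sink receives and does with the Boolean flag; this is illustrative only, not the actual JDBCUpsertTableSink code:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

public class UpsertSemanticsSketch extends RichSinkFunction<Tuple2<Boolean, Row>> {

    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context) {
        if (value.f0) {
            // flag == true: add or update the row for its key,
            // e.g. INSERT ... ON DUPLICATE KEY UPDATE ... in MySQL
        } else {
            // flag == false: the row for this key was removed upstream,
            // e.g. DELETE FROM ... WHERE key = ...
        }
    }
}

This is why records can disappear from the MySQL table even though there is no explicit retract sink: the upsert sink itself issues the deletes.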

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li  wrote:
Hi,

This can be an upsert stream [1], and JDBC has an upsert sink now [2].

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
[2]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li  wrote:
Hi,

This can be an upsert stream [1].

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn 
 wrote:

I created one table with Kafka and another table with MySQL using Flink SQL,
and wrote a SQL statement to read from Kafka and write to MySQL:

INSERT INTO mysqlTable
SELECT status, COUNT(order_no) AS num FROM
  (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY status

I think this is a retract stream. But where can I find the Java source code for the
MySQL retract table sink?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


-- 
Best, Jingsong Lee


-- 
Best, Jingsong Lee


flink sql client not able to read parquet format table

2020-04-07 Thread wangl...@geekplus.com.cn

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



Re: Re: flink sql client not able to read parquet format table

2020-04-09 Thread wangl...@geekplus.com.cn

I am using the newest 1.10 blink planner.

Perhaps it is because of the method I used to write the parquet files.

I receive Kafka messages, transform each message into a Java class object, write the
objects to HDFS using StreamingFileSink, and add the HDFS path as a partition of
the Hive table.

No matter what the order of the field descriptions in the Hive DDL statement is, the
Hive client works, as long as the field names are the same as the Java object
field names.
But the Flink sql-client does not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink = StreamingFileSink
    .forBulkFormat(
        new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))
    .build();

For example, RobotUploadData0101 has two fields: robotId int, robotTime long.

CREATE TABLE `robotparquet`(`robotid` int, `robottime` bigint) and
CREATE TABLE `robotparquet`(`robottime` bigint, `robotid` int)
are the same for the Hive client, but different for the flink-sql client.

Is this expected behavior?

Thanks,
Lei



wangl...@geekplus.com.cn 

 
From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: flink sql client not able to read parquet format table
Hi Lei,

Are you using the newest 1.10 blink planner? 

I'm not familiar with Hive and parquet, but I know @Jingsong Li and 
@li...@apache.org are experts on this. Maybe they can help on this question. 

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn 
 wrote:

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



Re: Re: flink sql client not able to read parquet format table

2020-04-09 Thread wangl...@geekplus.com.cn

I am using Hive 3.1.1.
The table has many fields; each field corresponds to a field in the
RobotUploadData0101 class.

CREATE TABLE `robotparquet`(`robotid` int,   `framecount` int,   `robottime` 
bigint,   `robotpathmode` int,   `movingmode` int,   `submovingmode` int,   
`xlocation` int,   `ylocation` int,   `robotradangle` int,   `velocity` int,   
`acceleration` int,   `angularvelocity` int,   `angularacceleration` int,   
`literangle` int,   `shelfangle` int,   `onloadshelfid` int,   `rcvdinstr` int, 
  `sensordist` int,   `pathstate` int,   `powerpresent` int,   `neednewpath` 
int,   `pathelenum` int,   `taskstate` int,   `receivedtaskid` int,   
`receivedcommcount` int,   `receiveddispatchinstr` int,   
`receiveddispatchcount` int,   `subtaskmode` int,   `versiontype` int,   
`version` int,   `liftheight` int,   `codecheckstatus` int,   `cameraworkmode` 
int,   `backrimstate` int,   `frontrimstate` int,   `pathselectstate` int,   
`codemisscount` int,   `groundcameraresult` int,   `shelfcameraresult` int,   
`softwarerespondframe` int,   `paramstate` int,   `pilotlamp` int,   
`codecount` int,   `dist2waitpoint` int,   `targetdistance` int,   
`obstaclecount` int,   `obstacleframe` int,   `cellcodex` int,   `cellcodey` 
int,   `cellangle` int,   `shelfqrcode` int,   `shelfqrangle` int,   `shelfqrx` 
int,   `shelfqry` int,   `trackthetaerror` int,   `tracksideerror` int,   
`trackfuseerror` int,   `lifterangleerror` int,   `lifterheighterror` int,   
`linearcmdspeed` int,   `angluarcmdspeed` int,   `liftercmdspeed` int,   
`rotatorcmdspeed` int) PARTITIONED BY (`hour` string) STORED AS parquet; 


Thanks,
Lei


wangl...@geekplus.com.cn 

 
From: Jingsong Li
Date: 2020-04-09 21:45
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: flink sql client not able to read parquet format table
Hi lei,

Which hive version did you use?
Can you share the complete hive DDL?

Best,
Jingsong Lee

On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn 
 wrote:

I am using the newest 1.10 blink planner.

Perhaps it is because of the method I used to write the parquet files.

I receive Kafka messages, transform each message into a Java class object, write the
objects to HDFS using StreamingFileSink, and add the HDFS path as a partition of
the Hive table.

No matter what the order of the field descriptions in the Hive DDL statement is, the
Hive client works, as long as the field names are the same as the Java object
field names.
But the Flink sql-client does not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink = StreamingFileSink
    .forBulkFormat(
        new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))
    .build();

For example, RobotUploadData0101 has two fields: robotId int, robotTime long.

CREATE TABLE `robotparquet`(`robotid` int, `robottime` bigint) and
CREATE TABLE `robotparquet`(`robottime` bigint, `robotid` int)
are the same for the Hive client, but different for the flink-sql client.

Is this expected behavior?

Thanks,
Lei



wangl...@geekplus.com.cn 

 
From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: flink sql client not able to read parquet format table
Hi Lei,

Are you using the newest 1.10 blink planner? 

I'm not familiar with Hive and parquet, but I know @Jingsong Li and 
@li...@apache.org are experts on this. Maybe they can help on this question. 

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn 
 wrote:

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Re: Re: flink sql client not able to read parquet format table

2020-04-10 Thread wangl...@geekplus.com.cn

https://issues.apache.org/jira/browse/FLINK-17086

It is my first time creating a Flink JIRA issue.
Please point it out and correct me if I wrote something wrong.

Thanks,
Lei



wangl...@geekplus.com.cn 

 
From: Jingsong Li
Date: 2020-04-10 11:03
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: flink sql client not able to read parquet format table
Hi lei,

I think the reason is that our `HiveMapredSplitReader` does not support
name-mapped reading for the parquet format.
Can you create a JIRA for tracking this?

Best,
Jingsong Lee

On Fri, Apr 10, 2020 at 9:42 AM wangl...@geekplus.com.cn 
 wrote:

I am using Hive 3.1.1.
The table has many fields; each field corresponds to a field in the
RobotUploadData0101 class.

CREATE TABLE `robotparquet`(`robotid` int,   `framecount` int,   `robottime` 
bigint,   `robotpathmode` int,   `movingmode` int,   `submovingmode` int,   
`xlocation` int,   `ylocation` int,   `robotradangle` int,   `velocity` int,   
`acceleration` int,   `angularvelocity` int,   `angularacceleration` int,   
`literangle` int,   `shelfangle` int,   `onloadshelfid` int,   `rcvdinstr` int, 
  `sensordist` int,   `pathstate` int,   `powerpresent` int,   `neednewpath` 
int,   `pathelenum` int,   `taskstate` int,   `receivedtaskid` int,   
`receivedcommcount` int,   `receiveddispatchinstr` int,   
`receiveddispatchcount` int,   `subtaskmode` int,   `versiontype` int,   
`version` int,   `liftheight` int,   `codecheckstatus` int,   `cameraworkmode` 
int,   `backrimstate` int,   `frontrimstate` int,   `pathselectstate` int,   
`codemisscount` int,   `groundcameraresult` int,   `shelfcameraresult` int,   
`softwarerespondframe` int,   `paramstate` int,   `pilotlamp` int,   
`codecount` int,   `dist2waitpoint` int,   `targetdistance` int,   
`obstaclecount` int,   `obstacleframe` int,   `cellcodex` int,   `cellcodey` 
int,   `cellangle` int,   `shelfqrcode` int,   `shelfqrangle` int,   `shelfqrx` 
int,   `shelfqry` int,   `trackthetaerror` int,   `tracksideerror` int,   
`trackfuseerror` int,   `lifterangleerror` int,   `lifterheighterror` int,   
`linearcmdspeed` int,   `angluarcmdspeed` int,   `liftercmdspeed` int,   
`rotatorcmdspeed` int) PARTITIONED BY (`hour` string) STORED AS parquet; 


Thanks,
Lei


wangl...@geekplus.com.cn 

 
From: Jingsong Li
Date: 2020-04-09 21:45
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: flink sql client not able to read parquet format table
Hi lei,

Which hive version did you use?
Can you share the complete hive DDL?

Best,
Jingsong Lee

On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn 
 wrote:

I am using the newest 1.10 blink planner.

Perhaps it is because of the method I used to write the parquet files.

I receive Kafka messages, transform each message into a Java class object, write the
objects to HDFS using StreamingFileSink, and add the HDFS path as a partition of
the Hive table.

No matter what the order of the field descriptions in the Hive DDL statement is, the
Hive client works, as long as the field names are the same as the Java object
field names.
But the Flink sql-client does not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink = StreamingFileSink
    .forBulkFormat(
        new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))
    .build();

For example, RobotUploadData0101 has two fields: robotId int, robotTime long.

CREATE TABLE `robotparquet`(`robotid` int, `robottime` bigint) and
CREATE TABLE `robotparquet`(`robottime` bigint, `robotid` int)
are the same for the Hive client, but different for the flink-sql client.

Is this expected behavior?

Thanks,
Lei



wangl...@geekplus.com.cn 

 
From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: flink sql client not able to read parquet format table
Hi Lei,

Are you using the newest 1.10 blink planner? 

I'm not familiar with Hive and parquet, but I know @Jingsong Li and 
@li...@apache.org are experts on this. Maybe they can help on this question. 

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn 
 wrote:

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


-- 
Best, Jingsong Lee


FlinkSQL query error when specifying json-schema.

2020-04-16 Thread wangl...@geekplus.com.cn

CREATE TABLE user_log(
`id` INT,
`timestamp`  BIGINT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'wanglei_jsontest',
'connector.startup-mode' = 'latest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = '172.19.78.32:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '172.19.78.32:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.json-schema' = '{
"type": "object",
"properties": {
   "id": {"type": "integer"},
   "timestamp": {"type": "number"}
}
}'
);

Then select * from user_log;

org.apache.flink.table.api.ValidationException: Type INT of table field 'id' 
does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' 
field of the TableSource return type.

It seems the specified types "integer" and "number" cannot be mapped to INT and BIGINT.

How can I solve this problem?

Thanks,
Lei



wangl...@geekplus.com.cn 



Re: Re: FlinkSQL query error when specifying json-schema.

2020-04-16 Thread wangl...@geekplus.com.cn

Thanks, I have tried it.

'format.derive-schema' = 'true' works.

But if I insist on using format.json-schema, the CREATE TABLE must be written
as:

`id` DECIMAL(38,18),
`timestamp` DECIMAL(38,18)
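For reference, the variant that works without format.json-schema is simply the original DDL with the schema derived from the declared column types:

CREATE TABLE user_log (
  `id` INT,
  `timestamp` BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'wanglei_jsontest',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = '172.19.78.32:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = '172.19.78.32:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);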



wangl...@geekplus.com.cn 

 
From: Benchao Li
Date: 2020-04-16 16:56
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: FlinkSQL query error when specifying json-schema.
Hi wanglei,

You don't need to specify 'format.json-schema'; the format can derive the schema
from the DDL.
Your exception above means the schema in 'format.json-schema' and the DDL do not
match.

wangl...@geekplus.com.cn  于2020年4月16日周四 下午4:21写道:

CREATE TABLE user_log(
`id` INT,
`timestamp`  BIGINT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'wanglei_jsontest',
'connector.startup-mode' = 'latest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = '172.19.78.32:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '172.19.78.32:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.json-schema' = '{
"type": "object",
"properties": {
   "id": {"type": "integer"},
   "timestamp": {"type": "number"}
}
}'
);

Then select * from user_log;

org.apache.flink.table.api.ValidationException: Type INT of table field 'id' 
does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' 
field of the TableSource return type.

It seems the specified types "integer" and "number" cannot be mapped to INT and BIGINT.

How can I solve this problem?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


A question about FlinkSQL upsert/retraction writes to MySQL

2020-04-26 Thread wangl...@geekplus.com.cn


INSERT INTO mysql_sink SELECT f1, count(*) FROM kafka_src GROUP BY f1

For each new record coming from Kafka, two records (a Tuple2) are produced: the old
one is deleted and the new one is added.
But does this sink actually use an UpsertStream or a RetractStream, and how can I
tell which one it is?

I looked at
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
and there is no retract variant.
Is the code in JDBCUpsertTableSink.java actually used to write to MySQL?



If I don't use GROUP BY and directly run:
INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src
writing to MySQL fails on primary key conflicts. How can I use upsert semantics to
simply overwrite the existing row?





wangl...@geekplus.com.cn



1.11 snapshot: Name or service not knownname localhost and taskMgr not started

2020-04-29 Thread wangl...@geekplus.com.cn

1 Clone the 1.11 snapshot repository
2 Build it on Windows
3 Scp the flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT dir to
a CentOS server
4 Run ./bin/start-cluster.sh on the CentOS server


There's message: 
Starting cluster.
Starting standalonesession daemon on host test_3.6.
: Name or service not knownname localhost

Only the jobMgr is started; the taskMgr does not start.

Any insight on this?

Thanks
Lei



wangl...@geekplus.com.cn 


Re: Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr not started

2020-04-29 Thread wangl...@geekplus.com.cn

localhost is available on my CentOS machine. I can start the cluster (including the
taskMgr) with start-cluster.sh when using the flink-1.10 release tgz package.
Changing localhost to 127.0.0.1 in the slaves file resolves the problem, but then it
asks for the host's password.
Without changing the slaves file, I can first run start-cluster.sh (only the job
manager starts) and then "taskmanager.sh start" to start the taskMgr. No password is needed.

Thanks,
Lei



wangl...@geekplus.com.cn 

 
Sender: Xintong Song
Send Time: 2020-04-30 12:13
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr 
not started
Hi Lei,

Could you check whether the hostname 'localhost' is available on your CentOS 
machine? This is usually defined in "/etc/hosts".
You can also try to modify the slaves file, replacing 'localhost' with 
'127.0.0.1'. The path is: /conf/slaves

Thank you~
Xintong Song


On Thu, Apr 30, 2020 at 11:38 AM wangl...@geekplus.com.cn 
 wrote:

1 Clone the 1.11 snapshot repository
2 Build it on Windows
3 Scp the flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT dir to
a CentOS server
4 Run ./bin/start-cluster.sh on the CentOS server


There's message: 
Starting cluster.
Starting standalonesession daemon on host test_3.6.
: Name or service not knownname localhost

Only the jobMgr is started; the taskMgr does not start.

Any insight on this?

Thanks
Lei



wangl...@geekplus.com.cn 


flink how to access remote hdfs using namenode nameservice

2020-05-07 Thread wangl...@geekplus.com.cn

According to
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html

I am deploying a standalone cluster with jobmanager HA and need an HDFS
address:

high-availability.storageDir: hdfs:///flink/recovery

My Hadoop is a remote cluster. I can write it as
hdfs://active-namenode-ip:8020, but this way loses namenode HA.

Is there any method so that I can configure it as hdfs://name-service:8020?
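One approach, sketched below, is to make the Hadoop client configuration that defines the nameservice visible to Flink and then use the nameservice URI without a port; the /opt/hadoop-conf path is just a placeholder:

# copy core-site.xml and hdfs-site.xml from the remote cluster into a local directory
# (they define dfs.nameservices = name-service and its namenodes), then, before
# running ./bin/start-cluster.sh:
export HADOOP_CONF_DIR=/opt/hadoop-conf

# flink-conf.yaml
high-availability.storageDir: hdfs://name-service/flink/recovery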

Thanks,
Lei




wangl...@geekplus.com.cn 



java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-22 Thread wangl...@geekplus.com.cn

public class MyKafkaSerializationSchema implements KafkaSerializationSchema<Tuple2<String, String>> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o, @Nullable Long aLong) {
        return new ProducerRecord<>(o.f0, o.f1.getBytes(StandardCharsets.UTF_8));
    }
}

FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<>(
        "default", new MyKafkaSerializationSchema(), prop2, Semantic.EXACTLY_ONCE);

But there's an error when running:

java.lang.AbstractMethodError: 
com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestion on this?

Thanks,
Lei


wangl...@geekplus.com.cn 



Re: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-30 Thread wangl...@geekplus.com.cn

It was because of a jar conflict, and I have fixed it.

I had put flink-connector-kafka_2.11-1.10.0.jar in the Flink lib directory.
My project pom file also has the flink-connector-kafka dependency, and it is built
as a fat jar.

Thanks,
Lei



wangl...@geekplus.com.cn

 
From: Leonard Xu
Date: 2020-05-26 15:47
To: Aljoscha Krettek
CC: user
Subject: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema
Hi wanglei,

I think Aljoscha is right. Could you post your dependency list?
The dependency flink-connector-kafka is used in DataStream applications, which is
what you should use; the dependency flink-sql-connector-kafka is used in Table API & SQL
applications. We should only add one of them because the two dependencies will
conflict.

Best,
Leonard Xu

On 26 May 2020, at 15:02, Aljoscha Krettek wrote:

I think what might be happening is that you're mixing dependencies from the 
flink-sql-connector-kafka and the proper flink-connector-kafka that should be 
used with the DataStream API. Could that be the case?

Best,
Aljoscha

On 25.05.20 19:18, Piotr Nowojski wrote:
Hi,
It would be helpful if you could provide the full stack trace and tell us which Flink
version and which Kafka connector version you are using.
It sounds like either a dependency convergence error (mixing Kafka
dependencies/various versions of flink-connector-kafka inside a single job/jar)
or some shading issue. Can you check your project for such issues (`mvn
dependency:tree` command [1])?
Also, what's a bit suspicious to me is the return type:
Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
I'm not sure, but I was not aware that we are shading the Kafka dependency in our
connectors? Are you manually shading something?
Piotrek
[1] 
https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html
 
<https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html>
On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote:


public class MyKafkaSerializationSchema implements KafkaSerializationSchema<Tuple2<String, String>> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o, @Nullable Long aLong) {
        return new ProducerRecord<>(o.f0, o.f1.getBytes(StandardCharsets.UTF_8));
    }
}

FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<>(
        "default", new MyKafkaSerializationSchema(), prop2, Semantic.EXACTLY_ONCE);

But there's an error when running:

java.lang.AbstractMethodError: 
com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestion on this?

Thanks,
Lei
wangl...@geekplus.com.cn




Flip-105 can the debezium/canal SQL sink to database directly?

2020-06-29 Thread wangl...@geekplus.com.cn

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

What will happen after I execute the INSERT statement? For the
update/delete messages from Kafka, will the corresponding records be updated or
deleted in mysql_sink_table?

INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;

Thanks,
Lei



wangl...@geekplus.com.cn 



Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

2020-06-29 Thread wangl...@geekplus.com.cn
Thanks Jingsong,

Is there any document or example for this?
I will build the flink-1.11 package and give it a try.

Thanks,
Lei  



wangl...@geekplus.com.cn 
 
From: Jingsong Li
Date: 2020-06-30 10:08
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Flip-105 can the debezium/canal SQL sink to database directly?
Hi Lei,

INSERT INTO  jdbc_table SELECT * FROM changelog_table;

For Flink 1.11 new connectors, you need to define the primary key for 
jdbc_table (and also your mysql table needs to have the corresponding primary 
key) because changelog_table has the "update", "delete" records.

And then, jdbc sink will:
- insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal 
with "insert" and "update" messages.
- delete to deal with "delete" messages.

So generally speaking, with the primary key, this MySQL table will be the same
as your source database table (the table from which the changelog is generated).
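A minimal sketch of such a sink table with the Flink 1.11 connector options (URL and credentials are placeholders):

CREATE TABLE mysql_sink_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/flink_test',
  'table-name' = 'mysql_sink_table',
  'username' = 'dev',
  'password' = '***'
);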

Best,
Jingsong

On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn 
 wrote:

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

What will happen after I execute the INSERT statement? For the
update/delete messages from Kafka, will the corresponding records be updated or
deleted in mysql_sink_table?

INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

2020-07-01 Thread wangl...@geekplus.com.cn
CREATE TABLE t_pick_order (
  order_no VARCHAR,
  status INT
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'example',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '172.19.78.32:9092',
  'format' = 'canal-json'
   )
CREATE TABLE order_status (
  order_no VARCHAR,
  status INT, PRIMARY KEY (order_no) NOT ENFORCED
  ) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx:3306/flink_test',
  'table-name' = 'order_status',
  'username' = 'dev',
  'password' = ''
   )



But when I execute

INSERT INTO order_status SELECT order_no, status FROM t_pick_order

there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] 
can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, 
please file an issue. 
Current node is TableSourceScan(table=[[default_catalog, default_database, 
t_pick_order]], fields=[order_no, status])




wangl...@geekplus.com.cn 
From: Danny Chan
Date: 2020-06-30 20:25
To: wangl...@geekplus.com.cn
Subject: Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?
Hi, wanglei2 ~ 

For primary key syntax you can reference [1] for the “PRIMARY KEY” part, notice 
that currently we only support the NOT ENFORCED mode. Here is the reason:

>SQL standard specifies that a constraint can either be ENFORCED or NOT 
>ENFORCED. This controls if the constraint checks are performed on the 
>incoming/outgoing data. Flink does not own the data therefore the only mode we 
>want to support is the NOT ENFORCED mode. It is up to the user to ensure that 
>the query enforces key integrity.

For DDL to create JDBC table, you can reference [2]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table

Best, 
Danny Chan
On 30 Jun 2020 at 10:25 AM +0800, wangl...@geekplus.com.cn wrote:
Thanks Jingsong,

Is there any document or example for this?
I will build the flink-1.11 package and give it a try.

Thanks,
Lei  



wangl...@geekplus.com.cn 
 
From: Jingsong Li
Date: 2020-06-30 10:08
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Flip-105 can the debezium/canal SQL sink to database directly?
Hi Lei, 

INSERT INTO  jdbc_table SELECT * FROM changelog_table;

For Flink 1.11 new connectors, you need to define the primary key for 
jdbc_table (and also your mysql table needs to have the corresponding primary 
key) because changelog_table has the "update", "delete" records.

And then, jdbc sink will:
- insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal 
with "insert" and "update" messages.
- delete to deal with "delete" messages.

So generally speaking, with the primary key, this MySQL table will be the same as your source database table (the table generating the changelog).

Best,
Jingsong

On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn 
 wrote:

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

What will happen after I execute the INSERT statement? For the update/delete messages from Kafka, will the corresponding records be updated or deleted in mysql_sink_table?

Thanks,
Lei



wangl...@geekplus.com.cn 



--
Best, Jingsong Lee


How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread wangl...@geekplus.com.cn

In one Flink operator, I want to initialize multiple Flink metrics according to the message content, as in the code below.

public void invoke(ObjectNode node, Context context) throws Exception {
    String tableName = node.get("metadata").get("topic").asText();
    Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10));
    meter.markEvent();
    log.info("### counter: " + meter.toString() + "\t" + meter.getCount());
}

But this way every invoke call initializes a new metric and the count starts from zero again.
How can I reuse the metric that was initialized before?

Thanks,
Lei


wangl...@geekplus.com.cn 



Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread wangl...@geekplus.com.cn
Hi Xintong, 

Yes, initializing the metric in the `open` method works, but it doesn't solve my problem. 
I want to initialize the metric with a name that is extracted from the record content, and I can only do that in the `invoke` method.

Actually my scenario is as follows.
The record is MySQL binlog info. I want to monitor the QPS by tableName, and the tableName is different for every record. 

Thanks,
Lei




wangl...@geekplus.com.cn 

Sender: Xintong Song
Send Time: 2020-07-03 13:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: How to dynamically initialize flink metrics in invoke method and 
then reuse it?
Hi Lei,

I think you should initialize the metric in the `open` method. Then you can 
save the initialized metric as a class field, and update it in the `invoke` 
method for each record.
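
A minimal sketch of that pattern, assuming the operator is a RichSinkFunction and using a fixed, hypothetical meter name:

import com.fasterxml.jackson.databind.node.ObjectNode; // assumed ObjectNode type
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BinlogRateSink extends RichSinkFunction<ObjectNode> {

    // Class field, initialized once and reused for every record.
    private transient Meter meter;

    @Override
    public void open(Configuration parameters) {
        meter = getRuntimeContext().getMetricGroup().meter("binlogRecords", new MeterView(10));
    }

    @Override
    public void invoke(ObjectNode node, Context context) {
        // Only update the already-registered meter here.
        meter.markEvent();
    }
}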

Thank you~
Xintong Song


On Fri, Jul 3, 2020 at 11:50 AM wangl...@geekplus.com.cn 
 wrote:

In one Flink operator, I want to initialize multiple Flink metrics according to the message content, as in the code below.

public void invoke(ObjectNode node, Context context) throws Exception {
    String tableName = node.get("metadata").get("topic").asText();
    Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10));
    meter.markEvent();
    log.info("### counter: " + meter.toString() + "\t" + meter.getCount());
}

But this way every invoke call initializes a new metric and the count starts from zero again.
How can I reuse the metric that was initialized before?

Thanks,
Lei


wangl...@geekplus.com.cn 



Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread wangl...@geekplus.com.cn

It seems there's no direct solution.
Perhaps I can implement this by initializing a HashMap<String, Meter> with all the possible values of tableName in the `open` method and getting the corresponding Meter by tableName in the `invoke` method, as sketched below.
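
A rough sketch of that idea, assuming the operator is a RichSinkFunction; here the map is filled lazily on the first record of each table instead of pre-filling it in `open`, so the set of table names does not have to be known in advance (class and metric names are hypothetical):

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.node.ObjectNode; // assumed ObjectNode type
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PerTableQpsSink extends RichSinkFunction<ObjectNode> {

    // One meter per table name, reused across invocations.
    private transient Map<String, Meter> meters;

    @Override
    public void open(Configuration parameters) {
        meters = new HashMap<>();
    }

    @Override
    public void invoke(ObjectNode node, Context context) {
        String tableName = node.get("metadata").get("topic").asText();
        // Register the meter only the first time this table name is seen,
        // then reuse the same instance so the rate is not reset.
        Meter meter = meters.computeIfAbsent(tableName,
                name -> getRuntimeContext().getMetricGroup().meter(name, new MeterView(10)));
        meter.markEvent();
    }
}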


Thanks,
Lei 


wangl...@geekplus.com.cn 
 
Sender: wangl...@geekplus.com.cn
Send Time: 2020-07-03 14:27
Receiver: Xintong Song
cc: user
Subject: Re: Re: How to dynamically initialize flink metrics in invoke method 
and then reuse it?
Hi Xintong, 

Yes, initializing the metric in the `open` method works, but it doesn't solve my problem. 
I want to initialize the metric with a name that is extracted from the record content, and I can only do that in the `invoke` method.

Actually my scenario is as follows.
The record is MySQL binlog info. I want to monitor the QPS by tableName, and the tableName is different for every record. 

Thanks,
Lei




wangl...@geekplus.com.cn 

Sender: Xintong Song
Send Time: 2020-07-03 13:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: How to dynamically initialize flink metrics in invoke method and 
then reuse it?
Hi Lei,

I think you should initialize the metric in the `open` method. Then you can 
save the initialized metric as a class field, and update it in the `invoke` 
method for each record.

Thank you~
Xintong Song


On Fri, Jul 3, 2020 at 11:50 AM wangl...@geekplus.com.cn 
 wrote:

In one Flink operator, I want to initialize multiple Flink metrics according to the message content, as in the code below.

public void invoke(ObjectNode node, Context context) throws Exception {
    String tableName = node.get("metadata").get("topic").asText();
    Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10));
    meter.markEvent();
    log.info("### counter: " + meter.toString() + "\t" + meter.getCount());
}

But this way every invoke call initializes a new metric and the count starts from zero again.
How can I reuse the metric that was initialized before?

Thanks,
Lei


wangl...@geekplus.com.cn 



How to start a Flink standalone session on Windows?

2020-07-23 Thread wangl...@geekplus.com.cn

There are no start-cluster.bat and flink.bat in the bin directory.

So how can I start Flink on Windows OS?

Thanks,
Lei 


wangl...@geekplus.com.cn 


Does RocksDBStateBackend need a separate RocksDB service?

2019-08-07 Thread wangl...@geekplus.com.cn

In my code, I just set the state backend with an HDFS directory:
   env.setStateBackend(new RocksDBStateBackend("hdfs://user/test/job")); 

Is there an embedded RocksDB service in the Flink task? 



wangl...@geekplus.com.cn


Does changing the number of YARN containers have an impact when restoring from state?

2019-11-04 Thread wangl...@geekplus.com.cn

The job consumes data from RocketMQ and processes it.
The maximum parallelism in the code is 8. When the job is submitted with -ys 4, 2 containers are automatically allocated as TaskManagers.
After running for a while, it was stopped with a savepoint.
Then it was restored from the savepoint, this time with -ys 2, which allocates 4 containers as TaskManagers. But after submission the program no longer consumes data from RocketMQ; the consumption TPS stays at 0. What could be the reason?


Thanks,
Wang Lei 




wangl...@geekplus.com.cn


IOException when using Prometheus Monitor

2019-11-20 Thread wangl...@geekplus.com.cn
Standalone session Flink, version 1.8.2.
Prometheus and the pushgateway are installed on the same host.

After start-cluster.sh, the following error is logged every 10 seconds:
2019-11-20 17:23:12,849 WARN  
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter  - Failed to 
push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a.
java.io.IOException: Response code from 
http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 
200
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105)
at 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Any insight on this?

Thanks,
Lei




wangl...@geekplus.com.cn


Re: Re: IOException when using Prometheus Monitor

2019-11-20 Thread wangl...@geekplus.com.cn
Hi Chesnay,

Although there's an exception, the metrics have actually been pushed to the pushgateway successfully. 

The Prometheus versions I used:

prometheus-2.8.1.linux-amd64.tar.gz
pushgateway-1.0.0.linux-amd64.tar.gz
flink-metrics-prometheus_2.12-1.8.2.jar

I just downloaded the tar.gz files to a CentOS node, ran tar -xzvf, and then started the processes.

Thanks,
Lei





wangl...@geekplus.com.cn
 
From: Chesnay Schepler
Date: 2019-11-20 17:46
To: wangl...@geekplus.com.cn; user
Subject: Re: IOException when using Prometheus Monitor
From what I found so far this appears to be an incompatibility between the 
pushgateway and client version.

So you can either
a) use an older version of the pushgateway
b) bump the version of the prometheus reporter.

Unfortunately I cannot tell you which version you would need.

On 20/11/2019 10:24, wangl...@geekplus.com.cn wrote:
Standalone session Flink, version 1.8.2.
Prometheus and the pushgateway are installed on the same host.

After start-cluster.sh, the following error is logged every 10 seconds:
2019-11-20 17:23:12,849 WARN  
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter  - Failed to 
push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a.
java.io.IOException: Response code from 
http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 
200
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105)
at 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Any insight on this?

Thanks,
Lei




wangl...@geekplus.com.cn



Re: Re: IOException when using Prometheus Monitor

2019-11-20 Thread wangl...@geekplus.com.cn
After downgrading the pushgateway to pushgateway-0.8.0.linux-amd64.tar.gz, this exception no longer appears.

Thanks very much.



wangl...@geekplus.com.cn
 
From: wangl...@geekplus.com.cn
Date: 2019-11-20 18:19
To: Chesnay Schepler; user
Subject: Re: Re: IOException when using Prometheus Monitor
Hi Chesnay,

Although there's an exception, the metrics have actually been pushed to the pushgateway successfully. 

The Prometheus versions I used:

prometheus-2.8.1.linux-amd64.tar.gz
pushgateway-1.0.0.linux-amd64.tar.gz
flink-metrics-prometheus_2.12-1.8.2.jar

I just downloaded the tar.gz files to a CentOS node, ran tar -xzvf, and then started the processes.

Thanks,
Lei





wangl...@geekplus.com.cn
 
From: Chesnay Schepler
Date: 2019-11-20 17:46
To: wangl...@geekplus.com.cn; user
Subject: Re: IOException when using Prometheus Monitor
From what I found so far this appears to be an incompatibility between the 
pushgateway and client version.

So you can either
a) use an older version of the pushgateway
b) bump the version of the prometheus reporter.

Unfortunately I cannot tell you which version you would need.

On 20/11/2019 10:24, wangl...@geekplus.com.cn wrote:
Standalone session Flink, version 1.8.2.
Prometheus and the pushgateway are installed on the same host.

After start-cluster.sh, the following error is logged every 10 seconds:
2019-11-20 17:23:12,849 WARN  
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter  - Failed to 
push metrics to PushGateway with jobName flink58c234884dcfb60a860d7a040aab6a4a.
java.io.IOException: Response code from 
http://10.44.51.23:9091/metrics/job/flink58c234884dcfb60a860d7a040aab6a4a was 
200
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:105)
at 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:76)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Any insight on this?

Thanks,
Lei




wangl...@geekplus.com.cn



Submit high version compiled code jar to low version flink cluster?

2019-12-29 Thread wangl...@geekplus.com.cn

The Flink cluster version is 1.8.2.
The application source code needs some features only supported in 1.9.1, so it is compiled with the flink-1.9.1 dependency and built into a fat jar with all the Flink dependencies.
What will happen if I submit the jar built against the higher version to the lower-version Flink cluster?

Thanks,
Lei






How can i just implement a crontab function using flink?

2019-05-24 Thread wangl...@geekplus.com.cn

I want to do something every minute.

Using a TumblingWindow, the function will not be triggered if there's no message received during that minute, but I still need to execute the function.

How can I implement it? 



wangl...@geekplus.com.cn
 


Re: Re: How can i just implement a crontab function using flink?

2019-05-24 Thread wangl...@geekplus.com.cn

Thanks, that's an alternative solution.



wangl...@geekplus.com.cn
 
From: Jörn Franke
Date: 2019-05-24 16:31
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How can i just implement a crontab function using flink?
Just send a dummy event from the source system every minute.
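
For reference, a minimal sketch of such a dummy-event ("tick") source, assuming a DataStream job and a hypothetical one-minute interval; the downstream operator then does the periodic work:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits the current timestamp once per minute so downstream operators are
// triggered even when the real sources are silent.
public class MinuteTickSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            ctx.collect(System.currentTimeMillis());
            Thread.sleep(60_000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

A stream built with env.addSource(new MinuteTickSource()) can then be processed on its own or unioned with the real stream.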

On 24.05.2019 at 10:20, "wangl...@geekplus.com.cn" wrote:


I want to do something every minute.

Using a TumblingWindow, the function will not be triggered if there's no message received during that minute, but I still need to execute the function.

How can I implement it? 



wangl...@geekplus.com.cn
 


Re: Re: How can i just implement a crontab function using flink?

2019-05-24 Thread wangl...@geekplus.com.cn

Thanks, got it.



wangl...@geekplus.com.cn
 
From: Puneet Kinra
Date: 2019-05-24 17:02
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How can i just implement a crontab function using flink?
There is a concept of a periodic watermark generator; you can use that if you are working on event time.

On Fri, May 24, 2019 at 1:51 PM wangl...@geekplus.com.cn 
 wrote:

I want to do something every minute.

Using a TumblingWindow, the function will not be triggered if there's no message received during that minute, but I still need to execute the function.

How can I implement it? 



wangl...@geekplus.com.cn
 


-- 
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
e-mail :puneet.ki...@customercentria.com



How can I add config file as classpath in taskmgr node when submitting a flink job?

2019-05-25 Thread wangl...@geekplus.com.cn

When starting a single-node Java application, I can add some config files to it.

How can I implement this when submitting a Flink job? The config file needs to be read on the TaskManager node and used to initialize some classes.





wangl...@geekplus.com.cn


Re: Re: How can I add config file as classpath in taskmgr node when submitting a flink job?

2019-05-28 Thread wangl...@geekplus.com.cn

Thanks. Let me have a try


wangl...@geekplus.com.cn
 
From: Yang Wang
Date: 2019-05-28 09:47
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How can I add config file as classpath in taskmgr node when 
submitting a flink job?
Hi, wanglei

You could use the Flink distributed cache to register some config files and then access them in your task.

1. Register a cached file:
StreamExecutionEnvironment.registerCachedFile(inputFile.toString(), "test_data", false);

2. Access the file in your task:
final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();
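
Expanded into a small self-contained sketch (file path, names and the config handling are placeholders), the two steps fit together roughly like this:

import java.io.File;
import java.nio.file.Files;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DistributedCacheSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Register the config file; Flink ships it to every TaskManager.
        env.registerCachedFile("hdfs:///path/to/my-config.properties", "my_config");

        env.fromElements("a", "b", "c")
           .map(new ConfigAwareMapper())
           .print();

        env.execute("distributed cache sketch");
    }

    public static class ConfigAwareMapper extends RichMapFunction<String, String> {

        private transient String configContent;

        @Override
        public void open(Configuration parameters) throws Exception {
            // 2. Access the shipped file on the TaskManager and initialize whatever needs it.
            File configFile = getRuntimeContext().getDistributedCache().getFile("my_config");
            configContent = new String(Files.readAllBytes(configFile.toPath()));
        }

        @Override
        public String map(String value) {
            return value + " (config length: " + configContent.length() + ")";
        }
    }
}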

wangl...@geekplus.com.cn  于2019年5月26日周日 上午12:06写道:

When starting a single-node Java application, I can add some config files to it.

How can I implement this when submitting a Flink job? The config file needs to be read on the TaskManager node and used to initialize some classes.





wangl...@geekplus.com.cn


Re: Re: How can i just implement a crontab function using flink?

2019-06-14 Thread wangl...@geekplus.com.cn

I tried, but MyProcessWindowFunction is still not triggered when there's no event in the window.

Any insight on this?


source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks() {
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - 1);
    }

    @Override
    public long extractTimestamp(Map map, long l) {
        return System.currentTimeMillis();
    }
}).windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
  .process(new MyProcessWindowFunction());



wangl...@geekplus.com.cn
 
From: Puneet Kinra
Date: 2019-05-24 17:02
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How can i just implement a crontab function using flink?
There is a concept of a periodic watermark generator; you can use that if you are working on event time.

On Fri, May 24, 2019 at 1:51 PM wangl...@geekplus.com.cn 
 wrote:

I want to do something every minute.

Using a TumblingWindow, the function will not be triggered if there's no message received during that minute, but I still need to execute the function.

How can I implement it? 



wangl...@geekplus.com.cn
 


-- 
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
e-mail :puneet.ki...@customercentria.com



How to trigger the window function even there's no message input in this window?

2019-06-14 Thread wangl...@geekplus.com.cn

windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction());

How can I trigger MyProcessWindowFunction even when there's no input during the window?
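
If a window trigger is not strictly required, one commonly used alternative (not taken from this thread) is a keyed process function whose processing-time timer re-registers itself, so it keeps firing every 10 seconds once a key has been seen, even when no further input arrives. A rough sketch, with hypothetical String types:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PeriodicFiringFunction extends KeyedProcessFunction<String, String, String> {

    private static final long INTERVAL_MS = 10_000L;

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Align to 10-second boundaries; registering the same timestamp twice
        // is deduplicated per key, so this is safe to do for every element.
        long nextFire = (ctx.timerService().currentProcessingTime() / INTERVAL_MS + 1) * INTERVAL_MS;
        ctx.timerService().registerProcessingTimeTimer(nextFire);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("fired at " + timestamp);
        // Re-register so the function keeps firing even with no new elements.
        ctx.timerService().registerProcessingTimeTimer(timestamp + INTERVAL_MS);
    }
}

Note that this still needs at least one element per key to start the timer chain; for a stream with no elements at all, a periodic dummy-event source (as suggested earlier in this archive) is the simpler option.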



wangl...@geekplus.com.cn



Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread wangl...@geekplus.com.cn
public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private transient ValueState<Tuple2<Long, Long>> state;

    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {

        Tuple2<Long, Long> stateValue = state.value();

        if (stateValue == null) {
            log.info("##  initialize");
            stateValue = new Tuple2<>(34L, 56L);
        }
        state.update(stateValue);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("avg",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}



Every time I restart the job, the stateValue is still null.




wangl...@geekplus.com.cn
 


Re: Re: Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread wangl...@geekplus.com.cn

I start and cancel it just in my IntelliJ IDEA development environment.

First I click the run button, then the red stop button, and then the run button again.

Let me google the savepoint feature.

Thanks,
Lei Wang




wangl...@geekplus.com.cn
 
From: Stephan Ewen
Date: 2019-06-25 20:36
To: user
Subject: Re: Unable to restore state value after job failed using 
RocksDBStateBackend
If you manually cancel and restart the job, state is only carried forward if 
you use a savepoint.
Can you check if that is what you are doing?

On Tue, Jun 25, 2019 at 2:21 PM Simon Su  wrote:

Hi wanglei

 Can you post how you restart the job ? 

Thanks,
Simon
On 06/25/2019 20:11,wangl...@geekplus.com.cn wrote: 
public class StateProcessTest extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private transient ValueState<Tuple2<Long, Long>> state;

    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {

        Tuple2<Long, Long> stateValue = state.value();

        if (stateValue == null) {
            log.info("##  initialize");
            stateValue = new Tuple2<>(34L, 56L);
        }
        state.update(stateValue);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("avg",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        state = getRuntimeContext().getState(descriptor);
    }
}



Every time I restart the job, the stateValue is still null.




wangl...@geekplus.com.cn