How to start a flink standalone session on windows?

2020-07-23 Thread wangl...@geekplus.com.cn
There's no start-cluster.bat or flink.bat in the bin directory. So how can I start flink on Windows OS? Thanks, Lei wangl...@geekplus.com.cn

Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread wangl...@geekplus.com.cn
Seems there's no direct solution. Perhaps I can implement this by initializing a HashMap with all the possible values of tableName in the `open` method and getting the corresponding Meter according to tableName in the `invoke` method. Thanks, Lei wangl...@geekplus.com.cn Sender:
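The workaround described above — one lazily created Meter per tableName — boils down to a computeIfAbsent cache. A minimal plain-Java sketch; the generic type M and the factory are stand-ins for Flink's MetricGroup#meter call (which would run inside a RichSinkFunction), and all names here are illustrative, not from the thread:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of a per-tableName meter cache. In a real RichSinkFunction the
// factory would wrap getRuntimeContext().getMetricGroup().meter(...), and
// forTable(...) would be called from invoke() before marking an event.
class PerTableMeters<M> {
    private final Map<String, M> meters = new ConcurrentHashMap<>();
    private final Function<String, M> factory;

    PerTableMeters(Function<String, M> factory) {
        this.factory = factory;
    }

    // Returns the cached meter for tableName, creating it on first use.
    M forTable(String tableName) {
        return meters.computeIfAbsent(tableName, factory);
    }
}
```

If the set of table names is known up front, the same map can instead be fully populated in `open`, as the message above suggests.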

Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread wangl...@geekplus.com.cn
inlog info. I want to monitor the qps by tableName. The tableName is different for every record. Thanks, Lei wangl...@geekplus.com.cn Sender: Xintong Song Send Time: 2020-07-03 13:14 Receiver: wangl...@geekplus.com.cn cc: user Subject: Re: How to dynamically initialize flink metrics in i

How to dynamically initialize flink metrics in invoke method and then reuse it?

2020-07-02 Thread wangl...@geekplus.com.cn
. How can I reuse the metric initialized before? Thanks, Lei wangl...@geekplus.com.cn

Re: Re: Flip-105: can the debezium/canal SQL sink to the database directly?

2020-07-01 Thread wangl...@geekplus.com.cn
_order There's error: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table

Re: Re: Flip-105: can the debezium/canal SQL sink to the database directly?

2020-06-29 Thread wangl...@geekplus.com.cn
Thanks Jingsong, is there any document or example for this? I will build the flink-1.11 package and give it a try. Thanks, Lei wangl...@geekplus.com.cn From: Jingsong Li Date: 2020-06-30 10:08 To: wangl...@geekplus.com.cn CC: user Subject: Re: Flip-105 can the debezium/canal SQL sink to

Flip-105: can the debezium/canal SQL sink to the database directly?

2020-06-29 Thread wangl...@geekplus.com.cn
deleted in the mysql_sink_table? INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table; Thanks, Lei wangl...@geekplus.com.cn

Re: Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-30 Thread wangl...@geekplus.com.cn
It was caused by a jar conflict and I have fixed it. I put flink-connector-kafka_2.11-1.10.0.jar in the flink lib directory. My project pom file also has the flink-connector-kafka dependency and is built as a fat jar. Thanks, Lei wangl...@geekplus.com.cn From: Leonard Xu Date: 2020-05

java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-22 Thread wangl...@geekplus.com.cn
ma.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord; Any suggestion on this? Thanks, Lei wangl...@geekplus.com.cn

flink: how to access remote hdfs using the namenode nameservice

2020-05-07 Thread wangl...@geekplus.com.cn
as hdfs://active-namenode-ip:8020. But this way loses namenode HA. Is there any method to configure it as hdfs://name-service:8020? Thanks, Lei wangl...@geekplus.com.cn
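For reference, client-side resolution of an HDFS nameservice is normally configured in hdfs-site.xml, which Flink picks up via HADOOP_CONF_DIR. A sketch, where the nameservice id, namenode ids, and hosts are placeholders rather than values from this thread:

```xml
<configuration>
  <property><name>dfs.nameservices</name><value>name-service</value></property>
  <property><name>dfs.ha.namenodes.name-service</name><value>nn1,nn2</value></property>
  <property><name>dfs.namenode.rpc-address.name-service.nn1</name><value>namenode1-host:8020</value></property>
  <property><name>dfs.namenode.rpc-address.name-service.nn2</name><value>namenode2-host:8020</value></property>
  <property>
    <name>dfs.client.failover.proxy.provider.name-service</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>
```

With this in place, hdfs://name-service/... paths resolve to whichever namenode is active, so HA is preserved.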

Re: Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr not started

2020-04-29 Thread wangl...@geekplus.com.cn
First start-cluster (only the job manager started), and then "taskmanager.sh start" to start the taskMgr. No password needed. Thanks, Lei wangl...@geekplus.com.cn Sender: Xintong Song Send Time: 2020-04-30 12:13 Receiver: wangl...@geekplus.com.cn cc: user Subject: Re: 1.11 snapshot: Name or

1.11 snapshot: Name or service not knownname localhost and taskMgr not started

2020-04-29 Thread wangl...@geekplus.com.cn
t_3.6. : Name or service not knownname localhost Only the jobMgr is started; the taskMgr does not start. Any insight on this? Thanks, Lei wangl...@geekplus.com.cn

Problems with FlinkSQL Upsert/Retraction writes to MySQL

2020-04-26 Thread wangl...@geekplus.com.cn
/java/org/apache/flink/api/java/io/jdbc there is no Retract mode. Is it actually the code of JDBCUpsertTableSink.java that writes to MySQL? If I run directly without GROUP BY: INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src, a primary-key conflict makes the write to MySQL fail. How can I overwrite directly in Upsert mode? wangl...@geekplus.com.cn

Re: Re: FlinkSQL query error when specify json-schema.

2020-04-16 Thread wangl...@geekplus.com.cn
Thanks, I have tried. 'format.derive-schema' = 'true' works. But if I insist on using format.json-schema, the CREATE TABLE must be written as: `id` DECIMAL(38,18), `timestamp` DECIMAL(38,18) wangl...@geekplus.com.cn From: Benchao Li Date: 2020-0

FlinkSQL query error when specify json-schema.

2020-04-16 Thread wangl...@geekplus.com.cn
nteger"}, "timestamp": {"type": "number"} } }' ); Then select * from user_log; org.apache.flink.table.api.ValidationException: Type INT of table field 'id' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' field of the TableSource return type. Seems the specified types "integer" and "number" cannot be mapped to INT, BIGINT. How can I solve this problem? Thanks, Lei wangl...@geekplus.com.cn
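Per the reply above, one workaround is to drop format.json-schema and let the JSON format derive field types from the table schema. A hypothetical DDL sketch in the 1.10 descriptor style (table and field names are taken from the thread; the other connector properties are omitted):

```sql
CREATE TABLE user_log (
  `id` INT,
  `timestamp` BIGINT
) WITH (
  'connector.type' = 'kafka',
  -- ... topic, version, and other required kafka properties ...
  'format.type' = 'json',
  -- derive field types from the schema above instead of
  -- 'format.json-schema', whose "integer"/"number" map to DECIMAL
  'format.derive-schema' = 'true'
);
```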

Re: Re: flink sql client not able to read parquet format table

2020-04-10 Thread wangl...@geekplus.com.cn
https://issues.apache.org/jira/browse/FLINK-17086 It is my first time creating a flink jira issue. Please point out and correct anything I wrote wrong. Thanks, Lei wangl...@geekplus.com.cn From: Jingsong Li Date: 2020-04-10 11:03 To: wangl...@geekplus.com.cn CC: Jark Wu; lirui

Re: Re: flink sql client not able to read parquet format table

2020-04-09 Thread wangl...@geekplus.com.cn
, `linearcmdspeed` int, `angluarcmdspeed` int, `liftercmdspeed` int, `rotatorcmdspeed` int) PARTITIONED BY (`hour` string) STORED AS parquet; Thanks, Lei wangl...@geekplus.com.cn From: Jingsong Li Date: 2020-04-09 21:45 To: wangl...@geekplus.com.cn CC: Jark Wu; lirui; user Subject: Re: Re: fink

Re: Re: flink sql client not able to read parquet format table

2020-04-09 Thread wangl...@geekplus.com.cn
`robotid` int, `robottime` bigint ) and CREATE TABLE `robotparquet`( `robottime` bigint, `robotid` int) are the same for the hive client, but different for the flink-sql client. Is this expected behavior? Thanks, Lei wangl...@geekplus.com.cn From: Jark Wu Date: 2020-04-09 14:48 To: wangl...@ge

flink sql client not able to read parquet format table

2020-04-07 Thread wangl...@geekplus.com.cn
0 Any insight on this? Thanks, Lei wangl...@geekplus.com.cn

Re: Re: Where can I find MySQL retract stream table sink java source code?

2020-03-24 Thread wangl...@geekplus.com.cn
Thanks Jingsong. So JDBCTableSink now supports append and upsert mode, and retract mode is not available yet. Is that right? Thanks, Lei wangl...@geekplus.com.cn Sender: Jingsong Li Send Time: 2020-03-25 11:39 Receiver: wangl...@geekplus.com.cn cc: user Subject: Re: Where can I find MySQL

Re: Re: Where can I find MySQL retract stream table sink java source code?

2020-03-24 Thread wangl...@geekplus.com.cn
Seems it is here: https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc There's no JDBCRetractTableSink, only append and upsert. I am confused about why the MySQL record can be deleted. Thanks, Lei wangl...@geekplus.c

Re: Re: Where can I find MySQL retract stream table sink java source code?

2020-03-24 Thread wangl...@geekplus.com.cn
Thanks Jingsong. When executing this sql, the mysql table record can be deleted, so I guess it is a retract stream. I want to know exactly what java code is generated and have a look at it. Thanks, Lei wangl...@geekplus.com.cn Sender: Jingsong Li Send Time: 2020-03-25 11:14 Receiver

Where can I find MySQL retract stream table sink java source code?

2020-03-24 Thread wangl...@geekplus.com.cn
) GROUP BY status. I think this is a retract stream. But where can I find the java source code for the MySQL retract table sink? Thanks, Lei wangl...@geekplus.com.cn

Streaming kafka data sink to hive

2020-03-19 Thread wangl...@geekplus.com.cn
Thanks, Lei wangl...@geekplus.com.cn

Re: Re: flink sql-client read hive orc table no result

2020-03-18 Thread wangl...@geekplus.com.cn
Seems the root cause is "transactional"='true'. After removing this, the table can be queried from the flink sql-client, even if I add "clustered by (robot_id) into 3 buckets" again. Thanks, Lei wangl...@geekplus.com.cn From: Kurt Young Date: 2020-03-18 18:04 To: wang

Re: Re: flink sql-client read hive orc table no result

2020-03-18 Thread wangl...@geekplus.com.cn
Tried again. Even if I remove the "clustered by (robot_id) into 3 buckets" statement, there is no result from the flink sql-client. Thanks, Lei wangl...@geekplus.com.cn From: Kurt Young Date: 2020-03-18 17:41 To: wangl...@geekplus.com.cn; lirui CC: user Subject: Re: flink sql-client read hive or

flink sql-client read hive orc table no result

2020-03-18 Thread wangl...@geekplus.com.cn
; Under the hive client, after inserting one record, a select shows the result on the console. But under the flink sql-client, select * from robot_tr returns no result. Any insight on this? Thanks, Lei wangl...@geekplus.com.cn

Re: Re: dimension table join does not work under sql-client flink-1.10.0

2020-03-12 Thread wangl...@geekplus.com.cn
Thanks, it works now. Seems the open source version is different from alibaba cloud: https://www.alibabacloud.com/help/doc-detail/62506.htm wangl...@geekplus.com.cn From: Zhenghua Gao Date: 2020-03-13 12:12 To: wangl...@geekplus.com.cn CC: user Subject: Re: dimension table join not work

dimension table join does not work under sql-client flink-1.10.0

2020-03-12 Thread wangl...@geekplus.com.cn
or: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' Thanks, Lei wangl...@geekplus.com.cn

Re: Re: How to set stateBackEnd in flink sql program?

2020-03-11 Thread wangl...@geekplus.com.cn
;,"rocksdb");eEnv.sqlUpdate(..) Thanks, Lei wangl...@geekplus.com.cn From: Jingsong Li Date: 2020-03-12 11:32 To: wangl...@geekplus.com.cn CC: user Subject: Re: How to set stateBackEnd in flink sql program? Hi wanglei, If you are using Flink 1.10, you can set "
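The suggestion quoted above is truncated, but in Flink 1.10 the state backend can be set through configuration rather than code. A sketch using the standard configuration keys (the checkpoint path is a placeholder); the same keys can be set programmatically on the TableConfig's Configuration:

```yaml
# flink-conf.yaml
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder path
state.backend.incremental: true
```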

How to set stateBackEnd in flink sql program?

2020-03-11 Thread wangl...@geekplus.com.cn
env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new RocksDBStateBackend(args[0], true)); wangl...@geekplus.com.cn

Re: Re: How should a timestamp be written in JSON so that flink sql can recognize it

2020-03-11 Thread wangl...@geekplus.com.cn
Thanks Jark, no word can express my '囧'. wangl...@geekplus.com.cn Sender: Jark Wu Send Time: 2020-03-11 18:32 Receiver: wangl...@geekplus.com.cn cc: user; user-zh Subject: Re: How should a timestamp be written in JSON so that flink sql can recognize it Hi Lei, The "2020-03-11T13:00:00.123Z" format

Re: How should a timestamp be written in JSON so that flink sql can recognize it

2020-03-11 Thread wangl...@geekplus.com.cn
3Z","owner_code":"WDTLEN04","status":90} But the input_date is not recognized on the sql-client and is null, even if I tried 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000". What should the timestamp(3) look like in the json mes

How should a timestamp be written in JSON so that flink sql can recognize it

2020-03-11 Thread wangl...@geekplus.com.cn
ULL. I changed the input_date being sent to 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000", but none of them worked. What should the TIMESTAMP(3) look like in the JSON? Thanks, Wang Lei wangl...@geekplus.com.cn
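Based on the reply quoted in this thread, the 1.10 JSON format expects an RFC 3339-style string like "2020-03-11T13:00:00.123Z" for a TIMESTAMP(3) column. A sketch of the message shape, with field names taken from the messages quoted above and other values illustrative:

```json
{
  "out_order_code": "OUT2020031100001",
  "input_date": "2020-03-11T13:00:00.123Z",
  "owner_code": "WDTLEN04",
  "status": 90
}
```

Neither epoch millis (1583828700240) nor "2020-03-11 13:00:00" variants parse into TIMESTAMP(3) in that version, which matches the null results described above.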

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-11 Thread wangl...@geekplus.com.cn
flink-sql-connector-kafka_2.11-1.9.1.jar in flink-1.10 environment. Thanks, Lei wangl...@geekplus.com.cn From: wangl...@geekplus.com.cn Date: 2020-03-11 14:51 To: Jark Wu CC: Arvid Heise; user Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink? Hi Jark, I have tried to use CREATE

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
can select the result correctly Thanks, Lei From: Jark Wu Date: 2020-03-11 11:13 To: wangl...@geekplus.com.cn CC: Arvid Heise; user Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink? Hi Lei, CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. And the ya

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
Thanks, it works now. Seems it was because I added the schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)" under the format label. From: Arvid Heise Date: 2020-03-10 20:51 To: wangl...@geekplus.com.cn CC: user Subject: Re: Dose flink-1.10 sql-clie

Re: Re: Kafka sink only supports append mode?

2020-03-10 Thread wangl...@geekplus.com.cn
appendable. It confused me. Thanks, Lei From: Jark Wu Date: 2020-03-09 19:25 To: wangl...@geekplus.com.cn CC: user Subject: Re: Kafka sink only support append mode? Hi Lei, Yes. Currently, Kafka sink only supports append mode. Other update mode (e.g. upsert mode / retract mode) is on the agenda. For

Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
I have configured the source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
  - name: out_order_code
    type: STRING
  - name: input_date
    type: BIGINT
  - name: owner_code
    type: STRING
  connector:

Kafka sink only supports append mode?

2020-03-09 Thread wangl...@geekplus.com.cn
I wrote a simple program reading from kafka using sql and sinking to kafka. But only 'update-mode' = 'append' is supported for the sink table, and the query sql must have no group statement. Is only append mode supported for the kafka sink? Thanks, Lei
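The append-only restriction described above can be illustrated with a hypothetical sketch (table and field names are borrowed from other messages in this archive, not from this thread):

```sql
-- works: a plain projection/filter keeps the stream append-only
INSERT INTO kafka_sink
SELECT out_order_code, status FROM kafka_src WHERE status > 0;

-- fails on the 1.10 Kafka sink: GROUP BY emits updates, not appends
-- INSERT INTO kafka_sink
-- SELECT status, COUNT(*) FROM kafka_src GROUP BY status;
```

As the reply in this thread notes, upsert/retract modes for the Kafka sink were still on the agenda at that time.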

How to use self defined json format when create table from kafka stream?

2020-03-04 Thread wangl...@geekplus.com.cn
_id" : 131936,
  "columns" : [
    { "id" : 1, "name" : "order_id", "column_type" : -5, "last_value" : 4606458, "value" : 4606458 },
    { "id" : 2, "name" : "order_no", "column_type" : 12, "last_value" : "EDBMFSJ1S2003050006628", "value" : "EDBMFSJ1S2003050006628" }
  ]
}
Surely 'format.type' = 'json' will not parse the result as I expected. Is there any way I can implement this? For example, using a self-defined format class. Thanks, Lei wangl...@geekplus.com.cn

Submit high version compiled code jar to low version flink cluster?

2019-12-29 Thread wangl...@geekplus.com.cn
The flink cluster version is 1.8.2. The application source code needs some features only supported in 1.9.1, so it is compiled with the flink-1.9.1 dependency and built into a fat jar with all the flink dependencies. What will happen if I submit the jar built against the high version to the low version flink

Re: Re: IOException when using Prometheus Monitor

2019-11-20 Thread wangl...@geekplus.com.cn
After downgrading the pushgateway to pushgateway-0.8.0.linux-amd64.tar.gz, this Exception does not occur again. Thanks very much. wangl...@geekplus.com.cn From: wangl...@geekplus.com.cn Date: 2019-11-20 18:19 To: Chesnay Schepler; user Subject: Re: Re: IOException when using Prometheus Monitor Hi

Re: Re: IOException when using Prometheus Monitor

2019-11-20 Thread wangl...@geekplus.com.cn
entOS node, tar -xzvf and then start the process. Thanks, Lei wangl...@geekplus.com.cn From: Chesnay Schepler Date: 2019-11-20 17:46 To: wangl...@geekplus.com.cn; user Subject: Re: IOException when using Prometheus Monitor From what I found so far this appears to be an incompatibility betwee

IOException when using Prometheus Monitor

2019-11-20 Thread wangl...@geekplus.com.cn
r.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Any insight on this? Thanks, Lei wangl...@geekplus.com.cn

Does changing the number of yarn containers have any effect when restoring data from state?

2019-11-04 Thread wangl...@geekplus.com.cn
Consuming and processing data from RocketMQ. The maximum parallelism in the code is 8. Submitting with -ys 4 automatically allocates 2 containers as taskMgrs. After running for a while, I stopped the job with a savepoint. Restoring from the savepoint with -ys 2 allocates 4 containers as taskMgrs, but after the job is submitted the program no longer consumes data from RocketMQ, and the consume TPS stays at 0. What could be the reason? Thanks, Wang Lei wangl...@geekplus.com.cn

Does RocksDBStateBackend need a separate RocksDB service?

2019-08-07 Thread wangl...@geekplus.com.cn
In my code, I just setStateBackend with a hdfs directory. env.setStateBackend(new RocksDBStateBackend("hdfs://user/test/job")); Is there an embedded RocksDB service in the flink task? wangl...@geekplus.com.cn

Re: Re: Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread wangl...@geekplus.com.cn
I start and cancel it just in my IntelliJ IDEA development environment: first click the run button, then click the red stop button, and then click the run button again. Let me google the savepoint. Thanks, Lei Wang wangl...@geekplus.com.cn From: Stephan Ewen Date: 2019-06-25

Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread wangl...@geekplus.com.cn
ateDescriptor>("avg", TypeInformation.of( new TypeHint>() {})); state = getRuntimeContext().getState(descriptor); } } Every time I restart the job, the stateValue is still null. wangl...@geekplus.com.cn

How to trigger the window function even when there's no message input in this window?

2019-06-14 Thread wangl...@geekplus.com.cn
windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction()); How can I trigger the MyProcessWindowFunction even if there's no input during this window time? wangl...@geekplus.com.cn

Re: Re: How can I just implement a crontab function using flink?

2019-06-14 Thread wangl...@geekplus.com.cn
rmark(System.currentTimeMillis() - 1); } @Override public long extractTimestamp(Map map, long l) { return System.currentTimeMillis(); } }).windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction()); wangl...@geekplus.com.cn From: Puneet

Re: Re: How can I add a config file to the classpath of the taskmgr node when submitting a flink job?

2019-05-28 Thread wangl...@geekplus.com.cn
Thanks. Let me have a try. wangl...@geekplus.com.cn From: Yang Wang Date: 2019-05-28 09:47 To: wangl...@geekplus.com.cn CC: user Subject: Re: How can I add config file as classpath in taskmgr node when submitting a flink job? Hi wanglei, you could use the flink distributed cache to register

How can I add a config file to the classpath of the taskmgr node when submitting a flink job?

2019-05-25 Thread wangl...@geekplus.com.cn
When starting a single node java application, I can add some config files to it. How can I implement this when submitting a flink job? The config file needs to be read from the taskMgr node and used to initialize some classes. wangl...@geekplus.com.cn

Re: Re: How can I just implement a crontab function using flink?

2019-05-24 Thread wangl...@geekplus.com.cn
Thanks, got it. wangl...@geekplus.com.cn From: Puneet Kinra Date: 2019-05-24 17:02 To: wangl...@geekplus.com.cn CC: user Subject: Re: How can I just implement a crontab function using flink? There is a concept of a periodic watermarker; you can use it if you are working on eventtime. On Fri

Re: Re: How can I just implement a crontab function using flink?

2019-05-24 Thread wangl...@geekplus.com.cn
Thanks, it's an alternative solution. wangl...@geekplus.com.cn From: Jörn Franke Date: 2019-05-24 16:31 To: wangl...@geekplus.com.cn CC: user Subject: Re: How can I just implement a crontab function using flink? Just send a dummy event from the source system every minute On 24.05.2019

How can I just implement a crontab function using flink?

2019-05-24 Thread wangl...@geekplus.com.cn
I want to do something every minute. Using a TumblingWindow, the function will not be triggered if no message is received during that minute, but I still need to execute the function. How can I implement it? wangl...@geekplus.com.cn
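The crontab-like behavior asked for above — fire on a fixed schedule whether or not input arrives — can be sketched with the plain JDK scheduler. This is a stand-in outside Flink (class and field names are illustrative); the thread's replies point at the Flink-native equivalents, a dummy-event source or a periodic watermark:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Fires the task at a fixed rate even when no records arrive,
// which a TumblingWindow alone will not do.
class CrontabLike {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger ticks = new AtomicInteger();

    // periodMillis would be 60_000 for a once-a-minute "crontab" job.
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(ticks::incrementAndGet,
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```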