There is no start-cluster.bat or flink.bat in the bin directory.
So how can I start Flink on Windows?
Thanks,
Lei
wangl...@geekplus.com.cn
It seems there is no direct solution.
Perhaps I can implement this by initializing a HashMap with all
the possible values of tableName in the `open` method and getting the
corresponding Meter by tableName in the `invoke` method.
Thanks,
Lei
wangl...@geekplus.com.cn
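A minimal sketch of the HashMap idea from the message above, written in plain Java so it runs without Flink on the classpath. The class name `PerTableMetrics` and the `factory` parameter are hypothetical; `M` stands in for whatever metric type the job registers. Unlike pre-filling the map in `open`, this variant creates each entry lazily on first use, so the table names do not have to be known in advance, and later calls reuse the same instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink): one metric per table name,
 * created on first use and reused afterwards.
 */
final class PerTableMetrics<M> {
    private final Map<String, M> byTable = new ConcurrentHashMap<>();
    private final Function<String, M> factory;

    /** factory is called once per new table name to create (register) its metric. */
    PerTableMetrics(Function<String, M> factory) {
        this.factory = factory;
    }

    /** Returns the metric for tableName, creating it only if it is not present yet. */
    M forTable(String tableName) {
        return byTable.computeIfAbsent(tableName, factory);
    }

    /** Number of distinct table names seen so far. */
    int size() {
        return byTable.size();
    }
}
```

In a Flink sink, the instance would be created in `open` and `forTable(tableName)` called from `invoke`. The factory might wrap a call such as `getRuntimeContext().getMetricGroup().addGroup("tableName", name).meter("qps", new MeterView(60))`; that wiring is an assumption here and is not taken from the thread.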
Sender:
inlog info. I want to monitor the qps by tableName. The
tableName is different for every record.
Thanks,
Lei
wangl...@geekplus.com.cn
Sender: Xintong Song
Send Time: 2020-07-03 13:14
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: How to dynamically initialize flink metrics in i
.
How can I reuse a metric that was initialized before?
Thanks,
Lei
wangl...@geekplus.com.cn
_order
There's error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER]
can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner,
please file an issue.
Current node is TableSourceScan(table
Thanks Jingsong,
Is there any documentation or example for this?
I will build the flink-1.11 package and have a try.
Thanks,
Lei
wangl...@geekplus.com.cn
From: Jingsong Li
Date: 2020-06-30 10:08
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Flip-105 can the debezium/canal SQL sink to
deleted in the mysql_sink_table?
INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;
Thanks,
Lei
wangl...@geekplus.com.cn
It was caused by a jar conflict, and I have fixed it.
I had put flink-connector-kafka_2.11-1.10.0.jar in the flink lib directory.
My project pom file also has the flink-connector-kafka dependency and is
built as a fat jar.
Thanks,
Lei
wangl...@geekplus.com.cn
From: Leonard Xu
Date: 2020-05
ma.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
Any suggestion on this?
Thanks,
Lei
wangl...@geekplus.com.cn
as
hdfs://active-namenode-ip:8020. But this way NameNode HA is lost.
Is there any way to configure it as hdfs://name-service:8020?
Thanks,
Lei
wangl...@geekplus.com.cn
first start-cluster (only the job manager is started),
and then "taskmanager.sh start" to start the taskMgr. No password is needed.
Thanks,
Lei
wangl...@geekplus.com.cn
Sender: Xintong Song
Send Time: 2020-04-30 12:13
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: 1.11 snapshot: Name or
t_3.6.
: Name or service not knownname localhost
Only the jobMgr is started. The taskMgr does not start.
Any insight on this?
Thanks
Lei
wangl...@geekplus.com.cn
/java/org/apache/flink/api/java/io/jdbc
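there is no Retract mode.
Is the code in JDBCUpsertTableSink.java actually used to write to MySQL?
If there is no GROUP BY and I run directly:
INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src
a primary-key conflict makes the write to MySQL fail. How can I use upsert mode to overwrite the existing row directly?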
wangl...@geekplus.com.cn
Thanks, I have tried.
'format.derive-schema' = 'true' will work.
But if I insist on using format.json-schema, the CREATE TABLE must be written
as:
`id` DECIMAL(38,18),
`timestamp` DECIMAL(38,18)
wangl...@geekplus.com.cn
From: Benchao Li
Date: 2020-0
nteger"},
"timestamp": {"type": "number"}
}
}'
);
Then select * from user_log;
org.apache.flink.table.api.ValidationException: Type INT of table field 'id'
does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id'
field of the TableSource return type.
It seems the specified types "integer" and "number" cannot be mapped to INT and BIGINT.
How can I solve this problem?
Thanks,
Lei
wangl...@geekplus.com.cn
https://issues.apache.org/jira/browse/FLINK-17086
This is the first time I have created a Flink JIRA issue.
Please point out and correct anything I have written wrong.
Thanks,
Lei
wangl...@geekplus.com.cn
From: Jingsong Li
Date: 2020-04-10 11:03
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui
,
`linearcmdspeed` int, `angluarcmdspeed` int, `liftercmdspeed` int,
`rotatorcmdspeed` int) PARTITIONED BY (`hour` string) STORED AS parquet;
Thanks,
Lei
wangl...@geekplus.com.cn
From: Jingsong Li
Date: 2020-04-09 21:45
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: fink
`robotid` int, `robottime` bigint ) and
CREATE TABLE `robotparquet`( `robottime` bigint, `robotid` int)
are the same for the hive client, but different for the flink-sql client.
Is this expected behavior?
Thanks,
Lei
wangl...@geekplus.com.cn
From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@ge
0
Any insight on this?
Thanks,
Lei
wangl...@geekplus.com.cn
Thanks Jingsong.
So JDBCTableSink now supports append and upsert mode. Retract mode is not
available yet. Is that right?
Thanks,
Lei
wangl...@geekplus.com.cn
Sender: Jingsong Li
Send Time: 2020-03-25 11:39
Receiver: wangl...@geekplus.com.cn
cc: user
Subject: Re: Where can i find MySQL
Seems it is here:
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
There's no JDBCRetractTableSink, only append and upsert.
I am confused why the MySQL record can be deleted.
Thanks,
Lei
wangl...@geekplus.c
Thanks Jingsong.
When this SQL is executed, the MySQL table record can be deleted. So I guess it
is a retract stream.
I want to find the exact Java code that is generated and have a look at it.
Thanks,
Lei
wangl...@geekplus.com.cn
Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver
)
GROUP BY status
I think this is a retract stream. But
where can I find the Java source code for the MySQL retract table sink?
Thanks,
Lei
wangl...@geekplus.com.cn
hanks,
Lei
wangl...@geekplus.com.cn
Seems the root cause is "transactional"='true'.
After removing this, the table can be queried from the flink sql-client, even if I add
"clustered by (robot_id) into 3 buckets" again.
Thanks,
Lei
wangl...@geekplus.com.cn
From: Kurt Young
Date: 2020-03-18 18:04
To: wang
Tried again. Even if I remove the "clustered by (robot_id) into 3 buckets"
statement, there is no result from the flink sql-client.
Thanks,
Lei
wangl...@geekplus.com.cn
From: Kurt Young
Date: 2020-03-18 17:41
To: wangl...@geekplus.com.cn; lirui
CC: user
Subject: Re: flink sql-client read hive or
;
Under the hive client, after inserting one record, a select prints the
result to the console.
But under the flink sql-client, select * from robot_tr returns no result.
Any insight on this?
Thanks,
Lei
wangl...@geekplus.com.cn
Thanks, works now.
It seems the open source version is different from Alibaba Cloud:
https://www.alibabacloud.com/help/doc-detail/62506.htm
wangl...@geekplus.com.cn
From: Zhenghua Gao
Date: 2020-03-13 12:12
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: dimention table join not work
or:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Temporal table join currently only
supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support
'PROCTIME()'
Thanks,
Lei
wangl...@geekplus.com.cn
;,"rocksdb");eEnv.sqlUpdate(..)
Thanks,
Lei
wangl...@geekplus.com.cn
From: Jingsong Li
Date: 2020-03-12 11:32
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How to set stateBackEnd in flink sql program?
Hi wanglei,
If you are using Flink 1.10, you can set "
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(args[0], true));
wangl...@geekplus.com.cn
Thanks Jark,
No words can express my embarrassment ('囧').
wangl...@geekplus.com.cn
Sender: Jark Wu
Send Time: 2020-03-11 18:32
Receiver: wangl...@geekplus.com.cn
cc: user; user-zh
Subject: Re: How should a timestamp be written in JSON so that flink sql recognizes it
Hi Lei,
The "2020-03-11T13:00:00.123Z" format
3Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized in the sql-client and is null, even when I
tried 1583828700240, "2020-03-11 13:00:00", and "2020-03-11 13:00:00.000".
How should the timestamp(3) look like in the json mes
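The reply in this thread points at the "2020-03-11T13:00:00.123Z" form. As a rough sketch, a producer that currently holds epoch milliseconds could render them in that shape as follows. The class and method names are hypothetical, and the conversion assumes the value should be expressed in UTC; whether that matches the intended local time is not stated in the thread.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: epoch milliseconds -> "yyyy-MM-ddTHH:mm:ss.SSSZ" text in UTC. */
final class JsonTimestamps {
    // Always emits exactly three fractional digits and a literal 'Z'.
    private static final DateTimeFormatter UTC_MILLIS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
                    .withZone(ZoneOffset.UTC);

    static String fromEpochMillis(long epochMillis) {
        return UTC_MILLIS.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

For the value tried in the question, `JsonTimestamps.fromEpochMillis(1583828700240L)` yields `2020-03-10T08:25:00.240Z`.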
ULL.
I changed the input_date I send to 1583828700240, "2020-03-11 13:00:00", and
"2020-03-11 13:00:00.000", but none of them work.
What should this TIMESTAMP(3) look like in JSON?
Thanks,
Wang Lei
wangl...@geekplus.com.cn
flink-sql-connector-kafka_2.11-1.9.1.jar in flink-1.10 environment.
Thanks,
Lei
wangl...@geekplus.com.cn
From: wangl...@geekplus.com.cn
Date: 2020-03-11 14:51
To: Jark Wu
CC: Arvid Heise; user
Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Jark,
I have tried to use CREATE
can select the result correctly
Thanks,
Lei
From: Jark Wu
Date: 2020-03-11 11:13
To: wangl...@geekplus.com.cn
CC: Arvid Heise; user
Subject: Re: Re: Dose flink-1.10 sql-client support kafka sink?
Hi Lei,
CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9.
And the ya
Thanks, works now.
It seems this was because I had added
schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING,
status INT)"
under the format label.
From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Dose flink-1.10 sql-clie
appendable. It confused me.
Thanks,
Lei
From: Jark Wu
Date: 2020-03-09 19:25
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Kafka sink only support append mode?
Hi Lei,
Yes. Currently, Kafka sink only supports append mode. Other update modes (e.g.
upsert mode / retract mode) are on the agenda.
For
I have configured source table successfully using the following configuration:
- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
I wrote a simple program that reads from Kafka using SQL and sinks to Kafka.
But only 'update-mode' = 'append' is supported for the sink table, and the query
SQL must not contain a GROUP BY clause.
Is only append mode supported for the Kafka sink?
Thanks,
Lei
_id" : 131936,
"columns" : [ {
"id" : 1,
"name" : "order_id",
"column_type" : -5,
"last_value" : 4606458,
"value" : 4606458
}, {
"id" : 2,
"name" : "order_no",
"column_type" : 12,
"last_value" : "EDBMFSJ1S2003050006628",
"value" : "EDBMFSJ1S2003050006628"
}]
}
Surely 'format.type' = 'json' will not parse the result as I expect.
Is there any way to implement this, for example by using a self-defined
format class?
Thanks,
Lei
wangl...@geekplus.com.cn
The flink cluster version is 1.8.2
The application source code needs some features only supported in 1.9.1. So it
is compiled with the flink-1.9.1 dependency and built into a fat jar with all the
flink dependencies.
What will happen if I submit the jar built against the higher version to the lower-version
flink
After downgrading the pushgateway to pushgateway-0.8.0.linux-amd64.tar.gz, the
exception no longer occurs.
Thanks very much.
wangl...@geekplus.com.cn
From: wangl...@geekplus.com.cn
Date: 2019-11-20 18:19
To: Chesnay Schepler; user
Subject: Re: Re: IOException when using Prometheus Monitor
Hi
entOS node, tar -xzvf and then start the
process.
Thanks,
Lei
wangl...@geekplus.com.cn
From: Chesnay Schepler
Date: 2019-11-20 17:46
To: wangl...@geekplus.com.cn; user
Subject: Re: IOException when using Prometheus Monitor
From what I found so far this appears to be an incompatibility betwee
r.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Any insight on this?
Thanks,
Lei
wangl...@geekplus.com.cn
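consumes data from RocketMQ and processes it.
The maximum parallelism in the code is 8. When the job is submitted with -ys 4, 2 containers are allocated automatically as taskMgr.
After running for a while, the job is stopped with a savepoint.
It is then restored from the savepoint, this time with -ys 2, so 4 containers are allocated as taskMgr. But after submission the program no longer
consumes data from RocketMQ; the consumption TPS stays at 0. What is the reason?
Thanks,
Wang Lei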
wangl...@geekplus.com.cn
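The container counts in the message above (2 with -ys 4, 4 with -ys 2) are consistent with a ceiling division of the maximum parallelism by the slots per task manager. A small sketch of that arithmetic, assuming each parallel subtask needs one slot; the class and method names are hypothetical, and this is not a statement of how the YARN client computes it internally:

```java
/** Hypothetical helper illustrating the slot arithmetic from the message. */
final class YarnContainers {
    /** Ceiling of parallelism / slotsPerTaskManager, using integer arithmetic. */
    static int taskManagers(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }
}
```

With a parallelism of 8 this gives 2 task managers for 4 slots each and 4 task managers for 2 slots each, matching the numbers reported.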
In my code, I just call setStateBackend with an HDFS directory.
env.setStateBackend(new RocksDBStateBackend("hdfs://user/test/job"));
Is there an embedded RocksDB service in the flink task?
wangl...@geekplus.com.cn
I start and cancel it only in my IntelliJ IDEA development environment.
First click the run button, then click the red stop button, and then click the
run button again.
Let me google about the savepoint.
Thanks,
Lei Wang
wangl...@geekplus.com.cn
From: Stephan Ewen
Date: 2019-06-25
ateDescriptor>("avg", TypeInformation.of(
new TypeHint>() {}));
state = getRuntimeContext().getState(descriptor);
}
}
Every time I restart the job, the stateValue is still null.
wangl...@geekplus.com.cn
windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new
MyProcessWindowFunction());
How can I trigger the MyProcessWindowFunction even if
there is no input during the window?
wangl...@geekplus.com.cn
rmark(System.currentTimeMillis() - 1);
}
@Override
public long extractTimestamp(Map map, long l) {
return System.currentTimeMillis();
}
}).windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new
MyProcessWindowFunction());
wangl...@geekplus.com.cn
From: Puneet
Thanks. Let me have a try
wangl...@geekplus.com.cn
From: Yang Wang
Date: 2019-05-28 09:47
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How can I add config file as classpath in taskmgr node when
submitting a flink job?
Hi, wanglei
You could use the flink distributed cache to register
When starting a single-node Java application, I can add some config files to its classpath.
How can I implement this when submitting a flink job? The config file needs to be
read on the taskMgr node and used to initialize some classes.
wangl...@geekplus.com.cn
Thanks. got it
wangl...@geekplus.com.cn
From: Puneet Kinra
Date: 2019-05-24 17:02
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How can i just implement a crontab function using flink?
There is a concept of a periodic watermarker; you can use that
if you are working with event time.
On Fri
Thanks, it's an alternative solution.
wangl...@geekplus.com.cn
From: Jörn Franke
Date: 2019-05-24 16:31
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How can i just implement a crontab function using flink?
Just send a dummy event from the source system every minute
On 24.05.2019
I want to do something every minute.
With a TumblingWindow, the function will not be triggered if no message
is received during that minute. But I still need to execute the function.
How can I implement it?
wangl...@geekplus.com.cn
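One way to read the "dummy event" suggestion from the reply is a timer that fires on a fixed schedule whether or not any data arrives. The sketch below uses only the JDK's `ScheduledExecutorService`; the class `MinuteTicker` and its methods are hypothetical names, and wiring the tick into a Flink source or function is left out because the thread does not show it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs a task at fixed, wall-clock-aligned intervals. */
final class MinuteTicker implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Milliseconds from nowMillis until the next multiple of periodMillis. */
    static long delayToNextBoundary(long nowMillis, long periodMillis) {
        return periodMillis - (nowMillis % periodMillis);
    }

    /** Schedules task on every period boundary, independent of any input. */
    void start(Runnable task, long periodMillis) {
        long initialDelay = delayToNextBoundary(System.currentTimeMillis(), periodMillis);
        scheduler.scheduleAtFixedRate(task, initialDelay, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the scheduler and cancels pending ticks. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Calling `start(task, 60_000L)` runs `task` at each full minute. The task could, for example, emit the dummy event so that downstream windows still fire.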