应该是有内置的 UDF FROM_UNIXTIME 可以用的
Best,
tison.
Leonard Xu 于2020年4月22日周三 下午1:15写道:
> Hi
> 报错是因为'format.ignore-parse-errors'
> 参数是在社区最新的版本才支持的,FLINK-16725在1.11应该也会修复,如果需要使用的话可以等1.11发布后使用或者自己编译master分支,
> 即使有了这个参数你的问题也无法解决,对你的case每行记录都会解析错误所以会过滤掉所有数据。
> 建议你可以在数据源就转为标准的json格式或者写个udf将long转为timestamp后
能具体看一下报错吗?一般来说 Flink 自己需要的依赖都会 shaded 起来,不需要的传递依赖都应该 exclude 掉。暴露成 API
的类别一般需要封装或者使用稳定的接口。
这可能是一个工程上的问题,你可以具体罗列一下遇到的 JAR 包冲突问题,看一下怎么解。
Best,
tison.
宇张 于2020年4月22日周三 上午11:52写道:
> 在使用Flink1.10时,遇到最多的问题就是jar包冲突问题,okio这个包flink-parent引用的就有四个版本,还有一些没办法<
> exclusions>的包,请问社区有没有优化jar包冲突的提议。
>
Hi 首维,
非常开心我的回答能对你有所帮助。
1, 我感觉你这个想法其实就是把state backend 设置为filesystem的表现。因为在state
backend是filesystem的时候,state是放到内存的,也就是更新state是不需要序列化和反序列化的,性能相对要高一些。如果使用rocksdb的话,state就是放到rocksdb里了。我理解这里之所以是这样设计,是因为某些state本身会比较大,内存不一定放得下,此时如果用rocksdb作为state
backend,就不会占用heap的内存。
2, 其实这里的销毁的意思就是clear acc对应的state,这个
看了下好像 Apache Beam 就是干这个事情的,学习啦
hsdcl...@163.com
发件人: lec ssmi
发送时间: 2020-04-22 13:37
收件人: flink-user-cn
主题: Re: Re: spark代码直接运行至Flink平台
那还不如直接用apache beam直接将这些框架的API全部统一起来。
hsdcl...@163.com 于2020年4月22日周三 上午11:27写道:
> 降低用户迁移学习成本,两个框架有类似的地方,不知道比喻是否恰当,Flink平台 是否可以类似虚拟机,上层的流批应用如java,
> spar
Hi benchao,
感谢你的回复,完美解答了我的疑惑。然后我在看代码做实验的时候又派生了两个想法/问题:
1. 将accu作为一个单独的field在GroupAggFunction中,然后在snapshotState的时候才去向state更新
2. 为accu设置一个0/初始/空/幺元或者类似概念的状态,这样我们就不必去销毁acc而是去只是将其置回
或者说时区在设计这个部分的时候,有什么其他的考量吗
发件人: Benchao Li
发送时间: 2020年4月21日 18:28:09
收件人
那还不如直接用apache beam直接将这些框架的API全部统一起来。
hsdcl...@163.com 于2020年4月22日周三 上午11:27写道:
> 降低用户迁移学习成本,两个框架有类似的地方,不知道比喻是否恰当,Flink平台 是否可以类似虚拟机,上层的流批应用如java,
> spark就像Scala一样
>
>
>
>
> hsdcl...@163.com
>
> 发件人: Jeff Zhang
> 发送时间: 2020-04-22 10:52
> 收件人: user-zh
> 主题: Re: spark代码直接运行至Flink平台
> 啥目的 ?
>
> hsdcl
Hi
报错是因为'format.ignore-parse-errors'
参数是在社区最新的版本才支持的,FLINK-16725在1.11应该也会修复,如果需要使用的话可以等1.11发布后使用或者自己编译master分支,
即使有了这个参数你的问题也无法解决,对你的case每行记录都会解析错误所以会过滤掉所有数据。
建议你可以在数据源就转为标准的json格式或者写个udf将long转为timestamp后使用。
祝好,
Leonard Xu
> 在 2020年4月22日,12:33,王双利 写道:
>
> 要不你们再做一个fastjson版本的?
> 目前内部解析用的都是fast
要不你们再做一个fastjson版本的?
目前内部解析用的都是fastjson
发件人: 王双利
发送时间: 2020-04-22 12:31
收件人: user-zh
主题: 回复: Re: json中date类型解析失败
配置后报错误 ,
'format.ignore-parse-errors' = 'true'
这个参数需要怎么配置呢?
The matching candidates:
org.apache.flink.formats.json.JsonRowFormatFactory
Unsupported property keys:
fo
配置后报错误 ,
'format.ignore-parse-errors' = 'true'
这个参数需要怎么配置呢?
The matching candidates:
org.apache.flink.formats.json.JsonRowFormatFactory
Unsupported property keys:
format.ignore-parse-errors
WITH (
..
'format.type' = 'json',
'format.ignore-parse-errors' = 'true',
)
发件人
Hi,
flink支持的json format是遵循RFC标准[1]的,不支持从long型转化为json timestamp, json的
tiemstamp类型转化可以简单参考下,这个虽然符合标准,单对用户习惯来说确实不友好,目前社区也有一个jira[2]在跟进这个问题了。关于鲁棒性的问题,json
format有个参数支持跳过解析错误的记录,'format.ignore-parse-errors' = 'true'
```
Long time = System.currentTimeMillis();
DateFormat dateFormat = new SimpleDate
使用 flink-json -1.10.0 解析json数据报下面的错误
Caused by: java.time.format.DateTimeParseException: Text '1587527019680' could
not be parsed at index 0
经检查 是 以下字段导致的
{"jnlno":"e4574cce-8c9f-4d3f-974f-fc15250ec10d","ip":"122.96.41.218","channel":"pc","transdate":1587527019680,"event":"login","userid":"9",
在使用Flink1.10时,遇到最多的问题就是jar包冲突问题,okio这个包flink-parent引用的就有四个版本,还有一些没办法<
exclusions>的包,请问社区有没有优化jar包冲突的提议。
降低用户迁移学习成本,两个框架有类似的地方,不知道比喻是否恰当,Flink平台 是否可以类似虚拟机,上层的流批应用如java, spark就像Scala一样
hsdcl...@163.com
发件人: Jeff Zhang
发送时间: 2020-04-22 10:52
收件人: user-zh
主题: Re: spark代码直接运行至Flink平台
啥目的 ?
hsdcl...@163.com 于2020年4月22日周三 上午9:49写道:
> Hi,
> 有个脑洞大开的想法,有没有可能出一个模块功能,可以将用户写的spark代码直接运行在Flink
啥目的 ?
hsdcl...@163.com 于2020年4月22日周三 上午9:49写道:
> Hi,
> 有个脑洞大开的想法,有没有可能出一个模块功能,可以将用户写的spark代码直接运行在Flink平台
--
Best Regards
Jeff Zhang
社区版的 Planner 针对 Key 状态的清理,使用的 Timer 来进行清理。
1.9.1 Blink planner 最底层状态清理 还是使用的 StateTTLConfig 来进行清理(不是
Background),所以存在部分状态后面没读,
状态没有清理的情况
Benchao Li 于2020年4月21日周二 下午11:15写道:
> 我对checkpoint不是很熟悉。 不过这个错误看起来是因为修改了TtlConfig导致serializer不兼容导致的,可能不是很好解决。
>
> 酷酷的浑蛋 于2020年4月21日周二 下午10:37写道:
>
> > hello,我
Hi,
有个脑洞大开的想法,有没有可能出一个模块功能,可以将用户写的spark代码直接运行在Flink平台
我这边用kafka的AppendStream没什么问题,
改的是支持Retract模式的,KafkaTableSinkBase继承的是RetractStreamTableSink
基本是按照下面的链接的地址改的
https://www.cnblogs.com/Springmoon-venn/p/12652845.html
王双利
发件人: Leonard Xu
发送时间: 2020-04-22 09:03
收件人: user-zh
主题: Re: flink sql string char 不兼容?
Hi, 王双利
我试了下1.10.0的版本,没能复现你的异常, 如
Hi, 王双利
我试了下1.10.0的版本,没能复现你的异常, 如Jingsong Lees所说的char(n)到varchar已经支持了,
你能完整的贴下loginevent 的 sql吗?我再看看
祝好
Leonard Xu
> 在 2020年4月21日,22:22,Jingsong Li 写道:
>
> Hi,
>
> - 首先1.10中把char Insert到varchar中是支持的,可以再check下哪里有没有问题吗?
> - 'false'应该是char(5)而不是char(4)
>
> Best,
> Jingsong Lee
>
> On Tue, Apr
可以使用不同的 group.id 消费
i'mpossible <605769...@qq.com> 于2020年4月21日周二 下午6:12写道:
> Hi:
> Flink支持Subscribe模式吗?用的connector版本是
> flink-connector-kafka-0.11_2.11,0.11x;
> 因为业务需要,我想要优雅下线掉TopicB,即不中断事件流;执行结果发现当Flink服务和A服务指定同一个group.id
> ,同时消费TopicA时,kafka偏移量提交失败(开启了检查点);
>
>
> 感谢解答!!!
>
中文用户邮件列表可以看:http://apache-flink.147419.n8.nabble.com/
英文开发邮件列表可以看:http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
英文用户邮件列表可以看:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
tison 于2020年4月21日周二 下午5:52写道:
> cc
>
>
> Leonard Xu 于2020年4月21日周二 下午5:03写道:
>
> >
我对checkpoint不是很熟悉。 不过这个错误看起来是因为修改了TtlConfig导致serializer不兼容导致的,可能不是很好解决。
酷酷的浑蛋 于2020年4月21日周二 下午10:37写道:
> hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.get
hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
java.lang.RuntimeException: Error while getting state
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
at
org.apache.flink.streaming.api.operators.StreamingRuntimeContex
hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
java.lang.RuntimeException: Error while getting state
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
at
org.apache.flink.streaming.api.operators.StreamingRuntimeContex
Hi,
- 首先1.10中把char Insert到varchar中是支持的,可以再check下哪里有没有问题吗?
- 'false'应该是char(5)而不是char(4)
Best,
Jingsong Lee
On Tue, Apr 21, 2020 at 9:01 PM Leonard Xu wrote:
> Hi
>
> CHAR(n) 、VARCHAR 在SQL语义里是不同的类型,SQL里写的 ‘false’ 常量会解析到CHAR(n)
> 因为常量长度已经确定会选择使用CHAR(n),
> 目前是Flink还不支持CHAR(n) 和 VARCHAR 类型之间的隐式转换,
Hi
CHAR(n) 、VARCHAR 在SQL语义里是不同的类型,SQL里写的 ‘false’ 常量会解析到CHAR(n)
因为常量长度已经确定会选择使用CHAR(n),
目前是Flink还不支持CHAR(n) 和 VARCHAR 类型之间的隐式转换,所以类型检查会报错,你可以先用CAST(‘false’ as
VARCHAR)后处理。
祝好
Leonard
> 在 2020年4月21日,18:32,王双利 写道:
>
> hit声明的是varchar,现在是,'false' 编译的时候认为是char(4) ,导致类型不匹配
>
>
>
> 王双利
>
> 发件人:
Hi 首维,
这里说的是同一个key会创建一次,所以你看到一个AggFunction会创建多次更这个并不冲突,因为一个AggFunction属于一个subtask维度的,一个subtask处理不止一个key的。
你的第二个问题:
> 我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group
by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来
这个的确是存在这个问题。比如两个group by嵌套使用,第一个group by由于会更新结果,所以会retract之前的结果
hit声明的是varchar,现在是,'false' 编译的时候认为是char(4) ,导致类型不匹配
王双利
发件人: Leonard Xu
发送时间: 2020-04-21 18:29
收件人: user-zh
主题: Re: flink sql string char 不兼容?
Hi
Sink 对应的字段(taskid\hit)需要声明成varchar, 现在不支持varchar类型写入char(n)
祝好,
Leonard Xu
> 在 2020年4月21日,18:20,王双利 写道:
>
> 下面的sql 执行的时候报 下面的错误CREATE TABLE
写成varchar应该就可以了。
王双利 于2020年4月21日周二 下午6:21写道:
> 下面的sql 执行的时候报 下面的错误CREATE TABLE target (
> jnlno VARCHAR,
> -- taskid char(9),
> -- hit char(4)
>taskid VARCHAR,
> hit VARCHAR
> )
> insert into target select a.jnlno,'11qeq','false' from loginevent a
>
> Exception in thre
Hi
Sink 对应的字段(taskid\hit)需要声明成varchar, 现在不支持varchar类型写入char(n)
祝好,
Leonard Xu
> 在 2020年4月21日,18:20,王双利 写道:
>
> 下面的sql 执行的时候报 下面的错误CREATE TABLE target (
>jnlno VARCHAR,
> -- taskid char(9),
> -- hit char(4)
> taskid VARCHAR,
>hit VARCHAR
> )
> insert into target select a.j
您说的jarFiles是以什么样的方式提交任务
然后我试了一下plugin,好像并不可以,重启flink cluster也不行 , 也不知是不是我的方式不对
我的目录结构是
xxx/flink/plugins/
folder1/
udf.jar
另外说一下,如果我把udf.jar放到
/flink/lib下,重启是可以的,不过这不是我想要的方式,不知道您是否理解,因为我想要的我随时可以写个udf.jar,随时可以用,不要重启flink
cluster
在 2020-04-21 17:46:00,"Arnold Zai" 写道:
>jarFiles参数不是个参数列表么,多传几个。
下面的sql 执行的时候报 下面的错误CREATE TABLE target (
jnlno VARCHAR,
-- taskid char(9),
-- hit char(4)
taskid VARCHAR,
hit VARCHAR
)
insert into target select a.jnlno,'11qeq','false' from loginevent a
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type
ST
Hi??
FlinkSubscribeconnector??flink-connector-kafka-0.11_2.11??0.11x??
??TopicBFlink??A??group.id??TopicAkafka
??
Hi benchao,
非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group
by的一个key应该被创建一次,可是我做实验的时候(在create
acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。
为了方便你帮我分析,我来补充一下环境和场景:
版本: 1.7.2/1.9
Hi
原因是因为新增字段或者修改字段类型后,新的serializer无法(反)序列化原先存储的数据,对于这种有字段增改需求的场景,目前Flink社区主要借助于Pojo或者avro来实现
[1],建议对相关的state schema做重新规划,以满足这种有后续升级需求的场景。
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
祝好
唐云
From: xyq
Sent: Tu
cc
Leonard Xu 于2020年4月21日周二 下午5:03写道:
> Hi,
> 订阅user-zh邮件邮件组即可收到该邮件组里的所有邮件,
> 可以发送任意内容的邮件到 user-zh-subscr...@flink.apache.org 订阅来自
> user-zh@flink.apache.org 邮件组的邮件
>
> 邮件组的订阅管理,可以参考[1]
>
> 祝好,
> Leonard Xu
> https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
>
> > 在 2
REST API jar run endpoint 不支持关联其他 jar 听起来是个问题。FatJar 是一种解决方案,这个可以提到 JIRA
上作为需求(x
Best,
tison.
Arnold Zai 于2020年4月21日周二 下午5:46写道:
> jarFiles参数不是个参数列表么,多传几个。
>
> 或把依赖提前部署到${FLINK_HOME}/plugins里
>
> chenxuying 于2020年4月21日周二 下午3:36写道:
>
> > 这个是可以 , 不过我们的需求不允许打FatJar
> >
> >
> >
> >
> >
> >
> >
>
jarFiles参数不是个参数列表么,多传几个。
或把依赖提前部署到${FLINK_HOME}/plugins里
chenxuying 于2020年4月21日周二 下午3:36写道:
> 这个是可以 , 不过我们的需求不允许打FatJar
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-21 15:27:48,"Arnold Zai" 写道:
> >打个FatJar
> >
> >chenxuying 于2020年4月21日周二 下午2:47写道:
> >
> >> 请问下目前flink的启动方式有哪些
> >> 1 通过命令行来执行
Hi 首维,
这是个很好的问题。
> 这个方法的调用时机是什么呢,会被调用几次呢?
这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
当然这里说的是regular groupby;
如果是window group by的话,就是每个window都会做上面的这个事情。
> 一个accumulator的生命周期是怎么样的?
如果是window group by的话,那它的生命周期就是跟window是一样的。
如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
Hi all,
最近有几个疑问没能很好地理解清楚:
我们都知道,UDAF中的有createAccumulator这个方法,那么:
这个方法的调用时机是什么呢,会被调用几次呢?
一个accumulator的生命周期是怎么样的?
一个accumulator会被反复的序列化反序列化吗?
麻烦了解相关细节的社区的同学们帮忙解答一下~
先谢谢啦
Hi,
订阅user-zh邮件邮件组即可收到该邮件组里的所有邮件,
可以发送任意内容的邮件到 user-zh-subscr...@flink.apache.org 订阅来自 user-zh@flink.apache.org
邮件组的邮件
邮件组的订阅管理,可以参考[1]
祝好,
Leonard Xu
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
> 在 2020年4月21日,16:55,一袭月色 <1906286...@qq.com> 写道:
>
> 如何看到他人问题
Hello,图挂了,可以搞个图床了挂链接到邮件列表。。。
另外问下为什么不从最新的cp开始恢复作业呢?这样我理解会有脏数据吧。
> 在 2020年4月19日,23:23,Yun Gao 写道:
>
> Hello~ 想再确认一下预期的行为:现在是希望后面重新写之后,用新写过的part-xx来覆盖之前生成的文件么~?
>
>
> --
> From:酷酷的浑蛋
> Send Time:2020 Apr. 18 (Sat.) 20:32
> To:
这个是可以 , 不过我们的需求不允许打FatJar
在 2020-04-21 15:27:48,"Arnold Zai" 写道:
>打个FatJar
>
>chenxuying 于2020年4月21日周二 下午2:47写道:
>
>> 请问下目前flink的启动方式有哪些
>> 1 通过命令行来执行
>> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
>> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SN
打个FatJar
chenxuying 于2020年4月21日周二 下午2:47写道:
> 请问下目前flink的启动方式有哪些
> 1 通过命令行来执行
> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
> 2通过自带的webui页面上传jar , submit jar
> 3 通过代码 createRemoteEnvironment
>
> 目前主要使用的是,
44 matches
Mail list logo