UpsertKafka状态保存问题
大家好: 我在flink1.12.1上,通过SQL API测试upsertKafka,使用hdfs保存checkpoint数据,每30分钟进行一次checkpoint。kafka消息key和value均使用json格式。 持续写入300w不同主键的数据,checkpoint大小持续增加,最终生成save point时,大小接近300M。 请问UpsertKafka模式下,state中是否会一直保存所有的key?未被访问的key是否会被清空呢?
UpsertKafka state持续增加问题
大家好: 我在flink1.12.1上,通过SQL API测试upsertKafka,使用hdfs保存checkpoint数据,每30分钟进行一次checkpoint。kafka消息key和value均使用json格式。 持续写入300w不同主键的数据,checkpoint大小持续增加,最终生成save point时,大小接近300M。 请问UpsertKafka模式下,state中是否会一直保存所有的key?未被访问的key是否会被清空呢?
Re: 实时数仓场景落地问题
实时数仓落地建议先动手做一两个场景真实应用起来,见过好几个项目一开始目标定得过大,实时数仓、流批一体、数据管控啥的都规划进去,结果项目陷入无尽的扯皮,架构设计也如空中楼阁。 实践过程中不要太过于向已有数仓分层模型靠拢,从源系统直接拼接宽表到dws层就足以应付大部分需求了。下游应用再用MPP来满足业务层的实时聚合、BI需求。 等立了几个烟囱,自己项目的实时数仓怎么做也基本有了思路
Re: Flink如何做到动态关联join多张维度表中的n张表?
维表更新后要刷新历史的事实表吗?这个用flink来做的话,几乎不太可能实现,尤其是涉及到多个维表,相当于每次维表又更新了,就要从整个历史数据里面找到关联的数据,重新写入。不管是状态存储,还是更新数据量,需要的资源都太高,无法处理。 在我们目前的实时宽表应用里面,实时表部分一般都是流水类的,取到的维表信息,就应该是业务事实发生时的数据。 维表更新后刷新事实的,一般都是夜间批量再更新。如果有强实时更新需求的,只能在查询时再关联维表取最新值 王旭 于2024年6月16日周日 21:20写道: > 互相交流哈,我们也在做类似的改造 > 1.不确定需要关联几张维表的话,是否可以直接都关联了,然后再根据驱动数据中的字段判断要取哪几张维度表的数据,类似left join > > 2.维表变化后对应的结果表也要刷新这个场景,你有提到维表数据是亿级别,可想而知事实表数据更大,如果要反向关联全量事实表的数据,感觉不太适合用流处理;如果只是刷新部分的话,倒是可以将n天内的数据暂存至外部存储介质中 > > > > 回复的原邮件 > | 发件人 | 斗鱼<1227581...@qq.com.INVALID> | > | 日期 | 2024年06月16日 21:08 | > | 收件人 | user-zh | > | 抄送至 | | > | 主题 | 回复:Flink如何做到动态关联join多张维度表中的n张表? | > > 大佬,目前我们还处在调研阶段,SQL或datastream都可以,目前我们DWD或维度表设计是存在ClickHouse/Doris,目前在设计未来的架构,还没实现,只是想向各位大佬取经,麻烦大佬帮忙指教下 > > > 斗鱼 > 1227581...@qq.com > > > > > > > > > -- 原始邮件 -- > 发件人: > "user-zh" > < > xwwan...@163.com>; > 发送时间: 2024年6月16日(星期天) 晚上9:03 > 收件人: "user-zh" > 主题: 回复:Flink如何做到动态关联join多张维度表中的n张表? > > > > 你好,请问你们是用flink sql api还是datastream api实现这个场景的 > > > > 回复的原邮件 > | 发件人 | 斗鱼<1227581...@qq.com.INVALID> | > | 日期 | 2024年06月16日 20:35 | > | 收件人 | user-zh | 抄送至 | | > | 主题 | Flink如何做到动态关联join多张维度表中的n张表? | > 请教下各位大佬,目前我们遇到一个场景: > 1、需要往DWD事实表里写入数据的同时,往Kafka里面写该DWD表的记录信息,该信息 > 2、该Kafka信息会包含一个维度表数据类型的字符串数组 > > 3、Flink在做实时消费Kafka中数据,根据类型数组,关联不同的维度表,如数组包含【1,2】,则Flink读取Kafka消息后,将DWD的数据关联维度表1和维度表2后,写入DWS表 > > > > 想请问大佬如何实现根据该数组信息动态关联维度表,这些维度表数据量都挺大的,亿级别的数据,需要能满足维度表变化后,关联后的DWS数据也能变化,不知道是否有什么技术方案能实现,有的话麻烦大佬帮忙给个简单示例或者参考链接,感谢! > > > > > > > > | > | > 斗鱼 > 1227581...@qq.com > | >
Re: Flink如何做到动态关联join多张维度表中的n张表?
lookup join可以关联多张维表,但是维表的更新不会触发历史数据刷新。 多维表关联的时候,需要考虑多次关联导致的延迟,以及查询tps对维表数据库的压力。 斗鱼 <1227581...@qq.com.invalid> 于2024年6月19日周三 23:12写道: > 好的,感谢大佬的回复,之前有了解到Flink的Lookup join好像可以实现类似逻辑,只是不知道Lookup join会不会支持多张动态维度表呢? > > > 斗鱼 > 1227581...@qq.com > > > > > > > > > -- 原始邮件 -- > 发件人: > "user-zh" > < > xhzhang...@gmail.com>; > 发送时间: 2024年6月19日(星期三) 下午5:55 > 收件人: "user-zh" > 主题: Re: Flink如何做到动态关联join多张维度表中的n张表? > > > > > 维表更新后要刷新历史的事实表吗?这个用flink来做的话,几乎不太可能实现,尤其是涉及到多个维表,相当于每次维表又更新了,就要从整个历史数据里面找到关联的数据,重新写入。不管是状态存储,还是更新数据量,需要的资源都太高,无法处理。 > 在我们目前的实时宽表应用里面,实时表部分一般都是流水类的,取到的维表信息,就应该是业务事实发生时的数据。 > 维表更新后刷新事实的,一般都是夜间批量再更新。如果有强实时更新需求的,只能在查询时再关联维表取最新值 > > 王旭 > > 互相交流哈,我们也在做类似的改造 > > 1.不确定需要关联几张维表的话,是否可以直接都关联了,然后再根据驱动数据中的字段判断要取哪几张维度表的数据,类似left join > > > > > 2.维表变化后对应的结果表也要刷新这个场景,你有提到维表数据是亿级别,可想而知事实表数据更大,如果要反向关联全量事实表的数据,感觉不太适合用流处理;如果只是刷新部分的话,倒是可以将n天内的数据暂存至外部存储介质中 > > > > > > > > 回复的原邮件 > > | 发件人 | 斗鱼<1227581...@qq.com.INVALID> | > > | 日期 | 2024年06月16日 21:08 | > > | 收件人 | user-zh > | 抄送至 | | > > | 主题 | 回复:Flink如何做到动态关联join多张维度表中的n张表? | > > > > > 大佬,目前我们还处在调研阶段,SQL或datastream都可以,目前我们DWD或维度表设计是存在ClickHouse/Doris,目前在设计未来的架构,还没实现,只是想向各位大佬取经,麻烦大佬帮忙指教下 > > > > > > 斗鱼 > > 1227581...@qq.com > > > > > > > > > > > > > > > > > > -- 原始邮件 -- > > 发件人: > > > "user-zh" > > > < > > xwwan...@163.com>; > > 发送时间: 2024年6月16日(星期天) 晚上9:03 > > 收件人: "user-zh" > > > 主题: 回复:Flink如何做到动态关联join多张维度表中的n张表? > > > > > > > > 你好,请问你们是用flink sql api还是datastream api实现这个场景的 > > > > > > > > 回复的原邮件 > > | 发件人 | 斗鱼<1227581...@qq.com.INVALID> | > > | 日期 | 2024年06月16日 20:35 | > > | 收件人 | user-zh > | 抄送至 | | > > | 主题 | Flink如何做到动态关联join多张维度表中的n张表? | > > 请教下各位大佬,目前我们遇到一个场景: > > 1、需要往DWD事实表里写入数据的同时,往Kafka里面写该DWD表的记录信息,该信息 > > 2、该Kafka信息会包含一个维度表数据类型的字符串数组 > > > > > 3、Flink在做实时消费Kafka中数据,根据类型数组,关联不同的维度表,如数组包含【1,2】,则Flink读取Kafka消息后,将DWD的数据关联维度表1和维度表2后,写入DWS表 > > > > > > > > > 想请问大佬如何实现根据该数组信息动态关联维度表,这些维度表数据量都挺大的,亿级别的数据,需要能满足维度表变化后,关联后的DWS数据也能变化,不知道是否有什么技术方案能实现,有的话麻烦大佬帮忙给个简单示例或者参考链接,感谢! > > > > > > > > > > > > > > > > | > > | > > 斗鱼 > > 1227581...@qq.com > > | > >
Re: 使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列
flink在写入时需要所有DDL中定义的字段都必须被同时写入,不支持sql中只使用部分字段。 如果你确定只需写入部分数据,在DDL中只定义你用到的部分 zboyu0104 于2024年6月14日周五 15:43写道: > 怎么退订 > from 阿里邮箱 > iPhone-- > 发件人:谢县东 > 日 期:2024年06月06日 16:07:05 > 收件人: > 主 题:使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列 > > 各位好: > > > flink版本: 1.13.6 > 我在使用 flink-connector-hbase 连接器,通过flinkSQL 将数据写入hbase,hbase 建表如下: > > > CREATE TABLE hbase_test_db_test_table_xxd ( > rowkey STRING, > cf1 ROW, > PRIMARY KEY (rowkey) NOT ENFORCED > ) WITH ( > 'connector' = 'hbase-2.2', > 'table-name' = 'test_db:test_table_t1', > 'zookeeper.quorum' = 'xxx:2181', > 'zookeeper.znode.parent' = '/hbase', > 'null-string-literal' = '', > 'sink.parallelism' = '2' > ); > > > hbase cf1列族下有三列,看官网示例插入数据时需要构建一个row类型插入(row类型需包含列族下的所有列) > INSERT INTO hbase_test_db_test_table_xxd select '002' as rowkey, > row('xxd_2', 'boy', '10') as cf1; > > > > > 如果只想更新其中某一列如何实现?在flink中新建一个hbase表吗? > > > > > > > > >
Re: flink on yarn 模式,在jar任务中,怎么获取rest port
通过yarn提交时,提交成功后,yarn client会返回 application master的地址和端口,从返回信息里面获取就可以 wjw_bigdata 于2024年8月1日周四 14:24写道: > 退订 > > > > > > > 回复的原邮件 > | 发件人 | Lei Wang | > | 发送日期 | 2024年8月1日 14:08 | > | 收件人 | | > | 主题 | Re: flink on yarn 模式,在jar任务中,怎么获取rest port | > 在 flink-conf.yaml 中可以指定 rest.port, 可指定一个范围 > > > On Wed, Jul 31, 2024 at 8:44 PM melin li wrote: > > flink on yarn 模式, rest port 是随机的,需要获取rest port,有什么好办法? > >
Re: Flink SQL 中如何将回撤流转为append流
修改下游的sink connector,在execute的时候把-D、-U的record去掉
hbase 列设置TTL过期后,flink不能再写入数据
Flink:1.12.1 Flink-connector: 2.2 Hbase: 2.1.0 + CDH6.3.2 现象:如果hbase列族设置了TTL,当某一rowkey写入数据,到达过期时间,列族会被hbase标记为删除。 后续如果有相同key的数据过来,flink无法将数据写入到hbase中,查询hbase中列族一直为空。 执行的过程大致如下: 创建Hbase表,test, 两个列族 cf1 , TTL 60, cf2, TTL 120, 数据TTL分别为1分钟,2分钟。 使用sql写入数据至表中 insert into test select 'rowkey', ROW('123'), ROW('456') from sometable; 过一分钟后,通过hbase 查询,可发现无cf1数据,两分钟后该rowkey无对应数据。 此时再通过flink写入数据,发现无法写入,且flink不报错 请问这个情况是Bug,还是Hbase的问题呢?
HOP窗口较短导致checkpoint失败
FLink:1.12.1 源: kafka create table dev_log ( devid, ip, op_ts ) with ( connector = kafka ) sink: Hbase connect 2.2 目前用flink sql的hop window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。 执行SQL如下 insert into h_table select devid as rowkey row(hop_end, ip_cnt) from ( select devid, hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end, count(distinct(ip)) as ip_cnt from dev_logs group by hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR), devid ) 测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。 实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。 修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。 请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢
Re: HOP窗口较短导致checkpoint失败
checkpoint的状态大约只有50M左右就会开始出现cp失败的问题。如果失败了,尝试停止任务生成savepoint基本也不能成功。但同时运行的其他任务,cp在300M左右, save point 1G左右的就很顺利,基本不会出问题。 因为实际的数据压力并不是很大,如果单纯增加并行度,是否能在窗口多的情况下有比较明显的改善呢? Caizhi Weng 于2021年9月22日周三 上午11:27写道: > Hi! > > 24 小时且步长 1 分钟的 window 会由于数据不断累积而导致 cp 越来越大,越来越慢,最终超时。当然如果运算太慢导致 cp 被 back > pressure 也有可能导致 cp 超时。开启 mini batch 可以加快 window 的运算速度,但这么长时间而且这么频繁的 window > 目前确实没有什么很好的优化方法,仍然建议扩大并发以分担计算以及 cp 的压力。 > > xiaohui zhang 于2021年9月18日周六 上午9:54写道: > > > FLink:1.12.1 > > > > 源: kafka > > create table dev_log ( > > devid, > > ip, > > op_ts > > ) with ( > > connector = kafka > > ) > > > > sink: Hbase connect 2.2 > > > > 目前用flink sql的hop > > window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。 > > 执行SQL如下 > > insert into h_table > > select > > devid as rowkey > > row(hop_end, ip_cnt) > > from ( > > select > > devid, > > hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end, > > count(distinct(ip)) as ip_cnt > > from > > dev_logs > > group by > >hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR), > > devid > > ) > > > > 测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。 > > 实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。 > > 修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。 > > 请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢 > > >
flink sql-client kerberos认证不通过
* Fink版本: 1.16 * 部署:docker集群, 2个jm,3个tm高可用 * HADOOP: CDP7集群,启用kerberos * Flink配置:在conf/fink-conf.yml中添加security相关参数 security.kerberos.login.use-ticket-cache: true security.kerberos.login.keytab: /opt/flink/flink.keytab security.kerberos.login.principal: *fl...@hadoop.com * * 现象: flink集群可以正常连接cdp集群的zookeeper、hdfs等服务,实现高可用,写入checkpoint信息。使用sql-client连接hive,创建catalog时,报错GSS认证失败 sql-client日志报错信息: 2023-01-29 16:24:16,340 DEBUG org.apache.hadoop.ipc.Client [] - closing ipc connection to cdlab01/172.16.16.150:8020: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:755) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_352] at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_352] at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:718) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:811) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.ipc.Client.call(Client.java:1381) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.ipc.Client.call(Client.java:1345) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at com.sun.proxy.$Proxy39.getFileInfo(Unknown Source) [?:?] at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:796) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_352] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_352] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_352] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_352] at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at com.sun.proxy.$Proxy40.getFileInfo(Unknown Source) [?:?] at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1717) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1437) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1434) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1434) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1437) [flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0] at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.exists(HadoopFileSystem.java:165) [flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client-1.16.0.jar:1.16.0] at org.apache.flink.table.client.SqlClient.s
Re: flink1.17.1版本 flink sql多表关联优化
这种join写法会随时更新里面每一个字段,最终产出结果的业务含义是什么呢? 如果是取每个vehicle_code对应的最新统计指标值,是否可以用支持partial update的存储,用多个单独的sql直接写入目前就可以了 周先明 于2023年8月4日周五 11:01写道: > Regular Join 默认把数据都存储在State中,通常会结合TTL来进行优化 > > guanyq 于2023年8月3日周四 15:59写道: > > > 请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutionMode.STREAMING 模式 > > > > select > > date_format(a.create_time, '-MM-dd HH:mm:ss') as create_time, > > b.vehicle_code, > > a.item_name, > > a.item_value, > > c.item_value as vehicle_score, > > d.current_fault, > > e.history_fault, > > f.late_mileage, > > g.fault_level_event_count, > > h.current_fault_subsystem, > > i.history_fault_subsystem > > from fault_record_subsystem a > > join mtr_vehicle_use b on a.vehicle_id = b.vehicle_id > > join fault_record_vehicle c on a.vehicle_id = c.vehicle_id > > join fault_record_current_count d on a.vehicle_id = d.vehicle_id > > join fault_record_history_count e on a.vehicle_id = e.vehicle_id > > join vehicle_usage_score f on a.vehicle_id = f.vehicle_id > > join fault_record_level_event_count g on a.vehicle_id = g.vehicle_id > > join fault_record_current_count_subsystem h on a.vehicle_id = > h.vehicle_id > > and a.item_name = h.item_name > > join fault_record_history_count_subsystem i on a.vehicle_id = > i.vehicle_id > > and a.item_name = i.item_name >