UpsertKafka状态保存问题

2021-02-24 文章 xiaohui zhang
大家好:
我在flink1.12.1上,通过SQL 
API测试upsertKafka,使用hdfs保存checkpoint数据,每30分钟进行一次checkpoint。kafka消息key和value均使用json格式。
持续写入300w不同主键的数据,checkpoint大小持续增加,最终生成save point时,大小接近300M。
请问UpsertKafka模式下,state中是否会一直保存所有的key?未被访问的key是否会被清空呢?

UpsertKafka state持续增加问题

2021-02-24 文章 xiaohui zhang
大家好:
我在flink1.12.1上,通过SQL
API测试upsertKafka,使用hdfs保存checkpoint数据,每30分钟进行一次checkpoint。kafka消息key和value均使用json格式。
持续写入300w不同主键的数据,checkpoint大小持续增加,最终生成save point时,大小接近300M。
请问UpsertKafka模式下,state中是否会一直保存所有的key?未被访问的key是否会被清空呢?


Re: 实时数仓场景落地问题

2024-01-23 文章 xiaohui zhang
实时数仓落地建议先动手做一两个场景真实应用起来,见过好几个项目一开始目标定得过大,实时数仓、流批一体、数据管控啥的都规划进去,结果项目陷入无尽的扯皮,架构设计也如空中楼阁。
实践过程中不要太过于向已有数仓分层模型靠拢,从源系统直接拼接宽表到dws层就足以应付大部分需求了。下游应用再用MPP来满足业务层的实时聚合、BI需求。
等立了几个烟囱,自己项目的实时数仓怎么做也基本有了思路


Re: Flink如何做到动态关联join多张维度表中的n张表?

2024-06-19 文章 xiaohui zhang
维表更新后要刷新历史的事实表吗?这个用flink来做的话,几乎不太可能实现,尤其是涉及到多个维表,相当于每次维表又更新了,就要从整个历史数据里面找到关联的数据,重新写入。不管是状态存储,还是更新数据量,需要的资源都太高,无法处理。
在我们目前的实时宽表应用里面,实时表部分一般都是流水类的,取到的维表信息,就应该是业务事实发生时的数据。
维表更新后刷新事实的,一般都是夜间批量再更新。如果有强实时更新需求的,只能在查询时再关联维表取最新值

王旭  于2024年6月16日周日 21:20写道:

> 互相交流哈,我们也在做类似的改造
> 1.不确定需要关联几张维表的话,是否可以直接都关联了,然后再根据驱动数据中的字段判断要取哪几张维度表的数据,类似left join
>
> 2.维表变化后对应的结果表也要刷新这个场景,你有提到维表数据是亿级别,可想而知事实表数据更大,如果要反向关联全量事实表的数据,感觉不太适合用流处理;如果只是刷新部分的话,倒是可以将n天内的数据暂存至外部存储介质中
>
>
>
>  回复的原邮件 
> | 发件人 | 斗鱼<1227581...@qq.com.INVALID> |
> | 日期 | 2024年06月16日 21:08 |
> | 收件人 | user-zh |
> | 抄送至 | |
> | 主题 | 回复:Flink如何做到动态关联join多张维度表中的n张表? |
>
> 大佬,目前我们还处在调研阶段,SQL或datastream都可以,目前我们DWD或维度表设计是存在ClickHouse/Doris,目前在设计未来的架构,还没实现,只是想向各位大佬取经,麻烦大佬帮忙指教下
>
>
> 斗鱼
> 1227581...@qq.com
>
>
>
>  
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> xwwan...@163.com>;
> 发送时间: 2024年6月16日(星期天) 晚上9:03
> 收件人: "user-zh"
> 主题: 回复:Flink如何做到动态关联join多张维度表中的n张表?
>
>
>
> 你好,请问你们是用flink sql api还是datastream api实现这个场景的
>
>
>
>  回复的原邮件 
> | 发件人 | 斗鱼<1227581...@qq.com.INVALID> |
> | 日期 | 2024年06月16日 20:35 |
> | 收件人 | user-zh | 抄送至 | |
> | 主题 | Flink如何做到动态关联join多张维度表中的n张表? |
> 请教下各位大佬,目前我们遇到一个场景:
> 1、需要往DWD事实表里写入数据的同时,往Kafka里面写该DWD表的记录信息,该信息
> 2、该Kafka信息会包含一个维度表数据类型的字符串数组
>
> 3、Flink在做实时消费Kafka中数据,根据类型数组,关联不同的维度表,如数组包含【1,2】,则Flink读取Kafka消息后,将DWD的数据关联维度表1和维度表2后,写入DWS表
>
>
>
> 想请问大佬如何实现根据该数组信息动态关联维度表,这些维度表数据量都挺大的,亿级别的数据,需要能满足维度表变化后,关联后的DWS数据也能变化,不知道是否有什么技术方案能实现,有的话麻烦大佬帮忙给个简单示例或者参考链接,感谢!
>
>
>
>
>
>
>
> |
> |
> 斗鱼
> 1227581...@qq.com
> |
>  


Re: Flink如何做到动态关联join多张维度表中的n张表?

2024-06-19 文章 xiaohui zhang
lookup join可以关联多张维表,但是维表的更新不会触发历史数据刷新。
多维表关联的时候,需要考虑多次关联导致的延迟,以及查询tps对维表数据库的压力。

斗鱼 <1227581...@qq.com.invalid> 于2024年6月19日周三 23:12写道:

> 好的,感谢大佬的回复,之前有了解到Flink的Lookup join好像可以实现类似逻辑,只是不知道Lookup join会不会支持多张动态维度表呢?
>
>
> 斗鱼
> 1227581...@qq.com
>
>
>
>  
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> xhzhang...@gmail.com>;
> 发送时间: 2024年6月19日(星期三) 下午5:55
> 收件人: "user-zh"
> 主题: Re: Flink如何做到动态关联join多张维度表中的n张表?
>
>
>
>
> 维表更新后要刷新历史的事实表吗?这个用flink来做的话,几乎不太可能实现,尤其是涉及到多个维表,相当于每次维表又更新了,就要从整个历史数据里面找到关联的数据,重新写入。不管是状态存储,还是更新数据量,需要的资源都太高,无法处理。
> 在我们目前的实时宽表应用里面,实时表部分一般都是流水类的,取到的维表信息,就应该是业务事实发生时的数据。
> 维表更新后刷新事实的,一般都是夜间批量再更新。如果有强实时更新需求的,只能在查询时再关联维表取最新值
>
> 王旭 
> > 互相交流哈,我们也在做类似的改造
> > 1.不确定需要关联几张维表的话,是否可以直接都关联了,然后再根据驱动数据中的字段判断要取哪几张维度表的数据,类似left join
> >
> >
> 2.维表变化后对应的结果表也要刷新这个场景,你有提到维表数据是亿级别,可想而知事实表数据更大,如果要反向关联全量事实表的数据,感觉不太适合用流处理;如果只是刷新部分的话,倒是可以将n天内的数据暂存至外部存储介质中
> >
> >
> >
> >  回复的原邮件 
> > | 发件人 | 斗鱼<1227581...@qq.com.INVALID> |
> > | 日期 | 2024年06月16日 21:08 |
> > | 收件人 | user-zh > | 抄送至 | |
> > | 主题 | 回复:Flink如何做到动态关联join多张维度表中的n张表? |
> >
> >
> 大佬,目前我们还处在调研阶段,SQL或datastream都可以,目前我们DWD或维度表设计是存在ClickHouse/Doris,目前在设计未来的架构,还没实现,只是想向各位大佬取经,麻烦大佬帮忙指教下
> >
> >
> > 斗鱼
> > 1227581...@qq.com
> >
> >
> >
> >  
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人:
> >  
> "user-zh"
> >
> <
> > xwwan...@163.com>;
> > 发送时间: 2024年6月16日(星期天) 晚上9:03
> > 收件人: "user-zh" >
> > 主题: 回复:Flink如何做到动态关联join多张维度表中的n张表?
> >
> >
> >
> > 你好,请问你们是用flink sql api还是datastream api实现这个场景的
> >
> >
> >
> >  回复的原邮件 
> > | 发件人 | 斗鱼<1227581...@qq.com.INVALID> |
> > | 日期 | 2024年06月16日 20:35 |
> > | 收件人 | user-zh > | 抄送至 | |
> > | 主题 | Flink如何做到动态关联join多张维度表中的n张表? |
> > 请教下各位大佬,目前我们遇到一个场景:
> > 1、需要往DWD事实表里写入数据的同时,往Kafka里面写该DWD表的记录信息,该信息
> > 2、该Kafka信息会包含一个维度表数据类型的字符串数组
> >
> >
> 3、Flink在做实时消费Kafka中数据,根据类型数组,关联不同的维度表,如数组包含【1,2】,则Flink读取Kafka消息后,将DWD的数据关联维度表1和维度表2后,写入DWS表
> >
> >
> >
> >
> 想请问大佬如何实现根据该数组信息动态关联维度表,这些维度表数据量都挺大的,亿级别的数据,需要能满足维度表变化后,关联后的DWS数据也能变化,不知道是否有什么技术方案能实现,有的话麻烦大佬帮忙给个简单示例或者参考链接,感谢!
> >
> >
> >
> >
> >
> >
> >
> > |
> > |
> > 斗鱼
> > 1227581...@qq.com
> > |
> >  


Re: 使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列

2024-06-19 文章 xiaohui zhang
flink在写入时需要所有DDL中定义的字段都必须被同时写入,不支持sql中只使用部分字段。
如果你确定只需写入部分数据,在DDL中只定义你用到的部分


zboyu0104  于2024年6月14日周五 15:43写道:

> 怎么退订
> from 阿里邮箱
> iPhone--
> 发件人:谢县东
> 日 期:2024年06月06日 16:07:05
> 收件人:
> 主 题:使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列
>
> 各位好:
>
>
> flink版本: 1.13.6
> 我在使用 flink-connector-hbase 连接器,通过flinkSQL 将数据写入hbase,hbase 建表如下:
>
>
> CREATE TABLE hbase_test_db_test_table_xxd (
> rowkey STRING,
> cf1 ROW,
> PRIMARY KEY (rowkey) NOT ENFORCED
> ) WITH (
> 'connector' = 'hbase-2.2',
> 'table-name' = 'test_db:test_table_t1',
> 'zookeeper.quorum' = 'xxx:2181',
> 'zookeeper.znode.parent' = '/hbase',
> 'null-string-literal' = '',
> 'sink.parallelism' = '2'
> );
>
>
> hbase cf1列族下有三列,看官网示例插入数据时需要构建一个row类型插入(row类型需包含列族下的所有列)
> INSERT INTO hbase_test_db_test_table_xxd  select '002' as rowkey,
> row('xxd_2', 'boy', '10') as cf1;
>
>
>
>
> 如果只想更新其中某一列如何实现?在flink中新建一个hbase表吗?
>
>
>
>
>
>
>
>
>


Re: flink on yarn 模式,在jar任务中,怎么获取rest port

2024-08-05 文章 xiaohui zhang
通过yarn提交时,提交成功后,yarn client会返回 application master的地址和端口,从返回信息里面获取就可以

wjw_bigdata  于2024年8月1日周四 14:24写道:

> 退订
>
>
>
>
>
>
>  回复的原邮件 
> | 发件人 | Lei Wang |
> | 发送日期 | 2024年8月1日 14:08 |
> | 收件人 |  |
> | 主题 | Re: flink on yarn 模式,在jar任务中,怎么获取rest port |
> 在 flink-conf.yaml 中可以指定 rest.port, 可指定一个范围
>
>
> On Wed, Jul 31, 2024 at 8:44 PM melin li  wrote:
>
> flink on yarn 模式, rest port 是随机的,需要获取rest port,有什么好办法?
>
>


Re: Flink SQL 中如何将回撤流转为append流

2024-08-21 文章 xiaohui zhang
修改下游的sink connector,在execute的时候把-D、-U的record去掉


hbase 列设置TTL过期后,flink不能再写入数据

2021-09-09 文章 xiaohui zhang
Flink:1.12.1
Flink-connector: 2.2
Hbase: 2.1.0 + CDH6.3.2
现象:如果hbase列族设置了TTL,当某一rowkey写入数据,到达过期时间,列族会被hbase标记为删除。
后续如果有相同key的数据过来,flink无法将数据写入到hbase中,查询hbase中列族一直为空。

执行的过程大致如下:
创建Hbase表,test, 两个列族 cf1 , TTL 60, cf2, TTL 120,
数据TTL分别为1分钟,2分钟。
使用sql写入数据至表中

insert into test
select
'rowkey',
ROW('123'),
ROW('456')
from
   sometable;

过一分钟后,通过hbase 查询,可发现无cf1数据,两分钟后该rowkey无对应数据。
此时再通过flink写入数据,发现无法写入,且flink不报错

请问这个情况是Bug,还是Hbase的问题呢?


HOP窗口较短导致checkpoint失败

2021-09-17 文章 xiaohui zhang
FLink:1.12.1

源: kafka
create table dev_log (
devid,
ip,
op_ts
) with (
connector = kafka
)

sink: Hbase connect 2.2

目前用flink sql的hop window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
执行SQL如下
insert into h_table
select
  devid as rowkey
  row(hop_end, ip_cnt)
from (
  select
 devid,
 hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
 count(distinct(ip)) as ip_cnt
from
  dev_logs
group by
   hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
  devid
)

测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢


Re: HOP窗口较短导致checkpoint失败

2021-09-21 文章 xiaohui zhang
checkpoint的状态大约只有50M左右就会开始出现cp失败的问题。如果失败了,尝试停止任务生成savepoint基本也不能成功。但同时运行的其他任务,cp在300M左右,
save point 1G左右的就很顺利,基本不会出问题。
因为实际的数据压力并不是很大,如果单纯增加并行度,是否能在窗口多的情况下有比较明显的改善呢?

Caizhi Weng  于2021年9月22日周三 上午11:27写道:

> Hi!
>
> 24 小时且步长 1 分钟的 window 会由于数据不断累积而导致 cp 越来越大,越来越慢,最终超时。当然如果运算太慢导致 cp 被 back
> pressure 也有可能导致 cp 超时。开启 mini batch 可以加快 window 的运算速度,但这么长时间而且这么频繁的 window
> 目前确实没有什么很好的优化方法,仍然建议扩大并发以分担计算以及 cp 的压力。
>
> xiaohui zhang  于2021年9月18日周六 上午9:54写道:
>
> > FLink:1.12.1
> >
> > 源: kafka
> > create table dev_log (
> > devid,
> > ip,
> > op_ts
> > ) with (
> > connector = kafka
> > )
> >
> > sink: Hbase connect 2.2
> >
> > 目前用flink sql的hop
> > window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
> > 执行SQL如下
> > insert into h_table
> > select
> >   devid as rowkey
> >   row(hop_end, ip_cnt)
> > from (
> >   select
> >  devid,
> >  hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
> >  count(distinct(ip)) as ip_cnt
> > from
> >   dev_logs
> > group by
> >hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
> >   devid
> > )
> >
> > 测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
> > 实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
> > 修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
> > 请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢
> >
>


flink sql-client kerberos认证不通过

2023-01-29 文章 xiaohui zhang
* Fink版本: 1.16
* 部署:docker集群, 2个jm,3个tm高可用
* HADOOP: CDP7集群,启用kerberos
* Flink配置:在conf/fink-conf.yml中添加security相关参数

  security.kerberos.login.use-ticket-cache: true

  security.kerberos.login.keytab: /opt/flink/flink.keytab

  security.kerberos.login.principal: *fl...@hadoop.com *

* 现象:
flink集群可以正常连接cdp集群的zookeeper、hdfs等服务,实现高可用,写入checkpoint信息。使用sql-client连接hive,创建catalog时,报错GSS认证失败
sql-client日志报错信息:

2023-01-29 16:24:16,340 DEBUG org.apache.hadoop.ipc.Client
[] - closing ipc connection to cdlab01/172.16.16.150:8020:
javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to
find any Kerberos tgt)]

java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed
[Caused by GSSException: No valid credentials provided (Mechanism level:
Failed to find any Kerberos tgt)]

at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:755)
~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_352]

at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_352]

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:718)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:811)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at org.apache.hadoop.ipc.Client.call(Client.java:1381)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at org.apache.hadoop.ipc.Client.call(Client.java:1345)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at com.sun.proxy.$Proxy39.getFileInfo(Unknown Source) [?:?]

at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:796)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_352]

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_352]

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_352]

at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_352]

at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at com.sun.proxy.$Proxy40.getFileInfo(Unknown Source) [?:?]

at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1717)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1437)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1434)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]


at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1434)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1437)
[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]

at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.exists(HadoopFileSystem.java:165)
[flink-dist-1.16.0.jar:1.16.0]

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
[flink-sql-client-1.16.0.jar:1.16.0]

at
org.apache.flink.table.client.SqlClient.s

Re: flink1.17.1版本 flink sql多表关联优化

2023-08-24 文章 xiaohui zhang
这种join写法会随时更新里面每一个字段,最终产出结果的业务含义是什么呢?
如果是取每个vehicle_code对应的最新统计指标值,是否可以用支持partial update的存储,用多个单独的sql直接写入目前就可以了

周先明  于2023年8月4日周五 11:01写道:

> Regular Join 默认把数据都存储在State中,通常会结合TTL来进行优化
>
> guanyq  于2023年8月3日周四 15:59写道:
>
> > 请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutionMode.STREAMING 模式
> >
> > select
> > date_format(a.create_time, '-MM-dd HH:mm:ss') as create_time,
> > b.vehicle_code,
> > a.item_name,
> > a.item_value,
> > c.item_value as vehicle_score,
> > d.current_fault,
> > e.history_fault,
> > f.late_mileage,
> > g.fault_level_event_count,
> > h.current_fault_subsystem,
> > i.history_fault_subsystem
> > from fault_record_subsystem a
> > join mtr_vehicle_use b on a.vehicle_id = b.vehicle_id
> > join fault_record_vehicle c on a.vehicle_id = c.vehicle_id
> > join fault_record_current_count d on a.vehicle_id = d.vehicle_id
> > join fault_record_history_count e on a.vehicle_id = e.vehicle_id
> > join vehicle_usage_score f on a.vehicle_id = f.vehicle_id
> > join fault_record_level_event_count g on a.vehicle_id = g.vehicle_id
> > join fault_record_current_count_subsystem h on a.vehicle_id =
> h.vehicle_id
> > and a.item_name = h.item_name
> > join fault_record_history_count_subsystem i on a.vehicle_id =
> i.vehicle_id
> > and a.item_name = i.item_name
>