Hi, community!

I have run into a problem while building a streaming production pipeline 
with Flink retract streams, using hudi-hdfs/kafka as the storage engine and 
RocksDB as the state backend. 


In my scenario, 
- During a patient's hospitalization, multiple measurements of vital signs are 
recorded, including temperature, pulse, blood pressure and so on. 
- Each type of vital sign has 20 or more records, with PRIMARY 
KEY(patientId, visitId, signType, time), in the table tbl_vis_vital_signs 
used in the code below. 


I need to aggregate all the vital sign records of a visit together, through 
JOIN or COLLECT with FILTER, as in the code below. 


```
select pid, vid,
  collect(ROW(..., temperature, ...)) filter (where signType = 'temperature') as temperature,
  collect(ROW(..., pulse, ...)) filter (where signType = 'pulse') as pulse,
  collect(....) filter (where ...) as bloodpressure
from tbl_vis_vital_signs
group by pid, vid
```


With the help of FlinkCDC and Kafka/Hudi-Hdfs, we want to build a streaming 
production pipeline with the data flow below. 


DataBase --[CDC tools]--> Kafka --[sync]--> Dynamic Table(kafka/hudi-hdfs)
    --[Flink SQL(retract stream)]--> Dynamic Table


The problem comes from the following three factors. 
1. Data inflation:
1) major: Changes to the input tables cause corresponding changes in the 
output table, e.g. for joins and aggregations. In the code above, every change 
to a row in tbl_vis_vital_signs retracts the outdated result, which contains 
all the vital signs' info, and sends a new result. Worse, there are many vital 
sign records per hospitalization, so there are very many retract and re-send 
operations, all of which are consumed by every downstream consumer.
2) minor: Each cdc update event is split into two events: deletion of the old 
record and insertion of the new record. 
2. Kafka / Hudi-HDFS / RocksDB append incremental data to the full data: 
1) RocksDB and Hudi-HDFS use an incremental, LSM-like model: they append 
incremental events to the full data, whether the event is an insertion or a 
deletion.
2) Even upsert-kafka implements deletions by appending tombstone records. 
3. Row-based storage


In my scenario, these factors cause the following problems: 
1. A large number of low-value intermediate results per primary key consume 
the throughput of the Flink application. 
2. Heavy checkpoints: in every checkpoint (aligned, every 10 sec), the 
incremental RocksDB block data is more than a few GB, and a checkpoint takes 
more than a few minutes when it succeeds at all. Yet only a few GB of data 
exist in the HDFS checkpoint directory. 
3. Low application performance and low stability of the TaskManager JVM. 


So, would mini-batch improve this scenario? 
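
For reference, this is the kind of mini-batch configuration I have in mind. 
The three option names are from the Flink table configuration; the values are 
only placeholders that I have not tuned, and the quoted SET syntax assumes a 
recent SQL client. 

```sql
-- Buffer input records and fire the aggregation once per batch. As I
-- understand it, several changes to the same (pid, vid) group that land in
-- one batch would then produce fewer retract/re-send messages downstream.
SET 'table.exec.mini-batch.enabled' = 'true';
-- Maximum time a record may wait in the buffer (placeholder value).
SET 'table.exec.mini-batch.allow-latency' = '5 s';
-- Maximum number of records buffered per batch (placeholder value).
SET 'table.exec.mini-batch.size' = '5000';
```
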
Thanks for any replies or suggestions.


Best Regards!


2021-12-13 17:10:00
