Hi Arvid Heise,

Thanks for your reply! It's not classical sensor aggregation.  


The reason for not using a window join is the very long time gap between a 
patient's events. 


There is a gap of days or even months between booking a doctor's appointment 
and the visit, between tests, and between hospitalization and discharge. It is 
a little like a special session window with a very long gap, but it won't be a 
time-based or count-based window. 


> actual use case? 
The actual use cases are all based on this scenario and involve doctors, 
patients, orders, visits, tests, hospitalizations, nursing notes and so on. 
> What do I want to achieve? 
As mentioned above, dozens of events keep arriving for each patient over a 
long period of time, especially test and nursing records. I hope that when a 
new record arrives, the old result is updated automatically. I also hope the 
delay of the retraction and the re-sent result can be kept within 10 minutes. 
> consumers of the produced dataset?
Data developers will build streaming production pipelines on the upstream 
datasets and produce new datasets; data analysts will analyse and model the 
data, for example the relationship between spending and medical outcomes; and 
the doctors and nurses on duty will query all information about the 
corresponding patient. 


Thanks for any reply or suggestion. 


Best Regards!
2021-12-16 17:25:00


On 2021-12-16 04:09, Arvid Heise <ar...@apache.org> wrote:


Can you please describe your actual use case? What do you want to achieve: 
low latency or high throughput? Who are the consumers of the produced dataset?



It sounds to me as if this is classical sensor aggregation. I have not heard of 
any sensor aggregation that doesn't use windowing. So you'd usually include a 
TUMBLE window of 10s and output the data in small batches. This would 
significantly reduce the pressure on the sink and may already solve some of 
your problems.
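
To make the tumbling-window suggestion concrete, here is a minimal Python sketch (not Flink code) of how a 10-second tumbling window groups events into small batches. The helper names `tumble_start` and `batch_by_window` and the sample events are made up for illustration; in Flink SQL the grouping itself would be done by the TUMBLE window.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def tumble_start(ts_ms: int, size_ms: int = 10_000) -> int:
    """Return the start of the fixed-size window that contains ts_ms."""
    return ts_ms - ts_ms % size_ms


def batch_by_window(events: List[Tuple[int, str]],
                    size_ms: int = 10_000) -> Dict[int, List[str]]:
    """Group (timestamp_ms, payload) events by tumbling-window start."""
    windows: Dict[int, List[str]] = defaultdict(list)
    for ts_ms, payload in events:
        windows[tumble_start(ts_ms, size_ms)].append(payload)
    return dict(windows)


# Hypothetical events: (event time in milliseconds, payload).
events = [(1_000, "a"), (9_999, "b"), (10_000, "c"), (25_500, "d")]
batches = batch_by_window(events)
```

Each window yields one batch for the sink instead of one write per event, which is where the reduced sink pressure would come from.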



On Tue, Dec 14, 2021 at 4:29 AM Caizhi Weng <tsreape...@gmail.com> wrote:

Hi!


> Changes of input tables will cause corresponding changes in output table


Which sink are you using? If it is an upsert sink, the Flink SQL planner will 
filter out UPDATE_BEFORE messages automatically. Also, if your sink supports 
something like "ignore delete messages", it can filter out delete messages as 
well and affect the downstream less.
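
As a rough illustration of that filtering, the following Python sketch (a simulation, not the planner's implementation) drops UPDATE_BEFORE rows from a changelog and optionally drops deletes too. The row kinds use Flink's short names (+I, -U, +U, -D); the function name `for_upsert_sink` and the sample changelog are assumptions for the example.

```python
from typing import List, Tuple

# (row kind, key, value); kinds: +I insert, -U update-before,
# +U update-after, -D delete.
Change = Tuple[str, str, int]


def for_upsert_sink(changes: List[Change],
                    ignore_deletes: bool = False) -> List[Change]:
    """Drop -U rows, since the following +U overwrites the row by key.

    With ignore_deletes=True, -D rows are dropped as well.
    """
    dropped = {"-U"} | ({"-D"} if ignore_deletes else set())
    return [change for change in changes if change[0] not in dropped]


# Hypothetical changelog for one key.
changelog = [
    ("+I", "p1", 1),
    ("-U", "p1", 1),
    ("+U", "p1", 2),
    ("-D", "p1", 2),
]
upsert_view = for_upsert_sink(changelog)
no_deletes_view = for_upsert_sink(changelog, ignore_deletes=True)
```

The upsert view carries three messages instead of four, and two when deletes are ignored as well.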


Mini-batch will also help in this case. If mini-batch is enabled, aggregations 
will only send updates downstream once per batch, thus decreasing the number 
of records flowing downstream.
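
The effect can be sketched with a small Python simulation of a per-key count (not Flink code). It assumes batches are cut by size only, whereas Flink's mini-batch is configured through the options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`. The function names and sample keys are illustrative.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Change = Tuple[str, str, int]  # (row kind, key, count)


def count_per_record(keys: Iterable[str]) -> List[Change]:
    """Emit a changelog update for every input record (no mini-batch)."""
    counts: Dict[str, int] = defaultdict(int)
    out: List[Change] = []
    for key in keys:
        old = counts[key]
        counts[key] = old + 1
        if old == 0:
            out.append(("+I", key, 1))
        else:
            out.append(("-U", key, old))
            out.append(("+U", key, old + 1))
    return out


def count_mini_batch(keys: List[str], batch_size: int) -> List[Change]:
    """Buffer batch_size records, then emit one update per key per batch."""
    counts: Dict[str, int] = defaultdict(int)
    out: List[Change] = []
    for start in range(0, len(keys), batch_size):
        pending: Dict[str, int] = defaultdict(int)
        for key in keys[start:start + batch_size]:
            pending[key] += 1
        for key, delta in pending.items():
            old = counts[key]
            counts[key] = old + delta
            if old == 0:
                out.append(("+I", key, counts[key]))
            else:
                out.append(("-U", key, old))
                out.append(("+U", key, counts[key]))
    return out


# Hypothetical stream of patient ids.
events = ["p1", "p1", "p2", "p1", "p1", "p2"]
per_record = count_per_record(events)
batched = count_mini_batch(events, batch_size=3)
```

Both variants end with the same final counts, but the batched one emits fewer intermediate messages for the same six input records.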


For better performance on aggregations you can also try local-global 
aggregations. See [1] for details.
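
The idea behind local-global aggregation can be sketched in Python as follows (a simulation of the two phases, not Flink's operators). The partition contents and function names are made up; in Flink the behaviour is governed by `table.optimizer.agg-phase-strategy` and, per the tuning page in [1], requires mini-batch to be enabled.

```python
from collections import Counter
from typing import List


def local_aggregate(partition: List[str]) -> Counter:
    """Local phase: pre-aggregate one upstream partition before the shuffle."""
    return Counter(partition)


def global_aggregate(partials: List[Counter]) -> Counter:
    """Global phase: merge the per-partition accumulators by key."""
    total: Counter = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts
    return total


# Hypothetical upstream partitions holding patient ids.
partitions = [["p1", "p1", "p1", "p2"], ["p1", "p1", "p2", "p2"]]
partials = [local_aggregate(p) for p in partitions]
result = global_aggregate(partials)

# Records crossing the shuffle: one per row vs. one per key per partition.
shuffled_without_local = sum(len(p) for p in partitions)
shuffled_with_local = sum(len(c) for c in partials)
```

The global result is unchanged, while fewer records cross the shuffle; the saving grows with the number of rows per key in each partition.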


> Row-Based Storage


This depends on the format you use. Although Flink's current calculation model 
is row-based, it still supports column-based formats like Parquet and has a 
number of optimizations for them. If you enable mini-batch and two-stage 
aggregations, most jobs will meet their performance needs.


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation


On Mon, Dec 13, 2021 at 5:13 PM vtygoss <vtyg...@126.com> wrote:

Hi, community!


I have run into a problem while building a streaming production pipeline 
using Flink retract streams, with hudi-hdfs/kafka as the storage engine and 
rocksdb as the state backend. 


In my scenario, 
- During a patient's hospitalization, multiple measurements of vital signs are 
recorded, including temperature, pulse, blood pressure and so on. 
- Each type of vital sign has 20 or more records with PRIMARY 
KEY(patientId, visitId, signType, time) in the table tbl_vis_vital_signs used 
in the code below. 


I need to aggregate all the vital sign records together through JOIN or 
COLLECT with FILTER, as in the code below. 


```
-- one collected set per vital sign type, per patient visit
select pid, vid,
  collect(ROW(..., temperature,...)) filter(where signType='temperature') as temperature,
  collect(ROW(..., pulse,..)) filter(where signType='pulse') as pulse,
  collect(....) filter(where ...) as bloodpressure
from tbl_vis_vital_signs
group by pid, vid
```


With the help of Flink CDC and Kafka/Hudi-HDFS, we want to build a streaming 
production pipeline with the data flow below. 


Database --[CDC tools]--> Kafka --[sync]--> Dynamic Table (kafka/hudi-hdfs)
    --[Flink SQL (retract stream)]--> Dynamic Table


Three factors contribute to the problem: 
1. Data inflation:
1) major: Changes to input tables cause corresponding changes in the output 
table, e.g. for joins and aggregations. In the code above, every change to a 
row in tbl_vis_vital_signs retracts the outdated result, which contains all 
vital sign info, and sends a new result. Worse, there are many vital sign 
records per hospitalization, which causes many retract and re-send operations 
that all downstream consumers have to process.
2) minor: Each CDC update event is split into two events: deletion of the old 
record and insertion of the new record. 
2. Kafka / Hudi-HDFS / RocksDB append incremental data to the full data: 
1) RocksDB and Hudi-HDFS use an incremental model like LSM; they append 
incremental events to the full data, whether insertions or deletions.
2) Even upsert-kafka implements deletions by inserting tombstones. 
3. Row-based storage
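
The major inflation factor in item 1 can be made concrete with a small Python simulation (not Flink code). It models a collect grouped by (pid, vid) where every new row retracts the full previous result and emits the full new one. The function name and the sample readings are hypothetical, and a tuple stands in for the multiset that COLLECT produces.

```python
from typing import Dict, List, Tuple

Key = Tuple[str, str]            # (pid, vid)
Change = Tuple[str, Key, tuple]  # (row kind, key, full collected result)


def collect_with_retraction(rows: List[Tuple[str, str, float]]) -> List[Change]:
    """Emit -U with the old collection and +U with the new one per input row."""
    state: Dict[Key, List[float]] = {}
    out: List[Change] = []
    for pid, vid, value in rows:
        key = (pid, vid)
        collected = state.setdefault(key, [])
        if collected:
            out.append(("-U", key, tuple(collected)))
        collected.append(value)
        kind = "+U" if len(collected) > 1 else "+I"
        out.append((kind, key, tuple(collected)))
    return out


# Hypothetical: 20 temperature readings for one hospitalization.
rows = [("p1", "v1", 36.0 + i / 10) for i in range(20)]
changes = collect_with_retraction(rows)
emitted_values = sum(len(result) for _, _, result in changes)
```

Because each message carries the whole collection, the volume sent downstream grows roughly quadratically with the number of records per visit, while the message count grows only linearly.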


In my scenario, these factors cause the following problems: 
1. A large number of low-value intermediate results per primary key consume 
the throughput of the Flink application. 
2. Heavy checkpoints: in every checkpoint (aligned, every 10 sec), the 
incremental block data of rocksdb exceeds a few GB, and a checkpoint takes 
more than a few minutes when it succeeds at all. Yet only a few GB of data 
exist in the HDFS checkpoint directory. 
3. Low application performance and low stability of the TaskManager JVM. 


So, would mini-batch improve this scenario? 
Thanks for any reply or suggestions.


Best Regards!


2021-12-13 17:10:00
