Hi Timo,
Thank you for the suggestions.
I see now both Process function and CEP approach will not fit in. Now if I 
follow the third approach to stream the values from database() . Is it possible 
to stream data continuously?
If I follow the bellow approach, both I see one time load only not continuously 
streamUsing JDBCInputFormat this will execute the query only once , so it will 
not be a stream data. when we try to iterate source this may iterate only on 
the data already fetchedUsing RichFlatMapFunctions, in open() if I try to 
connect to DB again this would be one time load. If I connect database in 
flatmap() then it would lead to multiple hits to database.
Request your help on how to continuously stream the data, If possible sample 
source code for reference to stream database. Please help me badly stuck.
In the mail, I see you asked me to register. Are you referring to any training 
here or any other registration.

Regards,Sunitha.    On Tuesday, September 8, 2020, 08:19:49 PM GMT+5:30, Timo 
Walther <twal...@apache.org> wrote:  
 
 Hi Sunitha,

what you are describing is a typical streaming enrichment. We need to 
enrich the stream with some data from a database. There are different 
strategies to handle this:

1) You are querying the database for every record. This is usually not 
what you want because it would slow down your pipeline due to the 
communication latenties to your database. It would also cause a lot of 
pressure to the database in general.

2) You only query database from time to time and store the latest value 
in a ProcessFunction ValueState or MapState.

3) You stream in the values as well and use connect() [1].

In any case, I think CEP might not be useful for this case. If you 
really want to do option 1, it might make sense to also checkout the SQL 
API of Flink because it offers different kind of joins with very good 
abstraction. `Join with a Temporal Table` offers a JDBC connector for 
lookups in your database.

If you like to use DataStream API, I would also recommend the Pattern 
slides here [3] (unfortunately you have to register first).

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/etl.html#connected-streams
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
[3] https://training.ververica.com/decks/patterns/


On 07.09.20 17:25, s_penakalap...@yahoo.com wrote:
> Hi All,
> 
> I am new to Flink, request your help!!!
> 
> My scenario :
> 1> we receive Json messages at a very high frequency like 10,000 
> messages / second
> 2> we need to raise an Alert for a particular user if there is any 
> breach in threshold value against each attribute in Json.
> 3> These threshold values are part of my database table and can be 
> frequently updated by different user.
> 4> In realtime I would like to know how to get latest data from the 
> database.
> 
> I tried using Flink CEP Pattern approach to generate alert. I would like 
> to get some inputs on how I can implement the realtime lookup tables in 
> Flink Java while monitoring alert, any sample code reference.
> 
> Also for such scenarios do you recommend to use Flink CEP approach or we 
> need to use Process function approach.
> 
> 
> Regards,
> Sunitha.

  

Reply via email to